For example:
package test

import com.cotdp.hadoop.ZipFileInputFormat
import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.sql.SparkSession

/**
 * Output format that writes each record's value to a file named after its key,
 * suppressing the key itself in the output lines.
 */
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Replace every key with NullWritable so only the value text is emitted.
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
}

/**
 * Reads every entry of the zip archives under /tmp/spark via ZipFileInputFormat
 * (key = entry file name, value = entry bytes), prints the first entry, and
 * writes all entries back out as text files keyed by entry name.
 */
object test {

  /** Decode a BytesWritable as UTF-8, bounded by its valid length.
    * NOTE: getBytes returns the backing buffer, which may be padded past the
    * valid data — decoding the whole array would append garbage bytes.
    */
  private def asUtf8(bw: BytesWritable): String =
    new String(bw.getBytes, 0, bw.getLength, "UTF-8")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test").getOrCreate()

    // RDD[(Text, BytesWritable)]: one pair per file inside each zip archive.
    val zipFileRDD = spark.sparkContext.newAPIHadoopFile(
      "/tmp/spark",
      classOf[ZipFileInputFormat],
      classOf[Text],
      classOf[BytesWritable],
      spark.sparkContext.hadoopConfiguration)

    println("The file name in the zipFile is: " + zipFileRDD.map(s => s._1.toString).first)
    println("The file contents are: " + zipFileRDD.map(s => asUtf8(s._2)).first)

    // Persist each zip entry as text; RDDMultipleTextOutputFormat drops the key
    // from the output lines (old mapred API, as saveAsHadoopFile requires).
    zipFileRDD
      .map(s => (s._1.toString, asUtf8(s._2)))
      .saveAsHadoopFile(
        "/tmp/test/zip2",
        classOf[String],
        classOf[String],
        classOf[RDDMultipleTextOutputFormat])

    spark.stop()
  }
}
No comments:
Post a Comment
Note: Only a member of this blog may post a comment.