For example, the following Spark job reads every entry of the zip archives under /tmp/spark using ZipFileInputFormat and writes each entry out to its own text file:
package test

import com.cotdp.hadoop.ZipFileInputFormat
import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.sql.SparkSession

// Writes each (key, value) pair to an output file whose name is prefixed with the key.
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "-" + name
}

object test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test").getOrCreate()

    // Read the zip archives under /tmp/spark as (entry name, entry bytes) pairs.
    val zipFileRDD = spark.sparkContext.newAPIHadoopFile(
      "/tmp/spark",
      classOf[ZipFileInputFormat],
      classOf[Text],
      classOf[BytesWritable],
      spark.sparkContext.hadoopConfiguration)

    // copyBytes() returns only the valid bytes; getBytes may include trailing padding.
    println("The file name in the zipFile is: " + zipFileRDD.map(s => s._1.toString).first)
    println("The file contents are: " + zipFileRDD.map(s => new String(s._2.copyBytes(), "UTF-8")).first)

    // Write each zip entry to its own text file under /tmp/test/zip2.
    zipFileRDD
      .map(s => (s._1.toString, new String(s._2.copyBytes(), "UTF-8")))
      .saveAsHadoopFile("/tmp/test/zip2", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])

    spark.stop()
  }
}
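Once packaged, the job can be submitted with spark-submit in the usual way. The output files under /tmp/test/zip2 are named by generateFileNameForKeyValue, so each part file is prefixed with the name of the zip entry it came from.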