Example: reading zip archives with Spark via ZipFileInputFormat and writing one output file per zip entry.
package test

import com.cotdp.hadoop.ZipFileInputFormat
import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.sql.SparkSession

/**
 * Output format that routes each (key, value) pair to a file whose name is
 * derived from the key: "<key>-<partName>". Used below so every zip entry
 * lands in its own output file.
 */
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // The key is the zip entry name (a String) — see the map(...) before
  // saveAsHadoopFile in main.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "-" + name
}

object test {
  /**
   * Reads every zip archive under /tmp/spark, prints the first entry's name
   * and contents, then writes all entries under /tmp/test/zip2 with one
   * output file per entry.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test").getOrCreate()

    // Each record produced by ZipFileInputFormat is
    // (entry name: Text, entry bytes: BytesWritable).
    val zipFileRDD = spark.sparkContext.newAPIHadoopFile(
      "/tmp/spark",
      classOf[ZipFileInputFormat],
      classOf[Text],
      classOf[BytesWritable],
      spark.sparkContext.hadoopConfiguration)

    // Decode a BytesWritable safely: getBytes returns the full backing
    // buffer, which may be longer than the valid data, so bound the
    // decode by getLength.
    def toUtf8(bytes: BytesWritable): String =
      new String(bytes.getBytes, 0, bytes.getLength, "UTF-8")

    println("The file name in the zipFile is: " + zipFileRDD.map(s => s._1.toString).first)
    println("The file contents are: " + zipFileRDD.map(s => toUtf8(s._2)).first)

    // saveAsHadoopFile takes an old-API (mapred) OutputFormat, which is
    // what MultipleTextOutputFormat is — intentionally mixed with the
    // new-API input format above.
    zipFileRDD
      .map(s => (s._1.toString, toUtf8(s._2)))
      .saveAsHadoopFile(
        "/tmp/test/zip2",
        classOf[String],
        classOf[String],
        classOf[RDDMultipleTextOutputFormat])

    spark.stop()
  }
}