Wednesday, August 30, 2017

Specifying the names of output files in Spark

E.g.
package test
 
import com.cotdp.hadoop.ZipFileInputFormat
import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.sql.SparkSession
 
/**
 * Output format that routes each record to a file whose name is derived
 * from the record's key: "<key>-<partName>" (e.g. "myfile.txt-part-00000").
 * Used with `saveAsHadoopFile` to control output file naming.
 */
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // `name` is the default partition file name supplied by Hadoop.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    val keyStr = key.asInstanceOf[String]
    s"$keyStr-$name"
  }
}
 
object test {
  /**
   * Reads all entries from zip files under /tmp/spark and writes each entry's
   * contents to its own output file (named after the entry) under /tmp/test/zip2.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test").getOrCreate()

    // RDD of (entry name, entry bytes) pairs extracted from the zip archives.
    val zipFileRDD = spark.sparkContext.newAPIHadoopFile("/tmp/spark", classOf[ZipFileInputFormat],
      classOf[Text],
      classOf[BytesWritable],
      spark.sparkContext.hadoopConfiguration)

    println("The file name in the zipFile is: " + zipFileRDD.map(s => s._1.toString).first)
    // BUG FIX: the original lambda was malformed (`s =new String(...)` — missing `=>`).
    // Also, BytesWritable.getBytes returns the full backing array, which may be
    // longer than the valid data; decode only the first getLength bytes.
    println("The file contents are: " + zipFileRDD.map(s => new String(s._2.getBytes, 0, s._2.getLength, "UTF-8")).first)

    // Write each zip entry to a file named "<entryName>-<partName>" via the
    // custom output format; bound the byte decode by getLength here too.
    zipFileRDD.map(s => (s._1.toString, new String(s._2.getBytes, 0, s._2.getLength, "UTF-8"))).saveAsHadoopFile("/tmp/test/zip2", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])
    spark.stop()
  }
}

No comments:

Post a Comment

Note: Only a member of this blog may post a comment.