Tuesday, October 10, 2017

Specify the output file name using the key and remove the key from the output file in Spark

  • Old Hadoop API
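    A minimal sketch of the same technique with the old mapred API (the class name is illustrative): generateFileNameForKeyValue names the output file after the key, and generateActualKey replaces the key with NullWritable so that TextOutputFormat writes only the value.
    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
     
    class KeyAsFileNameOutputFormat extends MultipleTextOutputFormat[Any, Any] {
     
      // name the output file inside outputPath after the key
      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
        key.toString
     
      // drop the key: TextOutputFormat skips NullWritable keys and writes only the value
      override def generateActualKey(key: Any, value: Any): Any =
        NullWritable.get()
     
    }
    rdd.saveAsHadoopFile(outputPath, classOf[String], classOf[String], classOf[KeyAsFileNameOutputFormat])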
  • New Hadoop API
    import org.apache.hadoop.io.{NullWritable, Text}
    import org.apache.hadoop.mapreduce._
    import org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, MultipleOutputs, TextOutputFormat}
    import org.apache.hadoop.mapreduce.task.MapContextImpl
     
    class TextMultipleOutputsFormat extends TextOutputFormat[Text, Text] {
     
      override def getRecordWriter(context: TaskAttemptContext): RecordWriter[Text, Text] =
        new RecordWriter[Text, Text] {
          val job: Job = Job.getInstance(context.getConfiguration)
     
          // MultipleOutputs needs a TaskInputOutputContext, so wrap the
          // TaskAttemptContext in a MapContextImpl; the reader, committer,
          // reporter and split are never used here, so they stay null
          val moContext = new MapContextImpl[Text, Text, NullWritable, Text](
            job.getConfiguration, context.getTaskAttemptID,
            null, new DummyRecordWriter, null, null, null)
     
          val multipleOutputs = new MultipleOutputs[NullWritable, Text](moContext)
     
          // LazyOutputFormat avoids creating the default (empty) part-r-* files
          LazyOutputFormat.setOutputFormatClass(job, classOf[TextOutputFormat[_, _]])
     
          // use the key as the base name of the output file; the NullWritable
          // key is skipped by TextOutputFormat, so only the value is written
          override def write(key: Text, value: Text): Unit = {
            multipleOutputs.write(NullWritable.get, value, key.toString)
          }
     
          override def close(context: TaskAttemptContext): Unit = multipleOutputs.close()
        }
     
      // no-op writer that only satisfies the MapContextImpl constructor
      private class DummyRecordWriter extends RecordWriter[NullWritable, Text] {
        override def write(key: NullWritable, value: Text): Unit = ()
     
        override def close(context: TaskAttemptContext): Unit = ()
      }
     
    }
    rdd.saveAsNewAPIHadoopFile[TextMultipleOutputsFormat](outputPath)
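    The rdd above is assumed to be an RDD[(Text, Text)] keyed by the desired file name. A minimal sketch of preparing one (sc and the sample data are illustrative):
    val rdd = sc.parallelize(Seq(("fileA", "line1"), ("fileB", "line2")))
      .map { case (file, line) => (new Text(file), new Text(line)) }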
  • Spark API
    rdd.toDF("file", "data").write.partitionBy("file").text(outputPath)
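    A minimal end-to-end sketch, assuming spark is an existing SparkSession and rdd is an RDD[(String, String)] of (file name, line) pairs. Note that partitionBy writes each group into a subdirectory named file=<key> and drops the file column from the data itself, which removes the key from the output files:
    import spark.implicits._
     
    val rdd = spark.sparkContext.parallelize(Seq(("fileA", "line1"), ("fileB", "line2")))
    rdd.toDF("file", "data").write.partitionBy("file").text(outputPath)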
