Tuesday, October 10, 2017

Specify the name of the output file using the key and remove the key from the output file in Spark

  • Old hadoop API
  • New hadoop API
    import org.apache.hadoop.io.{NullWritable, Text}
    import org.apache.hadoop.mapreduce._
    import org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, MultipleOutputs, TextOutputFormat}
    import org.apache.hadoop.mapreduce.task.MapContextImpl
     
    // OutputFormat that routes each record to a file named after its key
    // (via MultipleOutputs) and writes only the value into that file.
    class TextMultipleOutputsFormat extends TextOutputFormat[Text, Text] {
     
      override def getRecordWriter(context: TaskAttemptContext): RecordWriter[Text, Text] =
        new RecordWriter[Text, Text] {
          val job: Job = Job.getInstance(context.getConfiguration)
     
          // MapContextImpl(conf, taskId, reader, writer, committer, reporter, split):
          // MultipleOutputs only needs the conf, the task id and a writer, so the
          // remaining positions are null and the writer is a no-op stub.
          val moContext = new MapContextImpl(job.getConfiguration, context.getTaskAttemptID,
            null, new DummyRecordWriter, null, null, null)
     
          val multipleOutputs = new MultipleOutputs[NullWritable, Text](moContext)
     
          // LazyOutputFormat suppresses the default (empty) part-* files.
          LazyOutputFormat.setOutputFormatClass(job, classOf[TextOutputFormat[_, _]])
     
          override def write(key: Text, value: Text): Unit = {
            // File name = key; record = value only (the key is dropped).
            multipleOutputs.write(NullWritable.get, value, key.toString)
          }
     
          override def close(context: TaskAttemptContext): Unit = multipleOutputs.close()
        }
     
      // No-op writer handed to MapContextImpl; nothing is ever written through it.
      private class DummyRecordWriter extends RecordWriter[NullWritable, Text] {
        override def write(key: NullWritable, value: Text): Unit = ()
     
        override def close(context: TaskAttemptContext): Unit = ()
      }
     
    }
    rdd.saveAsNewAPIHadoopFile[TextMultipleOutputsFormat](outputPath)
  • Spark API
    rdd.toDF("file", "data").write.partitionBy("file").text(outputPath)

Specify the name of the output file using the key and remove the key from the output file in MapReduce

  • Old hadoop API
  • New hadoop API
    • Using mapper
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
       
      import java.io.IOException;
       
      class CustomMapper extends Mapper<Text, Text, NullWritable, Text> {
          private MultipleOutputs<NullWritable, Text> multipleOutputs;
       
          @Override
          protected void setup(Context context) throws IOException, InterruptedException {
              multipleOutputs = new MultipleOutputs<>(context);
          }
       
          @Override
          protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
              multipleOutputs.write(NullWritable.get(), value, key.toString());
          }
       
          @Override
          protected void cleanup(Context context) throws IOException, InterruptedException {
              multipleOutputs.close();
          }
      }
      job.setMapperClass(CustomMapper.class);
      LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
    • Using OutputFormat
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.RecordWriter;
      import org.apache.hadoop.mapreduce.TaskAttemptContext;
      import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
      import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
      import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
      import org.apache.hadoop.mapreduce.task.MapContextImpl;
       
      import java.io.IOException;
       
      public class TextMultipleOutputsFormat extends TextOutputFormat<Text, Text> {
          public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
                  throws IOException, InterruptedException {
              Job job = Job.getInstance(context.getConfiguration());
              LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
       
              return new RecordWriter<Text, Text>() {
       
                  MapContextImpl<Text, Text, NullWritable, Text> moContext = new MapContextImpl<>(job.getConfiguration(), context.getTaskAttemptID(),
                          nullnew DummyRecordWriter(), nullnullnull);
       
                  MultipleOutputs<NullWritable, Text> multipleOutputs = new MultipleOutputs<>(moContext);
       
                  @Override
                  public void write(Text key, Text value) throws IOException, InterruptedException {
                      multipleOutputs.write(NullWritable.get(), value, key.toString());
                  }
       
                  @Override
                  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                      multipleOutputs.close();
                  }
              };
          }
       
          private class DummyRecordWriter extends RecordWriter<NullWritable, Text> {
              public void write(NullWritable key, Text value) {
              }
       
              public void close(TaskAttemptContext context) {
              }
          }
      }
       job.setOutputFormatClass(TextMultipleOutputsFormat.class);