Tuesday, October 10, 2017

Specify the name of the output file using the key, and remove the key from the output file, in MapReduce (MR)

  • Old Hadoop API
  • New Hadoop API
    • Using mapper
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
       
      import java.io.IOException;
       
      class CustomMapper extends Mapper<Text, Text, NullWritable, Text> {
          private MultipleOutputs<NullWritable, Text> multipleOutputs;
       
          @Override
          protected void setup(Context context) throws IOException, InterruptedException {
              multipleOutputs = new MultipleOutputs<>(context);
          }
       
          @Override
          protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
              multipleOutputs.write(NullWritable.get(), value, key.toString());
          }
       
          @Override
          protected void cleanup(Context context) throws IOException, InterruptedException {
              multipleOutputs.close();
          }
      }
      // Driver-side configuration; `job` is the Job being set up (declared outside this snippet).
      job.setMapperClass(CustomMapper.class);
      // Registers TextOutputFormat through LazyOutputFormat -- presumably so the default
      // part-* files are only created when something is written to them; confirm against
      // the LazyOutputFormat documentation.
      LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
    • Using OutputFormat
       import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.RecordWriter;
      import org.apache.hadoop.mapreduce.TaskAttemptContext;
      import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
      import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
      import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
      import org.apache.hadoop.mapreduce.task.MapContextImpl;
       
      import java.io.IOException;
       
      public class TextMultipleOutputsFormat extends TextOutputFormat<Text, Text> {
          public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
                  throws IOException, InterruptedException {
              Job job = Job.getInstance(context.getConfiguration());
              LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
       
              return new RecordWriter<Text, Text>() {
       
                  MapContextImpl<Text, Text, NullWritable, Text> moContext = new MapContextImpl<>(job.getConfiguration(), context.getTaskAttemptID(),
                          nullnew DummyRecordWriter(), nullnullnull);
       
                  MultipleOutputs<NullWritable, Text> multipleOutputs = new MultipleOutputs<>(moContext);
       
                  @Override
                  public void write(Text key, Text value) throws IOException, InterruptedException {
                      multipleOutputs.write(NullWritable.get(), value, key.toString());
                  }
       
                  @Override
                  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                      multipleOutputs.close();
                  }
              };
          }
       
          private class DummyRecordWriter extends RecordWriter<NullWritable, Text> {
              public void write(NullWritable key, Text value) {
              }
       
              public void close(TaskAttemptContext context) {
              }
          }
      }
       // Driver-side configuration; `job` is the Job being set up (declared outside this snippet).
       job.setOutputFormatClass(TextMultipleOutputsFormat.class);

No comments:

Post a Comment

Note: Only a member of this blog may post a comment.