Tuesday, October 10, 2017

Specify the name of output file using key and remove it from output file in spark

- Old hadoop API
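The post does not include a snippet for this bullet. A minimal sketch of how this is usually done with the old org.apache.hadoop.mapred API, assuming an RDD of (Text, Text) pairs where the key holds the desired file name; the class name KeyAsFileNameOutputFormat is illustrative:

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class KeyAsFileNameOutputFormat extends MultipleTextOutputFormat[Text, Text] {
  // A null key makes the underlying TextOutputFormat write only the value.
  override def generateActualKey(key: Text, value: Text): Text = null
  // Route each record to a file named after its key.
  override def generateFileNameForKeyValue(key: Text, value: Text, name: String): String =
    key.toString
}

rdd.saveAsHadoopFile(outputPath, classOf[Text], classOf[Text], classOf[KeyAsFileNameOutputFormat])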
- New hadoop API
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, MultipleOutputs, TextOutputFormat}
import org.apache.hadoop.mapreduce.task.MapContextImpl

class TextMultipleOutputsFormat extends TextOutputFormat[Text, Text] {

  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[Text, Text] =
    new RecordWriter[Text, Text] {
      val job: Job = Job.getInstance(context.getConfiguration)
      val moContext = new MapContextImpl(job.getConfiguration, context.getTaskAttemptID,
        null, new DummyRecordWriter, null, null, null)
      val multipleOutputs = new MultipleOutputs[NullWritable, Text](moContext)

      LazyOutputFormat.setOutputFormatClass(job, classOf[TextOutputFormat[_, _]])

      override def write(key: Text, value: Text): Unit = {
        multipleOutputs.write(NullWritable.get, value, key.toString)
      }

      override def close(context: TaskAttemptContext): Unit = multipleOutputs.close()
    }

  private class DummyRecordWriter extends RecordWriter[NullWritable, Text] {
    override def write(key: NullWritable, value: Text): Unit = ()
    override def close(context: TaskAttemptContext): Unit = ()
  }
}

rdd.saveAsNewAPIHadoopFile[TextMultipleOutputsFormat](outputPath)

- Spark API
rdd.toDF("file","data").write.partitionBy("file").text(outputPath)
Specify the name of output file using key and remove it from output file in MR
- Old hadoop API
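The post leaves this bullet empty as well. A minimal sketch with the old org.apache.hadoop.mapred API, again assuming (Text, Text) output where the key carries the file name; the class name KeyAsFileNameOutputFormat is illustrative:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class KeyAsFileNameOutputFormat extends MultipleTextOutputFormat<Text, Text> {

    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String name) {
        return key.toString();   // write this record to a file named after its key
    }

    @Override
    protected Text generateActualKey(Text key, Text value) {
        return null;             // a null key makes TextOutputFormat write only the value
    }
}

// On the old-API driver's JobConf:
// jobConf.setOutputFormat(KeyAsFileNameOutputFormat.class);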
- New hadoop API
- Using mapper
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import java.io.IOException;

class CustomMapper extends Mapper<Text, Text, NullWritable, Text> {

    private MultipleOutputs<NullWritable, Text> multipleOutputs;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<>(context);
    }

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        multipleOutputs.write(NullWritable.get(), value, key.toString());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}

job.setMapperClass(CustomMapper.class);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
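Because every record goes out through MultipleOutputs and nothing is written through context.write, LazyOutputFormat keeps the job from also creating empty default part-m-xxxxx files; the files that are produced are named <key>-m-xxxxx, since MultipleOutputs appends the task suffix to the baseOutputPath passed to write().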
- Using OutputFormat

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.MapContextImpl;

import java.io.IOException;

public class TextMultipleOutputsFormat extends TextOutputFormat<Text, Text> {

    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Job job = Job.getInstance(context.getConfiguration());
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        return new RecordWriter<Text, Text>() {
            MapContextImpl<Text, Text, NullWritable, Text> moContext =
                new MapContextImpl<>(job.getConfiguration(), context.getTaskAttemptID(),
                    null, new DummyRecordWriter(), null, null, null);
            MultipleOutputs<NullWritable, Text> multipleOutputs = new MultipleOutputs<>(moContext);

            @Override
            public void write(Text key, Text value) throws IOException, InterruptedException {
                multipleOutputs.write(NullWritable.get(), value, key.toString());
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                multipleOutputs.close();
            }
        };
    }

    private class DummyRecordWriter extends RecordWriter<NullWritable, Text> {
        public void write(NullWritable key, Text value) {
        }

        public void close(TaskAttemptContext context) {
        }
    }
}

job.setOutputFormatClass(TextMultipleOutputsFormat.class);
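A note on the odd-looking pieces above: MultipleOutputs can only be constructed from a TaskInputOutputContext (the kind of context a Mapper or Reducer receives), while getRecordWriter is handed just a TaskAttemptContext, so the code fabricates a MapContextImpl with a do-nothing DummyRecordWriter to satisfy it. Registering LazyOutputFormat/TextOutputFormat on the copied Job appears to be what makes the writers created by MultipleOutputs emit plain text files named after the key (plus the usual -m-xxxxx part suffix) rather than recursing back into TextMultipleOutputsFormat.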