- Old Hadoop API
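With the old mapred API no custom RecordWriter is needed: the stock MultipleTextOutputFormat can name each file after the key and suppress the key by returning null as the actual key. A minimal sketch, assuming rdd is an RDD[(Text, Text)] and outputPath is defined; KeyBasedOutputFormat is just an illustrative name:

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class KeyBasedOutputFormat extends MultipleTextOutputFormat[Text, Text] {
  // Use the key as the name of the output file.
  override def generateFileNameForKeyValue(key: Text, value: Text, name: String): String =
    key.toString

  // A null actual key keeps the key (and the tab separator) out of the file contents.
  override def generateActualKey(key: Text, value: Text): Text = null
}

rdd.saveAsHadoopFile(outputPath, classOf[Text], classOf[Text], classOf[KeyBasedOutputFormat])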
- New Hadoop API
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, MultipleOutputs, TextOutputFormat}
import org.apache.hadoop.mapreduce.task.MapContextImpl

class TextMultipleOutputsFormat extends TextOutputFormat[Text, Text] {
  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[Text, Text] =
    new RecordWriter[Text, Text] {
      // Clone the job so MultipleOutputs sees a configuration whose output format
      // is a lazy TextOutputFormat rather than this class (which would recurse).
      val job: Job = Job.getInstance(context.getConfiguration)
      // MultipleOutputs needs a map context; a dummy one built from the cloned
      // configuration and the real task attempt id is enough.
      val moContext = new MapContextImpl[Text, Text, NullWritable, Text](
        job.getConfiguration, context.getTaskAttemptID, null, new DummyRecordWriter,
        null, null, null)
      val multipleOutputs = new MultipleOutputs[NullWritable, Text](moContext)
      LazyOutputFormat.setOutputFormatClass(job, classOf[TextOutputFormat[_, _]])

      // The key becomes the base name of the output file; only the value is written.
      override def write(key: Text, value: Text): Unit =
        multipleOutputs.write(NullWritable.get, value, key.toString)

      override def close(context: TaskAttemptContext): Unit =
        multipleOutputs.close()
    }

  // Swallows anything written through the dummy context.
  private class DummyRecordWriter extends RecordWriter[NullWritable, Text] {
    override def write(key: NullWritable, value: Text): Unit = ()
    override def close(context: TaskAttemptContext): Unit = ()
  }
}
rdd.saveAsNewAPIHadoopFile[TextMultipleOutputsFormat](outputPath)
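For context, one hypothetical way to build the (Text, Text) pairs the format expects, with the target file name as the key and the line as the value (sc is an assumed SparkContext):

import org.apache.hadoop.io.Text

val rdd = sc.parallelize(Seq(("fileA", "line1"), ("fileB", "line2")))
  .map { case (file, line) => (new Text(file), new Text(line)) }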
- Spark API
rdd.toDF("file", "data").write.partitionBy("file").text(outputPath)
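One caveat: partitionBy creates a directory per key rather than a bare file, though like the other approaches it keeps the partitioning column out of the file contents. A minimal sketch, assuming a SparkSession named spark and the same (file name, line) pairs as above:

import spark.implicits._

val rdd = sc.parallelize(Seq(("fileA", "line1"), ("fileB", "line2")))
rdd.toDF("file", "data").write.partitionBy("file").text(outputPath)
// Layout: outputPath/file=fileA/part-..., each part file holding only the data column.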
Tuesday, October 10, 2017
Specify the name of the output file using the key and remove it from the output file in Spark
- Old Hadoop API
- New Hadoop API
- Spark API
Specify the name of the output file using the key and remove it from the output file in MR
- Old Hadoop API
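The old mapred API can reuse the KeyBasedOutputFormat sketched in the Spark section above. A hedged driver sketch in Scala, assuming inputPath and outputPath; KeyValueTextInputFormat plus the old API's default IdentityMapper pass the (key, value) pairs straight through to the output format:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.{FileInputFormat, FileOutputFormat, JobClient, JobConf, KeyValueTextInputFormat}

val conf = new JobConf(classOf[KeyBasedOutputFormat])
conf.setJobName("name-files-by-key")
conf.setInputFormat(classOf[KeyValueTextInputFormat]) // (Text, Text) records
conf.setOutputKeyClass(classOf[Text])
conf.setOutputValueClass(classOf[Text])
conf.setOutputFormat(classOf[KeyBasedOutputFormat])   // one file per key, key stripped
FileInputFormat.setInputPaths(conf, new Path(inputPath))
FileOutputFormat.setOutputPath(conf, new Path(outputPath))
JobClient.runJob(conf)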
- New Hadoop API
- Using a Mapper
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

class CustomMapper extends Mapper<Text, Text, NullWritable, Text> {

    private MultipleOutputs<NullWritable, Text> multipleOutputs;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<>(context);
    }

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        // The key becomes the base name of the output file; only the value is written.
        multipleOutputs.write(NullWritable.get(), value, key.toString());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}

job.setMapperClass(CustomMapper.class);
// Lazy output suppresses the empty default part-* files, since every record
// goes through MultipleOutputs instead.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
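Two notes on this approach: the job should be map-only so each mapper writes its files directly, and MultipleOutputs appends the task suffix to the base name, so a key of fileA actually produces a file named fileA-m-00000. A hedged driver sketch, in Scala for consistency with the Spark examples (inputPath and outputPath assumed):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, KeyValueTextInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, LazyOutputFormat, TextOutputFormat}

val job = Job.getInstance(new Configuration(), "name-files-by-key")
job.setJarByClass(classOf[CustomMapper])
job.setMapperClass(classOf[CustomMapper])
job.setNumReduceTasks(0)                                  // map-only job
job.setInputFormatClass(classOf[KeyValueTextInputFormat]) // (Text, Text) records
job.setOutputKeyClass(classOf[NullWritable])
job.setOutputValueClass(classOf[Text])
LazyOutputFormat.setOutputFormatClass(job, classOf[TextOutputFormat[_, _]])
FileInputFormat.addInputPath(job, new Path(inputPath))
FileOutputFormat.setOutputPath(job, new Path(outputPath))
job.waitForCompletion(true)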
- Using an OutputFormat
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.MapContextImpl;

import java.io.IOException;

public class TextMultipleOutputsFormat extends TextOutputFormat<Text, Text> {

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Clone the job so MultipleOutputs sees a configuration whose output format
        // is a lazy TextOutputFormat rather than this class (which would recurse).
        Job job = Job.getInstance(context.getConfiguration());
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        return new RecordWriter<Text, Text>() {
            // MultipleOutputs needs a map context; a dummy one built from the
            // cloned configuration and the real task attempt id is enough.
            MapContextImpl<Text, Text, NullWritable, Text> moContext =
                new MapContextImpl<>(job.getConfiguration(), context.getTaskAttemptID(),
                    null, new DummyRecordWriter(), null, null, null);
            MultipleOutputs<NullWritable, Text> multipleOutputs = new MultipleOutputs<>(moContext);

            @Override
            public void write(Text key, Text value) throws IOException, InterruptedException {
                // The key becomes the base name of the output file; only the value is written.
                multipleOutputs.write(NullWritable.get(), value, key.toString());
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                multipleOutputs.close();
            }
        };
    }

    // Swallows anything written through the dummy context.
    private class DummyRecordWriter extends RecordWriter<NullWritable, Text> {
        @Override
        public void write(NullWritable key, Text value) {
        }

        @Override
        public void close(TaskAttemptContext context) {
        }
    }
}

job.setOutputFormatClass(TextMultipleOutputsFormat.class);
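The LazyOutputFormat call inside getRecordWriter is the subtle part: the cloned Job is only a configuration holder for the dummy context, and when MultipleOutputs looks up the output format in that configuration it appears to find a lazy TextOutputFormat instead of TextMultipleOutputsFormat itself, which would otherwise recurse. The file-name caveat from the mapper variant applies here too: a key of fileA comes out as fileA-r-00000 (or fileA-m-00000 in a map-only job).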