import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, MultipleOutputs, TextOutputFormat}
import org.apache.hadoop.mapreduce.task.MapContextImpl
/**
 * A [[TextOutputFormat]] that routes each record to an output chosen by its key.
 *
 * Every `(key, value)` pair is handed to Hadoop's [[MultipleOutputs]] as
 * `(NullWritable, value)` with `key.toString` as the base output path, so the key
 * selects the destination and is not itself written.
 */
class TextMultipleOutputsFormat extends TextOutputFormat[Text, Text] {

  /**
   * Creates a writer that forwards records to a [[MultipleOutputs]] instance.
   *
   * @param context task attempt context from the framework; its configuration seeds a
   *                fresh [[Job]] that this writer then reconfigures
   * @return a writer whose `write(key, value)` emits `value` under base path `key.toString`
   *         and whose `close` closes the underlying [[MultipleOutputs]]
   */
  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[Text, Text] =
    new RecordWriter[Text, Text] {
      // A separate Job whose configuration receives the output-format override below
      // (presumably a copy of the task's configuration — see Job.getInstance docs).
      private val job: Job = Job.getInstance(context.getConfiguration)

      // Configure the output format BEFORE building anything that reads this configuration.
      // NOTE(review): MultipleOutputs is understood to create its writers from the configured
      // output format class; without this override that would presumably still be this class,
      // making writes recurse. TextOutputFormat is wrapped in LazyOutputFormat (which, per the
      // Hadoop docs, creates output lazily).
      LazyOutputFormat.setOutputFormatClass(job, classOf[TextOutputFormat[_, _]])

      // MultipleOutputs needs a task I/O context: build one over the reconfigured Job
      // configuration, with no reader/committer/reporter/split and a no-op record writer.
      // NOTE(review): the null reporter would presumably break MultipleOutputs counters if
      // they are enabled — confirm before turning them on.
      private val moContext = new MapContextImpl[NullWritable, Text, NullWritable, Text](
        job.getConfiguration,
        context.getTaskAttemptID,
        null,
        new DummyRecordWriter,
        null,
        null,
        null
      )

      private val multipleOutputs = new MultipleOutputs[NullWritable, Text](moContext)

      /** Writes `value` to the output whose base path is `key.toString`; the key is dropped. */
      override def write(key: Text, value: Text): Unit =
        multipleOutputs.write(NullWritable.get, value, key.toString)

      /** Closes the underlying [[MultipleOutputs]] (and with it the outputs it opened). */
      override def close(taskContext: TaskAttemptContext): Unit = multipleOutputs.close()
    }

  /**
   * No-op writer that only satisfies `MapContextImpl`'s constructor; all real output
   * goes through [[MultipleOutputs]] in `getRecordWriter`.
   */
  private final class DummyRecordWriter extends RecordWriter[NullWritable, Text] {
    override def write(key: NullWritable, value: Text): Unit = ()
    override def close(context: TaskAttemptContext): Unit = ()
  }
}
No comments:
Post a Comment
Note: Only a member of this blog may post a comment.