import
org.apache.hadoop.io.{NullWritable, Text}
import
org.apache.hadoop.mapreduce.
_
import
org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, MultipleOutputs, TextOutputFormat}
import
org.apache.hadoop.mapreduce.task.MapContextImpl
/**
 * Text output format whose record writer hands every record to `MultipleOutputs`.
 *
 * For each `(key, value)` pair the value is written under a `NullWritable` key, and the
 * string form of the incoming key is passed to `MultipleOutputs` as the base output path.
 */
class TextMultipleOutputsFormat extends TextOutputFormat[Text, Text] {

  /**
   * Creates a writer that delegates to a `MultipleOutputs` instance built on top of a
   * hand-assembled `MapContextImpl`.
   *
   * @param context task attempt supplying the configuration and the task attempt id
   * @return a writer forwarding each record as
   *         `MultipleOutputs.write(NullWritable.get, value, key.toString)`
   */
  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[Text, Text] =
    new RecordWriter[Text, Text] {

      // Job created from the task's configuration; its configuration object is the one
      // given to the context below.
      private val outputJob: Job = Job.getInstance(context.getConfiguration)

      // Only the configuration, the attempt id and a (no-op) writer are supplied; the
      // remaining constructor slots are passed as null, exactly as positional arguments.
      private val delegateContext = new MapContextImpl(
        outputJob.getConfiguration,
        context.getTaskAttemptID,
        null, // third argument: left unset
        new DummyRecordWriter,
        null, // fifth argument: left unset
        null, // sixth argument: left unset
        null // seventh argument: left unset
      )

      private val delegate = new MultipleOutputs[NullWritable, Text](delegateContext)

      // Runs as part of the writer's construction, after the delegate has been created.
      LazyOutputFormat.setOutputFormatClass(outputJob, classOf[TextOutputFormat[_, _]])

      /** Writes `value` with a null key, using `key.toString` as the base output path. */
      override def write(key: Text, value: Text): Unit =
        delegate.write(NullWritable.get, value, key.toString)

      /** Closes the underlying `MultipleOutputs`; the `context` argument is not used. */
      override def close(context: TaskAttemptContext): Unit = {
        delegate.close()
      }
    }

  /**
   * Writer that does nothing on `write` and `close`. It is only used to fill the writer
   * argument of the `MapContextImpl` constructor in `getRecordWriter`.
   */
  private class DummyRecordWriter extends RecordWriter[NullWritable, Text] {

    /** Discards the record. */
    override def write(key: NullWritable, value: Text): Unit = {}

    /** Nothing to release. */
    override def close(context: TaskAttemptContext): Unit = {}
  }
}
No comments:
Post a Comment
Note: Only a member of this blog may post a comment.