Monday, September 4, 2017

Remove keys from output files in MR

By default, TextOutputFormat writes each record as key<TAB>value. Overriding generateActualKey in a MultipleTextOutputFormat subclass to return NullWritable drops the key, so each output line contains only the value. E.g.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class WordCount {
 
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        // Use the custom output format instead of the default TextOutputFormat.
        conf.setOutputFormat(CustomMultipleTextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
 
class CustomMultipleTextOutputFormat extends MultipleTextOutputFormat<Object, Object> {
    // Replace the key actually written to the file with NullWritable,
    // so each output line holds only the value.
    @Override
    protected Object generateActualKey(Object key, Object value) {
        return NullWritable.get();
    }
}
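
If nothing else requires a custom output format, the same effect is available without one: set conf.setOutputKeyClass(NullWritable.class) in the driver above and emit NullWritable from the reducer, since TextOutputFormat omits both the key and the tab separator when the key is a NullWritable. A minimal sketch in Scala against the old mapred API; the reducer name and the Text/Text shuffle types are assumptions:

import java.util.{Iterator => JIterator}

import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapred.{MapReduceBase, OutputCollector, Reducer, Reporter}

// Hypothetical reducer: with NullWritable keys, TextOutputFormat writes
// only the value on each line.
class ValueOnlyReducer extends MapReduceBase with Reducer[Text, Text, NullWritable, Text] {
  override def reduce(key: Text, values: JIterator[Text],
                      output: OutputCollector[NullWritable, Text], reporter: Reporter): Unit = {
    while (values.hasNext) output.collect(NullWritable.get(), values.next())
  }
}

Register it in the driver with conf.setReducerClass(ValueOnlyReducer.class).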

Specify the name of output files in MR

Overriding generateFileNameForKeyValue lets you derive each output file's name from the key and/or value; the name argument is the default leaf file name (e.g. part-00000), and the returned string is the name actually used. E.g.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class WordCount {
 
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setOutputFormat(CustomMultipleTextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
 
class CustomMultipleTextOutputFormat extends MultipleTextOutputFormat<Object, Object> {
    // Prefix the default leaf file name (e.g. "part-00000") with the record's
    // key, producing files such as "apple-part-00000".
    @Override
    protected String generateFileNameForKeyValue(Object key, Object value, String name) {
        return key.toString() + "-" + name;
    }
}
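
The two overrides also compose, so a single subclass can both name the files after the key and strip the key from their contents. A minimal sketch in Scala; KeyedTextOutputFormat is a hypothetical name:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class KeyedTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Prefix the default leaf file name (e.g. "part-00000") with the key.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "-" + name

  // Write only the value on each line.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}

With Text keys such as apple, the values then land in files like apple-part-00000.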

Remove keys from output files in Spark

The same MultipleTextOutputFormat trick works in Spark through saveAsHadoopFile. The example below reads zip archives with ZipFileInputFormat (from com.cotdp.hadoop) as (entry name, entry bytes) pairs and writes the entries out without keys. E.g.
package test
 
import com.cotdp.hadoop.ZipFileInputFormat
import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.sql.SparkSession
 
// Replace the key actually written to each line with NullWritable,
// so the output files contain only the values.
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}
 
object test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test").getOrCreate()
 
    // Read the zip archives under /tmp/spark as (entry name, entry bytes) pairs.
    val zipFileRDD = spark.sparkContext.newAPIHadoopFile("/tmp/spark",
      classOf[ZipFileInputFormat],
      classOf[Text],
      classOf[BytesWritable],
      spark.sparkContext.hadoopConfiguration)
 
    println("The file name in the zip file is: " + zipFileRDD.map(s => s._1.toString).first)
    // copyBytes() trims the BytesWritable's backing array to its valid length.
    println("The file contents are: " + zipFileRDD.map(s => new String(s._2.copyBytes(), "UTF-8")).first)
 
    zipFileRDD.map(s => (s._1.toString, new String(s._2.copyBytes(), "UTF-8")))
      .saveAsHadoopFile("/tmp/test/zip2", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])
    spark.stop()
  }
}
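
The file-naming override carries over to Spark unchanged. A common variant, sketched below with RDDKeyedTextOutputFormat as a hypothetical name, writes one sub-directory per key; the string returned by generateFileNameForKeyValue is interpreted as a path relative to the output directory:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDKeyedTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Route each pair to <outputDir>/<key>/part-NNNNN.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "/" + name

  // Keep only the value in the written lines.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}

It drops into the same saveAsHadoopFile call, with classOf[RDDKeyedTextOutputFormat] in place of classOf[RDDMultipleTextOutputFormat].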