Monday, December 4, 2017

Spark and Jupyter

  • Download
  • Install
    • chmod +x Anaconda2-5.0.1-Linux-x86_64.sh
    • sudo bash Anaconda2-5.0.1-Linux-x86_64.sh
  • Secure
    • source .bashrc
    • jupyter notebook --generate-config
    • jupyter notebook password
  • IP
    • vim .jupyter/jupyter_notebook_config.py
    • #c.NotebookApp.ip = 'localhost' → c.NotebookApp.ip = '*'
  • PySpark
    • vim .bashrc
    • Add the following contents
      export PYSPARK_PYTHON=/usr/bin/python
      export PYSPARK_DRIVER_PYTHON=/usr/bin/python
      export SPARK_HOME=/usr/hdp/current/spark2-client/
      export PATH=$PATH:/usr/hdp/current/spark2-client/bin
      export PYSPARK_DRIVER_PYTHON=jupyter
      export PYSPARK_DRIVER_PYTHON_OPTS=notebook
    • sudo su -
    • mkdir -p /usr/local/share/jupyter/kernels/pyspark
    • chown -R test_user:test_user /usr/local/share/jupyter/
    • exit
    • vim /usr/local/share/jupyter/kernels/pyspark/kernel.json
    • Add the following contents
      {
       "display_name""PySpark",
       "language""python",
       "argv": [ "/home/test_user/anaconda2/bin/python""-m""ipykernel",
       "-f""{connection_file}" ],
       "env": {
       "SPARK_HOME""/usr/hdp/current/spark2-client/",
       "PYSPARK_PYTHON":"/home/test_user/anaconda2/bin/python",
       "PYTHONPATH""/usr/hdp/current/spark2-client/python/:/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip",
       "PYTHONSTARTUP""/usr/hdp/current/spark2-client/python/pyspark/shell.py",
       "PYSPARK_SUBMIT_ARGS""--master yarn pyspark-shell"
       }
      }
  • Run
    • jupyter notebook

Tuesday, October 10, 2017

Specify the name of the output file using the key and remove the key from the output file in Spark

  • Old hadoop API
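    The old (mapred) API path is only sketched here as one common approach, not part of the original notes: extend MultipleTextOutputFormat, use the key as the file name, and swap the key for NullWritable so it is not written into the file. Assumes an RDD[(String, String)] named rdd (key = target file name) and an output directory outputPath; the class name KeyAsFileNameOutputFormat is illustrative.
    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
     
    class KeyAsFileNameOutputFormat extends MultipleTextOutputFormat[Any, Any] {
     
      // Name each output file after the key instead of part-00000, part-00001, ...
      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
        key.toString
     
      // Drop the key so only the value ends up in the file.
      override def generateActualKey(key: Any, value: Any): Any =
        NullWritable.get()
    }
     
    rdd.saveAsHadoopFile(outputPath, classOf[String], classOf[String],
      classOf[KeyAsFileNameOutputFormat])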
  • New hadoop API
    import org.apache.hadoop.io.{NullWritable, Text}
    import org.apache.hadoop.mapreduce._
    import org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, MultipleOutputs, TextOutputFormat}
    import org.apache.hadoop.mapreduce.task.MapContextImpl
     
    class TextMultipleOutputsFormat extends TextOutputFormat[Text, Text] {
     
      override def getRecordWriter(context: TaskAttemptContext): RecordWriter[Text, Text] =
        new RecordWriter[Text, Text] {
          val job: Job = Job.getInstance(context.getConfiguration)
     
          val moContext = new MapContextImpl(job.getConfiguration, context.getTaskAttemptID,
            null, new DummyRecordWriter, null, null, null)
     
          val multipleOutputs = new MultipleOutputs[NullWritable, Text](moContext)
     
          LazyOutputFormat.setOutputFormatClass(job, classOf[TextOutputFormat[_, _]])
     
          override def write(key: Text, value: Text): Unit = {
            multipleOutputs.write(NullWritable.get, value, key.toString)
          }
     
          override def close(context: TaskAttemptContext): Unit = multipleOutputs.close()
        }
     
      private class DummyRecordWriter extends RecordWriter[NullWritable, Text] {
        override def write(key: NullWritable, value: Text): Unit = ()
     
        override def close(context: TaskAttemptContext): Unit = ()
      }
     
    }
    rdd.saveAsNewAPIHadoopFile[TextMultipleOutputsFormat](outputPath)
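    A possible usage sketch (not in the original notes), assuming a SparkContext named sc: the key of each (Text, Text) pair becomes the file name, MultipleOutputs appends its own part suffix, and only the values are written into the files.
    import org.apache.hadoop.io.Text
     
    // Key = desired file name, value = line to write.
    val rdd = sc.parallelize(Seq(("fileA", "line 1"), ("fileA", "line 2"), ("fileB", "line 3")))
      .map { case (file, data) => (new Text(file), new Text(data)) }
     
    rdd.saveAsNewAPIHadoopFile[TextMultipleOutputsFormat](outputPath)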
  • Spark API
    rdd.toDF("file""data").write.partitionBy("file").text(outputPath)