Friday, December 30, 2016

Zookeeper

  • supervisory process
    • Zookeeper is designed to “fail fast,” meaning it will shut down if an error occurs that it can’t recover from.
    • Because of this, we must have a supervisory process manage the Zookeeper instances so that if a Zookeeper instance does go down, the cluster as a whole can continue handling requests
    • The supervisory process will handle restarting any failed individual Zookeeper server, allowing the Zookeeper cluster to be self-healing
  • log management
    • Because Zookeeper is a long-running process, its transaction logs can get quite large
    • This will eventually result in Zookeeper running out of disk space
    • Therefore, it’s critical to set up some sort of process to compact (and even archive) the data produced in these logs
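    • E.g., a minimal sketch (assumes ZooKeeper 3.4+): enable the built-in autopurge settings in zoo.cfg, or run bin/zkCleanup.sh periodically from cron
      • autopurge.snapRetainCount=5 (keep the 5 most recent snapshots and their transaction logs)
      • autopurge.purgeInterval=24 (run the purge every 24 hours; 0, the default, disables autopurge)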

Storm

  • controlling the rate data flows into a topology
    • builder.setSpout(...).setMaxSpoutPending(N)
    • N > the total parallelism
  • metrics-collecting API
    • use it for testing
  • configuration
    • topology.stats.sample.rate
      • set to 1 for testing and verifying
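  • E.g., a rough sketch of the settings above in Scala (assumes Storm 1.x package names; use backtype.storm before 1.0; this sets max spout pending topology-wide rather than per spout as in setSpout(...).setMaxSpoutPending(N)):
    import org.apache.storm.Config

    val conf = new Config
    conf.setMaxSpoutPending(500)                                   // cap on un-acked tuples in flight per spout task
    conf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, Double.box(1.0))   // sample every tuple while testing/verifying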

Spark Action VS Spark Submit

  • spark action
    • <workflow-app xmlns="uri:oozie:workflow:0.3" name="oozie-spark-action-test">
      <start to="spark-node" />
      <action name="spark-node">
      <spark xmlns="uri:oozie:spark-action:0.1">
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <master>yarn-cluster</master>
      <name>person</name>
      <class>com.spark.batch.Generator</class>
      <jar>${nameNode}/user/${wf:user()}/${appDir}/lib/spark-0.0.3-jar-with-dependencies.jar</jar>
      <spark-opts>--executor-memory 19g --num-executors 31 --executor-cores 3 --driver-memory 9g --driver-cores 2 --conf spark.yarn.historyServer.address=http://lqahadoopdata03.net:18080 --conf spark.eventLog.dir=${nameNode}/spark-history --conf spark.eventLog.enabled=true</spark-opts>
      <arg>-P=job.conf</arg>
      <arg>-C</arg>
      </spark>
      <ok to="end" />
      <error to="fail" />
      </action>
      <kill name="fail">
      <message>Workflow failed, error
      message[${wf:errorMessage(wf:lastErrorNode())}] </message>
      </kill>
      <end name="end" />
      </workflow-app>
  • spark submit
    • spark-submit --master yarn-cluster --name person --executor-memory 19g --num-executors 31 --executor-cores 3 --driver-memory 9g --driver-cores 2 --files /usr/hdp/current/spark-client/conf/hive-site.xml,/home/hdfs/person/conf/job.conf --driver-class-path /home/hdfs/lib/sqljdbc4.jar --jars /usr/hdp/current/spark-client/lib/datanucleus-api-jdo-3.2.6.jar,/usr/hdp/current/spark-client/lib/datanucleus-core-3.2.10.jar,/usr/hdp/current/spark-client/lib/datanucleus-rdbms-3.2.9.jar,/home/hdfs/lib/sqljdbc4.jar --class com.spark.batch.Generator spark-0.0.3-jar-with-dependencies.jar -P job.conf -C
  • spark action (oozie) vs spark-submit (spark)
    • Item          | oozie (spark action)             | spark (spark-submit)
      master        | <master>                         | --master
      name          | <name>                           | --name
      class         | <class>                          | --class
      app (JAR)     | <jar>                            | JAR path placed after the --class option
      spark options | <spark-opts>                     | --num-executors, --executor-cores, --executor-memory, etc.
      app arguments | <arg>                            | arguments placed after the app JAR
      file(s)       | ${oozie-workflow-dir}/lib (HDFS) | --files
      JAR(s)        | ${oozie-workflow-dir}/lib (HDFS) | --jars
      other         | app options require '=' between key and value (e.g. -P=job.conf) | no '=' needed between key and value (e.g. -P job.conf)

Spark 1.6

  • Hardware
  • Conf
    • spark.yarn.maxAppAttempts
    • spark.yarn.am.attemptFailuresValidityInterval
    • spark.default.parallelism
    • spark.rdd.compress
    • spark.streaming.concurrentJobs
    • spark.scheduler.mode=fair
      • With the FAIR scheduler, jobs triggered by different actions (e.g., from separate threads in the same application) can run in parallel instead of queueing FIFO.
    • spark.task.cpus
      • Use this setting for heavy-duty, CPU-bound tasks. Examples of such operations include compression and matrix multiplication.
    • parallelized collection
      • Typically you want 2-4 partitions for each CPU in your cluster. 
      • Normally, Spark tries to set the number of partitions automatically based on your cluster. 
      • However, you can also set it manually by passing it as a second parameter to parallelize 
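      • E.g., a minimal sketch (assumes an existing SparkContext `sc` and a hypothetical local collection `data`):
        • val rdd = sc.parallelize(data, sc.defaultParallelism * 3)   // roughly 3 partitions per core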
    • spark.sql.orc.filterPushdown
    • spark.serializer=org.apache.spark.serializer.KryoSerializer
    • spark.cleaner.ttl
      • If you are planning to run Spark for a long time on a cluster, you may wish to enable it
    • spark.sql.codegen
      • for large or repeated queries
    • spark.sql.inMemoryColumnarStorage.compressed
    • spark.sql.inMemoryColumnarStorage.batchSize
    • spark.sql.autoBroadcastJoinThreshold
    • spark.sql.tungsten.enabled
    • spark.sql.shuffle.partitions
      • Configures the number of partitions to use when shuffling data for joins or aggregations.
    • spark.memory.fraction
      • Fraction of (heap space - 300MB) used for execution and storage. The lower this is, the more frequently spills and cached data eviction occur. The purpose of this config is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records. Leaving this at the default value is recommended.
    • spark.streaming.blockInterval
      • Only relevant for receiver-based streams: it controls how often received data is chunked into blocks, which determines the number of tasks per batch
  • RDD
    • partitioning
      • operations that benefit from partitioning
        • cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), lookup()
      • If both RDDs have the same partitioner, and if they are cached on the same machines (e.g., one was created using mapValues() on the other, which preserves keys and partitioning) or if one of them has not yet been computed, then no shuffling across the network will occur.
      • all the operations that result in a partitioner being set on the output RDD
        • cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort()
        • if the parent RDD has a partitioner
          • mapValues(), flatMapValues(), filter()
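      • E.g., a small sketch of a shuffle-avoiding join (assumes an existing SparkContext `sc`; hypothetical pair RDDs keyed by id):
        import org.apache.spark.HashPartitioner

        val userInfo = sc.parallelize(Seq((1L, "kr"), (2L, "us")))
          .partitionBy(new HashPartitioner(100))
          .cache()   // keep the partitioned copy around so later joins can reuse it
        val events = sc.parallelize(Seq((1L, "click"), (2L, "view")))

        // userInfo already carries a partitioner, so only events is shuffled to match it
        val joined = userInfo.join(events)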
  • API
    • RDD
      • coalesce
        • coalesce causes the upstream partitions in the entire stage to execute with the level of parallelism assigned by coalesce, which may be undesirable in some cases. Avoid this behavior at the cost of a shuffle by setting the shuffle argument of coalesce to true or by using the repartition function instead.
          • val log2 = log.coalesce(1, true)
        • With shuffle = true, you can actually coalesce to a larger number of partitions
          • val log3 = log2.coalesce(100, true)
  • Debug
    • RDD.toDebugString()
    • YARN UI
    • spark job history UI
    • http://<host>:8088/proxy/<job_id>/environment/
    • http://<host>:8088/proxy/<app_id>/stages/
    • yarn logs -applicationId <app_id>
    • Flame graph
  • Job
    • Executor processes will not be released until the job finishes, even if they are no longer in use. Therefore, do not overallocate executors beyond your estimated requirements. 
    • Driver memory does not need to be large if the job does not aggregate much data (as with a collect() action). There are tradeoffs between num-executors and executor-memory. 
    • Large executor memory does not imply better performance, due to JVM garbage collection. Sometimes it is better to configure a larger number of small JVMs than a small number of large JVMs
  • JVM
    • java 1.7+
      • G1: garbage-first GC
    • G1GC suffers from fragmentation due to humongous allocations; if object sizes exceed 32 MB, use Parallel GC instead of G1GC
      • .set("spark.executor.extraJavaOptions", "-XX:ParallelGCThreads=4 -XX:+UseParallelGC")
    • CMS is recommended for streaming jobs
      • --driver-java-options -XX:+UseConcMarkSweepGC
      • .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")
  • GC
    • Using fewer objects and the data structures that use fewer objects (simpler and smaller data structures, such as arrays) helps
      • Use primitive types rather than custom classes
      • Use arrays rather than case classes or tuples
      • Within a function, it is often beneficial to avoid intermediate object creation. It is important to remember that converting between types (such as between different flavors of Scala collections) creates intermediate objects.
    • Serialization also shines here as a byte array needs only one object to be garbage collected
    • If you find that your Spark cluster uses too much time for collecting garbage, you can reduce the amount of space used for RDD caching by changing spark.storage.memoryFraction
    • Persist RDDs using off_heap storage level.
    • Use more executors with smaller heap sizes.
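    • E.g., a small sketch of the primitive-array and OFF_HEAP points above (assumes an existing SparkContext `sc`; OFF_HEAP in Spark 1.6 needs an external store such as Tachyon configured):
      import org.apache.spark.storage.StorageLevel

      // Array[Double] keeps the values as primitive doubles instead of boxed objects
      val vectors = sc.parallelize(Seq(Array(1.0, 2.0), Array(3.0, 4.0)))
      vectors.persist(StorageLevel.OFF_HEAP)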
  • Shuffle
    • Early projection and filtering
    • Always use a combiner
      • Use reduceByKey(_ + _) instead of groupByKey().mapValues(_.sum)
    • Generous parallelism
      • Increase the parallelism of *ByKey() tasks
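      • E.g. (sketch; `pairs` is a hypothetical RDD[(String, Int)]):
        • val counts = pairs.reduceByKey(_ + _, 200)   // map-side combine plus 200 reduce-side partitions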
    • File Consolidation
      • In situations with a large number of reduce tasks, it is useful to consolidate intermediate files to improve disk seeks. Setting spark.shuffle.consolidateFiles to true turns on this consolidation. This invariably improves performance in Ext4 and XFS file systems. For Ext3, in certain cases, it may actually degrade performance, especially on machines with more than eight cores.
    • More memory
      • The executor Java heap is shared between RDDs, shuffle, and application objects. By default, RDDs use 60% (spark.storage.memoryFraction) of the memory, and shuffle has 20% at its disposal (spark.shuffle.memoryFraction). Excessive use spills the contents of the aggregation phase of the shuffle to disk. If your application contains many shuffle steps, you should consider increasing the share of shuffle memory to reduce the number of spills to disk. This obviously trades RDD storage memory for shuffle memory.
  • Join
    • RDD
      • Known partitioner join
        import org.apache.spark.HashPartitioner
        import org.apache.spark.rdd.RDD

        def joinScoresWithAddress3(scoreRDD: RDD[(Long, Double)],
            addressRDD: RDD[(Long, String)]): RDD[(Long, (Double, String))] = {
          // If addressRDD has a known partitioner we should use that,
          // otherwise it has a default hash partitioner, which we can reconstruct by
          // getting the number of partitions.
          val addressDataPartitioner = addressRDD.partitioner match {
            case Some(p) => p
            case None => new HashPartitioner(addressRDD.partitions.length)
          }
          // reduceByKey with an explicit partitioner keeps the output co-partitioned with addressRDD
          val bestScoreData = scoreRDD.reduceByKey(addressDataPartitioner,
            (x, y) => if (x > y) x else y)
          bestScoreData.join(addressRDD)
        }
      • Manual broadcast hash join
        import scala.reflect.ClassTag
        import org.apache.spark.rdd.RDD

        def manualBroadCastHashJoin[K : Ordering : ClassTag, V1 : ClassTag,
            V2 : ClassTag](bigRDD: RDD[(K, V1)],
            smallRDD: RDD[(K, V2)]): RDD[(K, (V1, V2))] = {
          // Collect the small side locally and broadcast it so each executor holds one copy
          val smallRDDLocal = smallRDD.collectAsMap()
          val smallRDDLocalBcast = bigRDD.sparkContext.broadcast(smallRDDLocal)
          bigRDD.mapPartitions(iter => {
            iter.flatMap {
              case (k, v1) =>
                smallRDDLocalBcast.value.get(k) match {
                  case None => Seq.empty[(K, (V1, V2))]
                  case Some(v2) => Seq((k, (v1, v2)))
                }
            }
          }, preservesPartitioning = true)
        }
      • Partial manual broadcast hash join
        • Sometimes not all of the smaller RDD will fit into memory, but some keys are so overrepresented in the large dataset that you want to broadcast just the most common keys. This is especially useful if one key is so large that it can’t fit on a single partition. In this case you can use countByKeyApprox on the large RDD to get an approximate idea of which keys would most benefit from a broadcast. You then filter the smaller RDD for only these keys, collecting the result locally in a HashMap. Using sc.broadcast you can broadcast the HashMap so that each worker has only one copy and manually perform the join against the HashMap. Using the same HashMap you can then filter your large RDD down so it does not include a large number of duplicate keys, perform your standard join, and union it with the result of the manual join. This approach is quite convoluted, but it may allow you to handle highly skewed data you couldn’t otherwise process.
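        • E.g., a rough sketch of that flow (hypothetical RDDs and a hand-picked hotKeyCount; not a drop-in implementation):
          import scala.reflect.ClassTag
          import org.apache.spark.rdd.RDD

          def partialBroadcastJoin[K : Ordering : ClassTag, V1 : ClassTag, V2 : ClassTag](
              bigRDD: RDD[(K, V1)], smallRDD: RDD[(K, V2)], hotKeyCount: Int): RDD[(K, (V1, V2))] = {
            // Approximate key counts on the large side (wait up to 10s for the estimate)
            val approxCounts = bigRDD.countByKeyApprox(10000L).getFinalValue()
            val hotKeys = approxCounts.toSeq.sortBy { case (_, c) => -c.mean }
              .take(hotKeyCount).map(_._1).toSet

            // Broadcast only the hot part of the small RDD and join it by hand
            val hotMap = smallRDD.filter { case (k, _) => hotKeys.contains(k) }.collectAsMap()
            val hotMapBcast = bigRDD.sparkContext.broadcast(hotMap)
            val hotJoined = bigRDD.filter { case (k, _) => hotKeys.contains(k) }
              .mapPartitions(_.flatMap { case (k, v1) =>
                hotMapBcast.value.get(k).map(v2 => (k, (v1, v2)))
              }, preservesPartitioning = true)

            // Standard join for the remaining keys, then union the two results
            val coldJoined = bigRDD.filter { case (k, _) => !hotKeys.contains(k) }
              .join(smallRDD.filter { case (k, _) => !hotKeys.contains(k) })
            hotJoined.union(coldJoined)
          }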
    • Dataframe
      • Broadcast hash join
        • df1.join(broadcast(df2), "key")
        • spark.sql.autoBroadcastJoinThreshold
  • Memory
    • Serialization can reduce memory use.
    • Another option is alluxio.
  • CPU
    • Both compression and serialization cost extra CPU cycles. 
    • This makes it increasingly important to optimize CPU usage so the added cycle cost stays manageable
  • Kafka
    • Receiver-based consumer
      • Parallelism
        • Create multiple topics and load-balance messages across them. These multiple KafkaInputDStreams can then be merged in a union() operation.
      • Cons
        • Under certain circumstances, this approach does not ensure exactly once semantics and falls back on at-least-once: for instance, if the receiver has received messages and crashes before the offsets have been updated in ZooKeeper. 
        • In addition, maintaining a write-ahead log affects performance because messages need to be written to HDFS before they can be consumed.
    • Direct consumer (recommended)
      • Pros
        • Performance: Unlike the receiver approach, in which data is replicated twice (once by Kafka and then by the write-ahead log), the data-replication buck in the direct model is passed to Kafka. The Spark Streaming application only maintains offsets.
        • Parallelism: There is a one-to-one mapping between Kafka partitions and RDD partitions. This means you do not need to load-balance data across topics and consumers or create multiple KafkaInputDStreams to take advantage of parallelism.
        • No need to provide a persistence level because re-reading data directly from Kafka ensures fault tolerance.
      • Cons
        • For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see org.apache.spark.streaming.kafka.HasOffsetRanges)
    • More about receiver-based vs. direct mode.
  • Dstream
    • No data loss for streaming
      • File source
        • Enable checkpointing
          • val ssc = new StreamingContext(...)
          • ssc.checkpoint(checkpointDirectory)
        • Checkpointing gotchas
          • Checkpoints don't work across app or spark upgrades
          • Clear out (or change) checkpointing directory across upgrades
      • Receiver based sources
        • Enable checkpointing
        • Enable Write Ahead Log (WAL)
          • spark.streaming.receiver.writeAheadLog.enable = true
        • WAL gotchas
          • Makes a copy of all data on disk
          • Use StorageLevel.MEMORY_AND_DISK_SER storage level for your DStream
      • Kafka
        • Use direct connector
          • No need for a WAL with the direct connector
        • Direct connector gotchas
          • Need to track offsets for driver recovery
          • Track them yourself
            • In zookeeper, HDFS, or a database
          • For accuracy
            • Processing needs to be idempotent, OR
            • Update offsets in the same transaction when updating results
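          • E.g. (sketch of grabbing each batch's offsets; `stream` is a hypothetical DStream from KafkaUtils.createDirectStream, as in the example further below):
            import org.apache.spark.streaming.kafka.HasOffsetRanges

            stream.foreachRDD { rdd =>
              val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
              // process the batch, then persist offsetRanges (ZooKeeper/HDFS/DB),
              // ideally in the same transaction as the results
            }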
      • In Structured Streaming, state is stored in memory (backed by a WAL on HDFS/S3), starting with Spark 2.1
    • Graceful shutdown for streaming job
      • Thread hooks
        • Check for a flag every N seconds
        • def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
      • CMD line
        • spark-submit --master $MASTER_REST_URL --kill $DRIVER_ID
        • spark.streaming.stopGracefullyOnShutdown = true
      • By marker file
        • Touch a file when starting the app on HDFS
        • Remove the file when you want to stop
        • Separate thread in spark app, calls 
          • streamingContext.stop(stopSparkContext = true, stopGracefully = true)
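        • E.g., a rough sketch of that watcher thread (hypothetical marker path; assumes an already started StreamingContext):
          import org.apache.hadoop.fs.{FileSystem, Path}
          import org.apache.spark.streaming.StreamingContext

          def watchMarkerAndStop(ssc: StreamingContext, markerPath: String, intervalMs: Long = 10000L): Thread = {
            val t = new Thread(new Runnable {
              override def run(): Unit = {
                val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
                // Keep streaming while the marker file exists; stop gracefully once it is removed
                while (fs.exists(new Path(markerPath))) Thread.sleep(intervalMs)
                ssc.stop(stopSparkContext = true, stopGracefully = true)
              }
            })
            t.setDaemon(true)
            t.start()
            t
          }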
    • Recovery function, a lazily instantiated singleton SparkSession, and the preferred createDirectStream API
      • E.g.
        package test
         
        import kafka.serializer.StringDecoder
        import org.apache.spark.SparkConf
        import org.apache.spark.rdd.RDD
        import org.apache.spark.sql.SparkSession
        import org.apache.spark.streaming.kafka.KafkaUtils
        import org.apache.spark.streaming.{Seconds, StreamingContext}
         
        object Dstream {
          def createStreamingContext(appName:String, checkpointPath:String):StreamingContext = {
            val bootstrapServers = "10.106.101.51:9092,10.106.101.52:9092,10.106.101.53:9092"
            val topic = "netmarbles.cleansing.log"
            val kafkaParams = Map[String, String](
              "bootstrap.servers" -> bootstrapServers
            )
            val topics = Set(topic)
            val conf = new SparkConf()
              .setAppName(appName)
              .set("spark.streaming.stopGracefullyOnShutdown""true")
            //.set("spark.streaming.backpressure.enabled", "true")
            //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         
            val ssc = new StreamingContext(conf, Seconds(1))
         
            ssc.checkpoint(checkpointPath)
         
            val log = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
         
            log.foreachRDD { (rdd: RDD[(String, String)]/*, time: Time*/) =>
              val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
              import spark.implicits._
              val logDF = rdd.toDF("c1", "c2")
         
              logDF.show(false)
            }
         
            ssc
          }
         
          def main(args: Array[String]): Unit = {
            if (args.length != 1) {
              System.err.print("Usage: SparkStreamingApp <app name>")
              System.exit(1)
            }
         
            println("!!!!!!! 2")
         
            val Array(appName) = args
            val checkpointPath = "/tmp/spark/checkpointPath"
            val ssc = StreamingContext.getOrCreate(checkpointPath, () => createStreamingContext(appName, checkpointPath))
            // val ssc = StreamingContext.getOrCreate(checkpointPath, createStreamingContext _)
         
            ssc.start()
            ssc.awaitTermination()
          }
        }
         
        /**
          * Lazily instantiated singleton instance of SparkSession
          *
          * Able to avoid the following warning.
          * WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
          */
        object SparkSessionSingleton {
         
          @transient  private var instance: SparkSession = _
         
          def getInstance(sparkConf: SparkConf): SparkSession = {
            if (instance == null) {
              instance = SparkSession
                .builder
                .config(sparkConf)
                .getOrCreate()
            }
         
            instance
          }
        }
  • Python examples