- Hardware
- Conf
- spark.yarn.maxAppAttempts
- spark.yarn.am.attemptFailuresValidityInterval
- spark.default.parallelism
- spark.rdd.compress
- spark.streaming.concurrentJobs
- spark.scheduler.mode=fair
- Actions in a job can execute in parallel.
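- A minimal sketch of kicking off two actions concurrently from separate threads, assuming an existing SparkContext named sc, fair scheduling enabled, and made-up HDFS paths:
- import scala.concurrent.{Await, Future}
- import scala.concurrent.ExecutionContext.Implicits.global
- import scala.concurrent.duration._
- val f1 = Future { sc.textFile("hdfs:///data/a").count() }  // submitted as one job
- val f2 = Future { sc.textFile("hdfs:///data/b").count() }  // submitted as a second, concurrent job
- val (countA, countB) = (Await.result(f1, 10.minutes), Await.result(f2, 10.minutes))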
- spark.task.cpus
- Use this setting for heavy-duty, CPU-bound tasks. Examples of such operations include compression and matrix multiplication.
- parallelized collection
- Typically you want 2-4 partitions for each CPU in your cluster.
- Normally, Spark tries to set the number of partitions automatically based on your cluster.
- However, you can also set it manually by passing it as a second parameter to parallelize
- sc.textFile("hdfs://localhost:9000/user/hduser/words",10)
- sc.parallelize(data, 10)
- spark.sql.orc.filterPushdown
- spark.serializer=org.apache.spark.serializer.KryoSerializer
- spark.cleaner.ttl
- If you are planning to run Spark for a long time on a cluster, you may wish to enable it
- spark.sql.codegen
- for large or repeated queries
- spark.sql.inMemoryColumnarStorage.compressed
- spark.sql.inMemoryColumnarStorage.batchSize
- spark.sql.autoBroadcastJoinThreshold
- spark.sql.tungsten.enabled
- spark.sql.shuffle.partitions
- Configures the number of partitions to use when shuffling data for joins or aggregations.
- spark.memory.fraction
- Fraction of (heap space - 300MB) used for execution and storage. The lower this is, the more frequently spills and cached data eviction occur. The purpose of this config is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records. Leaving this at the default value is recommended.
- spark.streaming.blockInterval
- Only relevant for receiver-based streaming; it controls how received data is chunked into blocks (and hence partitions) before processing
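- A sketch of wiring several of the settings above into a SparkConf; the values are illustrative placeholders, not recommendations:
- import org.apache.spark.{SparkConf, SparkContext}
- val conf = new SparkConf().setAppName("tuning-example")
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  // Kryo instead of Java serialization
- conf.set("spark.rdd.compress", "true")  // compress serialized RDD partitions
- conf.set("spark.default.parallelism", "200")  // placeholder value
- conf.set("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)  // 10 MB broadcast threshold
- val sc = new SparkContext(conf)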
- RDD
- partitioning
- operations that benefit from partitioning
- cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), lookup()
- If both RDDs have the same partitioner, and if they are cached on the same machines (e.g., one was created using mapValues() on the other, which preserves keys and partitioning) or if one of them has not yet been computed, then no shuffling across the network will occur.
- all the operations that result in a partitioner being set on the output RDD
- cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort()
- if the parent RDD has a partitioner
- mapValues(), flatMapValues(), filter()
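- A small sketch (toy data, assuming an existing SparkContext sc) of pre-partitioning one side of a join so that side is not reshuffled:
- import org.apache.spark.HashPartitioner
- val users = sc.parallelize(Seq((1, "alice"), (2, "bob"))).partitionBy(new HashPartitioner(4)).persist()  // partition and cache the reused side
- val events = sc.parallelize(Seq((1, "click"), (2, "view"), (1, "buy")))
- val joined = users.join(events)  // only events is shuffled; the output reuses users' partitioner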
- API
- RDD
- coalesce
- coalesce causes the upstream partitions in the entire stage to execute with the level of parallelism assigned by coalesce, which may be undesirable in some cases. Avoid this behavior at the cost of a shuffle by setting the shuffle argument of coalesce to true or by using the repartition function instead.
- val log2 = log.coalesce(1, true)
- With shuffle = true, you can actually coalesce to a larger number of partitions
- val log3 = log2.coalesce(100, true)
- Debug
- rdd.toDebugString
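- For example (assuming sc and a made-up HDFS path), printing an RDD's lineage:
- val wordCounts = sc.textFile("hdfs:///data/words").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
- println(wordCounts.toDebugString)  // indentation in the output marks shuffle/stage boundaries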
- YARN UI
- spark job history UI
- http://<host>:8088/proxy/<app_id>/environment/
- http://<host>:8088/proxy/<app_id>/stages/
- yarn logs -applicationId <app_id>
- Flame graph
- Job
- Executor processes will not be released until the job finishes, even if they are no longer in use, so do not over-allocate executors beyond your estimated requirements.
- Driver memory does not need to be large unless the job brings a lot of data back to the driver (e.g., with a collect() action). There are trade-offs between num-executors and executor-memory.
- Large executor memory does not imply better performance, due to JVM garbage collection; sometimes it is better to configure a larger number of small JVMs than a small number of large JVMs.
- JVM
- java 1.7+
- G1: garbage-first GC
- G1GC can suffer from fragmentation caused by humongous allocations (objects larger than half a G1 region; regions are at most 32 MB). If your records are that large, consider the parallel GC instead of G1GC, e.g.:
- .set("spark.executor.extraJavaOptions", "-XX:ParallelGCThreads=4 -XX:+UseParallelGC")
- CMS (Concurrent Mark Sweep) GC is recommended for streaming jobs
- --driver-java-options -XX:+UseConcMarkSweepGC
- .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")
- java 1.7+
- GC
- Using fewer objects, and data structures that allocate fewer objects (simpler and smaller structures such as arrays), helps
- Use primitive types rather than custom classes
- Use arrays rather than case classes or tuples
- Within a function, it is often beneficial to avoid intermediate object creation. It is important to remember that converting between types (such as between different flavors of Scala collections) creates intermediate objects.
- Serialization also shines here: a serialized partition is a single byte array, so the garbage collector sees only one object
- If you find that your Spark cluster uses too much time for collecting garbage, you can reduce the amount of space used for RDD caching by changing spark.storage.memoryFraction
- Persist RDDs using the OFF_HEAP storage level.
- Use more executors with smaller heap sizes.
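- A toy (non-Spark) illustration of the "arrays over case classes" advice; the second form allocates a couple of objects instead of a million:
- case class Point(x: Double, y: Double)
- val points: Array[Point] = Array.fill(1000000)(Point(math.random, math.random))  // one object per element
- val xs: Array[Double] = Array.fill(1000000)(math.random)  // primitive array, no per-element wrapper
- val ys: Array[Double] = Array.fill(1000000)(math.random)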
- Shuffle
- Early projection and filtering
- Always use a combiner
- Use reduceByKey(_ + _) instead of groupByKey().mapValues(_.sum)
- Generous parallelism
- Increase the parallelism of *ByKey() tasks
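- For example (path and partition count are placeholders), the second argument of the *ByKey() operations sets the number of reduce-side partitions:
- val pairs = sc.textFile("hdfs:///data/words").flatMap(_.split(" ")).map((_, 1))
- val counts = pairs.reduceByKey(_ + _, 200)  // map-side combine, then 200 reduce partitions instead of the default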
- File Consolidation
- In situations with a large number of reduce tasks, it is useful to consolidate intermediate files to reduce disk seeks. Setting spark.shuffle.consolidateFiles to true turns on this consolidation. This generally improves performance on ext4 and XFS file systems. On ext3, in certain cases it may actually degrade performance, especially on machines with more than eight cores.
- More memory
- The executor Java heap is shared between RDDs, shuffle, and application objects. By default, RDDs use 60% of the memory (spark.storage.memoryFraction) and shuffle has 20% at its disposal (spark.shuffle.memoryFraction). If usage exceeds these limits, the contents of the aggregation phase of the shuffle spill to disk. If your application contains many shuffle steps, consider increasing the share of shuffle memory to reduce the number of spills; this obviously trades RDD storage memory for shuffle memory.
- Join
- RDD
- Known partitioner join
- Manual broadcast hash join
- Partial manual broadcast hash join
- Sometimes not all of the smaller RDD will fit into memory, but some keys are so over-represented in the large dataset that you want to broadcast just the most common keys. This is especially useful if one key is so large that it can't fit on a single partition. In this case you can use countByKeyApprox on the large RDD to get an approximate idea of which keys would benefit most from a broadcast. You then filter the smaller RDD for only these keys, collecting the result locally in a HashMap. Using sc.broadcast you can broadcast the HashMap so that each worker has only one copy, and manually perform the join against the HashMap. Using the same HashMap you can then filter your large RDD down to exclude those heavily duplicated keys and perform your standard join, unioning it with the result of the manual join. This approach is quite convoluted but may allow you to handle highly skewed data you couldn't otherwise process.
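- A hedged sketch (toy data, assuming sc) of the plain manual broadcast hash join; the partial variant described above adds the countByKeyApprox filtering step:
- val largeRdd = sc.parallelize(Seq((1, "click"), (2, "view"), (3, "buy")))  // hypothetical large (id, event) RDD
- val smallRdd = sc.parallelize(Seq((1, "alice"), (2, "bob")))  // hypothetical small (id, name) RDD
- val smallMap = smallRdd.collectAsMap()  // the small side must fit in driver and executor memory
- val smallBc = sc.broadcast(smallMap)  // one read-only copy per executor
- val joined = largeRdd.flatMap { case (k, v) => smallBc.value.get(k).map(name => (k, (v, name))) }  // inner-join semantics, no shuffle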
- Dataframe
- Broadcast hash join
- df1.join(broadcast(df2), "key")
- spark.sql.autoBroadcastJoinThreshold
- Memory
- Serialization can reduce memory use.
- Another option is off-heap storage with Alluxio (formerly Tachyon).
- CPU
- Both compression and serialization lead to more CPU cycles.
- This trend of trading memory and I/O for CPU has also pushed the need for CPU optimizations, such as Tungsten and code generation, to reduce the per-record CPU cost
- Kafka
- Receiver-based consumer
- Parallelism
- Create multiple topics and load-balance messages across them. These multiple KafkaInputDStreams can then be merged in a union() operation.
- Cons
- Under certain circumstances, this approach does not ensure exactly once semantics and falls back on at-least-once: for instance, if the receiver has received messages and crashes before the offsets have been updated in ZooKeeper.
- In addition, maintaining a write-ahead log affects performance because messages need to be written to HDFS before they can be consumed.
- Direct consumer (recommended)
- Pros
- Performance: Unlike the receiver approach, in which data is replicated twice (once by Kafka and then by the write-ahead log), the data-replication buck in the direct model is passed to Kafka. The Spark Streaming application only maintains offsets.
- Parallelism: There is a one-to-one mapping between Kafka partitions and RDD partitions. This means you do not need to load-balance data across topics and consumers or create multiple KafkaInputDStreams to take advantage of parallelism.
- No need to provide a persistence level because re-reading data directly from Kafka ensures fault tolerance.
- Cons
- For interoperability with Kafka monitoring tools that depend on ZooKeeper, you have to update the offsets in ZooKeeper yourself from the streaming application. You can access the offsets processed in each batch from the generated RDDs (see org.apache.spark.streaming.kafka.HasOffsetRanges)
- More about receiver-based vs. direct mode.
- Dstream
- No data loss for streaming
- File source
- Enable checkpointing
- val ssc = new StreamingContext(...)
- ssc.checkpoint(checkpointDirectory)
- Checkpointing gotchas
- Checkpoints don't work across app or Spark upgrades
- Clear out (or change) checkpointing directory across upgrades
- Receiver based sources
- Enable checkpointing
- Enable Write Ahead Log (WAL)
- spark.streaming.receiver.writeAheadLog.enable = true
- WAL gotchas
- Makes a copy of all data on disk
- Use StorageLevel.MEMORY_AND_DISK_SER storage level for your DStream
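- A sketch (hypothetical socket source, reusing the ssc from above) of pairing the WAL with a serialized, disk-backed storage level so received data is not kept twice in memory:
- import org.apache.spark.storage.StorageLevel
- val lines = ssc.socketTextStream("stream-host", 9999, StorageLevel.MEMORY_AND_DISK_SER)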
- Kafka
- Use direct connector
- No need for a WAL with the direct connector
- Direct connector gotchas
- Need to track offsets for driver recovery
- Track them yourself
- In zookeeper, HDFS, or a database
- For accuracy
- Processing needs to be idempotent, OR
- Update offsets in the same transaction when updating results
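- A sketch against the Spark 1.6-era spark-streaming-kafka (0.8) direct API, reusing the ssc from above; broker and topic names are made up:
- import kafka.serializer.StringDecoder
- import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
- val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
- val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("events"))
- stream.foreachRDD { rdd =>
- val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
- // process rdd, then persist offsetRanges (ZooKeeper, HDFS, or a database) in the same transaction as the results
- }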
- In Structured Streaming (starting with Spark 2.1), state is stored in memory, backed by a write-ahead log in HDFS/S3
- Graceful shutdown for streaming job
- Thread hooks
- Check for a flag every N seconds
- def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
- CMD line
- spark-submit --master $MASTER_REST_URL --kill $DRIVER_ID
- spark.streaming.stopGracefullyOnShutdown = true
- By marker file
- Touch a file when starting the app on HDFS
- Remove the file when you want to stop
- Separate thread in spark app, calls
- streamingContext.stop(stopSparkContext = true, stopGracefully = true)
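- A rough sketch of the marker-file check (HDFS path and poll interval are made up), assuming the StreamingContext is named ssc:
- import org.apache.hadoop.fs.{FileSystem, Path}
- val markerPath = new Path("/tmp/my-streaming-app.marker")  // touched when the app starts
- val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
- while (!ssc.awaitTerminationOrTimeout(10000)) {  // poll every 10s; returns true once the context has stopped
- if (!fs.exists(markerPath)) ssc.stop(stopSparkContext = true, stopGracefully = true)
- }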
- Recovery function (StreamingContext.getOrCreate), a lazily instantiated singleton instance of SparkSession, and the preferred createDirectStream API
- E.g.
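- A minimal Scala sketch of that recovery pattern; sparkConf and checkpointDirectory are assumed, and the createDirectStream call from the Kafka section would go inside createContext():
- import org.apache.spark.streaming.{Seconds, StreamingContext}
- def createContext(): StreamingContext = {
- val newSsc = new StreamingContext(sparkConf, Seconds(10))
- newSsc.checkpoint(checkpointDirectory)  // same directory passed to getOrCreate
- // define all DStreams and output operations here before returning
- newSsc
- }
- val context = StreamingContext.getOrCreate(checkpointDirectory, createContext _)
- context.start()
- context.awaitTermination()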
- Python examples