Monday, November 30, 2015

Spark basics

  1. hardware provisioning
    1. storage systems
      1. run on the same nodes as HDFS 
        1. yarn
        2. with a fixed amount of memory and number of cores dedicated to Spark on each node
      2. run on different nodes in the same local-area network as HDFS
      3. run computing jobs on different nodes than the storage system
    2. local disks
      1. 4-8 disks per node
      2. without RAID (just as separate mount points)
      3. mount the disks with the noatime option
      4. configure the spark.local.dir variable to be a comma-separated list of the local disks (see the sketch after this list)
      5. same disks as HDFS, if running HDFS
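    A minimal PySpark sketch of the spark.local.dir setting above. The mount points are hypothetical; note that on standalone/Mesos the SPARK_LOCAL_DIRS environment variable (LOCAL_DIRS on YARN) overrides this property.

      from pyspark import SparkConf, SparkContext

      # Spread shuffle and spill files across all local disks
      # (hypothetical mount points, one per physical disk, no RAID).
      conf = (SparkConf()
              .setAppName("local-dir-example")
              .set("spark.local.dir", "/mnt/disk1,/mnt/disk2,/mnt/disk3,/mnt/disk4"))
      sc = SparkContext(conf=conf)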
    3. memory
      1. 8 GB - hundreds of GB
      2. 75% of the memory
      3. if memory > 200 GB, then run multiple worker JVMs per node
        1. standalone mode
          1. conf/spark-env.sh
            1. SPARK_WORKER_INSTANCES: set the number of workers per node
            2. SPARK_WORKER_CORES: the number of cores per worker
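    A toy sketch of how one might split a big node into several workers. The ~40 GB heap cap and the 75% rule come from the notes in this post; the splitting heuristic itself is illustrative, not from the Spark docs.

      import math

      def split_node(total_mem_gb, total_cores, max_heap_gb=40):
          # Toy heuristic: give Spark ~75% of the machine's RAM and keep
          # each worker JVM under ~40 GB of heap (GC suffers above that).
          workers = max(1, int(math.ceil(total_mem_gb * 0.75 / max_heap_gb)))
          return workers, total_cores // workers, int(total_mem_gb * 0.75 / workers)

      workers, cores, mem = split_node(total_mem_gb=256, total_cores=32)
      # lines for conf/spark-env.sh on that node:
      print("SPARK_WORKER_INSTANCES=%d" % workers)   # 5
      print("SPARK_WORKER_CORES=%d" % cores)         # 6
      print("SPARK_WORKER_MEMORY=%dg" % mem)         # 38g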
    4. network
      1. >= 10 gigabit
    5. CPU cores
      1. 8-16 cores per machine, or more
    6. reference
      1. https://spark.apache.org/docs/latest/hardware-provisioning.html
  2. third-party hadoop distributions
    1. CDH
    2. HDP (recommended)
    3. inheriting cluster configuration (see the sketch below)
      1. spark-env.sh
        1. HADOOP_CONF_DIR
          1. hdfs-site.xml
          2. core-site.xml
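    Once HADOOP_CONF_DIR is exported in spark-env.sh, Spark picks up hdfs-site.xml and core-site.xml, so unqualified hdfs:/// paths resolve against the cluster's default namenode. A small sketch (the path is hypothetical):

      from pyspark import SparkConf, SparkContext

      sc = SparkContext(conf=SparkConf().setAppName("hdfs-example"))
      # No host:port needed: the default filesystem comes from core-site.xml.
      lines = sc.textFile("hdfs:///data/events/2015-11-30.log")
      print(lines.count())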
    4. reference
      1. https://spark.apache.org/docs/latest/hadoop-third-party-distributions.html
  3. external tools
    1. cluster-wide monitoring tool
      1. Ganglia
    2. OS profiling tools
      1. dstat
      2. iostat
      3. iotop
    3. JVM utilities
      1. jstack
      2. jmap
      3. jstat
      4. jconsole
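    For example, a thread dump of a suspect executor can be captured with jstack; the PID below is hypothetical (find the real one with jps on the worker node, e.g. jps | grep Executor). A sketch driving it from Python:

      import subprocess

      pid = "12345"  # hypothetical executor PID
      # jstack prints one stack trace per JVM thread; look for BLOCKED threads.
      dump = subprocess.check_output(["jstack", pid]).decode()
      print(dump)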
  4. optimization (each problem below is followed by the configuration that fixes it)
    1. out of memory
      1. sysctl -w vm.max_map_count=65535
      2. spark.storage.memoryMapThreshold 131072
    2. too many open files
      1. sysctl -w fs.file-max=1000000
      2. spark.shuffle.consolidateFiles true
      3. spark.shuffle.manager sort
    3. connection reset by peer
      1. -XX:+UseParallelGC -XX:+UseParallelOldGC -XX:ParallelGCThreads=12 -XX:NewRatio=3 -XX:SurvivorRatio=3
    4. error communicating with MapOutputTracker
      1. spark.akka.askTimeout 120
      2. spark.akka.lookupTimeout 120
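    The spark.* rows above go in conf/spark-defaults.conf or can be set programmatically, as in the sketch below; the sysctl lines are OS settings that must be run as root on every node. Property names and values are taken straight from the table (the spark.akka.* timeouts apply to Spark 1.x).

      from pyspark import SparkConf

      conf = (SparkConf()
              .set("spark.storage.memoryMapThreshold", "131072")
              .set("spark.shuffle.consolidateFiles", "true")
              .set("spark.shuffle.manager", "sort")
              .set("spark.akka.askTimeout", "120")
              .set("spark.akka.lookupTimeout", "120"))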
  5. configuration
    1. 75% of a machine's memory (standalone)
    2. minimum executor heap size: 8 GB
    3. maximum executor heap size: 40 GB / under 45 GB (watch GC)
    4. kryo serialization (see the sketch after this list)
    5. parallel (old) / CMS / G1 GC
    6. pypy > cpython
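    A sketch combining several of these settings in one SparkConf; the heap size and GC flags mirror the notes above, so tune them per workload.

      from pyspark import SparkConf

      conf = (SparkConf()
              .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
              .set("spark.executor.memory", "40g")  # stay under ~45 GB to keep GC pauses sane
              .set("spark.executor.extraJavaOptions",
                   "-XX:+UseParallelGC -XX:+UseParallelOldGC"))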
  6. notes
    1. in-memory size is not the same as the on-disk data size (often 2-3x bigger)
    2. prefer reduceByKey over groupByKey (see the sketch after this list)
    3. there are limitations when using Python with Spark Streaming (at least for now)
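    A small PySpark illustration of why: reduceByKey combines values on each map partition before the shuffle, while groupByKey ships every value across the network.

      from pyspark import SparkContext

      sc = SparkContext(appName="reduce-vs-group")
      pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])

      # Preferred: partial sums are computed map-side, so less data is shuffled.
      print(pairs.reduceByKey(lambda x, y: x + y).collect())   # [('a', 4), ('b', 2)]

      # Works, but shuffles every (key, value) pair before summing.
      print(pairs.groupByKey().mapValues(sum).collect())       # same result, more I/O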
