Wednesday, December 31, 2014

ElasticSearch

  1. Introduction

    1. Elasticsearch is an open-source search engine built on top of Apache Lucene™ , a full-text search engine library
    2. Elasticsearch is a real-time distributed search and analytics engine
    3. It allows you to explore your data at a speed and at a scale never before possible
    4. It is used for full text search, structured search, analytics, and all three in combination

  2. Why ES

    1. reason 1
      1. Unfortunately, most databases are astonishingly inept at extracting actionable knowledge from your data
      2. Can they perform full-text search, handle synonyms and score documents by relevance?
      3. Can they generate analytics and aggregations from the same data?
      4. Most importantly, can they do this in real-time without big batch processing jobs?
    2. reason 2
      1. A distributed real-time document store where every field is indexed and searchable
      2. A distributed search engine with real-time analytics
      3. Capable of scaling to hundreds of servers and petabytes of structured and unstructured data

  3. Installation

    1. curl -L -O https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.3.2.zip
    2. unzip elasticsearch-1.3.2.zip
    3. cd elasticsearch-1.3.2
    4. ./bin/elasticsearch -d
  4. Cluster
    1. 3 lower resource master-eligible nodes in large clusters
    2. light wight client nodes
    3. metal is more configurable
    4. metal can utilize SSD
  5. Commnad

    1. curl 'http://localhost:9200/?pretty'
    2. curl -XPOST 'http://localhost:9200/_shutdown'
    3. curl -XPOST 'http://localhost:9200/_cluster/nodes/_local/_shutdown'
    4. curl -XPOST 'http://localhost:9200/_cluster/nodes/nodeId1,nodeId2/_shutdown'
    5. curl -XPOST 'http://localhost:9200/_cluster/nodes/_master/_shutdown'
  6. Configuration

    1. config/elasticsearch.yml
      1. cluster.name
      2. node.name
      3. node.master
      4. node.data
      5. path.*
        1. path.conf: -Des.path.conf
        2. path.data
        3. path.work
        4. path.logs
      6. discovery.zen.ping.multicast.enabled: false
      7. discovery.zen.ping.unicast.hosts
      8. gateway.recover_after_nodes: n
      9. discovery.zen.minimum_master_nodes: (n/2) + 1
      10. action.disable_delete_all_indices: true
      11. action.auto_create_index: false
      12. action.destructive_requires_name: true
      13. index.mapper.dynamic: false
      14. script.disable_dynamic: true
    2. dynamic
      1. discovery.zen.minimum_master_nodes
        curl -XPUT localhost:9200/_cluster/settings -d '{
          "persistent" : {
            "discovery.zen.minimum_master_nodes" : (n/2) + 1
          }
        }'
      2. disable _all
        PUT /my_index/_mapping/my_type
        {
            "my_type": {
                "_all": { "enabled": false }
            }
        }
      3. include_in_all
        PUT /my_index/my_type/_mapping
        {
            "my_type": {
                "include_in_all": false,
                "properties": {
                    "title": {
                        "type":           "string",
                        "include_in_all": true
                    },
                    ...
                }
            }
        }
      4. _alias, _aliases
        1. PUT /my_index_v1 
          PUT /my_index_v1/_alias/my_index
        2. POST /_aliases
          {
              "actions": [
                  { "remove": { "index": "my_index_v1", "alias": "my_index" }},
                  { "add":    { "index": "my_index_v2", "alias": "my_index" }}
              ]
          }
      5. refresh_interval (bulk indexing)
        1. PUT /my_logs
          {
            "settings": {
              "refresh_interval": "30s" 
            }
          }
        2. POST /my_logs/_settings
          { "refresh_interval": -1 } 
          
          POST /my_logs/_settings
          { "refresh_interval": "1s" } 
      6. flush
        1. POST /blogs/_flush 
          
          POST /_flush?wait_for_ongoing
      7. optimize
        1. POST /logstash-old-index/_optimize?max_num_segments=1
      8. filed length norm (for logging)
        1. PUT /my_index
          {
            "mappings": {
              "doc": {
                "properties": {
                  "text": {
                    "type": "string",
                    "norms": { "enabled": false } 
                  }
                }
              }
            }
          }
      9. tune cluster and index recovery settings (test the value)
        curl -XPUT localhost:9200/_cluster/settings -d '{"transient":{"cluster.routing.allocation.node_initial_primary_recoveries":25}}'
        curl -XPUT localhost:9200/_cluster/settings -d '{"transient":{"cluster.routing.allocation.node_concurrent_recoveries":5}}'
        ?
        curl -XPUT localhost:9200/_cluster/settings -d '{"transient":{"cluster.recovery.max_bytes_per_sec":"100mb"}}'
        curl -XPUT localhost:9200/_cluster/settings -d '{"transient":{"cluster.recovery.concurrent_streams":20}}'
    3. logging.yml
      1. use node.name instead of cluster.name
        file: ${path.logs}/${node.name}.log
    4. elasticsearch.in.sh
      1. disable HeapDumpOnOutOfMemoryError
        #JAVA_OPTS="$JAVA_OPTS -XX:+HeapDumpOnOutOfMemoryError"
    5. ES_HEAP_SIZE: 50%
    6. heaps < 32GB
    7. no swap
      1. bootstrap.mlockall = true
      2. ulimit -l unlimited
    8. thread pools
      1. thread pool size
        1. search - 3 * # of processors (3 * 64 = 192)
        2. index - 2 * # of processors (2 * 64 = 128)
        3. bulk - 3 * # of processors (3 * 64 = 192)
      2. queues - set the size to -1 to prevent rejections from ES
    9. buffers
      1. increased indexing buffer size to 40%
    10. dynamic node.name
      1. ES script
        export ES_NODENMAE=`hostname -s`
      2. elasticsearch.yml
        node.name: "${ES_NODENAME}"
  7. Hardware
    1. CPU
      1. core
    2. disk
      1. SSD
        1. noop / deadline scheduler
        2. better IOPS
        3. cheaper WRT: IOPS
        4. manufacturing tolerance can vary
      2. RAID
        1. do not necessarily need
        2. ES handles redundancy
  8. Monitoring

    1. curl 'localhost:9200/_cluster/health'
    2. curl 'localhost:9200/_nodes/process'
      1. max_file_descriptotrs: 30000?
    3. curl 'localhost:9200/_nodes/jvm'
      1. version
      2. mem.heap_max
    4. curl 'localhost:9200/_nodes/jvm/stats'
      1. heap_used
    5. curl 'localhost:9200/_nodes/indices/stats'
      1. fielddata
    6. curl 'localhost:9200/_nodes/indices/stats?fields=created_on'
      1. fields
    7. curl 'localhost:9200/_nodes/http/stats'
      1. http
    8. GET /_stats/fielddata?fields=*
    9. GET /_nodes/stats/indices/fielddata?fields=*
    10. GET /_nodes/stats/indices/fielddata?level=indices&fields=*
  9. Scenario

    1. adding nodes
      1. disable allocation to stop shard shuffling until ready
        curl -XPUT localhost:9200/_cluster/settings -d '{"transient":{"cluster.routing.allocation.disable_allocation":true}}'
      2. increase speed of transfers
        curl -XPUT localhost:9200/_cluster/settings -d '{"transient":{"indices.recovery.concurrent_streams":6,"indices.recovery.max_bytes_per_sec":"50mb"}}'
      3. start new nodes
      4. enable allocation
        curl -XPUT localhost:9200/_cluster/settings -d '{"transient":{"cluster.routing.allocation.disable_allocation":false}}'
    2. removing nodes
      1. exclude the nodes from the cluster, this will tell ES to move things off
        curl -XPUT localhost:9200/_cluster/settings -d '{"transient":{"cluster.routing.allocation.exclude._name":"node-05*,node-06*"}}'
      2. increase speed of transfers
        curl -XPUT localhost:9200/_cluster/settings -d '{"transient":{"indices.recovery.concurrent_streams":6,"indices.recovery.max_bytes_per_sec":"50mb"}}'
      3. shutdown old nodes after all shards move off
        curl -XPOST 'localhost:9200/_cluster/nodes/node-05*,node-06*/_shutdown'
    3. upgrades / node restarts
      1. disable auto balancing  if doing rolling restarts
        curl -XPUT localhost:9200/_cluster/settings -d '{"transient":{"cluster.routing.allocation.disable_allocation":true}}'
      2. restart
      3. able auto balancing
        curl -XPUT localhost:9200/_cluster/settings -d '{"transient":{"cluster.routing.allocation.disable_allocation":false}}'
    4. re / bulk indexing
      1. set replicas to 0
      2. increase after completion
  10. Restoration
    1. snapshot
  11. Reference

    1. Elasticsearch - The Definitive Guide
    2. http://www.elasticsearch.org/webinars/elasticsearch-pre-flight-checklist/
    3. http://www.elasticsearch.org/webinars/elk-stack-devops-environment/
    4. http://www.elasticsearch.org/videos/moloch-elasticsearch-powering-network-forensics-aol/
    5. http://www.elasticsearch.org/videos/elastic-searching-big-data/

Apache Sqoop

  1. Introduction
    1. Sqoop is a tool designed to transfer data between Hadoop and relational databases. You can use Sqoop to import data from a relational database management system (RDBMS) such as MySQL or Oracle into the Hadoop Distributed File System (HDFS), transform the data in Hadoop MapReduce, and then export the data back into an RDBMS.
  2. Creating password file
    1. echo -n password > .password
    2. hdfs dfs -put .password /user/$USER/
  3. Installing the MySQL JDBC driver in CDH
    1. mkdir -p /var/lib/sqoop 
    2. chown sqoop:sqoop /var/lib/sqoop 
    3. chmod 755 /var/lib/sqoop
    4. donwload JDBC dirver from http://dev.mysql.com/downloads/connector/j/5.1.html
    5. sudo cp mysql-connector-java-version/mysql-connector-java-version-bin.jar /var/lib/sqoop/
  4. Reference
    1. http://sqoop.apache.org/
    2. http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/cdh_ig_jdbc_driver_install.html

Apache Solr

  1. Solr 소개
    1. REST 형식의 API를 사용하는 독립적 엔터프라이즈 검색 서버
    2. XML, JSON, CSV 혹은 HTTP 위 바이너리로 인덱싱을 진행
    3. HTTP GET 요청으로 검색
    4. XML, JSON, CSV 혹은 바이너리 결과를 반환
  2. Solr 특징
    1. 고급 Full-Text 검색 능력
    2. 다량의 웹 트래픽을 위한 최적화
    3. XML, JSON과 HTTP 등 오픈 인터페이스 기반 표준
    4. 종합적인 HTML 관리자 인터페이스
    5. 모니터링을 위한 서버 통계
    6. 선형 확장, 자동 인덱스 복사, 자동 대체와 복구
    7. 근 실시간 인덱싱
    8. XML 설정으로 유연성과 적응성을 제공
    9. 확장가능한 플러그인 아키텍처
  3. 사용 설명

    1. 설치 및 실행
      1. Java 1.7 이상 필요
      2. wget http://apache.mirror.cdnetworks.com/lucene/solr/4.9.0/solr-4.9.0.tgz
      3. tar -xvf solr-4.9.0.tgz
      4. cd solr-4.9.0/
      5. cd example/
      6. java -jar start.jar &
      7. 아래 주소를 통하여 Solr Admin에 접속
      8. http://211.49.227.178:8983/solr (211.49.227.178 대신 정확한 IP로 대체)
      9. 아래 그림은 Solr Admin 화면


    2. 데이터 인덱싱
      1. cd exampledocs/
      2. 아래 명령어를 통하여 solr.xml과 monitor.xml를 인덱싱
      3. java -jar post.jar solr.xml monitor.xml
      4. 아래 방법으로 solr 단어를 검색
      5. Core Selector -> collection1 -> Query -> q에 solr 입력 -> wt에서 xml 선택 -> Execute Query
      6. 혹은 아래 http 요청으로 직접 검색
      7. http://211.49.227.178:8983/solr/collection1/select?q=solr&wt=xml
      8. 아래 명령어로 exampledocs에 있는 모든 xml 파일 인덱싱
      9. java -jar post.jar *.xml
    3. 데이터 갱신
      1. 'java -jar post.jar solr.xml monitor.xml'와 'java -jar post.jar *.xml' 명령어로 solr.xml과 monitor.xml를 두번 인덱싱 하였지만 중복으로 인덱싱되지 않고 후에 인덱싱한 내용으로 대체
      2. 이는 인덱싱함에 있어 'id'라는 유니크한 키를 사용하기 때문이며 데이터 갱신은 이런 형식으로 진행
    4. 데이터 삭제
      1. 아래 명령어로 id가 SP2514N인 문서를 삭제
      2. java -Ddata=args -Dcommit=false -jar post.jar "<delete><id>SP2514N</id></delete>"
      3. 아래 명령어로 name에 DDR 단어가 포함된 문서를 삭제
      4. java -Dcommit=false -Ddata=args -jar post.jar "<delete><query>name:DDR</query></delete>"
      5. 위 삭제 명령어에서 -Dcommit=false 옵션을 사용하였기에 아래 명령어로 커밋하여야 문서를 검색하지 못함
      6. java -jar post.jar -
      7. 이렇게 하는 원인은 커밋하는 작업이 비싸기에 모든 작업을 한번에 커밋하는 것을 추천하기 때문
    5. 데이터 검색
      1. 아래 요청으로 video 관련 문서에서 name과 id 필드만 반환
      2. http://211.49.227.178:8983/solr/collection1/select/?indent=on&q=video&fl=name,id
      3. 아래 요청으로 video 관련 문서의 모든 내용을 반환
      4. http://211.49.227.178:8983/solr/collection1/select/?indent=on&q=video&fl=*
      5. 아래 요청으로 video 관련 문서에서 name, id 및 price를 필드를 반환하는데 price 내림 순으로 정렬
      6. http://211.49.227.178:8983/solr/collection1/select/?indent=on&q=video&sort=price%20desc&fl=name,id,price
      7. 아래 요청으로 video 관련 문서를 JSON으로 반환
      8. http://211.49.227.178:8983/solr/collection1/select/?indent=on&q=video&wt=json
      9. 아래 요청으로 video 관련 문서를 inStock 오름 순과 price 내림 순으로 반환
      10. http://211.49.227.178:8983/solr/collection1/select/?indent=on&q=video&sort=inStock%20asc,%20price%20desc
      11. 아래 요청처럼 함수 사용 가능
      12. http://211.49.227.178:8983/solr/collection1/select/?indent=on&q=video&sort=div(popularity,add(price,1))%20desc
  4. 참고 자료

    1. http://lucene.apache.org/solr/index.html
    2. http://wiki.apache.org/solr/

Kafka Web Console

  1. Introduction
    1. Kafka Web Console is a Java web application for monitoring Apache Kafka. With a modern web browser, you can view from the console
      1. Registered brokers
      2. Topics, partitions, log sizes, and partition leaders
      3. Consumer groups, individual consumers, consumer owners, partition offsets and lag
      4. Graphs showing consumer offset and lag history as well as consumer/producer message throughput history
      5. Latest published topic messages (requires web browser support for WebSocket)
  2. Prerequisite
    1. Typesafe Activator
      1. download
        1. https://typesafe.com/platform/getstarted or
        2. wget http://downloads.typesafe.com/typesafe-activator/1.2.10/typesafe-activator-1.2.10.zip?_ga=1.45331555.679995947.1412749425
      2. unzip typesafe-activator-1.2.10.zip\?_ga\=1.191598701.679995947.1412749425
      3. export PATH=$PATH:/relativePath/to/activator
      4. activator -help
    2. Play Framework
      1. download
        1. https://www.playframework.com/ or
        2. wget http://downloads.typesafe.com/play/2.2.5/play-2.2.5.zip
      2. unzip play-2.2.5.zip
      3. export PATH=$PATH:/relativePath/to/play
  3. installation
    1. wget https://github.com/claudemamo/kafka-web-console/archive/master.zip
    2. unzip master.zip
    3. cd kafka-web-console-master
    4. play start
    5. error
      1. Database 'default' needs evolution!
        1. play "start -DapplyEvolutions.default=true"
      2. org.jboss.netty.channel.ChannelException: Failed to bind to: /0.0.0.0:9000
        1. play "start -Dhttp.port=8080"
    6. http://hostname:8080/
  4. Register
    1. zookeepers -> resiger zookeeper
  5. Reference
    1. https://github.com/claudemamo/kafka-web-console

Kafka Offset Monitor

  1. Introduction
    1. This is an app to monitor your kafka consumers and their position (offset) in the queue
    2. You can see the current consumer groups, for each group the topics that they are consuming and the position of the group in each topic queue
    3. This is useful to understand how quick you are consuming from a queue and how fast the queue is growing
    4. It allows for debuging kafka producers and consumers or just to have an idea of what is going on in your system
    5. The app keeps an history of queue position and lag of the consumers so you can have an overview of what has happened in the last days
  2. Installation
    1. Download
      1. http://quantifind.github.io/KafkaOffsetMonitor/ or,
      2. wget https://github.com/quantifind/KafkaOffsetMonitor/releases/download/v0.2.0/KafkaOffsetMonitor-assembly-0.2.0.jar
    2. run it
      java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
           com.quantifind.kafka.offsetapp.OffsetGetterWeb \
           --zk zk-server1,zk-server2 \
           --port 8080 \
           --refresh 10.seconds \
           --retain 2.days
  3. Arguments
    1. zk the ZooKeeper hosts port on what 
    2. port will the app be available refresh how often should the app 
    3. refresh and store a point in the DB 
    4. retain how long should points be kept in the DB 
    5. dbName where to store the history (default 'offsetapp')
  4. Reference
    1. http://quantifind.github.io/KafkaOffsetMonitor/

Apache Kafka

  1. Introduction
    1. Kafka is a distributed, partitioned, replicated commit log service
    2. It provides the functionality of a messaging system, but with a unique design
  2. Feature
    1. Fast
      1. A single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients
    2. Scalable
      1. Kafka is designed to allow a single cluster to serve as the central data backbone for a large organization
      2. It can be elastically and transparently expanded without downtime
      3. Data streams are partitioned and spread over a cluster of machines to allow data streams larger than the capability of any single machine and to allow clusters of co-ordinated consumers
    3. Durable
      1. Messages are persisted on disk and replicated within the cluster to prevent data loss. Each broker can handle terabytes of messages without performance impact
    4. Distributed by Design
      1. Kafka has a modern cluster-centric design that offers strong durability and fault-tolerance guarantees

  3. Installation
    1. wget http://mirror.apache-kr.org/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
    2. tar -xzf kafka_2.9.2-0.8.1.1.tgz
    3. cd kafka_2.9.2-0.8.1.1
    4. bin/kafka-server-start.sh config/server.properties
  4. Configuration
    1. broker
      1. broker.id
      2. log.dirs
      3. zookeeper.connect
      4. host.name
      5. topic-level
        1. bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
        2. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000
        3. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --deleteConfig max.message.bytes
      6. controlled.shutdown.enable=true
      7. auto.leader.rebalance.enable=true
    2. consumer
      1. group.id
      2. zookeeper.connect
    3. producer
      1. metadata.broker.list
    4. server production server configuration
      # Replication configurations
      num.replica.fetchers=4
      replica.fetch.max.bytes=1048576
      replica.fetch.wait.max.ms=500
      replica.high.watermark.checkpoint.interval.ms=5000
      replica.socket.timeout.ms=30000
      replica.socket.receive.buffer.bytes=65536
      replica.lag.time.max.ms=10000
      replica.lag.max.messages=4000
      
      controller.socket.timeout.ms=30000
      controller.message.queue.size=10
      
      # Log configuration
      num.partitions=8
      message.max.bytes=1000000
      auto.create.topics.enable=true
      log.index.interval.bytes=4096
      log.index.size.max.bytes=10485760
      log.retention.hours=168
      log.flush.interval.ms=10000
      log.flush.interval.messages=20000
      log.flush.scheduler.interval.ms=2000
      log.roll.hours=168
      log.cleanup.interval.mins=30
      log.segment.bytes=1073741824
      
      # ZK configuration
      zookeeper.connection.timeout.ms=6000
      zookeeper.sync.time.ms=2000
      
      # Socket server configuration
      num.io.threads=8
      num.network.threads=8
      socket.request.max.bytes=104857600
      socket.receive.buffer.bytes=1048576
      socket.send.buffer.bytes=1048576
      queued.max.requests=16
      fetch.purgatory.purge.interval.requests=100
      producer.purgatory.purge.interval.requests=100
  5. Operations
    1. bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
    2. bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partitions 40
    3. bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y
    4. bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
    5. bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
    6. bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
    7. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test
  6. Reference
    1. http://kafka.apache.org/
    2. http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

Apache Flume

  1. Introduction
    1. Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. 
    2. It has a simple and flexible architecture based on streaming data flows. 
    3. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. 
    4. It uses a simple extensible data model that allows for online analytic application.
  2. System Requirements
    1. Java Runtime Environment - Java 1.6 or later (Java 1.7 Recommended) 
    2. Memory - Sufficient memory for configurations used by sources, channels or sinks 
    3. Disk Space - Sufficient disk space for configurations used by channels or sinks 
    4. Directory Permissions - Read/Write permissions for directories used by agent
  3. Features
    1. complex flows
      1. Flume allows a user to build multi-hop flows where events travel through multiple agents before reaching the final destination. It also allows fan-in and fan-out flows, contextual routing and backup routes (fail-over) for failed hops.
    2. reliability
      1. The events are staged in a channel on each agent. The events are then delivered to the next agent or terminal repository (like HDFS) in the flow. The events are removed from a channel only after they are stored in the channel of next agent or in the terminal repository. This is a how the single-hop message delivery semantics in Flume provide end-to-end reliability of the flow. Flume uses a transactional approach to guarantee the reliable delivery of the events. The sources and sinks encapsulate in a transaction the storage/retrieval, respectively, of the events placed in or provided by a transaction provided by the channel. This ensures that the set of events are reliably passed from point to point in the flow. In the case of a multi-hop flow, the sink from the previous hop and the source from the next hop both have their transactions running to ensure that the data is safely stored in the channel of the next hop.
    3. recover ability
      1. The events are staged in the channel, which manages recovery from failure. Flume supports a durable file channel which is backed by the local file system. There’s also a memory channel which simply stores the events in an in-memory queue, which is faster but any events still left in the memory channel when an agent process dies can’t be recovered.
  4. Installation
    1. download
      1. http://flume.apache.org/download.html, or
      2. wget http://apache.mirror.cdnetworks.com/flume/1.5.0.1/apache-flume-1.5.0.1-bin.tar.gz
    2. tar xvf apache-flume-1.5.0.1-bin.tar.gz
    3. cd apache-flume-1.5.0.1-bin
    4. bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
  5. CDH
    1. Installattion
      1. CDH5-Installation-Guide.pdf (P.155)
      2. http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/CDH5-Installation-Guide/cdh5ig_flume_installation.html
    2. Security Configuration
      1. CDH5-Security-Guide.pdf (P.53)
      2. http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/CDH5-Security-Guide/cdh5sg_flume_security.html
  6. CM
    1. Service
      1. Managing-Clusters-with-Cloudera-Manager.pdf (P.49)
      2. http://www.cloudera.com/content/cloudera-content/cloudera-docs/CM5/latest/Cloudera-Manager-Managing-Clusters/cm5mc_flume_service.html
    2. Properties: http://www.cloudera.com/content/cloudera-content/cloudera-docs/CM5/latest/Cloudera-Manager-Configuration-Properties/cm5config_cdh500_flume.html
    3. Health Tests
      1. http://www.cloudera.com/content/cloudera-content/cloudera-docs/CM5/latest/Cloudera-Manager-Health-Tests/ht_flume.html
      2. http://www.cloudera.com/content/cloudera-content/cloudera-docs/CM5/latest/Cloudera-Manager-Health-Tests/ht_flume_agent.html
    4. Metrics
      1. Metrics: http://www.cloudera.com/content/cloudera-content/cloudera-docs/CM5/latest/Cloudera-Manager-Metrics/flume_metrics.html
      2. Channel Metrics: http://www.cloudera.com/content/cloudera-content/cloudera-docs/CM5/latest/Cloudera-Manager-Metrics/flume_channel_metrics.html
      3. Sink Metrics: http://www.cloudera.com/content/cloudera-content/cloudera-docs/CM5/latest/Cloudera-Manager-Metrics/flume_sink_metrics.html
      4. Source Metrics: http://www.cloudera.com/content/cloudera-content/cloudera-docs/CM5/latest/Cloudera-Manager-Metrics/flume_source_metrics.html
  7. Reference
    1. http://flume.apache.org/

Talend Open Studio for Big Data

  1. Introduction
    1. Talend provides a powerful and versatile open source big data product that makes the job of working with big data technologies easy and helps drive and improve business performance, without the need for specialist knowledge or resources.
  2. Features
    1. Integration at Cluster Scale
      1. Talend’s big data product combines big data components for MapReduce 2.0 (YARN), Hadoop, HBase, Hive, HCatalog, Oozie, Sqoop and Pig into a unified open source environment so you can quickly load, extract, transform and process large and diverse data sets from disparate systems.
    2. Big Data Without The Need To Write / Maintain Code
      1. Ready to Use Big Data Connectors
        1. Talend provides an easy-to-use graphical environment that allows developers to visually map big data sources and targets without the need to learn and write complicated code. Running 100% natively on Hadoop, Talend Big Data provides massive scalability. Once a big data connection is configured the underlying code is automatically generated and can be deployed remotely as a job that runs natively on your big data cluster - HDFS, Pig, HCatalog, HBase, Sqoop or Hive.
      2. Big Data Distribution and Big Data Appliance Support
        1. Talend's big data components have been tested and certified to work with leading big data Hadoop distributions, including Amazon EMR, Cloudera, IBM PureData, Hortonworks, MapR, Pivotal Greenplum, Pivotal HD, and SAP HANA. Talend provides out-of-the-box support for big data platforms from the leading appliance vendors including Greenplum/Pivotal, Netezza, Teradata, and Vertica.
      3. Open Source
        1. Using the Apache software license means developers can use the Studio without restrictions. As Talend’s big data products rely on standard Hadoop APIs, users can easily migrate their data integration jobs between different Hadoop distributions without any concerns about underlying platform dependencies. Support for Apache Oozie is provided out-of-the-box, allowing operators to schedule their data jobs through open source software.
      4. Pull Source Data from Anywhere Including NoSQL
        1. With 800+ connectors, Talend integrates almost any data source so you can transform and integrate data in real-time or batch. Pre-built connectors for HBase, MongoDB,Cassandra, CouchDB, Couchbase, Neo4J and Riak speed development without requiring specific NoSQL knowledge. Talend big data components can be configured to bulk upload data to Hadoop or other big data appliance, either as a manual process, or an automatic schedule for incremental data updates.
  3. Products
    1. https://www.talend.com/products/big-data/matrix

      FEATURESTalend Open Studio for Big DataTalend Enterprise Big DataTalend Platform for Big Data
      Job Designer
      x
      x
      x
      Components for HDFS, HBase, HCatalog, Hive, Pig, Sqoop
      x
      x
      x
      Hadoop Job Scheduler
      x
      x
      x
      NoSQL Support
      x
      x
      x
      Versioning
      x
      x
      x
      Shared Repository
      x
      x
      Reporting and Dashboards

      x
      Hadoop Profiling, Parsing and Matching

      x
      Indemnification/Warranty and Talend Support
      x
      x
      LicenseOpen SourceSubscriptionSubscription


  4. Reference
    1. http://www.talend.com/

Tuesday, September 2, 2014

LogStash - 'Could not load FFI Provider'

1. version

- 1.4.2

2. Error

- LoadError: Could not load FFI Provider: (NotImplementedError) FFI not available: null

3. Solution

- cd logstash-1.4.2/bin
- vim logstash.lib.sh
- add 'JAVA_OPTS="$JAVA_OPTS -Djava.io.tmpdir=/path/to/somewhere"
- mkdir /path/to/somewhere
- start logstash

Thursday, July 31, 2014

PostgreSQL - Installation

1. Version Info

- PostgreSQL 9.3
- CentOS base 6.4 64-bit

2. Installation

- vi /etc/yum.repos.d/CentOS-Base.repo
- add 'exclude=postgresql*' at '[base]'
- yum localinstall http://yum.postgresql.org/9.3/redhat/rhel-6-x86_64/pgdg-centos93-9.3-1.noarch.rpm
- yum list postgres*
- yum install postgresql93-server.x86_64
- service postgresql-9.3 initdb
- cd /var/lib/pgsql/9.3/data/
- vi postgresql.conf
- add "listen_addresses=’*’"
- vi pg_hba.conf
- add "host all all 'your network info'/24 trust"
- service postgresql-9.3 start
- connect to it and create a user by using command or pgAdmin
- vi pg_hba.conf
- change "host all all 'your network info'/24 trust" to "host all all 'your network info'/24 md5"
- service postgresql-9.3 restart

Thursday, April 3, 2014

MongoDB 2.4 - sharding

- killall mongod
- killall mongos
- cd
- mkdir a0, b0, cfg0
- mongod --shardsvr --dbpath a0 --logpath log.a0 --fork --logappend --smallfiles --oplogSize 50
- Note that with --shardsvr specified the default port for mongod becomes 27018
- mongod --configsvr --dbpath cfg0 --fork --logpath log.cfg0 --logappend
- Note with --configsvr specified the default port for listening becomes 27019 and the default data directory /data/configdb. Wherever your data directory is, it is suggested that you verify that the directory is empty before you begin.
- mongos --configdb your_host_name:27019 --fork --logappend --logpath log.mongos0
- mongo
- sh.addShard('your_host_name:27018')
- sh.enableSharding('databaseName')
- db.collectionName.ensureIndex({a:1,b:1})
- sh.shardCollection('databaseName.collectionName',{a:1,b:1})
- use config
- db.chunks.find()
- db.chunks.find({}, {min:1,max:1,shard:1,_id:0,ns:1})
- exit
- mongod --shardsvr --dbpath b0 --logpath log.b0 --fork --logappend --smallfiles --oplogSize 50 --port 27000
- mongo
- sh.addShard('your_host_name:27000')
- sh.status()
- db.getSisterDB("config").shards.count()

Mongo 2.4.9 - currentOp and killOp

- db.currentOp()

...
"opid" : 20165
...

- db.killOp(20165)

Wednesday, March 26, 2014

Mongo 2.4.9 - getSisterDB

- show dbs

test
test1

- db

test

- db.getSisterDB('test1').collection.count()

123

Mongo 2.4.9 - connect

- var c = connect('localhost:27002/database')
- db
- show dbs
- c.isMaster()
- c.getMongo().setSlaveOk()
- c.colleciont.count()
- c.collection.find().limit(3)

Monday, March 24, 2014

Mongo 2.4.9 - oplog

- use local
- db.oplog.rs.find()
- db.oplog.rs.stats()

MongoDB 2.4 - Replica Set for Testing and Developing

1. start a single mongod as a standalone server

- cd
- mkdir 1
- mongod --dbpath 1 --port 27001 --smallfiles --oplogSize 50 --logpath 1.log --logappend --fork
- mongo --port 27001

2. convert the mongod instance to a single server replica set

- exit
- killall mongod
- mongod --replSet rstest --dbpath 1 --port 27001 --smallfiles --oplogSize 50 --logpath 1.log --logappend --fork
- mongo --port 27001
- Note: use hostname command to check hostname of the server
-

cfg =
{
        "_id" : "rstest",
        "members" : [
                {
                        "_id" : 0,
                        "host" : "localhost:27001"
                }
        ]
}

- rs.initiate(cfg)

3. add two more members to the set

- exit
- cd
- mkdir 2 3
- mongod --replSet rstest --dbpath 2 --port 27002 --smallfiles --oplogSize 50 --logpath 2.log --logappend --fork
- mongod --replSet rstest --dbpath 3 --port 27003 --smallfiles --oplogSize 50 --logpath 3.log --logappend --fork
- mongo --port 27001

3.1.

- cfg = rs.conf()
- cfg.members[1] = { "_id" : 1, "host" : "localhost:27002" }
- cfg.members[2] = { "_id" : 2, "host" : "localhost:27003" }
- rs.reconfig(cfg)

3.2.

- rs.add('localhost:27002')
- rs.add('localhost:27003')

4. retire the first member from the set

- rs.stepDown(300)
- exit
- terminate mongod process for memeber 1
- go to the new primary of the set
- cfg = rs.conf()
- cfg.members.shift()
- rs.reconfig(cfg)

Thursday, March 20, 2014

Eclipse - Specified VM install not found: type Standard VM, name XXX

Solution 1

- suppose the path of workspace of eclipse is 'D:\eclipse\workspace'
- then, go to  'D:\eclipse\workspace\.metadata\.plugins\org.eclipse.debug.core\.launches'
- find 'projectName build.xml.launch' file and delete it

Solution 2

- Right Click on build.xml
- Go to "Run As" >> "External Tools Configurations..."
- It shall open new window
- Go to JRE tab
- Select proper JRE if missing

MongoDB 2.4 - Java (Fail over)

import java.net.UnknownHostException;
import java.util.Arrays;

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
import com.mongodb.ServerAddress;

public class MongoDB {

public static void main(String[] args) throws UnknownHostException,
InterruptedException {
MongoClient client = new MongoClient(Arrays.asList(new ServerAddress(
"localhost", 27017), new ServerAddress("localhost", 27018),
new ServerAddress("localhost", 27019)));
DBCollection test = client.getDB("database")
.getCollection("collection");

test.drop();

for (int i = 0; i < Integer.MAX_VALUE; i++) {
for (int retries = 0; retries < 3; retries++) {
try {
test.insert(new BasicDBObject("_id", i));
System.out.println("Inserted document: " + i);
break;
} catch (MongoException.DuplicateKey e) {
System.out.println("Document already inserted " + i);
} catch (MongoException e) {
System.out.println(e.getMessage());
System.out.println("Retrying");
Thread.sleep(5000);
}
}

Thread.sleep(500);
}
}
}

MongoDB 2.4 - Java (connecting to a replica set)

import java.net.UnknownHostException;
import java.util.Arrays;

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;

public class MongoDB {

public static void main(String[] args) throws UnknownHostException,
InterruptedException {
MongoClient client = new MongoClient(Arrays.asList(new ServerAddress(
"localhost", 27017), new ServerAddress("localhost", 27018),
new ServerAddress("localhost", 27019)));
DBCollection test = client.getDB("database")
.getCollection("collection");

test.drop();

for (int i = 0; i < Integer.MAX_VALUE; i++) {
test.insert(new BasicDBObject("_id", i));
System.out.println("Inserted document: " + i);
Thread.sleep(500);
}
}
}

Wednesday, March 19, 2014

Ubuntu 12.04 - link /var/lib/postgresql to another partition's directory

- sudo service postgresql stop
- sudo cp -rf /var/lib/postgresql /anotherPartition/postgresql
- sudo chwon -R postgres:postgres /anotherPartition/postgresql
- sudo service postgresql start

Thursday, March 13, 2014

Mongo 2.4.9 - MapReduce

1. Map

function map_closest() {
    var pitt = [-80.064879, 40.612044];
    var phil = [-74.978052, 40.089738];

    function distance(a, b) {
        var dx = a[0] - b[0];
        var dy = a[1] - b[1];
        return Math.sqrt(dx * dx + dy * dy);
    }

    if (distance(this.loc, pitt) < distance(this.loc, phil)) {
        emit("pitt", 1);
    } else {
        emit("phil", 1);
    }
}

2. Reduce

2.1. Array.sum()

function reduce_closest(city, counts) {  
  return Array.sum(counts); 
}

2.2. for()

function reduce_closest(city, counts) {
 var total = 0;
 var length = counts.length;

 for (var i = 0; i < length; i++) {
  total += counts[i];
 }

 return total;
}

2.3. forEach()

function reduce_closest(city, counts) {
 var total = 0;

 counts.forEach(function(count) {
  total += count;
 });

 return total;
}

3. mapReduce

db.zips.mapReduce(
  map_closest, 
  reduce_closest, 
  { 
    query: { state: 'PA' }, 
    out: { inline: 1 } 
  }
)

Mongo 2.4.9 - $elemMatch

- db.policies.find( { status : { $ne : "expired" }, coverages : { $elemMatch : { type : "liability", rates : { $elemMatch : { rate : { $gte : 100 }, current : true } } } } } ).pretty()

{
        "_id" : "1024850AB",
        "status" : "draft",
        "insured_item" : {
                "make" : "Cessna",
                "model" : "Skylane",
                "year" : 1982,
                "serial" : "AB1783A"
        },
        "insured_parties" : [
                ObjectId("5097f7351d9a5941f5111d61")
        ],
        "coverages" : [
                {
                        "type" : "liability",
                        "limit" : 1000000,
                        "rates" : [
                                {
                                        "rate" : 200,
                                        "staff_id" : ObjectId("5097f7351d9a5999f5111d69"),
                                        "date" : ISODate("2012-11-05T17:29:54.561Z"),
                                        "current" : true
                                }
                        ]
                },
                {
                        "type" : "property",
                        "deductible" : 5000,
                        "limit" : 100000,
                        "rates" : [
                                {
                                        "rate" : 300,
                                        "staff_id" : ObjectId("5097f7351d9a5999f5111d69"),
                                        "date" : ISODate("2012-11-05T17:29:56.561Z"),
                                        "current" : true
                                }
                        ]
                }
        ],
        "underwriting" : {
                "staff_id" : ObjectId("5097f84cf8dd729bc7273068"),
                "action" : "approved",
                "date" : ISODate("2012-11-05T17:33:00.693Z")
        }
}

Mongo 2.4.9 - Command

- mongoimport -d database -c collection --drop example.json

- mongo example.js
- mongo --shell example.js
- mongo --shell database example.js

Wednesday, March 12, 2014

Mongo 2.4.9 - $substr

- db.zips.findOne()

{
        "city" : "ACMAR",
        "loc" : [
                -86.51557,
                33.584132
        ],
        "pop" : 6055,
        "state" : "AL",
        "_id" : "35004"
}

- db.zips.count()

29467

- db.zips.aggregate({$project:{_id:{$substr:['$city',0,1]}}},{$group:{_id:'$_id',n:{$sum:1}}},{$sort:{n:-1}})

{
        "result" : [
                {
                        "_id" : "S",
                        "n" : 2871
                },
                {
                        "_id" : "C",
                        "n" : 2692
                },
                {
                        "_id" : "M",
                        "n" : 2348
                },
                {
                        "_id" : "B",
                        "n" : 2344
                },
                {
                        "_id" : "W",
                        "n" : 1834
                },
                {
                        "_id" : "L",
                        "n" : 1738
                },
                {
                        "_id" : "P",
                        "n" : 1681
                },
                {
                        "_id" : "H",
                        "n" : 1621
                },
                {
                        "_id" : "A",
                        "n" : 1398
                },
                {
                        "_id" : "G",
                        "n" : 1304
                },
                {
                        "_id" : "R",
                        "n" : 1284
                },
                {
                        "_id" : "D",
                        "n" : 1162
                },
                {
                        "_id" : "N",
                        "n" : 1128
                },
                {
                        "_id" : "F",
                        "n" : 1091
                },
                {
                        "_id" : "E",
                        "n" : 1050
                },
                {
                        "_id" : "T",
                        "n" : 955
                },
                {
                        "_id" : "O",
                        "n" : 767
                },
                {
                        "_id" : "K",
                        "n" : 630
                },
                {
                        "_id" : "J",
                        "n" : 391
                },
                {
                        "_id" : "V",
                        "n" : 381
                },
                {
                        "_id" : "I",
                        "n" : 288
                },
                {
                        "_id" : "U",
                        "n" : 165
                },
                {
                        "_id" : "Y",
                        "n" : 112
                },
                {
                        "_id" : "Q",
                        "n" : 68
                },
                {
                        "_id" : "Z",
                        "n" : 48
                },
                {
                        "_id" : "3",
                        "n" : 22
                },
                {
                        "_id" : "6",
                        "n" : 20
                },
                {
                        "_id" : "4",
                        "n" : 19
                },
                {
                        "_id" : "5",
                        "n" : 15
                },
                {
                        "_id" : "2",
                        "n" : 13
                },
                {
                        "_id" : "7",
                        "n" : 10
                },
                {
                        "_id" : "9",
                        "n" : 8
                },
                {
                        "_id" : "8",
                        "n" : 3
                },
                {
                        "_id" : "0",
                        "n" : 3
                },
                {
                        "_id" : "X",
                        "n" : 2
                },
                {
                        "_id" : "1",
                        "n" : 1
                }
        ],
        "ok" : 1
}

- db.zips.aggregate({$project:{_id:{$substr:['$city',0,1]}}},{$group:{_id:'$_id',n:{$sum:1}}},{$match:{_id:{$in:['0','1','2','3','4','5','6','7','8','9']}}}, {$group:{_id:null,count:{$sum:'$n'}}})

{ "result" : [ { "_id" : null, "count" : 114 } ], "ok" : 1 }

- db.zips.remove({city:/^[0-9]/})

- db.zips.count()

29353