Thursday, June 15, 2017

Kafka 2 Kafka Using Flume

  • Issue
    • Symptom
      • With a memory channel and a Kafka source and Kafka sink configured together, data consumed from the source topic is produced right back into the same source topic, creating a loop.
    • Cause
      • When the Kafka source is used, the topic name is included in each Flume event's headers.
        • e.g. Event: { headers:{topic=source.topic, timestamp=1491273904475}
      • The topic in the event header overrides the topic configured on the Kafka sink.
        • From the documentation:
          • Note Kafka Sink uses the topic and key properties from the FlumeEvent headers to send events to Kafka. If topic exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink.
        • From the logs:
          • Using the static topic: sink.topic this may be over-ridden by event headers
      • As a result, data is produced not to the topic configured on the Kafka sink (sink.topic) but to the topic in the event header (source.topic).
    • Resolution
      • Add a static interceptor to the Kafka source with preserveExisting = false, key = topic, and the desired sink topic as the value, so the topic header is overwritten before the event reaches the sink (see the sample configuration below).
    • Misc
  • Sample
    • ## agent(s): agent01
      ## names of source(s), channel(s) and sink(s)
      agent01.sources = source01
      agent01.channels = channel01 channel02
      agent01.sinks = sink01 sink02
      ## channel01
      agent01.channels.channel01.type = memory
      agent01.channels.channel01.capacity = 100000
      agent01.channels.channel01.transactionCapacity = 10000
      agent01.channels.channel02.type = memory
      agent01.channels.channel02.capacity = 100000
      agent01.channels.channel02.transactionCapacity = 10000
      ## source01
      agent01.sources.source01.channels = channel01 channel02
      agent01.sources.source01.type = org.apache.flume.source.kafka.KafkaSource
      agent01.sources.source01.zookeeperConnect = 10.107.95.193:2181
      agent01.sources.source01.topic = netmarbles.cleansing.log
      agent01.sources.source01.groupId = flume
      #agent01.sources.source01.interceptors = interceptor01 interceptor02
      #agent01.sources.source01.interceptors.interceptor01.type = host
      #agent01.sources.source01.interceptors.interceptor02.type = timestamp
      agent01.sources.source01.interceptors = interceptor01
      agent01.sources.source01.interceptors.interceptor01.type = static
      agent01.sources.source01.interceptors.interceptor01.preserveExisting = false
      agent01.sources.source01.interceptors.interceptor01.key = topic
      agent01.sources.source01.interceptors.interceptor01.value = flume.dev
      ## sink01
      #agent01.sinks.sink01.channel = channel01
      #agent01.sinks.sink01.type = hdfs
      #agent01.sinks.sink01.hdfs.path = /netmarbles.log/mobigen/%y%m%d
      #agent01.sinks.sink01.hdfs.filePrefix = %{host}
      #agent01.sinks.sink01.hdfs.inUsePrefix = .
      #agent01.sinks.sink01.hdfs.rollInterval = 300
      #agent01.sinks.sink01.hdfs.rollSize = 0
      #agent01.sinks.sink01.hdfs.rollCount = 0
      #agent01.sinks.sink01.hdfs.idleTimeout = 60
      #agent01.sinks.sink01.hdfs.fileType = CompressedStream
      #agent01.sinks.sink01.hdfs.codeC = gzip
      ## test
      agent01.sinks.sink01.channel = channel01
      agent01.sinks.sink01.type = logger
      #agent01.sinks.sink01.channel = channel01
      #agent01.sinks.sink01.type = org.apache.flume.sink.kafka.KafkaSink
      #agent01.sinks.sink01.topic = flume.dev
      #agent01.sinks.sink01.brokerList = 10.107.95.193:9092
      #agent01.sinks.sink02.channel = channel02
      #agent01.sinks.sink02.type = logger
      agent01.sinks.sink02.channel = channel02
      agent01.sinks.sink02.type = org.apache.flume.sink.kafka.KafkaSink
      agent01.sinks.sink02.topic = flume.dev
      agent01.sinks.sink02.brokerList = 10.107.95.193:9092
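The behavior behind the loop, and why the static interceptor fixes it, can be sketched in a few lines. This is a minimal illustration of the documented sink behavior, not Flume's actual source code; the function names and header dict are made up for the sketch:

```python
SINK_TOPIC = "sink.topic"       # topic configured on the Kafka sink
SOURCE_TOPIC = "source.topic"   # topic the Kafka source consumed from

def sink_topic_for(event_headers, configured_topic):
    """Per the Flume docs: a 'topic' header, if present,
    overrides the topic configured on the sink."""
    return event_headers.get("topic", configured_topic)

def static_interceptor(event_headers, key, value, preserve_existing=False):
    """Static interceptor: sets a fixed header; with preserveExisting=false
    it overwrites any header the Kafka source already attached."""
    if preserve_existing and key in event_headers:
        return event_headers
    return {**event_headers, key: value}

# Without the interceptor, the header wins and the event loops back.
headers = {"topic": SOURCE_TOPIC, "timestamp": "1491273904475"}
assert sink_topic_for(headers, SINK_TOPIC) == SOURCE_TOPIC

# With the interceptor rewriting the header, the sink's topic wins.
fixed = static_interceptor(headers, "topic", SINK_TOPIC)
assert sink_topic_for(fixed, SINK_TOPIC) == SINK_TOPIC
```

This mirrors the sample above: interceptor01 sets key = topic, value = flume.dev with preserveExisting = false, so sink02 produces to flume.dev instead of back to the source topic.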
