Wednesday, July 29, 2015

LogStash 4 Test

  1. 2015.01.19
    1. wiselog configuration
      1. input {
          file {
            type => "access"
            path => "/home/mungeol/access.log"
          }
        }
        filter {
                if [type] == "access" {
                        grok {
                                match => { "message" => "%{IPORHOST:clientip} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" \"(?<referrer>.+)\" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:agent} %{QS:cookie}" }
                        }
                        grok {
                                match => { "cookie" => "PCID=(?<pcid>\d+);" }
                        }
                        grok {
                                match => { "cookie" => "UID=(?<uid>\d+);" }
                        }
                        grok {
                                match => { "cookie" => "n_ss=(?<n_ss>\d+.\d+);" }
                        }
                        grok {
                                match => { "cookie" => "n_cs=(?<n_cs>.+);" }
                        }
                }
                date {
                        match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
                }
        }
        output {
                elasticsearch {
                        cluster => "dev"
                        index => "wiselog_test"
                        protocol => "http"
                        workers => 4
                }
                #stdout { codec => rubydebug }
        }
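        To try this out, the config can be syntax-checked and then run with the Logstash agent. A minimal sketch, assuming Logstash 1.4.x and that the config above is saved as wiselog.conf (a hypothetical file name):
        # validate the config file first, then start the agent
        bin/logstash agent -f wiselog.conf --configtest
        bin/logstash agent -f wiselog.conf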
    2. kafka output configuration
      1. input {
          file {
            type => "apache-access"
            path => "/home/mungeol/access-test"
          }
        #  file {
        #    type => "apache-error"
        #    path => "/home/weblog/test/data/test-error"
        #  }
        }
        #filter {
        #  if [type] == "apache-access" {
        #      grok {
        #        match => { "message" => "%{COMBINEDAPACHELOG}" }
        #      }
        #    date {
        #      match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
        #    }
        #  }
        #  if [type] == "apache-error" {
        #      grok {
        #        match => { "message" => "%{APACHEERRORLOG}" }
        #        patterns_dir => ["/var/lib/logstash/etc/grok"]
        #      }
        #  }
        #  if [clientip]  {
        #    geoip {
        #      source => "clientip"
        #      target => "geoip"
        #      remove_field => ["[geoip][ip]", "[geoip][country_code3]", "[geoip][country_name]", "[geoip][continent_code]", "[geoip][region_name]",
        #                       "[geoip][city_name]", "[geoip][latitude]", "[geoip][longitude]", "[geoip][timezone]", "[geoip][real_region_name]"]
        #    }
        #  }
        #}
        output {
                kafka {
                        broker_list => "10.0.2.81:9092,10.0.2.82:9092,10.0.2.83:9092"
                        topic_id => "access-test-02"
                        topic_metadata_refresh_interval_ms => 30000
                        request_required_acks => 0
        #               producer_type => "async"
                }
        }
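        To confirm that events actually land in the topic, the console consumer that ships with Kafka 0.8 can be pointed at it (assuming ZooKeeper on localhost:2181, as in the kafka input config below):
        # print everything written to the topic so far
        bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic access-test-02 --from-beginning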
    3. kafka input configuration
      1. input {
                kafka {
                        zk_connect => "localhost:2181,10.0.2.81:2181,10.0.2.82:2181,10.0.2.83:2181"
                        topic_id => "access-test-07"
                        type => "apache-access"
                        queue_size => 200
                        fetch_message_max_bytes => 2097152
                }
        }
        filter {
          if [type] == "apache-access" {
              grok {
                match => { "message" => "%{COMBINEDAPACHELOG}" }
              }
            date {
              match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
            }
          }
        #  if [type] == "apache-error" {
        #      grok {
        #        match => { "message" => "%{APACHEERRORLOG}" }
        #        patterns_dir => ["/var/lib/logstash/etc/grok"]
        #      }
        #  }
        #  if [clientip]  {
        #    geoip {
        #      source => "clientip"
        #      target => "geoip"
        #      remove_field => ["[geoip][ip]", "[geoip][country_code3]", "[geoip][country_name]", "[geoip][continent_code]", "[geoip][region_name]",
        #                       "[geoip][city_name]", "[geoip][latitude]", "[geoip][longitude]", "[geoip][timezone]", "[geoip][real_region_name]"]
        #    }
        #  }
        }
        output {
          elasticsearch {
            cluster => "dev"
            index => "test"
            protocol => "http"
            workers => 4
          }
        #stdout { codec => rubydebug }
        }
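        For a smoke test before indexing into ElasticSearch, the elasticsearch output above can be swapped for a stdout-only output; a minimal sketch:
        output {
                stdout { codec => rubydebug }
        }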

    4. csv configuration
      1. input {
          file {
            type => "access-report"
            path => "/home/mungeol/workspace/securityTeam/sec_report"
          }
        }
        filter {
                if [type] == "access-report" {
                        csv {
                                columns => ["id","name","department","date","ip","request"]
                        }
                        date {
                                match => [ "date", "yyyy-MM-dd HH:mm:ss" ]
                        }
                }
        }
        output {
          elasticsearch {
            cluster => "dev"
            index => "sec-team"
            protocol => "http"
          }
        #stdout { codec => rubydebug }
        }
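        For reference, the csv filter splits on commas by default, so a report line is expected to look like the following (the values are made up for illustration):
        echo '1001,jdoe,security,2015-01-19 10:00:00,10.0.2.15,GET /admin' >> /home/mungeol/workspace/securityTeam/sec_report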
  2. ~ 2015.01.01
    1. Introduction
      1. A test of transforming logs with LogStash and shipping them to an ElasticSearch cluster, taking into account how the Apache access logs are currently generated
    2. Real-time shipping
      1. Uses /home/weblog/test/data/test-access
      2. Logs are generated on a virtual log server in the same format
      3. Configuration file
        1. input {
            file {
              path => "/home/weblog/test/data/test-access"
            }
          }
          filter {
            if [path] =~ "access" {
              mutate { replace => { type => "apache_access" } }
              grok {
                match => { "message" => "%{COMBINEDAPACHELOG}" }
              }
              date {
                match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
              }
            } else if [path] =~ "error" {
              mutate { replace => { type => "apache_error" } }
            } else {
              mutate { replace => { type => "random_logs" } }
            }
          }
          output {
            elasticsearch {
              host => "211.49.227.177"
              protocol => "http"
            }
          }
      4. Whenever a new log line is appended to /home/weblog/test/data/test-access, it is indexed into the ElasticSearch cluster in real time
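      5. Spot check: the output sets no index, so events go to the default daily logstash-* indices, which can be queried directly (host taken from the config above):
        1. curl 'http://211.49.227.177:9200/logstash-*/_search?q=type:apache_access&size=1&pretty'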

    3. Hourly shipping
      1. Uses the /home/weblog/test/data/test-access.082117 naming format
      2. Logs are generated on a virtual log server in the same format
      3. Configuration file
        1. input {
            file {
              path => "/home/weblog/test/data/test-access.*"
            }
          }
          filter {
            if [path] =~ "access" {
              mutate { replace => { type => "apache_access" } }
              grok {
                match => { "message" => "%{COMBINEDAPACHELOG}" }
              }
              date {
                match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
              }
            } else if [path] =~ "error" {
              mutate { replace => { type => "apache_error" } }
            } else {
              mutate { replace => { type => "random_logs" } }
            }
          }
          output {
            elasticsearch {
              host => "211.49.227.177"
              protocol => "http"
            }
          }
      4. Whenever a new log file matching 'test-access.*' is added to /home/weblog/test/data/, it is indexed into the ElasticSearch cluster
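      5. A quick way to exercise this: append a line in the combined log format to a new file matching the pattern (the sample line and file suffix are made up):
        1. echo '127.0.0.1 - - [19/Jan/2015:10:00:00 +0900] "GET / HTTP/1.1" 200 123 "-" "curl/7.29.0"' >> /home/weblog/test/data/test-access.082118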

    4. elasticsearch output configuration
      1. input {
          file {
            path => "/home/weblog/test/data/test-access.*"
          }
        }
        filter {
          if [path] =~ "access" {
            mutate { replace => { type => "apache_access" } }
            grok {
              match => { "message" => "%{COMBINEDAPACHELOG}" }
            }
            date {
              match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
            }
          } else if [path] =~ "error" {
            mutate { replace => { type => "apache_error" } }
          } else {
            mutate { replace => { type => "random_logs" } }
          }
        }
        output {
          elasticsearch {
            cluster => "elasticsearch"
            host => "211.49.227.177"
            index => "apache-%{+YYYY.MM}"
            protocol => "http"
          }
        stdout { codec => rubydebug }
        }
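        Because the index option contains %{+YYYY.MM}, one index is created per month; the resulting indices can be listed with the cat API:
        curl 'http://211.49.227.177:9200/_cat/indices/apache-*?v'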
        

    5. Extracting geo information from IP addresses
      1. Keep only the geo fields used by the bettermap and map panels, and remove the rest
        1. input {
            file {
              type => "apache-access"
              path => "/home/weblog/test/data/test-access"
            }
            file {
              type => "apache-error"
              path => "/home/weblog/test/data/test-error"
            }
          }
          filter {
            if [type] == "apache-access" {
                grok {
                  match => { "message" => "%{COMBINEDAPACHELOG}" }
                }
            }
            if [type] == "apache-error" {
                grok {
                  #match => { "message" => "%{APACHEERRORLOG}" }
                  #patterns_dir => ["/var/lib/logstash/etc/grok"]
                }
            }
            if [clientip]  {
              geoip {
                source => "clientip"
                target => "geoip"
                remove_field => ["[geoip][ip]", "[geoip][country_code3]", "[geoip][country_name]", "[geoip][continent_code]", "[geoip][region_name]",
                                 "[geoip][city_name]", "[geoip][latitude]", "[geoip][longitude]", "[geoip][timezone]", "[geoip][real_region_name]"]
              }
            }
          }
          output {
            elasticsearch {
              cluster => "elasticsearch"
              host => "211.49.227.177"
              index => "apache-%{+YYYY.MM}"
              protocol => "http"
            }
          stdout { codec => rubydebug }
          }
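          With the remove_field list above, roughly the following is left under geoip (the exact fields depend on the GeoIP database in use):
          "geoip" => {
              "country_code2" => "KR",                 # used by the map panel
              "location"      => [ 126.9784, 37.566 ]  # [lon, lat], used by bettermap
          }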
    6. MySQL slow query log parsing
      1. input {
          file {
            type => "mysql-slow"
            path => "/DBLog/dbmaster-slow.log"
          }
        }
        filter {
          if [message] =~ "# Time: " {
            drop {}
          }
          grok {
            match => {
              message => [
                "^# User@Host: %{USER:user}(?:\[[^\]]+\])?\s+@\s+%{HOST:host}?\s+\[%{IP:ip}?\]",
                "^# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float} Rows_sent: %{NUMBER:rows_sent:int} \s*Rows_examined: %{NUMBER:rows_examined:float}",
                "^SET timestamp=%{NUMBER:timestamp};",
                "%{GREEDYDATA:query}"
              ]
            }
          }
          multiline {
            pattern => "# User"
            negate => false
            what => "next"
          }
          multiline {
            pattern => "^#"
            negate => true
            what => "previous"
          }
          date {
            match => [ "timestamp", "UNIX" ]
          }
          mutate {
            remove_field => [ "timestamp" ]
          }
        }
        output {
          elasticsearch {
            cluster => "elasticsearch"
            host => "211.49.227.177"
            index => "mysql-%{+YYYY.MM}"
            protocol => "http"
          }
        #  stdout { codec => rubydebug }
        }
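        For reference, the filter targets entries of the following shape (a made-up example in the MySQL slow query log format; the "# Time:" line is removed by the drop rule above):
        # Time: 150119 10:00:01
        # User@Host: appuser[appuser] @ localhost []
        # Query_time: 2.000000  Lock_time: 0.000000 Rows_sent: 1  Rows_examined: 100000
        SET timestamp=1421629201;
        SELECT * FROM orders WHERE status = 'NEW';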
        

    7. Using elasticsearch-river / RabbitMQ
      1. input {
          file {
            type => "apache-access"
            path => "/home/mungeol/test-access*"
          }
        #  file {
        #    type => "apache-error"
        #    path => "/home/weblog/test/data/test-error"
        #  }
        }
        filter {
          if [type] == "apache-access" {
              grok {
                match => { "message" => "%{COMBINEDAPACHELOG}" }
              }
            date {
              match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
            }
          }
        #  if [type] == "apache-error" {
        #      grok {
        #        match => { "message" => "%{APACHEERRORLOG}" }
        #        patterns_dir => ["/var/lib/logstash/etc/grok"]
        #      }
        #  }
          if [clientip]  {
            geoip {
              source => "clientip"
              target => "geoip"
              remove_field => ["[geoip][ip]", "[geoip][country_code3]", "[geoip][country_name]", "[geoip][continent_code]", "[geoip][region_name]",
                               "[geoip][city_name]", "[geoip][latitude]", "[geoip][longitude]", "[geoip][timezone]", "[geoip][real_region_name]"]
            }
          }
        }
        output {
        #  elasticsearch {
        #    cluster => "dev"
        #    host => "10.0.2.83"
        #    index => "apache-test"
        #    protocol => "http"
        #  }
        elasticsearch_river {
          es_host => "10.0.2.82"
          rabbitmq_host => "10.0.2.81"
          index => "apache-test"
          user => "test"
          password => "test"
        }
        #stdout { codec => rubydebug }
        }
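        With this output, events are published to RabbitMQ and the river on the ElasticSearch side consumes them; whether messages are flowing can be checked on the RabbitMQ host:
        rabbitmqctl list_queues name messages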
    8. logstash-kafka
      1. bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic access-test
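         The created topic and its partition/replica assignment can be verified with:
         bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic access-test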
      2. producer
        1. input {
            file {
              type => "apache-access"
              path => "/home/mungeol/access-test"
            }
          }
          filter {
            if [type] == "apache-access" {
                grok {
                  #match => { "message" => "%{COMBINEDAPACHELOG}" }
                  match => { "message" => "%{COMBINEDAPACHELOG} %{QS:cookie}" }
                  match => { "cookie" => "UID=(?<mem_idx>\d+);" }
                  break_on_match => false
                }
              date {
                locale => en
                match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
              }
            }
            if [clientip]  {
              geoip {
                source => "clientip"
                target => "geoip"
                remove_field => ["[geoip][ip]", "[geoip][country_code3]", "[geoip][country_name]", "[geoip][continent_code]", "[geoip][region_name]",
                                 "[geoip][city_name]", "[geoip][latitude]", "[geoip][longitude]", "[geoip][timezone]", "[geoip][real_region_name]"]
              }
            }
          }
          output {
            kafka {
              topic_id => "access-test"
              broker_list => "10.0.2.83:9092"
            }
          #stdout { codec => rubydebug }
          }
      3. consumer
        1. input {
            kafka {
              topic_id => "access-test"
            }
          }
          output {
            elasticsearch {
              cluster => "dev"
              index => "test-%{+YYYY.MM.dd}"
              protocol => "http"
            }
          #stdout { codec => rubydebug }
          }
      4. http://x.x.x.x:8080/#/topics
