오픈소스/Logstash

Logstash MySQL Log Parsing & Oracle Log Config

민둥곰 2021. 5. 16. 19:00

logstash 설정:

/etc/logstash/conf.d/mysql_log.conf

input {

  beats {

    port => 5044

  }

}

filter {

    if [message]=~"^#Time: " {

      drop{}

    }

    if "slow-query-mysql" in [log_type] {

       mutate {

         remove_field => ["[host]"]

       }

       grok {

            patterns_dir => [ "/etc/logstash/conf.d/patterns/" ]

            match => [ "message", "User@Host: %{MYSQLSLOWLOG}" ]

            remove_field => [ "message" ]

        }

       grok {

            patterns_dir => [ "/etc/logstash/conf.d/patterns/" ]

            match => [ "message", "User@Host: %{MYSQLSLOWLOGHOST}" ]

            remove_field => [ "message" ]

        }

       grok {

            patterns_dir => [ "/etc/logstash/conf.d/patterns/" ]

            match => [ "message", "User@Host: %{MYSQLSLOWLOGDB}" ]

            remove_field => [ "message" ]

        }

       grok {

            patterns_dir => [ "/etc/logstash/conf.d/patterns/" ]

            match => [ "message", "User@Host: %{MYSQLSLOWLOGDBHOST}" ]

            remove_field => [ "message" ]

        }

       mutate {

         add_field => {

           "dbms" => "mysql"

           "hostname" => "%{[agent][hostname]}"

           "datetime" => "%{@timestamp}"

         }

       }

       date {

            match => [ "datetime", "ISO8601" ]

            target => "datetime"

        }

       #data 미 존재 시 NULL String 으로 데이터 치환

       ruby {

            code => "if event.get('host').nil?; event.set('host', 'NULL');end"

       }

       ruby {

            code => "if event.get('sql').nil?; event.set('sql', 'NULL');end"

       }

       ruby {

            code => "if event.get('host_ip').nil?; event.set('host_ip', 'NULL');end"

       }

       ruby {

            code => "if event.get('db').nil?; event.set('db', 'NULL');end"

       }

       # datetime date formatting

       ruby {

            code => "

              event.set('datetime', event.get('@timestamp').time.localtime('+09:00').strftime('%Y-%m-%d %H:%M:%S'+ ' +0900'))

            "

       }

    }

    else if "errorlog-mysql" in [log_type] {

        grok {

            patterns_dir => [ "/etc/logstash/slowmon/conf.d/patterns/" ]

            match => [ "message", "%{SCHEDULER:scheduler}" ]

            add_field => { "event" => "scheduler"}

            remove_field => [ "message", "scheduler" ]

        }

        grok {

            patterns_dir => [ "/etc/logstash/slowmon/conf.d/patterns/" ]

            match => [ "message", "%{ERROR:error}" ]

            add_field => { "event" => "error"}

            remove_field => [ "message", "error" ]

        }

    }

}

output {

#  stdout {  codec => rubydebug  }

  if "slow-query-mysql" in [log_type] {

    elasticsearch {

      hosts => "엘라스틱서치 서버 IP:9200"

      index => "sql-slowquery-%{+YYYY.MM.dd}"

    }

  }

  else if "errorLog-mysql" in [log_type] {

    elasticsearch {

      hosts => "엘라스틱서치 서버 IP:9200"

      index => "mysql-errorlog-%{+YYYY.MM.dd}"

    }

  }

  else if "backupLog-mysql" in [log_type] {

    elasticsearch {

      hosts => "엘라스틱서치 서버 IP:9200"

      index => "mysql-backuplog-%{+YYYY.MM.dd}"

    }

  }

  else if "alertLog-oracle" in [log_type] {

    elasticsearch {

      hosts => "엘라스틱서치 서버 IP:9200"

      index => "oracle-alertlog-%{+YYYY.MM.dd}"

    }

  }

  else if "listenerLog-oracle" in [log_type] {

    elasticsearch {

      hosts => "엘라스틱서치 서버 IP:9200"

      index => "oracle-listenerlog-%{+YYYY.MM.dd}"

    }

  }

  else if "backupLog-oracle" in [log_type] {

    elasticsearch {

      hosts => "엘라스틱서치 서버 IP:9200"

      index => "oracle-backuplog-%{+YYYY.MM.dd}"

    }

  }

  kafka {

    bootstrap_servers => "kafka 서버 IP"

    codec => "json_lines"

    topic_id => "사내 보안 정책에 따른 id"

    max_request_size => 10527865

  }

}

logstash pattern:

/etc/logstash/conf.d/patterns/

# slow query log parsing pattern

MYSQLSLOWLOG %{NOTSPACE:user}%{SPACE}@%{SPACE}%{HOSTNAME:host}%{SPACE}\[\]%{SPACE}Id:%{SPACE}%{BASE10NUM:id}(\\n)?%{SPACE}# Query_time:%{SPACE}%{BASE16FLOAT:query_time}%{SPACE}Lock_time:%{SPACE}%{BASE16FLOAT:lock_time}%{SPACE}Rows_sent:%{SPACE}%{BASE10NUM:rows_sent}%{SPACE}Rows_examined:%{SPACE}%{BASE10NUM:rows_examined}%{SPACE}SET timestamp=%{NUMBER:timestamp};%{SPACE}%{GREEDYDATA:sql}

MYSQLSLOWLOGHOST %{NOTSPACE:user}%{SPACE}@%{SPACE}\[%{HOSTNAME:host}\]%{SPACE}Id:%{SPACE}%{BASE10NUM:id}(\\n)?%{SPACE}# Query_time:%{SPACE}%{BASE16FLOAT:query_time}%{SPACE}Lock_time:%{SPACE}%{BASE16FLOAT:lock_time}%{SPACE}Rows_sent:%{SPACE}%{BASE10NUM:rows_sent}%{SPACE}Rows_examined:%{SPACE}%{BASE10NUM:rows_examined}%{SPACE}(\\n)?(?m)SET timestamp=%{NUMBER:timestamp};%{SPACE}%{GREEDYDATA:sql}

MYSQLSLOWLOGDB %{NOTSPACE:user}%{SPACE}@%{SPACE}%{HOSTNAME:host}%{SPACE}\[\]%{SPACE}Id:%{SPACE}%{BASE10NUM:id}(\\n)?%{SPACE}# Query_time:%{SPACE}%{BASE16FLOAT:query_time}%{SPACE}Lock_time:%{SPACE}%{BASE16FLOAT:lock_time}%{SPACE}Rows_sent:%{SPACE}%{BASE10NUM:rows_sent}%{SPACE}Rows_examined:%{SPACE}%{BASE10NUM:rows_examined}%{SPACE}(\\n)?(?m)use%{SPACE}%{WORD:db};%{SPACE}SET timestamp=%{NUMBER:timestamp};%{SPACE}%{GREEDYDATA:sql}

MYSQLSLOWLOGDBHOST %{NOTSPACE:user}%{SPACE}@%{SPACE}\[%{HOSTNAME:host}\]%{SPACE}Id:%{SPACE}%{BASE10NUM:id}%{SPACE}# Query_time:%{SPACE}%{BASE16FLOAT:query_time}%{SPACE}Lock_time:%{SPACE}%{BASE16FLOAT:lock_time}%{SPACE}Rows_sent:%{SPACE}%{BASE10NUM:rows_sent}%{SPACE}Rows_examined:%{SPACE}%{BASE10NUM:rows_examined}%{SPACE}([u|U][s|S][e|E])%{SPACE}%{GREEDYDATA:db};%{SPACE}SET timestamp=%{NUMBER:timestamp};%{SPACE}%{GREEDYDATA:sql}

# error, backup log parsing pattern

MYSQLDATE %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?

LOGLEVEL ([N|n]ote|NOTE|[A|a]lert|ALERT|[T|t]race|TRACE|[D|d]ebug|DEBUG|[N|n]otice|NOTICE|[I|i]nfo|INFO|[W|w]arn?(?:ing)?|WARN?(?:ING)?|[E|e]rr?(?:or)?|ERR?(?:OR)?|[C|c]rit?(?:ical)?|CRIT?(?:ICAL)?|[F|f]atal|FATAL|[S|s]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)

USER %{WORD}%{GREEDYDATA}

PROC_NAME %{WORD}%{GREEDYDATA}

ERROR %{MYSQLDATE:logdate}%{SPACE}%{NUMBER:Log_id}%{SPACE}\[%{LOGLEVEL:Log_Level}\]%{SPACE}%{GREEDYDATA:log_content}

LOGINACCESS %{MYSQLDATE:logdate}%{SPACE}%{NUMBER:Log_id}%{SPACE}\[%{LOGLEVEL:Log_Level}\]%{SPACE}Access denied for user '%{USER:user}%{GREEDYDATA}'\@'%{HOSTNAME:hostname}'

%{GREEDYDATA:tail_log}

SCHEDULER %{MYSQLDATE:logdate}%{SPACE}%{NUMBER:Log_id}%{SPACE}\[%{LOGLEVEL:Log_Level}\]%{SPACE}Event Scheduler:%{SPACE}\[%{USER:user}\@%{HOSTNAME:hostname}\]%{GREEDYDATA}\[%{PROC_NAME:proc_name}\]%{SPACE}%{GREEDYDATA:log_centent}