logstash 설정:
/etc/logstash/conf.d/mysql_log.conf
input {
beats {
port => 5044
}
}
filter {
if [message]=~"^#Time: " {
drop{}
}
if "slow-query-mysql" in [log_type] {
mutate {
remove_field => ["[host]"]
}
grok {
patterns_dir => [ "/etc/logstash/conf.d/patterns/" ]
match => [ "message", "User@Host: %{MYSQLSLOWLOG}" ]
remove_field => [ "message" ]
}
grok {
patterns_dir => [ "/etc/logstash/conf.d/patterns/" ]
match => [ "message", "User@Host: %{MYSQLSLOWLOGHOST}" ]
remove_field => [ "message" ]
}
grok {
patterns_dir => [ "/etc/logstash/conf.d/patterns/" ]
match => [ "message", "User@Host: %{MYSQLSLOWLOGDB}" ]
remove_field => [ "message" ]
}
grok {
patterns_dir => [ "/etc/logstash/conf.d/patterns/" ]
match => [ "message", "User@Host: %{MYSQLSLOWLOGDBHOST}" ]
remove_field => [ "message" ]
}
mutate {
add_field => {
"dbms" => "mysql"
"hostname" => "%{[agent][hostname]}"
"datetime" => "%{@timestamp}"
}
}
date {
match => [ "datetime", "ISO8601" ]
target => "datetime"
}
#data 미 존재 시 NULL String 으로 데이터 치환
ruby {
code => "if event.get('host').nil?; event.set('host', 'NULL');end"
}
ruby {
code => "if event.get('sql').nil?; event.set('sql', 'NULL');end"
}
ruby {
code => "if event.get('host_ip').nil?; event.set('host_ip', 'NULL');end"
}
ruby {
code => "if event.get('db').nil?; event.set('db', 'NULL');end"
}
# datetime date formatting
ruby {
code => "
event.set('datetime', event.get('@timestamp').time.localtime('+09:00').strftime('%Y-%m-%d %H:%M:%S'+ ' +0900'))
"
}
}
else if "errorlog-mysql" in [log_type] {
grok {
patterns_dir => [ "/etc/logstash/slowmon/conf.d/patterns/" ]
match => [ "message", "%{SCHEDULER:scheduler}" ]
add_field => { "event" => "scheduler"}
remove_field => [ "message", "scheduler" ]
}
grok {
patterns_dir => [ "/etc/logstash/slowmon/conf.d/patterns/" ]
match => [ "message", "%{ERROR:error}" ]
add_field => { "event" => "error"}
remove_field => [ "message", "error" ]
}
}
}
output {
# stdout { codec => rubydebug }
if "slow-query-mysql" in [log_type] {
elasticsearch {
hosts => "엘라스틱서치 서버 IP:9200"
index => "sql-slowquery-%{+YYYY.MM.dd}"
}
}
else if "errorLog-mysql" in [log_type] {
elasticsearch {
hosts => "엘라스틱서치 서버 IP:9200"
index => "mysql-errorlog-%{+YYYY.MM.dd}"
}
}
else if "backupLog-mysql" in [log_type] {
elasticsearch {
hosts => "엘라스틱서치 서버 IP:9200"
index => "mysql-backuplog-%{+YYYY.MM.dd}"
}
}
else if "alertLog-oracle" in [log_type] {
elasticsearch {
hosts => "엘라스틱서치 서버 IP:9200"
index => "oracle-alertlog-%{+YYYY.MM.dd}"
}
}
else if "listenerLog-oracle" in [log_type] {
elasticsearch {
hosts => "엘라스틱서치 서버 IP:9200"
index => "oracle-listenerlog-%{+YYYY.MM.dd}"
}
}
else if "backupLog-oracle" in [log_type] {
elasticsearch {
hosts => "엘라스틱서치 서버 IP:9200"
index => "oracle-backuplog-%{+YYYY.MM.dd}"
}
}
kafka {
bootstrap_servers => "kafka 서버 IP"
codec => "json_lines"
topic_id => "사내 보안 정책에 따른 id"
max_request_size => 10527865
}
}
logstash pattern:
/etc/logstash/conf.d/patterns/
# slow query log parsing pattern
MYSQLSLOWLOG %{NOTSPACE:user}%{SPACE}@%{SPACE}%{HOSTNAME:host}%{SPACE}\[\]%{SPACE}Id:%{SPACE}%{BASE10NUM:id}(\\n)?%{SPACE}# Query_time:%{SPACE}%{BASE16FLOAT:query_time}%{SPACE}Lock_time:%{SPACE}%{BASE16FLOAT:lock_time}%{SPACE}Rows_sent:%{SPACE}%{BASE10NUM:rows_sent}%{SPACE}Rows_examined:%{SPACE}%{BASE10NUM:rows_examined}%{SPACE}SET timestamp=%{NUMBER:timestamp};%{SPACE}%{GREEDYDATA:sql}
MYSQLSLOWLOGHOST %{NOTSPACE:user}%{SPACE}@%{SPACE}\[%{HOSTNAME:host}\]%{SPACE}Id:%{SPACE}%{BASE10NUM:id}(\\n)?%{SPACE}# Query_time:%{SPACE}%{BASE16FLOAT:query_time}%{SPACE}Lock_time:%{SPACE}%{BASE16FLOAT:lock_time}%{SPACE}Rows_sent:%{SPACE}%{BASE10NUM:rows_sent}%{SPACE}Rows_examined:%{SPACE}%{BASE10NUM:rows_examined}%{SPACE}(\\n)?(?m)SET timestamp=%{NUMBER:timestamp};%{SPACE}%{GREEDYDATA:sql}
MYSQLSLOWLOGDB %{NOTSPACE:user}%{SPACE}@%{SPACE}%{HOSTNAME:host}%{SPACE}\[\]%{SPACE}Id:%{SPACE}%{BASE10NUM:id}(\\n)?%{SPACE}# Query_time:%{SPACE}%{BASE16FLOAT:query_time}%{SPACE}Lock_time:%{SPACE}%{BASE16FLOAT:lock_time}%{SPACE}Rows_sent:%{SPACE}%{BASE10NUM:rows_sent}%{SPACE}Rows_examined:%{SPACE}%{BASE10NUM:rows_examined}%{SPACE}(\\n)?(?m)use%{SPACE}%{WORD:db};%{SPACE}SET timestamp=%{NUMBER:timestamp};%{SPACE}%{GREEDYDATA:sql}
MYSQLSLOWLOGDBHOST %{NOTSPACE:user}%{SPACE}@%{SPACE}\[%{HOSTNAME:host}\]%{SPACE}Id:%{SPACE}%{BASE10NUM:id}%{SPACE}# Query_time:%{SPACE}%{BASE16FLOAT:query_time}%{SPACE}Lock_time:%{SPACE}%{BASE16FLOAT:lock_time}%{SPACE}Rows_sent:%{SPACE}%{BASE10NUM:rows_sent}%{SPACE}Rows_examined:%{SPACE}%{BASE10NUM:rows_examined}%{SPACE}([u|U][s|S][e|E])%{SPACE}%{GREEDYDATA:db};%{SPACE}SET timestamp=%{NUMBER:timestamp};%{SPACE}%{GREEDYDATA:sql}
# error, backup log parsing pattern
MYSQLDATE %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
LOGLEVEL ([N|n]ote|NOTE|[A|a]lert|ALERT|[T|t]race|TRACE|[D|d]ebug|DEBUG|[N|n]otice|NOTICE|[I|i]nfo|INFO|[W|w]arn?(?:ing)?|WARN?(?:ING)?|[E|e]rr?(?:or)?|ERR?(?:OR)?|[C|c]rit?(?:ical)?|CRIT?(?:ICAL)?|[F|f]atal|FATAL|[S|s]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
USER %{WORD}%{GREEDYDATA}
PROC_NAME %{WORD}%{GREEDYDATA}
ERROR %{MYSQLDATE:logdate}%{SPACE}%{NUMBER:Log_id}%{SPACE}\[%{LOGLEVEL:Log_Level}\]%{SPACE}%{GREEDYDATA:log_content}
LOGINACCESS %{MYSQLDATE:logdate}%{SPACE}%{NUMBER:Log_id}%{SPACE}\[%{LOGLEVEL:Log_Level}\]%{SPACE}Access denied for user '%{USER:user}%{GREEDYDATA}'\@'%{HOSTNAME:hostname}'
%{GREEDYDATA:tail_log}
SCHEDULER %{MYSQLDATE:logdate}%{SPACE}%{NUMBER:Log_id}%{SPACE}\[%{LOGLEVEL:Log_Level}\]%{SPACE}Event Scheduler:%{SPACE}\[%{USER:user}\@%{HOSTNAME:hostname}\]%{GREEDYDATA}\[%{PROC_NAME:proc_name}\]%{SPACE}%{GREEDYDATA:log_centent}