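In production, logstash often has to handle logs in several formats, and each format needs its own parsing rules. This post walks through a multi-line example: parsing the MySQL slow query log, a case that comes up often and draws many questions online.
The MySQL slow query log looks like this: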
# User@Host: ttlsa[ttlsa] @ [10.4.10.12] Id: 69641319
# Query_time: 0.000148 Lock_time: 0.000023 Rows_sent: 0 Rows_examined: 202
SET timestamp=1456717595;
select `Id`, `Url` from `File` where `Id` in ('201319', '201300');
# Time: 160229 11:46:37
filebeat configuration
I am using filebeat 1.1.1 here. Earlier versions have no multiline option; for those, see the alternative setup at the end of this post.
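filebeat:
  prospectors:
    -
      paths:
        - /www.ttlsa.com/logs/mysql/slow.log
      document_type: mysqlslowlog
      input_type: log
      multiline:
        # Same pattern as the logstash multiline codec variant at the end of this post:
        # every line that does NOT start a new entry is appended to the previous one.
        pattern: '^# User@Host:'
        negate: true
        match: after
  registry_file: /var/lib/filebeat/registry
output:
  logstash:
    hosts: ["10.6.66.14:5046"]
shipper:
logging:
  files: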
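To make the effect of negate: true and match: after easier to picture, here is a minimal Python sketch of the grouping rule. It is not filebeat code, only an illustration under the assumption that a new event begins at each line starting with "# User@Host:". The function name group_events and the second log entry are made up for the example.

```python
import re

# A line matching this pattern starts a new event; every other line
# is appended to the event that is currently being built.
START = re.compile(r"^# User@Host:")


def group_events(lines):
    """Join raw log lines into multi-line events (illustrative only)."""
    events, current = [], []
    for line in lines:
        if START.search(line) and current:
            events.append("\n".join(current))
            current = []
        current.append(line)
    if current:
        events.append("\n".join(current))
    return events


SAMPLE = [
    "# User@Host: ttlsa[ttlsa] @ [10.4.10.12] Id: 69641319",
    "# Query_time: 0.000148 Lock_time: 0.000023 Rows_sent: 0 Rows_examined: 202",
    "SET timestamp=1456717595;",
    "select `Id`, `Url` from `File` where `Id` in ('201319', '201300');",
    "# Time: 160229 11:46:37",
    # Hypothetical second entry, invented for this illustration.
    "# User@Host: ttlsa[ttlsa] @ [10.4.10.12] Id: 69641320",
    "# Query_time: 0.000200 Lock_time: 0.000030 Rows_sent: 1 Rows_examined: 10",
    "SET timestamp=1456717597;",
    "select 1;",
]

events = group_events(SAMPLE)
for event in events:
    print(event)
    print("-----")
```

Note that the "# Time:" line is glued to the end of the first event rather than the start of the second, because it does not match the start pattern.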
logstash configuration
1. input section
# vi /etc/logstash/conf.d/01-beats-input.conf
input {
  beats {
    port => 5046
    host => "10.6.66.14"
  }
}
2. filter section
# vi /etc/logstash/conf.d/16-mysqlslowlog.log
filter {
  if [type] == "mysqlslowlog" {
    grok {
      match => { "message" => "(?m)^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IPV4:clientip})?\]\s+Id:\s+%{NUMBER:row_id:int}\n#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s+%{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:\s+%{NUMBER:rows_examined:int}\n\s*(?:use %{DATA:database};\s*\n)?SET\s+timestamp=%{NUMBER:timestamp};\n\s*(?<sql>(?<action>\w+)\b.*;)\s*(?:\n#\s+Time)?.*$" }
    }
    date {
      match => [ "timestamp", "UNIX", "YYYY-MM-dd HH:mm:ss"]
      remove_field => [ "timestamp" ]
    }
  }
}
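The crucial part is the grok regular expression. Because each event begins at a "# User@Host:" line, the "# Time:" line that belongs to the next entry ends up at the tail of the current event, which is why the pattern finishes with an optional "# Time" match followed by ".*".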
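A pattern this long is easier to debug outside of logstash. The following Python sketch is a rough translation of the grok expression, not an exact equivalent: the character classes standing in for %{USER}, %{IPV4}, %{NUMBER} and %{DATA} are simplified assumptions, and re.DOTALL plays the role of the leading (?m). The helper name parse_slowlog is hypothetical.

```python
import re

# Simplified stand-ins for the grok patterns used in the filter above.
SLOWLOG = re.compile(
    r"^#\s+User@Host:\s+(?P<user>[A-Za-z0-9._-]+)\[[^\]]+\]\s+@\s+"
    r"(?:(?P<clienthost>\S*) )?\[(?P<clientip>\d{1,3}(?:\.\d{1,3}){3})?\]\s+"
    r"Id:\s+(?P<row_id>\d+)\n"
    r"#\s+Query_time:\s+(?P<query_time>\d+(?:\.\d+)?)\s+"
    r"Lock_time:\s+(?P<lock_time>\d+(?:\.\d+)?)\s+"
    r"Rows_sent:\s+(?P<rows_sent>\d+)\s+"
    r"Rows_examined:\s+(?P<rows_examined>\d+)\n"
    r"\s*(?:use (?P<database>.*?);\s*\n)?"
    r"SET\s+timestamp=(?P<timestamp>\d+);\n"
    r"\s*(?P<sql>(?P<action>\w+)\b.*;)\s*(?:\n#\s+Time)?.*$",
    re.DOTALL,  # let "." cross newlines, like (?m) in the grok pattern
)

# The sample entry from the top of this post, joined into one event.
EVENT = "\n".join([
    "# User@Host: ttlsa[ttlsa] @ [10.4.10.12] Id: 69641319",
    "# Query_time: 0.000148 Lock_time: 0.000023 Rows_sent: 0 Rows_examined: 202",
    "SET timestamp=1456717595;",
    "select `Id`, `Url` from `File` where `Id` in ('201319', '201300');",
    "# Time: 160229 11:46:37",
])


def parse_slowlog(event):
    """Return the captured fields as a dict, or None if the event does not match."""
    m = SLOWLOG.match(event)
    if m is None:
        return None
    fields = m.groupdict()
    # Mirror the :int and :float casts of the grok pattern.
    for name in ("row_id", "rows_sent", "rows_examined"):
        fields[name] = int(fields[name])
    for name in ("query_time", "lock_time"):
        fields[name] = float(fields[name])
    return fields


fields = parse_slowlog(EVENT)
print(fields)
```

For the sample entry, clienthost and database come back as None because the log line has no host name and no "use" statement; both groups are optional in the original pattern as well.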
3. output section
# vi /etc/logstash/conf.d/30-beats-output.conf
output {
  if "_grokparsefailure" in [tags] {
    file { path => "/var/log/logstash/grokparsefailure-%{[type]}-%{+YYYY.MM.dd}.log" }
  }
  if [@metadata][type] in [ "mysqlslowlog" ] {
    elasticsearch {
      hosts => ["10.6.66.14:9200"]
      sniffing => true
      manage_template => false
      template_overwrite => true
      index => "%{[@metadata][beat]}-%{[type]}-%{+YYYY.MM.dd}"
      document_type => "%{[@metadata][type]}"
    }
  }
}
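As a rough illustration of which index an event lands in, the sketch below renders the index template in Python. It assumes that [@metadata][beat] is "filebeat", that the date filter has set @timestamp from the SET timestamp value, and that the %{+YYYY.MM.dd} part is formatted in UTC. The function name index_name is hypothetical.

```python
from datetime import datetime, timezone


def index_name(beat, doc_type, unix_ts):
    """Render "<beat>-<type>-YYYY.MM.dd" from an epoch timestamp, in UTC."""
    ts = datetime.fromtimestamp(unix_ts, tz=timezone.utc)
    return f"{beat}-{doc_type}-{ts:%Y.%m.%d}"


# 1456717595 is the SET timestamp value from the sample entry.
name = index_name("filebeat", "mysqlslowlog", 1456717595)
print(name)
```

Since the date is taken in UTC, an entry logged shortly after local midnight on a server ahead of UTC would go into the previous day's index.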
If you are running a filebeat version older than 1.1.1, configure it as follows:
1. filebeat configuration
filebeat:
  prospectors:
    -
      paths:
        - /www.ttlsa.com/logs/mysql/slow.log
      document_type: mysqlslowlog
      input_type: log
  registry_file: /var/lib/filebeat/registry
output:
  logstash:
    hosts: ["10.6.66.14:5046"]
shipper:
logging:
  files:
2. logstash input section
input {
  beats {
    port => 5046
    host => "10.6.66.14"
    codec => multiline {
      pattern => "^# User@Host:"
      negate => true
      what => previous
    }
  }
}
The rest of the configuration stays the same.