Logstash serves as the data-cleansing stage in the overall log-analysis setup. One important detail when pushing data into Elasticsearch is how the timestamp gets set: if @timestamp defaults to the time the event flows through the pipeline, later analysis suffers. The time recorded inside the log itself should therefore be extracted, by whatever means the format requires, and used as the Elasticsearch @timestamp index value. Below is a summary of the formats encountered so far.
nginx log time:
Time format:
[07/Dec/2017:14:16:54 +0800]
Logstash configuration:
grok {
    match => ["time", "%{HTTPDATE:logdate}"]   # take the time field, match it against HTTPDATE, and store the result in logdate
}
date {
    match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]   # parse the time
    target => "@timestamp"                           # overwrite @timestamp
    remove_field => ["time", "logdate"]              # drop both intermediate fields
}
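For a quick sanity check of the two filters, a minimal throwaway pipeline can be run with logstash -f against stdin. This is only a sketch for verification; on stdin the raw line arrives in the default message field rather than time:

input { stdin {} }
filter {
    grok {
        match => ["message", "%{HTTPDATE:logdate}"]   # HTTPDATE matches 07/Dec/2017:14:16:54 +0800
    }
    date {
        match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]
        target => "@timestamp"
    }
}
output { stdout { codec => rubydebug } }

Pasting 07/Dec/2017:14:16:54 +0800 should print an event whose @timestamp is 2017-12-07T06:16:54.000Z, i.e. the log's own time converted to UTC.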
Custom log time format:
Time format:
2017-12-07 15:26:08
Logstash configuration:
date {
    match => ["time", "yyyy-MM-dd HH:mm:ss"]
    target => "@timestamp"
    remove_field => ["time"]
}
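Note that yyyy-MM-dd HH:mm:ss carries no timezone information, so the date filter interprets it in the Logstash host's default zone. When the log producer and the Logstash host may disagree, the zone can be pinned with the filter's timezone option; Asia/Shanghai below is only an example value:

date {
    match => ["time", "yyyy-MM-dd HH:mm:ss"]
    timezone => "Asia/Shanghai"   # treat the parsed time as UTC+8 regardless of host settings
    target => "@timestamp"
    remove_field => ["time"]
}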
JSON log parsing and time conversion:
Log format:
{"status": 3, "docID": "fypnqvn0771173", "taskID": "fypnyqi1591819", "sendPubMsgTime": "2017-12-07 16:14:10", "errCode": 0, "replaceTaskID": "", "retried": 0, "mTime": "2017-12-07 16:14:10", "operator": "lixu5@staff.sina.com.cn", "modelID": "weibo_video_hotrank", "errMsg": "发布成功:", "cTime": "2017-12-07 16:14:00"}
Logstash configuration:
input {
    kafka {
        zk_connect => "10.13.86.190:2181,10.13.86.191:2181,10.13.86.192:2181,10.13.86.193:2181"
        topic_id => "jsonjson"
        group_id => "jsonjson"
        consumer_threads => 1
        reset_beginning => false
        codec => json {                # the json codec parses each Kafka message straight into event fields
            charset => "UTF-8"
        }
    }
}
filter {
    date {
        match => ["cTime", "yyyy-MM-dd HH:mm:ss"]   # take the event time from the log's own cTime field
        target => "@timestamp"
        remove_field => ["cTime"]                   # drop the source field once @timestamp is set
    }
}

The Logstash output then looks like the following.

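A sketch of the resulting rubydebug event, assuming the Logstash host runs in UTC+8 (so the cTime value 2017-12-07 16:14:00 becomes 2017-12-07T08:14:00.000Z):

{
        "status" => 3,
         "docID" => "fypnqvn0771173",
        "taskID" => "fypnyqi1591819",
       "modelID" => "weibo_video_hotrank",
    "@timestamp" => 2017-12-07T08:14:00.000Z,
           ...
}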
Splitting an alert log:
[20 Oct 2016 11:43:07] ***['mail', 'msg'] TO lixu5***10.13.1.239(web008.nwapcms.msina.bx)TCP_Establish PROBLEM***---- 新浪网服务保障监控平台 ----报警状态:PROBLEM报警等级:【Average】出错信息:TCP_Establish主机IP:10.13.1.239主机名:web008.nwapcms.msina.bx.sinanode.com机房:北显产品线:SINA.Portal.COM.发布系统返回信息:TCP_ESTAB (10.13.1.239:tcp[ESTAB]): 1814:报警时间:2016-10-20 11:43:07***{"message": null, "code": 0, "data": 3905729118377222}
The Logstash configuration is as follows:
filter {
    ruby {
        init => "@kname = ['field1','field2','sendTitle','body']"
        code => "event.append(Hash[@kname.zip(event['message'].split('***'))])"   # split the raw line on *** and assign the pieces to named fields
        remove_field => ["@version","message","tags","field1"]
    }
    if [field2] {
        ruby {
            init => "@kname = ['sendKind','toName']"
            code => "event.append(Hash[@kname.zip(event['field2'].split(' TO '))])"   # "['mail', 'msg'] TO lixu5" -> sendKind / toName
            remove_field => ["field2"]
        }
    }
    if [sendTitle] {
        grok {
            match => ["sendTitle","%{IP:hostIp}\(%{DATA:hostName}\)"]   # pull the host IP and hostname out of the title
        }
    }
    if [body] {
        grok {
            match => ["body","机房:%{DATA:idcName}产品线:%{DATA:productLine}返回信息:%{DATA:alterMsg}报警时间"]   # IDC, product line and alert message sit between fixed Chinese labels
        }
    }
    grok {
        match => ["body","报警时间:%{TIMESTAMP_ISO8601:time}"]   # the alert time after the 报警时间: label
    }
    date {
        match => ["time", "yyyy-MM-dd HH:mm:ss"]
        target => "@timestamp"
        remove_field => ["time"]
    }
}
output {
    if "_grokparsefailure" not in [tags] {
        elasticsearch {
            hosts => ["10.39.40.162:9220","10.39.40.163:9220","10.39.40.164:9220"]
            workers => 1
            index => "alter-mop-%{+YYYY.MM}"   # one index per month, named from the event's @timestamp
        }
    }
    #stdout { codec => dots }
    stdout { codec => rubydebug }
}
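The ruby filters above use the Logstash 2.x event API (event['field']). Newer Logstash versions (5.x and later) go through event.get / event.set instead; a minimal sketch of the first split rewritten in the newer style, not a drop-in replacement:

ruby {
    code => "
        kname = ['field1','field2','sendTitle','body']
        parts = event.get('message').to_s.split('***')
        kname.zip(parts).each { |k, v| event.set(k, v) unless v.nil? }   # same ***-split, new-style accessors
    "
}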