In a log-analysis stack, Logstash plays the data-cleaning role. When pushing data into ES, one very important detail is how the timestamp is set: if @timestamp is left at its default (the time the event flows through the pipeline), later analysis suffers. So, by one means or another, the time recorded in the log itself should be used as the ES @timestamp index field. Below is a summary of the formats I have handled so far.
Nginx log time:
Time format
[07/Dec/2017:14:16:54 +0800]
Logstash configuration
grok {
    match => ["time", "%{HTTPDATE:logdate}"]    # take the time field, parse it as HTTPDATE, and store it in logdate
}
date {
    match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]    # parse the time
    target => "@timestamp"                            # overwrite @timestamp
    remove_field => ["time", "logdate"]               # drop both helper fields
}
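Before putting a pattern like this into a real pipeline, it is worth testing it in isolation. Here is a minimal sketch of a test pipeline (my own addition, not part of the original setup): paste a line containing an Nginx-style time on stdin and check @timestamp in the rubydebug output.

input { stdin {} }    # type/paste a line containing e.g. 07/Dec/2017:14:16:54 +0800
filter {
    grok { match => ["message", "%{HTTPDATE:logdate}"] }
    date { match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" }
}
output { stdout { codec => rubydebug } }

Run it with bin/logstash -f test.conf and confirm that @timestamp matches the pasted time rather than the wall clock.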
Custom log time format:
Time format
2017-12-07 15:26:08
Logstash configuration
date {
    match => ["time", "yyyy-MM-dd HH:mm:ss"]
    target => "@timestamp"
    remove_field => ["time"]
}
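One thing to watch with this pattern: unlike the Nginx format it carries no zone offset, so the date filter falls back to the Logstash host's timezone. If the logs are produced in a known zone, pinning it down with the filter's timezone option avoids surprises; a sketch assuming the producer runs in Asia/Shanghai:

date {
    match => ["time", "yyyy-MM-dd HH:mm:ss"]
    timezone => "Asia/Shanghai"    # assumed producer timezone; adjust to your environment
    target => "@timestamp"
    remove_field => ["time"]
}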
Parsing JSON logs and converting the time:
Log format
{"status": 3, "docID": "fypnqvn0771173", "taskID": "fypnyqi1591819", "sendPubMsgTime": "2017-12-07 16:14:10", "errCode": 0, "replaceTaskID": "", "retried": 0, "mTime": "2017-12-07 16:14:10", "operator": "lixu5@staff.sina.com.cn", "modelID": "weibo_video_hotrank", "errMsg": "发布成功:", "cTime": "2017-12-07 16:14:00"}
Logstash configuration
input {
    kafka {
        zk_connect => "10.13.86.190:2181,10.13.86.191:2181,10.13.86.192:2181,10.13.86.193:2181"
        topic_id => "jsonjson"
        group_id => "jsonjson"
        consumer_threads => 1
        reset_beginning => false
        codec => json { charset => "UTF-8" }    # decode each Kafka message as JSON
    }
}
filter {
    date {
        match => ["cTime", "yyyy-MM-dd HH:mm:ss"]    # use the creation time from the log body
        target => "@timestamp"
        remove_field => ["cTime"]                    # drop the source field once @timestamp is set
    }
}
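To try the JSON decoding and time conversion without a Kafka cluster, the same codec works on stdin; a minimal sketch under that assumption:

input { stdin { codec => json { charset => "UTF-8" } } }    # paste one JSON log line
filter {
    date { match => ["cTime", "yyyy-MM-dd HH:mm:ss"] target => "@timestamp" }
}
output { stdout { codec => rubydebug } }

The json codec turns every top-level key into an event field, so cTime is immediately available to the date filter.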
The Logstash output is as follows (screenshot omitted): every JSON key becomes an event field, and @timestamp takes the value of cTime.
Splitting an alert log:
[20 Oct 2016 11:43:07] ***['mail', 'msg'] TO lixu5***10.13.1.239(web008.nwapcms.msina.bx)TCP_Establish PROBLEM***---- 新浪网服务保障监控平台 ----报警状态:PROBLEM报警等级:【Average】出错信息:TCP_Establish主机IP:10.13.1.239主机名:web008.nwapcms.msina.bx.sinanode.com机房:北显产品线:SINA.Portal.COM.发布系统返回信息:TCP_ESTAB (10.13.1.239:tcp[ESTAB]): 1814:报警时间:2016-10-20 11:43:07***{"message": null, "code": 0, "data": 3905729118377222}
The Logstash configuration is as follows:
filter {
    ruby {
        # split the raw message on *** into four named fields
        init => "@kname = ['field1','field2','sendTitle','body']"
        code => "event.append(Hash[@kname.zip(event['message'].split('***'))])"
        remove_field => ["@version", "message", "tags", "field1"]
    }
    if [field2] {
        ruby {
            # split e.g. "['mail', 'msg'] TO lixu5" into sendKind and toName
            init => "@kname = ['sendKind','toName']"
            code => "event.append(Hash[@kname.zip(event['field2'].split(' TO '))])"
            remove_field => ["field2"]
        }
    }
    if [sendTitle] {
        # pull the host IP and hostname out of e.g. 10.13.1.239(web008.nwapcms.msina.bx)
        grok { match => ["sendTitle", "%{IP:hostIp}\(%{DATA:hostName}\)"] }
    }
    if [body] {
        # pull the IDC, product line, and alert message out of the body
        grok { match => ["body", "机房:%{DATA:idcName}产品线:%{DATA:productLine}返回信息:%{DATA:alterMsg}报警时间"] }
    }
    grok { match => ["body", "报警时间:%{TIMESTAMP_ISO8601:time}"] }    # extract the alert time
    date {
        match => ["time", "yyyy-MM-dd HH:mm:ss"]    # use the alert time as @timestamp
        target => "@timestamp"
        remove_field => ["time"]
    }
}
output {
    if "_grokparsefailure" not in [tags] {    # skip events whose grok patterns failed
        elasticsearch {
            hosts => ["10.39.40.162:9220", "10.39.40.163:9220", "10.39.40.164:9220"]
            workers => 1
            index => "alter-mop-%{+YYYY.MM}"
        }
    }
    #stdout { codec => dots }
    stdout { codec => rubydebug }
}
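Note that event['message'] and event.append belong to the pre-5.x Ruby event API; on Logstash 5.x and later the ruby filter must go through event.get and event.set instead. A sketch of the first ruby block rewritten for the newer API (assumed equivalent, not tested against this exact alert feed):

ruby {
    init => "@kname = ['field1','field2','sendTitle','body']"
    code => "
        parts = event.get('message').split('***')
        # set each named field from the corresponding *** segment
        @kname.zip(parts).each { |k, v| event.set(k, v) unless v.nil? }
    "
    remove_field => ["@version", "message", "tags", "field1"]
}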