本站文章(技术文章和tank手记)均为社长矢量比特工作.实践.学习中的心得原创,请勿转载!

logstash中timestamp的转换及json日志的解析

日志分析 矢量比特

logstash在整个日志分析方案中起到了数据清洗的作用,在向es推数据时一个很重的细节是timestamp的设定,如果按照流时间走默认值,是不利于后期的分析的,所以需要通过各种方式将日志

       logstash在整个日志分析方案中起到了数据清洗的作用,在向es推数据时一个很重的细节是timestamp的设定,如果按照流时间走默认值,是不利于后期的分析的,所以需要通过各种方式将日志里的时间用作es的timestamp索引,总结一下目前遇到的一些格式处理。

nginx的日志时间:

时间格式

[07/Dec/2017:14:16:54 +0800]

logstash的配置写法

grok {
  match => ["time","%{HTTPDATE:logdate}"]             #取出time字段并按照HTTPDATE匹配到时间格式化后赋值给logdate
    }
date {
  match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]      #匹配时间
  target => "@timestamp"                              #替换timestamp
  remove_field => ["time","logdate"]                  #删除这两个字段
}

自定义日志时间格式:

时间格式

2017-12-07 15:26:08

logstash的配置写法

date {
  match => ["time", "yyyy-MM-dd HH:mm:ss"]
  target => "@timestamp"
  remove_field => ["time"]
   }

json日志的解析与时间转换:

日志格式

{"status": 3, "docID": "fypnqvn0771173", "taskID": "fypnyqi1591819", "sendPubMsgTime": "2017-12-07 16:14:10", "errCode": 0, "replaceTaskID": "", "retried": 0, "mTime": "2017-12-07 16:14:10", "operator": "lixu5@staff.sina.com.cn", "modelID": "weibo_video_hotrank", "errMsg": "发布成功:", "cTime": "2017-12-07 16:14:00"}

logstash的配置写法

input {
  kafka {
    zk_connect => "10.13.86.190:2181,10.13.86.191:2181,10.13.86.192:2181,10.13.86.193:2181"
    topic_id => "jsonjson"
    group_id => "jsonjson"
    consumer_threads => 1
    reset_beginning => false
    codec => json {
       charset => "UTF-8"
      }
  }
}
filter {
date {
        match => ["cTime", "yyyy-MM-dd HH:mm:ss"]
        target => "@timestamp"
        remove_field => ["time"]
        }
 }

logstash输出如下

图片.png

一个告警的日志分割:

[20 Oct 2016 11:43:07] ***['mail', 'msg'] TO lixu5***10.13.1.239(web008.nwapcms.msina.bx)TCP_Establish PROBLEM***---- 新浪网服务保障监控平台 ----报警状态:PROBLEM报警等
级:【Average】出错信息:TCP_Establish主机IP:10.13.1.239主机名:web008.nwapcms.msina.bx.sinanode.com机房:北显产品线:SINA.Portal.COM.发布系统返回信息:TCP_ESTAB (10.13.1.23
9:tcp[ESTAB]): 1814:报警时间:2016-10-20 11:43:07***{"message": null, "code": 0, "data": 3905729118377222}

logstash配置如下:

filter {
  ruby {
        init => "@kname = ['field1','field2','sendTitle','body']"
        code => "event.append(Hash[@kname.zip(event['message'].split('***'))]) "
        remove_field => ["@version","message","tags","field1"]
  }
   
     if [field2] {
        ruby {
                init => "@kname = ['sendKind','toName']"
                code => "event.append(Hash[@kname.zip(event['field2'].split(' TO '))]) "
                remove_field => ["field2"]
                }
        }
     if [sendTitle] {
        grok {  
                match => ["sendTitle","%{IP:hostIp}\(%{DATA:hostName}\)"]
                
                }  
        }
     if [body] {
        grok {  
                match => ["body","机房:%{DATA:idcName}产品线:%{DATA:productLine}返回信息:%{DATA:alterMsg}报警时间"]
                
                }  
        }  
      grok {  
                match => ["body","报警时间:%{TIMESTAMP_ISO8601:time}"]
                }            
      date {
        match => ["time", "yyyy-MM-dd HH:mm:ss"]
        target => "@timestamp"
        remove_field => ["time"]
         }
}
output {
      
      if "_grokparsefailure" not in [tags] { 
          elasticsearch {
                hosts => ["10.39.40.162:9220","10.39.40.163:9220","10.39.40.164:9220"]
workers => 1
                index => "alter-mop-%{+YYYY.MM}"
        }
       }
          #stdout { codec => dots }
          stdout { codec => rubydebug }
}


运维网咖社”原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。否则将追究法律责任。http://www.net-add.com


©本站文章(技术文章和tank手记)均为社长"矢量比特"工作.实践.学习中的心得原创或手记,请勿转载!

喜欢 (4) or 分享 (0)
欢迎扫描关注微信公众号【运维网咖社
社长"矢量比特",曾就职中软、新浪,现任职小米,致力于DevOps运维体系的探索和运维技术的研究实践.