本站文章(技术文章和tank手记)均为社长矢量比特工作.实践.学习中的心得原创,请勿转载!

轻量级流式日志计算分析plog+(zabbix+grafana)

监控告警 矢量比特

plog是一个用python写的流式计算分析框架,适用于轻量级流式数据的分析场景,大数据场景下大家自然想到使用spark等方案。 拿当前的业务场景看,需要对机器上nginx的流日志进行状态码、

      plog是一个用python写的流式计算分析框架,适用于轻量级流式数据的分析场景,大数据场景下大家自然想到使用spark等方案。

      拿当前的业务场景看,需要对机器上nginx的流日志进行状态码、响应时间、QPS的实时分析,通过zabbix展现在grafana里,QPS在1000以内。传统方法是用shell脚本来计算各种数据,然后通过主动或被动模式传到zabbix里,此种方法有很大局限性,一是grep或awk过滤日志时,很难控制好过滤的数量,过滤的多了严重影响性能,可能上一个数据都没计算出来,这一次的计算又得开始了,二是过滤的少了可能丢日志,三是不好控制计算推送周期,总结一句话就是不科学。对于这种数据量不大的流式数据,plog恰巧能解决所有问题,采用流式分析,对数据进行实时计算,通过编写zabbix的sink插件,将计算结果用zabbix_sender推到zabbix。

      默认的channel模块支持正则和grok正则的方式匹配,由于日志的复杂性,重新编写了用split分割符的插件进行处理,目前对2xx、3xx、4xx、5xx、qps、request_time、404、499、500、502、504进行了流式监控,详细介绍一下配置部署二开过程。

传统shell处理方式详见之前博客:zabbix_sender主动上传k/v监控nginx日志状态码

一、安装zabbix_sender,不介绍了

二、安装plog模块框架

    详见我浪同学的github:https://github.com/SinaMSRE/Plog.git

三、配置编写plog的模块

1、目录结构简单介绍

plog.init  start|stop|restart
plog.conf  plog的配置文件
./plog   
     channel    流过滤使用的模块
     sink      流处理模块
     source     流的数据源

2、配置文件plog.conf

[source]
source_module=file_source
source_file=/data1/logs/nginx/access/default.log    #日志文件
source_interval=5
[channel]
channel_module=split_channel   #新建channel
channel_dict_key=status,request_time    #定义流数据需要的数据字典的key
[sink]
interval=60    #流计算的时间间隔
sink_module=zabbix_sink    #流计算使用的模块
sink_service=cms_front
sink_zabbix_monitor_keys=2xx,3xx,4xx,5xx,qps,request_time,404,499,500,502,504  #zabbix使用的字典键值
sink_zabbix_send_file=/tmp/zabbix_send_info     
sink_zabbix_sender=/usr/bin/zabbix_sender
sink_zabbix_conf=/etc/zabbix/zabbix_agentd.conf
[log_config]
logging_format=%(asctime)s %(filename)s [funcname:%(funcName)s] [line:%(lineno)d] %(levelname)s %(message)s
logging_level=200
logging_filename=/tmp/plog.log

3、channel里split_channel模块的创建编写

日志格式如下

[17/May/2017:15:59:59 +0800]`-`"GET /e/2017-05-17/zx-ifyfeius8035129.d.html?HTTPS=1&wb7=1 HTTP/1.1"`"Mozilla/5.0 (Linux; Android 6.0.1; MI 5 Build/MXB48T; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/51.0.2704.81 Mobile Safari/537.36 Weibo (Xiaomi-MI 5__weibo__7.4.0__android__android6.0.1)"`302`[183.207.216.230]`-`"-"`0.006`5`-`183.207.216.230`zx.sina.cn`Apache=1734318905510.0083.1460969978688; SINAGLOBAL=1734318905510.0083.1460969978688; ustat=__117.136.46.233_1470561558_0.72156500; genTime=1470561558; wm=3049_0016; ULV=1492129669225:1:1:1:1734318905510.0083.1460969978688:; vt=4; historyRecord={\x22href\x22:\x22https://zx.sina.cn/e/2017-05-17/zx-ifyfeius8042731.d.html?HTTPS=1&wb7=1&wm=3049_0015\x22,\x22refer\x22:\x22\x22}; sina_ucode=YIpQpMYXYI; SUB=_2A250DEfSDeRhGeNG4lQY8yvOzjSIHXVXXnOarDV9PUJbitANLVr_kWt64BftZM-dOtVxSfecCOs95A8jTA..; SUBP=0033WrSXqPxfM725Ws9jqgMF55529P9D9WWkK3TP5Fm3eOQho4hT_RMa5NHD95Qf1h.c1Kefeo-RWs4DqcjVi--fiKysi-8si--fiKysi-8si--fiKysi-8si--Xi-iFi-2fSKn4So.0SKzf15tt

在./plog/plog/channel目录下创建split_channel.py文件,并编写代码如下:

from plog.channel.base import channel_base
import os
import time
class channel(channel_base):
    def __init__(self,channel_dict,source_iter,dict_queue):
        self.channel_dict_key   = channel_dict["channel_dict_key"].split(",")
        self.source_iter        = source_iter
        self.dict_queue         = dict_queue
    def parse_line(self):
        for line in self.source_iter:
            try:
                li1 = line.split('`')   #定义分隔符
                li2 = []                #初始化列表
                if li1[4] and li1[8]:  
                  li2.append(li1[4])   #取出日志分割后使用的字段,对应channel_dict_key的status
                  li2.append(li1[8])   #取出日志分割后使用的字段,对应channel_dict_key的request_time
                  self.dict_queue.put(dict(zip(self.channel_dict_key,li2)))   #对应打成字典
                else:
                  pass
            except Exception,e:
                print str(e)

4、sink里zabbix_sink流处理的编写

模块是已有的,进行重新编码如下:

from plog.sink.base import sink_base
import os
import time
import commands
class sink(sink_base):
    def __init__(self, sink_dict):
        self.service = sink_dict['sink_service']
        self.send_dict = dict.fromkeys(sink_dict['sink_zabbix_monitor_keys'].split(','),0)
        
        self.zabbix_send_file = "_".join((sink_dict['sink_zabbix_send_file'],self.service))
        self.zabbix_sender = sink_dict['sink_zabbix_sender']
        self.zabbix_conf = sink_dict['sink_zabbix_conf']
        self.dealed_total       = 0
        self.count              = 0
        self.req_time_count     = 0
    
    def calculate_item(self, item):   #计算函数
        self.count += 1
        #host = item['host']
        try:
            status = item['status']
            request_time = float(item['request_time'])
            if status[0:1] == "2":
                self.send_dict['2xx'] += 1
            elif status[0:1] == "3":
                self.send_dict['3xx'] += 1
            elif status[0:1] == "4":
                self.send_dict['4xx'] += 1
            elif status[0:1] == "5":
                self.send_dict['5xx'] += 1
            if status == "404":
                self.send_dict['404'] += 1
            elif status == "499":
                self.send_dict['499'] += 1
            elif status == "500":
                self.send_dict['500'] += 1
            elif status == "502":
                self.send_dict['502'] += 1
            elif status == "504":
                self.send_dict['504'] += 1
            self.req_time_count += request_time
        except Exception,e:
            pass
        #print response_code,request_time
    
    def deal_sink(self):    #一个周期计算完成后的处理函数
        import platform
        hostname = platform.uname()[1]
        if self.count > 0:
           self.send_dict['request_time'] = float(self.req_time_count/float(self.count))
           self.send_dict['qps'] = int(self.count/60)
        with open(self.zabbix_send_file,"w") as file_handle:
            for key in  self.send_dict:
                info="%s %s_%s %f\n" % (hostname, self.service, str(key), self.send_dict[key])
                file_handle.write(info)
                self.send_dict[key]=0
        self.count = 0
        self.req_time_count = 0
        cmd = "%s -c %s -i %s" % (self.zabbix_sender, self.zabbix_conf, self.zabbix_send_file)
        status,output = commands.getstatusoutput(cmd)

处理完成后启动服务,查看zabbix_sender的处理结果如下:

QQ图片20170517200306.png

5、将对应的监控项配置到zabbix里再配置到grafana里,不再进行介绍

备注:

1、调试信息可以直接打到log里,/tmp/plog.log;

2、启动时会遇到几个小插曲,用的时候就知道了,需要easy_install安装一个模块,需要改几行代码,留个小惊喜吧。


运维网咖社”原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。否则将追究法律责任。http://www.net-add.com



©本站文章(技术文章和tank手记)均为社长"矢量比特"工作.实践.学习中的心得原创或手记,请勿转载!

喜欢 (18) or 分享 (0)
欢迎扫描关注微信公众号【运维网咖社
社长"矢量比特",曾就职中软、新浪,现任职小米,致力于DevOps运维体系的探索和运维技术的研究实践.