ELK基于ElastAlert實現(xiàn)日志的微信報警

一、ElastAlert介紹

在日志管理上我們使用Elasticsearch沉噩,Logstash和Kibana技術(shù)棧來管理不斷增長的數(shù)據(jù)和日志闹丐,但是對于錯誤日志的監(jiān)控ELK架構(gòu)并沒有提供,所以我們需要使用到第三方工具ElastAlert昆禽,來幫助我們及時發(fā)現(xiàn)業(yè)務中存在的問題。

ElastAlert通過定期查詢Elasticsearch蝇庭,并將數(shù)據(jù)傳遞到規(guī)則類型醉鳖,該規(guī)則類型確定何時找到匹配項。發(fā)生匹配時哮内,將為該警報提供一個或多個警報盗棵,這些警報將根據(jù)匹配采取行動。

這是由一組規(guī)則配置的北发,每個規(guī)則定義一個查詢纹因,一個規(guī)則類型和一組警報。


在這里插入圖片描述

ElastAlert支持以下方式報警

  • Command (可調(diào)用短信接口)
  • Email
  • JIRA
  • OpsGenie
  • SNS
  • HipChat
  • Slack
  • Telegram
  • Debug
  • Stomp

除了這種基本用法外琳拨,還有許多其他功能使警報更加有用:

  • 警報鏈接到Kibana儀表板
  • 任意字段的合計計數(shù)
  • 將警報合并為定期報告
  • 通過使用唯一鍵字段來分隔警報
  • 攔截并增強比賽數(shù)據(jù)

二瞭恰、部署ElastAlert

1. 部署所需環(huán)境

ELK 環(huán)境部署

EFK6.3+kafka+logstash日志分析平臺集群

安裝依賴包

$ yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel

部署python3.6

$ mkdir -p /usr/local/python3
$ cd /usr/local/python3
$ wget https://www.python.org/ftp/python/3.6.9/Python-3.6.9.tgz
$ tar xf Python-3.6.9.tgz
$ cd Python-3.6.9
$ ./configure --prefix=/usr/local/python3
$ make && make install

配置環(huán)境變量

# 將python2.7 軟鏈刪除,換成python3.6
$ rm /usr/bin/python
$ ln -s /usr/local/python3/bin/python3.6 /usr/bin/python
$ rm /usr/bin/pip
$ ln -s /usr/local/python3/bin/pip3 /usr/bin/pip

驗證版本

$ python
Python 3.6.9 (default, Jun  2 2020, 12:12:43) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-18)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> 
$ pip -V
pip 18.1 from /usr/local/python3/lib/python3.6/site-packages/pip (python 3.6)

2. 部署ElastAlert

$ cd /app
$ git clone https://github.com/Yelp/elastalert.git

安裝模塊:

$ pip install "setuptools>=11.3"
$ python setup.py install

根據(jù)Elasticsearch的版本狱庇,您可能需要手動安裝正確版本的elasticsearch-py惊畏。

Elasticsearch 5.0+:

$ pip install "elasticsearch>=5.0.0"

Elasticsearch 2.X:

$ pip install "elasticsearch<3.0.0"

3. 配置ElastAlert

配置config.yaml 文件

$ cp config.yaml.example  config.yaml
$ cat config.yaml
rules_folder: example_rules
run_every:
  seconds: 10
buffer_time:
  minutes: 15
es_host: 10.1.144.208
es_port: 9201
#es_username: elastic
#es_password: 123456
writeback_index: elastalert_status
alert_time_limit:
  days: 2

rules_folder:ElastAlert從中加載規(guī)則配置文件的位置恶耽。它將嘗試加載文件夾中的每個.yaml文件。沒有任何有效規(guī)則陕截,ElastAlert將無法啟動驳棱。
run_every: ElastAlert多久查詢一次Elasticsearch的時間。
buffer_time:查詢窗口的大小农曲,從運行每個查詢的時間開始向后延伸社搅。對于其中use_count_query或use_terms_query設置為true的規(guī)則,將忽略此值乳规。
es_host:是Elasticsearch群集的地址形葬,ElastAlert將在其中存儲有關(guān)其狀態(tài),查詢運行暮的,警報和錯誤的數(shù)據(jù)笙以。
es_port:es對應的端口。
es_username: 可選的; 用于連接的basic-auth用戶名es_host冻辩。
es_password: 可選的; 用于連接的basic-auth密碼es_host猖腕。
es_send_get_body_as: 可選的; 方法查詢Elasticsearch - GET,POST或source恨闪。默認是GET
writeback_index:ElastAlert將在其中存儲數(shù)據(jù)的索引的名稱倘感。我們稍后將創(chuàng)建此索引。
alert_time_limit: 失敗警報的重試窗口咙咽。

創(chuàng)建elastalert-create-index索引

$ elastalert-create-index
New index name (Default elastalert_status)
Name of existing index to copy (Default None)
New index elastalert_status created
Done!

三老玛、使用微信報警

由于ElastAlert沒有內(nèi)置企業(yè)微信的報警方式,我們還需要使用一個開源插件elastalert-wechat-plugin來實現(xiàn)微信的報警钧敞,Github項目地址

1. 下載項目文件

$ cd elastalert
$ wget -P ~/elastalert/elastalert_modules/ wget https://raw.githubusercontent.com/anjia0532/elastalert-wechat-plugin/master/elastalert_modules/wechat_qiye_alert.py
$ touch ~/elastalert/elastalert_modules/__init__.py

2. 修改插件源碼

由于這個插件是基于python2.x版本開發(fā)的蜡豹,而ElastAlert的最新版本使用的是python3.6版本開發(fā),所以需要改一些代碼溉苛,以便正常運行镜廉,另外還添添加了轉(zhuǎn)中文字符功能。
wechat_qiye_alert.py修改后如下:

#! /usr/bin/env python
# -*- coding: utf-8 -*-

import json
import datetime
from elastalert.alerts import Alerter, BasicMatchString
from requests.exceptions import RequestException
from elastalert.util import elastalert_logger,EAException #[感謝minminmsn分享](https://github.com/anjia0532/elastalert-wechat-plugin/issues/2#issuecomment-311014492)
import requests
from elastalert_modules.MyEncoder import MyEncoder

'''
#################################################################
# 微信企業(yè)號推送消息                                              #
#                                                               #
# 作者: AnJia <anjia0532@gmail.com>                              #
# 作者博客: https://anjia.ml/                                    #
# Github: https://github.com/anjia0532/elastalert-wechat-plugin #
#                                                               #
#################################################################
'''
class WeChatAlerter(Alerter):

    #企業(yè)號id炊昆,secret桨吊,應用id必填

    required_options = frozenset(['corp_id','secret','agent_id'])

    def __init__(self, *args):
        super(WeChatAlerter, self).__init__(*args)
        self.corp_id = self.rule.get('corp_id', '')     #企業(yè)號id
        self.secret = self.rule.get('secret', '')       #secret
        self.agent_id = self.rule.get('agent_id', '')   #應用id

        self.party_id = self.rule.get('party_id')       #部門id
        self.user_id = self.rule.get('user_id', '')     #用戶id,多人用 | 分割凤巨,全部用 @all
        self.tag_id = self.rule.get('tag_id', '')       #標簽id
        self.access_token = ''                          #微信身份令牌
        self.expires_in=datetime.datetime.now() - datetime.timedelta(seconds=60)

    def create_default_title(self, matches):
        subject = 'ElastAlert: %s' % (self.rule['name'])
        return subject

    def alert(self, matches):

        if not self.party_id and not self.user_id and not self.tag_id:
            elastalert_logger.warn("All touser & toparty & totag invalid")

        # 參考elastalert的寫法
        # https://github.com/Yelp/elastalert/blob/master/elastalert/alerts.py#L236-L243
        body = self.create_alert_body(matches)

        #matches 是json格式
        #self.create_alert_body(matches)是String格式,詳見 [create_alert_body 函數(shù)](https://github.com/Yelp/elastalert/blob/master/elastalert/alerts.py)

        # 微信企業(yè)號獲取Token文檔
        # http://qydev.weixin.qq.com/wiki/index.php?title=AccessToken
        self.get_token()

        self.senddata(body)

        elastalert_logger.info("send message to %s" % (self.corp_id))

    def get_token(self):

        #獲取token是有次數(shù)限制的,本想本地緩存過期時間和token视乐,但是elastalert每次調(diào)用都是一次性的,不能全局緩存
        if self.expires_in >= datetime.datetime.now() and self.access_token:
            return self.access_token

        #構(gòu)建獲取token的url
        get_token_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s' %(self.corp_id,self.secret)

        try:
            response = requests.get(get_token_url)
            response.raise_for_status()
        except RequestException as e:
            raise EAException("get access_token failed , stacktrace:%s" % e)
            #sys.exit("get access_token failed, system exit")

        token_json = response.json()

        if 'access_token' not in token_json :
            raise EAException("get access_token failed , , the response is :%s" % response.text())
            #sys.exit("get access_token failed, system exit")

        #獲取access_token和expires_in
        self.access_token = token_json['access_token']
        self.expires_in = datetime.datetime.now() + datetime.timedelta(seconds=token_json['expires_in'])

        return self.access_token

    def senddata(self, content):

        #如果需要原始json敢茁,需要傳入matches

        # http://qydev.weixin.qq.com/wiki/index.php?title=%E6%B6%88%E6%81%AF%E7%B1%BB%E5%9E%8B%E5%8F%8A%E6%95%B0%E6%8D%AE%E6%A0%BC%E5%BC%8F
        # 微信企業(yè)號有字符長度限制(2048)佑淀,超長自動截斷

        # 參考 http://blog.csdn.net/handsomekang/article/details/9397025
        #len utf8 3字節(jié),gbk2 字節(jié)彰檬,ascii 1字節(jié)
        if len(content) > 512 :
            content = content[:512] + "..."

        # 微信發(fā)送消息文檔
        # http://qydev.weixin.qq.com/wiki/index.php?title=%E6%B6%88%E6%81%AF%E7%B1%BB%E5%9E%8B%E5%8F%8A%E6%95%B0%E6%8D%AE%E6%A0%BC%E5%BC%8F
        send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s' %( self.access_token)

        headers = {'content-type': 'application/json'}
      
        # 替換消息標題為中文,下面的字段為logstash切分的日志字段
        title_dict = { 
#            "At least": "報警規(guī)則:At least",
            "@timestamp": "報警時間",
            "_index": "索引名稱",
            "_type": "索引類型",
            "ServerIP": "報警主機",
            "hostname": "報警機器",
            "message": "報警內(nèi)容",
            "class": "報錯類",
            "lineNum": "報錯行"
           "num_hits": "文檔命中數(shù)",
           "num_matches": "文檔匹配數(shù)"
        }

        #print(f"type:{type(content)}")
        for k, v in title_dict.items():
            content = content.replace(k, v, 1 )

        # 最新微信企業(yè)號調(diào)整校驗規(guī)則伸刃,tagid必須是string類型谎砾,如果是數(shù)字類型會報錯,故而使用str()函數(shù)進行轉(zhuǎn)換
        payload = {
            "touser": self.user_id and str(self.user_id) or '', #用戶賬戶捧颅,建議使用tag
            "toparty": self.party_id and str(self.party_id) or '', #部門id景图,建議使用tag
            "totag": self.tag_id and str(self.tag_id) or '', #tag可以很靈活的控制發(fā)送群體細粒度。比較理想的推送應該是碉哑,在heartbeat或者其他elastic工具自定義字段挚币,添加標簽id。這邊根據(jù)自定義的標簽id扣典,進行推送
            'msgtype': "text",
            "agentid": self.agent_id,
            "text":{
                "content": content.encode('UTF-8').decode("latin1") #避免中文字符發(fā)送失敗
               },
            "safe":"0"
        }

        # set https proxy, if it was provided
        # 如果需要設置代理妆毕,可修改此參數(shù)并傳入requests
        # proxies = {'https': self.pagerduty_proxy} if self.pagerduty_proxy else None
        try:
            #response = requests.post(send_url, data=json.dumps(payload, ensure_ascii=False), headers=headers)
            response = requests.post(send_url, data=json.dumps(payload, cls=MyEncoder, indent=4, ensure_ascii=False), headers=headers)
            response.raise_for_status()
        except RequestException as e:
            raise EAException("send message has error: %s" % e)

        elastalert_logger.info("send msg and response: %s" % response.text)


    def get_info(self):
        return {'type': 'WeChatAlerter'}

在同級目錄下創(chuàng)建MyEncoder.py文件

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
 
import json

class MyEncoder(json.JSONEncoder):
     def default(self, obj):
         if isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
         return json.JSONEncoder.default(self, obj)

3. 申請企業(yè)微信賬號

step 1: 訪問網(wǎng)站 注冊企業(yè)微信賬號(不需要企業(yè)認證)。
step 2: 訪問apps 創(chuàng)建第三方應用贮尖,點擊創(chuàng)建應用按鈕 -> 填寫應用信息:
Step3: 創(chuàng)建部門笛粘,獲取部門ID

4. 配置報警規(guī)則

配置規(guī)則文件

$ cd  elastalert/example_rules/
$ cp example_frequency.yaml   applog.yaml
$ cat sms-applog.yaml  | grep -v ^#
name: 【日志報警】
use_strftine_index: true
type: frequency

index: applog-*  
num_events: 1
timeframe:
  minutes: 1
filter: 
- query:
    query_string:
      query: '"\[ERROR\]" NOT "發(fā)送郵件失敗"'

alert:
 - "elastalert_modules.wechat_qiye_alert.WeChatAlerter"
corp_id: wxdxxx40b4720f24
secret: xa4pWq63sxxtaZzzEg8X860ZBIoOkToCbh_oNc
agent_id: 1000002
party_id: 2

index:要查詢的索引的名稱, ES中存在的索引。
num_events:此參數(shù)特定于frequency類型湿硝,并且是觸發(fā)警報時的閾值薪前。
filter:用于過濾結(jié)果的Elasticsearch過濾器列表,這里的規(guī)則定義是除了包含“發(fā)送郵件失敗”的錯誤日志关斜,其他所有ERROR的日志都會觸發(fā)報警序六。
alert:定義報警方式,我們這里采用企業(yè)微信報警蚤吹。
corp_id: 企業(yè)微信的接口認證信息

5. 運行ElastAlert

$ python -m elastalert.elastalert --verbose --config /app/elastalert/config.yaml --rule /app/elastalert/example_rules/sms-applog.yaml 
1 rules loaded
INFO:elastalert:Starting up
INFO:elastalert:Disabled rules are: []
INFO:elastalert:Sleeping for 9.999904 seconds
INFO:elastalert:Queried rule 【日志報警】 from 2020-06-05 17:47 CST to 2020-06-05 17:47 CST: 0 / 0 hits
INFO:elastalert:Ran 【日志報警】 from 2020-06-05 17:47 CST to 2020-06-05 17:47 CST: 0 query hits (0 already seen), 0 matches, 0 alerts sent
后臺運行
$ nohup python -m elastalert.elastalert --verbose --config /app/elastalert/config.yaml --rule /app/elastalert/example_rules/sms-applog.yaml > nohup.txt 2>&1 

微信端添加企業(yè)號報警格式如下:

image.png

關(guān)注公眾號獲取視頻教程及更多資料:


image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市随抠,隨后出現(xiàn)的幾起案子裁着,更是在濱河造成了極大的恐慌,老刑警劉巖拱她,帶你破解...
    沈念sama閱讀 216,402評論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件二驰,死亡現(xiàn)場離奇詭異,居然都是意外死亡秉沼,警方通過查閱死者的電腦和手機桶雀,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來唬复,“玉大人矗积,你說我怎么就攤上這事〕ㄟ郑” “怎么了棘捣?”我有些...
    開封第一講書人閱讀 162,483評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長休建。 經(jīng)常有香客問我乍恐,道長评疗,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,165評論 1 292
  • 正文 為了忘掉前任茵烈,我火速辦了婚禮百匆,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘呜投。我一直安慰自己加匈,他們只是感情好,可當我...
    茶點故事閱讀 67,176評論 6 388
  • 文/花漫 我一把揭開白布宙彪。 她就那樣靜靜地躺著矩动,像睡著了一般。 火紅的嫁衣襯著肌膚如雪释漆。 梳的紋絲不亂的頭發(fā)上悲没,一...
    開封第一講書人閱讀 51,146評論 1 297
  • 那天,我揣著相機與錄音男图,去河邊找鬼示姿。 笑死,一個胖子當著我的面吹牛逊笆,可吹牛的內(nèi)容都是我干的栈戳。 我是一名探鬼主播,決...
    沈念sama閱讀 40,032評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼难裆,長吁一口氣:“原來是場噩夢啊……” “哼子檀!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起乃戈,我...
    開封第一講書人閱讀 38,896評論 0 274
  • 序言:老撾萬榮一對情侶失蹤褂痰,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后症虑,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體缩歪,經(jīng)...
    沈念sama閱讀 45,311評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,536評論 2 332
  • 正文 我和宋清朗相戀三年谍憔,在試婚紗的時候發(fā)現(xiàn)自己被綠了匪蝙。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,696評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡习贫,死狀恐怖逛球,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情沈条,我是刑警寧澤需忿,帶...
    沈念sama閱讀 35,413評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響屋厘,放射性物質(zhì)發(fā)生泄漏涕烧。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,008評論 3 325
  • 文/蒙蒙 一汗洒、第九天 我趴在偏房一處隱蔽的房頂上張望议纯。 院中可真熱鬧,春花似錦溢谤、人聲如沸瞻凤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽阀参。三九已至,卻和暖如春瞻坝,著一層夾襖步出監(jiān)牢的瞬間蛛壳,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評論 1 269
  • 我被黑心中介騙來泰國打工所刀, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留衙荐,地道東北人。 一個月前我還...
    沈念sama閱讀 47,698評論 2 368
  • 正文 我出身青樓浮创,卻偏偏與公主長得像忧吟,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子斩披,可洞房花燭夜當晚...
    茶點故事閱讀 44,592評論 2 353