基于Elastalert的二次開發(fā)
https://github.com/lights8080/elastalert forked from Yelp/elastalert
修改內(nèi)容大致如下:
- 修改報(bào)警及日志的日期格式為
%Y-%m-%d %H:%M:%S %Z
- 集成釘釘報(bào)警(支持At、secret認(rèn)證),參考
example_rules/example_frequency_lights8080.yaml
- PercentageMatchRule,報(bào)警內(nèi)容增加match_bucket_count字段
- FrequencyRule轴猎,報(bào)警內(nèi)容增加doc_count字段
- requirements.txt改為elasticsearch==7.0.0
- 優(yōu)化日志
規(guī)則配置建議
- buffer_time與run_every參數(shù)設(shè)置相同
支持釘釘報(bào)警
- 新增文件:elastalert_modules/dingtalk_alert.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: xuyaoqiang,lights8080
@contact: xuyaoqiang@gmail.com
@date: 2017-09-14 17:35,2021-06-23
@version: 0.0.0
@license:
@copyright:
"""
import json
import requests
from elastalert.alerts import Alerter, DateTimeEncoder
from requests.exceptions import RequestException
from elastalert.util import EAException
import sys
import io
import time
import datetime
import hmac
import hashlib
import base64
import urllib
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
class DingTalkAlerter(Alerter):
required_options = frozenset(['dingtalk_webhook', 'dingtalk_msgtype'])
def __init__(self, rule):
super(DingTalkAlerter, self).__init__(rule)
self.dingtalk_webhook_url = self.rule['dingtalk_webhook']
self.dingtalk_msgtype = self.rule.get('dingtalk_msgtype', 'text')
self.dingtalk_isAtAll = self.rule.get('dingtalk_isAtAll', False)
self.dingtalk_title = self.rule.get('dingtalk_title', '')
self.dingtalk_atMobiles = self.rule.get('dingtalk_atMobiles', [])
self.dingtalk_secret = self.rule.get('dingtalk_secret', '')
def format_body(self, body):
return body.encode('utf8')
def alert(self, matches):
headers = {
"Content-Type": "application/json",
"Accept": "application/json;charset=utf-8"
}
body = self.create_alert_body(matches)
payload = {
"msgtype": self.dingtalk_msgtype,
"text": {
"content": body
},
"at": {
"isAtAll":False
}
}
if len(self.dingtalk_atMobiles) > 0:
payload["at"]["atMobiles"] = self.dingtalk_atMobiles
url = self.dingtalk_webhook_url
if len(self.dingtalk_secret) > 0:
timestamp = round(time.time() * 1000)
secret_enc = bytes(self.dingtalk_secret, encoding='utf8')
string_to_sign = '{}\n{}'.format(timestamp, self.dingtalk_secret)
string_to_sign_enc = bytes(string_to_sign, encoding='utf8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote(base64.b64encode(hmac_code))
url = '{}×tamp={}&sign={}'.format(self.dingtalk_webhook_url, timestamp, sign)
try:
response = requests.post(url,
data=json.dumps(payload, cls=DateTimeEncoder),
headers=headers)
response.raise_for_status()
print(response)
except RequestException as e:
raise EAException("Error request to Dingtalk: {0}".format(str(e)))
def get_info(self):
return {
"type": "dingtalk",
"dingtalk_webhook": self.dingtalk_webhook_url
}
pass
- 修改規(guī)則
alert:
- "elastalert_modules.dingtalk_alert.DingTalkAlerter"
dingtalk_webhook: "https://oapi.dingtalk.com/robot/send?access_token=token"
dingtalk_msgtype: "text"
dingtalk_secret: "secret"
dingtalk_atMobiles:
- "18610241024"
報(bào)警實(shí)踐轉(zhuǎn)換為本地時(shí)區(qū)
規(guī)則配置增強(qiáng)模塊:
match_enhancements:
- "elastalert.enhancements.TimeEnhancement"
修改前:
Example rule
At least 50 events occurred between 2021-06-18 19:55:24 CST and 2021-06-18 20:00:24 CST
@timestamp: 2021-06-18T12:00:24.768631Z
修改后:
Example rule
At least 50 events occurred between 2021-06-18 19:55:24 CST and 2021-06-18 20:00:24 CST
@timestamp: 2021-06-18 20:00:24 CST