配置信息
KAFKA_FROM_ALTER_MANGER_TOPIC = "alter_xxx_from_alter_manager_123"
KAFKA_CUSTOMER_GROUP = "group_alter_xxx_from_manager"
KAFKA_ALERT_MANAGER_CLUSTER = ['192.168.xx.xx:9092', '192.168.xx.xx:9092', '192.168.xx.xx:9092']
生產(chǎn)數(shù)據(jù)
from kafka import KafkaProducer
from django.conf import settings
import json
import os
class KP(object):
"""
模擬kafka生產(chǎn)者往指定topic發(fā)布消息
"""
p = None
tp = None
def __init__(self):
self.tp = settings.KAFKA_FROM_ALTER_MANGER_TOPIC
self.p = KafkaProducer(bootstrap_servers=settings.KAFKA_ALERT_MANAGER_CLUSTER)
def send_msg(self, dat):
try:
d = json.dumps(dat)
except Exception as e:
print("json dumps error: {}".format(e))
return
try:
future = self.p.send(self.tp, d.encode())
self.p.flush()
ret = future.get(timeout=1)
print("return: {}".format(ret))
except Exception as e:
print("error: {}".format(e))
多進(jìn)程消費(fèi)數(shù)據(jù)
import json
import os
import multiprocessing
from django.conf import settings
from django.db import connections
from common.utils import get_logger
from kafka import KafkaConsumer
from datetime import datetime
logger = get_logger(__file__)
class AlertWatcher:
"""
啟動(dòng)進(jìn)程消費(fèi)alert manager發(fā)布到kafka的告警消息
處理消息并保持到數(shù)據(jù)庫(kù)
"""
def __init__(self):
self.name = 'alter_msg_watcher'
def run(self):
executor = multiprocessing.Pool(processes=20)
custom = KafkaConsumer(settings.KAFKA_FROM_ALTER_MANGER_TOPIC,
group_id=settings.KAFKA_CUSTOMER_GROUP,
bootstrap_servers=settings.KAFKA_ALERT_MANAGER_CLUSTER,
max_poll_records=1)
for msg in custom:
executor.apply_async(self.handle_alert_msg, (msg,))
executor.close()
@staticmethod
def handle_alert_msg(_msg):
# 關(guān)閉無(wú)效的mysql連接
for conn in connections.all():
conn.close_if_unusable_or_obsolete()
# --- for loop sub kafka topic msg ---
handle_start = datetime.now()
logger.info("===handle alert=== %s pid:%s got alert msg from kafka: %s" % (handle_start, os.getpid(), _msg))
try:
dat = json.loads(_msg.value.decode())
except Exception as e:
logger.error("msg: {}, load err: {}".format(_msg, e))
return
# --- handle alert msg ---
alerts = dat.get("alerts", None)
if not alerts:
logger.error("dat: {}, no ALTERS value.".format(dat))
return
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者