Environment:
OS: CentOS 7
PS: make sure Kafka, ZooKeeper, and Storm are already deployed (this article runs its tests against a cluster built with Apache Ambari).
Packages:
$ yum install -y gcc python-devel java cyrus-sasl-devel cyrus-sasl-gssapi cyrus-sasl-md5 cyrus-sasl-plain librdkafka-devel redis
- Install lein
$ wget https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein
$ mv lein /usr/bin/
$ chmod a+x /usr/bin/lein
$ wget https://github.com/technomancy/leiningen/releases/download/2.8.1/leiningen-2.8.1-standalone.zip
$ mv leiningen-2.8.1-standalone.zip /root/.lein/self-installs/leiningen-2.8.1-standalone.jar  # the standalone "zip" is really the uberjar; put it where lein self-install expects it
$ export LEIN_ROOT=1  # allow lein to run as root
$ lein version # test lein version
- Install the Python dependencies (ideally inside a virtualenv)
$ pip install streamparse confluent-kafka redis kazoo
Overall architecture
Start demo
- Get the Kafka brokers
  - Find the ZooKeeper cluster (through Ambari)
  - Get the brokers from ZooKeeper (save this as get_broker_list.py; the later scripts import it):
```python
from kazoo.client import KazooClient
import json


def get_kafka_brokers(host):
    zookeeper = KazooClient(hosts=host, read_only=True)
    zookeeper.start()
    for node in zookeeper.get_children('/brokers/ids'):
        data, stats = zookeeper.get('/brokers/ids/' + node)
        props = json.loads(data)
        yield props['host'] + ':' + str(props['port'])
    zookeeper.stop()


if __name__ == "__main__":
    print(','.join(get_kafka_brokers("cluster1.dc.com,cluster2.dc.com")))
```
Output: cluster2.dc.com:6667
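For reference, each broker registers itself under /brokers/ids/<id> as a JSON blob roughly like the one below (illustrative only; fields vary by Kafka version, and this sample is not dumped from the test cluster):

```python
# Illustrative /brokers/ids/<id> payload (fields vary by Kafka version);
# get_kafka_brokers above reads only the 'host' and 'port' keys.
sample_broker_znode = {
    "host": "cluster2.dc.com",
    "port": 6667,
    "endpoints": ["PLAINTEXT://cluster2.dc.com:6667"],
    "jmx_port": -1,
    "version": 4,
}
```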
The Kafka cluster can also be confirmed through Ambari (screenshot).
- Have a producer write data to the brokers (using confluent-kafka):
```python
# -*- coding:utf-8 -*-
import confluent_kafka
import random, time
import json

from get_broker_list import get_kafka_brokers


def error_cb(err):
    print('Error: %s' % err)


def main():
    # bootstrap_servers = 'cluster2.dc.com:6667'
    zk_host = 'cluster1.dc.com,cluster2.dc.com'
    bootstrap_servers = ','.join(get_kafka_brokers(zk_host))
    api_version_request = True
    conf = {'bootstrap.servers': bootstrap_servers,
            'api.version.request': api_version_request,
            'error_cb': error_cb,
            'debug': 'protocol',
            'broker.address.family': 'v4'}
    producer = confluent_kafka.Producer(**conf)
    user_list = ['jason', 'jane', 'tom', 'jack']
    while True:
        data = {"user": random.choice(user_list),
                "timestamp": time.time(),
                "log_level": random.randint(0, 5)}
        try:
            producer.produce('test', value=json.dumps(data))
            # time.sleep(random.randint(1, 2))
        except BufferError:
            # local queue is full; serve delivery callbacks, then retry
            producer.poll(100)
            continue
    producer.flush()


if __name__ == '__main__':
    main()
```
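To confirm that each record actually reached the broker, confluent-kafka also accepts a per-message delivery callback. A minimal sketch (reusing the `test` topic and the broker lookup from above; the callback name is arbitrary):

```python
# -*- coding:utf-8 -*-
# Sketch: per-message delivery reports with confluent-kafka.
import json
import confluent_kafka

from get_broker_list import get_kafka_brokers


def delivery_report(err, msg):
    # invoked from poll()/flush() once the broker acks (or rejects) the message
    if err is not None:
        print('delivery failed: %s' % err)
    else:
        print('delivered to %s [%d] @ offset %d'
              % (msg.topic(), msg.partition(), msg.offset()))


brokers = ','.join(get_kafka_brokers('cluster1.dc.com,cluster2.dc.com'))
producer = confluent_kafka.Producer(**{'bootstrap.servers': brokers})
producer.produce('test', value=json.dumps({'ping': 1}), callback=delivery_report)
producer.flush()  # serve outstanding callbacks before exiting
```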
Part of the producer's output (screenshot):
- Create a consumer to verify:
```python
#!/usr/bin/env python
import time
import json

from confluent_kafka import Consumer, KafkaException, KafkaError

from get_broker_list import get_kafka_brokers


def main():
    # broker = 'cluster2.dc.com:6667'
    zk_host = 'cluster1.dc.com,cluster2.dc.com'
    bootstrap_servers = ','.join(get_kafka_brokers(zk_host))
    group = 'test.py'
    conf = {'bootstrap.servers': bootstrap_servers, 'group.id': group,
            'session.timeout.ms': 6000,
            'default.topic.config': {'auto.offset.reset': 'smallest'}}
    consumer = Consumer(**conf)
    consumer.subscribe(['test'])
    while True:
        msg = consumer.poll()
        try:
            print(json.loads(msg.value()))
        except Exception:
            time.sleep(1)
            continue
    consumer.close()


if __name__ == '__main__':
    main()
```
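The KafkaException/KafkaError imports above are meant for a more defensive poll loop; the script as written just sleeps on any exception. A sketch of the explicit version (a drop-in replacement for the while loop in main()):

```python
import json
from confluent_kafka import KafkaException, KafkaError


def consume_forever(consumer):
    """Drop-in replacement for the while loop in main() above."""
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue  # nothing arrived within the timeout
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue  # reached the end of a partition; keep polling
            raise KafkaException(msg.error())
        print(json.loads(msg.value()))
```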
Part of the consumer's output (screenshot):
- Integrate with Storm (using the streamparse package)
The Kafka producer above generates user records; the Storm demo counts how many records arrive in each 5-minute window. (In practice the producer is much faster than the consumer, which causes lag: the test wrote roughly 1.5M-1.8M records per 5 minutes, and the resulting performance problems were due to resource limits, so the numbers are for reference only.) The window key is simply the record timestamp rounded down to a 5-minute boundary, as the quick example below shows.
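A sketch of that bucketing, matching what the bolt will do later (the timestamp is made up):

```python
import time
from datetime import datetime, timedelta

ts = 1516245123.4  # hypothetical producer timestamp
now = datetime.fromtimestamp(int(ts))
# subtract the spill-over minutes/seconds to land on the window start
now -= timedelta(minutes=now.minute % 5, seconds=now.second,
                 microseconds=now.microsecond)
start = int(time.mktime(now.timetuple()))
print('%d-%d' % (start, start + 5 * 60))  # -> 1516245000-1516245300
```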
$ sparse quickstart onlineuser
The generated files under topologies, bolts, and spouts carry wordcount-style names; renaming them is optional. Just make sure the topology in topologies references the matching spout in spouts and bolt in bolts.
- vim spouts/user.py
```python
import sys, os

# sys.path.append(os.path.dirname(os.path.abspath(__file__)) + '/../../../kafka_example')
abspath = ""  # must be filled with an absolute path; os.path.abspath will not work because the topology runs as a jar under /tmp
if not abspath:
    raise Exception("set abspath to the kafka_example directory so get_broker_list can be imported")
sys.path.append(abspath)

from confluent_kafka import Consumer
from streamparse import Spout

from get_broker_list import get_kafka_brokers


class OnlineUserSpout(Spout):
    outputs = ['log']

    def initialize(self, stormconf, context):
        # broker = 'cluster2.dc.com:6667'
        zk_host = 'cluster1.dc.com,cluster2.dc.com'
        broker = ','.join(get_kafka_brokers(zk_host))
        group = 'test.py'
        conf = {'bootstrap.servers': broker, 'group.id': group,
                'session.timeout.ms': 6000,
                'default.topic.config': {'auto.offset.reset': 'smallest'}}
        self.consumer = Consumer(**conf)

    def activate(self):
        self.consumer.subscribe(['test'])

    def next_tuple(self):
        msg = self.consumer.poll()
        if msg is not None and msg.value():
            self.emit([msg.value()])

    def deactivate(self):
        self.consumer.close()
```
- vim bolts/serializer_log.py
(This part does not handle duplicate users; a dedup sketch follows the code.)

```python
import json
import time
from datetime import datetime, timedelta

from redis import StrictRedis
from streamparse import Bolt


class RedisLog(Bolt):
    def initialize(self, conf, ctx):
        self.redis = StrictRedis()
        self.interval_minute = 5

    def _increment(self, duration):
        return self.redis.incr(duration)

    def process(self, tup):
        data = json.loads(tup.values[0])
        user = data['user']  # unused here; see the dedup sketch below
        timestamp = data["timestamp"]
        now = datetime.fromtimestamp(int(timestamp))
        # round down to the start of the current 5-minute window
        now = now - timedelta(minutes=now.minute % self.interval_minute,
                              seconds=now.second,
                              microseconds=now.microsecond)
        now_timestamp = int(time.mktime(now.timetuple()))
        duration = '{0}-{1}'.format(now_timestamp,
                                    now_timestamp + self.interval_minute * 60)
        count = self._increment(duration)
        self.emit([duration, count])
```
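If distinct users per window are wanted instead of raw record counts, one hedged variant is to swap the counter for a Redis set keyed by the same window string: SADD ignores repeats, SCARD returns the distinct size. A sketch (the helper name is made up, not part of the demo):

```python
from redis import StrictRedis

redis = StrictRedis()


def count_distinct_user(duration, user):
    """Variant of RedisLog._increment that counts unique users per window.

    `duration` is the same 'start-end' key built in RedisLog.process.
    """
    redis.sadd('users:' + duration, user)      # SADD dedupes repeated users
    return redis.scard('users:' + duration)    # SCARD = distinct user count
```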
- vim topologies/onlineuser.py
""" Online User topology """ from streamparse import Topology from bolts.serializer_log import RedisLog from spouts.user import OnlineUserSpout class OnlineUserCount(Topology): log_spout = OnlineUserSpout.spec() count_bolt = RedisLog.spec(inputs=[log_spout])
- Run the topology:
$ sparse run
# must be run from inside the sparse quickstart project directory (this takes a while: the project is built into a jar under /tmp and executed from there)
Part of the results (screenshot). Some WARNs may appear; they come from write latency on ZooKeeper's log files, which can hurt Storm performance, and can be ignored for this test.
- Check the keys and values through Redis (each key is a timestamp interval of integers, formatted 'timestamp1-timestamp2'):
```python
import time

from redis import StrictRedis

redis = StrictRedis()
while 1:
    keys = redis.keys()
    vals = redis.mget(keys)
    kv = list(zip(keys, vals))
    print(kv)
    time.sleep(10)
```
The result looks roughly like the screenshot.
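redis-py's KEYS command blocks the server while it scans the whole keyspace; for anything beyond a quick look, SCAN is the safer loop. A sketch of the same check using `scan_iter`:

```python
import time

from redis import StrictRedis

redis = StrictRedis()
while True:
    # scan_iter walks the keyspace incrementally instead of blocking like KEYS
    keys = list(redis.scan_iter())
    if keys:
        print(list(zip(keys, redis.mget(keys))))
    time.sleep(10)
```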
Workarounds for some problems that may come up:
- Running sparse run may raise a Storm version mismatch. Adjust project.clj; because Storm was also installed through Ambari, the version it reports uses the Hortonworks Data Platform build format (e.g. '1.1.0.2.6.2.0-205'), which never matches the plain version in project.clj. Edit .../site-packages/streamparse/cli/run.py (around lines 48-49) to relax or remove the check, for example by normalizing the version string as sketched below.
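One way to relax the check without deleting it is to reduce the HDP build string to its leading three components before comparing; an illustrative helper (this is not streamparse's actual code):

```python
import re


def normalize_storm_version(version):
    """Reduce an HDP-style build string to plain semver,
    e.g. '1.1.0.2.6.2.0-205' -> '1.1.0'."""
    match = re.match(r'(\d+\.\d+\.\d+)', version)
    return match.group(1) if match else version


print(normalize_storm_version('1.1.0.2.6.2.0-205'))  # -> 1.1.0
```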
- Running sparse run may also raise NoClassDefFoundError: org/apache/commons/lang/StringUtils.
The fix:
$ wget https://www.apache.org/dist/commons/lang/binaries/commons-lang-2.6-bin.zip
$ unzip commons-lang-2.6-bin.zip
$ cd commons-lang-2.6-bin && mv commons-lang-2.6.jar storm/lib  # storm/lib being the lib directory of your Storm installation
Ending
Some other issues may still come up along the way; check the documentation of the corresponding project.
Finally, this article is original work; please do not repost without permission. =_=!