Apache Kafka is a distributed streaming platform: it lets you publish and subscribe to streams of records and stores them in a fault-tolerant, durable way. As a key component of the big-data ecosystem, Apache Kafka is mainly used to build real-time streaming data pipelines that move data reliably between systems and applications, and to build applications that transform or react to data streams in real time. This post walks through a small demo showing how to use an Apache Kafka producer and consumer to publish and subscribe to data in real time.
The data comes from https://covid19api.com/, a site that provides completely free REST APIs for COVID-19 data. For example, the following API call returns the JSON shown below:
https://api.covid19api.com/country/germany/status/confirmed/live?from=2020-03-01T00:00:00Z&to=2020-04-01T00:00:00Z
[
  {
    "Country": "Germany",
    "CountryCode": "DE",
    "Province": "",
    "City": "",
    "CityCode": "",
    "Lat": "51.17",
    "Lon": "10.45",
    "Cases": 130,
    "Status": "confirmed",
    "Date": "2020-03-01T00:00:00Z"
  },
  {
    "Country": "Germany",
    "CountryCode": "DE",
    "Province": "",
    "City": "",
    "CityCode": "",
    "Lat": "51.17",
    "Lon": "10.45",
    "Cases": 159,
    "Status": "confirmed",
    "Date": "2020-03-02T00:00:00Z"
  },
  {
    "Country": "Germany",
    "CountryCode": "DE",
    "Province": "",
    "City": "",
    "CityCode": "",
    "Lat": "51.17",
    "Lon": "10.45",
    "Cases": 196,
    "Status": "confirmed",
    "Date": "2020-03-03T00:00:00Z"
  }
]
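As a quick sanity check on the shape of this response, the records above can be parsed with plain Python. This is only a sketch using the three sample records as a literal JSON array (trimmed to the fields we care about), not a live API call:

```python
import json

# Parse the (trimmed) sample response above and pull out the date
# and cumulative case count from each record.
sample = json.loads('''[
  {"Country": "Germany", "Cases": 130, "Status": "confirmed", "Date": "2020-03-01T00:00:00Z"},
  {"Country": "Germany", "Cases": 159, "Status": "confirmed", "Date": "2020-03-02T00:00:00Z"},
  {"Country": "Germany", "Cases": 196, "Status": "confirmed", "Date": "2020-03-03T00:00:00Z"}
]''')

for record in sample:
    day = record["Date"].split("T")[0]   # keep only the date part of the timestamp
    print(f'{day}: {record["Cases"]}')
# 2020-03-01: 130
# 2020-03-02: 159
# 2020-03-03: 196
```

The same `"Date".split("T")[0]` trick is used by the producer below to turn the timestamp into a plain date.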
Before publishing or subscribing to any data, the Kafka services have to be started. On a Confluent installation this looks like:
(base) cloud_user@yin2c:~$ sudo systemctl start confluent-zookeeper
(base) cloud_user@yin2c:~$ sudo systemctl enable confluent-zookeeper
(base) cloud_user@yin2c:~$ sudo systemctl start confluent-kafka
(base) cloud_user@yin2c:~$ sudo systemctl enable confluent-kafka
Then check that the Kafka broker is actually running, for example with sudo systemctl status confluent-kafka.
With Kafka set up, the next step is to create a topic:
kafka-topics --bootstrap-server localhost:9092 --create --topic py --partitions 1 --replication-factor 1
Next, the producer and consumer are written in Python. The messages come from the COVID-19 data fetched via an API call: the number of active cases in Germany for each day since April 20. First a producer instance is created and pointed at the broker; then a loop publishes each day's case count from the JSON response to the topic. Once the producer is started, the consumer prints the received messages line by line.
from kafka import KafkaProducer
import json
import requests
from time import sleep

# All data from the first recorded date (alternative endpoint):
# URL = "https://api.covid19api.com/total/dayone/country/germany/status/confirmed"
# Live daily data for Germany since 2020-04-20:
URL = "https://api.covid19api.com/live/country/germany/status/confirmed/date/2020-04-20T13:13:30Z"
req = requests.get(url=URL)
data = req.json()

# Serialize every message value as UTF-8 encoded JSON
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

for record in data:
    sleep(1)  # throttle to roughly one message per second
    # Publish "YYYY-MM-DD:<active cases>" to the 'py' topic
    producer.send('py', value=record["Date"].split("T")[0] + ':' + str(record["Active"]))
producer.flush()  # block until all buffered messages have been delivered
The consumer is simple: it is just a subscriber listening on the topic. Start the consumer first; since nothing has been published yet, it prints nothing. Once the producer starts, the messages appear line by line.
The code can be forked from my GitHub: https://github.com/dtdetianyin/ApacheKafka/tree/master/Corona19%20Data%20processed%20with%20ApacheKafka%20and%20Python