大數(shù)據(jù):用ApacheKafka和Python來(lái)實(shí)時(shí)提取新冠數(shù)據(jù)

Apache Kafka 是分布式的流處理平臺(tái)瞻凤, 能夠發(fā)布消息和訂閱消息, 并且能夠以容錯(cuò)的持久的方式來(lái)存儲(chǔ)記錄數(shù)據(jù)流世杀, 作為大數(shù)據(jù)生態(tài)的重要組成部分阀参, Apache Kafka主要應(yīng)用在構(gòu)建實(shí)時(shí)的流數(shù)據(jù)管道,在系統(tǒng)和應(yīng)用間得到可靠的數(shù)據(jù), 并且能夠構(gòu)建轉(zhuǎn)換或響應(yīng)實(shí)時(shí)數(shù)據(jù)流的應(yīng)用瞻坝。這里通過(guò)用一個(gè)小demo展示如何使用 Apache Kafka producer和consumer 來(lái)實(shí)時(shí)發(fā)布和訂閱數(shù)據(jù)蛛壳。

數(shù)據(jù)的來(lái)源是https://covid19api.com/。網(wǎng)站提供完全免費(fèi)的rest api 新冠數(shù)據(jù)。如通過(guò)以下的Api call 可以獲得如下的json.

(https://api.covid19api.com/country/germany/status/confirmed/live?from=2020-03-01T00:00:00Z&to=2020-04-01T00:00:00Z)
  {
    "Country": "Germany",
    "CountryCode": "DE",
    "Province": "",
    "City": "",
    "CityCode": "",
    "Lat": "51.17",
    "Lon": "10.45",
    "Cases": 130,
    "Status": "confirmed",
    "Date": "2020-03-01T00:00:00Z"
  },
  {
    "Country": "Germany",
    "CountryCode": "DE",
    "Province": "",
    "City": "",
    "CityCode": "",
    "Lat": "51.17",
    "Lon": "10.45",
    "Cases": 159,
    "Status": "confirmed",
    "Date": "2020-03-02T00:00:00Z"
  },
  {
    "Country": "Germany",
    "CountryCode": "DE",
    "Province": "",
    "City": "",
    "CityCode": "",
    "Lat": "51.17",
    "Lon": "10.45",
    "Cases": 196,
    "Status": "confirmed",
    "Date": "2020-03-03T00:00:00Z"
  }


在開(kāi)始數(shù)據(jù)的發(fā)布和訂閱之前炕吸,首先要開(kāi)始Kafka 服務(wù)伐憾。代碼如下

(base) cloud_user@yin2c:~$ sudo systemctl start confluent-zookeeper
(base) cloud_user@yin2c:~$ sudo systemctl enable confluent-zookeeper
(base) cloud_user@yin2c:~$ sudo systemctl start confluent-kafka
(base) cloud_user@yin2c:~$ sudo systemctl enable confluent-kafka

之后查看kafka broker是否在運(yùn)行。

這樣Kafka就設(shè)置好了赫模,下一步要?jiǎng)?chuàng)建一個(gè)話題topic

kafka-topics --bootstrap-server localhost:9092 --create --topic py --partitions 1 --replication-factor 1

接下來(lái)用python 來(lái)創(chuàng)建消息發(fā)布者和訂閱者树肃。消息的來(lái)源是新冠數(shù)據(jù), 通過(guò)api call來(lái)獲取數(shù)據(jù), 是德國(guó)從4月20號(hào)以來(lái)每天的現(xiàn)存病例數(shù)量瀑罗, 先創(chuàng)建一個(gè)發(fā)布者實(shí)例胸嘴, 設(shè)置好服務(wù)器,然后通過(guò)loop 把得到的json數(shù)據(jù)字典中的每天的病例數(shù)量發(fā)布到topic 里面斩祭。當(dāng)啟動(dòng)發(fā)布者之后劣像, 訂閱者就會(huì)逐行打印得到的信息。

from kafka import KafkaProducer
from json import loads
import json
import requests
from time import sleep

#list of all data from first date
#URL = "https://api.covid19api.com/total/dayone/country/germany/status/confirmed"
URL ="https://api.covid19api.com/live/country/germany/status/confirmed/date/2020-04-20T13:13:30Z"
req = requests.get(url = URL)
data = req.json()
producer = KafkaProducer(bootstrap_servers = ['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for i in range (len(data)):
    file = data[i]
    sleep(1)
    producer.send('py', value=str(file["Date"].split("T")[0])+':'+str(file["Active"]))
   

消息的訂閱者很簡(jiǎn)單就是一個(gè)監(jiān)聽(tīng)topic 的訂閱者摧玫。首先開(kāi)始訂閱者耳奕, 由于還沒(méi)有消息發(fā)布, 所以沒(méi)有信息诬像。當(dāng)發(fā)布者啟動(dòng)之后屋群, 就可以看到信息被逐行打印出來(lái)。


image.png

image.png

代碼可以通過(guò)我的github 分叉:https://github.com/dtdetianyin/ApacheKafka/tree/master/Corona19%20Data%20processed%20with%20ApacheKafka%20and%20Python
_

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末坏挠,一起剝皮案震驚了整個(gè)濱河市芍躏,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌降狠,老刑警劉巖对竣,帶你破解...
    沈念sama閱讀 206,378評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異榜配,居然都是意外死亡否纬,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)芥牌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)烦味,“玉大人,你說(shuō)我怎么就攤上這事壁拉∶恚” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,702評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵弃理,是天一觀的道長(zhǎng)溃论。 經(jīng)常有香客問(wèn)我,道長(zhǎng)痘昌,這世上最難降的妖魔是什么钥勋? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,259評(píng)論 1 279
  • 正文 為了忘掉前任炬转,我火速辦了婚禮,結(jié)果婚禮上算灸,老公的妹妹穿的比我還像新娘扼劈。我一直安慰自己,他們只是感情好菲驴,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,263評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布荐吵。 她就那樣靜靜地躺著,像睡著了一般赊瞬。 火紅的嫁衣襯著肌膚如雪先煎。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,036評(píng)論 1 285
  • 那天巧涧,我揣著相機(jī)與錄音薯蝎,去河邊找鬼。 笑死谤绳,一個(gè)胖子當(dāng)著我的面吹牛占锯,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播缩筛,決...
    沈念sama閱讀 38,349評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼烟央,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了歪脏?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 36,979評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤粮呢,失蹤者是張志新(化名)和其女友劉穎婿失,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體啄寡,經(jīng)...
    沈念sama閱讀 43,469評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡豪硅,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,938評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了挺物。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片懒浮。...
    茶點(diǎn)故事閱讀 38,059評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖识藤,靈堂內(nèi)的尸體忽然破棺而出砚著,到底是詐尸還是另有隱情,我是刑警寧澤痴昧,帶...
    沈念sama閱讀 33,703評(píng)論 4 323
  • 正文 年R本政府宣布稽穆,位于F島的核電站,受9級(jí)特大地震影響赶撰,放射性物質(zhì)發(fā)生泄漏舌镶。R本人自食惡果不足惜柱彻,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,257評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望餐胀。 院中可真熱鬧哟楷,春花似錦、人聲如沸否灾。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,262評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)坟冲。三九已至磨镶,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間健提,已是汗流浹背琳猫。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留私痹,地道東北人脐嫂。 一個(gè)月前我還...
    沈念sama閱讀 45,501評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像紊遵,于是被迫代替她去往敵國(guó)和親账千。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,792評(píng)論 2 345