python使用kafka初步入門

kafka的使用

zookeeper相關

? zookeeper的安裝,查看文檔:zookeeper的使用

? 在kafka中产雹,zookeeper主要存kafka節(jié)點的數(shù)據(jù)意乓。

? 查看znode信息樱调,可以看到kafka的 broker, topic等信息

? 可以使用zookeeper命令,用來模擬創(chuàng)建topic届良,在kafka使用 kafka-topics.sh 也是能查到數(shù)據(jù)的。

? 所以證明圣猎,kafka的許多信息就是使用zookeeper來存取的士葫。

安裝kafka

? 直接下載,解壓就行送悔。

kafka的一些特性

  1. 高吞吐量慢显、低延遲爪模;
  2. 可擴展性;
  3. 持久性荚藻、可靠性屋灌;
  4. 容錯性;
  5. 高并發(fā)应狱;
  6. 支持實時在線處理和離線處理共郭。

kafka的使用場景

官方文檔顯示如下幾種場景可以使用:

  1. 網(wǎng)站活動追蹤

    用戶的活動追蹤,網(wǎng)站的活動(網(wǎng)頁游覽疾呻,搜索或其他用戶的操作信息)發(fā)布到不同的話題中心除嘹,這些消息可實時處理,實時監(jiān)測岸蜗,也可加載到Hadoop或離線處理數(shù)據(jù)倉庫

  2. 指標

    分布式應用程序生成的統(tǒng)計數(shù)據(jù)集中聚合

  3. 日志聚合

    使用kafka代替一個日志聚合的解決方案尉咕。

  4. 流處理

    kafka消息處理包含多個階段。其中原始輸入數(shù)據(jù)是從kafka主題消費的璃岳,然后匯總年缎,豐富,或者以其他的方式處理轉化為新主題

  5. 事件采集

    事件采集是一種應用程序的設計風格铃慷,其中狀態(tài)的變化根據(jù)時間的順序記錄下來单芜,kafka支持這種非常大的存儲日志數(shù)據(jù)的場景。

  6. 提交日志

    kafka可以作為一種分布式的外部提交日志枚冗,日志幫助節(jié)點之間復制數(shù)據(jù)缓溅,并作為失敗的節(jié)點來恢復數(shù)據(jù)重新同步。

kafka的一些基本概念

Topic(主題)

? 每一類的消息赁温,稱之為一個主題坛怪。

? 一個主題能分為多個partition,每個partition對應一個文件夾股囊,每一個消息發(fā)送到Broker時袜匿,會根據(jù)partition規(guī)則選擇存儲到哪一個partition。

Producer(生產(chǎn)者)

? 發(fā)布消息的對象稚疹。

producer將會與topic所有的partition leader保持socker連接居灯;

消息由producer直接通過socket發(fā)送到broker,中間不會經(jīng)過任何路由層内狗;

事實上怪嫌,消息被路由到哪個partition由producer決定

Consumer(消費者)

? 訂閱消息并處理發(fā)布消息。

? kafka穩(wěn)定狀態(tài)下柳沙,每一個consumer實例只會消費某一個或多個特定的partition的數(shù)據(jù)岩灭。consumer與partition的數(shù)目有以下三種對應關系:

  1. consumer < partition , 至少有一個consumer會消費多個partition的數(shù)據(jù)赂鲤;

  2. consumer = partition 噪径, 正好有一個consumer消費一個partition數(shù)據(jù)柱恤;

  3. consumer > parition, 會有部分consumer無法消費該topic下任何一條消息。

    kafka對于消費消息:

  4. 不刪除已消費的消息找爱;

    1. 基于時間
    2. 基于partition文件大小
  5. 保證同一個consumer group只有一個consumer會消費一條消息梗顺;

  6. 允許不同consumer group同時消費一條消息。

consumer端向broker發(fā)送"fetch"請求车摄,并告知其獲取消息的offset寺谤。正常情況下,會在消費完一條消息后線性增加這個offset练般。

Broker (代理)

? 已發(fā)布的消息保存在一組服務器中矗漾,稱之為kafka集群。集群中的每一個服務器都是一個代理(broker)薄料。

kafka的目錄相關

  • /bin 操作kafka的可執(zhí)行腳本敞贡, 還包含windows下腳本;
  • /config 配置文件所在目錄
  • /libs 依賴庫目錄
  • /logs 日志數(shù)據(jù)目錄摄职,目錄kafka把server端日志分為:server, request, state, log-cleaner, controller.

kafka命令行下的使用

我們可以先用kafka提供的命令行工具誊役,來熟悉kafka的基本使用。

0谷市、最基本的命令

? 開啟kafka服務蛔垢。

? 在開啟kafka服務之前,先開啟zookeeper服務迫悠。

/path/to/zookeeper/bin/zkServer.sh start #默認訪問的配置文件../conf/zoo.cfg
/path/to/kafka/bin/kafka-server-start.sh -daemon ../config/server.properties
jps #查看是否開啟服務
18723 Jps
1972 ZooKeeperMain #zookeeper
4549 Kafka  #kafka

1鹏漆、創(chuàng)建一個主題

(learn) [root@localhost bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test2
Created topic "test2".

2、獲取最近創(chuàng)建的主題列表

[root@localhost bin]# ./kafka-topics.sh --list --zookeeper localhost:2181 
__consumer_offsets
test
test2   #剛才創(chuàng)建的主題

3创泄、查看主題詳細信息

[root@localhost bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test2
Topic:test2 PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test2    Partition: 0    Leader: none    Replicas: 1001  Isr: 

這里發(fā)現(xiàn)Leader竟然顯示為None,這是一個錯誤R樟帷!鞠抑! 以下開始排錯

排錯的博文

也即是說饭聚,我們kafka存到zookeeper的信息出錯; 問題的原因搁拙,是因為我之前將zookeeper的datadir目錄下的所有文件都刪除秒梳,導致錯誤的出現(xiàn)。

[zk: localhost:2181(CONNECTED) 0] ls /    
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 2] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics
[test2, test, __consumer_offsets]
# 這里可以看出箕速,我們test2下沒有任何信息
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics/test2
[]
# test2節(jié)點存的信息如下
[zk: localhost:2181(CONNECTED) 1] get /brokers/topics/test2
{"version":1,"partitions":{"0":[1001]}}
cZxid = 0x104
ctime = Sat Dec 29 14:28:38 CST 2018
mZxid = 0x104
mtime = Sat Dec 29 14:28:38 CST 2018
pZxid = 0x10d
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 39
numChildren = 1
#然后我們開始創(chuàng)建test2的分區(qū)父節(jié)點
[zk: localhost:2181(CONNECTED) 5] create /brokers/topics/test2/partitions null 
Created /brokers/topics/test2/partitions
#然后創(chuàng)建分區(qū)0的節(jié)點
[zk: localhost:2181(CONNECTED) 6] create /brokers/topics/test2/partitions/0 null
Created /brokers/topics/test2/partitions/0
#創(chuàng)建0分區(qū)的狀態(tài)節(jié)點
[zk: localhost:2181(CONNECTED) 8] create /brokers/topics/test2/partitions/0/state
#設置節(jié)點信息
[zk: localhost:2181(CONNECTED) 6] set /brokers/topics/test2/partitions/0/state {"controller_epoch":2,"leader":0,"version":1,"leader_epoch":0,"isr":[1001]}

以上操作完成之后酪碘。我們再一次查看 topic信息

[root@localhost bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test2
Topic:test2 PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test2    Partition: 0    Leader: 0   Replicas: 1001  Isr: 1001

這樣我們就能正確的獲得到topic的詳細信息了。

4盐茎、發(fā)送消息

[root@localhost bin]# ./kafka-console-producer.sh --broker-list localhost:9092,localhost:9093 --topic test2
>aaa
>bbb
>ccc

5婆跑、接收消息

[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093 --topic test2
aaa
bbb
ccc

6、設置多個broker集群

? 上面我就是開啟了兩個偽Broker集群庭呜,也就是一臺機器上開了2個代理滑进。

? server-0.properties ,其他的配置不做修改

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9093

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://192.168.11.120:9093

? server.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://192.168.11.120:9092

7募谎、使用kafka Connect 來導入/導出數(shù)據(jù)

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

? 主要就是修改connect-file-source.properties文件中file的路徑名(全路徑)

? 還有修改config/connect-file-sink.properties 導出的目標路徑名

使用一下命令查看主題:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning

8扶关、使用Kafka Stream來處理數(shù)據(jù)

步驟如下:

  • 運行中間處理數(shù)據(jù)的java庫
./bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
  • 創(chuàng)建topic
bin/kafka-topics.sh --create \
            --zookeeper localhost:2181 \
            --replication-factor 1 \
            --partitions 1 \
            --topic streams-file-input

(暫時沒試驗)

上面的命令有一些看上去沒有什么規(guī)律,有的地方用zookeeper数冬, 用的地方用broker-list节槐, 有的地方用bootstrap-server, 我們只需要記住拐纱,如果要查詢信息铜异,那么使用的就是zookeeper,因為kafka的信息都在zookeeper上秸架,發(fā)送命令用broker-list揍庄, 接收用bootstrap-server。 在代碼中使用大多都是使用bootstrap-server的方式东抹。

9蚂子、如何清空zookeeper與kafka測試數(shù)據(jù)

1、步驟先關閉zookeeper,kafka;

2缭黔、查看zookeeper配置文件中 datadir的具體位置食茎,然后刪除version-2/ ;

3馏谨、刪除zookeeper的log文件

4别渔、 刪除kafka的log文件。

Confluent-kafka-python的使用

? 常用的三個庫有kafka-python惧互, pykafka, confluent-kafka-python哎媚,我選擇測試的庫是confluent-kafka-python,kafka推薦使用該庫壹哺。

? confluent-kafka-python是基于librdkafka的高性能python客戶端抄伍,具有完整的協(xié)議支持。

主要測試的api分為以下三個:

Producer

? 具體測試代碼如下:

from collections import defaultdict
from confluent_kafka import Producer
# 初始化Producer
p = Producer({'bootstrap.servers': '192.168.11.120:9092,192.168.11.120:9093'})

parition_count = defaultdict(int)
def delivery_report(err, msg):
    if err is not None:
        print("Message delivery failed: {}".format(err))
    else:
        parition_count[msg.partition()] += 1
        # print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
count = 100
while count > 0:
    for data in range(1000):
        p.poll(0)
        p.produce('test', str(data).encode('utf-8'), callback=delivery_report)
    count -= 1

p.flush()
編寫過程如下:
  • 初始化Producer管宵,在初始化中箩朴,必須提供一個字典炸庞,其中必須包括一個'bootstrap-server'的鍵值。(在測試的過程中查牌,我想知道producer的其他參數(shù)如何設置纸颜,在其給出的example中沒有給出具體用例胁孙,所以這里提供一個網(wǎng)站用來查詢配置:配置項)
  • 創(chuàng)建一個回調(diào)函數(shù)涮较,用來處理消息發(fā)布后的流程
  • poll函數(shù)用來輪詢生產(chǎn)者的事件狂票,參數(shù)為time苫亦,單位seconds, 表示事件最大等待時間润匙。
  • produce函數(shù)唉匾,就是用來給topic來生產(chǎn)消息的主要函數(shù)巍膘,函數(shù)完整形式如下:
produce(topic, [value], [key], [partition], [on_delivery], [timestamp], [headers])
  • flush函數(shù),強刷所有的緩沖發(fā)送到brokers

其他的函數(shù)還有:

list_topics()這個函數(shù)可以用來獲取broker的很多詳細信息璃饱。比如列舉topic,查詢分區(qū)信息等荚恶。

Consumer

from collections import defaultdict
from confluent_kafka import Consumer, KafkaError,TopicPartition

c = Consumer({
    'bootstrap.servers': '192.168.11.120:9092, 192.168.11.120:9093',
    'group.id': 'test-consumer-group',
    'auto.offset.reset': 'latest',
    'fetch.wait.max.ms': 5000
})
partition_count = defaultdict(int)
c.subscribe(["test"])
while True:
    msg = c.poll(0.1)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == -191:
            print('EOF')
            partition_count = defaultdict(int)
            continue
        print("Consumer error: {}".format(msg.error()))
        continue
    partition_count[msg.partition()] += 1
    print('Received message: {} partition: {}'.format(msg.value().decode('utf-8'), msg.partition()))
    print(partition_count)

c.close()

編寫過程如下:

  • 新建Consumer客戶端,配置與上面Producer相似廓潜;
  • 使用subscribe函數(shù)辩蛋,來訂閱topic消息堪澎,如果開啟多個分區(qū)樱蛤,那么就會自動負載剑鞍。 這里還可以使用另一個函數(shù)assign()蚁署,這個函數(shù)可以用來訂閱某一topic的固定的一個分區(qū)便脊。
  • 以下就是循環(huán)獲取數(shù)據(jù)光戈。

Admin

? Kafka AdminClient為代理支持的Kafka代理哪痰,主題久妆,組和其他資源類型提供管理操作。這里使用這個函數(shù)的目的筷弦,就是用來管理broker肋演。

? 在生產(chǎn)環(huán)境下,我建議關閉自動創(chuàng)建topic這有個配置選項烂琴,理由,因為如果我們producer代碼與consumer代碼topic不一致奸绷,那么我們排查起來很麻煩号醉。 所以,建議還是使用特定的工具去干特定的事情父虑。

? 具體的方法如下:

alter_config:

create_partitions:

create_topics:

delete_topics:

describe_configs:

(暫時沒有使用该酗,打算在封裝庫之中使用)

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末悔叽,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子爵嗅,更是在濱河造成了極大的恐慌娇澎,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件睹晒,死亡現(xiàn)場離奇詭異趟庄,居然都是意外死亡,警方通過查閱死者的電腦和手機伪很,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進店門戚啥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人锉试,你說我怎么就攤上這事猫十。” “怎么了呆盖?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵拖云,是天一觀的道長。 經(jīng)常有香客問我应又,道長宙项,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任丁频,我火速辦了婚禮杉允,結果婚禮上,老公的妹妹穿的比我還像新娘席里。我一直安慰自己叔磷,他們只是感情好,可當我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布奖磁。 她就那樣靜靜地躺著改基,像睡著了一般。 火紅的嫁衣襯著肌膚如雪咖为。 梳的紋絲不亂的頭發(fā)上秕狰,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天,我揣著相機與錄音躁染,去河邊找鬼鸣哀。 笑死,一個胖子當著我的面吹牛吞彤,可吹牛的內(nèi)容都是我干的我衬。 我是一名探鬼主播叹放,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼挠羔!你這毒婦竟也來了井仰?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤破加,失蹤者是張志新(化名)和其女友劉穎俱恶,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體范舀,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡合是,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了尿背。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片端仰。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖田藐,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情吱七,我是刑警寧澤汽久,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站踊餐,受9級特大地震影響景醇,放射性物質發(fā)生泄漏。R本人自食惡果不足惜吝岭,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一三痰、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧窜管,春花似錦散劫、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至失乾,卻和暖如春常熙,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背碱茁。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工裸卫, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人纽竣。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓墓贿,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子募壕,可洞房花燭夜當晚...
    茶點故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容