kafka (python)

分布式消息發(fā)布和訂閱系統(tǒng)
提供了類似JMS的特性

對用戶行為收集
日志收集

Broker:一個kafka服務(wù)
Producer
consumer
topic: kafka集群的類別,一類數(shù)據(jù)的集合
partition: 每一個topic中具體的物理分區(qū)
consumer group:每一個consumer都有一個對應(yīng)的group 對應(yīng)一個topic,達(dá)到發(fā)布訂閱的功能
官網(wǎng)
博客
視頻

LMS蚌斩、AQMP消息模型

  • JMS(java消息服務(wù))
    • 點對點(一對一)
      • Quene
  • AMQP(高級消息隊列協(xié)議)
    • 隊列
    • 信箱
    • 綁定


      image.png

創(chuàng)建多broker集群

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

更改屬性

因為原文檔是在一臺機(jī)器上搭建的為分布式孵淘,所以在多臺機(jī)器上搭建的時候只用更改brokerid以及對應(yīng)的zookeeper節(jié)點即可
然后我們啟動各個機(jī)器上的broker
多節(jié)點多broker
創(chuàng)建一個新的擁有備份的topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

查看一下這個topic的描述

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

第一行介紹了所有的分區(qū)慷吊,其他的行娇斑,每一行都介紹了一個分區(qū)的信息,因為我們只創(chuàng)建了一個分區(qū)所以這里只有一行數(shù)據(jù)

  • ‘leader’就是該分區(qū)所屬的broker白修,負(fù)責(zé)這個分區(qū)的一些讀寫操作。
  • replicas 就是這個分區(qū)的日志備份brokers重斑,無論他們是否是leader還是是否alive
  • isr 這個記錄了被leader捕獲并且還活著的上面replicas的子集

使用kafka connect 來導(dǎo)入/導(dǎo)出數(shù)據(jù)

我們經(jīng)常需要用kafka導(dǎo)入別的數(shù)據(jù)源兵睛,或者導(dǎo)出到別的系統(tǒng)。所以kafka提供了個工具叫做kafka connect.
Kafka Connect是Kafka附帶的工具窥浪,用于向Kafka導(dǎo)入和導(dǎo)出數(shù)據(jù)祖很。 它是一個可擴(kuò)展的工具,運(yùn)行連接器漾脂,實現(xiàn)與外部系統(tǒng)交互的自定義邏輯假颇。
官方文檔給的案例是一個kafak connect,它實現(xiàn)了從文件的導(dǎo)入和導(dǎo)出骨稿,producer可以從文件1讀取數(shù)據(jù)進(jìn)入kafka笨鸡, consumer則讀取數(shù)據(jù)并寫入文件2中姜钳,實現(xiàn)了在文件系統(tǒng)中的發(fā)布訂閱。

python kafka

安裝
pip install confluent-kafka
官方文檔
github

Admin API

kafka的控制端形耗,創(chuàng)建傲须、瀏覽、改變趟脂、刪除主題和資源


class confluent_kafka.admin.AdminClient(conf)

AdminClient 為kafka 的brokers提供一些控制操作泰讽,topics、groups昔期、以及其他borker支持的資源類型已卸。
Admin API方法是異步的,并返回由實體鍵入的concurrent.futures.Future對象的dict硼一。
實體是一個topic 名字供create_topics(), delete_topics(), create_partitions()調(diào)用累澡,并且一個ConfigResource 供alter_configs(), describe_configs()調(diào)用。

查看使用案例 examples/adminapi.py

下面是可以調(diào)用的函數(shù):
alter_configs(resources, **kwargs)
改變配置
create_partitions(new_partitions, **kwargs)
為給定的topic創(chuàng)建新分區(qū)
create_topics(new_topics, **kwargs)
集群創(chuàng)建新topic
delete_topics(topics, **kwargs)
刪除topic
describe_configs(resources, **kwargs)
查看某個特定資源的配置


class confluent_kafka.admin.BrokerMetadata

包含kafka broker 信息的類


class confluent_kafka.admin.ClusterMetadata

包含kafka 集群般贼、brokers愧哟、topics信息的對象


class confluent_kafka.admin.ConfigEntry(name, value, source=<ConfigSource.UNKNOWN_CONFIG: 0>,  \
is_read_only=False, is_default=False, is_sensitive=False, is_synonym=False, synonyms=[])

describe_configs()的返回對象


class confluent_kafka.admin.ConfigResource(restype, name, set_config=None, described_configs=None, error=None)

展示擁有配置的資源,通過資源類型和名字進(jìn)行實例化哼蛆。
ConfigResource.Type 描繪了kafka 資源的type


ConfigResource.set_config(name, value, overwrite=True)
設(shè)置或者重寫配置參數(shù)


class confluent_kafka.admin.ConfigSource

Config sources returned in ConfigEntry by describe_configs().



class confluent_kafka.admin.PartitionMetadata

| Variables: |

  • id (int) – Partition id.
  • leader (int) – Current leader broker for this partition, or -1.
  • replicas (list(int)) – List of replica broker ids for this partition.
  • isrs (list(int)) – List of in-sync-replica broker ids for this partition.
  • error (KafkaError) – Partition error, or None. Value is a KafkaError object.

class confluent_kafka.admin.TopicMetadata

Variables:
topic (str) – Topic name.
partitions (dict) – Map of partitions indexed by partition id. Value is PartitionMetadata object.
error (KafkaError) – Topic error, or None. Value is a KafkaError object.


Producer API

  • 采用異步發(fā)送消息
    image.png
  • 采用同步發(fā)送消息
  • 批量發(fā)送消息
    • 消息大小
    • 延遲時間
  • acks
    生產(chǎn)者發(fā)送消息后蕊梧,服務(wù)器的回執(zhí)
    • 0
      生產(chǎn)者不等待服務(wù)器,消息發(fā)送到緩沖區(qū)就ok了
    • 1
      broker收到就ok腮介, 不管follower是否備份
    • -1/all
      等到broker收到肥矢, follower備份
  • retries
    當(dāng)消息發(fā)送失敗重復(fù)的次數(shù),默認(rèn)為0
  • 至多一次
    acks=0 或acks=1
  • 至少一次
    acks =-1或all
    retries >0
  • 精確一次與冪等
    enable.idempotence = true
    //retries = integer.MAX_VALUE
    acts = all
  • 事務(wù)
    消息要么全部成功叠洗,要么全部失敗

class confluent_kafka.Producer

kafka 異步 producer
Producer(config)
config(dict)—參數(shù)屬性甘改,至少需要設(shè)置bootstrap.severs


len()
返回等待傳遞給broker的消息數(shù)和請求數(shù), type:int


flush([timeout])
等待producer隊列中的所有消息被發(fā)送灭抑。timeout是最大的堵塞時間十艾,返回仍在隊列中的消息數(shù)


poll([timeout])
輪詢生產(chǎn)者的事件并調(diào)用相應(yīng)的回調(diào)(已注冊的)

  • on_delivery :produce()的回調(diào)
    參數(shù):timeout-最大堵塞時間(秒)
    返回:被處理的事件數(shù)(調(diào)用回調(diào))(int)

list_topics([topic=None][,timeout=-1])
從集群中請求元數(shù)據(jù)。這個方法提供了listTopics(), describeTopics() and describeCluster() 在java客戶端中同樣的信息
參數(shù)

  • topic(str) - 如果提供了這個參數(shù)腾节,那么僅僅顯示有關(guān)這個topic的信息忘嫉,兜著返回所有集群中的topic信息。
  • timeout 最大的響應(yīng)時間在超時之間禀倔, -1是無限timeout
    Return type:ClusterMetadata
    Raises: KafkaException

produce()
produce(topic[, value][, key][, partition][, on_delivery][, timestamp][, headers])
向topic發(fā)送消息榄融。這是一個異步操作,一個應(yīng)用可能會使用回調(diào)(別名 on_delivery)參數(shù)來傳遞一個函數(shù)或者匿名函數(shù)救湖,當(dāng)消息被成功發(fā)送或者永久失敗時愧杯,就會被poll()調(diào)用。
參數(shù)

  • topic(str)
  • value(str|bytes)- 消息負(fù)載
  • key(str|bytes)-消息key
  • partition(int)- 發(fā)送消息的分區(qū)鞋既,否則使用默認(rèn)的
  • on_delivery(err,msg) (func) -被調(diào)用的報告回調(diào)函數(shù)
  • timestamp(int)-消息時間戳
  • dict|list (headers)
    消息的消息頭部力九。頭部的key必須是一個字符串耍铜,值必須是二進(jìn)制,unicode或者None跌前。 Accepts a list of (key,value) or a dict棕兼。
    Return type:None

Consumer API

class confluent_kafka.Consumer

Consumer(config)
Parameters: config (dict) – 配置參數(shù)。至少得設(shè)置bootstrap.servers和group.id
創(chuàng)建一個新的消費(fèi)端
特殊參數(shù):on_commit:當(dāng)一個commit request 成功或失敗時調(diào)用的回調(diào)


on_commit(err, partitions)
參數(shù):

  • consumer (Consumer) –consumer實例
  • err (KafkaError) – commit error object
  • partitions (list(TopicPartition)) –分區(qū)列表包括他們的 committed offsets or per-partition errors.

assign()
assign(partitions)
將消費(fèi)的消息分區(qū)設(shè)置為提供的TopicPartition列表并開始消費(fèi)
參數(shù)

  • partitions (list(TopicPartition)) – 主題+分區(qū)的列表以及可選擇的消費(fèi)的初始o(jì)ffset

assignment()
返回目前的分區(qū)設(shè)置情況


close()關(guān)閉消費(fèi)者
進(jìn)行的操作:

  • 結(jié)束消費(fèi)
  • 提交offsets 除非消費(fèi)者屬性'enable.auto.commit'設(shè)置為False
  • 離開用戶組

commit()
commit([message=None][, offsets=None][, asynchronous=True])
提交一個信息或者offsets列表
消息和偏移是互斥的抵乓,如果兩者都沒有設(shè)置伴挚,則使用當(dāng)前分區(qū)分配的偏移。 如果您將'enable.auto.commit'設(shè)置為False灾炭,則consumer依賴于您使用此方法


committed()
committed(partitions[, timeout=None])
檢索分區(qū)中已經(jīng)提交的offsets


consume()
consume([num_messages=1][, timeout=-1])
消費(fèi)消息茎芋,調(diào)用回調(diào)函數(shù)以及返回消息列表。應(yīng)用程序必須檢查返回的Message對象的Message.error()方法蜈出,以區(qū)分正確的消息(error()返回None)田弥,或列表中每個Message的事件或錯誤(請參閱error()。code()以獲取詳細(xì)信息)铡原。


get_watermark_offsets()
get_watermark_offsets(partition[, timeout=None][, cached=False])
檢索分區(qū)的low and high offsets
cached (bool) – 不是查詢broker所使用的內(nèi)存信息偷厦。緩存值:定期更新低偏移量(如果設(shè)置了statistics.interval.ms),同時在從此分區(qū)的broker獲取的每條消息上更新高偏移量燕刻。
Return : Tuple of (low,high) on success or None on timeout.


list_topics()
list_topics([topic=None][, timeout=-1])
返回元數(shù)據(jù)


offsets_for_times()
offsets_for_times(partitions[, timeout=None])
offsets_for_times按給定分區(qū)的時間戳查找偏移量只泼。
每個分區(qū)的返回偏移量是最早的偏移量,其時間戳大于或等于相應(yīng)分區(qū)中的給定時間戳酌儒。


pause()
pause(partitions)
暫停該分區(qū)的消費(fèi)


poll()
poll([timeout=None])
使用消息辜妓,調(diào)用回調(diào)并返回事件。


position()
position(partitions[, timeout=None])
檢索給定分區(qū)的當(dāng)前偏移量忌怎。


resume()
resume(partitions)
恢復(fù)提供的分區(qū)列表的消耗。


seek(partition)
將分區(qū)的消耗位置設(shè)置為偏移量酪夷。 偏移可以是絕對(> = 0)或邏輯偏移(OFFSET_BEGINNING等)榴啸。
seek()可以僅用于更新主動消耗的分區(qū)的消耗偏移(即,在assign()之后)晚岭,以設(shè)置未被消耗的分區(qū)的起始偏移鸥印,而不是在assign()調(diào)用中傳遞偏移。


store_offsets()
store_offsets([message=None][, offsets=None])
存儲消息或偏移列表的偏移量坦报。
消息和偏移是互斥的库说。 存儲的偏移量將根據(jù)'auto.commit.interval.ms'或手動無偏移提交()進(jìn)行提交。 請注意片择,使用此API時潜的,“enable.auto.offset.store”必須設(shè)置為False。


subscribe()
subscribe(topics[, listener=None])
設(shè)置訂閱提供的主題列表這將替換以前的訂閱字管。
可以通過正則化進(jìn)行訂閱:

consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])

回調(diào)函數(shù)
on_assign(consumer, partitions)
on_revoke(consumer, partitions)


unassign()
刪除當(dāng)前分區(qū)設(shè)置并停止消費(fèi)啰挪。


unsubscribe()
移除當(dāng)前訂閱

TopicPartition API

class confluent_kafka.TopicPartition

TopicPartition是一種通用類型信不,用于保存單個分區(qū)及其各種信息。
它通常用于為各種操作提供主題或分區(qū)列表亡呵,例如Consumer.assign()抽活。
TopicPartition(topic[, partition][, offset])
實例化一個topicpartition對象
參數(shù)

  • topic (string) – Topic name
  • partition (int) – Partition id
  • offset (int) – Initial partition offset

屬性:

  • error
  • offset
  • partition
  • topic

Message API

Message對象表示單個消費(fèi)或生成的消息,或者一個錯誤事件(error()不是None)锰什。
應(yīng)用程序必須檢查error()以查看對象是否是正確的消息(error()返回None)或錯誤/事件下硕。
這個對象不需要用戶初始化
方法:

  • len() 返回消息大小
  • error() Return type: None or KafkaError 用來檢查是否消息是錯誤事件
  • headers() 檢索消息的頭部。每個頭部都是一個鍵值對汁胆。注意消息頭的key是有序且可重復(fù)的
  • key() Returns: message key or None if not available.
  • offset() Returns: message offset or None if not available.
  • partition() Returns: partition number or None if not available.
  • set_headers() Set the field ‘Message.headers’ with new value.
  • set_key() Set the field ‘Message.key’ with new value.
  • set_value() Set the field ‘Message.value’ with new value.
  • timestamp()
    從消息中檢索時間戳類型和時間戳梭姓。 時間戳類型是以下之一:
    • TIMESTAMP_NOT_AVAILABLE - Timestamps not supported by broker
    • TIMESTAMP_CREATE_TIME - Message creation time (or source / producer time)
    • TIMESTAMP_LOG_APPEND_TIME - Broker receive time
  • topic() Returns: topic name or None if not available.
  • value() Returns: message value (payload) or None if not available.

Offset API

邏輯offset常量:

  • OFFSET_BEGINNING - Beginning of partition (oldest offset)
  • OFFSET_END - End of partition (next offset)
  • OFFSET_STORED - Use stored/committed offset
  • OFFSET_INVALID - Invalid/Default offset
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市沦泌,隨后出現(xiàn)的幾起案子糊昙,更是在濱河造成了極大的恐慌,老刑警劉巖谢谦,帶你破解...
    沈念sama閱讀 221,198評論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件释牺,死亡現(xiàn)場離奇詭異,居然都是意外死亡回挽,警方通過查閱死者的電腦和手機(jī)没咙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評論 3 398
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來千劈,“玉大人祭刚,你說我怎么就攤上這事∏脚疲” “怎么了涡驮?”我有些...
    開封第一講書人閱讀 167,643評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長喜滨。 經(jīng)常有香客問我捉捅,道長,這世上最難降的妖魔是什么虽风? 我笑而不...
    開封第一講書人閱讀 59,495評論 1 296
  • 正文 為了忘掉前任棒口,我火速辦了婚禮,結(jié)果婚禮上辜膝,老公的妹妹穿的比我還像新娘无牵。我一直安慰自己,他們只是感情好厂抖,可當(dāng)我...
    茶點故事閱讀 68,502評論 6 397
  • 文/花漫 我一把揭開白布茎毁。 她就那樣靜靜地躺著,像睡著了一般验游。 火紅的嫁衣襯著肌膚如雪充岛。 梳的紋絲不亂的頭發(fā)上保檐,一...
    開封第一講書人閱讀 52,156評論 1 308
  • 那天,我揣著相機(jī)與錄音崔梗,去河邊找鬼夜只。 笑死,一個胖子當(dāng)著我的面吹牛蒜魄,可吹牛的內(nèi)容都是我干的扔亥。 我是一名探鬼主播,決...
    沈念sama閱讀 40,743評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼谈为,長吁一口氣:“原來是場噩夢啊……” “哼旅挤!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起伞鲫,我...
    開封第一講書人閱讀 39,659評論 0 276
  • 序言:老撾萬榮一對情侶失蹤粘茄,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后秕脓,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體柒瓣,經(jīng)...
    沈念sama閱讀 46,200評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,282評論 3 340
  • 正文 我和宋清朗相戀三年吠架,在試婚紗的時候發(fā)現(xiàn)自己被綠了芙贫。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,424評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡傍药,死狀恐怖磺平,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情拐辽,我是刑警寧澤拣挪,帶...
    沈念sama閱讀 36,107評論 5 349
  • 正文 年R本政府宣布,位于F島的核電站俱诸,受9級特大地震影響媒吗,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜乙埃,卻給世界環(huán)境...
    茶點故事閱讀 41,789評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望锯岖。 院中可真熱鬧介袜,春花似錦、人聲如沸出吹。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽捶牢。三九已至鸠珠,卻和暖如春巍耗,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背渐排。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評論 1 271
  • 我被黑心中介騙來泰國打工炬太, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人驯耻。 一個月前我還...
    沈念sama閱讀 48,798評論 3 376
  • 正文 我出身青樓亲族,卻偏偏與公主長得像,于是被迫代替她去往敵國和親可缚。 傳聞我的和親對象是個殘疾皇子霎迫,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,435評論 2 359

推薦閱讀更多精彩內(nèi)容