分布式消息發(fā)布和訂閱系統(tǒng)
提供了類似JMS的特性
對用戶行為收集
日志收集
Broker:一個kafka服務(wù)
Producer
consumer
topic: kafka集群的類別,一類數(shù)據(jù)的集合
partition: 每一個topic中具體的物理分區(qū)
consumer group:每一個consumer都有一個對應(yīng)的group 對應(yīng)一個topic,達(dá)到發(fā)布訂閱的功能
官網(wǎng)
博客
視頻
LMS蚌斩、AQMP消息模型
- JMS(java消息服務(wù))
- 點對點(一對一)
- Quene
- 點對點(一對一)
- AMQP(高級消息隊列協(xié)議)
- 隊列
- 信箱
-
綁定
image.png
創(chuàng)建多broker集群
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
因為原文檔是在一臺機(jī)器上搭建的為分布式孵淘,所以在多臺機(jī)器上搭建的時候只用更改brokerid以及對應(yīng)的zookeeper節(jié)點即可
然后我們啟動各個機(jī)器上的broker
多節(jié)點多broker
創(chuàng)建一個新的擁有備份的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
查看一下這個topic的描述
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
第一行介紹了所有的分區(qū)慷吊,其他的行娇斑,每一行都介紹了一個分區(qū)的信息,因為我們只創(chuàng)建了一個分區(qū)所以這里只有一行數(shù)據(jù)
- ‘leader’就是該分區(qū)所屬的broker白修,負(fù)責(zé)這個分區(qū)的一些讀寫操作。
- replicas 就是這個分區(qū)的日志備份brokers重斑,無論他們是否是leader還是是否alive
- isr 這個記錄了被leader捕獲并且還活著的上面replicas的子集
使用kafka connect 來導(dǎo)入/導(dǎo)出數(shù)據(jù)
我們經(jīng)常需要用kafka導(dǎo)入別的數(shù)據(jù)源兵睛,或者導(dǎo)出到別的系統(tǒng)。所以kafka提供了個工具叫做kafka connect.
Kafka Connect是Kafka附帶的工具窥浪,用于向Kafka導(dǎo)入和導(dǎo)出數(shù)據(jù)祖很。 它是一個可擴(kuò)展的工具,運(yùn)行連接器漾脂,實現(xiàn)與外部系統(tǒng)交互的自定義邏輯假颇。
官方文檔給的案例是一個kafak connect,它實現(xiàn)了從文件的導(dǎo)入和導(dǎo)出骨稿,producer可以從文件1讀取數(shù)據(jù)進(jìn)入kafka笨鸡, consumer則讀取數(shù)據(jù)并寫入文件2中姜钳,實現(xiàn)了在文件系統(tǒng)中的發(fā)布訂閱。
python kafka
安裝
pip install confluent-kafka
官方文檔
github
Admin API
kafka的控制端形耗,創(chuàng)建傲须、瀏覽、改變趟脂、刪除主題和資源
class confluent_kafka.admin.AdminClient(conf)
AdminClient 為kafka 的brokers提供一些控制操作泰讽,topics、groups昔期、以及其他borker支持的資源類型已卸。
Admin API方法是異步的,并返回由實體鍵入的concurrent.futures.Future對象的dict硼一。
實體是一個topic 名字供create_topics(), delete_topics(), create_partitions()調(diào)用累澡,并且一個ConfigResource 供alter_configs(), describe_configs()調(diào)用。
查看使用案例 examples/adminapi.py
下面是可以調(diào)用的函數(shù):
alter_configs(resources, **kwargs)
改變配置
create_partitions(new_partitions, **kwargs)
為給定的topic創(chuàng)建新分區(qū)
create_topics(new_topics, **kwargs)
集群創(chuàng)建新topic
delete_topics(topics, **kwargs)
刪除topic
describe_configs(resources, **kwargs)
查看某個特定資源的配置
class confluent_kafka.admin.BrokerMetadata
包含kafka broker 信息的類
class confluent_kafka.admin.ClusterMetadata
包含kafka 集群般贼、brokers愧哟、topics信息的對象
class confluent_kafka.admin.ConfigEntry(name, value, source=<ConfigSource.UNKNOWN_CONFIG: 0>, \
is_read_only=False, is_default=False, is_sensitive=False, is_synonym=False, synonyms=[])
describe_configs()的返回對象
class confluent_kafka.admin.ConfigResource(restype, name, set_config=None, described_configs=None, error=None)
展示擁有配置的資源,通過資源類型和名字進(jìn)行實例化哼蛆。
ConfigResource.Type 描繪了kafka 資源的type
ConfigResource.set_config(name, value, overwrite=True)
設(shè)置或者重寫配置參數(shù)
class confluent_kafka.admin.ConfigSource
Config sources returned in ConfigEntry by describe_configs().
class confluent_kafka.admin.PartitionMetadata
| Variables: |
- id (int) – Partition id.
- leader (int) – Current leader broker for this partition, or -1.
- replicas (list(int)) – List of replica broker ids for this partition.
- isrs (list(int)) – List of in-sync-replica broker ids for this partition.
- error (KafkaError) – Partition error, or None. Value is a KafkaError object.
class confluent_kafka.admin.TopicMetadata
Variables:
topic (str) – Topic name.
partitions (dict) – Map of partitions indexed by partition id. Value is PartitionMetadata object.
error (KafkaError) – Topic error, or None. Value is a KafkaError object.
Producer API
- 采用異步發(fā)送消息
image.png - 采用同步發(fā)送消息
- 批量發(fā)送消息
- 消息大小
- 延遲時間
- acks
生產(chǎn)者發(fā)送消息后蕊梧,服務(wù)器的回執(zhí)- 0
生產(chǎn)者不等待服務(wù)器,消息發(fā)送到緩沖區(qū)就ok了 - 1
broker收到就ok腮介, 不管follower是否備份 - -1/all
等到broker收到肥矢, follower備份
- 0
- retries
當(dāng)消息發(fā)送失敗重復(fù)的次數(shù),默認(rèn)為0 - 至多一次
acks=0 或acks=1 - 至少一次
acks =-1或all
retries >0 - 精確一次與冪等
enable.idempotence = true
//retries = integer.MAX_VALUE
acts = all - 事務(wù)
消息要么全部成功叠洗,要么全部失敗
class confluent_kafka.Producer
kafka 異步 producer
Producer(config)
config(dict)—參數(shù)屬性甘改,至少需要設(shè)置bootstrap.severs
len()
返回等待傳遞給broker的消息數(shù)和請求數(shù), type:int
flush([timeout])
等待producer隊列中的所有消息被發(fā)送灭抑。timeout是最大的堵塞時間十艾,返回仍在隊列中的消息數(shù)
poll([timeout])
輪詢生產(chǎn)者的事件并調(diào)用相應(yīng)的回調(diào)(已注冊的)
- on_delivery :produce()的回調(diào)
參數(shù):timeout-最大堵塞時間(秒)
返回:被處理的事件數(shù)(調(diào)用回調(diào))(int)
list_topics([topic=None][,timeout=-1])
從集群中請求元數(shù)據(jù)。這個方法提供了listTopics(), describeTopics() and describeCluster() 在java客戶端中同樣的信息
參數(shù)
- topic(str) - 如果提供了這個參數(shù)腾节,那么僅僅顯示有關(guān)這個topic的信息忘嫉,兜著返回所有集群中的topic信息。
- timeout 最大的響應(yīng)時間在超時之間禀倔, -1是無限timeout
Return type:ClusterMetadata
Raises: KafkaException
produce()
produce(topic[, value][, key][, partition][, on_delivery][, timestamp][, headers])
向topic發(fā)送消息榄融。這是一個異步操作,一個應(yīng)用可能會使用回調(diào)(別名 on_delivery)參數(shù)來傳遞一個函數(shù)或者匿名函數(shù)救湖,當(dāng)消息被成功發(fā)送或者永久失敗時愧杯,就會被poll()調(diào)用。
參數(shù)
- topic(str)
- value(str|bytes)- 消息負(fù)載
- key(str|bytes)-消息key
- partition(int)- 發(fā)送消息的分區(qū)鞋既,否則使用默認(rèn)的
- on_delivery(err,msg) (func) -被調(diào)用的報告回調(diào)函數(shù)
- timestamp(int)-消息時間戳
-
dict|list (headers)
消息的消息頭部力九。頭部的key必須是一個字符串耍铜,值必須是二進(jìn)制,unicode或者None跌前。 Accepts a list of (key,value) or a dict棕兼。
Return type:None
Consumer API
class confluent_kafka.Consumer
Consumer(config)
Parameters: config (dict) – 配置參數(shù)。至少得設(shè)置bootstrap.servers和group.id
創(chuàng)建一個新的消費(fèi)端
特殊參數(shù):on_commit:當(dāng)一個commit request 成功或失敗時調(diào)用的回調(diào)
on_commit(err, partitions)
參數(shù):
- consumer (Consumer) –consumer實例
- err (KafkaError) – commit error object
- partitions (list(TopicPartition)) –分區(qū)列表包括他們的 committed offsets or per-partition errors.
assign()
assign(partitions)
將消費(fèi)的消息分區(qū)設(shè)置為提供的TopicPartition列表并開始消費(fèi)
參數(shù)
- partitions (list(TopicPartition)) – 主題+分區(qū)的列表以及可選擇的消費(fèi)的初始o(jì)ffset
assignment()
返回目前的分區(qū)設(shè)置情況
close()
關(guān)閉消費(fèi)者
進(jìn)行的操作:
- 結(jié)束消費(fèi)
- 提交offsets 除非消費(fèi)者屬性'enable.auto.commit'設(shè)置為False
- 離開用戶組
commit()
commit([message=None][, offsets=None][, asynchronous=True])
提交一個信息或者offsets列表
消息和偏移是互斥的抵乓,如果兩者都沒有設(shè)置伴挚,則使用當(dāng)前分區(qū)分配的偏移。 如果您將'enable.auto.commit'設(shè)置為False灾炭,則consumer依賴于您使用此方法
committed()
committed(partitions[, timeout=None])
檢索分區(qū)中已經(jīng)提交的offsets
consume()
consume([num_messages=1][, timeout=-1])
消費(fèi)消息茎芋,調(diào)用回調(diào)函數(shù)以及返回消息列表。應(yīng)用程序必須檢查返回的Message對象的Message.error()方法蜈出,以區(qū)分正確的消息(error()返回None)田弥,或列表中每個Message的事件或錯誤(請參閱error()。code()以獲取詳細(xì)信息)铡原。
get_watermark_offsets()
get_watermark_offsets(partition[, timeout=None][, cached=False])
檢索分區(qū)的low and high offsets
cached (bool) – 不是查詢broker所使用的內(nèi)存信息偷厦。緩存值:定期更新低偏移量(如果設(shè)置了statistics.interval.ms),同時在從此分區(qū)的broker獲取的每條消息上更新高偏移量燕刻。
Return : Tuple of (low,high) on success or None on timeout.
list_topics()
list_topics([topic=None][, timeout=-1])
返回元數(shù)據(jù)
offsets_for_times()
offsets_for_times(partitions[, timeout=None])
offsets_for_times按給定分區(qū)的時間戳查找偏移量只泼。
每個分區(qū)的返回偏移量是最早的偏移量,其時間戳大于或等于相應(yīng)分區(qū)中的給定時間戳酌儒。
pause()
pause(partitions)
暫停該分區(qū)的消費(fèi)
poll()
poll([timeout=None])
使用消息辜妓,調(diào)用回調(diào)并返回事件。
position()
position(partitions[, timeout=None])
檢索給定分區(qū)的當(dāng)前偏移量忌怎。
resume()
resume(partitions)
恢復(fù)提供的分區(qū)列表的消耗。
seek(partition)
將分區(qū)的消耗位置設(shè)置為偏移量酪夷。 偏移可以是絕對(> = 0)或邏輯偏移(OFFSET_BEGINNING等)榴啸。
seek()可以僅用于更新主動消耗的分區(qū)的消耗偏移(即,在assign()之后)晚岭,以設(shè)置未被消耗的分區(qū)的起始偏移鸥印,而不是在assign()調(diào)用中傳遞偏移。
store_offsets()
store_offsets([message=None][, offsets=None])
存儲消息或偏移列表的偏移量坦报。
消息和偏移是互斥的库说。 存儲的偏移量將根據(jù)'auto.commit.interval.ms'或手動無偏移提交()進(jìn)行提交。 請注意片择,使用此API時潜的,“enable.auto.offset.store”必須設(shè)置為False。
subscribe()
subscribe(topics[, listener=None])
設(shè)置訂閱提供的主題列表這將替換以前的訂閱字管。
可以通過正則化進(jìn)行訂閱:
consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])
回調(diào)函數(shù)
on_assign(consumer, partitions)
on_revoke(consumer, partitions)
unassign()
刪除當(dāng)前分區(qū)設(shè)置并停止消費(fèi)啰挪。
unsubscribe()
移除當(dāng)前訂閱
TopicPartition API
class confluent_kafka.TopicPartition
TopicPartition是一種通用類型信不,用于保存單個分區(qū)及其各種信息。
它通常用于為各種操作提供主題或分區(qū)列表亡呵,例如Consumer.assign()抽活。
TopicPartition(topic[, partition][, offset])
實例化一個topicpartition對象
參數(shù)
- topic (string) – Topic name
- partition (int) – Partition id
- offset (int) – Initial partition offset
屬性:
error
offset
partition
topic
Message API
Message對象表示單個消費(fèi)或生成的消息,或者一個錯誤事件(error()不是None)锰什。
應(yīng)用程序必須檢查error()以查看對象是否是正確的消息(error()返回None)或錯誤/事件下硕。
這個對象不需要用戶初始化
方法:
-
len()
返回消息大小 -
error()
Return type: None or KafkaError 用來檢查是否消息是錯誤事件 -
headers()
檢索消息的頭部。每個頭部都是一個鍵值對汁胆。注意消息頭的key是有序且可重復(fù)的 -
key()
Returns: message key or None if not available. -
offset()
Returns: message offset or None if not available. -
partition()
Returns: partition number or None if not available. -
set_headers()
Set the field ‘Message.headers’ with new value. -
set_key()
Set the field ‘Message.key’ with new value. -
set_value()
Set the field ‘Message.value’ with new value. -
timestamp()
從消息中檢索時間戳類型和時間戳梭姓。 時間戳類型是以下之一:- TIMESTAMP_NOT_AVAILABLE - Timestamps not supported by broker
- TIMESTAMP_CREATE_TIME - Message creation time (or source / producer time)
- TIMESTAMP_LOG_APPEND_TIME - Broker receive time
-
topic()
Returns: topic name or None if not available. -
value()
Returns: message value (payload) or None if not available.
Offset API
邏輯offset常量:
- OFFSET_BEGINNING - Beginning of partition (oldest offset)
- OFFSET_END - End of partition (next offset)
- OFFSET_STORED - Use stored/committed offset
- OFFSET_INVALID - Invalid/Default offset