背景
項(xiàng)目中需要?jiǎng)?chuàng)建分區(qū)(partitions)數(shù)不同的Topic。在server.properties中可以配置默認(rèn)的Topic分區(qū)數(shù)量,但是不能在需要的時(shí)候任意改變篮愉。(使用Producer API會(huì)自動(dòng)創(chuàng)建Topic)
簡(jiǎn)單的Solution
翻遍了Kafka-python的文檔,沒(méi)有發(fā)現(xiàn)kafka-python提供了類似client.create_topic(name='test', num_partitions=3)
這樣簡(jiǎn)單的API。只能往底層探索了贸毕,果然發(fā)現(xiàn)了兩個(gè)關(guān)鍵信息。
在KafkaClient API中有這樣一個(gè)方法:
屏幕快照 2017-07-26 16.29.26.png
在kafka.protocol.admin — kafka-python 1.3.4.dev documentation中:
屏幕快照 2017-07-26 16.30.46.png
顯然夜赵,我們只需要構(gòu)建一個(gè)CreateTopicsRequest的請(qǐng)求,然后通過(guò)KafkaClient的send()方法發(fā)送給控制節(jié)點(diǎn)(由于本小白也不太清楚Kafka的機(jī)制寇僧,測(cè)試的時(shí)候,不是控制節(jié)點(diǎn)嘁傀,會(huì)報(bào)錯(cuò)。也不清楚各個(gè)版本的區(qū)別细办,下面代碼用的是v0版本橙凳。??)
原理就是這樣,還是很簡(jiǎn)單的笑撞。
糟糕的Code
def create_topic(self, topic='topic', num_partitions=3, configs=None, timeout_ms=3000, brokers=['localhost:9290'], no_partition_change=True):
client = KafkaClient(bootstrap_servers=brokers)
if topic not in client.cluster.topics(exclude_internal_topics=True): # Topic不存在
request = admin.CreateTopicsRequest_v0(
create_topic_requests=[(
topic,
num_partitions,
-1, # replication unset.
[], # Partition assignment.
[(key, value) for key, value in configs.items()], # Configs
)],
timeout=timeout_ms
)
future = client.send(2, request) # 2是Controller,發(fā)送給其他Node都創(chuàng)建失敗。
client.poll(timeout_ms=timeout_ms, future=future, sleep=False) # 這里
result = future.value
# error_code = result.topic_error_codes[0][1]
print("CREATE TOPIC RESPONSE: ", result) # 0 success, 41 NOT_CONTROLLER, 36 ALREADY_EXISTS
client.close()
else: # Topic已經(jīng)存在
print("Topic already exists!")
return