- 上一節(jié)我們講了librdkakfa對topic-partition的封裝, 任何一個partition都必須要屬于一下topic;
- 我們這節(jié)就來分析一上librdkafka對topic的封裝
rd_kafka_itopic_s
- 所在文件: src/rdkafka_topic.h
- 這里還有一個類型
rd_kafka_topic_t
,定義:typedef struct rd_kafka_topic_s rd_kafka_topic_t;
,這是個空定義沒有現(xiàn)實(shí), 其實(shí)就是rd_kafka_itopic_s
, 這個類型主要是面向librdkafka的使用者,sdk里稱作app topic
, 它有自己的引用計數(shù). 在librdkafka內(nèi)部使用rd_kafka_itopic
, 它也有自己的引用計數(shù), 有點(diǎn)羅嗦啊~
- 定義:
struct rd_kafka_itopic_s {
// 定義成tailq的元素
TAILQ_ENTRY(rd_kafka_itopic_s) rkt_link;
// 引用計數(shù)
rd_refcnt_t rkt_refcnt;
rwlock_t rkt_lock;
// 所屬topic的名字
rd_kafkap_str_t *rkt_topic;
// 表示一個未assigned的partition
shptr_rd_kafka_toppar_t *rkt_ua; /* unassigned partition */
// 擁有的partition列表
shptr_rd_kafka_toppar_t **rkt_p;
int32_t rkt_partition_cnt;
// 期望操作的partition, 但還沒有從broker獲取到其信息的partition列表
rd_list_t rkt_desp; /* Desired partitions
* that are not yet seen
* in the cluster. */
// 最近一次更新metadata的時間
rd_ts_t rkt_ts_metadata; /* Timestamp of last metadata
* update for this topic. */
mtx_t rkt_app_lock; /* Protects rkt_app_* */
,
// 在application層一個rd_kafka_itopic_s對外表現(xiàn)為一個 rd_kafka_topic_t類型的對象, 且有自己的引用計數(shù)
rd_kafka_topic_t *rkt_app_rkt; /* A shared topic pointer
* to be used for callbacks
* to the application. */
int rkt_app_refcnt; /* Number of active rkt's new()ed
* by application. */
// topic的三種狀態(tài):未知, 已存在, 不存在
enum {
RD_KAFKA_TOPIC_S_UNKNOWN, /* No cluster information yet */
RD_KAFKA_TOPIC_S_EXISTS, /* Topic exists in cluster */
RD_KAFKA_TOPIC_S_NOTEXISTS, /* Topic is not known in cluster */
} rkt_state;
int rkt_flags;
#define RD_KAFKA_TOPIC_F_LEADER_UNAVAIL 0x1 /* Leader lost/unavailable
* for at least one partition. */
// 所屬的rd_kafka_t
rd_kafka_t *rkt_rk;
shptr_rd_kafka_itopic_t *rkt_shptr_app; /* Application's topic_new() */
rd_kafka_topic_conf_t rkt_conf;
};
- 創(chuàng)建一個
rd_kafka_itopic_s
對象rd_kafka_topic_new0
, 這是一個內(nèi)部調(diào)用函數(shù)
shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk,
const char *topic,
rd_kafka_topic_conf_t *conf,
int *existing,
int do_lock) {
rd_kafka_itopic_t *rkt;
shptr_rd_kafka_itopic_t *s_rkt;
const struct rd_kafka_metadata_cache_entry *rkmce;
// topic名字check , 長度不能超512
if (!topic || strlen(topic) > 512) {
if (conf)
rd_kafka_topic_conf_destroy(conf);
rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
EINVAL);
return NULL;
}
if (do_lock)
rd_kafka_wrlock(rk);
// 所有創(chuàng)建的rd_kafka_itopic對象都會加入到對應(yīng)的topic的rk->rk_topics中, 先從中查找, 如果找到就不再創(chuàng)建
if ((s_rkt = rd_kafka_topic_find(rk, topic, 0/*no lock*/))) {
if (do_lock)
rd_kafka_wrunlock(rk);
if (conf)
rd_kafka_topic_conf_destroy(conf);
if (existing)
*existing = 1;
return s_rkt;
}
if (existing)
*existing = 0;
// 分配對應(yīng)的內(nèi)存, 設(shè)置各屬性
rkt = rd_calloc(1, sizeof(*rkt));
rkt->rkt_topic = rd_kafkap_str_new(topic, -1);
rkt->rkt_rk = rk;
if (!conf) {
if (rk->rk_conf.topic_conf)
conf = rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf);
else
conf = rd_kafka_topic_conf_new();
}
rkt->rkt_conf = *conf;
rd_free(conf); /* explicitly not rd_kafka_topic_destroy()
* since we dont want to rd_free internal members,
* just the placeholder. The internal members
* were copied on the line above. */
/* Default partitioner: consistent_random */
if (!rkt->rkt_conf.partitioner)
rkt->rkt_conf.partitioner = rd_kafka_msg_partitioner_consistent_random;
if (rkt->rkt_conf.compression_codec == RD_KAFKA_COMPRESSION_INHERIT)
rkt->rkt_conf.compression_codec = rk->rk_conf.compression_codec;
rd_list_init(&rkt->rkt_desp, 16, NULL);
rd_refcnt_init(&rkt->rkt_refcnt, 0);
s_rkt = rd_kafka_topic_keep(rkt);
rwlock_init(&rkt->rkt_lock);
mtx_init(&rkt->rkt_app_lock, mtx_plain);
/* Create unassigned partition */
rkt->rkt_ua = rd_kafka_toppar_new(rkt, RD_KAFKA_PARTITION_UA);
// 加入到對應(yīng)的rk_kafka_t中的topic列表
TAILQ_INSERT_TAIL(&rk->rk_topics, rkt, rkt_link);
rk->rk_topic_cnt++;
/* Populate from metadata cache. */
// 加入或更新到metadata cache
if ((rkmce = rd_kafka_metadata_cache_find(rk, topic, 1/*valid*/))) {
if (existing)
*existing = 1;
rd_kafka_topic_metadata_update(rkt, &rkmce->rkmce_mtopic,
rkmce->rkmce_ts_insert);
}
if (do_lock)
rd_kafka_wrunlock(rk);
return s_rkt;
- 創(chuàng)建
rd_kafka_topic_t
對象, 對外的接口rd_kafka_topic_new
rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic,
rd_kafka_topic_conf_t *conf) {
shptr_rd_kafka_itopic_t *s_rkt;
rd_kafka_itopic_t *rkt;
rd_kafka_topic_t *app_rkt;
int existing;
// 創(chuàng)建一個`shptr_rd_kafka_itopic_t`對象
s_rkt = rd_kafka_topic_new0(rk, topic, conf, &existing, 1/*lock*/);
if (!s_rkt)
return NULL;
// 指針轉(zhuǎn)換, 從`shptr_rd_kafka_itopic_t`到`rd_kafka_itopic_t`, 引用計數(shù)不變
rkt = rd_kafka_topic_s2i(s_rkt);
/* Save a shared pointer to be used in callbacks. */
// 引數(shù)計數(shù)加1, 指針轉(zhuǎn)換成一個`rd_kafka_topic_t`
// app相對應(yīng)的引用計數(shù)也加1
app_rkt = rd_kafka_topic_keep_app(rkt);
/* Query for the topic leader (async) */
if (!existing)
// 發(fā)metadata request, 獲取leader等相關(guān)信息
rd_kafka_topic_leader_query(rk, rkt);
/* Drop our reference since there is already/now a rkt_app_rkt */
rd_kafka_topic_destroy0(s_rkt);
return app_rkt;
}
- 獲取當(dāng)前
rd_kafka_t
對象持有的所有topic的名字,保存在一個rd_list
中
void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics) {
rd_kafka_itopic_t *rkt;
rd_kafka_rdlock(rk);
rd_list_grow(topics, rk->rk_topic_cnt);
TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link)
rd_list_add(topics, rd_strdup(rkt->rkt_topic->str));
rd_kafka_rdunlock(rk);
}
- 判斷parition是否是有效的,就是判斷其leader是否有效
int rd_kafka_topic_partition_available (const rd_kafka_topic_t *app_rkt,
int32_t partition) {
int avail;
shptr_rd_kafka_toppar_t *s_rktp;
rd_kafka_toppar_t *rktp;
rd_kafka_broker_t *rkb;
s_rktp = rd_kafka_toppar_get(rd_kafka_topic_a2i(app_rkt),
partition, 0/*no ua-on-miss*/);
if (unlikely(!s_rktp))
return 0;
rktp = rd_kafka_toppar_s2i(s_rktp);
rkb = rd_kafka_toppar_leader(rktp, 1/*proper broker*/);
avail = rkb ? 1 : 0;
if (rkb)
rd_kafka_broker_destroy(rkb);
rd_kafka_toppar_destroy(s_rktp);
return avail;
}
- 掃描所有topic的patitions:
- 篩出 kafka message過期的, 回調(diào)application層
- 找出需要刷新metadata的, 發(fā)送metadata request
int rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now) {
rd_kafka_itopic_t *rkt;
rd_kafka_toppar_t *rktp;
shptr_rd_kafka_toppar_t *s_rktp;
int totcnt = 0;
rd_list_t query_topics;
rd_list_init(&query_topics, 0, rd_free);
rd_kafka_rdlock(rk);
TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
int p;
int cnt = 0, tpcnt = 0;
rd_kafka_msgq_t timedout;
int query_this = 0;
rd_kafka_msgq_init(&timedout);
rd_kafka_topic_wrlock(rkt);
/* Check if metadata information has timed out. */
// metadata cache中沒有緩存,需要query metadata
if (rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN &&
!rd_kafka_metadata_cache_topic_get(
rk, rkt->rkt_topic->str, 1/*only valid*/)) {
rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_UNKNOWN);
query_this = 1;
}
/* Just need a read-lock from here on. */
rd_kafka_topic_wrunlock(rkt);
rd_kafka_topic_rdlock(rkt);
if (rkt->rkt_partition_cnt == 0) {
query_this = 1;
}
for (p = RD_KAFKA_PARTITION_UA ;
p < rkt->rkt_partition_cnt ; p++) {
int did_tmout = 0;
if (!(s_rktp = rd_kafka_toppar_get(rkt, p, 0)))
continue;
rktp = rd_kafka_toppar_s2i(s_rktp);
rd_kafka_toppar_lock(rktp);
/* Check that partition has a leader that is up,
* else add topic to query list. */
// partition leader無效時, 要request metadata
if (p != RD_KAFKA_PARTITION_UA &&
(!rktp->rktp_leader ||
rktp->rktp_leader->rkb_source ==
RD_KAFKA_INTERNAL ||
rd_kafka_broker_get_state(rktp->rktp_leader) <
RD_KAFKA_BROKER_STATE_UP)) {
query_this = 1;
}
/* Scan toppar's message queues for timeouts */
if (rd_kafka_msgq_age_scan(&rktp->rktp_xmit_msgq,
&timedout, now) > 0)
did_tmout = 1;
if (rd_kafka_msgq_age_scan(&rktp->rktp_msgq,
&timedout, now) > 0)
did_tmout = 1;
tpcnt += did_tmout;
rd_kafka_toppar_unlock(rktp);
rd_kafka_toppar_destroy(s_rktp);
}
rd_kafka_topic_rdunlock(rkt);
if ((cnt = rd_atomic32_get(&timedout.rkmq_msg_cnt)) > 0) {
totcnt += cnt;
// kafka mesage過期, 則需要回調(diào)到application層
rd_kafka_dr_msgq(rkt, &timedout,
RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);
}
/* Need to re-query this topic's leader. */
if (query_this &&
!rd_list_find(&query_topics, rkt->rkt_topic->str,
(void *)strcmp))
rd_list_add(&query_topics,
rd_strdup(rkt->rkt_topic->str));
}
rd_kafka_rdunlock(rk);
if (!rd_list_empty(&query_topics))
// 發(fā)送 metadata request
rd_kafka_metadata_refresh_topics(rk, NULL, &query_topics,
1/*force even if cached
* info exists*/,
"refresh unavailable topics");
rd_list_destroy(&query_topics);
return totcnt;
}
- 更新topic的partition個數(shù), partition個數(shù)可能增加, 也可能減少
rd_kafka_topic_partition_cnt_update
, 簡單講:
- 新增的partition, 創(chuàng)建;
- 老的partition, 刪除;
static int rd_kafka_topic_partition_cnt_update (rd_kafka_itopic_t *rkt,
int32_t partition_cnt) {
rd_kafka_t *rk = rkt->rkt_rk;
shptr_rd_kafka_toppar_t **rktps;
shptr_rd_kafka_toppar_t *rktp_ua;
shptr_rd_kafka_toppar_t *s_rktp;
rd_kafka_toppar_t *rktp;
rd_kafka_msgq_t tmpq = RD_KAFKA_MSGQ_INITIALIZER(tmpq);
int32_t i;
更新前后partition數(shù)量相同的話, 不作任何處理
if (likely(rkt->rkt_partition_cnt == partition_cnt))
return 0; /* No change in partition count */
/* Create and assign new partition list */
// 創(chuàng)建新的partition list, 分配內(nèi)存
if (partition_cnt > 0)
rktps = rd_calloc(partition_cnt, sizeof(*rktps));
else
rktps = NULL;
// 如果新個數(shù)大于老個數(shù)
for (i = 0 ; i < partition_cnt ; i++) {
// 多出來的都是新擴(kuò)容的partition
if (i >= rkt->rkt_partition_cnt) {
/* New partition. Check if its in the list of
* desired partitions first. */
// 檢查是否在desired partition 列表中
s_rktp = rd_kafka_toppar_desired_get(rkt, i);
rktp = s_rktp ? rd_kafka_toppar_s2i(s_rktp) : NULL;
if (rktp) {
// 在desired partition 列表中,則移除它
rd_kafka_toppar_lock(rktp);
rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_UNKNOWN;
/* Remove from desp list since the
* partition is now known. */
rd_kafka_toppar_desired_unlink(rktp);
rd_kafka_toppar_unlock(rktp);
} else
s_rktp = rd_kafka_toppar_new(rkt, i);
// 賦值rktps[i]
rktps[i] = s_rktp;
} else {
// 如果是已經(jīng)存在的partition, 放到rktps[i], 并且作引用計數(shù)的增減
/* Existing partition, grab our own reference. */
rktps[i] = rd_kafka_toppar_keep(
rd_kafka_toppar_s2i(rkt->rkt_p[i]));
/* Loose previous ref */
rd_kafka_toppar_destroy(rkt->rkt_p[i]);
}
}
rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0);
/* Propagate notexist errors for desired partitions */
// 掃描desired partition 列表中, 還余下的都是無主的, 集群中不存在的partition, 回調(diào)error
RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) {
rd_kafka_toppar_enq_error(rd_kafka_toppar_s2i(s_rktp),
RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
}
/* Remove excessive partitions */
// 處理更新后的partition個數(shù)小于更新前的情況, 需要刪除一部分partition
for (i = partition_cnt ; i < rkt->rkt_partition_cnt ; i++) {
s_rktp = rkt->rkt_p[i];
rktp = rd_kafka_toppar_s2i(s_rktp);
rd_kafka_toppar_lock(rktp);
if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED) {
rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED",
"Topic %s [%"PRId32"] is desired "
"but no longer known: "
"moving back on desired list",
rkt->rkt_topic->str, rktp->rktp_partition);
// 是DESIRED狀態(tài)的話, 再放回到desired列表
rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;
rd_kafka_toppar_desired_link(rktp);
if (!rd_kafka_terminating(rkt->rkt_rk))
rd_kafka_toppar_enq_error(
rktp,
RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
// 解除和broker的聯(lián)系, 實(shí)際上是關(guān)聯(lián)到內(nèi)部的UA broker
rd_kafka_toppar_broker_delegate(rktp, NULL, 0);
} else {
/* Tell handling broker to let go of the toppar */
rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_REMOVE;
rd_kafka_toppar_broker_leave_for_remove(rktp);
}
rd_kafka_toppar_unlock(rktp);
rd_kafka_toppar_destroy(s_rktp);
}
if (rkt->rkt_p)
rd_free(rkt->rkt_p);
rkt->rkt_p = rktps;
rkt->rkt_partition_cnt = partition_cnt;
return 1;
}
- 將在UA partition上待發(fā)送的kafka message重新分配到有效的patition上
rd_kafka_topic_assign_uas
:
static void rd_kafka_topic_assign_uas (rd_kafka_itopic_t *rkt,
rd_kafka_resp_err_t err) {
rd_kafka_t *rk = rkt->rkt_rk;
shptr_rd_kafka_toppar_t *s_rktp_ua;
rd_kafka_toppar_t *rktp_ua;
rd_kafka_msg_t *rkm, *tmp;
rd_kafka_msgq_t uas = RD_KAFKA_MSGQ_INITIALIZER(uas);
rd_kafka_msgq_t failed = RD_KAFKA_MSGQ_INITIALIZER(failed);
int cnt;
if (rkt->rkt_rk->rk_type != RD_KAFKA_PRODUCER)
return;
// 沒有UA partition,就直接返回了
s_rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0);
if (unlikely(!s_rktp_ua)) {
return;
}
rktp_ua = rd_kafka_toppar_s2i(s_rktp_ua);
// 將ua partition上的msg移動到臨時隊列上
rd_kafka_toppar_lock(rktp_ua);
rd_kafka_msgq_move(&uas, &rktp_ua->rktp_msgq);
cnt = rd_atomic32_get(&uas.rkmq_msg_cnt);
rd_kafka_toppar_unlock(rktp_ua);
TAILQ_FOREACH_SAFE(rkm, &uas.rkmq_msgs, rkm_link, tmp) {
/* Fast-path for failing messages with forced partition */
// 無效的msg放到failed 隊列
if (rkm->rkm_partition != RD_KAFKA_PARTITION_UA &&
rkm->rkm_partition >= rkt->rkt_partition_cnt &&
rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN) {
rd_kafka_msgq_enq(&failed, rkm);
continue;
}
// 重新路由kafka message到相應(yīng)的partition, 失敗則放入failed 隊列
if (unlikely(rd_kafka_msg_partitioner(rkt, rkm, 0) != 0)) {
/* Desired partition not available */
rd_kafka_msgq_enq(&failed, rkm);
}
}
// 失敗的msg, 都回調(diào)給application層
if (rd_atomic32_get(&failed.rkmq_msg_cnt) > 0) {
/* Fail the messages */
rd_kafka_dr_msgq(rkt, &failed,
rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS ?
err :
RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
}
rd_kafka_toppar_destroy(s_rktp_ua); /* from get() */
}
- 關(guān)于metadata相關(guān)的操作, 我們介紹metadata時再來分析