Kafka事務(wù)特性詳解

Kafka在0.11版本中除了引入了Exactly Once語義,還引入了事務(wù)特性男翰。Kafka事務(wù)特性是指一系列的生產(chǎn)者生產(chǎn)消息和消費者提交偏移量的操作在一個事務(wù)中,或者說是一個原子操作,生產(chǎn)消息和提交偏移量同時成功或者失敗。

1. Kafka事務(wù)的使用

Kafka中的事務(wù)特性主要用于以下兩種場景:

  • 生產(chǎn)者發(fā)送多條消息可以封裝在一個事務(wù)中色冀,形成一個原子操作。多條消息要么都發(fā)送成功柱嫌,要么都發(fā)送失敗锋恬。
  • read-process-write模式:將消息消費和生產(chǎn)封裝在一個事務(wù)中,形成一個原子操作编丘。在一個流式處理的應(yīng)用中与学,常常一個服務(wù)需要從上游接收消息,然后經(jīng)過處理后送達(dá)到下游嘉抓,這就對應(yīng)著消息的消費和生成索守。

當(dāng)事務(wù)中僅僅存在Consumer消費消息的操作時,它和Consumer手動提交Offset并沒有區(qū)別抑片。因此單純的消費消息并不是Kafka引入事務(wù)機(jī)制的原因卵佛,單純的消費消息也沒有必要存在于一個事務(wù)中。

Kafka producer API提供了以下接口用于事務(wù)操作:

    /**
     * 初始化事務(wù)
     */
    public void initTransactions();
 
    /**
     * 開啟事務(wù)
     */
    public void beginTransaction() throws ProducerFencedException ;
 
    /**
     * 在事務(wù)內(nèi)提交已經(jīng)消費的偏移量
     */
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, 
                                         String consumerGroupId) throws ProducerFencedException ;
 
    /**
     * 提交事務(wù)
     */
    public void commitTransaction() throws ProducerFencedException;
 
    /**
     * 丟棄事務(wù)
     */
    public void abortTransaction() throws ProducerFencedException ;

下面是使用Kafka事務(wù)特性的例子敞斋,這段代碼Producer開啟了一個事務(wù)截汪,然后在這個事務(wù)中發(fā)送了兩條消息。這兩條消息要么都發(fā)送成功植捎,要么都失敗衙解。

KafkaProducer producer = createKafkaProducer(
  "bootstrap.servers", "localhost:9092",
  "transactional.id”, “my-transactional-id");

producer.initTransactions();
producer.beginTransaction();
producer.send("outputTopic", "message1");
producer.send("outputTopic", "message2");
producer.commitTransaction();

下面這段代碼即為read-process-write模式,在一個Kafka事務(wù)中鸥跟,同時涉及到了生產(chǎn)消息和消費消息丢郊。

KafkaProducer producer = createKafkaProducer(
  "bootstrap.servers", "localhost:9092",
  "transactional.id", "my-transactional-id");

KafkaConsumer consumer = createKafkaConsumer(
  "bootstrap.servers", "localhost:9092",
  "group.id", "my-group-id",
  "isolation.level", "read_committed");

consumer.subscribe(singleton("inputTopic"));

producer.initTransactions();

while (true) {
  ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
  producer.beginTransaction();
  for (ConsumerRecord record : records)
    producer.send(producerRecord(“outputTopic”, record));
  producer.sendOffsetsToTransaction(currentOffsets(consumer), group);  
  producer.commitTransaction();
}

注意:在理解消息的事務(wù)時,一直處于一個錯誤理解是医咨,把操作db的業(yè)務(wù)邏輯跟操作消息當(dāng)成是一個事務(wù)枫匾,如下所示:

void  kakfa_in_tranction(){
  // 1.kafa的操作:讀取消息或生產(chǎn)消息
  kafkaOperation();
  // 2.db操作
  dbOperation();
}

其實這個是有問題的。操作DB數(shù)據(jù)庫的數(shù)據(jù)源是DB拟淮,消息數(shù)據(jù)源是kfaka干茉,這是完全不同兩個數(shù)據(jù)。一種數(shù)據(jù)源(如mysql很泊,kafka)對應(yīng)一個事務(wù)角虫,所以它們是兩個獨立的事務(wù)。kafka事務(wù)指kafka一系列 生產(chǎn)、消費消息等操作組成一個原子操作,db事務(wù)是指操作數(shù)據(jù)庫的一系列增刪改操作組成一個原子操作射赛。

2. Kafka事務(wù)配置

  • 對于Producer匙握,需要設(shè)置transactional.id屬性,這個屬性的作用下文會提到。設(shè)置了transactional.id屬性后犬性,enable.idempotence屬性會自動設(shè)置為true拱燃。
  • 對于Consumer隶债,需要設(shè)置isolation.level = read_committed腾它,這樣Consumer只會讀取已經(jīng)提交了事務(wù)的消息。另外死讹,需要設(shè)置enable.auto.commit = false來關(guān)閉自動提交Offset功能瞒滴。

更多關(guān)于配置的信息請參考我的文章:Kafka消息送達(dá)語義詳解

3. Kafka事務(wù)特性

Kafka的事務(wù)特性本質(zhì)上代表了三個功能:原子寫操作拒絕僵尸實例(Zombie fencing)和讀事務(wù)消息赞警。

3.1 原子寫

Kafka的事務(wù)特性本質(zhì)上是支持了Kafka跨分區(qū)和Topic的原子寫操作妓忍。在同一個事務(wù)中的消息要么同時寫入成功,要么同時寫入失敗仅颇。我們知道单默,Kafka中的Offset信息存儲在一個名為_consumed_offsets的Topic中,因此read-process-write模式忘瓦,除了向目標(biāo)Topic寫入消息搁廓,還會向_consumed_offsets中寫入已經(jīng)消費的Offsets數(shù)據(jù)。因此read-process-write本質(zhì)上就是跨分區(qū)和Topic的原子寫操作耕皮。Kafka的事務(wù)特性就是要確本惩桑跨分區(qū)的多個寫操作的原子性。

3.2 拒絕僵尸實例(Zombie fencing)

在分布式系統(tǒng)中凌停,一個instance的宕機(jī)或失聯(lián)粱年,集群往往會自動啟動一個新的實例來代替它的工作。此時若原實例恢復(fù)了罚拟,那么集群中就產(chǎn)生了兩個具有相同職責(zé)的實例台诗,此時前一個instance就被稱為“僵尸實例(Zombie Instance)”。在Kafka中赐俗,兩個相同的producer同時處理消息并生產(chǎn)出重復(fù)的消息(read-process-write模式)拉队,這樣就嚴(yán)重違反了Exactly Once Processing的語義。這就是僵尸實例問題阻逮。

Kafka事務(wù)特性通過transaction-id屬性來解決僵尸實例問題粱快。所有具有相同transaction-id的Producer都會被分配相同的pid,同時每一個Producer還會被分配一個遞增的epoch叔扼。Kafka收到事務(wù)提交請求時事哭,如果檢查當(dāng)前事務(wù)提交者的epoch不是最新的,那么就會拒絕該P(yáng)roducer的請求瓜富。從而達(dá)成拒絕僵尸實例的目標(biāo)鳍咱。

3.3 讀事務(wù)消息

為了保證事務(wù)特性,Consumer如果設(shè)置了isolation.level = read_committed与柑,那么它只會讀取已經(jīng)提交了的消息流炕。在Producer成功提交事務(wù)后澎现,Kafka會將所有該事務(wù)中的消息的Transaction Markeruncommitted標(biāo)記為committed狀態(tài)仅胞,從而所有的Consumer都能夠消費每辟。

4. Kafka事務(wù)原理

Kafka為了支持事務(wù)特性,引入一個新的組件:Transaction Coordinator干旧。主要負(fù)責(zé)分配pid渠欺,記錄事務(wù)狀態(tài)等操作。下面時Kafka開啟一個事務(wù)到提交一個事務(wù)的流程圖:

KafkaTransaction.png

主要分為以下步驟:

1. 查找Tranaction Corordinator

Producer向任意一個brokers發(fā)送 FindCoordinatorRequest請求來獲取Transaction Coordinator的地址椎眯。

2. 初始化事務(wù) initTransaction

Producer發(fā)送InitpidRequest給Transaction Coordinator挠将,獲取pid。Transaction Coordinator在Transaciton Log中記錄這<TransactionId,pid>的映射關(guān)系编整。另外舔稀,它還會做兩件事:

  • 恢復(fù)(Commit或Abort)之前的Producer未完成的事務(wù)
  • 對PID對應(yīng)的epoch進(jìn)行遞增,這樣可以保證同一個app的不同實例對應(yīng)的PID是一樣掌测,而epoch是不同的内贮。

只要開啟了冪等特性即必須執(zhí)行InitpidRequest,而無須考慮該P(yáng)roducer是否開啟了事務(wù)特性汞斧。

3. 開始事務(wù)beginTransaction

執(zhí)行Producer的beginTransacion()夜郁,它的作用是Producer在本地記錄下這個transaction的狀態(tài)為開始狀態(tài)。這個操作并沒有通知Transaction Coordinator粘勒,因為Transaction Coordinator只有在Producer發(fā)送第一條消息后才認(rèn)為事務(wù)已經(jīng)開啟竞端。

4. read-process-write流程

一旦Producer開始發(fā)送消息,Transaction Coordinator會將該<Transaction, Topic, Partition>存于Transaction Log內(nèi)庙睡,并將其狀態(tài)置為BEGIN事富。另外,如果該<Topic, Partition>為該事務(wù)中第一個<Topic, Partition>乘陪,Transaction Coordinator還會啟動對該事務(wù)的計時(每個事務(wù)都有自己的超時時間)统台。

在注冊<Transaction, Topic, Partition>到Transaction Log后,生產(chǎn)者發(fā)送數(shù)據(jù)暂刘,雖然沒有還沒有執(zhí)行commit或者abort饺谬,但是此時消息已經(jīng)保存到Broker上了。即使后面執(zhí)行abort谣拣,消息也不會刪除募寨,只是更改狀態(tài)字段標(biāo)識消息為abort狀態(tài)。

5. 事務(wù)提交或終結(jié) commitTransaction/abortTransaction

在Producer執(zhí)行commitTransaction/abortTransaction時森缠,Transaction Coordinator會執(zhí)行一個兩階段提交:

  • 第一階段拔鹰,將Transaction Log內(nèi)的該事務(wù)狀態(tài)設(shè)置為PREPARE_COMMITPREPARE_ABORT
  • 第二階段,將Transaction Marker寫入該事務(wù)涉及到的所有消息(即將消息標(biāo)記為committedaborted)贵涵。這一步驟Transaction Coordinator會發(fā)送給當(dāng)前事務(wù)涉及到的每個<Topic, Partition>的Leader列肢,Broker收到該請求后恰画,會將對應(yīng)的Transaction Marker控制信息寫入日志。

一旦Transaction Marker寫入完成瓷马,Transaction Coordinator會將最終的COMPLETE_COMMITCOMPLETE_ABORT狀態(tài)寫入Transaction Log中以標(biāo)明該事務(wù)結(jié)束拴还。

針對第二階段(寫入Transaction Marker),存在兩條消息一條寫入成功欧聘,一條寫入失敗的場景片林。這樣會導(dǎo)致一個consumer讀取到寫入了Transaction Marker的那條數(shù)據(jù)嗎?答案是不會的怀骤,因為broker不允許consumer將offset推進(jìn)一個未完成的事務(wù)的位置费封。這樣offset沒推進(jìn),這條消息就不會被consumer消費到蒋伦。參考:(https://www.confluent.io/blog/transactions-apache-kafka/

5. 總結(jié)

  • Transaction Marker與PID提供了識別消息是否應(yīng)該被讀取的能力弓摘,從而實現(xiàn)了事務(wù)的隔離性。
  • Offset的更新標(biāo)記了消息是否被讀取痕届,從而將對讀操作的事務(wù)處理轉(zhuǎn)換成了對寫(Offset)操作的事務(wù)處理韧献。
  • Kafka事務(wù)的本質(zhì)是,將一組寫操作(如果有)對應(yīng)的消息與一組讀操作(如果有)對應(yīng)的Offset的更新進(jìn)行同樣的標(biāo)記(Transaction Marker)來實現(xiàn)事務(wù)中涉及的所有讀寫操作同時對外可見或同時對外不可見爷抓。
  • Kafka只提供對Kafka本身的讀寫操作的事務(wù)性势决,不提供包含外部系統(tǒng)的事務(wù)性。

參考文章

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蓝撇,一起剝皮案震驚了整個濱河市果复,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌渤昌,老刑警劉巖虽抄,帶你破解...
    沈念sama閱讀 216,402評論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異独柑,居然都是意外死亡迈窟,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評論 3 392
  • 文/潘曉璐 我一進(jìn)店門忌栅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來车酣,“玉大人,你說我怎么就攤上這事索绪『保” “怎么了?”我有些...
    開封第一講書人閱讀 162,483評論 0 353
  • 文/不壞的土叔 我叫張陵瑞驱,是天一觀的道長娘摔。 經(jīng)常有香客問我,道長唤反,這世上最難降的妖魔是什么凳寺? 我笑而不...
    開封第一講書人閱讀 58,165評論 1 292
  • 正文 為了忘掉前任鸭津,我火速辦了婚禮,結(jié)果婚禮上肠缨,老公的妹妹穿的比我還像新娘逆趋。我一直安慰自己,他們只是感情好怜瞒,可當(dāng)我...
    茶點故事閱讀 67,176評論 6 388
  • 文/花漫 我一把揭開白布父泳。 她就那樣靜靜地躺著,像睡著了一般吴汪。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蒸眠,一...
    開封第一講書人閱讀 51,146評論 1 297
  • 那天漾橙,我揣著相機(jī)與錄音,去河邊找鬼楞卡。 笑死霜运,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的蒋腮。 我是一名探鬼主播淘捡,決...
    沈念sama閱讀 40,032評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼池摧!你這毒婦竟也來了焦除?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,896評論 0 274
  • 序言:老撾萬榮一對情侶失蹤作彤,失蹤者是張志新(化名)和其女友劉穎膘魄,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體竭讳,經(jīng)...
    沈念sama閱讀 45,311評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡创葡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,536評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了绢慢。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片灿渴。...
    茶點故事閱讀 39,696評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖胰舆,靈堂內(nèi)的尸體忽然破棺而出骚露,到底是詐尸還是另有隱情,我是刑警寧澤思瘟,帶...
    沈念sama閱讀 35,413評論 5 343
  • 正文 年R本政府宣布荸百,位于F島的核電站,受9級特大地震影響滨攻,放射性物質(zhì)發(fā)生泄漏够话。R本人自食惡果不足惜蓝翰,卻給世界環(huán)境...
    茶點故事閱讀 41,008評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望女嘲。 院中可真熱鬧畜份,春花似錦、人聲如沸欣尼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽愕鼓。三九已至钙态,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間菇晃,已是汗流浹背册倒。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留磺送,地道東北人驻子。 一個月前我還...
    沈念sama閱讀 47,698評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像估灿,于是被迫代替她去往敵國和親崇呵。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,592評論 2 353

推薦閱讀更多精彩內(nèi)容