Kafka在0.11版本中除了引入了Exactly Once語義,還引入了事務(wù)特性男翰。Kafka事務(wù)特性是指一系列的生產(chǎn)者生產(chǎn)消息和消費者提交偏移量的操作在一個事務(wù)中,或者說是一個原子操作,生產(chǎn)消息和提交偏移量同時成功或者失敗。
1. Kafka事務(wù)的使用
Kafka中的事務(wù)特性主要用于以下兩種場景:
- 生產(chǎn)者發(fā)送多條消息可以封裝在一個事務(wù)中色冀,形成一個原子操作。多條消息要么都發(fā)送成功柱嫌,要么都發(fā)送失敗锋恬。
- read-process-write模式:將消息消費和生產(chǎn)封裝在一個事務(wù)中,形成一個原子操作编丘。在一個流式處理的應(yīng)用中与学,常常一個服務(wù)需要從上游接收消息,然后經(jīng)過處理后送達(dá)到下游嘉抓,這就對應(yīng)著消息的消費和生成索守。
當(dāng)事務(wù)中僅僅存在Consumer消費消息的操作時,它和Consumer手動提交Offset并沒有區(qū)別抑片。因此單純的消費消息并不是Kafka引入事務(wù)機(jī)制的原因卵佛,單純的消費消息也沒有必要存在于一個事務(wù)中。
Kafka producer API提供了以下接口用于事務(wù)操作:
/**
* 初始化事務(wù)
*/
public void initTransactions();
/**
* 開啟事務(wù)
*/
public void beginTransaction() throws ProducerFencedException ;
/**
* 在事務(wù)內(nèi)提交已經(jīng)消費的偏移量
*/
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException ;
/**
* 提交事務(wù)
*/
public void commitTransaction() throws ProducerFencedException;
/**
* 丟棄事務(wù)
*/
public void abortTransaction() throws ProducerFencedException ;
下面是使用Kafka事務(wù)特性的例子敞斋,這段代碼Producer開啟了一個事務(wù)截汪,然后在這個事務(wù)中發(fā)送了兩條消息。這兩條消息要么都發(fā)送成功植捎,要么都失敗衙解。
KafkaProducer producer = createKafkaProducer(
"bootstrap.servers", "localhost:9092",
"transactional.id”, “my-transactional-id");
producer.initTransactions();
producer.beginTransaction();
producer.send("outputTopic", "message1");
producer.send("outputTopic", "message2");
producer.commitTransaction();
下面這段代碼即為read-process-write模式,在一個Kafka事務(wù)中鸥跟,同時涉及到了生產(chǎn)消息和消費消息丢郊。
KafkaProducer producer = createKafkaProducer(
"bootstrap.servers", "localhost:9092",
"transactional.id", "my-transactional-id");
KafkaConsumer consumer = createKafkaConsumer(
"bootstrap.servers", "localhost:9092",
"group.id", "my-group-id",
"isolation.level", "read_committed");
consumer.subscribe(singleton("inputTopic"));
producer.initTransactions();
while (true) {
ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
producer.beginTransaction();
for (ConsumerRecord record : records)
producer.send(producerRecord(“outputTopic”, record));
producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
producer.commitTransaction();
}
注意:在理解消息的事務(wù)時,一直處于一個錯誤理解是医咨,把操作db的業(yè)務(wù)邏輯跟操作消息當(dāng)成是一個事務(wù)枫匾,如下所示:
void kakfa_in_tranction(){
// 1.kafa的操作:讀取消息或生產(chǎn)消息
kafkaOperation();
// 2.db操作
dbOperation();
}
其實這個是有問題的。操作DB數(shù)據(jù)庫的數(shù)據(jù)源是DB拟淮,消息數(shù)據(jù)源是kfaka干茉,這是完全不同兩個數(shù)據(jù)。一種數(shù)據(jù)源(如mysql很泊,kafka)對應(yīng)一個事務(wù)角虫,所以它們是兩個獨立的事務(wù)。kafka事務(wù)指kafka一系列 生產(chǎn)、消費消息等操作組成一個原子操作,db事務(wù)是指操作數(shù)據(jù)庫的一系列增刪改操作組成一個原子操作射赛。
2. Kafka事務(wù)配置
- 對于Producer匙握,需要設(shè)置
transactional.id
屬性,這個屬性的作用下文會提到。設(shè)置了transactional.id
屬性后犬性,enable.idempotence
屬性會自動設(shè)置為true拱燃。 - 對于Consumer隶债,需要設(shè)置
isolation.level = read_committed
腾它,這樣Consumer只會讀取已經(jīng)提交了事務(wù)的消息。另外死讹,需要設(shè)置enable.auto.commit = false
來關(guān)閉自動提交Offset功能瞒滴。
更多關(guān)于配置的信息請參考我的文章:Kafka消息送達(dá)語義詳解
3. Kafka事務(wù)特性
Kafka的事務(wù)特性本質(zhì)上代表了三個功能:原子寫操作,拒絕僵尸實例(Zombie fencing)和讀事務(wù)消息赞警。
3.1 原子寫
Kafka的事務(wù)特性本質(zhì)上是支持了Kafka跨分區(qū)和Topic的原子寫操作妓忍。在同一個事務(wù)中的消息要么同時寫入成功,要么同時寫入失敗仅颇。我們知道单默,Kafka中的Offset信息存儲在一個名為_consumed_offsets的Topic中,因此read-process-write模式忘瓦,除了向目標(biāo)Topic寫入消息搁廓,還會向_consumed_offsets中寫入已經(jīng)消費的Offsets數(shù)據(jù)。因此read-process-write本質(zhì)上就是跨分區(qū)和Topic的原子寫操作耕皮。Kafka的事務(wù)特性就是要確本惩桑跨分區(qū)的多個寫操作的原子性。
3.2 拒絕僵尸實例(Zombie fencing)
在分布式系統(tǒng)中凌停,一個instance的宕機(jī)或失聯(lián)粱年,集群往往會自動啟動一個新的實例來代替它的工作。此時若原實例恢復(fù)了罚拟,那么集群中就產(chǎn)生了兩個具有相同職責(zé)的實例台诗,此時前一個instance就被稱為“僵尸實例(Zombie Instance)”。在Kafka中赐俗,兩個相同的producer同時處理消息并生產(chǎn)出重復(fù)的消息(read-process-write模式)拉队,這樣就嚴(yán)重違反了Exactly Once Processing的語義。這就是僵尸實例問題阻逮。
Kafka事務(wù)特性通過transaction-id
屬性來解決僵尸實例問題粱快。所有具有相同transaction-id
的Producer都會被分配相同的pid,同時每一個Producer還會被分配一個遞增的epoch叔扼。Kafka收到事務(wù)提交請求時事哭,如果檢查當(dāng)前事務(wù)提交者的epoch不是最新的,那么就會拒絕該P(yáng)roducer的請求瓜富。從而達(dá)成拒絕僵尸實例的目標(biāo)鳍咱。
3.3 讀事務(wù)消息
為了保證事務(wù)特性,Consumer如果設(shè)置了isolation.level = read_committed
与柑,那么它只會讀取已經(jīng)提交了的消息流炕。在Producer成功提交事務(wù)后澎现,Kafka會將所有該事務(wù)中的消息的Transaction Marker
從uncommitted
標(biāo)記為committed
狀態(tài)仅胞,從而所有的Consumer都能夠消費每辟。
4. Kafka事務(wù)原理
Kafka為了支持事務(wù)特性,引入一個新的組件:Transaction Coordinator干旧。主要負(fù)責(zé)分配pid渠欺,記錄事務(wù)狀態(tài)等操作。下面時Kafka開啟一個事務(wù)到提交一個事務(wù)的流程圖:
主要分為以下步驟:
1. 查找Tranaction Corordinator
Producer向任意一個brokers發(fā)送 FindCoordinatorRequest請求來獲取Transaction Coordinator的地址椎眯。
2. 初始化事務(wù) initTransaction
Producer發(fā)送InitpidRequest給Transaction Coordinator挠将,獲取pid。Transaction Coordinator在Transaciton Log中記錄這<TransactionId,pid>的映射關(guān)系编整。另外舔稀,它還會做兩件事:
- 恢復(fù)(Commit或Abort)之前的Producer未完成的事務(wù)
- 對PID對應(yīng)的epoch進(jìn)行遞增,這樣可以保證同一個app的不同實例對應(yīng)的PID是一樣掌测,而epoch是不同的内贮。
只要開啟了冪等特性即必須執(zhí)行InitpidRequest,而無須考慮該P(yáng)roducer是否開啟了事務(wù)特性汞斧。
3. 開始事務(wù)beginTransaction
執(zhí)行Producer的beginTransacion()夜郁,它的作用是Producer在本地記錄下這個transaction的狀態(tài)為開始狀態(tài)。這個操作并沒有通知Transaction Coordinator粘勒,因為Transaction Coordinator只有在Producer發(fā)送第一條消息后才認(rèn)為事務(wù)已經(jīng)開啟竞端。
4. read-process-write流程
一旦Producer開始發(fā)送消息,Transaction Coordinator會將該<Transaction, Topic, Partition>存于Transaction Log內(nèi)庙睡,并將其狀態(tài)置為BEGIN事富。另外,如果該<Topic, Partition>為該事務(wù)中第一個<Topic, Partition>乘陪,Transaction Coordinator還會啟動對該事務(wù)的計時(每個事務(wù)都有自己的超時時間)统台。
在注冊<Transaction, Topic, Partition>到Transaction Log后,生產(chǎn)者發(fā)送數(shù)據(jù)暂刘,雖然沒有還沒有執(zhí)行commit或者abort饺谬,但是此時消息已經(jīng)保存到Broker上了。即使后面執(zhí)行abort谣拣,消息也不會刪除募寨,只是更改狀態(tài)字段標(biāo)識消息為abort狀態(tài)。
5. 事務(wù)提交或終結(jié) commitTransaction/abortTransaction
在Producer執(zhí)行commitTransaction/abortTransaction時森缠,Transaction Coordinator會執(zhí)行一個兩階段提交:
- 第一階段拔鹰,將Transaction Log內(nèi)的該事務(wù)狀態(tài)設(shè)置為
PREPARE_COMMIT
或PREPARE_ABORT
-
第二階段,將
Transaction Marker
寫入該事務(wù)涉及到的所有消息(即將消息標(biāo)記為committed
或aborted
)贵涵。這一步驟Transaction Coordinator會發(fā)送給當(dāng)前事務(wù)涉及到的每個<Topic, Partition>的Leader列肢,Broker收到該請求后恰画,會將對應(yīng)的Transaction Marker
控制信息寫入日志。
一旦Transaction Marker
寫入完成瓷马,Transaction Coordinator會將最終的COMPLETE_COMMIT
或COMPLETE_ABORT
狀態(tài)寫入Transaction Log中以標(biāo)明該事務(wù)結(jié)束拴还。
針對第二階段(寫入Transaction Marker
),存在兩條消息一條寫入成功欧聘,一條寫入失敗的場景片林。這樣會導(dǎo)致一個consumer讀取到寫入了Transaction Marker
的那條數(shù)據(jù)嗎?答案是不會的怀骤,因為broker不允許consumer將offset推進(jìn)一個未完成的事務(wù)的位置费封。這樣offset沒推進(jìn),這條消息就不會被consumer消費到蒋伦。參考:(https://www.confluent.io/blog/transactions-apache-kafka/)
5. 總結(jié)
- Transaction Marker與PID提供了識別消息是否應(yīng)該被讀取的能力弓摘,從而實現(xiàn)了事務(wù)的隔離性。
- Offset的更新標(biāo)記了消息是否被讀取痕届,從而將對讀操作的事務(wù)處理轉(zhuǎn)換成了對寫(Offset)操作的事務(wù)處理韧献。
- Kafka事務(wù)的本質(zhì)是,將一組寫操作(如果有)對應(yīng)的消息與一組讀操作(如果有)對應(yīng)的Offset的更新進(jìn)行同樣的標(biāo)記(Transaction Marker)來實現(xiàn)事務(wù)中涉及的所有讀寫操作同時對外可見或同時對外不可見爷抓。
- Kafka只提供對Kafka本身的讀寫操作的事務(wù)性势决,不提供包含外部系統(tǒng)的事務(wù)性。