kafka入門

Apache Kafka 是一款開源的消息引擎系統(tǒng)贴唇,由LinkedIn 公司內(nèi)部孵化的項目搀绣。Raft 算法和 Paxos 算法分布式系統(tǒng)的一致性算法,zookeeper等的原理戳气。Google 的 Protocol Buffer 或 Facebook 的 Thrift链患,rpc框架。 Kafka使用的是純二進(jìn)制的字節(jié)序列物咳。kafka同時支持點對點模式和發(fā)布訂閱模式(通過group機制實現(xiàn))锣险。

JMS 是 Java Message Service,它也是支持上面這兩種消息引擎模型的览闰。JMS是一組API芯肤,是一個規(guī)范,很多消息引擎也支持压鉴,比如 ActiveMQ崖咨、RabbitMQ、IBM 的 WebSphere MQ 和 Apache Kafka油吭。kafka等消息引擎的主要作用就是解耦生產(chǎn)和發(fā)布击蹲。

Kafka 的三層消息架構(gòu):

1署拟、第一層是主題層(topic),每個主題可以配置 M 個分區(qū)歌豺,而每個分區(qū)又可以配置 N 個副本推穷,一條消息只能在一個分區(qū)

2类咧、第二層是分區(qū)層(partition)馒铃,每個分區(qū)的 N 個副本中只能有一個充當(dāng)領(lǐng)導(dǎo)者角色,對外提供服務(wù)痕惋;其他 N-1 個副本是追隨者副本区宇,只是提供數(shù)據(jù)冗余之用。

3值戳、第三層是消息層(message)议谷,分區(qū)中包含若干條消息,每條消息的位移從 0 開始堕虹,依次遞增卧晓。

4、最后鲫凶,客戶端程序只能與分區(qū)的領(lǐng)導(dǎo)者副本進(jìn)行交互禀崖。


消息:Record。Kafka 是消息引擎嘛螟炫,這里的消息就是指 Kafka 處理的主要對象波附。

主題:Topic。主題是承載消息的邏輯容器昼钻,在實際使用中多用來區(qū)分具體的業(yè)務(wù)掸屡。

分區(qū):Partition。一個有序不變的消息序列然评。每個主題下可以有多個分區(qū)仅财。

消息位移:Offset。表示分區(qū)中每條消息的位置信息碗淌,是一個單調(diào)遞增且不變的值盏求。

副本:Replica。Kafka 中同一條消息能夠被拷貝到多個地方以提供數(shù)據(jù)冗余亿眠,這些地方就是所謂的副本碎罚。副本還分為領(lǐng)導(dǎo)者副本和追隨者副本,各自有不同的角色劃分纳像。副本是在分區(qū)層級下的荆烈,即每個分區(qū)可配置多個副本實現(xiàn)高可用。

生產(chǎn)者:Producer。向主題發(fā)布新消息的應(yīng)用程序憔购。

消費者:Consumer宫峦。從主題訂閱新消息的應(yīng)用程序。

消費者位移:Consumer Offset玫鸟。表征消費者消費進(jìn)度导绷,每個消費者都有自己的消費者位移。

消費者組:Consumer Group屎飘。多個消費者實例共同組成的一個組诵次,同時消費多個分區(qū)以實現(xiàn)高吞吐。

重平衡:Rebalance枚碗。消費者組內(nèi)某個消費者實例掛掉后绩聘,其他消費者實例自動重新分配訂閱主題分區(qū)的過程伍玖。Rebalance 是 Kafka 消費者端實現(xiàn)高可用的重要手段。


1梭稚、Apache Kafka箱玷,也稱社區(qū)版 Kafka怨规。優(yōu)勢在于迭代速度快,社區(qū)響應(yīng)度高锡足,使用它可以讓你有更高的把控度波丰;缺陷在于僅提供基礎(chǔ)核心組件,缺失一些高級的特性舶得。

2掰烟、Confluent Kafka,Confluent 公司提供的 Kafka沐批。優(yōu)勢在于集成了很多高級特性且由 Kafka 原班人馬打造纫骑,質(zhì)量上有保證;缺陷在于相關(guān)文檔資料不全九孩,普及率較低先馆,沒有太多可供參考的范例。

3躺彬、CDH/HDP Kafka煤墙,大數(shù)據(jù)云公司提供的 Kafka,內(nèi)嵌 Apache Kafka宪拥。優(yōu)勢在于操作簡單仿野,節(jié)省運維成本;缺陷在于把控度低江解,演進(jìn)速度較慢设预。

kafka版本:

kafka-2.11-2.1.1? ? 2.11代表scala版本,2.1.1代表大版本犁河,小版本和patch鳖枕。

0.8.2.0 版本社區(qū)引入了新版本 Producer API魄梯,即需要指定 Broker 地址的 Producer。老版本需指定zookeeper地址宾符。


消息壓縮:

Producer 端壓縮酿秸、Broker 端保持、Consumer 端解壓縮魏烫。producer和broker都有一個參數(shù)compression.type辣苏,盡量保持一致,避免不必要的解壓縮哄褒,否則broker端的cpu很可能飆升稀蟋。每個壓縮過的消息集合在 Broker 端寫入時都要發(fā)生解壓縮操作,目的就是為了對消息執(zhí)行各種驗證呐赡。

在 Kafka 2.1.0 版本之前退客,Kafka 支持 3 種壓縮算法:GZIPSnappy LZ4链嘀。從 2.1.0 開始萌狂,Kafka 正式支持 Zstandard 算法(簡寫為 zstd)。它是 Facebook 開源的一個壓縮算法怀泊,能夠提供超高的壓縮比(compression ratio)茫藏。

在實際使用中,GZIP霹琼、Snappy务傲、LZ4 甚至是 zstd 的表現(xiàn)各有千秋。但對于 Kafka 而言碧囊,它們的性能測試結(jié)果卻出奇得一致树灶,即在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在壓縮比方面糯而,zstd > LZ4 > GZIP > Snappy天通。具體到物理資源,使用 Snappy 算法占用的網(wǎng)絡(luò)帶寬最多熄驼,zstd 最少像寒,這是合理的,畢竟 zstd 就是要提供超高的壓縮比瓜贾;在 CPU 使用率方面诺祸,各個算法表現(xiàn)得差不多,只是在壓縮時 Snappy 算法使用的 CPU 較多一些祭芦,而在解壓縮時 GZIP 算法則可能使用更多的 CPU筷笨。

高性能原因:

1、頁緩存技術(shù)(系統(tǒng)page cache) + 磁盤順序?qū)懀愃聘鞣Ndb

2胃夏、零拷貝技術(shù)轴或,省略了把os cache里的數(shù)據(jù)拷貝到應(yīng)用緩存,再從應(yīng)用緩存拷貝到Socket緩存了仰禀,省去2此內(nèi)存拷貝照雁,系統(tǒng)內(nèi)存直接到IO。

3答恶、吞吐量高饺蚊,線性增加。partiton悬嗓、replica機制污呼。

Exactly-once:kafka默認(rèn)at least once。

最多一次(at most once):消息可能會丟失包竹,但絕不會被重復(fù)發(fā)送曙求。禁止重試即可

至少一次(at least once):消息不會丟失映企,但有可能被重復(fù)發(fā)送。?Broker 的應(yīng)答沒有成功發(fā)送回 Producer 端静浴,它只能選擇重試堰氓。默認(rèn)

精確一次(exactly once):消息不會丟失苹享,也不會被重復(fù)發(fā)送双絮。

冪等性(Idempotence):

其最大的優(yōu)勢在于我們可以安全地重試任何冪等性操作,反正它們也不會破壞我們的系統(tǒng)狀態(tài)得问。在 0.11 之后囤攀,指定 Producer 冪等性的方法很簡單,僅需要設(shè)置一個參數(shù)即可宫纬,即 props.put(“enable.idempotence”, ture)焚挠,或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)漓骚。Kafka 自動幫你做消息的重復(fù)去重蝌衔。底層具體的原理很簡單,就是經(jīng)典的用空間去換時間的優(yōu)化思路蝌蹂,即在 Broker 端多保存一些字段噩斟。當(dāng) Producer 發(fā)送了具有相同字段值的消息后,Broker 能夠自動知曉這些消息已經(jīng)重復(fù)了孤个,于是可以在后臺默默地把它們“丟棄”掉剃允。

冪等性缺陷:冪等性 Producer 能夠保證某個主題的一個分區(qū)上不出現(xiàn)重復(fù)消息,它無法實現(xiàn)多個分區(qū)的冪等性。其次斥废,它只能實現(xiàn)單會話上的冪等性椒楣,不能實現(xiàn)跨會話的冪等性。這里的會話营袜,你可以理解為 Producer 進(jìn)程的一次運行撒顿。當(dāng)你重啟了 Producer 進(jìn)程之后,這種冪等性保證就喪失了荚板。

事務(wù):實際生產(chǎn)中很少使用凤壁,性能影響較大

事務(wù)型 Producer 能夠保證將消息原子性地寫入到多個分區(qū)中跪另。這批消息要么全部寫入成功拧抖,要么全部失敗。另外免绿,事務(wù)型 Producer 也不懼進(jìn)程的重啟唧席。Producer 重啟回來后,Kafka 依然保證它們發(fā)送消息的精確一次處理嘲驾。

1淌哟、和冪等性 Producer 一樣,開啟 enable.idempotence = true辽故。

2徒仓、設(shè)置 Producer 端參數(shù) transctional. id。最好為其設(shè)置一個有意義的名字誊垢。

producer.initTransactions();

try {

? ? ? ? ? ? producer.beginTransaction();

? ? ? ? ? ? producer.send(record1);

? ? ? ? ? ? producer.send(record2);

? ? ? ? ? ? producer.commitTransaction();

} catch (KafkaException e) {

? ? ? ? ? ? producer.abortTransaction();

}

在 Consumer 端掉弛,讀取事務(wù)型 Producer 發(fā)送的消息也是需要一些變更的。修改起來也很簡單喂走,設(shè)置 isolation.level 參數(shù)的值即可殃饿。當(dāng)前這個參數(shù)有兩個取值:

1、read_uncommitted:這是默認(rèn)值芋肠,表明 Consumer 能夠讀取到 Kafka 寫入的任何消息乎芳,不論事務(wù)型 Producer 提交事務(wù)還是終止事務(wù),其寫入的消息都可以讀取帖池。很顯然秒咐,如果你用了事務(wù)型 Producer,那么對應(yīng)的 Consumer 就不要使用這個值碘裕。

2携取、read_committed:表明 Consumer 只會讀取事務(wù)型 Producer 成功提交事務(wù)寫入的消息。當(dāng)然了帮孔,它也能看到非事務(wù)型 Producer 寫入的所有消息雷滋。

producer端:

1不撑、使用 producer.send(msg, callback)。一定要有回調(diào)晤斩,接收異常消息焕檬。

2、acks澳泵,0实愚、1(一個成功即可)、-1或者all(副本全部成功才算已提交兔辅,不會丟失腊敲,但吞吐量低)。

3维苔、配置retries > 0 的 Producer 能夠自動重試消息發(fā)送碰辅,避免消息丟失。

broker端:

1介时、設(shè)置 unclean.leader.election.enable = false没宾。不允許落后太多的broker參與選主。

2沸柔、設(shè)置 replication.factor >= 3循衰。這也是 Broker 端的參數(shù)。其實這里想表述的是褐澎,最好將消息多保存幾份羹蚣,畢竟目前防止消息丟失的主要機制就是冗余。

3乱凿、置 min.insync.replicas > 1。這依然是 Broker 端參數(shù)咽弦,控制的是消息至少要被寫入到多少個副本才算是“已提交”徒蟆。設(shè)置成大于 1 可以提升消息持久性。在實際環(huán)境中千萬不要使用默認(rèn)值 1型型。

4段审、確保 replication.factor > min.insync.replicas。如果兩者相等闹蒜,那么只要有一個副本掛機寺枉,整個分區(qū)就無法正常工作了。我們不僅要改善消息的持久性绷落,防止數(shù)據(jù)丟失姥闪,還要在不降低可用性的基礎(chǔ)上完成。推薦設(shè)置成 replication.factor = min.insync.replicas + 1筐喳。

consumer端:

1、設(shè)置enable.auto.commit=false荣月,關(guān)閉consumer端自動提交,并采用手動提交位移的方式萌业。就像前面說的咽白,這對于單 Consumer 多線程處理的場景而言是至關(guān)重要的晶框。


kafka攔截器:

Kafka 攔截器可以應(yīng)用于包括客戶端監(jiān)控授段、端到端系統(tǒng)性能檢測侵贵、消息審計等多種功能在內(nèi)的場景∈ィ可實現(xiàn)端到端的統(tǒng)計和審計乞娄。

Producer端:

Properties props = new Properties();

List<String> interceptors = new ArrayList<>();

interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 攔截器 1

interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 攔截器 2

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

1、onSend:該方法會在消息發(fā)送之前被調(diào)用范删。如果你想在發(fā)送之前對消息“美美容”瓶逃,這個方法是你唯一的機會厢绝。

2懈万、onAcknowledgement:該方法會在消息成功提交或發(fā)送失敗之后被調(diào)用会通。還記得我在上一期中提到的發(fā)送回調(diào)通知 callback 嗎?onAcknowledgement 的調(diào)用要早于 callback 的調(diào)用裳涛。值得注意的是端三,這個方法和 onSend 不是在同一個線程中被調(diào)用的郊闯,因此如果你在這兩個方法中調(diào)用了某個共享可變對象,一定要保證線程安全哦欢摄。還有一點很重要,這個方法處在 Producer 發(fā)送的主路矗钟。

consumer端:

指定消費者攔截器也是同樣的方法,只是具體的實現(xiàn)類要實現(xiàn) org.apache.kafka.clients.consumer.ConsumerInterceptor 接口东涡,這里面也有兩個核心方法疮跑。

1祖娘、onConsume:該方法在消息返回給 Consumer 程序之前調(diào)用掀潮。也就是說在開始正式處理消息之前,攔截器會先攔一道薯鼠,搞一些事情人断,之后再返回給你。

2暇仲、onCommit:Consumer 在提交位移之后調(diào)用該方法。通常你可以在該方法中做一些記賬類的動作斥滤,比如打日志等。

TCP管理:

對最新版本的 Kafka(2.1.0)而言挑胸,Java Producer 端管理 TCP 連接的方式是:

1簿透、KafkaProducer 實例創(chuàng)建時啟動 Sender 線程,從而創(chuàng)建與 bootstrap.servers 中所有 Broker 的 TCP 連接蚂维。

2、KafkaProducer 實例首次更新元數(shù)據(jù)信息之后涂籽,還會再次創(chuàng)建與集群中所有 Broker 的 TCP 連接直焙。

3奔誓、如果 Producer 端發(fā)送消息到某臺 Broker 時發(fā)現(xiàn)沒有與該 Broker 的 TCP 連接,那么也會立即創(chuàng)建連接派阱。

4、如果設(shè)置 Producer 端 connections.max.idle.ms 參數(shù)大于 0,則步驟 1 中創(chuàng)建的 TCP 連接會被自動關(guān)閉樟蠕;如果設(shè)置該參數(shù) =-1吓懈,那么步驟 1 中創(chuàng)建的 TCP 連接將無法被關(guān)閉隔嫡,從而成為“僵尸”連接温兼。


消費者組:

1荡含、Consumer Group 下可以有一個或多個 Consumer 實例。這里的實例可以是一個單獨的進(jìn)程,也可以是同一進(jìn)程下的線程。在實際場景中,使用進(jìn)程更為常見一些。

2、Group ID 是一個字符串,在一個 Kafka 集群中峭判,它標(biāo)識唯一的一個 Consumer Group。

3、Consumer Group 下所有實例訂閱的主題的單個分區(qū)只能分配給組內(nèi)的某個 Consumer 實例消費华畏。這個分區(qū)當(dāng)然也可以被其他的 Group 消費。極端情況下可實現(xiàn)點對點模型(所有consumer在一個組)和發(fā)布 / 訂閱模型(每個consumer單獨一個組)晰甚。

理想情況下蓖捶,Consumer 實例的數(shù)量應(yīng)該等于該 Group 訂閱主題的分區(qū)總數(shù)。

老版本的 Consumer Group 把位移保存在 ZooKeeper 中”庠叮現(xiàn)在比較流行的提法是將服務(wù)器節(jié)點做成無狀態(tài)的俊鱼,這樣可以自由地擴(kuò)縮容,實現(xiàn)超強的伸縮性畅买。但是ZooKeeper 這類元框架其實并不適合進(jìn)行頻繁的寫更新并闲,而 Consumer Group 的位移更新卻是一個非常頻繁的操作。新版本中谷羞,將位移保存在 Broker 端的內(nèi)部主題中帝火,__consumer_offsets。

Rebalance:

Rebalance 本質(zhì)上是一種協(xié)議,規(guī)定了一個 Consumer Group 下的所有 Consumer 如何達(dá)成一致犀填,來分配訂閱 Topic 的每個分區(qū)萌京。

1、組成員數(shù)發(fā)生變更宏浩。比如有新的 Consumer 實例加入組或者離開組,抑或是有 Consumer 實例崩潰被“踢出”組靠瞎。

2比庄、訂閱主題數(shù)發(fā)生變更。Consumer Group 可以使用正則表達(dá)式的方式訂閱主題乏盐,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明該 Group 訂閱所有以字母 t 開頭佳窑、字母 c 結(jié)尾的主題。在 Consumer Group 的運行過程中父能,你新創(chuàng)建了一個滿足這樣條件的主題神凑,那么該 Group 就會發(fā)生 Rebalance。

3何吝、訂閱主題的分區(qū)數(shù)發(fā)生變更溉委。Kafka 當(dāng)前只能允許增加一個主題的分區(qū)數(shù)。當(dāng)分區(qū)數(shù)增加時爱榕,就會觸發(fā)訂閱該主題的所有 Group 開啟 Rebalance瓣喊。

要避免不必要的rebalance,否則影響broker性能黔酥。

第一類非必要 Rebalance 是因為未能及時發(fā)送心跳藻三,導(dǎo)致 Consumer 被“踢出”Group 而引發(fā)的。因此跪者,你需要仔細(xì)地設(shè)置session.timeout.ms 和 heartbeat.interval.ms的值棵帽。我在這里給出一些推薦數(shù)值,你可以“無腦”地應(yīng)用在你的生產(chǎn)環(huán)境中渣玲。

設(shè)置 session.timeout.ms = 6s逗概。

設(shè)置 heartbeat.interval.ms = 2s。

要保證 Consumer 實例在被判定為“dead”之前忘衍,能夠發(fā)送至少 3 輪的心跳請求仗谆,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

將 session.timeout.ms 設(shè)置成 6s 主要是為了讓 Coordinator 能夠更快地定位已經(jīng)掛掉的 Consumer淑履。畢竟隶垮,我們還是希望能盡快揪出那些“尸位素餐”的 Consumer,早日把它們踢出 Group秘噪。希望這份配置能夠較好地幫助你規(guī)避第一類“不必要”的 Rebalance狸吞。

第二類非必要 Rebalance 是 Consumer 消費時間過長導(dǎo)致的。我之前有一個客戶,在他們的場景中蹋偏,Consumer 消費數(shù)據(jù)時需要將消息處理之后寫入到 MongoDB便斥。顯然,這是一個很重的消費邏輯威始。MongoDB 的一丁點不穩(wěn)定都會導(dǎo)致 Consumer 程序消費時長的增加枢纠。此時,max.poll.interval.ms參數(shù)值的設(shè)置顯得尤為關(guān)鍵黎棠。如果要避免非預(yù)期的 Rebalance晋渺,你最好將該參數(shù)值設(shè)置得大一點,比你的下游最大處理時間稍長一點脓斩。就拿 MongoDB 這個例子來說木西,如果寫 MongoDB 的最長時間是 7 分鐘,那么你可以將該參數(shù)設(shè)置為 8 分鐘左右随静。

Consumer 端的 GC八千,比如是否出現(xiàn)了頻繁的 Full GC 導(dǎo)致的長時間停頓,從而引發(fā)了 Rebalance燎猛。

位移主題:__consumer_offsets用于保存消費者位移信息的內(nèi)部主題恋捆。

如果位移主題是 Kafka 自動創(chuàng)建的,那么該主題的分區(qū)數(shù)是 50重绷,副本數(shù)是 3鸠信。

Consumer端位移提交:

位移提交的語義保障是由consumer來負(fù)責(zé)的,Kafka 只會“無腦”地接受你提交的位移论寨。從用戶的角度來說星立,位移提交分為自動提交手動提交;從 Consumer 端的角度來說葬凳,位移提交分為同步提交異步提交绰垂。

開啟自動提交位移的方法很簡單。Consumer 端有個參數(shù) enable.auto.commit火焰,把它設(shè)置為 true 或者壓根不設(shè)置它就可以了劲装。因為它的默認(rèn)值就是 true,即 Java Consumer 默認(rèn)就是自動提交位移的昌简。如果啟用了自動提交占业,Consumer 端還有個參數(shù)就派上用場了:auto.commit.interval.ms。它的默認(rèn)值是 5 秒纯赎,表明 Kafka 每 5 秒會為你自動提交一次位移谦疾。

enable.auto.commit=true; # 開啟自動提交

auto.commit.interval.ms=5000; # 設(shè)置自動提交間隔,默認(rèn)5s

當(dāng)在默認(rèn)提交的時間間隔內(nèi)犬金,若發(fā)生rebalance念恍,將會發(fā)生重復(fù)消費六剥。

consumer.commitSync(); # 同步提交offset,會影響consumer性能

consumer.commitAsync(); # 不能重試

對于常規(guī)性峰伙、階段性的手動提交疗疟,我們調(diào)用 commitAsync() 避免程序阻塞,而在 Consumer 要關(guān)閉前瞳氓,我們調(diào)用 commitSync() 方法執(zhí)行同步阻塞式的位移提交策彤,以確保 Consumer 關(guān)閉前能夠保存正確的位移數(shù)據(jù)。將兩者結(jié)合后匣摘,我們既實現(xiàn)了異步無阻塞式的位移管理店诗,也確保了 Consumer 位移的正確性。


CommitFailedException異常:

1恋沃、縮短單條消息處理的時間。比如必指,之前下游系統(tǒng)消費一條消息的時間是 100 毫秒囊咏,優(yōu)化之后成功地下降到 50 毫秒,那么此時 Consumer 端的 TPS 就提升了一倍塔橡。

2梅割、增加 Consumer 端允許下游系統(tǒng)消費一批消息的最大時長。這取決于 Consumer 端參數(shù) max.poll.interval.ms 的值葛家。在最新版的 Kafka 中户辞,該參數(shù)的默認(rèn)值是 5 分鐘。如果你的消費邏輯不能簡化癞谒,那么提高該參數(shù)值是一個不錯的辦法底燎。值得一提的是,Kafka 0.10.1.0 之前的版本是沒有這個參數(shù)的弹砚,因此如果你依然在使用 0.10.1.0 之前的客戶端 API双仍,那么你需要增加 session.timeout.ms 參數(shù)的值。不幸的是桌吃,session.timeout.ms 參數(shù)還有其他的含義朱沃,因此增加該參數(shù)的值可能會有其他方面的“不良影響”,這也是社區(qū)在 0.10.1.0 版本引入 max.poll.interval.ms 參數(shù)茅诱,將這部分含義從 session.timeout.ms 中剝離出來的原因之一逗物。

3、減少下游系統(tǒng)一次性消費的消息總數(shù)瑟俭。這取決于 Consumer 端參數(shù) max.poll.records 的值翎卓。當(dāng)前該參數(shù)的默認(rèn)值是 500 條,表明調(diào)用一次 KafkaConsumer.poll 方法摆寄,最多返回 500 條消息莲祸□灏玻可以說,該參數(shù)規(guī)定了單次 poll 方法能夠返回的消息總數(shù)的上限锐帜。如果前兩種方法對你都不適用的話田盈,降低此參數(shù)值是避免 CommitFailedException 異常最簡單的手段。

4缴阎、下游系統(tǒng)使用多線程來加速消費允瞧。這應(yīng)該算是“最高級”同時也是最難實現(xiàn)的解決辦法了。具體的思路就是蛮拔,讓下游系統(tǒng)手動創(chuàng)建多個消費線程處理 poll 方法返回的一批消息述暂。之前你使用 Kafka Consumer 消費數(shù)據(jù)更多是單線程的,所以當(dāng)消費速度無法匹及 Kafka Consumer 消息返回的速度時建炫,它就會拋出 CommitFailedException 異常畦韭。如果是多線程,你就可以靈活地控制線程數(shù)量肛跌,隨時調(diào)整消費承載能力艺配,再配以目前多核的硬件條件,該方法可謂是防止 CommitFailedException 最高檔的解決之道衍慎。事實上转唉,很多主流的大數(shù)據(jù)流處理框架使用的都是這個方法,比如 Apache Flink 在集成 Kafka 時稳捆,就是創(chuàng)建了多個 KafkaConsumerThread 線程赠法,自行處理多線程間的數(shù)據(jù)消費。不過乔夯,凡事有利就有弊砖织,這個方法實現(xiàn)起來并不容易,特別是在多個線程間如何處理位移提交這個問題上末荐,更是極容易出錯镶苞。在專欄后面的內(nèi)容中,我將著重和你討論一下多線程消費的實現(xiàn)方案鞠评。

副本機制:

kafka追隨者副本不提供服務(wù)茂蚓,所有讀寫都在主broker上。因為如果提供讀服務(wù)可能會有寫入到讀取的延時剃幌,同時akfka不像mysql它的讀一般是一次性的聋涨,跟寫有對應(yīng)關(guān)系,提供讀服務(wù)將不能保證分區(qū)間的線性讀负乡,實現(xiàn)起來復(fù)雜牍白。

In-sync Replicas(ISR)副本,ISR 不只是追隨者副本集合抖棘,它必然包括 Leader 副本茂腥。甚至在某些情況下狸涌,ISR 只有 Leader 這一個副本。而加入isr副本的標(biāo)準(zhǔn)就是 Broker 端參數(shù) replica.lag.time.max.ms 參數(shù)值最岗。這個參數(shù)的含義是 Follower 副本能夠落后 Leader 副本的最長時間間隔帕胆,當(dāng)前默認(rèn)值是 10 秒。這就是說般渡,只要一個 Follower 副本落后 Leader 副本的時間不連續(xù)超過 10 秒懒豹,那么 Kafka 就認(rèn)為該 Follower 副本與 Leader 是同步的,即使此時 Follower 副本中保存的消息明顯少于 Leader 副本中的消息驯用。

Unclean 領(lǐng)導(dǎo)者選舉(Unclean Leader Election)脸秽,不在isr副本集合中的追隨者被認(rèn)為是unclean leader,即落后leader較多的副本蝴乔。Broker 端參數(shù) unclean.leader.election.enable 控制是否允許 Unclean 領(lǐng)導(dǎo)者選舉记餐。并且isr會根據(jù)follower延時時間動態(tài)調(diào)整,能加入也能剔除薇正。

一個分布式系統(tǒng)通常只能同時滿足一致性(Consistency)片酝、可用性(Availability)、分區(qū)容錯性(Partition tolerance)中的兩個铝穷。顯然钠怯,在這個問題上佳魔,Kafka 賦予你選擇 C 或 A 的權(quán)利曙聂。若是允許unclean選舉,則因為其延時可能會丟數(shù)據(jù)鞠鲜,但是保證了系統(tǒng)的可用性宁脊,即C和A的選擇。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末贤姆,一起剝皮案震驚了整個濱河市榆苞,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌霞捡,老刑警劉巖坐漏,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異碧信,居然都是意外死亡赊琳,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進(jìn)店門砰碴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來躏筏,“玉大人,你說我怎么就攤上這事呈枉〕媚幔” “怎么了埃碱?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長酥泞。 經(jīng)常有香客問我砚殿,道長,這世上最難降的妖魔是什么婶博? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任瓮具,我火速辦了婚禮,結(jié)果婚禮上凡人,老公的妹妹穿的比我還像新娘名党。我一直安慰自己,他們只是感情好挠轴,可當(dāng)我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布传睹。 她就那樣靜靜地躺著,像睡著了一般岸晦。 火紅的嫁衣襯著肌膚如雪欧啤。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天启上,我揣著相機與錄音邢隧,去河邊找鬼。 笑死冈在,一個胖子當(dāng)著我的面吹牛倒慧,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播包券,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼纫谅,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了溅固?” 一聲冷哼從身側(cè)響起付秕,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎侍郭,沒想到半個月后询吴,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡亮元,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年猛计,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片苹粟。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡有滑,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出嵌削,到底是詐尸還是另有隱情毛好,我是刑警寧澤望艺,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站肌访,受9級特大地震影響找默,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜吼驶,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一惩激、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧蟹演,春花似錦风钻、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至羞反,卻和暖如春布朦,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背昼窗。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工是趴, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人澄惊。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓唆途,卻偏偏與公主長得像,于是被迫代替她去往敵國和親缤削。 傳聞我的和親對象是個殘疾皇子窘哈,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容