1灾锯、 簡介
它可以讓你發(fā)布和訂閱記錄流拍柒。在這方面,它類似于一個消息隊列或企業(yè)消息系統(tǒng)。
它可以讓你持久化收到的記錄流壳咕,從而具有容錯能力确虱。
首先补箍,明確幾個概念:
? Kafka運行在一個或多個服務器上匾七。
? Kafka集群分類存儲的記錄流被稱為主題(Topics)。
? 每個消息記錄包含一個鍵囚玫,一個值和時間戳喧锦。
Kafka有四個核心API:
? 生產者 API 允許應用程序發(fā)布記錄流至一個或多個Kafka的話題(Topics)。
? 消費者API 允許應用程序訂閱一個或多個主題抓督,并處理這些主題接收到的記錄流燃少。
? Streams API 允許應用程序充當流處理器(stream processor),從一個或多個主題獲取輸入流铃在,并生產一個輸出流至一個或多個的主題阵具,能夠有效地變換輸入流為輸出流。
? Connector API 允許構建和運行可重用的生產者或消費者定铜,能夠把 Kafka主題連接到現(xiàn)有的應用程序或數(shù)據(jù)系統(tǒng)阳液。例如,一個連接到關系數(shù)據(jù)庫的連接器(connector)可能會獲取每個表的變化揣炕。
? Kafka的客戶端和服務器之間的通信是靠一個簡單的帘皿,高性能的,與語言無關的TCP協(xié)議完成的畸陡。這個協(xié)議有不同的版本鹰溜,并保持向前兼容舊版本越庇。Kafka不光提供了一個Java客戶端,還有許多語言版本的客戶端奉狈。
2、 架構
2.1 Broker
每個kafka server稱為一個Broker涩惑,多個borker組成kafka cluster仁期。一個機器上可以部署一個或者多個Broker,這多個Broker連接到相同的ZooKeeper就組成了Kafka集群竭恬。
2.2 主題Topic
讓我們先來了解Kafka的核心抽象概念記錄流 – 主題跛蛋。主題是一種分類或發(fā)布的一系列記錄的名義上的名字。Kafka的主題始終是支持多用戶訂閱的; 也就是說痊硕,一個主題可以有零個赊级,一個或多個消費者訂閱寫入的數(shù)據(jù)。
Topic 與broker
一個Broker上可以創(chuàng)建一個或者多個Topic岔绸。同一個topic可以在同一集群下的多個Broker中分布理逊。
當然,Topic只是一個名義上的組件盒揉,真正在Broker間分布式的Partition晋被。
2.3 分區(qū)與日志
一個主題對應多個分區(qū),一個分區(qū)對應一個日志
Kafka會為每個topic維護了多個分區(qū)(partition)刚盈,每個分區(qū)會映射到一個邏輯的日志(log)文件羡洛。每個分區(qū)是一個有序的,不可變的消息序列藕漱,新的消息不斷追加到這個有組織的有保證的日志上欲侮。分區(qū)會給每個消息記錄分配一個順序ID號 – 偏移量, 能夠唯一地標識該分區(qū)中的每個記錄肋联。
日志分區(qū)是分布式的存在于一個kafka集群的多個broker上威蕉。每個partition會被復制多份存在于不同的broker上。這樣做是為了容災橄仍。具體會復制幾份忘伞,會復制到哪些broker上,都是可以配置的沙兰。經過相關的復制策略后氓奈,每個topic在每個broker上會駐留一到多個partition:
2.4 保留策略與Offset
Kafka集群保留所有發(fā)布的記錄,不管這個記錄有沒有被消費過鼎天,Kafka提供可配置的保留策略去刪除舊數(shù)據(jù)(還有一種策略根據(jù)分區(qū)大小刪除數(shù)據(jù))舀奶。例如,如果將保留策略設置為兩天斋射,在記錄公布后兩天內育勺,它可用于消費但荤,之后它將被丟棄以騰出空間。Kafka的性能跟存儲的數(shù)據(jù)量的大小無關涧至, 所以將數(shù)據(jù)存儲很長一段時間是沒有問題的腹躁。
事實上,保留在每個消費者元數(shù)據(jù)中的最基礎的數(shù)據(jù)就是消費者正在處理的當前記錄的偏移量(offset)或位置(position)南蓬。這種偏移是由消費者控制:通常偏移會隨著消費者讀取記錄線性前進纺非,但事實上,因為其位置是由消費者進行控制赘方,消費者可以在任何它喜歡的位置讀取記錄烧颖。例如,消費者可以恢復到舊的偏移量對過去的數(shù)據(jù)再加工或者直接跳到最新的記錄窄陡,并消費從“現(xiàn)在”開始的新的記錄炕淮。
這些功能的結合意味著,實現(xiàn)Kafka的消費者的代價都是很小的跳夭,他們可以增加或者減少而不會對集群或其他消費者有太大影響涂圆。例如,你可以使用我們的命令行工具去追隨任何主題币叹,而且不會改變任何現(xiàn)有的消費者消費的記錄乘综。
2.5 Leader與Followers
一個Topic可能有很多分區(qū),以便它能夠支持海量的的數(shù)據(jù)套硼,更重要的意義是分區(qū)是進行并行處理的基礎單元卡辰。日志的分區(qū)會跨服務器的分布在Kafka集群中,每個分區(qū)可以配置一定數(shù)量的副本分區(qū)提供容錯能力邪意。為了保證較高的處理效率九妈,消息的讀寫都是在固定的一個副本上完成。這個副本就是所謂的Leader雾鬼,而其他副本則是Follower萌朱,而Follower則會定期地到Leader上同步數(shù)據(jù)。
(1)leader處理所有的讀取和寫入分區(qū)的請求策菜,而followers被動的從領導者拷貝數(shù)據(jù)晶疼。
(2)如果leader失敗了,followers之一將自動成為新的領導者又憨。
(3)每個服務器可能充當一些分區(qū)的leader和其他分區(qū)的follower翠霍,這樣的負載就會在集群內很好的均衡分配。
(4)一個分區(qū)在同一時刻只能有一個消費者實例進行消費蠢莺。
舉例:
可以看見我們一共有3個分區(qū)分別是0寒匙,1,2躏将, replica 有2個:
partition 0 的leader在broker1锄弱, follower在broker2
partition 1 的leader在broker2考蕾, follower在broker0
partition 2 的leader在broker0, follower在brokder1
一個broker中不會出現(xiàn)兩個一樣的Partition会宪,replica會被均勻的分布在各個kafka server(broker)上 肖卧。Kafka并不允許replicas 數(shù)設置大于 broker數(shù),因為在一個broker上如果有2個replica其實是沒有意義的掸鹅,因為再多的replica同時在一臺broker上塞帐,隨著該broker的crash,一起不可用河劝。
(1)Leader選舉與ISR
如果某個分區(qū)所在的服務器除了問題,不可用矛紫,kafka會從該分區(qū)的其他的副本中選擇一個作為新的Leader赎瞎。之后所有的讀寫就會轉移到這個新的Leader上。現(xiàn)在的問題是應當選擇哪個作為新的Leader颊咬。顯然务甥,只有那些跟Leader保持同步的Follower才應該被選作新的Leader。
Kafka會在Zookeeper上針對每個Topic維護一個稱為ISR(in-sync replica喳篇,已同步的副本)的集合敞临,該集合中是一些分區(qū)的副本。只有當這些副本都跟Leader中的副本同步了之后麸澜,kafka才會認為消息已提交挺尿,并反饋給消息的生產者。如果這個集合有增減炊邦,kafka會更新zookeeper上的記錄编矾。如果某個分區(qū)的Leader不可用,Kafka就會從ISR集合中選擇一個副本作為新的Leader馁害。顯然通過ISR窄俏,kafka需要的冗余度較低,可以容忍的失敗數(shù)比較高碘菜。假設某個topic有f+1個副本凹蜈,kafka可以容忍f個服務器不可用。
(2)為什么不用少數(shù)服從多數(shù)的方法
少數(shù)服從多數(shù)是一種比較常見的一致性算法和Leader選舉法忍啸。它的含義是只有超過半數(shù)的副本同步了仰坦,系統(tǒng)才會認為數(shù)據(jù)已同步;選擇Leader時也是從超過半數(shù)的同步的副本中選擇。這種算法需要較高的冗余度计雌。譬如只允許一臺機器失敗缎岗,需要有三個副本;而如果只容忍兩臺機器失敗,則需要五個副本白粉。而kafka的ISR集合方法传泊,分別只需要兩個和三個副本鼠渺。
(3)如果所有的ISR副本都失敗了怎么辦
此時有兩種方法可選,一種是等待ISR集合中的副本復活眷细,一種是選擇任何一個立即可用的副本拦盹,而這個副本不一定是在ISR集合中。這兩種方法各有利弊溪椎,實際生產中按需選擇普舆。如果要等待ISR副本復活,雖然可以保證一致性校读,但可能需要很長時間沼侣。而如果選擇立即可用的副本,則很可能該副本并不一致歉秫。
2.6 生產者和消費者
(1)生產者
生產者發(fā)布數(shù)據(jù)到他們所選擇的主題蛾洛。生產者負責選擇把記錄分配到主題中的哪個分區(qū)。這可以使用輪詢算法( round-robin)進行簡單地平衡負載雁芙,也可以根據(jù)一些更復雜的語義分區(qū)算法(比如基于記錄一些鍵值)來完成轧膘。
(2)消費者
消費者以消費群(consumer group)的名稱來標識自己,每個發(fā)布到主題的消息都會發(fā)送給訂閱了這個主題的消費群里面的一個消費者實例兔甘,即一個消費群只發(fā)送一次谎碍。消費者的實例可以在單獨的進程或單獨的機器上。
上圖中兩個服務器的Kafka集群具有四個分區(qū)(P0-P3)和兩個消費群洞焙。A消費群有兩個消費者蟆淀,B群有四個。更常見的是澡匪,我們會發(fā)現(xiàn)主題有少量的消費群扳碍,每一個都是“邏輯上的訂閱者”。每組都是由很多消費者實例組成仙蛉,從而實現(xiàn)可擴展性和容錯性笋敞。這只不過是發(fā)布 – 訂閱模式的再現(xiàn),區(qū)別是這里的訂閱者是一組消費者而不是一個單一的進程的消費者荠瘪。
Kafka消費群的實現(xiàn)方式是通過分割分區(qū)給每個Consumer實例實現(xiàn)的夯巷,使每個實例在任何時間點的都可以“公平分享”獨占的分區(qū)。維持消費群中的成員關系的這個過程是通過Kafka動態(tài)協(xié)議處理哀墓。如果新的實例加入該組趁餐,他將接管該組的其他成員的一些分區(qū); 如果一個實例死亡,其分區(qū)將被分配到剩余的實例篮绰。
Kafka只保證一個分區(qū)內的消息有序后雷,不能保證一個主題的不同分區(qū)之間的消息有序。分區(qū)的消息有序與依靠主鍵進行數(shù)據(jù)分區(qū)的能力相結合足以滿足大多數(shù)應用的要求。但是臀突,如果你想要保證所有的消息都絕對有序可以只為一個主題分配一個分區(qū)勉抓,雖然這將意味著每個消費群同時只能有一個消費進程在消費。
3 候学、數(shù)據(jù)可靠性與一致性
3.1 Partition Recovery機制
每個Partition會在磁盤記錄一個RecoveryPoint藕筋,記錄已經flush到磁盤的最大offset。當broker fail 重啟時梳码,會進行l(wèi)oadLogs隐圾。 首先會讀取該Partition的RecoveryPoint,找到包含RecoveryPoint的segment及以后的segment掰茶, 這些segment就是可能沒有完全flush到磁盤segments暇藏。然后調用segment的recover,重新讀取各個segment的msg濒蒋,并重建索引盐碱。
優(yōu)點
? 以segment為單位管理Partition數(shù)據(jù),方便數(shù)據(jù)生命周期的管理啊胶,刪除過期數(shù)據(jù)簡單甸各。
? 在程序崩潰重啟時垛贤,加快recovery速度焰坪,只需恢復未完全flush到磁盤的segment。
? 通過index中offset與物理偏移映射聘惦,用二分查找能快速定位msg某饰,并且通過分多個Segment,每個index文件很小善绎,查找速度更快黔漂。
3.2 Partition Replica同步機制
? Partition的多個replica中一個為Leader,其余為follower
? Producer只與Leader交互禀酱,把數(shù)據(jù)寫入到Leader中
? Followers從Leader中拉取數(shù)據(jù)進行數(shù)據(jù)同步
? Consumer只從Leader拉取數(shù)據(jù)
ISR:in-sync replica炬守,已同步的副本。準確的定義是“所有不落后的replica集合”剂跟。不落后有兩層含義:距離上次FetchRequest的時間不大于某一個值或落后的消息數(shù)不大于某一個值减途, Leader失敗后會從ISR中選取一個Follower做Leader。
3.4 消息的順序消費問題
在說到消息中間件的時候曹洽,我們通常都會談到一個特性:消息的順序消費問題鳍置。這個問題看起來很簡單:Producer發(fā)送消息1, 2, 3;Consumer按1, 2, 3順序消費。但實際情況卻是:無論RocketMQ送淆,還是Kafka税产,缺省都不保證消息的嚴格有序消費!困難如下:
(1)Producer
發(fā)送端不能異步發(fā)送,異步發(fā)送在發(fā)送失敗的情況下,就沒辦法保證消息順序辟拷。比如你連續(xù)發(fā)了1撞羽,2,3梧兼。 過了一會放吩,返回結果1失敗,2, 3成功羽杰。你把1再重新發(fā)送1遍渡紫,這個時候順序就亂掉了。
(2)存儲端
對于存儲端考赛,要保證消息順序惕澎,會有以下幾個問題:
消息不能分區(qū)。也就是1個topic颜骤,只能有1個隊列唧喉。在Kafka中,它叫做partition;在RocketMQ中忍抽,它叫做queue八孝。 如果你有多個隊列,那同1個topic的消息鸠项,會分散到多個分區(qū)里面干跛,自然不能保證順序。
即使只有1個隊列的情況下祟绊,會有第2個問題楼入。該機器掛了之后,能否切換到其他機器?也就是高可用問題牧抽。比如你當前的機器掛了嘉熊,上面還有消息沒有消費完。此時切換到其他機器扬舒,可用性保證了阐肤。但消息順序就亂掉了。要想保證讲坎,一方面要同步復制孕惜,不能異步復制;另1方面得保證,切機器之前衣赶,掛掉的機器上面诊赊,所有消息必須消費完了,不能有殘留府瞄。很明顯碧磅,這個很難碘箍。
(3)接收端
對于接收端,不能并行消費鲸郊,也即不能開多線程或者多個客戶端消費同1個隊列丰榴。
3.5 Producer發(fā)送消息的配置
3.5.1 同步模式
kafka有同步(sync)、異步(async)以及oneway這三種發(fā)送方式秆撮,某些概念上區(qū)分也可以分為同步和異步兩種四濒,同步和異步的發(fā)送方式通過producer.type參數(shù)指定,而oneway由request.require.acks參數(shù)指定职辨。
producer.type的默認值是sync盗蟆,即同步的方式。這個參數(shù)指定了在后臺線程中消息的發(fā)送方式是同步的還是異步的舒裤。如果設置成異步的模式喳资,可以運行生產者以batch的形式push數(shù)據(jù),這樣會極大的提高broker的性能腾供,但是這樣會增加丟失數(shù)據(jù)的風險仆邓。
3.5.2 異步模式
對于異步模式,還有4個配套的參數(shù)伴鳖,如下:
3.5.3 oneway
oneway是只顧消息發(fā)出去而不管死活节值,消息可靠性最低,但是低延遲榜聂、高吞吐搞疗,這種對于某些完全對可靠性沒有要求的場景還是適用的,即request.required.acks設置為0峻汉。
3.5.4 消息可靠性級別
當Producer向Leader發(fā)送數(shù)據(jù)時贴汪,可以通過request.required.acks參數(shù)設置數(shù)據(jù)可靠性的級別:
? 0: 不論寫入是否成功脐往,server不需要給Producer發(fā)送Response休吠,如果發(fā)生異常,server會終止連接业簿,觸發(fā)Producer更新meta數(shù)據(jù);
? 1: Leader寫入成功后即發(fā)送Response瘤礁,此種情況如果Leader fail,會丟失數(shù)據(jù)
? -1: 等待所有ISR接收到消息后再給Producer發(fā)送Response梅尤,這是最強保證
僅設置acks=-1也不能保證數(shù)據(jù)不丟失柜思,當Isr列表中只有Leader時,同樣有可能造成數(shù)據(jù)丟失巷燥。要保證數(shù)據(jù)不丟除了設置acks=-1赡盘, 還要保 證ISR的大小大于等于2,具體參數(shù)設置:
? (1)request.required.acks: 設置為-1 等待所有ISR列表中的Replica接收到消息后采算寫成功;
? (2)min.insync.replicas: 設置為大于等于2缰揪,保證ISR中至少有兩個Replica
Producer要在吞吐率和數(shù)據(jù)可靠性之間做一個權衡陨享。
3.5.5 一般配置
4、 應用場景
4.1 消息系統(tǒng)
消息處理模型歷來有兩種:
隊列模型:一組消費者可以從服務器讀取記錄,每個記錄都會被其中一個消費者處理抛姑,為保障消息的順序赞厕,同一時刻只能有一個進程進行消費。
發(fā)布-訂閱模型:記錄被廣播到所有的消費者定硝。
Kafka的消費群的推廣了這兩個概念皿桑。消費群可以像隊列一樣讓消息被一組進程處理(消費群的成員),與發(fā)布 – 訂閱模式一樣蔬啡,Kafka可以讓你發(fā)送廣播消息到多個消費群诲侮。
Kafka兼顧了消息的有序性和并發(fā)處理能力。傳統(tǒng)的消息隊列的消息在隊列中是有序的箱蟆,多個消費者從隊列中消費消息浆西,服務器按照存儲的順序派發(fā)消息。然而顽腾,盡管服務器是按照順序派發(fā)消息近零,但是這些消息記錄被異步傳遞給消費者,消費者接收到的消息也許已經是亂序的了抄肖。這實際上意味著消息的排序在并行消費中都將丟失久信。消息系統(tǒng)通常靠 “排他性消費”( exclusive consumer)來解決這個問題漓摩,只允許一個進程從隊列中消費裙士,當然,這意味著沒有并行處理的能力管毙。
Kafka做的更好腿椎。通過一個概念:并行性-分區(qū)-主題實現(xiàn)主題內的并行處理,Kafka是能夠通過一組消費者的進程同時提供排序保證和并行處理以及負載均衡的能力:
(1)排序保障
每個主題的分區(qū)指定給每個消費群中的一個消費者夭咬,使每個分區(qū)只由該組中的一個消費者所消費啃炸。通過這樣做,我們確保消費者是一個分區(qū)唯一的讀者卓舵,從而順序的消費數(shù)據(jù)南用。
(2)并行處理
因為有許多的分區(qū),所以負載還能夠均衡的分配到很多的消費者實例上去掏湾。但是請注意裹虫,一個消費群的消費者實例不能比分區(qū)數(shù)量多,因為分區(qū)數(shù)代表了一個主題的最大并發(fā)數(shù)融击,消費者的數(shù)量高于這個數(shù)量意義不大筑公。
4.2 日志采集
大多數(shù)時候,我們的log都會輸出到本地的磁盤上尊浪,排查問題也是使用linux命令來搞定匣屡,如果web程序組成負載集群涩拙,那么就有多臺機器,如果有幾十臺機器耸采,幾十個服務兴泥,那么想快速定位log問題和排查就比較麻煩了,所以很有必要有一個統(tǒng)一的平臺管理log虾宇,現(xiàn)在大多數(shù)公司的套路都是收集重要應用的log集中到kafka中搓彻,然后在分別導入到es和hdfs上,一個做實時檢索分析嘱朽,另一個做離線統(tǒng)計和數(shù)據(jù)備份旭贬。如何能快速收集應用日志到kafka中?
方法一:使用log4j的集成包
kafka官網已經提供了非常方便的log4j的集成包 kafka-log4j-appender,我們只需要簡單配置log4j文件搪泳,就能收集應用程序log到kafka中稀轨。
注意,需要引入maven的依賴包:
非常簡單岸军,一個maven依賴加一個log4j配置文件即可奋刽,如果依然想寫入log到本地 文件依然也是可以的,這種方式最簡單快速艰赞,但是默認的的log日志是一行一行的純文本佣谐,有些場景下我們可能需要json格式的數(shù)據(jù)。
方法二: 重寫Log4jAppender
重寫Log4jAppender方妖,自定義輸出格式狭魂,支持json格式,如果是json格式的數(shù)據(jù)打入到kafka中党觅,后續(xù)收集程序可能就非常方便了雌澄,直接拿到json就能入到mongodb或者es中,如果打入到kafka中的數(shù)據(jù)是純文本杯瞻,那么收集程序镐牺,可能需要做一些etl,解析其中的一些字段然后再入到es中又兵,所以原生的輸出格式任柜,可能稍不靈活卒废,這樣就需要我們自己寫一些類沛厨,然后達到靈活的程度,github連接:
總結:
(1)方法一簡單快速摔认,不支持json格式的輸出逆皮,打到kafka的消息都是原樣的log日志信息
(2)方法二稍微復雜,需要自己擴展log收集類参袱,但支持json格式的數(shù)據(jù)輸出电谣,對于想落地json數(shù)據(jù)直接到存儲系統(tǒng)中是非常適合的秽梅。
此外需要注意,在調試的時候log發(fā)送數(shù)據(jù)到kafka模式最好是同步模式的否則你控制臺打印的數(shù)據(jù)很有可能不會被收集kafka中剿牺,程序就停止了企垦。生產環(huán)境最好開啟異步發(fā)送數(shù)據(jù)模式,因為內部是批量的處理晒来,所以能提升吞吐,但有一定的輕微延遲钞诡。
4.3 流處理
只是讀,寫湃崩,以及儲存數(shù)據(jù)流是不夠的荧降,目的是能夠實時處理數(shù)據(jù)流。在Kafka中攒读,流處理器是從輸入的主題連續(xù)的獲取數(shù)據(jù)流朵诫,然后對輸入進行一系列的處理,并生產連續(xù)的數(shù)據(jù)流到輸出主題薄扁。
這些簡單處理可以直接使用生產者和消費者的API做到剪返。然而,對于更復雜的轉換Kafka提供了一個完全集成的流API邓梅。這允許應用程序把一些重要的計算過程從流中剝離或者加入流一起随夸。這種設施可幫助解決這類應用面臨的難題:處理雜亂的數(shù)據(jù),改變代碼去重新處理輸入震放,執(zhí)行有狀態(tài)的計算等宾毒。流API建立在Kafka提供的核心基礎單元之上:它使用生產者和消費者的API進行輸入輸出,使用Kafka存儲有狀態(tài)的數(shù)據(jù)殿遂,并使用群組機制在一組流處理實例中實現(xiàn)容錯诈铛。
把功能組合起來
消息的傳輸,存儲和流處理的組合看似不尋常墨礁,卻是Kafka作為流處理平臺的關鍵幢竹。像HDFS分布式文件系統(tǒng),允許存儲靜態(tài)文件進行批量處理恩静。像這樣的系統(tǒng)允許存儲和處理過去的歷史數(shù)據(jù)焕毫。傳統(tǒng)的企業(yè)消息系統(tǒng)允許處理您訂閱后才抵達的消息。這樣的系統(tǒng)只能處理將來到達的數(shù)據(jù)驶乾。
Kafka結合了這些功能邑飒,這種結合對Kafka作為流應用平臺以及數(shù)據(jù)流處理的管道至關重要。通過整合存儲和低延遲訂閱级乐,流處理應用可以把過去和未來的數(shù)據(jù)用相同的方式處理疙咸。這樣一個單獨的應用程序,不但可以處理歷史的风科,保存的數(shù)據(jù)撒轮,當它到達最后一條記錄不會停止乞旦,繼續(xù)等待處理未來到達的數(shù)據(jù)。這是泛化了的流處理的概念题山,包括了批處理應用以及消息驅動的應用兰粉。同樣,流數(shù)據(jù)處理的管道結合實時事件的訂閱使人們能夠用Kafka實現(xiàn)低延遲的管道; 可靠的存儲數(shù)據(jù)的能力使人們有可能使用它傳輸一些重要的必須保證可達的數(shù)據(jù)顶瞳∏阻耄可以與一個定期加載數(shù)據(jù)的線下系統(tǒng)集成,或者與一個因為維護長時間下線的系統(tǒng)集成浊仆。流處理的組件能夠保證轉換(處理)到達的數(shù)據(jù)客峭。
5幻林、Kafka與ActiveMQ對比
首先棚点,Active MQ與Kafka的相同點只有一個,就是都是消息中間件吝秕。其他沒有任何相同點洲劣。
5.1 consumer的不同
(1)AMQ消費完的消息會被清理掉
AMQ無論在standalone還是分布式的情況下备蚓,都會使用mysql作為存儲,多一個consumer線程去消費多個queue, 消費完的message會在mysql中被清理掉囱稽。
(2)AMQ的消費邏輯在Broker中完成
作為AMQ的consume clinet的多個consumer線程去消費queue郊尝,AMQ Broker會接收到這些consume線程,阻塞在這里战惊,有message來了就會進行消費流昏,沒有消息就會阻塞在這里。具體消費的邏輯也就是處理這些consumer線程都是AMQ Broker那面處理吞获。
kafka是message都存在partition下的segment文件里面况凉,有offsite偏移量去記錄那條消費了,哪條沒消費各拷。某個consumer group下consumer線程消費完就會刁绒,這個consumer group 下的這個consumer對應這個partition的offset+1,kafka并不會刪除這條已經被消費的message烤黍。其他的consumer group也可以再次消費這個message知市。在high level api中offset會自動或手動的提交到zookeeper上(如果是自動提交就有可能處理失敗或還沒處理完就提交offset+1了,容易出現(xiàn)下次再啟動consumer group的時候這條message就被漏了)速蕊,也可以使用low level api嫂丙,那么就是consumer程序中自己維護offset+1的邏輯。kafka中的message會定期刪除互例。
(3)Kafka有consumer group的概念奢入,AMQ沒有。
一個consumer group下有多個consumer媳叨,每個consumer都是一個線程腥光,consumer group是一個線程組。每個線程組consumer group之間互相獨立糊秆。同一個partition中的一個message只能被一個consumer group下的一個consumer線程消費武福,因為消費完了這個consumer group下的這個consumer對應的這個partition的offset就+1了,這個consumer group下的其他consumer還是這個consumer都不能在消費了痘番。 但是另外一個consumer group是完全獨立的捉片,可以設置一個from的offset位置,重新消費這個partition汞舱。
5.2 關于存儲結構
ActiveMQ的消息持久化機制有JDBC伍纫,AMQ,KahaDB和LevelDB
Kafka是文件存儲昂芜,每個topic有多個partition莹规,每個partition有多個replica副本(每個partition和replica都是均勻分配在不同的kafka broker上的)。每個partition由多個segment文件組成泌神。這些文件是順序存儲的良漱。因此讀取和寫入都是順序的,因此欢际,速度很快母市,省去了磁盤尋址的時間。
很多系統(tǒng)损趋、組件為了提升效率一般恨不得把所有數(shù)據(jù)都扔到內存里患久,然后定期flush到磁盤上;而Kafka決定直接使用頁面緩存;但是隨機寫入的效率很慢,為了維護彼此的關系順序還需要額外的操作和存儲浑槽,而線性的順序寫入可以避免磁盤尋址時間墙杯,實際上,線性寫入(linear write)的速度大約是300MB/秒括荡,但隨即寫入卻只有50k/秒高镐,其中的差別接近10000倍。這樣畸冲,Kafka以頁面緩存為中間的設計在保證效率的同時還提供了消息的持久化嫉髓,每個consumer自己維護當前讀取數(shù)據(jù)的offset(也可委托給zookeeper),以此可同時支持在線和離線的消費邑闲。
5.3 關于使用場景與吞吐量
ActiveMQ用于企業(yè)消息中間件算行,使得業(yè)務邏輯和前端處理邏輯解耦。AMQ的吞吐量不大苫耸,zuora的AMQ就是用作jms來使用州邢。AMQ吞吐量不夠,并且持久化message數(shù)據(jù)通過jdbc存在mysql褪子,寫入和讀取message性能太低量淌。而Kafka的吞吐量非常大骗村。
5.4 push/pull 模型
對于消費者而言有兩種方式從消息中間件獲取消息:
①Push方式:由消息中間件主動地將消息推送給消費者,采用Push方式呀枢,可以盡可能快地將消息發(fā)送給消費者;②Pull方式:由消費者主動向消息中間件拉取消息胚股,會增加消息的延遲,即消息到達消費者的時間有點長
但是裙秋,Push方式會有一個壞處:如果消費者的處理消息的能力很弱(一條消息需要很長的時間處理)琅拌,而消息中間件不斷地向消費者Push消息,消費者的緩沖區(qū)可能會溢出摘刑。
AMQ的Push消費
ActiveMQ使用PUSH模型进宝, 對于PUSH,broker很難控制數(shù)據(jù)發(fā)送給不同消費者的速度枷恕。AMQ Broker將message推送給對應的BET consumer党晋。ActiveMQ用prefetch limit 規(guī)定了一次可以向消費者Push(推送)多少條消息。當推送消息的數(shù)量到達了perfetch limit規(guī)定的數(shù)值時活尊,消費者還沒有向消息中間件返回ACK隶校,消息中間件將不再繼續(xù)向消費者推送消息。
AMQ的Pull消費
ActiveMQ prefetch limit 設置成0意味著什么?意味著此時蛹锰,消費者去輪詢消息中間件獲取消息深胳。不再是Push方式了,而是Pull方式了铜犬。即消費者主動去消息中間件拉取消息舞终。
那么,ActiveMQ中如何采用Push方式或者Pull方式呢?從是否阻塞來看癣猾,消費者有兩種方式獲取消息敛劝。同步方式和異步方式。
同步方式使用的是ActiveMQMessageConsumer的receive()方法纷宇。而異步方式則是采用消費者實現(xiàn)MessageListener接口夸盟,監(jiān)聽消息。使用同步方式receive()方法獲取消息時像捶,prefetch limit即可以設置為0上陕,也可以設置為大于0。
prefetch limit為零 意味著:“receive()方法將會首先發(fā)送一個PULL指令并阻塞拓春,直到broker端返回消息為止释簿,這也意味著消息只能逐個獲取(類似于Request<->Response)”。
prefetch limit 大于零 意味著:“broker端將會批量push給client 一定數(shù)量的消息(<= prefetch)硼莽,client端會把這些消息(unconsumedMessage)放入到本地的隊列中庶溶,只要此隊列有消息,那么receive方法將會立即返回,當一定量的消息ACK之后偏螺,broker端會繼續(xù)批量push消息給client端行疏。”
當使用MessageListener異步獲取消息時砖茸,prefetch limit必須大于零了隘擎。因為殴穴,prefetch limit 等于零 意味著消息中間件不會主動給消費者Push消息凉夯,而此時消費者又用MessageListener被動獲取消息(不會主動去輪詢消息)。這二者是矛盾的采幌。
Kafka只有Pull消費方式
Kafka使用PULL模型劲够,PULL可以由消費者自己控制,但是PULL模型可能造成消費者在沒有消息的情況下盲等休傍,這種情況下可以通過long polling機制緩解征绎,而對于幾乎每時每刻都有消息傳遞的流式系統(tǒng),這種影響可以忽略磨取。Kafka 的 consumer 是以pull的形式獲取消息數(shù)據(jù)的人柿。 pruducer push消息到kafka cluster ,consumer從集群中pull消息忙厌。
結語
為了幫助大家讓學習變得輕松凫岖、高效,給大家免費分享一大批資料逢净,幫助大家在成為大數(shù)據(jù)工程師哥放,乃至架構師的路上披荊斬棘。在這里給大家推薦一個大數(shù)據(jù)學習交流圈:
658558542? ?
歡迎大家進群交流討論爹土,學習交流甥雕,共同進步。
當真正開始學習的時候難免不知道從哪入手胀茵,導致效率低下影響繼續(xù)學習的信心社露。
但最重要的是不知道哪些技術需要重點掌握,學習時頻繁踩坑琼娘,最終浪費大量時間峭弟,所以有有效資源還是很有必要的。
最后祝福所有遇到瓶疾且不知道怎么辦的大數(shù)據(jù)程序員們轨奄,祝福大家在往后的工作與面試中一切順利孟害。