消息隊列的引入发魄,什么時候使用MQ?
MQ(Message Queue)俩垃,一種跨進程的通信機制励幼,用于上下游傳遞消息。能達到解耦口柳、異步苹粟、消峰限流的作用。舉幾個對應的適用例子跃闹。
解耦
1. 比如定時任務依賴的場景嵌削,晚上需要跑一些定時統(tǒng)計任務毛好,任務2依賴任務1的結果,任務3依賴任務2的接口苛秕,一般開發(fā)人員會在每個定時任務之間肌访,預留一些時間buffer處理。但是艇劫,當某一天其中一個任務超出常規(guī)時間吼驶,任務就跑亂套了,第二天肯定就有人來找到你了港准。這個場景就很適合用MQ去解耦旨剥,當任務1完成后,通知到任務2浅缸,任務二通過訂閱消息去實現觸發(fā)轨帜。
2. 比如統(tǒng)一充值網關服務,某個產品接入統(tǒng)一充值服務的微信渠道充值衩椒。充值成功后蚌父,微信服務端會通知到統(tǒng)一充值服務端,因為這些是異步的調用毛萌,且是公網接口苟弛,時間會相對長一些,業(yè)務上產品接入方會有需求想知道阁将,到賬的結果膏秫。這個到賬通知就很適合MQ去實現。如果做盅,這里由充值網關服務調用上游來通知結果的話缤削,每次新增調用方,充值網關服務都需要修改代碼發(fā)布吹榴,依賴反轉了亭敢。充值網關+MQ的方式,業(yè)務調用方去訂閱消息實現解耦图筹。
異步
1.場景上游不關心執(zhí)行結果帅刀。異步,rpc框架異步調用也可以远剩。區(qū)別就是MQ消息會落地扣溺,并且消息中間件都會有HA的涉及,能保證消息語義的實現(至少一次瓜晤、至少一次娇妓、至多一次)。rpc異步請求本身也會有本地內存隊列活鹰,所以數據不是要求很重要的場景哈恰,差不多只估。只是在工程上,有一點着绷,使用rpc處理這種業(yè)務蛔钙,經驗上要單拉出一個服務去調用下游,因為依賴倒置了荠医。每增加接入方或者業(yè)務有修改吁脱,都要提供服務的工程去修改發(fā)布(有經驗的同學應該深有體會),作為基礎服務的話彬向,應該減少這種依賴倒置的發(fā)布兼贡,獨立出來一個服務處理的話,會減少風險娃胆。
業(yè)務的話遍希,處理推薦日志,處理app埋點的統(tǒng)計數據
消峰限流
這個應該是MQ另一個最主要的作用之一里烦。做活動訪問量陡增凿蒜,下游處理不過來的時候,使用消息隊列達到限流的作用胁黑。工程上有個C10K問題(雖然現在已經有現在我們早已經突破了C10K這個瓶頸废封,具體的思路就是通過單個進程或線程服務于多個客戶端請求,通過異步編程和事件觸發(fā)機制替換輪訓丧蘸,IO 采用非阻塞的方式(reactor模型)漂洋,減少不必要的性能損耗,等等)力喷。但是這個要求下游的服務包括存儲和依賴刽漂,都要做到這點,哪個環(huán)節(jié)弱都不行冗懦。可能還會浪費資源仇祭,平時的量用不著那么多服務器披蕉。
相應的,那些調用方需要被調用方立刻返回結果的需求乌奇,就不適用于MQ没讲,需要根據業(yè)務去考慮,脫離了業(yè)務去引入新技術就是耍流氓礁苗。
業(yè)務上有對消息組件的需求后爬凑,市面上陸續(xù)出現了很多成熟的消息中間件
IBM webSphere MQ、Apache ActiveMQ试伙、LinkedIn Kafka嘁信、阿里 rocketMQ于样、
java社區(qū)肯定要跟著一起玩的,社區(qū)也定義jms規(guī)范(JMS規(guī)范百度詞條)maid潘靖。這些消息組件穿剖,前兩個是基于jms規(guī)范實現的,后兩個沒有卦溢,rocketMQ開始是kafka的java版實現糊余,現在已經從Apache社區(qū)正式畢業(yè)(17-0925),成為Apache頂級項目单寂。在原有設計的基礎上贬芥,比如提供事務消息等一些功能,kafka0.11版開始也提供可事務的支持宣决,還沒發(fā)布太久蘸劈,效果還有待觀察;但是都用別的方式實現了jms定義的一些功能疲扎,比如發(fā)布訂閱昵时,點對點通信。
如何設計實現一個消息隊列:
實現一個消息組件不可避免的要處理如下問題(消息中間件精要設計):
1.通信協(xié)議的選擇
2.消息的分布式存儲設計椒丧,關系型數據庫 磁盤 kv存儲
3.如何分布式設計生產者和消費者保證高吞吐
4.如何實現順序消費
5.如何保證HA壹甥,在高吞吐和HA上做平衡
6.事務消息的支持
7.push還是pull
8.單播、廣播壶熏、訂閱發(fā)布的實現
9.性能的優(yōu)化句柠,同步異步、批量等
以上問題有交叉棒假,以這些問題出發(fā)溯职,看看kafka是如何設計實現的。
一.kafka基本概念的介紹
1.topics 主題帽哑,隊列的邏輯概念谜酒。可以有多個producer生產消息往topic發(fā)送妻枕,有個consumer從topic消費消息僻族。
topic有partition組成,partition個數有server.property配置文件指定屡谐,partition的服務器的分布述么,會由算法均分到不同的服務器上。每個partition上的消息是有序的不可變的愕掏。
生產的消息度秘,是以log的文件格式存儲在服務器端。具體格式下邊會單獨再說下饵撑。每條消息有一個位移信息offset剑梳,生產者在隊列尾添加消息唆貌,offset+1。生產者消費消息的進度位置也是用offset標記阻荒,消費后挠锥,消息是不刪除的,所以可以指定offset重新消費侨赡。offset的值的提交存儲是放到客戶端完成的蓖租,所以服務端是無消費狀態(tài)的。
offset的存儲位置老版本是放到zookeeper上羊壹,考慮到集群topic很多的話蓖宦,zookeeper的讀寫操作很頻繁,zookeeper是不適合有大量寫入操作的油猫。所以新版本把offset存儲到服務器端一個單獨的topic下__consumer-offsets恼蓬。這個topic默認50個分區(qū)
2.producer
生產者往指定的topic發(fā)消息插爹,生產者發(fā)消息到不同的partition上算法有根據key值hash螺捐、輪訓和新版本最新的算法吆录,也可以自己實現指定partition的策略。
生產者如何指定發(fā)送的partition毡证?如何指定發(fā)送的策略电爹?
3.consumer and consumer group
1.消費者消費topic丐箩,必須指定consumer group,其中配置文件group.id唯一標識一個consumer group
2.topic上可以有多個consumer group去訂閱恤煞,kafka使用這個概念實現JMS規(guī)范里邊發(fā)布訂閱功能屎勘,即不同的接入方想實現訂閱功能,只需要指定不同的consumer group即可居扒。
3.consumer group上可以有多個消費者概漱,并且一個partition只有consumer group的其中一個consumer在消費。一個consumer可以消費多個partition
看到后會想到的問題:consumer group里的consumer如何分配消費partition的關系喜喂?
4.replica 副本
HA設計的概念瓤摧,副本對應的是partition的概念。每個partition的副本個數夜惭,在配置文件有指定姻灶。如果有3機器铛绰,3副本能保證(3-1)個server fail的情況下诈茧,不丟消息。見kafka的HA設計
問題:kafka是如何利用replica概念設計HA的捂掰?
二.服務端消息log存儲設計
選擇的是磁盤文件的持久化方式敢会,沒有提供不持久化的選擇曾沈。
可以看到kafka10-topic-20170924有三個文件夾,每個文件夾代表一個分區(qū)鸥昏,每個分區(qū)下的存儲由segment組成塞俱,一個segment包括index索引文件,時間戳index索引文件和實際存儲數據的log文件組成吏垮。
每個log文件的大小默認是10M(可配置)障涯,超過10M,新建文件膳汪,文件的名字是第一個消息的offset值唯蝶。
為什么這樣設計?好處是什么遗嗽?
索引文件的結構是一個map粘我,key是當前segment的offset的偏移量,從0開始痹换。value是對應的log文件中消息開始位置的實際物理位置偏移量征字。索引文index file采取稀疏索引存儲方式,它減少索引文件大小娇豫,通過mmap可以直接內存操作匙姜,稀疏索引為數據文件的每個對應message設置一個元數據指針,它比稠密索引節(jié)省了更多的存儲空間,但查找起來需要消耗更多的時間锤躁。
舉個??搁料,消息的查找過程,比如offset的值是368772系羞,如何查找消費對應消息內容郭计。
1.根據offset找到所在的segment,根據二分查找椒振,找到消息所在的log文件0000000000000368769.log和索引文件0000000000000368769.index
2.計算下差368772-368769=3昭伸,在索引文件中也是二分查找,定位到是<3,497>記錄澎迎,即對應的物理位置是497庐杨,從而找到消息
3.根據物理位置497在0000000000000368769.log文件找到消息。
問題:如果稀疏索引沒有找到怎么辦夹供?
如果是索引文件沒有命中怎么辦灵份。這就要繼續(xù)在看下每條log的消息格式:
字段解釋:
offset:8bytes長度的偏移量,唯一表示一條消息
message length :消息長度
crc:CRC32校驗消息
magic value:標示是否允許格式化改變
attributes:bit 0~2:壓縮方法哮洽,0沒有壓縮 1gzip 2 snappy 3 lz4填渠;bit 3:時間戳類型,0創(chuàng)建時間 1日志的追加時間;bit 4~7預留
timestamp:當時magic value的值是1是氛什,有效莺葫。表示時間戳。這個新版本引入的字段
key length:消息key的長度
key:key的值
value length:消息內容的長度
value:消息的具體內容
所以沒有索引到的查找枪眉,就先根據二分找到最近的一條內容捺檬,然后根據每條消息的格式,知道消息的長度贸铜。依次計算出下一條消息的位置堡纬,直到找到offset相等的那條記錄。
服務器上的日志文件是二進制的蒿秦,kafka提供很多方便的腳本工具隐轩,可以使用kafka的工具類DumpLogSegments類解析查看一下結構如圖
轉換腳本指令,要加上--print-data-log參數渤早,不加的話职车,默認不輸出key值和value值。其中圖中payload是value值鹊杖。
./kafka-run-class.sh? kafka.tools.DumpLogSegments --print-data-log --files /tmp/kafka-logs/kafka10-topic-20170924-0/000000000000000000.log
同樣的看下offset索引文件和時間戳索引文件悴灵,嗯,跟官網文檔描述的一樣骂蓖,就放心了积瞒。