仿照Kafka,從零開始自實現(xiàn) MQ伯顶,實現(xiàn)了 Kafka 中 80% 的基礎(chǔ)功能囚灼。學(xué)習(xí) Kafka 的話如果只是看文章和源碼,可能不久就會忘了祭衩,還是自己實現(xiàn)一個「精簡版」的 Kafka 吧灶体,
實現(xiàn)功能概覽
1、基于內(nèi)存Queue實現(xiàn)生產(chǎn)和消費(fèi)API
- 1) 創(chuàng)建內(nèi)存Queue掐暮, 作為底層消息存儲
- 2) 定義Topic蝎抽, 支持多個Topic
- 3) 定義Producer, 支持Send消息
- 4) 定義Consumer路克, 支持Poll消息
2樟结、設(shè)計自定義Queue,實現(xiàn)消息確認(rèn)和消費(fèi)offset
- 1) 自定義內(nèi)存Message數(shù)組模擬Queue精算。
- 2) 使用指針記錄當(dāng)前消息寫入位置瓢宦。
- 3) 對于每個命名消費(fèi)者, 用指針記錄消費(fèi)位置
3灰羽、拆分broker和client(包括producer和consumer)
- 1) 將Queue保存到web server端
- 2) 設(shè)計消息讀寫API接口驮履, 確認(rèn)接口, 提交offset接口
- 3) producer和consumer通過httpclient訪問Queue
- 4) 實現(xiàn)消息確認(rèn)谦趣, offset提交
- 5) 實現(xiàn)consumer從offset增量拉取
項目目錄
項目設(shè)計及項目能力
Server
一疲吸、Topic
- 維護(hù)ArrayList用于模擬持久化消息「原因:消息需要隨機(jī)訪問」
- 設(shè)定消息隊列容量,達(dá)到容量時無法再生產(chǎn)消息
- 當(dāng)前消息的最大索引
二前鹅、ConsumerGroup
- 消費(fèi)者組由消費(fèi)者組名和topic名共同決定摘悴,即不同topic的消費(fèi)者組相互獨立,不會相互影響
- 需根據(jù)topic創(chuàng)建消費(fèi)者組舰绘,即消費(fèi)者組必須關(guān)聯(lián)topic
- 消費(fèi)者組創(chuàng)建后蹂喻,默認(rèn)從頭完整消費(fèi)關(guān)聯(lián)topic的所有消息
- 同一個消費(fèi)者組內(nèi)葱椭,各個消費(fèi)者總共消費(fèi)一次「最少消費(fèi)一次」所關(guān)聯(lián)topic的所有消息
三、broker
- 一個broker關(guān)聯(lián)一個ConsumerGroup列表和一個Topic列表
- 通過broker暴露的接口口四,可以展示關(guān)聯(lián)ConsumerGroup列表和Topic列表的概覽信息
- 通過broker暴露的接口孵运,可以向一個topic中生產(chǎn)消息
- 通過broker暴露的接口,可以根據(jù)消費(fèi)者組名和topic名消費(fèi)消息
注:本次僅實現(xiàn)單個broker蔓彩,broker后實現(xiàn)了topic和consumerGroup「消費(fèi)者組」治笨,細(xì)節(jié)結(jié)構(gòu)圖如下:
client
- 客戶端通過topic名生產(chǎn)消息
- 客戶端根據(jù)消費(fèi)者組名和topic名消費(fèi)消息
- 客戶端消費(fèi)消息時,可以同時獲得消費(fèi)者組的offset「偏移量」
- 客戶端消費(fèi)消息成功后赤嚼,需手動更新消費(fèi)者組的offset旷赖。若不更新,客戶端默認(rèn)無法消費(fèi)后面的消息更卒。
- 客戶端消費(fèi)消息失敗時等孵,不應(yīng)更新消費(fèi)者組的offset。此時客戶端可以重復(fù)消費(fèi)當(dāng)條消息蹂空。
- 多個客戶端可以使用同一個消費(fèi)者組消費(fèi)同一個topic俯萌;可以使用不同的消費(fèi)者組消費(fèi)同一個topic;可以使用不同的消費(fèi)者組消費(fèi)不同的topic
客戶端工作示意圖如下:
項目結(jié)構(gòu)
本項目共提供四個module:
bitkylin-mq-server
bitkylin-mq-api
bitkylin-mq-client-producer
bitkylin-mq-client-consumer
各module的介紹如下:
1. bitkylin-mq-server
提供MQ服務(wù)端上枕,提供broker以及其關(guān)聯(lián)的ConsumerGroup和Topic等咐熙,主要實現(xiàn)如下功能:
- 展示MQ概覽信息,包括topic和ConsumerGroup的詳細(xì)信息
- 創(chuàng)建消費(fèi)者組辨萍,創(chuàng)建消費(fèi)者組后糖声,即可使用該消費(fèi)者組消費(fèi)消息
- 生產(chǎn)消息,將消息發(fā)送至指定topic
- 基于指定消費(fèi)者組消費(fèi)消息分瘦,消費(fèi)消息但不更新關(guān)聯(lián)消費(fèi)者組的offset
- 基于指定消費(fèi)者組消費(fèi)消息蘸泻,消費(fèi)消息且自動更新關(guān)聯(lián)消費(fèi)者組的offset
- 手動更新指定消費(fèi)者組的偏移量
2. bitkylin-mq-api
提供供客戶端使用的api,通過feignClient形式提供嘲玫,客戶端可直接使用悦施,執(zhí)行RPC,當(dāng)前實現(xiàn)如下功能:
- 發(fā)送消息至指定topic
- 訂閱指定topic的消息去团。自動創(chuàng)建消費(fèi)者組抡诞,使用觀察者模式輪詢消息并消費(fèi)。
3. bitkylin-mq-client-producer
消息生產(chǎn)客戶端土陪,通過feign-api生產(chǎn)消息昼汗,當(dāng)前實現(xiàn)如下演示功能:
隨機(jī)向topic名為「topic-1」和「topic-2」的topic中發(fā)送消息,每隔3秒發(fā)送一次消息鬼雀。
4. bitkylin-mq-client-consumer
消息消費(fèi)客戶端顷窒,通過feign-api消費(fèi)消息,當(dāng)前實現(xiàn)如下演示功能:
- 創(chuàng)建消費(fèi)者組「spring-group-1」訂閱「topic-1」,并打印訂閱的消息鞋吉。
- 創(chuàng)建消費(fèi)者組「spring-group-2」訂閱「topic-2」鸦做,并打印訂閱的消息。
代碼演示
- 運(yùn)行module「bitkylin-mq-server」谓着,啟動MQ的broker泼诱,啟動消息服務(wù)。
- 運(yùn)行module「bitkylin-mq-client-consumer」和「bitkylin-mq-client-producer」赊锚,開啟消息訂閱演示任務(wù)和消息發(fā)送演示任務(wù)治筒。
- 此時可通過「bitkylin-mq-client-consumer」的控制臺,看到消息不斷被消費(fèi)舷蒲。
2021-01-24 01:55:58.008 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到消息:spring-group-1: topic-1-msg:1
2021-01-24 01:56:00.996 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到消息:spring-group-1: topic-1-msg:2
2021-01-24 01:56:04.000 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到消息:spring-group-1: topic-1-msg:3
2021-01-24 01:56:07.004 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到消息:spring-group-2: topic-2-msg:4
2021-01-24 01:56:10.015 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到消息:spring-group-2: topic-2-msg:5
2021-01-24 01:56:13.011 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到消息:spring-group-2: topic-2-msg:6
2021-01-24 01:56:16.011 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到消息:spring-group-2: topic-2-msg:7
2021-01-24 01:56:19.006 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到消息:spring-group-2: topic-2-msg:8
2021-01-24 01:56:21.997 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到消息:spring-group-1: topic-1-msg:9
2021-01-24 01:56:24.994 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到消息:spring-group-1: topic-1-msg:10
2021-01-24 01:56:28.002 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到消息:spring-group-2: topic-2-msg:11
2021-01-24 01:56:30.991 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到消息:spring-group-1: topic-1-msg:12
2021-01-24 01:56:34.014 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到消息:spring-group-2: topic-2-msg:13
2021-01-24 01:56:37.010 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到消息:spring-group-2: topic-2-msg:14
2021-01-24 01:56:40.004 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到消息:spring-group-2: topic-2-msg:15
- 打開postman矢炼,發(fā)送如下請求創(chuàng)建專用于postman的消費(fèi)者組:
POST http://localhost:8080/mq/broker/consumer-group/create
{
"groupName": "postman-group-1",
"topicName": "topic-1"
}
- 發(fā)送如下請求即可消費(fèi)消息,且自動確認(rèn)「無需手動更新消費(fèi)者組的offset」
POST http://localhost:8080/mq/broker/message/simple-pull
{
"groupName": "postman-group-1",
"topicName": "topic-1"
}
可以發(fā)現(xiàn)阿纤,postman可以獨立消費(fèi)指定topic的消息,不受Spring程序消費(fèi)的影響夷陋。當(dāng)然欠拾,postman可以直接使用Spring程序一致的消費(fèi)者組,以共同消費(fèi)消息骗绕。
此時查詢MQ的概覽信息:
GET http://localhost:8080/mq/broker/overview
響應(yīng):
{
"groupList": [
{
"groupName": "spring-group-1",
"topic": {
"name": "topic-1",
"capacity": 1000,
"maxIndex": 14
},
"offset": 15
},
{
"groupName": "postman-group-1",
"topic": {
"name": "topic-1",
"capacity": 1000,
"maxIndex": 14
},
"offset": 5
},
{
"groupName": "spring-group-2",
"topic": {
"name": "topic-2",
"capacity": 1000,
"maxIndex": 17
},
"offset": 18
}
]
}
局限性
- 每個topic的隊列容量是固定的藐窄,隊列滿后拒絕生產(chǎn)消息,暫不支持清理歷史消息酬土。
- 消息消費(fèi)未加鎖荆忍,如果一個消費(fèi)者組的多個消費(fèi)者高并發(fā)消費(fèi)消息,可能導(dǎo)致同一條消息被消費(fèi)多次撤缴。