一隐圾、背景
項(xiàng)目開(kāi)發(fā)中免不了各模塊或系統(tǒng)之間進(jìn)行消息通信龙致,目前熱門(mén)的消息中間件有Redis、RabbitMQ刘莹、Kafka、RocketMQ等等焚刚。以上幾種組件中Redis在消息隊(duì)列方面表現(xiàn)還可以点弯,但是如果涉及發(fā)布訂閱功能,就不行了矿咕,最近項(xiàng)目就使用了redis的發(fā)布訂閱抢肛,每秒只能發(fā)出幾千條,雖然目前綽綽有余碳柱,但是瓶頸可以預(yù)期捡絮。
其余的幾種都是比較重量級(jí)的消息中間件,什么跨平臺(tái)莲镣、分布式福稳、集群、支持N種協(xié)議等等瑞侮,很高大全的圆,我們可能就只使用了其中1、2個(gè)功能半火。嚴(yán)格來(lái)說(shuō)越妈,項(xiàng)目中集成這幾種MQ的工作量是不小的,對(duì)于中小型系統(tǒng)來(lái)說(shuō)钮糖,可能維護(hù)MQ穩(wěn)定的工作量都比項(xiàng)目還大梅掠,難度也高,所有功能用全了的程序員恐怕不多店归。
從長(zhǎng)遠(yuǎn)考慮出發(fā)阎抒,選擇重量級(jí)MQ恐怕是板上釘釘?shù)氖拢琼?xiàng)目一開(kāi)始就上這幾種消痛,我覺(jué)得那也是欠缺考慮的挠蛉。如果項(xiàng)目根本不要求跨機(jī)器通信,那殺雞就不要用牛刀了肄满。比如谴古,你只是在模塊之間质涛、線程之間、進(jìn)程之間掰担,或者是在同一主機(jī)的各種不同系統(tǒng)之間汇陆,其實(shí)都可以不用重量級(jí)MQ。當(dāng)然你使用了也沒(méi)事带饱,看個(gè)人選擇毡代。
最近的項(xiàng)目有這么個(gè)場(chǎng)景,采集近所有底層設(shè)備勺疼,每個(gè)設(shè)備有點(diǎn)3000個(gè)教寂,總共20多萬(wàn)個(gè)點(diǎn)需要采集上來(lái)。剛開(kāi)始使用了Redis的發(fā)布訂閱执庐,但是程序毫無(wú)疑問(wèn)地掛了酪耕,根本帶不起來(lái);因?yàn)槌绦騿?dòng)時(shí)每個(gè)點(diǎn)的值都是從0變成N轨淌,就需要發(fā)消息出來(lái)训措,那一開(kāi)始消息是很多的欣福,redis根本處理不完,而且有很高頻率的超時(shí)斷線。以至于想換RabbitMQ蓖康,后來(lái)想想還是算了娇哆,因?yàn)槟菢釉黾禹?xiàng)目難度不說(shuō)劫狠,后期維護(hù)也是個(gè)難題夸研。
說(shuō)到底這是模塊之間的通信,是主程序(Winform)調(diào)用采集C++的DLL類(lèi)庫(kù)媳拴,發(fā)出消息后主程序和web端訂閱谷炸,在主程序與DLL這邊,在DLL方法上增加一個(gè)回調(diào)函數(shù)就搞定了禀挫,完全不用走消息中間件旬陡,Web端要哪些點(diǎn)的實(shí)時(shí)值就先ASK,先請(qǐng)求需要看哪些點(diǎn)语婴,如何在主程序這邊發(fā)布那些點(diǎn)的實(shí)時(shí)值消息描孟,這樣發(fā)布訂閱的數(shù)據(jù)量少了2、3個(gè)數(shù)量級(jí)不止砰左。
二匿醒、需求
針對(duì)上邊的業(yè)務(wù)場(chǎng)景,因?yàn)槭悄K之間的線程間通信缠导,這樣搞問(wèn)題不大廉羔;如果是進(jìn)程之間也要那么高頻率的通信,那就不好辦了僻造,我們不想使用重量級(jí)MQ憋他,又想高頻率傳輸消息孩饼,怎么辦呢?網(wǎng)上搜索了一番竹挡,貌似沒(méi)看到有成熟的速度又快镀娶、體量又小,部署又簡(jiǎn)單的中間件揪罕。
所以在下不才梯码,針對(duì)這個(gè)問(wèn)題拋磚引玉,做一個(gè)demo出來(lái)供大家討論一下好啰。
三轩娶、原理
應(yīng)題,就是使用內(nèi)存映射來(lái)做同一個(gè)機(jī)器下各種消息的通信框往,內(nèi)存映射比較適合做消息隊(duì)列鳄抒,因?yàn)橄⒖梢猿志没诒镜兀瑳](méi)讀完下次進(jìn)來(lái)還可以接著讀搅窿。
我預(yù)想是這樣設(shè)計(jì):
1、發(fā)布訂閱涉及到2個(gè)主要方法:Publish(string channel)隙券、Subscribe(string channel, Callback callback);
2男应、為每個(gè)channel生成一個(gè)文件:channel.db,默認(rèn)每個(gè)db可以存儲(chǔ)1000個(gè)同類(lèi)型的結(jié)構(gòu)體消息作為消息隊(duì)列,從頭部寫(xiě)入娱仔,尾部讀出沐飘。
每個(gè)db文件前面留一個(gè)索引區(qū)作為發(fā)布方與訂閱方各自的讀寫(xiě)位置。發(fā)布與訂閱前牲迫,先讀寫(xiě)這個(gè)索引區(qū)耐朴,因?yàn)槭且粚?duì)一讀寫(xiě),所以可以完美避開(kāi)讀寫(xiě)鎖盹憎,大大提高性能筛峭。
3、針對(duì)一對(duì)多需求陪每,單獨(dú)設(shè)計(jì)一個(gè)config.db文件存儲(chǔ)種channel與其相關(guān)訂閱信息影晓,大概原理圖如下:
4、解決讀寫(xiě)不加鎖問(wèn)題
我們看結(jié)構(gòu)體:SIndex有三個(gè)屬性
1) WriteIndex 記錄發(fā)布方(Pubish)最后寫(xiě)入數(shù)據(jù)的位置
2) ReadIndex 記錄訂閱方(Subscribe)最后讀取數(shù)據(jù)的位置
3) Over 表示W(wǎng)riteIndex已達(dá)到隊(duì)列最大值檩禾,再WriteIndex小于等于隊(duì)列最大值前挂签,讀寫(xiě)如下圖:
WriteIndex達(dá)到最大值后再往下寫(xiě)Over就要取反,如由False變?yōu)門(mén)rue盼产。WriteIndex=0
如果此時(shí)沒(méi)有訂閱方,那新消息就會(huì)被拋棄饵婆,因?yàn)橐褵o(wú)空間存儲(chǔ)。
4) 如果ReadIndex數(shù)值到隊(duì)列最大值戏售,Over也取反侨核,此時(shí)ReadIndex = 0,讀寫(xiě)又變成圖1所示
5) 讀寫(xiě)過(guò)程中并不存在互斥的情況草穆,只要管理好讀寫(xiě)位置,就可以避免加鎖芹关。
四续挟、接口設(shè)計(jì)
4.1、主要參數(shù)定義
#define FM_MAX_CHANNEL 100 // 暫定最多100個(gè)不同頻道
#define FM_MAX_SUBSCRIBE 3 // 暫定最多3個(gè)訂閱用戶
#define FM_MAX_ROWS 1000 // 暫定最多隊(duì)列大小為1000
#define FM_DISCONNECT_TIME 5000 // 超過(guò)5000毫秒無(wú)心跳更新視為訂閱斷開(kāi)
#define FM_KEEP_CONN_CYCLE 1000 // 保持心跳連接的時(shí)間周期
#define FM_NOTHING -1 // 空白,數(shù)組為0等
#define FM_WORD_SIZE sizeof(WORD) // WORD長(zhǎng)度
#define FM_INDEX_SIZE sizeof(SIndex) // SIndex長(zhǎng)度
4.2侥衬、結(jié)構(gòu)體
View Code
4.3诗祸、主要方法
// 發(fā)布信息
template<typename T>
int?Publish(const?char?*channel, T* data);
// 訂閱信息
template<typename T>
void Subscribe(const?char?*channel, SubscribeCallBackHandle callback);
五、代碼實(shí)現(xiàn)
5.1 轴总、FMDBManager直颅,主要管理內(nèi)存映射相關(guān)操作,因?yàn)槭亲x寫(xiě)位置不一樣怀樟,所以不需要加互斥量
View Code
5.2功偿、FMDBClient,內(nèi)存映射客戶端往堡,主要封裝Publish與Subscribe方法給前端調(diào)用械荷,屏蔽復(fù)雜性
View Code
請(qǐng)注意上邊控制讀寫(xiě)的2個(gè)方法
bool?CanWrite(SIndex *sIndex)
{
int?nextWriteIndex = sIndex->WriteIndex + 1;
if(nextWriteIndex > FM_MAX_ROWS) nextWriteIndex = 0;
return nextWriteIndex != sIndex->ReadIndex;
}
bool?CanRead(SIndex *sIndex)
{
if(sIndex->Over) return sIndex->ReadIndex > sIndex->WriteIndex;
else return sIndex->ReadIndex + 1 <= sIndex->WriteIndex;
}
我們可以分析一下,下一個(gè)WriteIndex值如果大于隊(duì)列最大值 WriteIndex置0虑灰,下一個(gè)WriteIndex數(shù)值如果不等于
正在讀的位置ReadIndex就能寫(xiě)吨瞎;如果WriteIndex沒(méi)有超出最大值,只要ReadIndex小于等于WriteIndex就能讀穆咐,
如果超出颤诀,就判斷ReadIndex大于WriteIndex就能讀。WriteIndex與ReadIndex數(shù)值在Publish與Subscribe中維護(hù)
5.3对湃、建立新線程獲取最新訂閱的客戶端信息崖叫,這個(gè)功能主要是動(dòng)態(tài)地像多個(gè)Subscribe端發(fā)生消息,比如訂閱發(fā)生在發(fā)布之后拍柒,
也應(yīng)該能收到消息心傀。
View Code
六、Demo測(cè)試
6.1拆讯、Producer.cpp
1 #include "pch.h"
2 #include "../FMDB.h"
3
4 using namespace std;
5
6 int main()
7 {
8 FMClient * client = new FMClient();
9
10 int times = 0;
11 int index = 0;
12 int total = 0;
13 UINT structSize = sizeof(SPerson);
14 DWORD dwStartTmp = GetTickCount();
15
16 while(TRUE)
17 {
18 times++;
19 if(index == 0)
20 {
21 dwStartTmp = GetTickCount();
22 }
23
24 SPerson sPerson = { 0 };
25 sPerson.Idx = index;
26 sprintf_s(sPerson.Name, "Name.%d", index);
27 sPerson.Age = index;
28
29 if(client->Publish("Person", &sPerson) == enumSuccess)
30 {
31 if(index % 2 == 0) total = total + sPerson.Idx;
32 else total = total - sPerson.Idx;
33
34 index++;
35 if(index % 50000 == 0)
36 printf_s("發(fā)送條數(shù): %d, 耗時(shí):%d \n", index, (GetTickCount() - dwStartTmp));
37 }
38
39 if(index >= 2000000) break;
40 }
41
42 printf_s("調(diào)用次數(shù): %d, 成功條數(shù): %d, 檢驗(yàn)值: %d \n", times, index, total);
43 system("pause");
44 }
6.2剧包、Consumer.cpp
1 #include "pch.h"
2 #include "../FMDB.h"
3
4 using namespace std;
5
6 int index = 0;
7 int total = 0;
8 DWORD dwStartTmp = GetTickCount();
9
10 int SubscribeCallback(void *msg)
11 {
12 SPerson * person = (SPerson *)msg;
13
14 if(index == 0)
15 {
16 dwStartTmp = GetTickCount();
17 }
18
19 if(index % 2 == 0) total = total + person->Idx;
20 else total = total - person->Idx;
21
22 index++;
23 if(index % 50000 == 0)
24 {
25 printf("接收條數(shù): %d, 耗時(shí):%d, Idx:%d, Name:%s, Age:%d\n",
26 index, (GetTickCount() - dwStartTmp), person->Idx, person->Name, person->Age);
27 }
28
29 if(index >= 2000000)
30 {
31 return enumBreak;
32 }
33
34 return enumSuccess;
35 };
36
37 int main()
38 {
39 FMClient * client = new FMClient();
40 client->Subscribe("Person", SubscribeCallback);
41
42 printf("接收條數(shù): %d, 檢驗(yàn)值: %d \n", index, total);
43 system("pause");
44 }
6.3、運(yùn)行往果,測(cè)試用例中使用了向隊(duì)列發(fā)送200萬(wàn)條數(shù)據(jù)疆液,消息大小128字節(jié),訂閱端也是接受到200萬(wàn)數(shù)據(jù)后退出陕贮,并且打印檢驗(yàn)值堕油。
1) 檢驗(yàn)值計(jì)算:0+1-2+3-4+ --------- - 2000000 = -1000000,如果隊(duì)列運(yùn)行正常,那兩邊的檢驗(yàn)值應(yīng)該都是是 -1000000.
2) 每5萬(wàn)條打印一次日志掉缺,運(yùn)行情況如下
一對(duì)一方式運(yùn)行三次卜录,分別耗時(shí)(毫秒):2886、2979眶明、2871
3) 一對(duì)二方式運(yùn)行三次艰毒,分別耗時(shí)(毫秒):4087、4009搜囱、4040
4)運(yùn)行過(guò)程中產(chǎn)生的文件
6.4丑瞧、200萬(wàn)數(shù)據(jù)一對(duì)一耗時(shí)近3秒,貌似也不是非呈裰猓快是不是绊汹?但是這就是最大速度了嗎?
當(dāng)然不是哦扮宠,別忘了這是debug版本西乖,我們切換到release版本看速度會(huì)不會(huì)有所提升。
一對(duì)一運(yùn)行三次耗時(shí)分別是:1224坛增、1373获雕、1326
厲害了,
SPerson結(jié)構(gòu)體128字節(jié)收捣,每秒可以處理180萬(wàn)數(shù)據(jù)届案,當(dāng)然實(shí)際運(yùn)用肯定達(dá)不到,因?yàn)樘幚砥渌麡I(yè)務(wù)邏輯也要耗時(shí)間坏晦。