使用內(nèi)存映射開(kāi)發(fā)高性能進(jìn)程間消息通信組件

一隐圾、背景

項(xiàng)目開(kāi)發(fā)中免不了各模塊或系統(tǒng)之間進(jìn)行消息通信龙致,目前熱門(mén)的消息中間件有Redis、RabbitMQ刘莹、Kafka、RocketMQ等等焚刚。以上幾種組件中Redis在消息隊(duì)列方面表現(xiàn)還可以点弯,但是如果涉及發(fā)布訂閱功能,就不行了矿咕,最近項(xiàng)目就使用了redis的發(fā)布訂閱抢肛,每秒只能發(fā)出幾千條,雖然目前綽綽有余碳柱,但是瓶頸可以預(yù)期捡絮。

其余的幾種都是比較重量級(jí)的消息中間件,什么跨平臺(tái)莲镣、分布式福稳、集群、支持N種協(xié)議等等瑞侮,很高大全的圆,我們可能就只使用了其中1、2個(gè)功能半火。嚴(yán)格來(lái)說(shuō)越妈,項(xiàng)目中集成這幾種MQ的工作量是不小的,對(duì)于中小型系統(tǒng)來(lái)說(shuō)钮糖,可能維護(hù)MQ穩(wěn)定的工作量都比項(xiàng)目還大梅掠,難度也高,所有功能用全了的程序員恐怕不多店归。

從長(zhǎng)遠(yuǎn)考慮出發(fā)阎抒,選擇重量級(jí)MQ恐怕是板上釘釘?shù)氖拢琼?xiàng)目一開(kāi)始就上這幾種消痛,我覺(jué)得那也是欠缺考慮的挠蛉。如果項(xiàng)目根本不要求跨機(jī)器通信,那殺雞就不要用牛刀了肄满。比如谴古,你只是在模塊之間质涛、線程之間、進(jìn)程之間掰担,或者是在同一主機(jī)的各種不同系統(tǒng)之間汇陆,其實(shí)都可以不用重量級(jí)MQ。當(dāng)然你使用了也沒(méi)事带饱,看個(gè)人選擇毡代。

最近的項(xiàng)目有這么個(gè)場(chǎng)景,采集近所有底層設(shè)備勺疼,每個(gè)設(shè)備有點(diǎn)3000個(gè)教寂,總共20多萬(wàn)個(gè)點(diǎn)需要采集上來(lái)。剛開(kāi)始使用了Redis的發(fā)布訂閱执庐,但是程序毫無(wú)疑問(wèn)地掛了酪耕,根本帶不起來(lái);因?yàn)槌绦騿?dòng)時(shí)每個(gè)點(diǎn)的值都是從0變成N轨淌,就需要發(fā)消息出來(lái)训措,那一開(kāi)始消息是很多的欣福,redis根本處理不完,而且有很高頻率的超時(shí)斷線。以至于想換RabbitMQ蓖康,后來(lái)想想還是算了娇哆,因?yàn)槟菢釉黾禹?xiàng)目難度不說(shuō)劫狠,后期維護(hù)也是個(gè)難題夸研。

說(shuō)到底這是模塊之間的通信,是主程序(Winform)調(diào)用采集C++的DLL類(lèi)庫(kù)媳拴,發(fā)出消息后主程序和web端訂閱谷炸,在主程序與DLL這邊,在DLL方法上增加一個(gè)回調(diào)函數(shù)就搞定了禀挫,完全不用走消息中間件旬陡,Web端要哪些點(diǎn)的實(shí)時(shí)值就先ASK,先請(qǐng)求需要看哪些點(diǎn)语婴,如何在主程序這邊發(fā)布那些點(diǎn)的實(shí)時(shí)值消息描孟,這樣發(fā)布訂閱的數(shù)據(jù)量少了2、3個(gè)數(shù)量級(jí)不止砰左。

二匿醒、需求

針對(duì)上邊的業(yè)務(wù)場(chǎng)景,因?yàn)槭悄K之間的線程間通信缠导,這樣搞問(wèn)題不大廉羔;如果是進(jìn)程之間也要那么高頻率的通信,那就不好辦了僻造,我們不想使用重量級(jí)MQ憋他,又想高頻率傳輸消息孩饼,怎么辦呢?網(wǎng)上搜索了一番竹挡,貌似沒(méi)看到有成熟的速度又快镀娶、體量又小,部署又簡(jiǎn)單的中間件揪罕。

所以在下不才梯码,針對(duì)這個(gè)問(wèn)題拋磚引玉,做一個(gè)demo出來(lái)供大家討論一下好啰。

三轩娶、原理

應(yīng)題,就是使用內(nèi)存映射來(lái)做同一個(gè)機(jī)器下各種消息的通信框往,內(nèi)存映射比較適合做消息隊(duì)列鳄抒,因?yàn)橄⒖梢猿志没诒镜兀瑳](méi)讀完下次進(jìn)來(lái)還可以接著讀搅窿。

我預(yù)想是這樣設(shè)計(jì):

1、發(fā)布訂閱涉及到2個(gè)主要方法:Publish(string channel)隙券、Subscribe(string channel, Callback callback);

2男应、為每個(gè)channel生成一個(gè)文件:channel.db,默認(rèn)每個(gè)db可以存儲(chǔ)1000個(gè)同類(lèi)型的結(jié)構(gòu)體消息作為消息隊(duì)列,從頭部寫(xiě)入娱仔,尾部讀出沐飘。

每個(gè)db文件前面留一個(gè)索引區(qū)作為發(fā)布方與訂閱方各自的讀寫(xiě)位置。發(fā)布與訂閱前牲迫,先讀寫(xiě)這個(gè)索引區(qū)耐朴,因?yàn)槭且粚?duì)一讀寫(xiě),所以可以完美避開(kāi)讀寫(xiě)鎖盹憎,大大提高性能筛峭。

3、針對(duì)一對(duì)多需求陪每,單獨(dú)設(shè)計(jì)一個(gè)config.db文件存儲(chǔ)種channel與其相關(guān)訂閱信息影晓,大概原理圖如下:

4、解決讀寫(xiě)不加鎖問(wèn)題

我們看結(jié)構(gòu)體:SIndex有三個(gè)屬性

1) WriteIndex 記錄發(fā)布方(Pubish)最后寫(xiě)入數(shù)據(jù)的位置

2) ReadIndex 記錄訂閱方(Subscribe)最后讀取數(shù)據(jù)的位置

3) Over 表示W(wǎng)riteIndex已達(dá)到隊(duì)列最大值檩禾,再WriteIndex小于等于隊(duì)列最大值前挂签,讀寫(xiě)如下圖:

WriteIndex達(dá)到最大值后再往下寫(xiě)Over就要取反,如由False變?yōu)門(mén)rue盼产。WriteIndex=0

如果此時(shí)沒(méi)有訂閱方,那新消息就會(huì)被拋棄饵婆,因?yàn)橐褵o(wú)空間存儲(chǔ)。

4) 如果ReadIndex數(shù)值到隊(duì)列最大值戏售,Over也取反侨核,此時(shí)ReadIndex = 0,讀寫(xiě)又變成圖1所示

5) 讀寫(xiě)過(guò)程中并不存在互斥的情況草穆,只要管理好讀寫(xiě)位置,就可以避免加鎖芹关。

四续挟、接口設(shè)計(jì)

4.1、主要參數(shù)定義

#define FM_MAX_CHANNEL 100 // 暫定最多100個(gè)不同頻道

#define FM_MAX_SUBSCRIBE 3 // 暫定最多3個(gè)訂閱用戶

#define FM_MAX_ROWS 1000 // 暫定最多隊(duì)列大小為1000

#define FM_DISCONNECT_TIME 5000 // 超過(guò)5000毫秒無(wú)心跳更新視為訂閱斷開(kāi)

#define FM_KEEP_CONN_CYCLE 1000 // 保持心跳連接的時(shí)間周期

#define FM_NOTHING -1 // 空白,數(shù)組為0等

#define FM_WORD_SIZE sizeof(WORD) // WORD長(zhǎng)度

#define FM_INDEX_SIZE sizeof(SIndex) // SIndex長(zhǎng)度

4.2侥衬、結(jié)構(gòu)體

View Code

4.3诗祸、主要方法

// 發(fā)布信息

template<typename T>

int?Publish(const?char?*channel, T* data);

// 訂閱信息

template<typename T>

void Subscribe(const?char?*channel, SubscribeCallBackHandle callback);

五、代碼實(shí)現(xiàn)

5.1 轴总、FMDBManager直颅,主要管理內(nèi)存映射相關(guān)操作,因?yàn)槭亲x寫(xiě)位置不一樣怀樟,所以不需要加互斥量

View Code

5.2功偿、FMDBClient,內(nèi)存映射客戶端往堡,主要封裝Publish與Subscribe方法給前端調(diào)用械荷,屏蔽復(fù)雜性

View Code

請(qǐng)注意上邊控制讀寫(xiě)的2個(gè)方法

bool?CanWrite(SIndex *sIndex)

{

int?nextWriteIndex = sIndex->WriteIndex + 1;

if(nextWriteIndex > FM_MAX_ROWS) nextWriteIndex = 0;

return nextWriteIndex != sIndex->ReadIndex;

}

bool?CanRead(SIndex *sIndex)

{

if(sIndex->Over) return sIndex->ReadIndex > sIndex->WriteIndex;

else return sIndex->ReadIndex + 1 <= sIndex->WriteIndex;

}

我們可以分析一下,下一個(gè)WriteIndex值如果大于隊(duì)列最大值 WriteIndex置0虑灰,下一個(gè)WriteIndex數(shù)值如果不等于

正在讀的位置ReadIndex就能寫(xiě)吨瞎;如果WriteIndex沒(méi)有超出最大值,只要ReadIndex小于等于WriteIndex就能讀穆咐,

如果超出颤诀,就判斷ReadIndex大于WriteIndex就能讀。WriteIndex與ReadIndex數(shù)值在Publish與Subscribe中維護(hù)

5.3对湃、建立新線程獲取最新訂閱的客戶端信息崖叫,這個(gè)功能主要是動(dòng)態(tài)地像多個(gè)Subscribe端發(fā)生消息,比如訂閱發(fā)生在發(fā)布之后拍柒,

也應(yīng)該能收到消息心傀。

View Code

六、Demo測(cè)試

6.1拆讯、Producer.cpp

1 #include "pch.h"

2 #include "../FMDB.h"

3

4 using namespace std;

5

6 int main()

7 {

8 FMClient * client = new FMClient();

9

10 int times = 0;

11 int index = 0;

12 int total = 0;

13 UINT structSize = sizeof(SPerson);

14 DWORD dwStartTmp = GetTickCount();

15

16 while(TRUE)

17 {

18 times++;

19 if(index == 0)

20 {

21 dwStartTmp = GetTickCount();

22 }

23

24 SPerson sPerson = { 0 };

25 sPerson.Idx = index;

26 sprintf_s(sPerson.Name, "Name.%d", index);

27 sPerson.Age = index;

28

29 if(client->Publish("Person", &sPerson) == enumSuccess)

30 {

31 if(index % 2 == 0) total = total + sPerson.Idx;

32 else total = total - sPerson.Idx;

33

34 index++;

35 if(index % 50000 == 0)

36 printf_s("發(fā)送條數(shù): %d, 耗時(shí):%d \n", index, (GetTickCount() - dwStartTmp));

37 }

38

39 if(index >= 2000000) break;

40 }

41

42 printf_s("調(diào)用次數(shù): %d, 成功條數(shù): %d, 檢驗(yàn)值: %d \n", times, index, total);

43 system("pause");

44 }

6.2剧包、Consumer.cpp

1 #include "pch.h"

2 #include "../FMDB.h"

3

4 using namespace std;

5

6 int index = 0;

7 int total = 0;

8 DWORD dwStartTmp = GetTickCount();

9

10 int SubscribeCallback(void *msg)

11 {

12 SPerson * person = (SPerson *)msg;

13

14 if(index == 0)

15 {

16 dwStartTmp = GetTickCount();

17 }

18

19 if(index % 2 == 0) total = total + person->Idx;

20 else total = total - person->Idx;

21

22 index++;

23 if(index % 50000 == 0)

24 {

25 printf("接收條數(shù): %d, 耗時(shí):%d, Idx:%d, Name:%s, Age:%d\n",

26 index, (GetTickCount() - dwStartTmp), person->Idx, person->Name, person->Age);

27 }

28

29 if(index >= 2000000)

30 {

31 return enumBreak;

32 }

33

34 return enumSuccess;

35 };

36

37 int main()

38 {

39 FMClient * client = new FMClient();

40 client->Subscribe("Person", SubscribeCallback);

41

42 printf("接收條數(shù): %d, 檢驗(yàn)值: %d \n", index, total);

43 system("pause");

44 }

6.3、運(yùn)行往果,測(cè)試用例中使用了向隊(duì)列發(fā)送200萬(wàn)條數(shù)據(jù)疆液,消息大小128字節(jié),訂閱端也是接受到200萬(wàn)數(shù)據(jù)后退出陕贮,并且打印檢驗(yàn)值堕油。

1) 檢驗(yàn)值計(jì)算:0+1-2+3-4+ --------- - 2000000 = -1000000,如果隊(duì)列運(yùn)行正常,那兩邊的檢驗(yàn)值應(yīng)該都是是 -1000000.

2) 每5萬(wàn)條打印一次日志掉缺,運(yùn)行情況如下

一對(duì)一方式運(yùn)行三次卜录,分別耗時(shí)(毫秒):2886、2979眶明、2871

3) 一對(duì)二方式運(yùn)行三次艰毒,分別耗時(shí)(毫秒):4087、4009搜囱、4040

4)運(yùn)行過(guò)程中產(chǎn)生的文件

6.4丑瞧、200萬(wàn)數(shù)據(jù)一對(duì)一耗時(shí)近3秒,貌似也不是非呈裰猓快是不是绊汹?但是這就是最大速度了嗎?

當(dāng)然不是哦扮宠,別忘了這是debug版本西乖,我們切換到release版本看速度會(huì)不會(huì)有所提升。

一對(duì)一運(yùn)行三次耗時(shí)分別是:1224坛增、1373获雕、1326

厲害了,

SPerson結(jié)構(gòu)體128字節(jié)收捣,每秒可以處理180萬(wàn)數(shù)據(jù)届案,當(dāng)然實(shí)際運(yùn)用肯定達(dá)不到,因?yàn)樘幚砥渌麡I(yè)務(wù)邏輯也要耗時(shí)間坏晦。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末萝玷,一起剝皮案震驚了整個(gè)濱河市嫁乘,隨后出現(xiàn)的幾起案子昆婿,更是在濱河造成了極大的恐慌,老刑警劉巖蜓斧,帶你破解...
    沈念sama閱讀 211,817評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件仓蛆,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡挎春,警方通過(guò)查閱死者的電腦和手機(jī)看疙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)直奋,“玉大人能庆,你說(shuō)我怎么就攤上這事〗畔撸” “怎么了搁胆?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,354評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我渠旁,道長(zhǎng)攀例,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,498評(píng)論 1 284
  • 正文 為了忘掉前任顾腊,我火速辦了婚禮粤铭,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘杂靶。我一直安慰自己梆惯,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,600評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布伪煤。 她就那樣靜靜地躺著加袋,像睡著了一般。 火紅的嫁衣襯著肌膚如雪抱既。 梳的紋絲不亂的頭發(fā)上职烧,一...
    開(kāi)封第一講書(shū)人閱讀 49,829評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音防泵,去河邊找鬼蚀之。 笑死,一個(gè)胖子當(dāng)著我的面吹牛捷泞,可吹牛的內(nèi)容都是我干的足删。 我是一名探鬼主播,決...
    沈念sama閱讀 38,979評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼锁右,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼失受!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起咏瑟,我...
    開(kāi)封第一講書(shū)人閱讀 37,722評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤拂到,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后码泞,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體兄旬,經(jīng)...
    沈念sama閱讀 44,189評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,519評(píng)論 2 327
  • 正文 我和宋清朗相戀三年余寥,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了领铐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,654評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡宋舷,死狀恐怖绪撵,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情祝蝠,我是刑警寧澤音诈,帶...
    沈念sama閱讀 34,329評(píng)論 4 330
  • 正文 年R本政府宣布汹来,位于F島的核電站,受9級(jí)特大地震影響改艇,放射性物質(zhì)發(fā)生泄漏收班。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,940評(píng)論 3 313
  • 文/蒙蒙 一谒兄、第九天 我趴在偏房一處隱蔽的房頂上張望摔桦。 院中可真熱鬧,春花似錦承疲、人聲如沸邻耕。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,762評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)兄世。三九已至,卻和暖如春啊研,著一層夾襖步出監(jiān)牢的瞬間御滩,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,993評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工党远, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留削解,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,382評(píng)論 2 360
  • 正文 我出身青樓沟娱,卻偏偏與公主長(zhǎng)得像氛驮,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子济似,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,543評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容