題目
題目內(nèi)容
Topic類似于水壩(蓄積功能痪枫,消峰填谷之利器)贴妻,Queue類似于水渠帚呼;每當新建一個Queue的時候,可以選擇綁定到幾個Topic稠歉,類似于水渠從水壩引水; 每個Topic可以被任意多個Queue綁定汇陆,這點與現(xiàn)實生活不太一樣怒炸,因為數(shù)據(jù)可以多次拷貝; 在發(fā)送的時候毡代,可以選擇發(fā)送到Topic阅羹,也可以選擇直接發(fā)送到Queue勺疼;直接發(fā)送到Queue的數(shù)據(jù)只能被對應Queue消費,不能被其他Queue讀取到捏鱼; 在消費的時候恢口,除了要讀取綁定的Topic的數(shù)據(jù),還要去取直接發(fā)送到該Queue的數(shù)據(jù)穷躁。
程序目標
實現(xiàn)以下接口:
- Producer的createBytesMessageToTopic(topic, body)
- Producer的createBytesMessageToQueue(queue, body)
- Producer的send(message)
- PullConsumer的attachQueue(queue, topics)
- PullConsumer的poll()
程序校驗邏輯
- 10~20個線程(位于同一進程中)各自獨立調(diào)用Producer發(fā)送消息(每個線程啟動一個Producer,每條消息隨機發(fā)送到某個Topic或者Queue)因妇,持續(xù)時間T1问潭,請注意把消息數(shù)據(jù)寫入磁盤中
- 強行kill Producer進程,未寫入磁盤的消息都會丟失
- 10~20個線程(位于同一進程中)獨立調(diào)用Consumer收取消息(每個線程啟動一個Consumer婚被,attach到指定的Queue狡忙,不同的Consumer不會attach同一個Queue),驗證消息順序準確性址芯,可靠性灾茁,消費持續(xù)的時間為T2,消費到的總消息數(shù)目為N
- 以N/(t1+t2)來衡量性能
補充說明
- 測試時谷炸,topic和queue的數(shù)目大約是100個(其中queue的數(shù)目與消費者線程數(shù)相等)北专;
- 測試時,消息大小不會超過256K旬陡;
- 可靠性是指拓颓,消息不能丟失,且消息的內(nèi)容不能被篡改描孟;在測試消費的時候驶睦,會對消息的body,headers,properties的內(nèi)容進行校驗;
- header與properties中key和value都不會插入null或空值匿醒;
- 僅允許依賴JavaSE8包含的lib场航;
消息順序的說明
- 順序只針對單個topic或者queue,不同topic廉羔,不同queue溉痢,topic與queue之間都不用考慮順序;
- 消息產(chǎn)品的一個重要特性是順序保證蜜另,也就是消息消費的順序要與發(fā)送的時間順序保持一致适室;
- 在多發(fā)送端的情況下,保證全局順序代價比較大举瑰,只要求各個發(fā)送端的順序有保障即可捣辆; 舉個例子P1發(fā)送M11,M12,M13,P2發(fā)送M21,M22,M23此迅,在消費的時候汽畴,只要求保證M11,M12,M13(M21,M22,M23)的順序旧巾,也就是說實際消費順序為M11,M21,M12,M13,M22,M23 正確,M11,M21,M22,M12,M13,M23 正確忍些,M11,M13,M21,M22,M23,M12 錯誤鲁猩,M12與M13的順序顛倒了;
題目解讀
- 題目要求實現(xiàn)五個接口罢坝,分別對應于Producer生產(chǎn)消息廓握、發(fā)送消息,Consumer綁定topic和queue嘁酿、消費消息隙券;
- topic可以存儲數(shù)據(jù),queue也可以存儲數(shù)據(jù)闹司;
- Producer可以把消息發(fā)送到任意的topic和queue中娱仔,但是一條消息只能發(fā)送到一個topic或queue中;
- Consumer和queue數(shù)量相等游桩,兩者一一對應牲迫,一個Consumer綁定一個queue和多個topic,不同的Consumer綁定不同的queue借卧,topic可以相同盹憎;
- Consumer消費數(shù)據(jù)時,只保證對應Producer局部有序谓娃,即Consumer消費某topic/queue的消息時脚乡,來自同一Producer的數(shù)據(jù)其接收順序與發(fā)送順序相同;
答辯資料解讀
初賽第六名代碼鏈接(本人非作者):
https://github.com/whutjs/MessageSystem
生產(chǎn)者架構(gòu)
生產(chǎn)者架構(gòu)圖
- 每個生產(chǎn)者對應一個輸出文件滨达;
- 生產(chǎn)者每生產(chǎn)一條消息奶稠,就把消息編碼后寫入ByteBuffer中,再把ByteBuffer中二進制數(shù)據(jù)寫入對應topic或queue的Cache中捡遍,然后清空ByteBuffer锌订;
- 當topic/queue對應的Cache存滿后,就把這個Cache中所有二進制數(shù)據(jù)寫到輸出文件對應的ByteBuffer中画株;
- 寫入ByteBuffer時依次寫入topic/queue的編號辆飘、數(shù)據(jù)長度的類型、數(shù)據(jù)長度和多條消息的二進制數(shù)據(jù)谓传;
- ByteBuffer存滿后就把數(shù)據(jù)刷到輸出文件中蜈项;
消費者架構(gòu)
消費者架構(gòu)圖
- 一個文件對應一個FileReadCache,一個消費者對應一個FileReadCache续挟;
- 每個消費者擁有一個ConcurrentLinkedQueue紧卒,調(diào)用poll()方法讀取消息;
- 若queue中沒有消息诗祸,且對應的FileReadCache沒有讀完跑芳,則通過FileReadCache解碼出一條消息轴总,然后MessageLoader把消息分發(fā)到訂閱了此topic/queue的Consumer的queue中;
- 若queue中沒有消息博个,且對應的FileReadCache已經(jīng)讀完怀樟,則降低該消費者線程的優(yōu)先級,這樣其它Consumer就會多占用CPU來解碼消息盆佣,并把該消費者需要的消息發(fā)過來往堡,存到queue中;
- 所有的FileReadCache都把對應的文件讀完了共耍,MessageLoader發(fā)送endMsg到所有Consumer的queue中投蝉,這樣所有Consumer就消費結(jié)束;
消息存儲結(jié)構(gòu)
消息存儲結(jié)構(gòu)
- 每個Producer對應一個輸出文件征堪,文件中存儲二進制數(shù)據(jù);
- 文件中數(shù)據(jù)的結(jié)構(gòu):topic/queue的編號关拒、數(shù)據(jù)長度的類型佃蚜、數(shù)據(jù)長度、多條消息的二進制數(shù)據(jù)着绊,然后是下一個topic/queue的數(shù)據(jù)谐算;
- 消息的結(jié)構(gòu):消息頭標識、消息id的長度归露、消息id的二進制數(shù)據(jù)洲脂、key的長度、key的二進制數(shù)據(jù)剧包、val的長度恐锦、val的二進制數(shù)據(jù)、消息體開頭標識疆液、消息體的長度一铅、消息體的二進制數(shù)據(jù),然后是下一條消息的數(shù)據(jù)堕油;
如何解碼二進制數(shù)據(jù)
- 因為文件是追加寫入的潘飘,因此最開始的消息寫在文件的最前面;
- 讀取數(shù)據(jù)時掉缺,把當前topic/queue對應的整塊Cache數(shù)據(jù)加載到ByteBuffer中卜录,一次解析出一條消息,再把該消息push給訂閱了該topic/queue的Consumer眶明;
- 當前topic/queue的消息被解析完后艰毒,再加載下一個topic/queue對應的Cache數(shù)據(jù);
如何保證數(shù)據(jù)的有序性
有序性示例
圖中producer1發(fā)送3條數(shù)據(jù)給topic1赘来、2條數(shù)據(jù)給topic2现喳,consumer1訂閱了topic1和topic2凯傲;需保證consumer1中來自topic1的3條數(shù)據(jù)其順序與其在producer1中相同,來自topic2的2條數(shù)據(jù)其順序與其在producer1相同嗦篱,來自不同topic的數(shù)據(jù)沒有順序要求冰单;
原理:File中消息的順序與Producer一致,F(xiàn)ileReadCache從前往后依次解析File中的消息灸促,然后把消息push給Consumer诫欠,這樣Consumer就保證了消息局部有順;