2018-04-21 消息的延遲

每個人的想法不同

, RocketMQ 介紹的時候就說 是阿里從他們使用的上 解耦出來 近一步簡化 便捷的 目的當然是 讓其能快速入手和開發(fā) 如果不是在項目設計層面上 只是使用的話 從Git上下載該項目的源碼 其中有一個包是專門的測試 實例的 只需要照貓畫虎?

使用就可以了?

?1 不能有中文路徑!

不能有中文路徑谎僻!

?不能有中文路徑歉铝!

?關系 兩個接口 interface MQProducer?

?//生產(chǎn)者接口?

?{ 實現(xiàn)該接口的只有一個 默認的 DefaultMQProducer DefaultMQProducer?

實現(xiàn) MQProducer 接口的時候?

還繼承了 ClientConfig類 (客戶端配置類) 可以配置如

?sendMsgTimeout 超時時間

?producerGroup 消息最大多少?

超過多少壓縮等等?

?關鍵方法 :

?send(Message) 發(fā)送一個消息到MQ 這個方法其實是調(diào)用 DefaultMQProducer構造方法

?創(chuàng)建的 defaultMQProducerImpl 類對象的 send(..)方法

?defaultMQProducerImpl 類 才是真正發(fā)送消息的核心類?

?defaultMQProducerImpl.send 方法

?--》 sendDefaultImpl

方法 sendDefaultImpl --》

?tryToFindTopicPublishInfo 來檢測映射 隊列是否存在

?是否正常 {

?final Segment[] segments; 這個 鍵值 不存在 不正常 :

?創(chuàng)建一個 TopicPublishInfo 到 segments 映射文件 同時 將 Topic (隊列) 信息?

更新到NameServer中 }?

?sendDefaultImpl --》

?通過設置是失敗重復次數(shù) 和 超時時間 來從新發(fā)送消息?

?詳細 for (; times < 失敗重復次數(shù) && (結束時間 - 開始時間) < 配置的超時時間; times++)?

?sendDefaultImpl --》

sendKernelImpl 裝載 配置 信息 --》

sendKernelImpl --》

this.mQClientFactory.getMQClientAPIImpl().sendMessage()?

?MQClientInstance mQClientFactory 對象

?是在 DefaultMQProducer start啟動方式時候初始化的 MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer,rpcHook);?

?--》 --》sendMessage { MQClientInstance --》

?MQClientAPIImpl mQClientAPIImpl MQClientAPIImpl.sendMessage() --> sendMessageSync switch (communicationMode) 同步 異步 單向 處理 默認是 同步 } 后續(xù)返回 SendResult sendResult 改類型描述當時 發(fā)送MQ 的最終狀態(tài) Message 消息的 Topic 不能為空 producer.shutdown(); 關閉 shutdown來清理資源,關閉網(wǎng)絡連接,從MetaQ服務器上注銷自己 } 發(fā)送消息負載的問題 { 看源碼 是通過循環(huán)從 namesrv 獲取的到 Topic 路由消息 (也就是有幾個broker 每個 broker 有幾個隊列) 然后 記錄當前發(fā)送過的 +1 備注 : 隊列數(shù)量 小于 消費者數(shù)量 多余的消費者將不起做用 } 關于順序消息發(fā)送 的問題 { 環(huán)境: 1 下單 2 付款 3 收貨 3個狀態(tài) 臂港, 普通模式下 發(fā)送到隊列中的 是輪詢隊列 將3個消息分別發(fā)送到多個隊列中卿捎。 很可能會照成出現(xiàn) 先消費 2 在消費 1 流程錯亂的情況 當然可以業(yè)務層處理 但是業(yè)務層處理比較麻煩 順序消費的發(fā)送的原理 : 我們自己指定 消息將要添加的隊列 SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(Listmqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); // 通過取于來 講 同一個訂單號 訪入同一個隊列中 // 前提是 隊列數(shù)量沒有變動 return mqs.get(index); } }, “10001”); // orderID “10001”是傳遞給回調(diào)方法的 自定義數(shù)據(jù) Listmqs 就是從namesrv 獲取的所有隊列 } 備注 // 訂單 Id String orderId = "20034568923546"; message.setKeys(orderId); // Keys 每個消息在業(yè)務局面的唯一標識碼 通過 topic配紫,key 來查詢返條消息內(nèi)容,以及消息被誰消費 查詢的時候 非常重要 消費者 interface MQConsumer { // 回溯消費 { mqadmin resetOffsetByTime 命令 改方式 是通過消費的日志來恢復的 但是只能通過 消費的組來恢復 恢復消息后 也只能用改組來從新消費 -s : 時間戳的問題 可以是 毫秒 或者是從什么時候開始 } //拉取模式 interface MQPullConsumer: { } // 接收模式 長輪詢模式 一次獲取一批 消息 記錄 批量和單條 內(nèi)部實現(xiàn) 還是獲取了所有的 可以獲取到的隊列消息 放入集合中 判斷集合大小是否 大于設置的單次消費數(shù)量 小于 直接將其 消息集合 放入執(zhí)行回調(diào)方法中 大于 使用的是For 循環(huán) 來單條處理 interface MQPushConsumer: { class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer DefaultMQPushConsumer 包含很多可以配置的信息 詳情見文檔 其中最主要的 有幾個 messageModel 消息模型 支持以下兩種 1午阵、集群消費 2躺孝、廣播消費 messageListener 消息監(jiān)聽器 consumeThreadMin 消費線程池數(shù)量 默認10 consumeThreadMax 消費線程池數(shù)量 默認20 重要的是 消費線程池 ! 這就說明 我發(fā)布一個 消費應用 消費邏輯就可以 N 個 處理底桂! 不用自己搞了有沒有V才邸! 安默認的來算 20個消費邏輯 可以配置 而且還 可以橫向 增加 消費應用 只要保持是一個組就可以了 難怪會在文檔中 特意話一個 性能圖白雅场S诟觥! 應用通常吐 Consumer 對象注冊一個 Listener 接口暮顺,一旦收到消息厅篓,Consumer 對象立刻回調(diào) Listener 接口方法 MessageListenerOrderly 這個是有序的 MessageListenerConcurrently 這個是無序的 關鍵方法 DefaultMQPushConsumer registerMessageListener(new implements MessageListenerConcurrently) { public void registerMessageListener(MessageListenerConcurrently messageListener) { this.messageListener = messageListener; // 給自己復制一個 消費邏輯類對象 方法后續(xù)查詢 替換修改等 關鍵方法 this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); // 將消費邏輯類告訴 調(diào)用者類 } } 關鍵方法 start DefaultMQPushConsumer.start() --> DefaultMQPushConsumerImpl.start() { this.serviceState 來記錄設置當前程序運行狀態(tài) 來做多態(tài) checkConfig() 檢查配置 初始化賦值 copySubscription() 拷貝訂閱者信息 賦值 消費邏輯類 // 有就獲取 沒有就創(chuàng)建一個 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,this.rpcHook); 接著初始化一系列信息 // 加載消費進度 this.offsetStore.load(); // 該方法有兩個實現(xiàn) 一個是本地 this.readLocalOffset() 獲取數(shù)據(jù) { //獲取文件字符串 String content = MixAll.file2String(this.storePath); OffsetSerializeWrapper offsetSerializeWrapper =OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class); 可以看出 淘寶使用的是JSON } if (this.getMessageListenerInner() instanceof MessageListenerOrderly) else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) 判斷 消費邏輯類 實現(xiàn)那個接口 創(chuàng)建對應的 ConsumeMessageOrderlyService 對象 ConsumeMessageConcurrentlyService 該實現(xiàn)為空 本地 ConsumeMessageOrderlyService.start() { 創(chuàng)建并執(zhí)行一個周期性的動作成為了第一個在給定的初始延遲之后,隨后用給定的時期,這是執(zhí)行后將開始initialDelay然后initialDelay +,然后initialDelay + 2 *時期,等等。如果任何執(zhí)行任務遇到異常,后續(xù)執(zhí)行的鎮(zhèn)壓捶码。否則,只會終止的任務通過取消或終止執(zhí)行器羽氮。如果執(zhí)行這個任務花費的時間比其期,然后后續(xù)執(zhí)行可能會遲到,但不會同時執(zhí)行。 //就是一個定時器 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } }, 1000 * 1, ProcessQueue.RebalanceLockInterval, TimeUnit.MILLISECONDS); scheduleAtFixedRate 應該是一個線程池管理 不用去關心 scheduleAtFixedRate 方法 看 ConsumeMessageOrderlyService.this.lockMQPeriodically() { this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll() 是 RebalanceImpl.lockAll() // 將讀取到的消息上鎖 } } // 最關鍵的服務啟動了 // 正在的啟動了 mQClientFactory.start(); { synchronized (this){ //Start request-response channel 啟動請求-響應通道 this.mQClientAPIImpl.start(); //Start various schedule tasks 開始各種安排任務 啟動定時任務 其中就包含 獲取到MQ消息消費的 回調(diào)方法 this.startScheduledTask(); //Start pull service 開始拉取服務 this.pullMessageService.start(); //Start rebalance service 啟動負載均衡 // 該服務非常重要 this.rebalanceService.start(); //Start push service 開始推動服務 this.defaultMQProducer.getDefaultMQProducerImpl().start(false); } } } } 指定 group 訂閱 topic 注冊消息監(jiān)聽處理器惫恼,當消息到來時消費消息 消費端 Start 復制訂閱關系 初始化 rebalance 變量 構建 offsetStore 消費進度存儲對象 啟動消費消息服務 向 mqClientFactory 注冊本消費者 啟動 client 端遠程通信 * 加載消費進度 Loand() * 啟動定時任務 定時獲取 nameserver 地址 定時從 nameserver 獲取 topic 路由信息 定時清理下線的 borker 定時向所有 broker 發(fā)送心跳信息乏苦, (包括訂閱關系) * 定時持久化 Consumer 消費進度(廣播存儲到本地,集群存儲到 Broker) PS: 這里也是是個關鍵 持久化消費進度 是用來記錄當前 組的消費情況 可以做到 回溯消費 和宕機等情況下 啟動后接著上次執(zhí)行消費 統(tǒng)計信息打點 動態(tài)調(diào)整消費線程池 啟動拉消息服務 PullMessageService 啟動消費端負載均衡服務 RebalanceService 從 namesrv 更新 topic 路由信息 向所有 broker 發(fā)送心跳信息尤筐, (包括訂閱關系) 喚醒 Rebalance 服務線程 } // 有些懶得看了 直接看別人 得了 消費端負載均衡 { 這個也是個重點 消費端會通過 RebalanceService 線程汇荐,10 秒鐘做一次基于 topic 下的所有隊列負載 消費端 遍歷自己所有的 Topic 獲取 Topic 下所有的 隊列 (一個Topic 包含對個隊列 默認是 4 個 有別于其他MQ ) 從 broker 獲取當前 組(group)的所有消費端( 有心跳的) 獲取隊列集合SetmqSet 現(xiàn)在隊列分配策略實例 AllocateMessageQueueStrategy 執(zhí)行分配算法 { 1:平均分配算法 : 其實是類似于分頁的算法 將所有 queue 排好序類似于記錄 將所有消費端 consumer 排好序,相當于頁數(shù) 然后獲取當前 consumer 所在頁面應該分配到的 queue 2:按照配置來分配隊列 : 消費服務啟動的時候 就指定好了要消費的 是哪個隊列 3:按照機房來配置隊列 : Consumer 啟動的時候會指定在哪些機房的消息 (應該是指定 broker) 獲取指定機房的 queue 然后在執(zhí)行如 1)平均算法 } 根據(jù)分配隊列的結果更新 ProccessQueueTable{ 比對 mqSet 將多余的隊列刪除盆繁, 當 broker 當機或者添加掀淘,會導致分配到 mqSet 變化, 添加新增隊列油昂, 比對 mqSet革娄,給新增的 messagequeue 構建長輪詢對象 PullRequest 對象倾贰,會從 broker 獲取消費的進度 構建這個隊列的 ProcessQueue 將 PullRequest 對象派發(fā)到長輪詢拉消息服務(單線程異步拉取) 注:ProcessQueue 正在被消費的隊列拦惋, (1) 長輪詢拉取到消息都會先存儲到 ProcessQueue 的 TreeMap集合中匆浙,消費調(diào)后會刪除掉,用來控制 consumer 消息堆積厕妖, TreeMapkey 是消息在此 ConsumeQueue 隊列中索引 (2) 對于順序消息消費 處理 locked 屬性:當 consumer 端向 broker 申請鎖隊列成功后設置 true首尼, 只有被鎖定 的 processqueue 才能被執(zhí)行消費 rollback: 將消費在 msgTreeMapTemp 中的消息,放回 msgTreeMap 重新消費 commit: 將臨時表 msgTreeMapTemp 數(shù)據(jù)清空言秸,代表消費完成软能,放回最大偏移 值 (3) 這里是個 TreeMap,對 key 即消息的 offset 進行排序举畸,這個樣可以使得消息進 行順序消費 } } 長輪詢 { Rocketmq 的消息是由 consumer 端主動到 broker拉取的, consumer 向 broker 發(fā)送拉消息 請求查排, PullMessageService 服務通過一個線程將阻塞隊列 LinkedBlockingQueue中的 PullRequest 到 broker 拉取消息 DefaultMQPushConsumerImpl 的 pullMessage(pullRequest)方法執(zhí)行向 broker 拉消息動作 1. 獲取 ProcessQueue 判讀是否 drop 的, drop 為 true 返回 2. 給 ProcessQueue 設置拉消息時間戳 3. 流量控制抄沮,正在消費隊列中消息(未被消費的)超過閥值跋核,稍后在執(zhí)行拉消息 4. 流量控制,正在消費隊列中消息的跨度超過閥值(默認 2000) 叛买,稍后在消費 5. 根據(jù) topic 獲取訂閱關系 6. 構建拉消息回調(diào)對象 PullBack, 從 broker 拉取消息(異步拉攘俗铩)返回結果是回調(diào) 7. 從內(nèi)存中獲取 commitOffsetValue //TODO 這個值跟 pullRequest.getNextOffset 區(qū)別 8. 構建 sysFlag pull 接口用到的 flag 9. 調(diào)底層通信層向 broker 發(fā)送拉消息請求 如果 master 壓力過大,會建議去 slave 拉取消息 如果是到 broker 拉取消息清楚實時提交標記位聪全,因為 slave 不允許實時提交消費進 度,可以定時提交 //TODO 關于 master 拉消息實時提交指的是什么辅辩? 10. 拉到消息后回調(diào) PullCallback 處理 broker 返回結果 pullResult 更新從哪個 broker(master 還是 slave)拉取消息 反序列化消息 消息過濾 消息中放入隊列最大最小 offset, 方便應用來感知消息堆積度 將消息加入正在處理隊列 ProcessQueue 將消息提交到消費消息服務 ConsumeMessageService 流控處理难礼, 如果 pullInterval 參數(shù)大于 0 (拉消息間隔,如果為了降低拉取速度玫锋, 可以設置大于 0 的值) 蛾茉, 延遲再執(zhí)行拉消息, 如果 pullInterval 為 0 立刻在執(zhí)行拉 消息動作 看圖 人家花的不錯 很明了 } push 消息 { PS: 長輪詢向broker拉取消息是批量拉取的撩鹿, 默認設置批量的值為pullBatchSize = 32谦炬, 可配置 消費端 consumer 構建一個消費消息任務 ConsumeRequest 消費一批消息的個數(shù)是 可配置的 consumeMessageBatchMaxSize = 1, 默認批量個數(shù)為一個 也就是說 每次傳遞給回調(diào)方法的 參數(shù) 消息集合 的解釋 ConsumeRequest 任務 run 方法執(zhí)行 判斷 proccessQueue 是否被 droped 的节沦, 廢棄直接返回键思,不在消費消息 構建并行消費上下文 給消息設置消費失敗時候的 retry topic,當消息發(fā)送失敗的時候發(fā)送到 topic 為%RETRY%groupname 的隊列中 調(diào) MessageListenerConcurrently 監(jiān)聽器的 consumeMessage 方法消費消息甫贯,返回消 費結果 如果 ProcessQueue 的 droped 為 true吼鳞,不處理結果,不更新 offset叫搁, 但其實這里消 費端是消費了消息的赔桌,這種情況感覺有被重復消費的風險 處理消費結果 : 消費成功供炎, 對于批次消費消息, 返回消費成功并不代表所有消息都消費成功疾党, 但是消費消息的時候一旦遇到消費消息失敗直接放回音诫,根據(jù) ackIndex 來標記 成功消費到哪里了 消費失敗, ackIndex 設置為-1 廣播模式發(fā)送失敗的消息丟棄雪位, 廣播模式對于失敗重試代價過高竭钝,對整個集 群性能會有較大影響,失敗重試功能交由應用處理 集群模式茧泪, 將消費失敗的消息一條條的發(fā)送到 broker 的重試隊列中去蜓氨,如果 此時還有發(fā)送到重試隊列發(fā)送失敗的消息, 那就在 cosumer 的本地線程定時 5 秒鐘以后重試重新消費消息队伟, 在走一次上面的消費流程穴吹。 刪除正在消費的隊列 processQueue 中本次消費的消息,放回消費進度 更新消費進度嗜侮, 這里的更新只是一個內(nèi)存 offsetTable 的更新港令,后面有定時任務定 時更新到 broker 上去 PS: 關于消費成功 和 失敗的 問題 在集群模式下 回調(diào)方法設置為消費失敗 會將當前消費的失敗消息 發(fā)送到 broker 的容錯度列中 等待N次+ 從新消費 。 push 消費-順序消費消息 順序消費服務 ConsumeMessageConcurrentlyService 構建的時候 構建一個線程池來接收消費請求 ConsumeRequest 構建一個單線程的本地線程, 用來稍后定時重新消費 ConsumeRequest, 用來執(zhí)行 定時周期性(一秒)鐘鎖隊列任務 周期性鎖隊列 lockMQPeriodically 獲取正在消費隊列列表 ProcessQueueTable 所有 MesssageQueue锈颗, 構建根據(jù) broker 歸類成 MessageQueue 集合 Map> 遍歷 Map>的 brokername, 獲取 broker 的 master 機器地址顷霹, 將brokerName的Set發(fā)送到broker請求鎖定這些隊列。 在broker 端鎖定隊列击吱,其實就是在 broker 的 queue 中標記一下消費端淋淀,表示這個 queue 被某個 client 鎖定。 Broker 會返回成功鎖定隊列的集合覆醇, 根據(jù)成功鎖定的 MessageQueue,設置對應的正 在處理隊列 ProccessQueue 的 locked 屬性為 true 沒有鎖定設置為 false 通過長輪詢拉取到消息后會提交到消息服務 ConsumeMessageOrderlyService朵纷, ConsumeMessageOrderlyService 的 submitConsumeRequest 方法構建 ConsumeRequest 任務提 交到線程池。ConsumeRequest 是由 ProcessQueue 和 Messagequeue 組成永脓。 ConsumeRequest 任務的 run 方法 判斷 proccessQueue 是否被 droped 的袍辞, 廢棄直接返回,不在消費消息 每個 messagequeue 都會生成一個隊列鎖來保證在當前 consumer 內(nèi)常摧,同一個隊列串行 消費搅吁, 判斷 processQueue 的 lock 屬性是否為 true, lock 屬性是否過期落午, 如果為 false 或者過期谎懦, 放到本地線程稍后鎖定在消費。 如果 lock 為 true 且沒有過期溃斋,開始消費消息 計算任務執(zhí)行的時間如果大于一分鐘且線程數(shù)小于隊列數(shù)情況下党瓮,將 processqueue, messagequeue 重新構建 ConsumeRequest 加到線程池 10ms 后在消費盐类,這樣防止個別隊列被 餓死 獲取客戶端的消費批次個數(shù)寞奸,默認一批次為一條 從 proccessqueue 獲取批次消息呛谜, processqueue.takeMessags(batchSize) , 從 msgTreeMap 中移除消息放到臨時 map 中 msgTreeMapTemp, 這個臨時 map 用來回滾消息和 commit 消 息來實現(xiàn)事物消費 調(diào)回調(diào)接口消費消息枪萄,返回狀態(tài)對象 ConsumeOrderlyStatus 根據(jù)消費狀態(tài)隐岛,處理結果 1) 非事物方式,自動提交 消息消息狀態(tài)為 success: 調(diào)用 processQueue.commit 方法 獲取 msgTreeMapTemp 的最后一個 key瓷翻,表示提交的 offset 清空 msgTreeMapTemp 的消息聚凹,已經(jīng)成功消費 2) 事物提交,由用戶來控制提交回滾(精衛(wèi)專用) 更新消費進度齐帚, 這里的更新只是一個內(nèi)存 offsetTable 的更新妒牙, 后面有定時任務定時更 新到 broker 上去 } 關閉 { shutdown DefaultMQPushConsumerImpl 關閉消費端 關閉消費線程 將分配到的 Set的消費進度保存到 broker 利 用 DefaultMQPushConsumerImpl 獲 取 ProcessQueueTable的 keyset 的 messagequeue 去獲取 RemoteBrokerOffsetStore.offsetTableMap 中的消費進

? ? ? ? ? ? ? ? 度,

? ? ? ? ? ? ? ? offsetTable 中 的 messagequeue 的 值对妄, 在 update 的時候如果 沒有對應 的

? ? ? ? ? ? ? ? Messagequeue 會構建湘今, 但是也會 rebalance 的時候?qū)]有分配到的 messagequeue

? ? ? ? ? ? ? ? 刪除

? ? ? ? ? ? ? ? rebalance 會將 offsettable 中沒有分配到 messagequeue 刪除,? 但是在從 offsettable

? ? ? ? ? ? ? ? 刪除之前會將 offset 保存到 broker


? ? ? ? ? ? ? ? Unregiser 客戶端


? ? ? ? ? ? ? ? pullMessageService 關閉


? ? ? ? ? ? ? ? scheduledExecutorService 關閉剪菱,關閉一些客戶端的起的定時任務


? ? ? ? ? ? ? ? mqClientApi 關閉


? ? ? ? ? ? ? ? rebalanceService 關閉



? ? ? ? }

補充 一

消息的延遲

? ? {

? ? ? ? 通過測試程序可以看出? 通過設置 message 的DelayTimeLevel 可以設置消息延遲處理

? ? }

? ? 消息重試機制? 容錯機制

? ? {

? ? ? ? 通過源碼可以看出 消費方法的返回對象 只有兩個值


? ? ? ? CONSUME_SUCCESS // 消費成功


? ? ? ? RECONSUME_LATER // 消費失敗摩瞎,稍后重試?



? ? ? ? CONSUME_SUCCESS 無異議?


? ? ? ? 關鍵是 RECONSUME_LATER


? ? ? ? ? ? 我們可以通過 RECONSUME_LATER 來容錯。 阿里提供的這個? 重試機制 是通過添加到一個錯誤隊列中 設置期? DelayTimeLevel 來實現(xiàn)的


? ? ? ? ? ? 第一次消費的時候? 打印 MessageExt 沒有 properties屬性的詳細信息? 返回 RECONSUME_LATER 稍后重試


? ? ? ? ? ? [queueId=0, storeSize=106, queueOffset=0, sysFlag=0, bornTimestamp=1458803327047, bornHost=/10.10.12.27:41697, storeTimestamp=1458803327059, storeHost=/10         .10.12.27:10911, msgId=0A0A0C1B00002A9F0000000000031F10, commitLogOffset=204560, bodyCRC=910247988, reconsumeTimes=0, preparedTransactionOffset=0, toStrin         g()=Message [topic=Topic2, flag=0, properties={MAX_OFFSET=1, MIN_OFFSET=0}, body=9]]

? ? ? ? ? ? 第二次消費的時候


? ? ? ? ? ? [queueId=0, storeSize=260, queueOffset=0, sysFlag=0, bornTimestamp=1458803327047, bornHost=/10.10.12.27:41697, storeTimestamp=1458803516104, storeHost=/10         .10.12.27:10911, msgId=0A0A0C1B00002A9F0000000000032079, commitLogOffset=204921, bodyCRC=910247988, reconsumeTimes=1, preparedTransactionOffset=0, toStrin         g()=Message [topic=Topic2, flag=0, properties={ORIGIN_MESSAGE_ID=0A0A0C1B00002A9F0000000000031F10, DELAY=3, REAL_TOPIC=%RETRY%ConsumerGroupName, WAIT=fals         e, RETRY_TOPIC=Topic2, MAX_OFFSET=1, MIN_OFFSET=0, REAL_QID=0}, body=9]]

? ? ? ? ? ? 可以看出 消息? 雖然? queueId 是相同的值 0 但是? msgId 卻變了 孝常! 同時用rocketmq-console 來監(jiān)控 該 消費者 你會發(fā)現(xiàn) 多了個 Topic? %RETRY%ConsumerGroupName?


? ? ? ? ? ? 所有 可以得出一個結論?


? ? ? ? ? ? 我們返回 消費失敗旗们,稍后重試? RECONSUME_LATER? 消息會回到 broker 同時創(chuàng)建一條相同的消息 訪如? %RETRY%ConsumerGroupName?

? ? ? ? ? ? 同時 設置 該 消息的 延遲消費 每次延遲時間 +1?


? ? ? ? ? ? 我覺得可以通過 reconsumeTimes 來做一個簡單的容錯? 獲取 當前消費的 次數(shù)? 是否大于 設定值? 大于就說明其是死信? 記錄到異常數(shù)據(jù)庫



? ? }

備注問題:

    背景:

          生產(chǎn)端 使用 linux 服務器 (UTF-8 編碼)

              Message me = new Message();

              me.setBody("中國人".getBytes());

              producer.send(me);

          消費端 使用? Windows 服務器 (GBK 編碼)

              MessageExt msg = msgs.get(0);

              String strBody = new String(msg.getBody());

    問題 :

          生產(chǎn)端無問題,消費端 存在 字符集 編碼問題 构灸。

    原因 :

          生產(chǎn)端發(fā)送給MQ 的數(shù)據(jù)是 字節(jié) 上渴!? getBytes() 不指定字節(jié)格式 會默認使用 本地系統(tǒng)編碼格式? linux下通常是 UTF-8 格式

          消費端由于是Windows 本地系統(tǒng)的編碼格式是 GBK 格式 。

          new String(msg.getBody()); 方法 不指定編碼格式 使用的也是 本地系統(tǒng)編碼格式 也就是 GBK格式

          可能會說 直接 GBK轉(zhuǎn)換UTF-8就好了喜颁,但是稠氮! GBK 對應的是2個字節(jié)? UTF-8 對應的是3個字節(jié)? 當出現(xiàn) 3個字的中文或者 特殊符號的時候

          轉(zhuǎn)換過程中會 主動 2補1 所有 “中國人”? 這里 人 字就會亂碼

          String iso = new String(strBody.getBytes("UTF-8"), "ISO-8859-1");

            strBody = new String(iso.getBytes("ISO-8859-1"), "UTF-8");


          上面這種解決方法在 測試方法中有效? 在消費端 具體消費類中的消費方法 并未生效?

          這里希望有大神可以指出為什么!洛巢?

    解決方法:

          MessageExt msg = msgs.get(0);

          strBody = new String(msg.getBody(), "UTF-8");?

          在第一次 字節(jié)轉(zhuǎn)換成字符串的時候 就指定 該字節(jié)按照 UTF-8 格式轉(zhuǎn)換!?

      PS:

          雖然解決方法很簡單次兆,但是 稍微不注意 就會跳過這里啊? 勁量做到統(tǒng)一開發(fā)環(huán)境案遘浴!

? ? ? 消費端 多實例問題

      經(jīng)過試驗芥炭,一個消費 組 只能處理一個 Topic 下的一個 Tags? !

努力或許不會有收獲漓库,但是不努力卻一定不會有收獲!

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末园蝠,一起剝皮案震驚了整個濱河市渺蒿,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌彪薛,老刑警劉巖茂装,帶你破解...
    沈念sama閱讀 211,639評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件怠蹂,死亡現(xiàn)場離奇詭異,居然都是意外死亡少态,警方通過查閱死者的電腦和手機城侧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來彼妻,“玉大人嫌佑,你說我怎么就攤上這事∏惹福” “怎么了屋摇?”我有些...
    開封第一講書人閱讀 157,221評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長幽邓。 經(jīng)常有香客問我炮温,道長,這世上最難降的妖魔是什么颊艳? 我笑而不...
    開封第一講書人閱讀 56,474評論 1 283
  • 正文 為了忘掉前任茅特,我火速辦了婚禮,結果婚禮上棋枕,老公的妹妹穿的比我還像新娘白修。我一直安慰自己,他們只是感情好重斑,可當我...
    茶點故事閱讀 65,570評論 6 386
  • 文/花漫 我一把揭開白布兵睛。 她就那樣靜靜地躺著,像睡著了一般窥浪。 火紅的嫁衣襯著肌膚如雪祖很。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,816評論 1 290
  • 那天漾脂,我揣著相機與錄音假颇,去河邊找鬼。 笑死骨稿,一個胖子當著我的面吹牛笨鸡,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播坦冠,決...
    沈念sama閱讀 38,957評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼形耗,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了辙浑?” 一聲冷哼從身側響起激涤,我...
    開封第一講書人閱讀 37,718評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎判呕,沒想到半個月后倦踢,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體送滞,經(jīng)...
    沈念sama閱讀 44,176評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,511評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片尽超。...
    茶點故事閱讀 38,646評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖愧哟,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情哼蛆,我是刑警寧澤蕊梧,帶...
    沈念sama閱讀 34,322評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站腮介,受9級特大地震影響肥矢,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜叠洗,卻給世界環(huán)境...
    茶點故事閱讀 39,934評論 3 313
  • 文/蒙蒙 一甘改、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧灭抑,春花似錦十艾、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至案腺,卻和暖如春庆冕,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背劈榨。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評論 1 266
  • 我被黑心中介騙來泰國打工访递, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人同辣。 一個月前我還...
    沈念sama閱讀 46,358評論 2 360
  • 正文 我出身青樓拷姿,卻偏偏與公主長得像,于是被迫代替她去往敵國和親邑闺。 傳聞我的和親對象是個殘疾皇子跌前,可洞房花燭夜當晚...
    茶點故事閱讀 43,514評論 2 348

推薦閱讀更多精彩內(nèi)容