前言介紹
RocketMQ目前在國(guó)內(nèi)應(yīng)該是比較流行的MQ 了绳锅,目前本人也在公司的項(xiàng)目中進(jìn)行使用和研究西饵,借著這個(gè)機(jī)會(huì),分析一下RocketMQ 發(fā)送一條消息到存儲(chǔ)一條消息的過(guò)程鳞芙,這樣會(huì)對(duì)以后大家分析和研究RocketMQ相關(guān)的問(wèn)題有一定的幫助眷柔。
技術(shù)范圍
分析的總體技術(shù)范圍發(fā)送到存儲(chǔ),本文的主要目的是主要是為了認(rèn)識(shí)一條消息并分析被發(fā)出且被存儲(chǔ)的原朝,代碼中驯嘱,關(guān)于 MQ 文件系統(tǒng)的優(yōu)化,設(shè)計(jì)等喳坠。
現(xiàn)在出發(fā)
來(lái)自官方源碼example的一段發(fā)送代碼:
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
send發(fā)送的分析
直接看看send方法鞠评,send 方法會(huì)設(shè)置一個(gè)默認(rèn)的 timeout:3秒。默認(rèn)使用 SYNC 模式壕鹉,另外有Async和OneWay模式剃幌。需要處理方法簽名中的 Client 端的異常,網(wǎng)絡(luò)異常御板,Broker 端的異常锥忿,線程中斷異常牛郑。
sendDefaultImpl核心實(shí)現(xiàn)類
DefaultMQProducerImpl 的 sendDefaultImpl方法就是發(fā)送的主要邏輯怠肋。
代碼里,有個(gè)地方可以提一下淹朋,關(guān)于更新故障時(shí)間的策略笙各,RocketMQ有一個(gè)類 MQFaultStrategy,用來(lái)處理MQ錯(cuò)誤础芍,然后對(duì) MQ Server 進(jìn)行服務(wù)降級(jí)杈抢。
服務(wù)降級(jí)策略
如果發(fā)送一條消息在550ms以內(nèi),那么就不用降級(jí)仑性,如果550毫秒以外惶楼,就進(jìn)行容錯(cuò)降級(jí)(熔斷)30 秒,以此類推。
sendKernelImpl核心方法
再看DefaultMQProducerImpl 的 sendKernelImpl發(fā)送到內(nèi)核的方法實(shí)現(xiàn)歼捐。
先找到broker的地址何陆。嘗試壓縮大于4M 的消息(批量消息不壓縮),然后執(zhí)行各種鉤子豹储。
- Request對(duì)象(存放數(shù)據(jù))
- Context 上下文對(duì)象(存放調(diào)用上下文)贷盲。
這里會(huì)設(shè)置一個(gè)消息生成時(shí)間,即bornTimestamp剥扣,后面使用消息軌跡的時(shí)候巩剖,可以查看。
同步模式的核心處理
默認(rèn)情況下:如果采用SYNC 模式钠怯,就調(diào)用 MQClientAPIImpl 來(lái)發(fā)送消息佳魔,這一層還是在 Client 模塊里,在這一層呻疹,會(huì)設(shè)置更詳細(xì)的消息細(xì)節(jié)吃引,構(gòu)造命令對(duì)象。最后調(diào)用 remotingClient的 invokeSync 發(fā)送消息刽锤。
MQClientAPIImpl的sendMessage
MQClientAPIImpl的sendMessage這一層镊尺,會(huì)給命令對(duì)象設(shè)置一個(gè)CmdCode,叫SEND_MESSAGE并思,這個(gè)東西就是一個(gè)和Broker的契約庐氮,Broker會(huì)根據(jù)這個(gè)Code進(jìn)行不同的策略。
RPC的實(shí)現(xiàn)方式
- 如果這里用RPC的方式宋彼,例如弄砍,使用一個(gè)接口的抽象方法。
- 然后输涕,Broker對(duì)抽象方法進(jìn)行 RPC 調(diào)用音婶,這樣可不可以呢?
- 最后莱坎,看看 remotingClient的invokeSync是如何實(shí)現(xiàn)的衣式。
Remoting模塊發(fā)送消息實(shí)現(xiàn)
invokeSync方法
- 首先,執(zhí)行 RPCBefore 鉤子檐什,類似Spring的各種Bean擴(kuò)展組件
- 然后碴卧,就是對(duì)超時(shí)進(jìn)行判斷。
- 最后乃正,幾乎每個(gè)方法都有對(duì)超時(shí)的判斷住册,超時(shí)判斷和超時(shí)處理在分布式場(chǎng)景非常重要。
- 根據(jù)addr找到對(duì)應(yīng)的Socket Channel
- 然后執(zhí)行invokeSyncImpl方法瓮具。
- 這里其實(shí)和其他大部分的RPC框架都是類似的了荧飞,生產(chǎn)一個(gè)永遠(yuǎn)自增的Request ID凡人,創(chuàng)建一個(gè)Feature對(duì)象和這個(gè)ID綁定,方便Netty返回?cái)?shù)據(jù)對(duì)這個(gè)ID對(duì)應(yīng)的線程進(jìn)行喚醒叹阔。
- 然后調(diào)用Netty的writeAndFlush方法划栓,將數(shù)據(jù)寫進(jìn)Socket,同時(shí)添加一個(gè)監(jiān)聽(tīng)器条获,如果發(fā)送失敗忠荞,喚醒當(dāng)前線程。
- 發(fā)送完畢之后帅掘,當(dāng)前線程進(jìn)行等待委煤,使用CountDownLatch.wait方法實(shí)現(xiàn),當(dāng)Netty返回?cái)?shù)據(jù)時(shí)修档,使用CountDownLatch.countDown進(jìn)行喚醒
- 然后返回從 Broker 寫入的結(jié)果碧绞,可能成功,也可能失敗吱窝,需要到上層(Client 層)解析讥邻,網(wǎng)絡(luò)層只負(fù)責(zé)網(wǎng)絡(luò)的事情。
Netty 會(huì)使用 Handler 處理出去的數(shù)據(jù)和返回的數(shù)據(jù)院峡,我們看看 Client 端 Netty 有哪些 Handler.
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
使用了一個(gè) Encoder兴使,Decoder,空閑處理器照激,連接管理器发魄,ClientHandler。
XXCoder就是對(duì)Cmd對(duì)象進(jìn)行序列化和反序列化的俩垃,這里的空閑使用的讀寫最大空閑時(shí)間為120s励幼,超過(guò)這個(gè),就會(huì)觸發(fā)空閑事件口柳。
連接管理器
RocketMQ就會(huì)關(guān)閉Channel 連接苹粟。而針對(duì)空閑事件進(jìn)行處理的就是連接管理器了。
連接管理器處理空閑跃闹、Close嵌削、Connect、異常等事件辣卒,使用監(jiān)聽(tīng)器模式掷贾,不同的監(jiān)聽(tīng)器對(duì)不同的事件進(jìn)行處理睛榄。另外荣茫,這里也許可以借鑒 EventBus,每個(gè)事件可以設(shè)置多個(gè)監(jiān)聽(tīng)器场靴。
如何處理返回值
看了RocketMQ中 Netty 的設(shè)計(jì)啡莉,再看看返回值處理就簡(jiǎn)單了港准,NettyClientHandler 會(huì)在 channelRead0 方法處理 Netty Server 的返回值。對(duì)應(yīng) RMQ咧欣,則是 processMessageReceived 方法浅缸。該方法很簡(jiǎn)潔:
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
其實(shí),這是一個(gè)模板方法魄咕,固定算法衩椒,由子類實(shí)現(xiàn),分為 Request 實(shí)現(xiàn)和 Response 實(shí)現(xiàn)哮兰。我們看看 Response 實(shí)現(xiàn)毛萌。
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
// 找到 Response .
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {// 返回結(jié)果
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
通過(guò) cmd 對(duì)象的 Request ID 找到 Feature,執(zhí)行 responseFuture.putResponse喝滞,設(shè)置返回值阁将,喚醒阻塞等待的發(fā)送線程。
這里還有一個(gè) release 調(diào)用右遭,這個(gè)和異步發(fā)送有關(guān)做盅,默認(rèn)最大同時(shí) 65535 個(gè)異步請(qǐng)求,具體就不展開了窘哈。
到這里吹榴,喚醒阻塞的發(fā)送線程,返回?cái)?shù)據(jù)滚婉,客戶端層面的發(fā)送就結(jié)束了腊尚。
Broker端如何處理消息
看源碼,看到有個(gè) SEND_MESSAGE Code满哪,是 Client 和 Broker Server 的一個(gè)約定代碼婿斥,我們看看這個(gè)代碼在哪里用的。
在 broker 模塊的 BrokerController 類中哨鸭,有個(gè) registerProcessor 方法民宿,會(huì)將 SEND_MESSAGE Code 和一個(gè) SendMessageProcessor 對(duì)象綁定。
NettyServerHandler
NettyRemotingServer是處理Request 的類像鸡,ServerBootstrap 會(huì)在 pipeline 中添加一個(gè) NettyServerHandler處理器活鹰,這個(gè)處理器的channelRead0方法會(huì)調(diào)用 NettyRemotingServer的父類processMessageReceived 方法。
processMessageReceived
從processorTable 里只估,根據(jù) Cmd Code志群,也就是 SEND_MESSAGE 獲取對(duì)應(yīng)的 Processor
Processor 由 2 部分組成,
一部分是處理數(shù)據(jù)的對(duì)象蛔钙,一部分是這個(gè)對(duì)象所對(duì)應(yīng)的線程池锌云。用于異步處理邏輯,防止阻塞 Netty IO線程吁脱。
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);// 處理.
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
前后都是執(zhí)行一些鉤子桑涎,例如 ACL
RocketMQ會(huì)有一個(gè) BrokerController 類彬向,會(huì)注冊(cè) Code 和 Processor 的綁定關(guān)系,BrokerController 也會(huì)把這些綁定攻冷,注冊(cè)到 Netty Server 中娃胆,當(dāng) Netty Server 從 Socket 收到 Cmd 對(duì)象,根據(jù) Cmd 對(duì)象的 Code等曼,就可以找到對(duì)應(yīng) Processor 類里烦,對(duì)數(shù)據(jù)進(jìn)行處理。
中間是處理 Request請(qǐng)求的禁谦。這個(gè) processRequest 方法招驴,有很多的實(shí)現(xiàn),SendMessageProcessor的sendMessage 是處理消息的主要邏輯枷畏。
消息存儲(chǔ)引擎别厘,這里我們看DefaultMessageStore的putMessage 實(shí)現(xiàn)。
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
由于RocketMQ寫數(shù)據(jù)是PageCache里面寫的拥诡,因此触趴,如果寫的慢,就是 PageCache 忙渴肉,這里忙的標(biāo)準(zhǔn)是冗懦,如果鎖文件的時(shí)間,超過(guò)了 1 秒仇祭,那就是忙披蕉。
if (this.isOSPageCacheBusy()) {// 檢查 mmp 忙不忙.
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
最后調(diào)用 PutMessageResult result = this.commitLog.putMessage(msg) 寫數(shù)據(jù)。如果耗時(shí)超過(guò) 500 毫秒乌奇,就會(huì)打印日志没讲。這樣我們排查問(wèn)題的時(shí)候,可以看看 storeStats 的日志礁苗。
commitLog 的 putMessage 方法
先拿到最新的MappedFile 文件爬凑,MappedFile 文件的命名是用 offset 命名的,一個(gè)文件默認(rèn) 1gb试伙,這個(gè)大小和 mmp 的機(jī)制有關(guān)嘁信,通常不能過(guò)大。
然后上鎖疏叨,這段代碼是可以說(shuō)整個(gè) RocketMQ Server 的熱點(diǎn)區(qū)域潘靖,
這里上鎖會(huì)記錄上鎖的時(shí)間,方便前面做 PageCache Busy 的判斷蚤蔓。
寫入代碼
result = mappedFile.appendMessage(msg, this.appendMessageCallback)
寫完之后卦溢,釋放鎖,如果超過(guò) 500 毫秒,打印 cost time 日志既绕。
統(tǒng)計(jì)
處理刷盤和slave 同步,這里看刷盤策略和同步策略涮坐,是 SYNC 還是 ASYNC凄贩。經(jīng)過(guò)我的測(cè)試,同步刷盤和異步刷盤的性能差距是 10 倍袱讹。
而 Slave 的數(shù)據(jù)同步疲扎,如果用 SYNC 模式,tps 最高也就 2000 多一丟度捷雕,為什么椒丧??jī)?nèi)網(wǎng),兩臺(tái)機(jī)器 ping 一下都要 0.2 毫秒救巷,一秒最多 5000 次壶熏,再加上處理邏輯, 2000 已經(jīng)到頂了浦译,網(wǎng)絡(luò)成了瓶頸棒假。
我們看看 mappedFile.appendMessage 方法的實(shí)現(xiàn)。一路追蹤精盅,有個(gè)關(guān)鍵邏輯帽哑, 在 appendMessagesInner 里:
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result = null;
if (messageExt instanceof MessageExtBrokerInner) {
// 寫數(shù)據(jù)到 緩存
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
代碼中,使用了 mappedFile 從 Linux 映射的 MMap buffer叹俏,對(duì)數(shù)據(jù)進(jìn)行寫入妻枕。我們看看 doAppend 方法。
總長(zhǎng)度粘驰、魔數(shù)屡谐、CRC 校驗(yàn)、隊(duì)列 ID蝌数、各種 flag康嘉、存儲(chǔ)時(shí)間,物理 offset籽前、存儲(chǔ) IP亭珍、時(shí)間戳、擴(kuò)展屬性等等枝哄。最終肄梨,這條消息會(huì)被寫入到 MMap 中。
那什么時(shí)候刷盤
- 如果是 SYNC 模式挠锥,執(zhí)行 CommitLog 的 handleDiskFlush 的方法時(shí)众羡,就會(huì)立刻刷盤并等待刷盤結(jié)果。
- 如果是 ASYNC 模式蓖租,執(zhí)行 CommitLog 的 handleDiskFlush 的方法時(shí)粱侣,會(huì)通知異步線程進(jìn)行刷盤羊壹,但不等待結(jié)果。
如果沒(méi)有新數(shù)據(jù)齐婴,則為 500ms 執(zhí)行一次刷盤策略油猫。
簡(jiǎn)單說(shuō)下異步刷盤:
默認(rèn)刷盤 4 頁(yè),Linux 一頁(yè)是 4kb 數(shù)據(jù)柠偶,4頁(yè)就是 16kb情妖。
如果寫的數(shù)據(jù)減去已經(jīng)刷的數(shù)據(jù),剩下的數(shù)據(jù)大于等于 4 頁(yè)诱担,就執(zhí)行刷盤毡证,執(zhí)行 mappedByteBuffer.force() 或者 fileChannel.force(false);