概述
RocketMQ 底層通訊是使用Netty來(lái)實(shí)現(xiàn)的于置。
下面我們通過(guò)源碼分析下RocketMQ是怎么利用Netty進(jìn)行通訊的。
本文分析的是RocketMQ 最新版本 4.3.2版本莫鸭。
RocketMQ 項(xiàng)目結(jié)構(gòu)
首先來(lái)看下 RocketMQ 模塊構(gòu)成吏祸。
通過(guò) RocketMQ 項(xiàng)目結(jié)構(gòu)可以看出瓜浸,RocketMQ 分了好多模塊。 broker秋冰、client仲义、filter、namesrv剑勾、remoting 等埃撵。
大家比較熟悉的幾個(gè)模塊對(duì)應(yīng)的源碼如下:
Broker Master 和 Slave 對(duì)應(yīng)的 broker 模塊。
Producer 和 Consumer 對(duì)應(yīng)的是 client 模塊虽另。
NameSerer 服務(wù)對(duì)應(yīng)的是 namesrv 模塊暂刘。
而各個(gè)服務(wù)之間的通訊則使用的 remoting 模塊。
Remoting 模塊
通過(guò)romoting 的模塊結(jié)構(gòu)大概了解捂刺,RocketMQ 通訊使用了Netty進(jìn)行傳輸通訊谣拣。并在 org.apache.rocketmq.remoting.protocol 包中自定義了通訊協(xié)議。
通信模塊主要接口和類(lèi)
RemotingService 接口
public interface RemotingService {
//開(kāi)啟服務(wù)
void start();
//關(guān)閉服務(wù)
void shutdown();
//注冊(cè) hook (可以在調(diào)用之前和調(diào)用之后做一些擴(kuò)展處理)
void registerRPCHook(RPCHook rpcHook);
}
RemotingService 定義了服務(wù)端和客戶端都需要的三個(gè)接口族展。
registerRPCHook() 方法可以注冊(cè)一個(gè) hook森缠。可以在遠(yuǎn)程通信之前和通信之后仪缸,執(zhí)行用戶自定的一些處理贵涵。類(lèi)似前置處理器和后置處理器。
RPCHook 接口
public interface RPCHook {
void doBeforeRequest(final String remoteAddr, final RemotingCommand request);
void doAfterResponse(final String remoteAddr, final RemotingCommand request,
final RemotingCommand response);
}
在啟動(dòng)服務(wù)之前,可以把自己實(shí)現(xiàn)的 RPCHook 注冊(cè)到服務(wù)中宾茂,執(zhí)行遠(yuǎn)程調(diào)用的時(shí)候處理一些業(yè)務(wù)邏輯瓷马。比如打印請(qǐng)求和響應(yīng)的日志信息。
RemotingServer 和 RemotingClient 接口
RemotingServer 和 RemotingClient 接口都繼承了RemotingService 接口跨晴,并擴(kuò)展了自己特有的方法欧聘。
RemotingServer 接口
public interface RemotingServer extends RemotingService {
//注冊(cè)一個(gè)處理請(qǐng)求的處理器, 根據(jù)requestCode, 獲取處理器,處理請(qǐng)求
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
//注冊(cè)一個(gè)默認(rèn)的處理器,當(dāng)根據(jù)requestCode匹配不到處理器端盆,則使用這個(gè)默認(rèn)的處理器
void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
//獲取端口
int localListenPort();
//根據(jù)requestCode獲取請(qǐng)求處理器
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
//同步調(diào)用(同步發(fā)送消息)
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException;
//異步調(diào)用(異步發(fā)送消息)
void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
//單向發(fā)送消息怀骤,只發(fā)送消息。不用處理發(fā)送的結(jié)果爱谁。
void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException;
}
1晒喷、registerProcessor 方法
注冊(cè)一個(gè)處理請(qǐng)求的處理器, 存放到 HashMap中孝偎,requestCode為 Map 的 key访敌。
HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable
2、registerDefaultProcessor 方法
注冊(cè)一個(gè)默認(rèn)的處理器衣盾,當(dāng)根據(jù)requestCode匹配不到處理器寺旺,則使用這個(gè)默認(rèn)的處理器3、invokeSync 方法
以同步的方式向客戶端發(fā)送消息势决。4阻塑、invokeAsync 方法
以異步的方式向客戶端發(fā)送消息。5果复、invokeOneway 方法
只向客戶端發(fā)送消息陈莽,而不處理客戶端返回的消息。該方法只是向socket中寫(xiě)入數(shù)據(jù)虽抄,而不需要處理客戶端返回的消息走搁。
RemotingClient 接口
public interface RemotingClient extends RemotingService {
//更新 NameServer 地址
void updateNameServerAddressList(final List<String> addrs);
//獲取 NameServer 地址
List<String> getNameServerAddressList();
//同步調(diào)用(同步發(fā)送消息)
RemotingCommand invokeSync(final String addr, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;
//異步調(diào)用(異步發(fā)送消息)
void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
//單向發(fā)送消息,只發(fā)送消息迈窟。不用處理發(fā)送的結(jié)果私植。
void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;
//注冊(cè)一個(gè)處理請(qǐng)求的處理器, 根據(jù)requestCode, 獲取處理器,處理請(qǐng)求
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
//設(shè)置發(fā)送異步消息的線程池,如果不設(shè)置车酣,則使用默認(rèn)的
void setCallbackExecutor(final ExecutorService callbackExecutor);
//獲取線程池
ExecutorService getCallbackExecutor();
//判斷 channel 是否可寫(xiě)
boolean isChannelWritable(final String addr);
}
1曲稼、updateNameServerAddressList、getNameServerAddressList 方法
更新 NameServer 地址湖员。
獲取 NameServer 地址贫悄。2、invokeSync娘摔、invokeAsync窄坦、invokeOneway 方法
這三個(gè)方法參見(jiàn) RemotingServer 接口中的方法。3、setCallbackExecutor
設(shè)置處理異步響應(yīng)消息的線程池嫡丙。
服務(wù)端和客戶端的實(shí)現(xiàn)
- NettyRemotingServer 類(lèi)實(shí)現(xiàn)了RemotingServer 接口
- NettyRemotingClient 類(lèi)實(shí)現(xiàn)了RemotingClient接口
這兩個(gè)類(lèi)使用Netty 來(lái)實(shí)現(xiàn)服務(wù)端和客戶端服務(wù)的拴袭。
NettyRemotingServer 解析
通過(guò) NettyRemotingServer類(lèi)中的start() 方法開(kāi)啟一個(gè) Netty 的服務(wù)端。
代碼如下:
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
//編碼
new NettyEncoder(),
//解碼
new NettyDecoder(),
//心跳檢測(cè)
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
//連接管理handler,處理connect, disconnect, close等事件
new NettyConnectManageHandler(),
//處理接收到RemotingCommand消息后的事件, 收到服務(wù)器端響應(yīng)后的相關(guān)操作
new NettyServerHandler()
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
從 start 方法中啟動(dòng)一個(gè)Netty 的服務(wù)端曙博。
- 通過(guò)設(shè)置的自定義的
NettyEncoder
對(duì)發(fā)送的消息進(jìn)行編碼(序列化)拥刻。 - 通過(guò)
NettyDecoder
對(duì)接收的消息進(jìn)行解碼操作(反序列化) - 最后再把反序列化的對(duì)象交給
NettyServerHandler
進(jìn)行處理。
NettyRemotingClient 解析
通過(guò) NettyRemotingClient 類(lèi)中的 start 方法開(kāi)啟一個(gè) netty 客戶端
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
//發(fā)送消息編碼
new NettyEncoder(),
//接收消息解碼
new NettyDecoder(),
//心跳監(jiān)測(cè)
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
//連接管理handler,處理connect, disconnect, close等事件
new NettyConnectManageHandler(),
//處理接收到RemotingCommand消息后的事件, 收到服務(wù)器端響應(yīng)后的相關(guān)操作
new NettyClientHandler());
}
});
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
從 start 方法中啟動(dòng)一個(gè)Netty 客戶端服務(wù)父泳。
- 通過(guò)設(shè)置的自定義的
NettyEncoder
對(duì)發(fā)送的消息進(jìn)行編碼(序列化)般哼。 - 通過(guò)
NettyDecoder
對(duì)接收的消息進(jìn)行解碼操作(反序列化) - 最后再把反序列化的對(duì)象交給 NettyServerHandler` 進(jìn)行處理。
序列化反序列化
通過(guò)分析 RemotingServer
和 RemotingClient
接口及實(shí)現(xiàn)可以發(fā)現(xiàn)惠窄,發(fā)送消息和接收到的消息都是 RemotingCommand
對(duì)象蒸眠。
經(jīng)過(guò)分析 NettyEncoder
和 NettyDecoder
發(fā)現(xiàn),序列化和反序列化調(diào)用的是 RemotingCommand
對(duì)象的 encode
和 decode
方法
消息格式
- 第一部分是消息的長(zhǎng)度杆融,占用4個(gè)字節(jié)楞卡。等于第二、三脾歇、四部分長(zhǎng)度的總和蒋腮。
- 第二部分是消息頭的長(zhǎng)度,占用4個(gè)字節(jié)藕各。等于第三部分長(zhǎng)度大小池摧。
- 第三部分是通過(guò)Json序列化的消息頭的數(shù)據(jù)。
- 第四部分是序列化的消息數(shù)據(jù)激况。
具體的消息格式我們通過(guò) RemotingCommand類(lèi)的 encode
和 decode
方法進(jìn)行分析作彤。
RemotingCommand.encode() 方法
public ByteBuffer encode() {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
if (this.body != null) {
length += body.length;
}
ByteBuffer result = ByteBuffer.allocate(4 + length);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
// body data;
if (this.body != null) {
result.put(this.body);
}
result.flip();
return result;
}
1、定義消息頭的長(zhǎng)度為 length = 4
2乌逐、通過(guò) this.headerEncode() 獲取序列化的 header data竭讳。
3、然后申請(qǐng)一個(gè)長(zhǎng)度為 length + header length + header data +body
大小的ByteBuffer黔帕。
ByteBuffer result = ByteBuffer.allocate(4 + length);
4代咸、然后向 ByteBuffer result
中填充數(shù)據(jù)
headerEncode 方法
該方法主要是實(shí)現(xiàn)了消息頭的序列化。
private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}
序列化消息頭有兩種方式SerializeType.ROCKETMQ 和 SerializeType.JSON成黄。
如果是SerializeType.JSON方式序列化比較簡(jiǎn)單呐芥。
RemotingSerializable.encode 方法
SerializeType.JSON 類(lèi)型序列化。
public static byte[] encode(final Object obj) {
final String json = toJson(obj, false);
if (json != null) {
return json.getBytes(CHARSET_UTF8);
}
return null;
}
直接把對(duì)象轉(zhuǎn)換成json字符串奋岁,然后轉(zhuǎn)換成 byte[] 數(shù)組
RocketMQSerializable.rocketMQProtocolEncode 方法
SerializeType.ROCKETMQ 類(lèi)型序列化思瘟。
public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
// String remark
byte[] remarkBytes = null;
int remarkLen = 0;
if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);
remarkLen = remarkBytes.length;
}
// HashMap<String, String> extFields
byte[] extFieldsBytes = null;
int extLen = 0;
if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {
extFieldsBytes = mapSerialize(cmd.getExtFields());
extLen = extFieldsBytes.length;
}
int totalLen = calTotalLen(remarkLen, extLen);
ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
// int code(~32767)
headerBuffer.putShort((short) cmd.getCode());
// LanguageCode language
headerBuffer.put(cmd.getLanguage().getCode());
// int version(~32767)
headerBuffer.putShort((short) cmd.getVersion());
// int opaque
headerBuffer.putInt(cmd.getOpaque());
// int flag
headerBuffer.putInt(cmd.getFlag());
// String remark
if (remarkBytes != null) {
headerBuffer.putInt(remarkBytes.length);
headerBuffer.put(remarkBytes);
} else {
headerBuffer.putInt(0);
}
// HashMap<String, String> extFields;
if (extFieldsBytes != null) {
headerBuffer.putInt(extFieldsBytes.length);
headerBuffer.put(extFieldsBytes);
} else {
headerBuffer.putInt(0);
}
return headerBuffer.array();
}
可以看到 代碼把 RemotingCommand 對(duì)象中的數(shù)據(jù)按照一定的順序轉(zhuǎn)換成字節(jié)存儲(chǔ)到ByteBuffer 中。
從代碼中可以看出消息頭中包括闻伶,request code滨攻、請(qǐng)求端實(shí)現(xiàn)語(yǔ)言、版本等信息。
RemotingCommand.decode() 方法
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit();
int oriHeaderLen = byteBuffer.getInt();
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
這里的byteBuffer中的數(shù)據(jù)包含 header length + header data +body data
光绕。
為什么不是包含
length+header length + header data +body data
呢女嘲?
因?yàn)閚etty在獲取這條消息的時(shí)候是通過(guò)io.netty.handler.codec.LengthFieldBasedFrameDecoder
進(jìn)行拆包的。該拆包的原理就是通過(guò) 消息的length
長(zhǎng)度進(jìn)行拆分的诞帐。所以拆分出來(lái)的數(shù)據(jù)就是header length + header data +body data
這部分欣尼。
1、從byteBuffer中獲取header length 長(zhǎng)度停蕉。
2愕鼓、然后再通過(guò)header length 長(zhǎng)度從 byteBuffer 獲取 header data。
3慧起、剩下的byteBuffer數(shù)據(jù)就是body的數(shù)據(jù)菇晃。
把解析出來(lái)的數(shù)據(jù)轉(zhuǎn)換成 RemotingCommand 對(duì)象。
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
switch (type) {
case JSON:
RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
resultJson.setSerializeTypeCurrentRPC(type);
return resultJson;
case ROCKETMQ:
RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
resultRMQ.setSerializeTypeCurrentRPC(type);
return resultRMQ;
default:
break;
}
return null;
}
判斷該數(shù)據(jù)是通過(guò) SerializeType.ROCKETMQ 序列化還是 SerializeType.JSON 序列化的蚓挤。
然后根據(jù)類(lèi)型進(jìn)行反序列化操作磺送。
RemotingSerializable.decode 方法
SerializeType.JSON 反序列化。
public static <T> T decode(final byte[] data, Class<T> classOfT) {
final String json = new String(data, CHARSET_UTF8);
return fromJson(json, classOfT);
}
直接把 json 數(shù)據(jù)反序列化成對(duì)象屈尼。
RocketMQSerializable.rocketMQProtocolDecode 方法
SerializeType.ROCKETMQ 反序列化册着。
public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
RemotingCommand cmd = new RemotingCommand();
ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
// int code(~32767)
cmd.setCode(headerBuffer.getShort());
// LanguageCode language
cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
// int version(~32767)
cmd.setVersion(headerBuffer.getShort());
// int opaque
cmd.setOpaque(headerBuffer.getInt());
// int flag
cmd.setFlag(headerBuffer.getInt());
// String remark
int remarkLength = headerBuffer.getInt();
if (remarkLength > 0) {
byte[] remarkContent = new byte[remarkLength];
headerBuffer.get(remarkContent);
cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
}
// HashMap<String, String> extFields
int extFieldsLength = headerBuffer.getInt();
if (extFieldsLength > 0) {
byte[] extFieldsBytes = new byte[extFieldsLength];
headerBuffer.get(extFieldsBytes);
cmd.setExtFields(mapDeserialize(extFieldsBytes));
}
return cmd;
}
根據(jù) encode 的順序進(jìn)行反序列化操作拴孤。