系列
開篇
- 這篇文章的主要目的是分析RocketMq Broker的線程模型和快速失敗機(jī)制痹栖。
- 線程模型的核心是按照不同的業(yè)務(wù)通過不同的線程池進(jìn)行處理。
- 快速失敗機(jī)制是針對(duì)線程池中任務(wù)堆積的情況快速失敗然后由業(yè)務(wù)側(cè)重試叁扫。
線程模型分析
public class BrokerController {
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"BrokerControllerScheduledThread"));
// broker側(cè)各類業(yè)務(wù)線程池等待的隊(duì)列
private final BlockingQueue<Runnable> sendThreadPoolQueue;
private final BlockingQueue<Runnable> pullThreadPoolQueue;
private final BlockingQueue<Runnable> replyThreadPoolQueue;
private final BlockingQueue<Runnable> queryThreadPoolQueue;
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
private final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
// broker側(cè)各類業(yè)務(wù)對(duì)應(yīng)的ExecutorService
private ExecutorService sendMessageExecutor;
private ExecutorService pullMessageExecutor;
private ExecutorService replyMessageExecutor;
private ExecutorService queryMessageExecutor;
private ExecutorService adminBrokerExecutor;
private ExecutorService clientManageExecutor;
private ExecutorService heartbeatExecutor;
private ExecutorService consumerManageExecutor;
private ExecutorService endTransactionExecutor;
private BrokerFastFailure brokerFastFailure;
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
) {
// 負(fù)責(zé)創(chuàng)建各類線程池隊(duì)列
// private int sendThreadPoolQueueCapacity = 10000;
// private int pullThreadPoolQueueCapacity = 100000;
// private int replyThreadPoolQueueCapacity = 10000;
// private int queryThreadPoolQueueCapacity = 20000;
// private int clientManagerThreadPoolQueueCapacity = 1000000;
// private int consumerManagerThreadPoolQueueCapacity = 1000000;
// private int heartbeatThreadPoolQueueCapacity = 50000;
// private int endTransactionPoolQueueCapacity = 100000;
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
// 快速失敗檢測(cè)服務(wù)
this.brokerFastFailure = new BrokerFastFailure(this);
}
public boolean initialize() throws CloneNotSupportedException {
if (result) {
// 創(chuàng)建各類ThreadPoolExecutor并綁定對(duì)應(yīng)的Queue
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.replyThreadPoolQueue,
new ThreadFactoryImpl("ProcessReplyMessageThread_"));
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_"));
this.adminBrokerExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
"AdminBrokerThread_"));
this.clientManageExecutor = new ThreadPoolExecutor(
this.brokerConfig.getClientManageThreadPoolNums(),
this.brokerConfig.getClientManageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_"));
this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getHeartbeatThreadPoolNums(),
this.brokerConfig.getHeartbeatThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_", true));
this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getEndTransactionThreadPoolNums(),
this.brokerConfig.getEndTransactionThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_"));
this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_"));
// 注冊(cè)各類處理Processor
this.registerProcessor();
}
}
public void registerProcessor() {
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
// 綁定請(qǐng)求編碼韩脑、Processor渔工、ThreadPoolExecutor之間的關(guān)系
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
/**
* QueryMessageProcessor
*/
NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
/**
* ClientManageProcessor
*/
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
/**
* ConsumerManageProcessor
*/
ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
/**
* EndTransactionProcessor
*/
this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
/**
* Default
*/
AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}
}
- BrokerController內(nèi)部根據(jù)不同的服務(wù)創(chuàng)建不同的ThreadPoolQueue,如sendThreadPoolQueue、pullThreadPoolQueue等旷档。
- BrokerController內(nèi)部根據(jù)不同的服務(wù)創(chuàng)建不同的ExecutorService,如sendMessageExecutor歇拆、pullMessageExecutor等鞋屈。
- BrokerController內(nèi)部根據(jù)不同的服務(wù)創(chuàng)建不同的Processor,如SendMessageProcessor查吊、replyMessageProcessor等谐区。
- BrokerController通過registerProcessor()方法來實(shí)現(xiàn)請(qǐng)求編碼、Processor逻卖、ExecutorService的綁定宋列。
- 根據(jù)綁定關(guān)系可知,每個(gè)請(qǐng)求編碼會(huì)由對(duì)應(yīng)的ExecutorService去執(zhí)行對(duì)應(yīng)的Processor來完成功能评也。
- BrokerFastFailure負(fù)責(zé)檢測(cè)并實(shí)現(xiàn)快速失敗的功能炼杖。
public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor {
public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
final TimeUnit unit,
final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
return new FutureTaskExt<T>(runnable, value);
}
}
- 重寫了newTaskFor,在submit中將RequestTask包裝成FutureTaskExt對(duì)象進(jìn)行處理盗迟。
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = this.publicExecutor;
}
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
this.processorTable.put(requestCode, pair);
}
}
public abstract class NettyRemotingAbstract {
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
}
- registerProcessor實(shí)現(xiàn)Processor和ExecutorService的Pair對(duì)坤邪,并綁定對(duì)應(yīng)的請(qǐng)求碼。
消息處理流程
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
prepareSharableHandlers();
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
}
private void prepareSharableHandlers() {
handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
encoder = new NettyEncoder();
connectionManageHandler = new NettyConnectManageHandler();
serverHandler = new NettyServerHandler();
}
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
}
- NettyRemotingServer在啟動(dòng)的過程中綁定serverHandler罚缕,是NettyServerHandler對(duì)象艇纺。
- NettyServerHandler處理消息讀事件channelRead0的過程中執(zhí)行processMessageReceived來處理消息事件。
public abstract class NettyRemotingAbstract {
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
} catch (Throwable e) {
}
}
};
try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
}
} else {
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
}
- processRequestCommand根據(jù)請(qǐng)求碼RemotingCommand的code找對(duì)應(yīng)的Pair對(duì),包含對(duì)應(yīng)的Processor和ExecutorService黔衡。
- 將cmd封裝成對(duì)應(yīng)的RequestTask蚓聘,并通過pair.getObject2().submit(requestTask)由ExecutorService來執(zhí)行對(duì)應(yīng)的Task。
-
消息處理的核心是通過ExecutorService來處理cmd對(duì)應(yīng)的RequestTask盟劫。
BrokerFastFailure快速失敗
public class BrokerFastFailure {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"BrokerFastFailureScheduledThread"));
private final BrokerController brokerController;
public BrokerFastFailure(final BrokerController brokerController) {
this.brokerController = brokerController;
}
public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
cleanExpiredRequest();
}
}
}, 1000, 10, TimeUnit.MILLISECONDS);
}
}
- BrokerFastFailure每10ms執(zhí)行清除過期請(qǐng)求夜牡,核心方法cleanExpiredRequest。
public class BrokerFastFailure {
private void cleanExpiredRequest() {
while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
try {
if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
if (null == runnable) {
break;
}
final RequestTask rt = castRunnable(runnable);
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
} else {
break;
}
} catch (Throwable ignored) {
}
}
cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
.brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
}
}
- 負(fù)責(zé)定時(shí)清理SendThreadPoolQueue侣签、PullThreadPoolQueue塘装、HeartbeatThreadPoolQueue、EndTransactionThreadPoolQueue影所。
public class BrokerFastFailure {
public static RequestTask castRunnable(final Runnable runnable) {
try {
if (runnable instanceof FutureTaskExt) {
FutureTaskExt object = (FutureTaskExt) runnable;
return (RequestTask) object.getRunnable();
}
} catch (Throwable e) {
log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e);
}
return null;
}
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
while (true) {
try {
if (!blockingQueue.isEmpty()) {
// Retrieves, but does not remove, the head of this queue
final Runnable runnable = blockingQueue.peek();
if (null == runnable) {
break;
}
final RequestTask rt = castRunnable(runnable);
if (rt == null || rt.isStopRun()) {
break;
}
// 檢查超時(shí)是否超過maxWaitTimeMillsInQueue
// private long waitTimeMillsInSendQueue = 200;
// private long waitTimeMillsInPullQueue = 5 * 1000;
// private long waitTimeMillsInHeartbeatQueue = 31 * 1000;
// private long waitTimeMillsInTransactionQueue = 3 * 1000;
final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
if (behind >= maxWaitTimeMillsInQueue) {
if (blockingQueue.remove(runnable)) {
rt.setStopRun(true);
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
}
} else {
break;
}
} else {
break;
}
} catch (Throwable ignored) {
}
}
}
}
- 獲取任務(wù)隊(duì)列中的第一個(gè)元素并判斷是否超時(shí)蹦肴,如果超時(shí)則立即返回錯(cuò)誤碼SYSTEM_BUSY。