謝評(píng)論區(qū) kangsong 指正衣盾,文章已更新
先貼一下異常信息
02-28 15:07:21.156 1975-2791/com.tencent.mm E/Xposed: [15:07:21]: publish failed, message: aaaa
正在進(jìn)行過(guò)多的發(fā)布 (32202)
at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:496)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:132)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:156)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1027)
at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:399)
at io.communet.ichater.emq.util.MqttUtil.publishMsg(MqttUtil.java:171)
at io.communet.ichater.emq.util.MqttUtil.publishMsg(MqttUtil.java:161)
at io.communet.ichater.emq.sub.MqttSendMsgEventSubscribe.onEvent(MqttSendMsgEventSubscribe.java:28)
at java.lang.reflect.Method.invoke(Native Method)
at java.lang.reflect.Method.invoke(Method.java:372)
at org.greenrobot.eventbus.EventBus.invokeSubscriber(EventBus.java:507)
at org.greenrobot.eventbus.EventBus.invokeSubscriber(EventBus.java:501)
at org.greenrobot.eventbus.AsyncPoster.run(AsyncPoster.java:46)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
at java.lang.Thread.run(Thread.java:818)
原因
根據(jù)堆棧信息找到報(bào)錯(cuò)地方
if (actualInFlight >= this.maxInflight) {
//@TRACE 613= sending {0} msgs at max inflight window
log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)});
throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
}
其中 actualInFlight
如下
// processed until the inflight window has space.
if (actualInFlight < this.maxInflight) {
// The in flight window is not full so process the
// first message in the queue
result = (MqttWireMessage)pendingMessages.elementAt(0);
pendingMessages.removeElementAt(0);
actualInFlight++;
//@TRACE 623=+1 actualInFlight={0}
log.fine(CLASS_NAME,methodName,"623",new Object[]{new Integer(actualInFlight)});
}
從 pendingMessages
中取出消息時(shí), actualInFlight
加 1, maxInflight
可以自己設(shè)定, 默認(rèn)值為 10.
public class ClientState {
...
volatile private Vector pendingMessages;
...
}
在 ClientState
中:
public void send(MqttWireMessage message, MqttToken token) throws MqttException {
...
if (message instanceof MqttPublish) {
synchronized (queueLock) {
if (actualInFlight >= this.maxInflight) {
//@TRACE 613= sending {0} msgs at max inflight window
log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)});
throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
}
MqttMessage innerMessage = ((MqttPublish) message).getMessage();
//@TRACE 628=pending publish key={0} qos={1} message={2}
log.fine(CLASS_NAME,methodName,"628", new Object[]{new Integer(message.getMessageId()), new Integer(innerMessage.getQos()), message});
switch(innerMessage.getQos()) {
case 2:
outboundQoS2.put(new Integer(message.getMessageId()), message);
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
break;
case 1:
outboundQoS1.put(new Integer(message.getMessageId()), message);
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
break;
}
tokenStore.saveToken(token, message);
pendingMessages.addElement(message);
queueLock.notifyAll();
}
} else {
...
}
}
可以看到 pendingMessages
中添加元素的時(shí)候并沒(méi)有做 qos
類型的判斷
private void decrementInFlight() {
final String methodName = "decrementInFlight";
synchronized (queueLock) {
actualInFlight--;
//@TRACE 646=-1 actualInFlight={0}
log.fine(CLASS_NAME,methodName,"646",new Object[]{new Integer(actualInFlight)});
if (!checkQuiesceLock()) {
queueLock.notifyAll();
}
}
}
當(dāng)收到消息反饋時(shí) actualInFlight
減 1.
解決辦法
- 消息發(fā)送發(fā)送限流
- 用單獨(dú)的一個(gè)線程來(lái)完成 MQ 消息的推送
-
options.setMaxInflight(1000)
增加actualInFlight
的值;
題外話
筆者出現(xiàn)這個(gè)錯(cuò)誤是因?yàn)槭褂?EventBus
, 之前使用單獨(dú)線程的 Handler
是沒(méi)有問(wèn)題的, 調(diào)查發(fā)現(xiàn), 使用 EventBus
是新建線程運(yùn)行的, 而 Handler
是單獨(dú)一個(gè)線程.
所以當(dāng)發(fā)送大量消息的時(shí)候, EventBus
幾乎是同一個(gè)點(diǎn)發(fā)出去, 就會(huì)造成這個(gè)錯(cuò)誤