?這節(jié)介紹Consumer接收消息的流程垮抗,分為Pull和Push模式懒震。
1. 初始化
?上一節(jié)講Rebalance時提到鳄厌,Consumer接受客戶端有兩種方式:
- Broker發(fā)現(xiàn)客戶端列表有變化,通知所有Consumer執(zhí)行Rebalance
- Consumer定時每20秒自動執(zhí)行Rebalance
其中1.的通知到達Consumer后拼缝,會立即觸發(fā)Rebalance,然后會重置2.的定時器等待時間彰亥。二者最后通知Consumer的方式為
- Push模式:當有新的Queue分配給客戶端時咧七,會新包裝一個PullRequest,用于后續(xù)自動拉取消息任斋,具體到DefaultMQPushConsumerImpl的executePullRequestImmediately方法
- Pull模式:回調(diào)DefaultMQPullConsumerImpl的MessageQueueListener有Queue發(fā)生改變
2. Push模式
?executePullRequestImmediately的內(nèi)容為:
public void executePullRequestImmediately(final PullRequest pullRequest) {
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
即將PullRequest對象傳給了PullMessageService的executePullRequestImmediately方法:
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
PullMessageService的結構如下:
內(nèi)部維護著一個LinkedBlockingQueue屬性pullRequestQueue继阻,用于存儲待處理的PullRequest;還有一個ScheduledExecutorService,用于延期處理PullRequest废酷。具體流程如下:
- RebalanceImpl調(diào)用DefaultMQPushConsumerImpl的executePullRequestImmediately方法瘟檩,傳入PullRequest
- DefaultMQPushConsumerImpl內(nèi)部調(diào)用PullMessageService的executePullRequestImmediately方法,該方法會把傳入的PullRequest對象放到LinkedBlockingQueue中進行存儲澈蟆,等待后續(xù)處理墨辛。
- PullMessageService會循環(huán)從隊列中出隊一個PullRequest,并調(diào)用自身的pullMessage用于后續(xù)處理趴俘。該動作會從MQClientInstance中選擇對應的客戶端實例DefaultMQPushConsumerImpl睹簇,并委托給它的pullMessage方法。
- DefaultMQPushConsumerImpl會先判斷當前請求是否滿足條件寥闪,如果不滿足條件太惠,會調(diào)用PullMessage的executePullRequestLater方法,將當前請求延后處理疲憋。如果滿足條件凿渊,會封裝一個PullCallback對象用于處理異步消息,并調(diào)用PullAPIWrapper異步請求Broker拉取消息缚柳。
從上面的過程可以看出埃脏,Push模式內(nèi)部還是客戶端主動去拉取的,即所謂的封裝拉模式以實現(xiàn)推模式,簡單示意圖如下:
內(nèi)部通過PullMessageService循環(huán)的從PullRequest對應MessageQueue中主動拉取數(shù)據(jù)秋忙。
2.1. DefaultMQPushConsumerImpl.pullMessage(PullRequest)
?該方法用于完成從MessageQueue拉取消息的過程剂癌,主要過程如下:
判斷該MessageQueue對應的PullRequest是否已經(jīng)標記為drop,如果是則直接返回
-
進行一系列的檢查翰绊,如果檢查不通過佩谷,則等待一定時間后再放回PullMessageService的待處理隊列中旁壮,主要是通過PullMessageService中的ScheduledExecutorService來做到延遲執(zhí)行,涉及的情況包括:
- 如果客戶端未準備就緒(DefaultMQPushCOnsumerImpl執(zhí)行start后status為RUNNING)谐檀,則延遲PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION(3000)后再放回PullMessage的隊列中
- 如果是暫停狀態(tài)抡谐,則延遲PULL_TIME_DELAY_MILLS_WHEN_SUSPEND(1000)后再放回PullMessageService的等待隊列中
- 如果緩存的消息數(shù)大于配置的拉取線程數(shù)閾值(默認1000),則等待PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL(50)后再返回等待隊列中處理
- 如果緩存的消息大小大于配置的拉取大小閾值(默認100M)桐猬,則等待PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL(50)后再返回等待隊列中處理
- 緩存的數(shù)據(jù)offset相差的偏移量超過設定值(默認2000)麦撵,則等待PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL(50)后再返回等待隊列中處理
- 如果沒有訂閱MessageQueue對應的topic,則等待PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION(3000)后再返回隊列中處理
包裝PullCallback對象溃肪,并調(diào)用PullAPIWrapper發(fā)起異步請求拉取消息
上面通過PullAPIWrapper收到結果后會將結果包裝為PullResult對象并回調(diào)PullCallback免胃。PullCallback和PullResult的定義如下:
public interface PullCallback {
void onSuccess(final PullResult pullResult);
void onException(final Throwable e);
}
public class PullResult {
private final PullStatus pullStatus;//請求狀態(tài)
private final long nextBeginOffset;//Broker返回的下一次開始消費的offset
private final long minOffset;
private final long maxOffset;
private List<MessageExt> msgFoundList;//消息列表,一次請求返回一批消息
}
下面為pullMessage方法處理異步返回結果的流程:
- 如果請求失敗惫撰,則等待PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION(3000)后再放回PullMessageService的待處理隊列中羔沙;處理成功則進入2.
- 調(diào)用PullAPIWrapper對結果進行預處理
- 根據(jù)請求狀態(tài)進行處理
- 有新消息(FOUND)
- 設置PullRequest下次開始消費的起始位置為PullResult的nextBeginOffset
- 如果結果列表為空則不延遲,立馬放到PullMessageService的待處理隊列中厨钻,否則進入3
- 將PullResult中的結果List<MessageExt>放入ProcessQueue的緩存中扼雏,并通知ConsumeMessageService處理
- 將該PullRequest放回待處理隊列中等待再次處理,如果有設置拉取的間隔時間夯膀,則等待該時間后再翻到隊列中等待處理诗充,否則直接放到隊列中等待處理
- 沒有新消息(NO_NEW_MSG)
- 設置PullRequest下次開始消費的起始位置為PullResult的nextBeginOffset
- 如果緩存的待消費消息數(shù)為0,則更新offset存儲
- 將PullRequest立馬放到PullMessageService的待處理隊列中
- 沒有匹配的消息(NO_MATCHED_MSG)
- 設置PullRequest下次開始消費的起始位置為PullResult的nextBeginOffset
- 如果緩存的待消費消息數(shù)為0诱建,則更新offset存儲
- 將PullRequest立馬放到PullMessageService的待處理隊列中
- 不合法的偏移量(OFFSET_ILLEGAL)
- 設置PullRequest下次開始消費的起始位置為PullResult的nextBeginOffset
- 標記該PullRequset為drop
- 10s后再更新并持久化消費offset蝴蜓;再通知Rebalance移除該MessageQueue
- 有新消息(FOUND)
?下面先介紹下ProcessQueue,這里只標識幾個相關的屬性:
public class ProcessQueue {
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
//緩存的待消費消息,按照消息的起始offset排序
private final TreeMap</*消息的起始offset*/Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
//緩存的待消費消息數(shù)量
private final AtomicLong msgCount = new AtomicLong();
//緩存的待消費消息大小
private final AtomicLong msgSize = new AtomicLong();
private final Lock lockConsume = new ReentrantLock();
/**
* A subset of msgTreeMap, will only be used when orderly consume
*/
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
private volatile long queueOffsetMax = 0L;
private volatile boolean dropped = false;
//最近執(zhí)行pull的時間
private volatile long lastPullTimestamp = System.currentTimeMillis();
//最近被客戶端消費的時間
private volatile long lastConsumeTimestamp = System.currentTimeMillis();
private volatile boolean locked = false;
private volatile long lastLockTimestamp = System.currentTimeMillis();
//當前是否在消費俺猿,用于順序消費模式励翼,對并行消費無效
private volatile boolean consuming = false;
private volatile long msgAccCnt = 0;
}
ProcessQueue展示了MessageQueue的消費情況。上面提到辜荠,發(fā)起pull請求后如果有數(shù)據(jù)汽抚,會先放到ProcessQueue的緩存中,即msgTreeMap屬性伯病,因而緩存的消息會按照消息的起始offset被排序存儲造烁。通過ProcessQueue可以查看MessageQueue當前的處理情況,ProcessQueue還用于輔助實現(xiàn)順序消費午笛。
2.2 ConsumeMessageService
?異步返回的消息內(nèi)容將交給ConsumeMessageService處理惭蟋,ConsumeMessageService是個接口,方法定義如下:
public interface ConsumeMessageService {
void start();
void shutdown();
void updateCorePoolSize(int corePoolSize);
void incCorePoolSize();
void decCorePoolSize();
int getCorePoolSize();
ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume);
}
通過定義可見药磺,要求實現(xiàn)類提供異步處理的功能告组。內(nèi)部提供的實現(xiàn)類有:
ConsumeMessageConcurrentlyService:并行消費;ConsumeMessageOrderlyService:順序消費癌佩,這里重點看ConsumeMessageConcurrentlyService木缝。異步請求后會將拉取的新消息列表交給submitConsumeRequest方法處理便锨,如下:
該方法會將傳入的消息列表分裝為一個ConsumeRequest,并提到到線程池中等待處理我碟。如果傳入的消息列表長度超過設定值(默認為1)放案,則會分多個批處理。
?在介紹消費具體過程之前先回顧客戶端啟動流程的Demo矫俺,接收消息的寫法如下:
public class Consumer {
public static void main (String[] args) throws InterruptedException, MQClientException {
// 實例化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("GroupTest");
// 設置NameServer的地址
consumer.setNamesrvAddr ("localhost:9876");
// 訂閱一個或者多個Topic吱殉,以及Tag來過濾需要消費的消息
consumer.subscribe ("TopicTest", "*");
// 注冊回調(diào)實現(xiàn)類來處理從broker拉取回來的消息
consumer.registerMessageListener (new MessageListenerConcurrently () {
@Override
public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf ("%s Receive New Messages: %s %n", Thread.currentThread ().getName (), msgs);
// 標記該消息已經(jīng)被成功消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費者實例
consumer.start ();
System.out.printf ("Consumer Started.%n");
}
}
其中注冊了一個MessageListenerConcurrently,該類將用于用戶端處理消息厘托。
?回過來看ConsumeRequest友雳,該類實現(xiàn)了Runnable接口,會在run方法完成主要的處理工作铅匹,主要動作為:
- 調(diào)用DefaultMQPushConsumerImpl.executeHookBefore執(zhí)行前置hook動作
- 調(diào)用MessageListenerConcurrently.consumeMessage通知用戶端處理消息押赊,即上面demo內(nèi)容,會返回處理結果ConsumeConcurrentlyStatus
- 調(diào)用DefaultMQPushConsumerImpl.executeHookAfter執(zhí)行后置hook動作
- ConsumeMessageConcurrentlyService.processConsumeResult根據(jù)ConsumeConcurrentlyStatus執(zhí)行收尾動作
2.2.1. MessageListenerConcurrently.consumeMessage
?用戶真正接收消息并執(zhí)行處理動作的地方伊群,需要返回ConsumeConcurrentlyStatus告知框架處理結果。這里在方法里最好不要做耗時長的任務策精,快速處理后返回給框架結果舰始,避免消息堆積在線程池中⊙释啵可以將消息內(nèi)容復制一遍后再放到線程池中進行分發(fā)處理丸卷。
2.2.2. ConsumeMessageConcurrentlyService.processConsumeResult
?該方法主要在用戶消費完數(shù)據(jù)后進行收尾動作,過程如下:
ConsumerRequest在run方法的開始處询刹,實例化了一個ConsumeConcurrentlyContext對象谜嫉,用于后續(xù)傳遞內(nèi)容,該定義為:
public class ConsumeConcurrentlyContext {
private final MessageQueue messageQueue;
//重試的延遲級別,-1:不重試;0:由broker控制;>0由客戶端控制
private int delayLevelWhenNextConsume = 0;
//消息列表最后一個正常消費的消息索引號
private int ackIndex = Integer.MAX_VALUE;
}
其中ackIndex表示最后一個正常消費的消息索引號(0從開始,0~ackIndex為正常消費)凹联,該位置后的消息表示沒法正常消費沐兰。該值由用戶端控制,可以通過ackIndex來控制需要重發(fā)的消息。
?ackIndex默認值為Integer.MAX_VALUE蔽挠,如果為該值則認為所有消息正常消費住闯,不存在錯誤。上面流程中統(tǒng)計成功和失敗也是根據(jù)ackIndex來判斷的澳淑,對于ackIndex后的消息比原,如果是集群消費模式,則會先嘗試發(fā)送回broker杠巡,由broker控制重試時機量窘;如果重試失敗,會收集這些失敗的消息氢拥,延遲5秒后再調(diào)用一次ConsumeMessageService.submitConsumeRequest讓用戶端再次處理蚌铜。最后會將處理成功的消息從ProcessQueue中移除锨侯,更新緩存,然后將q消費的偏移量記錄下來厘线,等待后臺線程同步到broker或者本地识腿。
?綜合上面的介紹,Push模式下的處理流程大致如下:
Push模式通過PullMessageService循環(huán)從監(jiān)聽的MessageQueue中以Pull模式拉取消息造壮,并分發(fā)給用戶注冊的MesageListenerConsurrently對象處理渡讼,完了之后會自動處理消息的重試,offset更新等動作耳璧,從而模擬消息從Broker端主動推動過來成箫。
2. Pull模式
?同Push模式一樣,Pull模式的觸發(fā)也是通過Rebalance旨枯,如下:
同開頭提及的一樣蹬昌,會回調(diào)DefaultMQPullConsumerImpl的MessageQueueListener有Queue發(fā)生改變。
?系統(tǒng)提供了MQPullConsumerScheduleService攀隔,可以定時以Pull模式拉取消息皂贩,并將結果通知MessageQueueListener,內(nèi)部的實現(xiàn)為:
class MessageQueueListenerImpl implements MessageQueueListener {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {//mqAll該topic下的所有q昆汹,mqDivided該實例分配到的q
MessageModel messageModel =
MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel();
switch (messageModel) {
case BROADCASTING:
MQPullConsumerScheduleService.this.putTask(topic, mqAll);//通知該topic下的監(jiān)聽器明刷,最新的所有q
break;
case CLUSTERING:
MQPullConsumerScheduleService.this.putTask(topic, mqDivided);//通知該topic下的監(jiān)聽器,該實例分配的q
break;
default:
break;
}
}
}
putTask會將分配到的新的MessageQueue包裝為一個PullTaskImpl满粗,PullTaskImpl實現(xiàn)了Runnable,會在后臺一直執(zhí)行辈末;而將不屬于自己處理的MessageQueue對應的PullTaskImpl停掉。PullTaskImpl會查找該MessageQueue所監(jiān)聽topic對應的處理類PullTaskCallback映皆,調(diào)用doPullTask挤聘,將具體動作讓用戶處理。
?MQPullConsumerScheduleService的例子為:
public class PullScheduleService {
public static void main(String[] args) throws MQClientException {
final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");
scheduleService.setMessageModel(MessageModel.CLUSTERING);
scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {//注冊topic的監(jiān)聽器
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
MQPullConsumer consumer = context.getPullConsumer();
try {
long offset = consumer.fetchConsumeOffset(mq, false);
if (offset < 0)
offset = 0;
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult);
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
case OFFSET_ILLEGAL:
break;
default:
break;
}
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());//上報消費的offset捅彻,消費完后要主動上報
context.setPullNextDelayTimeMillis(100);//設置下一次觸發(fā)間隔
} catch (Exception e) {
e.printStackTrace();
}
}
});
scheduleService.start();
}
}
?也可以自己手動執(zhí)行pull组去,如下面的例子:
public class PullConsumer {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
}
?相較于Push模式,Pull模式則需要用戶自己控制消息的重試步淹,offset更新等動作添怔。下面附上該部分當時源碼閱讀過程做的筆記簡圖:
更多原創(chuàng)內(nèi)容請搜索微信公眾號:啊駝(doubaotaizi)