1.消費(fèi)方式和消費(fèi)者組
1.消費(fèi)方式: 拉取和推送兩種(事實(shí)上所有從遠(yuǎn)程獲取數(shù)據(jù)都是這兩種方式).
2.消費(fèi)者組與消費(fèi)模式
多個(gè)消費(fèi)者組成一個(gè)消費(fèi)組, 兩種模式: 集群(消息被其中任何一個(gè)消息者消費(fèi)), 廣播模式(全部消費(fèi)者消費(fèi)).
2.Consumer消費(fèi)消息的基本流程
RocketMQ 分別使用 DefaultMQPullConsumer 和 DefaultMQPushConsumer 實(shí)現(xiàn)了拉取和推送兩種方式. 下面主要以DefaultMQPullConsumer為例進(jìn)行分析.
先看源碼中給出的Demo:
public class PullConsumerTest {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); //@1
consumer.start(); //@2
try {
MessageQueue mq = new MessageQueue();
mq.setQueueId(0);
mq.setTopic("TopicTest3");
mq.setBrokerName("vivedeMacBook-Pro.local");
long offset = 26;
long beginTime = System.currentTimeMillis();
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, offset, 32); //@3
System.out.printf("%s%n", System.currentTimeMillis() - beginTime);
System.out.printf("%s%n", pullResult);
} catch (Exception e) {
e.printStackTrace();
}
consumer.shutdown();
}
}
首先在@1處構(gòu)建Consumer并且制定其所屬的消費(fèi)者組. 在@2處啟動(dòng)Consumer, 并且在@3處拉取消息.
Consumer啟動(dòng)
事實(shí)上DefaultMQPullConsumer將所有操作都委托給DefaultMQPullConsumerImpl, 下面看DefaultMQPullConsumerImpl#start.
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig(); //@1
this.copySubscription(); //@2
if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPullConsumer.changeInstanceNameToPID(); //@3
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook); //@4
//@5
this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(//
mQClientFactory, //
this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
//@6
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
//@7
if (this.defaultMQPullConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
} else {
switch (this.defaultMQPullConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
break;
default:
break;
}
}
this.offsetStore.load();
//@8
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//@9
mQClientFactory.start();
log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
//@10
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PullConsumer service state not OK, maybe started once, "http://
+ this.serviceState//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
}
首先判斷當(dāng)前consumer的狀態(tài), 除了CREATE_JUST之外, 全部是非法狀態(tài).(這個(gè)容易理解, 因?yàn)闀r(shí)剛剛啟動(dòng), 不應(yīng)該處于其他狀態(tài)).
狀態(tài)合法后, 大體過(guò)程如下:
- @1校驗(yàn)各配置項(xiàng)是否合法(consumerGroup, allocateMessageQueueStrategy, messageModel)
- @2將當(dāng)前defaultMQPullConsumer中的訂閱關(guān)系復(fù)制到當(dāng)前rebalanceImpl(負(fù)載均衡器, 主要負(fù)責(zé)決定, 當(dāng)前的consumer應(yīng)該從哪些Queue中消費(fèi)消息)中.
- @3如果是集群模式,則將當(dāng)前defaultMQPullConsumer實(shí)例名改為線程ID.
- @4實(shí)例化MQClientInstance(這個(gè)類是一個(gè)大雜燴,負(fù)責(zé)管理client(consumer, producer), 并提供多中功能接口供各個(gè)Service(Rebalance, PullMessage等)調(diào)用)
- @5初始化rebalance變量
- @6初始化pullAPIWrapper(長(zhǎng)連接, 負(fù)責(zé)從broker處拉取消息, 然后利用ConsumeMessageService回調(diào)用戶的Listener執(zhí)行消息消費(fèi)邏輯)
- @7構(gòu)建offsetStore消費(fèi)進(jìn)度存儲(chǔ)對(duì)象(有兩種實(shí)現(xiàn), Local和Rmote, Local存儲(chǔ)在本地磁盤上, 適用于BROADCASTING廣播消費(fèi)模式; 而Remote則將消費(fèi)進(jìn)度存儲(chǔ)在Broker上, 適用于CLUSTERING集群消費(fèi)模式).
- @8向mqClientFactory注冊(cè)本消費(fèi)者
- @9啟動(dòng)mqClientFactory(啟動(dòng)各種定時(shí)任務(wù), 如定時(shí)獲取nameserver地址, 定時(shí)清理下線的borker, 啟動(dòng)各種service, 如拉消息服務(wù), 負(fù)載均衡服務(wù))
- @10將serviceState修改為ServiceState.RUNNING
Consumer獲取消息
Consumer獲取消息使用pullBlockIfNotFound方法, 方法簽名如下:
PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException
該方法一共有4個(gè)參數(shù).
- mq, 從哪個(gè)隊(duì)列中拉取消息;
- subExpression, SubscriptionData中的subString;
- offset, 消息拉取的offset;
- maxNums, 最大拉取的消息數(shù)目.
跟蹤該方法,最終會(huì)委托給DefaultMQPullConsumerImpl中的pullSyncImpl方法執(zhí)行, 代碼如下:
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK(); //@1
//@2
if (null == mq) {
throw new MQClientException("mq is null", null);
}
if (offset < 0) {
throw new MQClientException("offset < 0", null);
}
if (maxNums <= 0) {
throw new MQClientException("maxNums <= 0", null);
}
this.subscriptionAutomatically(mq.getTopic()); //@3
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); //@4
//@5
SubscriptionData subscriptionData;
try {
subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; //@6
PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//
mq,
subscriptionData.getSubString(),
0L,
offset,
maxNums,
sysFlag,
0,
this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
timeoutMillis,
CommunicationMode.SYNC,
null
); //@7
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); //@8
//@9
if (!this.consumeMessageHookList.isEmpty()) {
ConsumeMessageContext consumeMessageContext = null;
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setConsumerGroup(this.groupName());
consumeMessageContext.setMq(mq);
consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
consumeMessageContext.setSuccess(false);
this.executeHookBefore(consumeMessageContext);
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
return pullResult;
}
過(guò)程概括如下:
- @1檢驗(yàn)當(dāng)前consumer客戶端是否處于RUNNING狀態(tài), 否則非法;
- @2檢查mq, offset, maxNums三個(gè)參數(shù)是否合法;
- @3構(gòu)建rebalanceImpl中的SubscriptionData;
- @4構(gòu)建sysFlag;
- @5構(gòu)建當(dāng)前consumer的SubscriptionData(這一步和@3有點(diǎn)重復(fù));
- @6從broker拉取消息時(shí)的超時(shí)時(shí)間;
- @7從broker拉取消息;
- @8對(duì)pullresult進(jìn)行處理, 這一步主要進(jìn)行兩個(gè)操作, a.更新消息隊(duì)列拉取消息Broker編號(hào)的映射, b.解析消息赦肋,并根據(jù)訂閱信息消息tagCode匹配合適消息.
- @9如果HookList不為空, 執(zhí)行HookList中的操作.