MQProducer
從類關(guān)系中可以看出检眯,MQProducer 有兩種實現(xiàn)方式溯警。一個是 DefaultMQProducer帘皿,另一個是 TransactionMQProducer揭北。
- DefaultMQProducer: 我們常用的生產(chǎn)者寨辩。
- TransactionMQProducer:繼承自 DefaultMQProducer圃庭,并支持事務(wù)消息锄奢。
下面我們來分析下 DefaultMQProducer 啟動的過程。
啟動示例
public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("....");
......
producer.start();
......
}catch(Exception e){}
}
}
創(chuàng)建 DefaultMQProducer 實例剧腻,然后制定一些參數(shù)拘央,調(diào)用 start() 方法就開啟了生產(chǎn)者。
DefaultMQProducer 參數(shù)分析
public class DefaultMQProducer extends ClientConfig implements MQProducer {
//producer 組名
private String producerGroup;
// Topic 名字书在,默認為“TBW102”
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
// 創(chuàng)建 Topic 默認的4個隊列
private volatile int defaultTopicQueueNums = 4;
// 發(fā)送消息超時時間
private int sendMsgTimeout = 3000;
// 當發(fā)送的消息大于 4K 時灰伟,開始壓縮消息。
private int compressMsgBodyOverHowmuch = 1024 * 4;
//同步發(fā)送消息,發(fā)送失敗時再嘗試發(fā)送2次數(shù)
private int retryTimesWhenSendFailed = 2;
// 異步發(fā)送消息栏账,發(fā)送失敗時再嘗試發(fā)送2次數(shù)
private int retryTimesWhenSendAsyncFailed = 2;
//發(fā)送broker消息存儲失敗時帖族,是否嘗試去試發(fā)送其他的broker
private boolean retryAnotherBrokerWhenNotStoreOK = false;
//最大允許發(fā)送字節(jié)數(shù)
private int maxMessageSize = 1024 * 1024 * 4; // 4M
DefaultMQProducer 中定義的類屬性
- producerGroup: 生產(chǎn)者組名
- createTopicKey :Topic 名字,默認為“TBW102”
- defaultTopicQueueNums :創(chuàng)建 Topic 默認的4個隊列
- sendMsgTimeout :默認發(fā)送消息3秒超時
- compressMsgBodyOverHowmuch :當發(fā)送的消息大于 4K 時挡爵,開始壓縮消息竖般。
- retryTimesWhenSendFailed :同步發(fā)送消息,發(fā)送失敗時再嘗試發(fā)送2次數(shù)茶鹃。
- retryTimesWhenSendAsyncFailed :異步發(fā)送消息涣雕,發(fā)送失敗時再嘗試發(fā)送2次數(shù)
- retryAnotherBrokerWhenNotStoreOK :發(fā)送broker消息存儲失敗時,是否嘗試去試發(fā)送其他的broker
DefaultMQProducer 還有可以設(shè)置其他的參數(shù)闭翩,這里就不說明了挣郭。
Producer 啟動
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}
public void start() throws MQClientException {
this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
// 1. 只有 serviceState 狀態(tài)為 CREATE_JUST 時,才啟動 Producer
case CREATE_JUST:
//2. 防止啟動多個 Producer疗韵,先把 serviceState 狀態(tài)修改為 START_FAILED兑障。
this.serviceState = ServiceState.START_FAILED;
// 3. 檢查 groupName 是否合法
this.checkConfig();
//4. 判斷是否需要設(shè)置 InstanceName 。
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 5. 構(gòu)建 MQClientInstance 對象伶棒。
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 6. 將 DefaultMQProducerImpl 對象注冊到 ConcurrentHashMap<String/* group */, MQProducerInner> producerTable 中
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 7.以主題名"TBW102"為key值旺垒,新初始化的TopicPublishInfo對象為value值存入DefaultMQProducerImpl.topicPublishInfoTable變量中
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// 8. 啟動 第五步創(chuàng)建的 MQClientInstance 實例。
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
// 9. 設(shè)置DefaultMQProducerImpl的ServiceState為RUNNING
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// 10. 向所有的 broker 發(fā)送心跳和上傳 FilterClass
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
- 啟動Producer的時候判斷 serviceState 的當前狀態(tài)肤无,只有 serviceState 狀態(tài)為 CREATE_JUST 時先蒋,才啟動 Producer。否則拋出異常信息宛渐。
2竞漾、同時防止啟動多個 Producer,先把 serviceState 狀態(tài)修改為 START_FAILED窥翩。
3业岁、 檢查 groupName 是否合法。比如不能為空寇蚊,是否符合正則 ^[%|a-zA-Z0-9_-]+$
笔时,并且最大長度不能超過 255(CHARACTER_MAX_LENGTH = 255
);
groupName 也不能等于 DEFAULT_PRODUCER
仗岸。只要滿足上面條件允耿,則拋異常信息。
4扒怖、如果 producerGroup 不等于 CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER"
,然后調(diào)用 changeInstanceNameToPID() 方法判斷名字不是 "DEFAULT" 則更改 instanceName较锡。
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());
}
}
public static int getPid() {
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
String name = runtime.getName(); // format: "pid@hostname"
try {
return Integer.parseInt(name.substring(0, name.indexOf('@')));
.....
}
5、構(gòu)建 MQClientInstance 對象盗痒。
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
- 5.1 首先生成 clientId:ip@instanceName 或 ip@instanceName@unitName
- 5.2 如果 factoryTable 中是不已經(jīng)存在 MQClientInstance 實例蚂蕴,則創(chuàng)建。 (下面有單獨分析該源碼)
6、將 DefaultMQProducerImpl 對象注冊到 ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
中
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
if (null == group || null == producer) {
return false;
}
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
if (prev != null) {
log.warn("the producer group[{}] exist already.", group);
return false;
}
return true;
}
7骡楼、以主題名"TBW102"為key值熔号,新初始化的TopicPublishInfo對象為value值存入DefaultMQProducerImpl.topicPublishInfoTable變量中
8、調(diào)用 第五步創(chuàng)建的 MQClientInstance 實例 的start() 方法鸟整。
該方法做了很多事情:
- 獲取NameServer地址
- 啟動 Netty 客戶端服務(wù)
- 設(shè)置多個定時任務(wù)
- 開啟 pullMessageService 服務(wù)
- 開啟 rebalanceService 服務(wù)
- 開啟 發(fā)送消息服務(wù)
下面有具體代碼分析MQClientInstance.start() 方法跨嘉。
9、設(shè)置DefaultMQProducerImpl的ServiceState為RUNNING
10吃嘿、向所有的 broker 發(fā)送心跳和上傳 FilterClass
創(chuàng)建MQClientInstance實例(第5.2步)
上面 5.2 步驟中創(chuàng)建MQClientInstance 的代碼如下:
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
// Netty 中注冊接收請求的處理器。
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
//設(shè)置 NameServer 地址梦重。
if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
// 客戶端ID
this.clientId = clientId;
//創(chuàng)建 MQAdminImpl 對象進行和 NameServer 進行交互兑燥,比如創(chuàng)建Topic、獲取 Queue等
this.mQAdminImpl = new MQAdminImpl(this);
// 創(chuàng)建 pullMessageService 服務(wù)
this.pullMessageService = new PullMessageService(this);
// 創(chuàng)建 rebalanceService 服務(wù)
this.rebalanceService = new RebalanceService(this);
// 創(chuàng)建 DefaultMQProducer 服務(wù)
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
// 開啟 Comsumer 統(tǒng)計服務(wù)
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
this.instanceIndex,
this.clientId,
this.clientConfig,
MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
主要功能:
- 創(chuàng)建 MQAdminImpl 對象進行和 NameServer 進行交互琴拧,比如創(chuàng)建Topic降瞳、獲取 Queue等
- 創(chuàng)建 pullMessageService 服務(wù)
- 創(chuàng)建 rebalanceService 服務(wù),供 Consumer 端使用
- 創(chuàng)建 DefaultMQProducer 服務(wù)蚓胸,
- 開啟 Comsumer 統(tǒng)計服務(wù)挣饥。統(tǒng)計最近一段時間內(nèi),消費成功個數(shù)沛膳、消費失敗個數(shù)等信息扔枫。
啟動MQClientInstance 服務(wù) (第8步)
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 1. 如果配置NameServer地址,則從默認服務(wù)器地址中獲取(該地址不可改變)
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// 2. 啟動 Netty 客戶端服務(wù)
this.mQClientAPIImpl.start();
// 3. 設(shè)置多個定時任務(wù)
this.startScheduledTask();
// 4. 開啟 pullMessageService 服務(wù)
this.pullMessageService.start();
// 5. 開啟 rebalanceService 服務(wù)
this.rebalanceService.start();
// 6. 開啟 發(fā)送消息服務(wù)
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
主要這幾步驟操作锹安。
- 1短荐、獲取NameServer地址
如果啟動 Producer 時沒有指定 NameServer,則程序會向一個Http地址發(fā)送請求來獲取NameServer地址叹哭。通過這種方式可以動態(tài)的配置 NameServer忍宋。從而達到動態(tài)增加和刪除NameServer服務(wù)。
public static String getWSAddr() {
String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;
if (wsDomainName.indexOf(":") > 0) {
wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;
}
return wsAddr;
}
- 2风罩、啟動 Netty 客戶端服務(wù)
- 3糠排、調(diào)用startScheduledTask() 方法設(shè)置多個定時任務(wù)
- 4、開啟 pullMessageService 服務(wù)
- 5超升、開啟 rebalanceService 服務(wù)
- 6入宦、開啟 發(fā)送消息服務(wù)
startScheduledTask() 方法:定時任務(wù)
private void startScheduledTask() {
// 1.如果 NameServer 地址默認沒配置,則定時向一個Http地址獲取
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// 2. 定時的從 NameServer 中獲取 Topic廓俭、broker云石、queue 相關(guān)信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
// 3. 定時清理無效的Broker,并向所有的Broker 發(fā)送心跳數(shù)據(jù)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
// 4. 定時的持久化 Consumer 端消費每個 queue的 offset 數(shù)據(jù)研乒。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 5. 調(diào)整消費端的線程數(shù)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
1汹忠、定時更新 NameServer 地址
每個2分鐘,程序會向一個Http地址發(fā)送請求來獲取NameServer地址來動態(tài)更新NameServer地址。2宽菜、 定時的從 NameServer 中獲取 Topic谣膳、broker、queue 相關(guān)信息
默認每隔 30秒去 NameServer 中獲取Topic铅乡、broker继谚、queue等相關(guān)信息。
如果有新broker注冊或下線阵幸,producer端會在30秒之內(nèi)感知花履。3、定時清理無效的Broker挚赊,并向所有的Broker 發(fā)送心跳數(shù)據(jù).
默認每隔 30 秒向 Broker 發(fā)送心跳數(shù)據(jù) 和 用戶自定義的 filterclass 類诡壁。4、定時的持久化 Consumer 端消費每個 queue的 offset 數(shù)據(jù)荠割。
默認每隔 5 秒持久或 Consumer 消費的 queue 的 offset信息妹卿。
持久化分為,遠程持久化和本地持久化蔑鹦。
MessageModel.CLUSTERING 模式 queue的offset 保存到 broker上夺克。
BROADCASTING("BROADCASTING") 模式 queue 的 offset 保存在本地。-
5嚎朽、調(diào)整消費端的線程數(shù)
每隔 1 分鐘計算每一個queue中消息擠壓的數(shù)量铺纽,如果超過100000條,則增加消費線程的并發(fā)數(shù)火鼻,如果小于80000條則減少消費者的線程數(shù)室囊。
不過進入源碼中看,調(diào)整消費者的線程數(shù)都注釋掉了魁索。