概述
生產(chǎn)者 producer 在發(fā)送消息的時(shí)候旗笔,每個(gè)消息發(fā)送到 broker 只存儲(chǔ)在某一個(gè) quene 上。那么 producer 是怎么選擇 queue 呢傲武?
下面主要通過(guò)以下5種方式進(jìn)行分析逸寓。
1、自定義 MessageQueueSelector 實(shí)現(xiàn)
2瘦黑、SelectMessageQueueByHash hash 選擇 queue。
3、 SelectMessageQueueByRandom 隨機(jī)選擇 queue幸斥。
4匹摇、 SelectMessageQueueByMachineRoom 機(jī)房選擇queue。
5甲葬、默認(rèn)發(fā)送隊(duì)列選擇實(shí)現(xiàn)
1廊勃、自定義 MessageQueueSelector 實(shí)現(xiàn)
下面這個(gè)示例是 rocketmq 官網(wǎng)上的一個(gè)示例。
public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
}
從示例中可以看到 producer.send(msg, new MessageQueueSelector(){}, orderId)
在發(fā)送的時(shí)候 自定義了一個(gè) MessageQueueSelector演顾。
MessageQueueSelector 的 selelct(List<MessageQueue> mqs, Message msg, Object arg) 方法中有三個(gè)參數(shù)供搀。
- List<MessageQueue> mqs :topic 中的所有 queue 的集合。
- Message msg:發(fā)送的消息
- Object arg:上面示例中 send 方法的第三個(gè)參數(shù)钠至。
通過(guò)實(shí)現(xiàn) select 方法葛虐,通過(guò) arg 參數(shù)進(jìn)行取模 mqs.size() 進(jìn)行選擇隊(duì)列。
RocketMQ 已實(shí)現(xiàn)的 MessageQueueSelector
rocketmq 源碼中已經(jīng)提供了幾種 MessageQueueSelector 的實(shí)現(xiàn)棉钧。如下圖:
- SelectMessageQueueByHash:通過(guò) hash 進(jìn)行選擇 queue屿脐。
- SelectMessageQueueByRandom:隨機(jī)選擇 queue。
- SelectMessageQueueByMachineRoom:機(jī)房選擇queue宪卿。
2的诵、SelectMessageQueueByHash
public class SelectMessageQueueByHash implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value = value % mqs.size();
return mqs.get(value);
}
}
通過(guò) arg 的 hash,通過(guò) mqs.size() 進(jìn)行取模佑钾,來(lái)選擇要存儲(chǔ)的隊(duì)列西疤。
3、SelectMessageQueueByRandom
public class SelectMessageQueueByRandom implements MessageQueueSelector {
private Random random = new Random(System.currentTimeMillis());
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = random.nextInt(mqs.size());
return mqs.get(value);
}
}
隨機(jī)產(chǎn)生一個(gè)小于等于 mqs.size() 的隨機(jī)正整數(shù)休溶,來(lái)選擇要存儲(chǔ)的隊(duì)列代赁。
4、SelectMessageQueueByMachineRoom
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
private Set<String> consumeridcs;
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
public Set<String> getConsumeridcs() {
return consumeridcs;
}
public void setConsumeridcs(Set<String> consumeridcs) {
this.consumeridcs = consumeridcs;
}
}
這個(gè)未實(shí)現(xiàn)兽掰,還是要通過(guò)自己的場(chǎng)景進(jìn)行實(shí)現(xiàn)芭碍。
5、默認(rèn)是輪詢(xún)進(jìn)行發(fā)送消息
如果直接調(diào)用 SendResult send(final Message msg)
方法孽尽,RocketMQ 是如何選擇隊(duì)列的呢窖壕?
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
1、int index = tpInfo.getSendWhichQueue().getAndIncrement();
獲取 一個(gè)自增的index杉女。
2瞻讽、然后 int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
進(jìn)行選擇一個(gè) queue。
通過(guò)上面的代碼可以看出熏挎,默認(rèn)是通過(guò)輪詢(xún)的方式進(jìn)行選擇發(fā)送隊(duì)列的速勇。
ThreadLocalIndex 實(shí)現(xiàn)
public class ThreadLocalIndex {
private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
private final Random random = new Random();
public int getAndIncrement() {
Integer index = this.threadLocalIndex.get();
if (null == index) {
index = Math.abs(random.nextInt());
if (index < 0)
index = 0;
this.threadLocalIndex.set(index);
}
index = Math.abs(index + 1);
if (index < 0)
index = 0;
this.threadLocalIndex.set(index);
return index;
}
@Override
public String toString() {
return "ThreadLocalIndex{" +
"threadLocalIndex=" + threadLocalIndex.get() +
'}';
}
}
從 getAndIncrement() 方法中,可以看出婆瓜。
為每個(gè)線程分配一個(gè)隨機(jī)數(shù)快集,然后每次調(diào)用都自增 1。