一番舆、概述
RocketMQ的producer默認(rèn)有兩個,一個是DefaultMQProducer矾踱,另一個是TransactionMQProducer恨狈,本文只對send方法做一個總結(jié),其他的細節(jié)在其他章節(jié)介紹
二呛讲、DefaultMQProducer
一共定義了17種send方法禾怠,從4.x版本,事務(wù)消息被放到了TransactionMQProducer中贝搁,所以有15個send方法吗氏,這15個方法中,又有兩個異步帶超時時間的send方法被廢棄了雷逆,所以有效的send方法有13個:
1弦讽、同步發(fā)送
/**
* 同步發(fā)送模式. 只有消息被成功接收并且被固化完成后才會收到反饋。
* 內(nèi)置有重發(fā)機制, producer將會重試
* {@link #retryTimesWhenSendFailed,default=2} 次 膀哲,然后才會報錯.
* 因此往产,有一定的概率向broker發(fā)送重復(fù)的消息
* 使用者有責(zé)任去解決潛在的重復(fù)數(shù)據(jù)造成的影響
* @param msg 待發(fā)送數(shù)據(jù)
* @return {@link SendResult} 實體,來通知發(fā)送者發(fā)送狀態(tài)等信息, 比如消息的ID
* {@link SendStatus} 指明 broker 存儲/復(fù)制 的狀態(tài), 發(fā)送到了哪個隊列等等
* @throws MQClientException 客戶端異常
* @throws RemotingException 網(wǎng)絡(luò)連接異常
* @throws MQBrokerException broker異常
* @throws InterruptedException 發(fā)送線程中斷異常
*/
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg);
}
2某宪、同步發(fā)送仿村,帶超時時間
/**
* 與 {@link #send(Message)} 相同,只不過多了超時時間的指定.
*
* @param msg 待發(fā)送消息
* @param timeout 發(fā)送超時時間
* @return {@link SendResult} 同上
* {@link SendStatus} 同上
* @throws MQClientException 客戶端異常
* @throws RemotingException 網(wǎng)絡(luò)連接異常
* @throws MQBrokerException broker異常
* @throws InterruptedException 發(fā)送線程中斷異常
*/
@Override
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, timeout);
}
3兴喂、異步發(fā)送
/**
* 異步發(fā)送消息
* 消息發(fā)送后蔼囊,立即返回包颁。broker處理返程后, 觸發(fā)sendCallback回調(diào)方法
* 與上面一樣,在給出發(fā)送失敗標(biāo)志前压真,會嘗試2次娩嚼,所以開發(fā)者要處理重復(fù)發(fā)送帶來的問題
* @param msg 待發(fā)送消息
* @param sendCallback 回調(diào)函數(shù)
* @throws MQClientException 客戶端異常
* @throws RemotingException 網(wǎng)絡(luò)異常
* @throws InterruptedException 發(fā)送線程中斷異常
*/
@Override
public void send(Message msg,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, sendCallback);
}
4、異步發(fā)送滴肿,帶超時時間
@Override
public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
}
5岳悟、單向發(fā)送,不等待broker回饋
/**
* 發(fā)送方法不會等待broker的反饋泼差,只會一直發(fā)
* 所以有很高的吞吐量贵少,但是有一定概率丟失消息
*
* @param msg 待發(fā)送消息
* @throws MQClientException 客戶端異常
* @throws RemotingException 網(wǎng)絡(luò)異常
* @throws InterruptedException 發(fā)送線程中斷異常
*/
@Override
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.sendOneway(msg);
}
6、同步發(fā)送堆缘,指定隊列
/**
* 同步發(fā)送滔灶,指定隊列
* @param msg 待發(fā)送消息
* @param mq 指定的消息隊列
* @return {@link SendResult} 同上
* {@link SendStatus} 同上
*/
@Override
public SendResult send(Message msg, MessageQueue mq)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, mq);
}
7、同步發(fā)送吼肥,指定隊列录平,并附帶超時時間
@Override
public SendResult send(Message msg, MessageQueue mq, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, mq, timeout);
}
8、異步發(fā)送缀皱,指定隊列
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, mq, sendCallback);
}
9斗这、異步發(fā)送,指定隊列啤斗,附帶超時時間
這個在4.4.0版本被設(shè)置為廢棄表箭,后續(xù)版本會給出
/**
* 因為在處超時異常存在問題,所以廢棄
*/
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
}
10钮莲、單向發(fā)送免钻,指定隊列
@Override
public void sendOneway(Message msg,
MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.sendOneway(msg, mq);
}
11、同步發(fā)送崔拥,指定隊列選擇策略
官方的有序消息的DEMO就是基于隊列選擇器做的极舔,讓一些列有序的消息(相同ID)發(fā)送到同一個隊列
/**
* 指定隊列選擇策略MessageQueueSelector
*
* @param msg 待發(fā)送消息
* @param selector 隊列選擇器
* @param arg 配合隊列選擇器選擇隊列的參數(shù),一般可以是業(yè)務(wù)參數(shù)(ID等)
* @return {@link SendResult} 同上
* {@link SendStatus} 同上
*/
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, selector, arg);
}
12握童、同步發(fā)送消息姆怪,指定隊列選擇策略,并附帶超時時間
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
}
13澡绩、異步發(fā)送消息,指定隊列選擇策略
@Override
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
}
14俺附、異步發(fā)送消息肥卡,指定隊列選擇策略,并附帶超時時間
這個方法在4.4.0版本廢棄事镣,后續(xù)提供
@Override
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
}
15步鉴、單向發(fā)送,指定隊列選擇策略
@Override
public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
}
三、TransactionMQProducer
1氛琢、發(fā)送事務(wù)消息
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}