1、Transaction example
什么是Transaction 消息房午?
它可以被認(rèn)為是兩階段提交消息實(shí)現(xiàn)矿辽,以確保分布式系統(tǒng)中的最終一致性。事務(wù)性消息確惫幔可以原子方式執(zhí)行本地事務(wù)的執(zhí)行和消息的發(fā)送袋倔。
使用限制
(1)Transaction 消息沒(méi)有scheduler和batch支持。
(2)為了避免多次檢查單個(gè)消息并導(dǎo)致半隊(duì)列消息累積折柠,我們默認(rèn)將單個(gè)消息的檢查次數(shù)限制為15次宾娜,但用戶(hù)可以通過(guò)更改“transactionCheckMax”來(lái)更改此限制“代理配置中的參數(shù),如果已經(jīng)通過(guò)”transactionCheckMax“檢查了一條消息扇售,則代理將默認(rèn)丟棄此消息并同時(shí)打印錯(cuò)誤日志前塔。用戶(hù)可以通過(guò)覆蓋“AbstractTransactionCheckListener”類(lèi)來(lái)更改此行為。
(3)在broker的配置中由參數(shù)“transactionTimeout”確定的一段時(shí)間之后將檢查T(mén)ransaction 消息承冰。用戶(hù)也可以通過(guò)在發(fā)送事務(wù)消息時(shí)設(shè)置用戶(hù)屬性“CHECK_IMMUNITY_TIME_IN_SECONDS”來(lái)更改此限制华弓,此參數(shù)優(yōu)先于“transactionMsgTimeout”參數(shù)。
(4)可以多次檢查或消費(fèi)Transaction 消息困乒。
(5)對(duì)用戶(hù)的目標(biāo)主題的已提交消息可能會(huì)失敗寂屏。目前,它取決于日志記錄顶燕。RocketMQ本身的高可用性機(jī)制確保了高可用性凑保。如果要確保Transaction 消息不會(huì)丟失并且保證事務(wù)完整性,建議使用同步雙寫(xiě)涌攻。機(jī)制欧引。
(6)事務(wù)消息的producer ID不能與其他類(lèi)型消息的producer ID共享。與其他類(lèi)型的消息不同恳谎,Transaction 消息允許后向查詢(xún)芝此。MQ Server按其producer ID查詢(xún)客戶(hù)端憋肖。
Application
Transactional 狀態(tài)
事務(wù)性消息有三種狀態(tài):
(1)TransactionStatus.CommitTransaction:提交事務(wù),這意味著允許消費(fèi)者使用此消息婚苹。
(2)TransactionStatus.RollbackTransaction:回滾事務(wù)岸更,表示該消息將被刪除而不允許使用。
(3)TransactionStatus.Unknown:中間狀態(tài)膊升,表示需要MQ檢查以確定狀態(tài)怎炊。
發(fā)送transactional 消息
(1)創(chuàng)建transactional producer
使用TransactionMQProducer類(lèi)創(chuàng)建producer client,并指定唯一的producerGroup廓译,并且可以設(shè)置自定義線程池來(lái)處理檢查請(qǐng)求评肆。執(zhí)行本地事務(wù)后,需要根據(jù)執(zhí)行結(jié)果回復(fù)MQ非区,并返回Transactional 狀態(tài)瓜挽。
package example6;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;
public class TransactionProducer {
public static void main(String[] args) throws Exception {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setNamesrvAddr("192.168.247.132:9876");
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
(2)實(shí)現(xiàn)TransactionListener接口
“executeLocalTransaction”方法用于在發(fā)送半消息成功時(shí)執(zhí)行本地事務(wù)。它返回上一節(jié)中提到的三種事務(wù)狀態(tài)之一征绸。
“checkLocalTransaction”方法用于檢查本地事務(wù)狀態(tài)并響應(yīng)MQ檢查請(qǐng)求久橙。它還返回上一節(jié)中提到的三種事務(wù)狀態(tài)之一。
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap();
//這個(gè)方法會(huì)在每一條消息發(fā)出去后 執(zhí)行 保證事務(wù)的一致管怠。
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
//每隔一段時(shí)間 rocketMQ 會(huì)回調(diào) 這個(gè)方法 判斷 每一條消息是否提交淆衷。防止 消息狀態(tài)停滯 或者出現(xiàn)超時(shí)的情況
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
2、OpenMessaging示例
OpenMessaging排惨,其中包括建立行業(yè)指南和消息傳遞吭敢,流媒體規(guī)范,為財(cái)務(wù)暮芭,電子商務(wù),物聯(lián)網(wǎng)和大數(shù)據(jù)領(lǐng)域提供通用框架欲低。設(shè)計(jì)原則是面向云辕宏,簡(jiǎn)單,靈活和獨(dú)立于語(yǔ)言的分布式異構(gòu)環(huán)境砾莱。符合這些規(guī)范將有可能在所有主要平臺(tái)和操作系統(tǒng)上開(kāi)發(fā)異構(gòu)消息傳遞應(yīng)用程序瑞筐。
RocketMQ提供了OpenMessaging 0.1.0-alpha的部分實(shí)現(xiàn),以下示例演示了如何基于OpenMessaging訪問(wèn)RocketMQ腊瑟。
OMSProducer
以下示例說(shuō)明如何在同步聚假,異步或單向傳輸中向RocketMQ代理發(fā)送消息。
public class OMSProducer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final Producer producer = messagingAccessPoint.createProducer();
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
producer.startup();
System.out.printf("Producer startup OK%n");
{
Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
SendResult sendResult = producer.send(message);
System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
}
{
final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
result.addListener(new PromiseListener<SendResult>() {
@Override
public void operationCompleted(Promise<SendResult> promise) {
System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
}
@Override
public void operationFailed(Promise<SendResult> promise) {
System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
}
});
}
{
producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
System.out.printf("Send oneway message OK%n");
}
producer.shutdown();
messagingAccessPoint.shutdown();
}
}
OMSPullConsumer
使用OMS PullConsumer輪詢(xún)來(lái)自指定隊(duì)列的消息闰非。
public class OMSPullConsumer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
consumer.startup();
System.out.printf("Consumer startup OK%n");
Message message = consumer.poll();
if (message != null) {
String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
System.out.printf("Received one message: %s%n", msgId);
consumer.ack(msgId);
}
consumer.shutdown();
messagingAccessPoint.shutdown();
}
}
OMSPushConsumer
將OMS PushConsumer附加到指定的隊(duì)列并使用MessageListener消耗消息
public class OMSPushConsumer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final PushConsumer consumer = messagingAccessPoint.
createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
consumer.shutdown();
messagingAccessPoint.shutdown();
}
}));
consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
@Override
public void onMessage(final Message message, final ReceivedMessageContext context) {
System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
context.ack();
}
});
}
}