六花椭、RocketMQ案例(三)

1、Transaction example

什么是Transaction 消息房午?

它可以被認(rèn)為是兩階段提交消息實(shí)現(xiàn)矿辽,以確保分布式系統(tǒng)中的最終一致性。事務(wù)性消息確惫幔可以原子方式執(zhí)行本地事務(wù)的執(zhí)行和消息的發(fā)送袋倔。

使用限制

(1)Transaction 消息沒(méi)有scheduler和batch支持。
(2)為了避免多次檢查單個(gè)消息并導(dǎo)致半隊(duì)列消息累積折柠,我們默認(rèn)將單個(gè)消息的檢查次數(shù)限制為15次宾娜,但用戶(hù)可以通過(guò)更改“transactionCheckMax”來(lái)更改此限制“代理配置中的參數(shù),如果已經(jīng)通過(guò)”transactionCheckMax“檢查了一條消息扇售,則代理將默認(rèn)丟棄此消息并同時(shí)打印錯(cuò)誤日志前塔。用戶(hù)可以通過(guò)覆蓋“AbstractTransactionCheckListener”類(lèi)來(lái)更改此行為。
(3)在broker的配置中由參數(shù)“transactionTimeout”確定的一段時(shí)間之后將檢查T(mén)ransaction 消息承冰。用戶(hù)也可以通過(guò)在發(fā)送事務(wù)消息時(shí)設(shè)置用戶(hù)屬性“CHECK_IMMUNITY_TIME_IN_SECONDS”來(lái)更改此限制华弓,此參數(shù)優(yōu)先于“transactionMsgTimeout”參數(shù)。
(4)可以多次檢查或消費(fèi)Transaction 消息困乒。
(5)對(duì)用戶(hù)的目標(biāo)主題的已提交消息可能會(huì)失敗寂屏。目前,它取決于日志記錄顶燕。RocketMQ本身的高可用性機(jī)制確保了高可用性凑保。如果要確保Transaction 消息不會(huì)丟失并且保證事務(wù)完整性,建議使用同步雙寫(xiě)涌攻。機(jī)制欧引。
(6)事務(wù)消息的producer ID不能與其他類(lèi)型消息的producer ID共享。與其他類(lèi)型的消息不同恳谎,Transaction 消息允許后向查詢(xún)芝此。MQ Server按其producer ID查詢(xún)客戶(hù)端憋肖。

Application

Transactional 狀態(tài)

事務(wù)性消息有三種狀態(tài):
(1)TransactionStatus.CommitTransaction:提交事務(wù),這意味著允許消費(fèi)者使用此消息婚苹。
(2)TransactionStatus.RollbackTransaction:回滾事務(wù)岸更,表示該消息將被刪除而不允許使用。
(3)TransactionStatus.Unknown:中間狀態(tài)膊升,表示需要MQ檢查以確定狀態(tài)怎炊。

發(fā)送transactional 消息

(1)創(chuàng)建transactional producer
使用TransactionMQProducer類(lèi)創(chuàng)建producer client,并指定唯一的producerGroup廓译,并且可以設(shè)置自定義線程池來(lái)處理檢查請(qǐng)求评肆。執(zhí)行本地事務(wù)后,需要根據(jù)執(zhí)行結(jié)果回復(fù)MQ非区,并返回Transactional 狀態(tài)瓜挽。

package example6;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;

public class TransactionProducer {


    public static void main(String[] args) throws Exception {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });


        producer.setNamesrvAddr("192.168.247.132:9876");
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                        new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();


    }
}

(2)實(shí)現(xiàn)TransactionListener接口
“executeLocalTransaction”方法用于在發(fā)送半消息成功時(shí)執(zhí)行本地事務(wù)。它返回上一節(jié)中提到的三種事務(wù)狀態(tài)之一征绸。
“checkLocalTransaction”方法用于檢查本地事務(wù)狀態(tài)并響應(yīng)MQ檢查請(qǐng)求久橙。它還返回上一節(jié)中提到的三種事務(wù)狀態(tài)之一。


public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap();

  //這個(gè)方法會(huì)在每一條消息發(fā)出去后 執(zhí)行 保證事務(wù)的一致管怠。
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

  //每隔一段時(shí)間  rocketMQ 會(huì)回調(diào) 這個(gè)方法 判斷 每一條消息是否提交淆衷。防止 消息狀態(tài)停滯 或者出現(xiàn)超時(shí)的情況
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

2、OpenMessaging示例

OpenMessaging排惨,其中包括建立行業(yè)指南和消息傳遞吭敢,流媒體規(guī)范,為財(cái)務(wù)暮芭,電子商務(wù),物聯(lián)網(wǎng)和大數(shù)據(jù)領(lǐng)域提供通用框架欲低。設(shè)計(jì)原則是面向云辕宏,簡(jiǎn)單,靈活和獨(dú)立于語(yǔ)言的分布式異構(gòu)環(huán)境砾莱。符合這些規(guī)范將有可能在所有主要平臺(tái)和操作系統(tǒng)上開(kāi)發(fā)異構(gòu)消息傳遞應(yīng)用程序瑞筐。

RocketMQ提供了OpenMessaging 0.1.0-alpha的部分實(shí)現(xiàn),以下示例演示了如何基于OpenMessaging訪問(wèn)RocketMQ腊瑟。

OMSProducer

以下示例說(shuō)明如何在同步聚假,異步或單向傳輸中向RocketMQ代理發(fā)送消息。

public class OMSProducer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final Producer producer = messagingAccessPoint.createProducer();

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        producer.startup();
        System.out.printf("Producer startup OK%n");

        {
            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
            SendResult sendResult = producer.send(message);
            System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
        }

        {
            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            result.addListener(new PromiseListener<SendResult>() {
                @Override
                public void operationCompleted(Promise<SendResult> promise) {
                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
                }

                @Override
                public void operationFailed(Promise<SendResult> promise) {
                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
                }
            });
        }

        {
            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            System.out.printf("Send oneway message OK%n");
        }

        producer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

OMSPullConsumer

使用OMS PullConsumer輪詢(xún)來(lái)自指定隊(duì)列的消息闰非。

public class OMSPullConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        consumer.startup();
        System.out.printf("Consumer startup OK%n");

        Message message = consumer.poll();
        if (message != null) {
            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", msgId);
            consumer.ack(msgId);
        }

        consumer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

OMSPushConsumer

將OMS PushConsumer附加到指定的隊(duì)列并使用MessageListener消耗消息

public class OMSPushConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PushConsumer consumer = messagingAccessPoint.
            createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));

        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
            @Override
            public void onMessage(final Message message, final ReceivedMessageContext context) {
                System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
                context.ack();
            }
        });

    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末膘格,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子财松,更是在濱河造成了極大的恐慌瘪贱,老刑警劉巖纱控,帶你破解...
    沈念sama閱讀 218,204評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異菜秦,居然都是意外死亡甜害,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)球昨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)尔店,“玉大人,你說(shuō)我怎么就攤上這事主慰∧只瘢” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,548評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵河哑,是天一觀的道長(zhǎng)避诽。 經(jīng)常有香客問(wèn)我,道長(zhǎng)璃谨,這世上最難降的妖魔是什么沙庐? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,657評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮佳吞,結(jié)果婚禮上拱雏,老公的妹妹穿的比我還像新娘。我一直安慰自己底扳,他們只是感情好铸抑,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,689評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著衷模,像睡著了一般鹊汛。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上阱冶,一...
    開(kāi)封第一講書(shū)人閱讀 51,554評(píng)論 1 305
  • 那天刁憋,我揣著相機(jī)與錄音,去河邊找鬼木蹬。 笑死至耻,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的镊叁。 我是一名探鬼主播尘颓,決...
    沈念sama閱讀 40,302評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼晦譬!你這毒婦竟也來(lái)了疤苹?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,216評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤蛔添,失蹤者是張志新(化名)和其女友劉穎痰催,沒(méi)想到半個(gè)月后兜辞,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,661評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡夸溶,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,851評(píng)論 3 336
  • 正文 我和宋清朗相戀三年逸吵,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片缝裁。...
    茶點(diǎn)故事閱讀 39,977評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡扫皱,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出捷绑,到底是詐尸還是另有隱情韩脑,我是刑警寧澤,帶...
    沈念sama閱讀 35,697評(píng)論 5 347
  • 正文 年R本政府宣布粹污,位于F島的核電站段多,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏壮吩。R本人自食惡果不足惜进苍,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,306評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望鸭叙。 院中可真熱鬧觉啊,春花似錦、人聲如沸沈贝。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,898評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)宋下。三九已至嗡善,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間杨凑,已是汗流浹背滤奈。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,019評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留撩满,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,138評(píng)論 3 370
  • 正文 我出身青樓绅你,卻偏偏與公主長(zhǎng)得像伺帘,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子忌锯,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,927評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容