PushConsumer和PullConsumer等

Push

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class PushConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.setNamesrvAddr("hadoop102:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //wrong time format 2017_0422_221800
        consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

如下圖囚枪,讀到了很多歷史數(shù)據(jù):

圖片.png

即時發(fā)2條消息:

圖片.png

即時收到2條消息:

圖片.png

Pull

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class PullConsumer {
    //用于存取偏移量
    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
//        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setNamesrvAddr("hadoop102:9876");
        consumer.start();
        //某個主題下的消息隊列
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    //拉取消息體
                    //pullBlockIfNotFound 中比較復(fù)雜的是第三個參數(shù)
                    //offset:偏移量派诬,是消息訂閱者即消費者,上次消費到的消息位點
                    //maxNums:最大獲取消息數(shù)據(jù)量
                    PullResult pullResult =
                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }
}

圖片.png

DefaultLitePullConsumer

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class LitePullConsumerSubscribe {
    public static volatile boolean running = true;

    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
        litePullConsumer.setNamesrvAddr("hadoop102:9876");
        litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        litePullConsumer.subscribe("TopicTest", "*");
        litePullConsumer.start();
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s%n", messageExts);
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
}
圖片.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末链沼,一起剝皮案震驚了整個濱河市默赂,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌括勺,老刑警劉巖缆八,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異疾捍,居然都是意外死亡奈辰,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進店門乱豆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來奖恰,“玉大人,你說我怎么就攤上這事宛裕∩校” “怎么了?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵揩尸,是天一觀的道長翰守。 經(jīng)常有香客問我,道長疲酌,這世上最難降的妖魔是什么蜡峰? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮朗恳,結(jié)果婚禮上湿颅,老公的妹妹穿的比我還像新娘。我一直安慰自己粥诫,他們只是感情好油航,可當我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著怀浆,像睡著了一般谊囚。 火紅的嫁衣襯著肌膚如雪怕享。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天镰踏,我揣著相機與錄音函筋,去河邊找鬼。 笑死奠伪,一個胖子當著我的面吹牛跌帐,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播绊率,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼谨敛,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了滤否?” 一聲冷哼從身側(cè)響起脸狸,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎藐俺,沒想到半個月后炊甲,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡紊搪,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了全景。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片耀石。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖爸黄,靈堂內(nèi)的尸體忽然破棺而出滞伟,到底是詐尸還是另有隱情,我是刑警寧澤炕贵,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布梆奈,位于F島的核電站,受9級特大地震影響称开,放射性物質(zhì)發(fā)生泄漏亩钟。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一鳖轰、第九天 我趴在偏房一處隱蔽的房頂上張望清酥。 院中可真熱鬧,春花似錦蕴侣、人聲如沸焰轻。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽辱志。三九已至蝠筑,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間揩懒,已是汗流浹背什乙。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留旭从,地道東北人稳强。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像和悦,于是被迫代替她去往敵國和親退疫。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,927評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 寫在前面 本文是對阿里巴巴analyticDB論文的研讀結(jié)果鸽素,里面加入了自己的一些理解和疑惑褒繁,有不準確的地方,請告...
    呂信閱讀 2,692評論 0 5
  • 用兩張圖告訴你馍忽,為什么你的 App 會卡頓? - Android - 掘金 Cover 有什么料棒坏? 從這篇文章中你...
    hw1212閱讀 12,724評論 2 59
  • Kafka Kafka是最初由Linkedin公司開發(fā),是一個分布式遭笋、支持分區(qū)的(partition)坝冕、多副本的(...
    redleaf閱讀 341評論 0 2
  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,721評論 13 425
  • 我是黑夜里大雨紛飛的人啊 1 “又到一年六月,有人笑有人哭瓦呼,有人歡樂有人憂愁喂窟,有人驚喜有人失落,有的覺得收獲滿滿有...
    陌忘宇閱讀 8,536評論 28 53