Apache Pulsar[5] API demo

生產(chǎn)者

public class PulsarProducer {
    private static String localClusterUrl = "pulsar://localhost:6650";
    public static void main(String[] args) {
        try {
            Producer<byte[]> producer = getProducer();
            String msg = "test send";

            Long start = System.currentTimeMillis();
            MessageId msgId = producer.send(msg.getBytes());
            System.out.println("spend=" + (System.currentTimeMillis() - start) + ";send a message msgId = " + msgId.toString());
        } catch (Exception e) {
            System.err.println(e);
        }
    }

    public static Producer<byte[]> getProducer() throws Exception {
        PulsarClient client;
        client = PulsarClient.builder().serviceUrl(localClusterUrl).build();
        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://my-tenant/my-namespace/my-topic")
                .producerName("test-producer")
                .create();
        return producer;
    }
}

消費者

public class PulsarConsumerDemo {
    private static String localClusterUrl = "pulsar://localhost:6650";

    public static void main(String[] args) {
        try {
            //將訂閱消費者指定的主題和訂閱
            Consumer<byte[]> consumer = getClient().newConsumer()
                    .topic("persistent://my-tenant/my-namespace/my-topic")
                    .subscriptionName("my-subscription")
                    .subscriptionType(SubscriptionType.Failover)
                    .subscribe();
            while (true) {
                Message msg = consumer.receive();
                System.out.printf("consumer-Message received: %s. \n", new String(msg.getData()));
                // 確認(rèn)消息代乃,以便broker刪除消息
                consumer.acknowledge(msg);
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    public static PulsarClient getClient() throws Exception {
        PulsarClient client;
        client = PulsarClient.builder()
                .serviceUrl(localClusterUrl).build();
        return client;
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末碍脏,一起剝皮案震驚了整個濱河市遣臼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蚀瘸,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,042評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異令野,居然都是意外死亡,警方通過查閱死者的電腦和手機徽级,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評論 2 384
  • 文/潘曉璐 我一進店門气破,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人餐抢,你說我怎么就攤上這事现使。” “怎么了旷痕?”我有些...
    開封第一講書人閱讀 156,674評論 0 345
  • 文/不壞的土叔 我叫張陵碳锈,是天一觀的道長。 經(jīng)常有香客問我欺抗,道長售碳,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,340評論 1 283
  • 正文 為了忘掉前任佩迟,我火速辦了婚禮团滥,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘报强。我一直安慰自己灸姊,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,404評論 5 384
  • 文/花漫 我一把揭開白布秉溉。 她就那樣靜靜地躺著力惯,像睡著了一般。 火紅的嫁衣襯著肌膚如雪召嘶。 梳的紋絲不亂的頭發(fā)上父晶,一...
    開封第一講書人閱讀 49,749評論 1 289
  • 那天,我揣著相機與錄音弄跌,去河邊找鬼甲喝。 笑死,一個胖子當(dāng)著我的面吹牛铛只,可吹牛的內(nèi)容都是我干的埠胖。 我是一名探鬼主播糠溜,決...
    沈念sama閱讀 38,902評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼直撤!你這毒婦竟也來了非竿?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,662評論 0 266
  • 序言:老撾萬榮一對情侶失蹤谋竖,失蹤者是張志新(化名)和其女友劉穎红柱,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蓖乘,經(jīng)...
    沈念sama閱讀 44,110評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡锤悄,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了嘉抒。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片铁蹈。...
    茶點故事閱讀 38,577評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖众眨,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情容诬,我是刑警寧澤娩梨,帶...
    沈念sama閱讀 34,258評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站览徒,受9級特大地震影響狈定,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜习蓬,卻給世界環(huán)境...
    茶點故事閱讀 39,848評論 3 312
  • 文/蒙蒙 一纽什、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧躲叼,春花似錦芦缰、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至或听,卻和暖如春探孝,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背誉裆。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評論 1 264
  • 我被黑心中介騙來泰國打工顿颅, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人足丢。 一個月前我還...
    沈念sama閱讀 46,271評論 2 360
  • 正文 我出身青樓粱腻,卻偏偏與公主長得像庇配,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子栖疑,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,452評論 2 348

推薦閱讀更多精彩內(nèi)容