【Azure 事件中心】azure-spring-cloud-stream-binder-eventhubs客戶端組件問(wèn)題, 實(shí)踐消息非順序可達(dá)

2021053101.png

問(wèn)題描述

查閱了Azure的官方文檔( 將事件發(fā)送到特定分區(qū): https://docs.azure.cn/zh-cn/event-hubs/event-hubs-availability-and-consistency?tabs=java#send-events-to-a-specific-partition)算吩,在工程里引用組件“azure-spring-cloud-stream-binder-eventhubs”來(lái)連接EventHub發(fā)送和消費(fèi)消息事件枉证。在發(fā)送端一個(gè)For循環(huán)中發(fā)送帶順序號(hào)的消息癌椿,編號(hào)從0開(kāi)始,并且在消息的header中指定了 "Partition Key"忠售,相同PartitionKey的消息會(huì)被發(fā)送到相同的Partition,來(lái)保證這些消息的順序。

但是在消費(fèi)端的工程中消費(fèi)這些消息時(shí),看到打印到日志中的結(jié)果并不是從0遞增的信夫。所以想知道是發(fā)送端在發(fā)送時(shí)就已經(jīng)亂序發(fā)送了?還是消息到達(dá)EventHub后亂序保存了?還是消費(fèi)端的消費(fèi)方式的問(wèn)題静稻,導(dǎo)致打印出的結(jié)果是亂序的警没?

下面是發(fā)送端的代碼:

public void testPushMessages(int mcount, String partitionKey) {
String message = "Message ";
for (int i=0; i <mcount; i++) {
source.output().send(MessageBuilder.withPayload(partitionKey + mcount + i).setHeaderIfAbsent(AzureHeaders.PARTITION_KEY,partitionKey).build());
    }
}

下面是消費(fèi)端代碼:

@StreamListener(Sink.INPUT)
public void onEvent(String message, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer,
                    @Header(AzureHeaders.RAW_PARTITION_ID) String rawPartitionId,
                    @Header(AzureHeaders.PARTITION_KEY) String partitionKey) {
    checkpointer.success()
            .doOnSuccess(s -> log.info("Message '{}' successfully check pointed.rawPartitionId={},partitionKey={}", message, rawPartitionId, partitionKey))
            .doOnError(s -> log.error("Checkpoint message got exception."))
            .subscribe();

下面是打印的日志

......,"data":"Message 'testKey4testMessage1' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}
......,"data":"Message 'testKey5testMessage29' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}
......,"data":"Message 'testKey5testMessage27' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}
......,"data":"Message 'testKey5testMessage26' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}
......,"data":"Message 'testKey5testMessage25' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}
......,"data":"Message 'testKey5testMessage28' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}
......,"data":"Message 'testKey5testMessage14' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}
......,"data":"Message 'testKey5testMessage13' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}
......,"data":"Message 'testKey5testMessage15' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}
......,"data":"Message 'testKey5testMessage5' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}
......,"data":"Message 'testKey5testMessage7' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}
......,"data":"Message 'testKey5testMessage20' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}
......,"data":"Message 'testKey5testMessage19' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}
......,"data":"Message 'testKey5testMessage18' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}
......,"data":"Message 'testKey5testMessage0' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}
......,"data":"Message 'testKey5testMessage9' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}
......,"data":"Message 'testKey5testMessage12' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}
......,"data":"Message 'testKey5testMessage8' successfully check pointed.rawPartitionId=1,partition<*****>","xcptn":""}

從日志中可以看到,消息確實(shí)都被發(fā)送到了同一個(gè)分區(qū)(rawPartitionId=1)振湾,但是從消息體的序號(hào)上看惠奸,是亂序的

問(wèn)題分析

這個(gè)是和這個(gè)配置相關(guān)的fixedDelay,指定默認(rèn)輪詢器的固定延遲恰梢,是一個(gè)周期性觸發(fā)器佛南,之前代碼會(huì)根據(jù)這個(gè)輪詢器進(jìn)行發(fā)送和接受消息的。使用Send發(fā)送的方法嵌言,現(xiàn)在最新的SDK 不使用這個(gè)方法嗅回,所以需要使用新的sdk 發(fā)送數(shù)據(jù)測(cè)試一下。

新sdk 參考文檔您可以參考一下:https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-eventhubs-binder

SDK版本為

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>azure-spring-cloud-stream-binder-eventhubs</artifactId>
    <version>2.4.0</version>
</dependency>

在參考官網(wǎng)的示例后摧茴,使用Supplier方法發(fā)送消息绵载,代替Send。經(jīng)過(guò)多次測(cè)試苛白,指定partitionkey 之后娃豹,發(fā)送消息是順序發(fā)送的,消費(fèi)的時(shí)候也是按照順序消費(fèi)的购裙,下面是測(cè)試的代碼和結(jié)果

發(fā)送端的代碼

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.sample.eventhubs.binder;

import com.azure.spring.integration.core.EventHubHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.function.Supplier;

import static com.azure.spring.integration.core.EventHubHeaders.SEQUENCE_NUMBER;

@Configuration
public class EventProducerConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventProducerConfiguration.class);

    private int i = 0;

    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            //LOGGER.info("Sending message, sequence " + i);
            String partitionKey="info";

            LOGGER.info("Send message " + MessageBuilder.withPayload("hello world, "+i).setHeaderIfAbsent(EventHubHeaders.PARTITION_KEY, partitionKey).build());
            return MessageBuilder.withPayload("hello world, "+ i++).
                    setHeaderIfAbsent(EventHubHeaders.PARTITION_KEY, partitionKey).build();

        };
    }

}

接收端的代碼

package com.ywt.demoEventhub;

import com.azure.spring.integration.core.EventHubHeaders;
import com.azure.spring.integration.core.api.reactor.Checkpointer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;

import java.util.function.Consumer;

import static com.azure.spring.integration.core.AzureHeaders.CHECKPOINTER;

@Configuration
public class EventConsume {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventConsume.class);
    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubHeaders.OFFSET),
                    message.getHeaders().get(EventHubHeaders.ENQUEUED_TIME)
            );

            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed number '{}' ", message.getPayload(), message.getHeaders().get(EventHubHeaders.CHECKPOINTER)))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .subscribe();
        };
    }
}

發(fā)送消息的日志

2021053102.png

消費(fèi)消息的日志

2021053103.png

參考資料

Azure Spring Cloud Stream Binder for Event Hub Code Sample shared library for Javahttps://github.com/Azure/azure-sdk-for-java/tree/master/sdk/spring/azure-spring-boot-samples/azure-spring-cloud-sample-eventhubs-binder

How to create a Spring Cloud Stream Binder application with Azure Event Hubs - Add sample code to implement basic event hub functionality : https://docs.microsoft.com/en-us/azure/developer/java/spring-framework/configure-spring-cloud-stream-binder-java-app-azure-event-hub#add-sample-code-to-implement-basic-event-hub-functionality

[END]

當(dāng)在復(fù)雜的環(huán)境中面臨問(wèn)題懂版,格物之道需:濁而靜之徐清,安以動(dòng)之徐生躏率。 云中躯畴,恰是如此!

分類: 【Azure 事件中心】

標(biāo)簽: 消費(fèi)端代碼, 事件指定順序, 事件中心 Azure Event Hub

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市薇芝,隨后出現(xiàn)的幾起案子蓬抄,更是在濱河造成了極大的恐慌,老刑警劉巖夯到,帶你破解...
    沈念sama閱讀 218,284評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嚷缭,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡耍贾,警方通過(guò)查閱死者的電腦和手機(jī)阅爽,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)逼争,“玉大人优床,你說(shuō)我怎么就攤上這事∈慕梗” “怎么了胆敞?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,614評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵着帽,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我移层,道長(zhǎng)仍翰,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,671評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己修噪,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,699評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布院领。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪饮怯。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,562評(píng)論 1 305
  • 那天贪壳,我揣著相機(jī)與錄音,去河邊找鬼配猫。 笑死识埋,一個(gè)胖子當(dāng)著我的面吹牛系忙,可吹牛的內(nèi)容都是我干的蛹疯。 我是一名探鬼主播,決...
    沈念sama閱讀 40,309評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼孝扛,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼列吼!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起苦始,我...
    開(kāi)封第一講書(shū)人閱讀 39,223評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤寞钥,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后陌选,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體理郑,經(jīng)...
    沈念sama閱讀 45,668評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蹄溉,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,859評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了香浩。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片类缤。...
    茶點(diǎn)故事閱讀 39,981評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖邻吭,靈堂內(nèi)的尸體忽然破棺而出餐弱,到底是詐尸還是另有隱情,我是刑警寧澤囱晴,帶...
    沈念sama閱讀 35,705評(píng)論 5 347
  • 正文 年R本政府宣布膏蚓,位于F島的核電站,受9級(jí)特大地震影響畸写,放射性物質(zhì)發(fā)生泄漏驮瞧。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,310評(píng)論 3 330
  • 文/蒙蒙 一枯芬、第九天 我趴在偏房一處隱蔽的房頂上張望论笔。 院中可真熱鬧,春花似錦千所、人聲如沸狂魔。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,904評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)最楷。三九已至,卻和暖如春待错,著一層夾襖步出監(jiān)牢的瞬間籽孙,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,023評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工火俄, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留犯建,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,146評(píng)論 3 370
  • 正文 我出身青樓瓜客,卻偏偏與公主長(zhǎng)得像胎挎,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子忆家,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,933評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容