webflux 實(shí)現(xiàn)服務(wù)端推送消息

實(shí)現(xiàn)即時(shí)消息的方法有很多種比如websocket,sse蜓氨; 而sse 又有spring mvc 實(shí)現(xiàn)的也有webflux 實(shí)現(xiàn)的月杉。mvc實(shí)現(xiàn)的網(wǎng)上已經(jīng)有很多了盗痒,而webflux 實(shí)現(xiàn)的不是很多横漏,也不是很全谨设,因此本文主要做的是webflux 實(shí)現(xiàn)的即時(shí)消息,sse 這里不多講缎浇,如果有不理解的可以自行百度扎拣,谷歌。
maven 依賴在最下面
下面是最簡(jiǎn)單的實(shí)現(xiàn)也是應(yīng)用場(chǎng)景最少的實(shí)現(xiàn)

    @GetMapping(path = "/sse/{userId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
    public Flux<ServerSentEvent<String>> sse(@PathVariable String userId) {
        // 每?jī)擅胪扑鸵淮?        return Flux.interval(Duration.ofSeconds(2)).map(seq->
            Tuples.of(seq, LocalDateTime.now())).log()
                .map(data-> ServerSentEvent.<String>builder().id("1").data(data.getT2().toString()).build());
    }

上面的適合股票之類(lèi)的素跺,周期性的消息二蓝。比如每?jī)擅氚l(fā)送一次消息;這樣的場(chǎng)景是合適的亡笑,但是如果是非周期性的消息呢侣夷?比如我需要再應(yīng)用里發(fā)一個(gè)公告横朋,這個(gè)公告是突然的仑乌,不確定的,那么這個(gè)邏輯就不合適了琴锭。
下面介紹非周期性消息


import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * @author haoran
 */
@RestController
@RequestMapping("/sse")
public class MessageController implements ApplicationListener {
    private final SubscribableChannel subscribableChannel = MessageChannels.publishSubscribe().get();

    @GetMapping(value = "/message",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
    public Flux<String> getMessage(){
        return Flux.create(stringFluxSink -> {
            MessageHandler messageHandler = message -> stringFluxSink.next(String.class.cast(message.getPayload()));
            // 用戶斷開(kāi)的時(shí)候取消訂閱
            stringFluxSink.onCancel(()->subscribableChannel.unsubscribe(messageHandler));
            // 訂閱消息
            subscribableChannel.subscribe(messageHandler);
        }, FluxSink.OverflowStrategy.LATEST);
    }


    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        subscribableChannel.send(new GenericMessage<>(event.getSource()));
    }
    @PostMapping("/publish")
    public void publish(@RequestParam String message){
        subscribableChannel.send(new GenericMessage<>(message));

    }
}

這里有個(gè)局限性 就是單服務(wù)的消息晰甚,那如果是多服務(wù)的集群消息怎么解決呢?
下面代碼是使用redis 的發(fā)布訂閱模式來(lái)實(shí)現(xiàn)webflux 的sse 集群

import indi.houhaoran.webflux.domian.MessageDTO;
import lombok.RequiredArgsConstructor;
import org.redisson.api.RedissonClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * @author haoran
 */
@RestController
@RequestMapping("/flux")
@RequiredArgsConstructor
public class FluxMessageController {
    private final RedissonClient redissonClient;
    public static final String USER_TOPIC = "user:";
    public static final String BROADCAST_TOPIC = "broadcast_topic";

    @GetMapping(path = "/connect/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<MessageDTO> getFolderWatch(@PathVariable String userId) {
        return Flux.create(sink -> {
            // 訂閱 廣播
            redissonClient.getTopic(BROADCAST_TOPIC).addListener(MessageDTO.class, (c, m) -> {
                sink.next(m);
            });
            // 監(jiān)聽(tīng) 用戶主題 單個(gè)
            redissonClient.getTopic(USER_TOPIC + userId).addListener(MessageDTO.class, (c, m) -> {
                sink.next(m);
            });
            //加入監(jiān)聽(tīng)如果斷開(kāi)鏈接就移除redis 的訂閱
            sink.onCancel(() -> {
                // 斷開(kāi)移除
                System.out.println("退出 userId:" + userId);
                redissonClient.getTopic(USER_TOPIC + userId).removeAllListeners();
                redissonClient.getTopic(BROADCAST_TOPIC).removeListener((Integer) redissonClient.getMap(BROADCAST_TOPIC).get(userId));
            });
        }, FluxSink.OverflowStrategy.LATEST);
    }

    @PostMapping("/publish")
    public void publish(@RequestBody MessageDTO messageDTO) {
        redissonClient.getTopic(BROADCAST_TOPIC).publish(messageDTO);
    }
}

redisson 配置

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        // 這個(gè)地方不可使用 json 序列化,否則會(huì)有問(wèn)題决帖,會(huì)出現(xiàn)一個(gè) java.lang.IllegalArgumentException: Value must not be null! 錯(cuò)誤
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        return redisTemplate;
    }
}

@Slf4j
@Configuration
public class RedissonConfigure {

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        SingleServerConfig singleServerConfig = config.useSingleServer();
        singleServerConfig.setAddress("redis://localhost:6379");
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        config.setCodec(new JsonJacksonCodec(objectMapper));
        return Redisson.create(config);
    }
}

其他類(lèi)

import java.io.Serializable;

/**
 * @author haoran
 */
@Data
public class MessageDTO implements Serializable {
    private String message;
}

調(diào)試:


postman
服務(wù)8080
服務(wù)8081

由此可見(jiàn)當(dāng)我從8080 服務(wù)發(fā)送消息厕九,8080,8081兩個(gè)服務(wù)都接收到消息了

maven 依賴

    <parent>
        <artifactId>webfluxdemo</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>server</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
        <dependency>
            <groupId>de.ruedigermoeller</groupId>
            <artifactId>fst</artifactId>
            <version>2.57</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

父pom

    <groupId>org.example</groupId>
    <artifactId>webfluxdemo</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.4</version>
    </parent>
    <modules>
        <module>client</module>
        <module>server</module>
        <module>RxJava</module>
    </modules>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
    </dependencies>
    <dependencyManagement>

    </dependencyManagement>
    <!-- ... -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <url>https://repo.spring.io/snapshot</url>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <url>https://repo.spring.io/milestone</url>
        </pluginRepository>
    </pluginRepositories>

參考·1 Reactor 3 參考文檔 (htmlpreview.github.io)
參考·2 https://www.lefer.cn/posts/30624/
結(jié)語(yǔ):百度真垃圾地回,查了半天也沒(méi)找到扁远,終歸要google;本文只是簡(jiǎn)單的實(shí)現(xiàn)了sse 在真實(shí)場(chǎng)景下會(huì)有很多不足刻像,比如redis 加入訂閱的是通過(guò)lamda 表達(dá)式實(shí)現(xiàn)的畅买,這里最好有個(gè)實(shí)現(xiàn)類(lèi)來(lái)實(shí)現(xiàn)訂閱發(fā)送消息的業(yè)務(wù)。
題外話:webflux 如何實(shí)現(xiàn)響應(yīng)式報(bào)表细睡?

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末谷羞,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子溜徙,更是在濱河造成了極大的恐慌湃缎,老刑警劉巖犀填,帶你破解...
    沈念sama閱讀 211,265評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異嗓违,居然都是意外死亡九巡,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門(mén)蹂季,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)比庄,“玉大人,你說(shuō)我怎么就攤上這事乏盐〖岩ぃ” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,852評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵父能,是天一觀的道長(zhǎng)神凑。 經(jīng)常有香客問(wèn)我,道長(zhǎng)何吝,這世上最難降的妖魔是什么溉委? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,408評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮爱榕,結(jié)果婚禮上瓣喊,老公的妹妹穿的比我還像新娘。我一直安慰自己黔酥,他們只是感情好藻三,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,445評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著跪者,像睡著了一般棵帽。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上渣玲,一...
    開(kāi)封第一講書(shū)人閱讀 49,772評(píng)論 1 290
  • 那天逗概,我揣著相機(jī)與錄音,去河邊找鬼忘衍。 笑死逾苫,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的枚钓。 我是一名探鬼主播铅搓,決...
    沈念sama閱讀 38,921評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼秘噪!你這毒婦竟也來(lái)了狸吞?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,688評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎蹋偏,沒(méi)想到半個(gè)月后便斥,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,130評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡威始,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,467評(píng)論 2 325
  • 正文 我和宋清朗相戀三年枢纠,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片黎棠。...
    茶點(diǎn)故事閱讀 38,617評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡晋渺,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出脓斩,到底是詐尸還是另有隱情木西,我是刑警寧澤,帶...
    沈念sama閱讀 34,276評(píng)論 4 329
  • 正文 年R本政府宣布随静,位于F島的核電站八千,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏燎猛。R本人自食惡果不足惜恋捆,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,882評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望重绷。 院中可真熱鬧沸停,春花似錦、人聲如沸昭卓。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,740評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)葬凳。三九已至绰垂,卻和暖如春室奏,著一層夾襖步出監(jiān)牢的瞬間火焰,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,967評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工胧沫, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留昌简,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,315評(píng)論 2 360
  • 正文 我出身青樓绒怨,卻偏偏與公主長(zhǎng)得像纯赎,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子南蹂,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,486評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容