實(shí)現(xiàn)即時(shí)消息的方法有很多種比如websocket,sse蜓氨; 而sse 又有spring mvc 實(shí)現(xiàn)的也有webflux 實(shí)現(xiàn)的月杉。mvc實(shí)現(xiàn)的網(wǎng)上已經(jīng)有很多了盗痒,而webflux 實(shí)現(xiàn)的不是很多横漏,也不是很全谨设,因此本文主要做的是webflux 實(shí)現(xiàn)的即時(shí)消息,sse 這里不多講缎浇,如果有不理解的可以自行百度扎拣,谷歌。
maven 依賴在最下面
下面是最簡(jiǎn)單的實(shí)現(xiàn)也是應(yīng)用場(chǎng)景最少的實(shí)現(xiàn)
@GetMapping(path = "/sse/{userId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
public Flux<ServerSentEvent<String>> sse(@PathVariable String userId) {
// 每?jī)擅胪扑鸵淮? return Flux.interval(Duration.ofSeconds(2)).map(seq->
Tuples.of(seq, LocalDateTime.now())).log()
.map(data-> ServerSentEvent.<String>builder().id("1").data(data.getT2().toString()).build());
}
上面的適合股票之類(lèi)的素跺,周期性的消息二蓝。比如每?jī)擅氚l(fā)送一次消息;這樣的場(chǎng)景是合適的亡笑,但是如果是非周期性的消息呢侣夷?比如我需要再應(yīng)用里發(fā)一個(gè)公告横朋,這個(gè)公告是突然的仑乌,不確定的,那么這個(gè)邏輯就不合適了琴锭。
下面介紹非周期性消息
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
/**
* @author haoran
*/
@RestController
@RequestMapping("/sse")
public class MessageController implements ApplicationListener {
private final SubscribableChannel subscribableChannel = MessageChannels.publishSubscribe().get();
@GetMapping(value = "/message",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
public Flux<String> getMessage(){
return Flux.create(stringFluxSink -> {
MessageHandler messageHandler = message -> stringFluxSink.next(String.class.cast(message.getPayload()));
// 用戶斷開(kāi)的時(shí)候取消訂閱
stringFluxSink.onCancel(()->subscribableChannel.unsubscribe(messageHandler));
// 訂閱消息
subscribableChannel.subscribe(messageHandler);
}, FluxSink.OverflowStrategy.LATEST);
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
subscribableChannel.send(new GenericMessage<>(event.getSource()));
}
@PostMapping("/publish")
public void publish(@RequestParam String message){
subscribableChannel.send(new GenericMessage<>(message));
}
}
這里有個(gè)局限性 就是單服務(wù)的消息晰甚,那如果是多服務(wù)的集群消息怎么解決呢?
下面代碼是使用redis 的發(fā)布訂閱模式來(lái)實(shí)現(xiàn)webflux 的sse 集群
import indi.houhaoran.webflux.domian.MessageDTO;
import lombok.RequiredArgsConstructor;
import org.redisson.api.RedissonClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
/**
* @author haoran
*/
@RestController
@RequestMapping("/flux")
@RequiredArgsConstructor
public class FluxMessageController {
private final RedissonClient redissonClient;
public static final String USER_TOPIC = "user:";
public static final String BROADCAST_TOPIC = "broadcast_topic";
@GetMapping(path = "/connect/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<MessageDTO> getFolderWatch(@PathVariable String userId) {
return Flux.create(sink -> {
// 訂閱 廣播
redissonClient.getTopic(BROADCAST_TOPIC).addListener(MessageDTO.class, (c, m) -> {
sink.next(m);
});
// 監(jiān)聽(tīng) 用戶主題 單個(gè)
redissonClient.getTopic(USER_TOPIC + userId).addListener(MessageDTO.class, (c, m) -> {
sink.next(m);
});
//加入監(jiān)聽(tīng)如果斷開(kāi)鏈接就移除redis 的訂閱
sink.onCancel(() -> {
// 斷開(kāi)移除
System.out.println("退出 userId:" + userId);
redissonClient.getTopic(USER_TOPIC + userId).removeAllListeners();
redissonClient.getTopic(BROADCAST_TOPIC).removeListener((Integer) redissonClient.getMap(BROADCAST_TOPIC).get(userId));
});
}, FluxSink.OverflowStrategy.LATEST);
}
@PostMapping("/publish")
public void publish(@RequestBody MessageDTO messageDTO) {
redissonClient.getTopic(BROADCAST_TOPIC).publish(messageDTO);
}
}
redisson 配置
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
// 這個(gè)地方不可使用 json 序列化,否則會(huì)有問(wèn)題决帖,會(huì)出現(xiàn)一個(gè) java.lang.IllegalArgumentException: Value must not be null! 錯(cuò)誤
redisTemplate.setHashValueSerializer(new StringRedisSerializer());
return redisTemplate;
}
}
@Slf4j
@Configuration
public class RedissonConfigure {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress("redis://localhost:6379");
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.registerModule(new JavaTimeModule());
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
config.setCodec(new JsonJacksonCodec(objectMapper));
return Redisson.create(config);
}
}
其他類(lèi)
import java.io.Serializable;
/**
* @author haoran
*/
@Data
public class MessageDTO implements Serializable {
private String message;
}
調(diào)試:
由此可見(jiàn)當(dāng)我從8080 服務(wù)發(fā)送消息厕九,8080,8081兩個(gè)服務(wù)都接收到消息了
maven 依賴
<parent>
<artifactId>webfluxdemo</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>server</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>2.57</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
父pom
<groupId>org.example</groupId>
<artifactId>webfluxdemo</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.4</version>
</parent>
<modules>
<module>client</module>
<module>server</module>
<module>RxJava</module>
</modules>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
</dependencyManagement>
<!-- ... -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<url>https://repo.spring.io/snapshot</url>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<url>https://repo.spring.io/milestone</url>
</pluginRepository>
</pluginRepositories>
參考·1 Reactor 3 參考文檔 (htmlpreview.github.io)
參考·2 https://www.lefer.cn/posts/30624/
結(jié)語(yǔ):百度真垃圾地回,查了半天也沒(méi)找到扁远,終歸要google;本文只是簡(jiǎn)單的實(shí)現(xiàn)了sse 在真實(shí)場(chǎng)景下會(huì)有很多不足刻像,比如redis 加入訂閱的是通過(guò)lamda 表達(dá)式實(shí)現(xiàn)的畅买,這里最好有個(gè)實(shí)現(xiàn)類(lèi)來(lái)實(shí)現(xiàn)訂閱發(fā)送消息的業(yè)務(wù)。
題外話:webflux 如何實(shí)現(xiàn)響應(yīng)式報(bào)表细睡?