目錄
- 項(xiàng)目背景
- Reflux 簡介
- 同類產(chǎn)品
- 源碼下載
- 軟件版本說明
- 適用讀者
- clientId 詳述
- demo 程序構(gòu)成
- 配置 terran4j 的 Maven 倉庫
- 創(chuàng)建 demo-reflux 項(xiàng)目
- 創(chuàng)建 demo-reflux-server 項(xiàng)目
- Reflux 服務(wù)端 —— 建立 WebSocket 端點(diǎn)
- 創(chuàng)建 demo-reflux-client 項(xiàng)目
- Reflux 客戶端 —— 建立 WebSocket 連接
- 服務(wù)端推送消息到客戶端
- 分布式解決方案
項(xiàng)目背景
傳統(tǒng)的 HTTP 通信是單向的,即由客戶端發(fā)送請求岖免,服務(wù)端只能被動響應(yīng),不能主動向客戶端推送消息瓜挽。
然而有很多業(yè)務(wù)場景是需要服務(wù)端主動向客戶端推送消息的傲诵,為了能實(shí)現(xiàn)這個目的,傳統(tǒng)技術(shù)有兩種:
- 讓客戶端起一個 demo 線程,定期輪詢請求服務(wù)端墩剖,服務(wù)端有消息就返回消息挑胸。
- 讓客戶端發(fā)起一個長連接的 HTTP 請求痒筒,服務(wù)端對這個請求“hold”住不放,直到有消息時才返回消息茬贵。
- 完全基于 TCP 層自定義協(xié)議實(shí)現(xiàn)簿透。
前兩種實(shí)現(xiàn)方式都只能算“偽推送”,網(wǎng)絡(luò)消耗較大解藻,運(yùn)行效率較低老充。
第3種方式,可能要占用額外的端口(而不是直復(fù)用 HTTP 端口)螟左,并且開發(fā)較復(fù)雜啡浊,部署也復(fù)雜。
在以上背景下胶背,Web Socket 技術(shù)就應(yīng)運(yùn)而生了巷嚣,它基于 TCP 底層實(shí)現(xiàn)真正的雙向通信,
運(yùn)行效率比“偽推送”高得多钳吟,又不占用額外的端口廷粒,可以說是完美解決了這個問題。
Spring Boot 也集成了 Web Socket 技術(shù)红且,但開發(fā)方面提供的支持較少坝茎,要在實(shí)際場景下實(shí)現(xiàn)“服務(wù)端推送”仍需要編寫很多代碼。
本模塊就是要基于 Spring Boot + Web Socket 技術(shù)暇番,實(shí)現(xiàn)逆向通信(即客戶端請求建立連接后景东,能讓服務(wù)端主動向客戶端推送消息)。
Reflux 簡介
reflux 在英文中是“逆流奔誓、回流”的意思斤吐,考慮到web項(xiàng)目一般是client端主動發(fā)起請求搔涝,服務(wù)端只是被動響應(yīng),而 reflux 利用 WebSocket 技術(shù)實(shí)現(xiàn)了逆向推送(即服務(wù)端主動向客戶端發(fā)送消息)和措,因此用這個英文單詞作為項(xiàng)目名稱庄呈。
Reflux 在 Spring Boot 的基礎(chǔ)上提供了 client 端和 server 端的 JAVA API,它的使用流程如下:
- 客戶端向服務(wù)端發(fā)起 WebSocket 請求派阱,請求中帶一個名為 clientId 的參數(shù)诬留,以表明此客戶端的身份。
- 服務(wù)端在校驗(yàn)了 clientId 的合法性后贫母,接受請求以建立連接文兑。
- 服務(wù)端在以后(連接建立后)的任意時間點(diǎn),都可以主動向客戶端推送消息腺劣。
交互流程如下所示:
同類產(chǎn)品
實(shí)際上業(yè)界也有基于以上說的3種方案做成獨(dú)立產(chǎn)品的绿贞,如:
大部分做得好的是獨(dú)立的收費(fèi)產(chǎn)品,很少有又好用芋酌、又開源免費(fèi)的產(chǎn)品增显,希望本產(chǎn)品可以為廣大開發(fā)者們提供一種新的選擇。
源碼下載
目前筆者已將本項(xiàng)目作為一個開源項(xiàng)目來維護(hù)脐帝,源代碼放在了 這里 同云。
本文所用的示例代碼也放在“碼云”上了,歡迎大家免費(fèi)下載或?yàn)g覽:
-
demo-reflux-server
: 服務(wù)端示例代碼腮恩。 -
demo-reflux-client
: 客戶端示例代碼。 -
demo-reflux
: 客戶端與服務(wù)端公共部分代碼温兼。
軟件版本說明
相關(guān)軟件使用的版本:
- Java: 1.8
- Maven: 3.3.9
- SpringBoot: 1.5.2.RELEASE
程序在以上版本均調(diào)試過秸滴,可以正常運(yùn)行。
其它版本理論上相同募判,但僅供參考荡含。
適用讀者
本文適合有Java + Maven + SpringBoot 開發(fā)經(jīng)驗(yàn)的開發(fā)者們。
如果您有 Java 開發(fā)經(jīng)驗(yàn)但對Spring Boot 還不熟悉的話届垫,建議先閱讀筆者寫過的一本書 《Spring Boot 快速入門》 释液。
這本書的目標(biāo)是幫助有 Java 開發(fā)經(jīng)驗(yàn)的程序員們快速掌握使用 Spring Boot 開發(fā)的基本技巧,感受到 Spring Boot 的極簡開發(fā)風(fēng)格及超爽編程體驗(yàn)装处。
clientId 詳述
上一節(jié)所講到的 clientId 误债,是客戶端的身份憑證浸船,至于如何管理 clientId 則不在 Reflux 范圍之內(nèi)。
“如何管理 clientId”寝蹈,意指以下問題:
- clientId 如何生成李命;
- 客戶端又如何獲取到 clientId;
- 建立連接時箫老,服務(wù)端又怎么校驗(yàn) clientId 的合法性封字;
- 服務(wù)端如何存儲 clientId 與客戶端其它信息的關(guān)聯(lián)。
我們列舉兩個場景來描述 clientId 在實(shí)際項(xiàng)目中是怎么管理的:
- 在一個移動互聯(lián)網(wǎng)類的項(xiàng)目中耍鬓,client 端是移動端 App 應(yīng)用程序阔籽,如果用戶登錄功能是類似于 OAuth2.0 的方式的話,那 client 會先訪問登錄請求牲蜀,登錄成功后獲得一個 access_token笆制,這個 access_token 就可以作為我們這里的 clientId 來使用,服務(wù)端也可以通過調(diào)用賬號系統(tǒng)API來校驗(yàn) clientId 的合法性各薇。
- 在一個 PAAS 平臺類的項(xiàng)目中项贺,client 端可能是使用 PAAS 平臺的應(yīng)用系統(tǒng)。一般來說峭判,server端(PAAS平臺)會給每個應(yīng)用系統(tǒng)分配一個appKey + appSecret 作為應(yīng)用系統(tǒng)的憑證开缎,client 端(應(yīng)用系統(tǒng)的一臺實(shí)例)可以用 appSecret 作為密鑰,給 “appKey + 實(shí)例IP” 加密林螃,將密文作為 clientId奕删,server 端校驗(yàn) clientId 時再用 appSecret 解密即可獲知應(yīng)用方的 appKey 及客戶端實(shí)例的 IP。
當(dāng)然疗认,這里是只是舉兩個場景作為例子完残,實(shí)際上使用“服務(wù)端推送”技術(shù)的場景是非常多的,開發(fā)者可以根據(jù)自身的業(yè)務(wù)需求進(jìn)行處理横漏。
demo 程序構(gòu)成
下面幾節(jié)我們會講解一個 demo 程序的開發(fā)谨设,幫助我們理解 Reflux 的用法。
demo 程序分以下幾個項(xiàng)目:
-
demo-reflux
: 客戶端與服務(wù)端公共部分代碼缎浇,定義了一個名為Hello
的Java Bean扎拣,作為消息的內(nèi)容載體。 -
demo-reflux-server
: 服務(wù)端代碼素跺,提供一個 WebSocket 端點(diǎn)供客戶端連接二蓝,還提供一個Controller用于發(fā)起消息推送。 -
demo-reflux-client
: 客戶端代碼指厌,啟動后會去連接服務(wù)端以建立 WebSocket 連接刊愚,同時會監(jiān)聽來自服務(wù)端的消息推送。
配置 terran4j 的 Maven 倉庫
Reflux 是筆者(terran4j)多個開源項(xiàng)目的其中一個項(xiàng)目踩验,筆者為了方便大家使用鸥诽,專門搭建了一個開放的 maven 倉庫商玫,并將所有開源項(xiàng)目的 jar 包發(fā)布到這個倉庫中了,因此需要您在 maven 的 settings.xml 文件上配置上這個倉庫衙传,配置方法參見《配置 terran4j 的 maven 倉庫》决帖。
創(chuàng)建 demo-reflux 項(xiàng)目
首先,我們基于 Spring Boot 創(chuàng)建名為 demo-reflux 的項(xiàng)目蓖捶,并在 pom.xml 文件中引入 reflux 的依賴:
<dependency>
<groupId>terran4j</groupId>
<artifactId>terran4j-commons-reflux</artifactId>
<version>1.0.2</version>
</dependency>
terran4j-commons-reflux 項(xiàng)目的當(dāng)前最新穩(wěn)定本是 1.0.2 地回,若有更新升級會本這里給出最新版本號。
另外 terran4j-commons-reflux 是發(fā)布在 terran4j 的 maven 倉庫中俊鱼,所以 需要在您 maven 的 settings.xml 中配置此 maven 倉庫刻像,配置方法請參見 這篇文檔 。
整個 pom.xml 文件代碼如下所示:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>terran4j</groupId>
<artifactId>demo-reflux</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>demo-reflux</name>
<url>http://maven.apache.org</url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.2.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>terran4j</groupId>
<artifactId>terran4j-commons-reflux</artifactId>
<version>1.0.2</version>
</dependency>
</dependencies>
</project>
然后我們定義一個名為 Hello
的 Java Bean并闲,代碼如下所示:
package com.terran4j.demo.reflux;
import com.terran4j.commons.util.Strings;
public class Hello {
private String name;
private String greeting;
private long currentTime;
public Hello() {
super();
}
public Hello(String name) {
this(name, "Hello, " + name);
}
public Hello(String name, String greeting) {
super();
this.name = name;
this.greeting = greeting;
this.currentTime = System.currentTimeMillis();
}
public final String getName() {
return name;
}
public final void setName(String name) {
this.name = name;
}
public final String getGreeting() {
return greeting;
}
public final void setGreeting(String greeting) {
this.greeting = greeting;
}
public final long getCurrentTime() {
return currentTime;
}
public final void setCurrentTime(long currentTime) {
this.currentTime = currentTime;
}
public final String toString() {
return Strings.toString(this);
}
}
這個 Hello 是對消息內(nèi)容的描述细睡,server 端推送消息時,發(fā)的是 Hello 對象帝火,客戶端接收消息時也是收的 Hello 對象溜徙,這點(diǎn)下面會再講到。
創(chuàng)建 demo-reflux-server 項(xiàng)目
然后犀填,我們創(chuàng)建服務(wù)端的項(xiàng)目 demo-reflux-server 蠢壹,并在 pom.xml 文件中添加剛才 demo-reflux 項(xiàng)目的依賴,如:
<dependency>
<groupId>terran4j</groupId>
<artifactId>demo-reflux</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
由于 demo-reflux 項(xiàng)目中已經(jīng)添加了 terran4j-commons-reflux 的依賴了九巡,所以這里不需要重復(fù)添加图贸。
整個 pom.xml 文件如下所示:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>terran4j</groupId>
<artifactId>demo-reflux-server</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>demo-reflux-server</name>
<url>http://maven.apache.org</url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.2.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>terran4j</groupId>
<artifactId>demo-reflux</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
然后我們編寫 main 函數(shù),并在類上添加 @EnableRefluxServer, 代碼如下所示:
package com.terran4j.demo.reflux.server;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.terran4j.commons.reflux.server.EnableRefluxServer;
@EnableRefluxServer
@SpringBootApplication
public class RefluxServerApplication {
public static void main(String[] args) {
SpringApplication.run(RefluxServerApplication.class, args);
}
}
@EnableRefluxServer 會在 Spring 容器中定義很多 Spring Bean 對象冕广,以提供 Reflux 服務(wù)端的能力疏日。
我們還要在 application.yml 中定義下所用的端口:
server:
port: 8081
為了避免端口沖突,demo-reflux-server 項(xiàng)目使用 8081 端口撒汉;后面的 demo-reflux-client 項(xiàng)目將使用 8082 端口沟优。
Reflux 服務(wù)端 —— 建立 WebSocket 端點(diǎn)
- 編寫 DemoServerEndpoint 類,以建立WebSocket端點(diǎn)睬辐,代碼如下所示:
package com.terran4j.demo.reflux.server;
import javax.websocket.server.ServerEndpoint;
import org.springframework.stereotype.Component;
import com.terran4j.commons.reflux.server.RefluxServerEndpoint;
@ServerEndpoint("/demo/connect")
@Component
public class DemoServerEndpoint extends RefluxServerEndpoint {
@Override
protected boolean authenticate(String clientId) {
/**
* 這里為了簡化演示代碼挠阁,就不作 clientId 的校驗(yàn)了,直接返回 true 溉委。
*/
return true;
}
}
說明一下:
- 必須繼承于 RefluxServerEndpoint 類鹃唯,RefluxServerEndpoint 提供了很多現(xiàn)成的處理 WebSocket 連接的方法爱榕。
- 必須實(shí)現(xiàn)方法
boolean authenticate(String clientId)
用于校驗(yàn) clientId 的合法性瓣喊,這里為了簡化演示代碼,就不作 clientId 的校驗(yàn)了黔酥,直接返回 true 藻三。 - 類上必須加上 @ServerEndpoint 的注解洪橘,用于定義連接 WebSocket 時的請求路徑。
- 類上必須加上 @Component 注解棵帽,用于注冊成為 Spring Bean熄求。
建立好 WebSocket 端點(diǎn)后,既使客戶端不是 java 的(比如: PHP, Javascript, Android, iOS)逗概,也可以按 Web Socket 的協(xié)議請求連接了弟晚。
當(dāng)然如果是 java ,用 reflux 提供的 client API 就非常簡單了逾苫,這點(diǎn)后面會講到卿城。
創(chuàng)建 demo-reflux-client 項(xiàng)目
現(xiàn)在我們嘗試編寫 Reflux 客戶端項(xiàng)目 demo-reflux-client 。
與服務(wù)端一樣铅搓,也是先創(chuàng)建 demo-reflux-client 項(xiàng)目瑟押,也是在 pom.xml 中添加依賴:
<dependency>
<groupId>terran4j</groupId>
<artifactId>demo-reflux</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
整個 pom.xml 文件與服務(wù)端類似,這里就不重復(fù)了星掰。
然后我們編寫 main 函數(shù)多望,并在類上添加 @EnableRefluxClient, 代碼如下所示:
package com.terran4j.demo.reflux.client;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.terran4j.commons.reflux.client.EnableRefluxClient;
@EnableRefluxClient
@SpringBootApplication
public class RefluxClientApplication {
public static void main(String[] args) {
SpringApplication.run(RefluxClientApplication.class, args);
}
}
@EnableRefluxClient 會在 Spring 容器中定義很多 Spring Bean 對象,以提供 Reflux 客戶端的能力氢烘。
Reflux 客戶端 —— 建立 WebSocket 連接
現(xiàn)在我們來編寫一個名為 MyRefluxConnector 的類怀偷,來連接服務(wù)端:
package com.terran4j.demo.reflux.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Service;
import com.terran4j.commons.reflux.RefluxClient;
@Service
public class MyRefluxConnector implements ApplicationRunner {
private static final Logger log = LoggerFactory.getLogger(MyRefluxConnector.class);
@Value("${reflux.server.url}")
private String serverURL;
@Value("${reflux.client.id}")
private String clientId;
@Autowired
private RefluxClient refluxClient;
@Override
public void run(ApplicationArguments args) throws Exception {
// 建立 Web Socket 連接。
refluxClient.connect(serverURL, clientId);
if (log.isInfoEnabled()) {
log.info("connect server success, serverURL = {}, clientId = {}", //
serverURL, clientId);
}
}
}
其實(shí)主要就兩步:
- 第一步威始,用 @Autowired 的方式注入 Bean: RefluxClient refluxClient 枢纠。
- 第二步,調(diào)用
refluxClient.connect(serverURL, clientId)
方法建立連接黎棠。
就這么簡單晋渺,serverURL 是一個 WebSocket 的URL,比如在本例中脓斩,它的值為:
ws://localhost:8081/demo/connect
ws 是 WebSocket 協(xié)議的意思木西,之所示路徑是 /demo/connect
,是在服務(wù)端 DemoServerEndpoint
類中定義的随静,如:
@ServerEndpoint("/demo/connect")
@Component
public class DemoServerEndpoint extends RefluxServerEndpoint
為了能在客戶端程序啟動時就連接 WebSocket八千,這個類實(shí)現(xiàn)了 ApplicationRunner
接口并將代碼放在 run 方法中,當(dāng)然這只是代碼演示的方便燎猛,您可以根據(jù)實(shí)際業(yè)務(wù)的需求恋捆,選擇什么時機(jī)進(jìn)行連接。
好了重绷,我們可以分別啟動 RefluxServerApplication, RefluxClientApplication 兩個 main 函數(shù)來看看效果了沸停。
client端、server端兩個程序都啟動后昭卓,看到 client 端的控制臺輸出如下:
......
2017-08-20 16:05:24.786 INFO 8564 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8082 (http)
2017-08-20 16:05:24.791 INFO 8564 --- [ main] c.t.c.reflux.client.RefluxClientImpl : connect server web socket url: ws://localhost:8081/demo/connect?clientId=12345
2017-08-20 16:05:25.094 INFO 8564 --- [ main] c.t.c.reflux.client.ClientConnection : Opening client websocket, server = ws://localhost:8081/demo/connect
2017-08-20 16:05:25.098 INFO 8564 --- [ main] c.t.c.reflux.client.RefluxClientImpl : 目標(biāo)服務(wù)連接成功: ws://localhost:8081/demo/connect?clientId=12345
2017-08-20 16:05:25.100 INFO 8564 --- [ main] c.t.d.reflux.client.MyRefluxConnector : connect server success, serverURL = ws://localhost:8081/demo/connect, clientId = 12345
2017-08-20 16:05:25.103 INFO 8564 --- [ main] c.t.d.r.client.RefluxClientApplication : Started RefluxClientApplication in 4.605 seconds (JVM running for 5.146)
server 端的控制臺輸出如下:
......
2017-08-20 16:05:05.855 INFO 7052 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8081 (http)
2017-08-20 16:05:05.862 INFO 7052 --- [ main] c.t.d.r.server.RefluxServerApplication : Started RefluxServerApplication in 4.986 seconds (JVM running for 5.583)
2017-08-20 16:05:24.998 INFO 7052 --- [nio-8081-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring FrameworkServlet 'dispatcherServlet'
2017-08-20 16:05:24.999 INFO 7052 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization started
2017-08-20 16:05:25.024 INFO 7052 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization completed in 25 ms
2017-08-20 16:05:25.106 INFO 7052 --- [nio-8081-exec-1] c.t.c.r.server.RefluxServerEndpoint : 來自客戶端的連接, clientId = 12345
2017-08-20 16:05:25.108 INFO 7052 --- [nio-8081-exec-1] c.t.c.reflux.server.RefluxServerImpl : 有新連接加入! 當(dāng)前連接數(shù)為 1
說明連接成功了愤钾。
服務(wù)端推送消息到客戶端
建立連接后瘟滨,服務(wù)端可以主動推送消息了,我們編寫 DemoController 類來實(shí)現(xiàn)消息推送能颁,如下所示:
package com.terran4j.demo.reflux.server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.terran4j.commons.reflux.RefluxServer;
import com.terran4j.demo.reflux.Hello;
@RestController
public class DemoController {
private static final Logger log = LoggerFactory.getLogger(DemoController.class);
@Autowired
private RefluxServer refluxServer;
@RequestMapping(value = "/demo/send", method = RequestMethod.GET)
public String sendHello(@RequestParam("name") String name) {
Hello hello = new Hello(name);
refluxServer.sendAll(hello);
if (log.isInfoEnabled()) {
log.info("send hello message to ALL client done:\n{}", //
hello);
}
return "success";
}
}
其實(shí)主要就兩步:
- 第一步杂瘸, 用 @Autowired 注入 Bean: RefluxServer refluxServer 。
- 第二步伙菊,調(diào)用方法
refluxServer.sendAll(hello);
發(fā)送消息败玉,消息內(nèi)容是自定義的任意 Java Bean 對象(如這里的Hello hello)。
RefluxServer 提供了兩個推送消息的方法镜硕,一個是 int sendAll(Object content)
推送消息到所有已建立連接的客戶端绒怨,它返回 int 值表示推送成功的連接數(shù)量;
還有一個是 boolean send(Object content, String clientId)
方法谦疾,它會定向推送消息到指定 clientId 的某個客戶端南蹂,它返回 boolean 值表示是否推送成功。
同時客戶端需要編寫代碼以接收消息念恍,我們在 demo-reflux-client 項(xiàng)目中編寫 MyRefluxReceiver 類來接收消息六剥,代碼如下所示:
package com.terran4j.demo.reflux.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.terran4j.commons.reflux.OnMessage;
import com.terran4j.demo.reflux.Hello;
@Service
public class MyRefluxReceiver {
private static final Logger log = LoggerFactory.getLogger(MyRefluxReceiver.class);
@OnMessage
public void onHello(Hello hello) {
if (log.isInfoEnabled()) {
log.info("receive message hello:\n{}", hello);
}
}
}
需要注意以下幾點(diǎn):
- 在一個 Spring Bean 類中編寫一個用于接受消息的方法,并加上 @OnMessage 注解峰伙,如上面的 onHello 方法疗疟。
- 此方法的入?yún)⒂星覂H有一個,類型必需與發(fā)消息時一樣的類瞳氓,若發(fā)消息時的消息類與收消息時的消息類不一樣的話策彤,就會接收不到。
- 此方法的返回類型只是能 void 或 String 類型的匣摘,void 表示無返回內(nèi)容店诗, String 表示有返回內(nèi)容(通過WebSocket返回到服務(wù)端)。
最后音榜,我們將 client 與 server 同時啟動庞瘸,然后在瀏覽器中輸入 URL :
http://localhost:8081/demo/send?name=terran4j
這會調(diào)用服務(wù)端的這個方法:
@RequestMapping(value = "/demo/send", method = RequestMethod.GET)
public String sendHello(@RequestParam("name") String name) {
Hello hello = new Hello(name);
refluxServer.sendAll(hello);
if (log.isInfoEnabled()) {
log.info("send hello message to ALL client done:\n{}", //
hello);
}
return "success";
}
服務(wù)端控制臺的輸出如下所示:
2017-08-20 16:42:58.031 INFO 15712 --- [nio-8081-exec-2] c.t.demo.reflux.server.DemoController : send hello message to ALL client done:
{
"name" : "terran4j",
"greeting" : "Hello, terran4j",
"currentTime" : 1503218577967
}
而在客戶端,Reflux 框架收到消息后也會調(diào)用下面這個 onHello 方法:
@OnMessage
public void onHello(Hello hello) {
if (log.isInfoEnabled()) {
log.info("receive message hello:\n{}", hello);
}
}
結(jié)果客戶端控制臺輸出如下:
2017-08-20 16:42:58.055 INFO 7848 --- [lient-AsyncIO-1] c.t.demo.reflux.client.MyRefluxReceiver : receive message hello:
{
"name" : "terran4j",
"greeting" : "Hello, terran4j",
"currentTime" : 1503218577967
}
說明消息的推送和接收都成功了赠叼。
分布式解決方案
目前講的消息推送一直是在單機(jī)下進(jìn)行的擦囊,如果服務(wù)端是有多臺實(shí)例的分布式環(huán)境呢?
在分布式的環(huán)境下嘴办,會出現(xiàn)以下情況:
- 服務(wù)端有多臺實(shí)例瞬场,并且都是同構(gòu)的。
- 服務(wù)端實(shí)例個數(shù)是可能會動態(tài)增加或減少(但某臺實(shí)例當(dāng)機(jī)不應(yīng)該影響服務(wù)的可用性)涧郊。
- 某個客戶端連接了其中一臺服務(wù)端實(shí)例贯被,但執(zhí)行“向這個客戶端推送消息”的是另一個實(shí)例。
一個有效的解決方案就是使用具有“發(fā)布-訂閱”機(jī)制的消息中間件,比如: RabbitMQ, Kafka 等刃榨,也可以用 Redis 提供的“發(fā)布-訂閱”功能。
具體來說双仍,就是執(zhí)行定向推送消息的實(shí)例枢希,如果發(fā)現(xiàn)目標(biāo)客戶端連接的并不是自己,就發(fā)到消息中間件上朱沃,其它實(shí)例都在監(jiān)聽這一topic苞轿,如果此客戶端是連接到本實(shí)例中,就執(zhí)行推送逗物,否則就忽略之搬卒。
另外,客戶端內(nèi)部有一個守護(hù)線程翎卓,輪詢檢查與服務(wù)端的連接是否中斷契邀,中斷的話就重新請求連接。
至于是選 RabbitMQ, Kafka, 還是 Redis失暴,則根據(jù)業(yè)務(wù)需求而定:
- 業(yè)務(wù)上要求穩(wěn)定坯门,不能丟消息的場景下,建議用 RabbitMQ 逗扒。
- 業(yè)務(wù)上要求超大并發(fā)古戴、高吞吐量的場景下,建議用 Kafka 矩肩。
- 業(yè)務(wù)上要求高實(shí)時现恼、低延遲的場景下,建議用 Redis 黍檩。
由于需要根據(jù)業(yè)務(wù)場景而定叉袍,對分布式的支持的功能并未包含在本項(xiàng)目中,但根據(jù)以上的分析刽酱,在 Reflux 的基礎(chǔ)上自行實(shí)現(xiàn)并不復(fù)雜畦韭,建議廣大開發(fā)者發(fā)揮自己的聰明才智自行解決,更歡迎向本項(xiàng)目貢獻(xiàn)代碼肛跌。
資源分享與技術(shù)交流
如果你覺得本項(xiàng)目對你有用的話艺配,希望可以定期收到更多分享的精彩技術(shù)干貨,或者希望與筆者交流相關(guān)技術(shù)問題衍慎,可以加一下我們的 SpringBoot及微服務(wù) 微信公眾號转唉,請拿起手機(jī)掃描下面的二維碼關(guān)注下吧!