Kafka Stream API在處理完數(shù)據(jù)后,會將數(shù)據(jù)發(fā)送到我們預(yù)定義的topic.如果我們需要將這些數(shù)據(jù)發(fā)送給我們的WebUI各淀,那么我們就需要寫一個Consumer,讓它訂閱上面的那個topic,然后發(fā)送數(shù)據(jù)給WebUI.
那么到底如何來做呢?下面我會一步步的描述.
首先,我們需要新建一個WebApp.其目錄結(jié)構(gòu)大體如下:
我們?yōu)槭裁葱枰陆ㄒ粋€WebApp項目呢?因為WebSocket是一個基于HTTP協(xié)議的協(xié)議谷丸,所以我們還需要使用Tomcat等服務(wù)器軟件讓其對外提供服務(wù).
新建完這個項目之后,我們需要添加需要的依賴应结,依賴如下:
依賴包括有WebSocket的依賴刨疼,Kafka的依賴.需要注意的是,kafka_2.11和kafka-clients版本號需要相同摊趾,否則可能會因為不匹配而造成意外的錯誤.我開始就是因為版本號不同而造成找不到類的錯誤.
另外币狠,它們的版本最好跟你的Kafka的版本相同,我這里使用的是kafka 0.11.0.0砾层,所以使用的也是這個版本的依賴.
我們就需要寫WebSocket服務(wù)器端代碼了:
我們先使用ServerEndpoint注解,產(chǎn)生一個WebSocket服務(wù)器端及其路徑.然后贱案,分別通過OnOpen, OnClose, OnMessage注解定義了當(dāng)有客戶端連接到服務(wù)器肛炮,當(dāng)有客戶端嘗試關(guān)閉連接止吐,當(dāng)客戶端向服務(wù)器端發(fā)送消息時如何進行處理.最后,我們定義了一個sendMessage方法侨糟,這個方法將被Kafka Consumer來使用碍扔,通過這個方法向客戶端發(fā)送消息.
完成了WebSocket服務(wù)器端代碼之后,我們就需要寫Kafka Consumer的代碼了.我們需要讓它訂閱某個Kafka Topic秕重,并能夠通過WebSocket發(fā)送數(shù)據(jù)給客戶端不同,實現(xiàn)代碼如下:
上面的代碼很容易理解,就不解釋了.
完成了Kafka Consumer的代碼之后溶耘,我們還需要讓Tomcat能夠運行它.因為Tomcat是一個服務(wù)器端軟件二拐,所以我們不能通過直接寫main方法的方式,讓Tomcat運行它.所以我們額外寫了這么一個類:
寫好了之后凳兵,我們還需要讓Tomcat知道百新,我們希望他能運行上面那個類.那么我們?nèi)绾蝸韺崿F(xiàn)呢?
我們需要在web.xml中,添加一個listener,如下所示:
這樣服務(wù)器端就全部寫好了庐扫,那么我們還需要一個WebUI來顯示接收到的數(shù)據(jù)饭望,我們在webapp目錄下,新建一個index.html形庭,其內(nèi)容為:
其顯示出來就是這樣:
當(dāng)點擊connect按鈕時铅辞,會讓客戶端連接到服務(wù)器端,當(dāng)點擊close按鈕時萨醒,客戶端從服務(wù)器端斷開斟珊,當(dāng)點擊sendMsg按鈕時,客戶端會向服務(wù)器發(fā)送一條消息.
當(dāng)客戶端和服務(wù)器端處于連接狀態(tài)中的話验靡,那么服務(wù)器端就可以發(fā)送數(shù)據(jù)給客戶端.
代碼都完成之后倍宾,我們需要將項目打包成war包并部署到Tomcat中,在項目根目錄下執(zhí)行下面的命令:
mvn package
打包完成之后胜嗓,將target目錄下生成的war文件拷貝到Tomcat根目錄下的webapp下高职,然后啟動Tomcat即可,通過執(zhí)行Tomcat bin目錄下的startup.sh腳本來啟動服務(wù)器.
一切都做好之后辞州,向Kafka Stream傳送數(shù)據(jù)怔锌,等到其處理好之后,就可以在上面的Web UI中看到WebSocket傳送來的數(shù)據(jù)了变过,如下圖所示:
這樣就完成了.
由于我的代碼是從Github上找的埃元,這里貼出原項目的地址: