用Java構(gòu)建響應(yīng)式微服務(wù)3-構(gòu)建響應(yīng)式微服務(wù)

在這一章節(jié)里投放,我們將用Vert.X構(gòu)建我們的第一個(gè)微服務(wù)。像大多數(shù)采用http交互的微服務(wù)系統(tǒng)暴构,我們打算從http微服務(wù)開始跪呈。因?yàn)橄到y(tǒng)由多個(gè)互相通信的微服務(wù)構(gòu)成,我們將構(gòu)建另一個(gè)微服務(wù)取逾,它作為第一個(gè)微服務(wù)的消費(fèi)者耗绿。然后,我們將展示為何這樣的設(shè)計(jì)并不完全符合響應(yīng)式微服務(wù)砾隅。最后误阻,我們將實(shí)現(xiàn)基于消息的微服務(wù),看看消息是怎樣提升了響應(yīng)性晴埂。


第一個(gè)微服務(wù)

在這一章節(jié)究反,我們打算實(shí)現(xiàn)同類的微服務(wù)兩個(gè)。第一個(gè)微服務(wù)暴露一個(gè)hello服務(wù)儒洛,我們稱它為hello微服務(wù)精耐。另一個(gè)消費(fèi)這個(gè)服務(wù)兩次(并發(fā)地)。消費(fèi)者將被稱為hello消費(fèi)者微服務(wù)琅锻。這個(gè)小系統(tǒng)不僅展示了一個(gè)服務(wù)是怎樣提供服務(wù)的卦停,而且展示它是怎樣被消費(fèi)的。在圖3-1的左邊恼蓬,微服務(wù)用http交互惊完,hello消費(fèi)者微服務(wù)作為http客戶端向hello微服務(wù)發(fā)請(qǐng)求;在圖的右邊处硬,hello消費(fèi)者微服務(wù)用消息與hello微服務(wù)交互小槐。這個(gè)不同影響了系統(tǒng)的響應(yīng)性。


圖3-1

在前一章節(jié)荷辕,我們看到兩種不同的方式使用Vert.X API: 回調(diào)和RxJava凿跳。展示它們的不同有助于你發(fā)現(xiàn)更佳途徑,hello微服務(wù)是使用基于回調(diào)開發(fā)模式實(shí)現(xiàn)疮方,而hello消費(fèi)者微服務(wù)是用RxJava實(shí)現(xiàn)拄显。


實(shí)現(xiàn)http微服務(wù)

微服務(wù)通常通過http暴露他們的API,通過http請(qǐng)求來消費(fèi)案站。讓我們看看用Vert.X怎樣實(shí)現(xiàn)這些http交互躬审。這個(gè)部分開發(fā)的代碼在代碼倉庫的microservices/hello-microservice-http目錄下可獲得。


開始

創(chuàng)建hello-microservice-http目錄蟆盐,然后生成工程結(jié)構(gòu):

mkdir hello-microservice-http

cd hello-microservice-http

mvn io.fabric8:vertx-maven-plugin:1.0.5:setup?\

-DprojectGroupId=io.vertx.microservice \

-DprojectArtifactId=hello-microservice-http \

-Dverticle=io.vertx.book.http.HelloMicroservice \

-Ddependencies=web

這個(gè)命令生成maven工程承边,配置Vert.X Maven插件。另外石挂,它加上vertx-web依賴博助。Vert.X Web是一個(gè)模塊,它提供你基于Vert.X構(gòu)建流行的web應(yīng)用的一切痹愚。


Verticle

打開src/main/java/io/vertx/book/http/HelloMicroservice.java富岳,這個(gè)被生成的verticle代碼沒做什么很有趣的事蛔糯,但它是一個(gè)起點(diǎn):

package io.vertx.book.http;

import io.vertx.core.AbstractVerticle;

public class HelloMicroservice extendsAbstractVerticle {

???????? @Override

???????? publicvoid start() {

???????? }

}

現(xiàn)在,執(zhí)行下面的maven命令:

mvn compile vertx:run

你現(xiàn)在可以編輯verticle窖式,每次你保存文件后蚁飒,應(yīng)用將被重新編譯并自動(dòng)重啟。


http微服務(wù)

是時(shí)候讓MyVerticle做點(diǎn)什么了萝喘。讓我們啟動(dòng)一個(gè)http server淮逻。正如你前面章節(jié)看到的,用Vert.X創(chuàng)建一個(gè)http server僅僅:

@Override

public void start() {

???????? vertx.createHttpServer()

???????? .requestHandler(req-> req.response().end("hello"))

???????? .listen(8080);

}

一旦加上這些代碼并保存阁簸,在瀏覽上訪問http://localhost:8080你應(yīng)該能看到hello爬早。這段代碼創(chuàng)建一個(gè)http server監(jiān)聽端口8080,注冊(cè)了一個(gè)請(qǐng)求處理器启妹,每一個(gè)http請(qǐng)求進(jìn)來時(shí)它會(huì)被調(diào)用∩秆希現(xiàn)在,我們僅僅輸出hello到http響應(yīng)饶米。


使用路由和參數(shù)

許多服務(wù)是通過web url調(diào)用的脑漫,因此,檢查路徑是重要的咙崎,以知道請(qǐng)求在要求什么优幸。然而,在請(qǐng)求處理器里面做路徑檢查以實(shí)現(xiàn)不同的動(dòng)作可能會(huì)變得復(fù)雜褪猛。幸運(yùn)地网杆,Vert.X Web提供了一個(gè)路由器,通過它你可以注冊(cè)路由伊滋。路由是Vert.X Web檢查路徑碳却、調(diào)用相關(guān)動(dòng)作的機(jī)制。讓我們重寫start方法笑旺,用兩個(gè)路由:

@Override

public void start() {

???????? Routerrouter = Router.router(vertx);

???????? router.get("/").handler(rc-> rc.response().end("hello"));

???????? router.get("/:name").handler(rc-> rc.response().end("hello " + rc.pathParam("name")));

???????? vertx.createHttpServer()

???????? .requestHandler(router::accept)

???????? .listen(8080);

}

我們創(chuàng)建了路由器對(duì)象后昼浦,我們注冊(cè)了兩個(gè)路由:第一個(gè)處理根路徑的請(qǐng)求僅僅輸出hello,第二個(gè)路由有一個(gè)路徑參數(shù)(:name)筒主,處理器追加參數(shù)值到歡迎中关噪。最后,我們更改請(qǐng)求處理器(requestHandler)乌妙,使用路由器的accept方法使兔。

如果你沒有停止vertx:run,你打開瀏覽器:

訪問http://localhost:8080藤韵,你應(yīng)該會(huì)看到hello

訪問http://localhost:8080/vert.x虐沥,你應(yīng)該會(huì)看到hello vert.x


生成JSON

在微服務(wù)里,JSON是常用的。讓我們修改前一個(gè)類欲险,生成JSON:

@Override

public void start() {

???????? Routerrouter = Router.router(vertx);

???????? router.get("/").handler(this::hello);

???????? router.get("/:name").handler(this::hello);

???????? vertx.createHttpServer()

???????? .requestHandler(router::accept)

???????? .listen(8080);

}

private void hello(RoutingContext rc) {

???????? Stringmessage = "hello";

???????? if(rc.pathParam("name") != null) {

?????????????????? message+= " " + rc.pathParam("name");

???????? }

???????? JsonObjectjson = new JsonObject().put("message", message);

???????? rc.response()

???????? .putHeader(HttpHeaders.CONTENT_TYPE,"application/json")

???????? .end(json.encode());

}

Vert.X提供一個(gè)JsonObject類來創(chuàng)建和操作JSON镐依。放上這段代碼,你打開瀏覽器:

訪問http://localhost:8080天试,你應(yīng)該會(huì)看到{“message”:“hello”}

訪問http://localhost:8080/vert.x槐壳,你應(yīng)該會(huì)看到{“message”: “hello vert.x”}


打包和運(yùn)行

按CTRL+C,停止vertx:run的執(zhí)行秋秤,在同一目錄下執(zhí)行下面的命令:

mvn package

這生成一個(gè)fat jar在target目錄下:hellomicroservice-http-1.0-SNAPSHOT.jar。fat jar之所以胖脚翘,因?yàn)閖ar包有一個(gè)合理的大小(約6.3MB)灼卢,包含了運(yùn)行應(yīng)用所需的一切:

java -jar target/hello-microservice-http-1.0-SNAPSHOT.jar

你可以通過訪問http://localhost:8080來檢查確定它是運(yùn)行起來的。保持住運(yùn)行来农,因?yàn)橄乱粋€(gè)微服務(wù)將調(diào)用它鞋真。


消費(fèi)http微服務(wù)

一個(gè)微服務(wù)不構(gòu)成一個(gè)應(yīng)用,你需要一個(gè)微服務(wù)系統(tǒng)∥钟冢現(xiàn)在我們有了一個(gè)運(yùn)行中的微服務(wù)涩咖,讓我們寫第二個(gè)微服務(wù)來消費(fèi)它。第二個(gè)微服務(wù)也提供了一個(gè)http請(qǐng)求接口繁莹,每一個(gè)請(qǐng)求會(huì)調(diào)用我們剛剛實(shí)現(xiàn)的微服務(wù)檩互。這個(gè)章節(jié)展示的代碼可以從代碼倉庫的microservices/helloconsumer-microservice-http目錄獲得。


創(chuàng)建工程

一樣地咨演,讓我們創(chuàng)建一個(gè)新工程:

mkdir hello-consumer-microservice-http

cd hello-consumer-microservice-http

mvn io.fabric8:vertx-maven-plugin:1.0.5:setup

\

-DprojectGroupId=io.vertx.microservice \

-DprojectArtifactId=hello-consumer-microservice-http \

-Dverticle=io.vertx.book.http.HelloConsumerMicroservice \

-Ddependencies=web,web-client,rx

最后的命令增加了其它的依賴:Vert.X web客戶端闸昨,異步的http客戶端。我們將使用這個(gè)客戶端來向第一個(gè)微服務(wù)發(fā)請(qǐng)求薄风。這個(gè)命令也增加的Vert.X RxJava綁定饵较,我們打算在后面使用它。

現(xiàn)在編輯src/main/java/io/vertx/book/http/HelloConsumerMicroservice.java文件遭赂,更改它的內(nèi)容:

package io.vertx.book.http;

import io.vertx.core.AbstractVerticle;

import io.vertx.core.json.JsonObject;

import io.vertx.ext.web.*;

import io.vertx.ext.web.client.*;

import io.vertx.ext.web.codec.BodyCodec;

public class HelloConsumerMicroservice extends AbstractVerticle {

???????? private WebClientclient;


???????? @Override

???????? public void start(){

???????? ???????? client = WebClient.create(vertx);

?????????????????? Routerrouter = Router.router(vertx);

?????????????????? router.get("/").handler(this::invokeMyFirstMicroservice);

?????????????????? vertx.createHttpServer()

?????????????????? .requestHandler(router::accept)

?????????????????? .listen(8081);

???????? }


???????? private voidinvokeMyFirstMicroservice(RoutingContext rc) {

?????????????????? HttpRequestrequest = client

?????????????????? .get(8080,"localhost","/vert.x")

?????????????????? .as(BodyCodec.jsonObject());

?????????????????? request.send(ar-> {

?????????????????? if(ar.failed()) {

??????????????????????????? rc.fail(ar.cause());

?????????????????? } else {

??????????????????????????? rc.response().end(ar.result().body().encode());

?????????????????? }

?????????????????? });

???????? }

}

在start方法循诉,我們創(chuàng)建一個(gè)WebClient,一個(gè)Router撇他,我們注冊(cè)了一個(gè)根路徑的route茄猫,啟動(dòng)http server,傳遞router的accept方法給requestHandler困肩。這個(gè)方法用web客戶端來調(diào)用第一個(gè)微服務(wù)的指定路徑(/vert.x)募疮,輸出結(jié)果到http響應(yīng)。

一旦http請(qǐng)求被創(chuàng)建僻弹,我們調(diào)用send方法來發(fā)出請(qǐng)求阿浓,無論是響應(yīng)返回或者是有錯(cuò)誤發(fā)生,我們?cè)O(shè)定的處理器會(huì)被調(diào)用蹋绽。If-else塊檢查請(qǐng)求成功與否芭毙。不要忘了這是一個(gè)遠(yuǎn)程交互筋蓖,有很多原因?qū)е率 @缤硕兀谝粋€(gè)微服務(wù)可能沒有運(yùn)行粘咖。當(dāng)它成功時(shí),我們輸入收到的數(shù)據(jù)到響應(yīng)侈百,否則瓮下,我們返回一個(gè)500的http響應(yīng)。


多次調(diào)用服務(wù)

現(xiàn)在讓我們改變當(dāng)前的行動(dòng)钝域,用兩個(gè)不同的路徑參數(shù)請(qǐng)求hello微服務(wù)兩次:

HttpRequest request1 = client

.get(8080, "localhost", "/Luke")

.as(BodyCodec.jsonObject());

HttpRequest request2 = client

.get(8080, "localhost", "/Leia")

.as(BodyCodec.jsonObject());

這兩個(gè)請(qǐng)求是獨(dú)立的讽坏,能夠并發(fā)地執(zhí)行±ぃ可是這里我們想輸出一個(gè)把這兩個(gè)請(qǐng)求的結(jié)果裝配起來的響應(yīng)路呜。需要調(diào)用兩次服務(wù)、把兩個(gè)結(jié)果裝配起來的代碼可以變得復(fù)雜织咧。當(dāng)我們接收到其中一個(gè)響應(yīng)時(shí)胀葱,我們需要檢查另一個(gè)請(qǐng)求完成與否。當(dāng)然笙蒙,對(duì)于兩個(gè)請(qǐng)求抵屿,這個(gè)代碼仍然是可管理的。當(dāng)我們需要處理更多的時(shí)候捅位,它變得極其復(fù)雜晌该。幸運(yùn)地,正如前一章節(jié)所講绿渣,我們能夠使用響應(yīng)式編程朝群,RxJava使代碼變得簡(jiǎn)單。

我們介紹了vertx-mavan-plugin插件引入Vert.X RxJava API中符。在HelloConsumerMicroservice里姜胖,我們替換重要的import語句:

import io.vertx.core.json.JsonObject;

import io.vertx.rxjava.core.AbstractVerticle;

import io.vertx.rxjava.ext.web.*;

import io.vertx.rxjava.ext.web.client.*;

import io.vertx.rxjava.ext.web.codec.BodyCodec;

import rx.Single;

用RX, 我們要寫的調(diào)用兩個(gè)請(qǐng)求、構(gòu)造他們的結(jié)果成一個(gè)響應(yīng)的復(fù)雜代碼變得比較簡(jiǎn)單:

private voidinvokeMyFirstMicroservice(RoutingContext rc) {

???????? HttpRequestrequest1 = client

???????? .get(8080,"localhost", "/Luke")

???????? .as(BodyCodec.jsonObject());

???????? HttpRequestrequest2 = client

???????? .get(8080,"localhost", "/Leia")

???????? .as(BodyCodec.jsonObject());

???????? Singles1 = request1.rxSend().map(HttpResponse::body);

???????? Singles2 = request2.rxSend().map(HttpResponse::body);

???????? Single.zip(s1,s2, (luke, leia) -> {

?????????????????? //We have the results of both requests in Luke and Leia

?????????????????? returnnew JsonObject()

?????????????????? .put("Luke",luke.getString("message"))

?????????????????? .put("Leia",leia.getString("message"));

???????? })

???????? .subscribe(

?????????????????? result-> rc.response().end(result.encodePrettily()),

?????????????????? error-> {

??????????????????????????? error.printStackTrace();

??????????????????????????? rc.response()

??????????????????????????? .setStatusCode(500).end(error.getMessage());

?????????????????? }

???????? );

}

注意rxSend方法調(diào)用淀散。Vert.X里右莱,RxJava方法加上了rx前綴,以更容易識(shí)別档插。rxSend方法的結(jié)果是一個(gè)Single慢蜓,可訂閱的單個(gè)元素,代表一個(gè)操作的延期結(jié)果郭膛,single.zip方法用一組Single作為參數(shù)晨抡,一旦所有的Single收到它們的值,就用這些值來調(diào)用一個(gè)函數(shù)。最后耘柱,訂閱(subscribe)如捅,這個(gè)方法用兩個(gè)函數(shù)作為參數(shù):

第1個(gè)函數(shù)是用zip函數(shù)的結(jié)果(一個(gè)json對(duì)象)作為參數(shù)被調(diào)用,我們輸出接收到的json內(nèi)容到http響應(yīng)调煎;

第2個(gè)函數(shù)是某種失敗(超時(shí)镜遣,異常等等)發(fā)生時(shí)被調(diào)用,在這里士袄,我們用空的json對(duì)象做出響應(yīng)悲关。

這段代碼生效后,hello微服務(wù)仍在運(yùn)行娄柳,如果我們打開http://localhost:8081我們應(yīng)該會(huì)看到:

{

"Luke" : "hello Luke",

"Leia" : "hello Leia"

}


這是響應(yīng)式微服務(wù)嗎寓辱?

現(xiàn)在我們有兩個(gè)微服務(wù)。他們是可獨(dú)立部署和修改的西土。他們也使用輕量級(jí)的http協(xié)議交互讶舰。但是他們是響應(yīng)式微服務(wù)嗎鞍盗?不需了,他們不是。記住般甲,響應(yīng)式微服務(wù)必須是:

. 自治的

. 異步的

. 可恢復(fù)的

. 彈性的

當(dāng)前設(shè)計(jì)的主要問題是兩個(gè)微服務(wù)是緊耦合的肋乍。Web客戶端是顯示地配置了第一個(gè)微服務(wù)的地址。如果第一個(gè)微服務(wù)失敗了敷存,我們不能夠通過請(qǐng)求另一個(gè)來恢復(fù)服務(wù)墓造。我們想降低負(fù)載,創(chuàng)建一個(gè)新的hello微服務(wù)實(shí)例將幫助不了我們锚烦。感謝Vert.X web客戶端觅闽,交互是異步的。然而涮俄,我們沒有用一個(gè)虛擬的目標(biāo)地址來調(diào)用微服務(wù)蛉拙、而是直接用它的URL,這不能提供我們需要的可恢復(fù)性和彈性彻亲。

不要失望孕锄,在下一章節(jié)我們將朝響應(yīng)式微服務(wù)邁出一大步。


Vert.X事件總線---一個(gè)消息后端

Vert.X提供了事件總線苞尝,允許一個(gè)應(yīng)用的不同組件用消息來交互畸肆。消息被送到地址,有消息頭和消息體宙址。一個(gè)地址是一個(gè)字符串轴脐,代表一個(gè)目標(biāo)地址。消息消費(fèi)者注冊(cè)它們自己到這個(gè)地址、以接收消息豁辉。事件總線也是集群的令野,意味著它能夠跨越網(wǎng)絡(luò)、在分布的發(fā)送者和消費(fèi)者之間傳遞消息徽级。以集群模式啟動(dòng)Vert.X應(yīng)用气破,被連接的節(jié)點(diǎn)可以共享數(shù)據(jù)結(jié)構(gòu)、做停止失敗檢查餐抢、負(fù)載均衡现使。事件總線能夠在集群的所有節(jié)點(diǎn)間傳遞消息。為了創(chuàng)建這樣一個(gè)集群旷痕,你可以用Apache Ignite碳锈、Apache Zookeeper、Infinispan或者是Hazelcast欺抗。在這本書里售碳,我們打算用Infinispan,但是绞呈,我們不做高級(jí)的配置贸人。如果需要,可參考Infinispan文檔(http://infnispan.org/)佃声。Infinispan(或者你選的其他技術(shù))管理節(jié)點(diǎn)的發(fā)現(xiàn)和存儲(chǔ)艺智。事件總線用直接的p2p

tcp連接通訊。

事件總線提供了三種傳遞語法:第一種圾亏,send方法允許一個(gè)組件送一個(gè)消息到一個(gè)地址十拣,

單個(gè)消費(fèi)者將接收它。假如不止一個(gè)消費(fèi)者注冊(cè)到這個(gè)地址志鹃,Vert.X將采用輪詢策略來選擇一個(gè)消費(fèi)者:

// Consumer

vertx.eventBus().consumer("address",message -> {

???????? System.out.println("Received:'" + message.body() + "'");

});

// Sender

vertx.eventBus().send("address","hello");

與send不同夭问,你可以用publish方法傳遞消息給所有注冊(cè)在這個(gè)地址的消費(fèi)者。

最后曹铃,send方法能夠帶一個(gè)應(yīng)答處理器缰趋,這個(gè)請(qǐng)求/響應(yīng)機(jī)制允許兩個(gè)組件間實(shí)現(xiàn)基于消息的異步交互:

// Consumer

vertx.eventBus().consumer("address",message -> {

???????? message.reply("pong");

});

// Sender

vertx.eventBus().send("address","ping", reply -> {

???????? if(reply.succeeded()) {

?????????????????? System.out.println("Received:" + reply.result().body());

???????? }else {

?????????????????? //No reply or failure

?????????????????? reply.cause().printStackTrace();

???????? }

});

如果你用RX API,你能夠用rxSend方法铛只,它返回一個(gè)Single埠胖,當(dāng)應(yīng)用被收到時(shí)這個(gè)Single接收一個(gè)值。我們將很快看到這個(gè)方法淳玩。


基于消息的微服務(wù)

讓我們重新實(shí)現(xiàn)hello微服務(wù)直撤,這次用事件總線代替http server來接收請(qǐng)求。微服務(wù)應(yīng)答消息蜕着、提供響應(yīng)谋竖。


創(chuàng)建工程

讓我們創(chuàng)建一個(gè)新工程红柱。這次我們將加上Infnispan依賴,一個(gè)內(nèi)存數(shù)據(jù)網(wǎng)格蓖乘,被用來管理集群:

mkdir hello-microservice-message

cd hello-microservice-message

mvn io.fabric8:vertx-maven-plugin:1.0.5:setup

\

-DprojectGroupId=io.vertx.microservice \

-DprojectArtifactId=hello-microservice-message \

-Dverticle=io.vertx.book.message.HelloMicroservice \

-Ddependencies=infinispan

一旦生成后锤悄,為了構(gòu)建集群,我們需要配置Infinispan嘉抒。缺省的配置是用組播的方式來發(fā)現(xiàn)節(jié)點(diǎn)零聚。

如果你的網(wǎng)絡(luò)支持組播,就可以些侍。否則隶症,檢查代碼倉庫的resource/cluster目錄。


寫消息驅(qū)動(dòng)的Verticle

編輯src/main/java/io/vertx/book/message/HelloMicroservice.java文件岗宣,修改start方法:

@Override

public void start() {

???????? // Receive messagefrom the address 'hello'

???????? vertx.eventBus().consumer("hello",message -> {

???????? JsonObject json =new JsonObject()

???????? .put("served-by",this.toString());

???????? // Check whether wehave received a payload in the

???????? // incoming message

???????? if (message.body().isEmpty()){

?????????????????? message.reply(json.put("message","hello"));

???????? } else {

?????????????????? message.reply(json.put("message","hello" + message.body()));

???????? }

???????? });

}

這段代碼從vertx對(duì)象獲取事件總線(eventBus)蚂会,注冊(cè)一個(gè)消費(fèi)者到地址hello。當(dāng)接收到一個(gè)消息時(shí)耗式,它應(yīng)答它胁住。取決于進(jìn)來的消息是否有一個(gè)空的消息體,我們給以不同的響應(yīng)刊咳。像前面章節(jié)的例子一樣彪见,我們返回一個(gè)json對(duì)象。你可能想知道為什么我們?cè)趈son里面加了served-by芦缰。你很快就會(huì)明白為什么∑蟪玻現(xiàn)在verticle寫好了枫慷,是時(shí)候啟動(dòng)它:

mvn compile vertx:run \

-Dvertx.runArgs="-cluster -Djava.net.preferIPv4Stack=true"

-cluster選項(xiàng)告訴Vert.X以集群模式啟動(dòng)让蕾。

現(xiàn)在讓我們寫一個(gè)微服務(wù)來消費(fèi)這個(gè)服務(wù)。


初始化基于消息的交互

在這一節(jié)里或听,我們將創(chuàng)建另一個(gè)微服務(wù)來調(diào)用hello微服務(wù)探孝,通過送一個(gè)消息到hello地址并獲得應(yīng)答。微服務(wù)將重新實(shí)現(xiàn)與前一章節(jié)同樣的邏輯誉裆,調(diào)用服務(wù)兩次顿颅。

同樣地,讓我們創(chuàng)建一個(gè)新的工程:

mkdir hello-consumer-microservice-message

cd hello-consumer-microservice-message

mvn io.fabric8:vertx-maven-plugin:1.0.5:setup

\

-DprojectGroupId=io.vertx.microservice \

-DprojectArtifactId=hello-consumer-microservice-message \

-Dverticle=io.vertx.book.message.HelloConsumerMicroservice \

-Ddependencies=infinispan,rx

這里我們也加了Vert.X RxJava以便于使用Vert.X提供的RX API足丢。如果在前一節(jié)中你更改了Infinispan的配置粱腻,你需要拷貝它到這個(gè)新工程。

現(xiàn)在編輯io.vertx.book.message.HelloConsumerMicroservice斩跌。因?yàn)槲覀兇蛩阌肦xJava绍些,改變相應(yīng)的引入語句為io.vertx.rxjava.core.AbstractVerticle,然后實(shí)現(xiàn)start方法:

@Override

public void start() {

???????? EventBus bus =vertx.eventBus();

???????? Singleobs1 = bus

???????? .rxSend("hello","Luke")

???????? .map(Message::body);

???????? Singleobs2 = bus

???????? .rxSend("hello","Leia")

???????? .map(Message::body);

???????? Single.zip(obs1,obs2, (luke, leia) ->

?????????????????? newJsonObject()

?????????????????? .put("Luke",luke.getString("message"))

?????????????????? .put("Leia",leia.getString("message"))

???????? )

???????? subscribe(x ->System.out.println(x.encode()), Throwable::printStackTrace);

}

這段代碼與前一章節(jié)是很類似的耀鸦,替代用WebClient請(qǐng)求http柬批,我們用事件總線發(fā)送消息到hello地址啸澡、獲取應(yīng)答內(nèi)容。我們用zip操作獲得兩個(gè)響應(yīng)并且構(gòu)建最終的結(jié)果氮帐。在subscribe方法嗅虏,我們打印最終結(jié)果到控制臺(tái)或者是輸出異常堆棧。

讓我們把這段代碼和http server合并在一起上沐,當(dāng)接收到http請(qǐng)求時(shí)皮服,我們調(diào)用hello服務(wù)兩次、把構(gòu)建結(jié)果作為響應(yīng)返回:

@Override

public void start() {

???????? vertx.createHttpServer()

???????? .requestHandler(req-> {

???????? EventBus bus =vertx.eventBus();

???????? Singleobs1 = bus

???????? .rxSend("hello","Luke")

???????? .map(Message::body);

???????? Singleobs2 = bus

???????? .rxSend("hello","Leia")

???????? .map(Message::body);

???????? Single.zip(obs1,obs2, (luke, leia) ->

?????????????????? newJsonObject()

?????????????????? .put("Luke",luke.getString("message") + " from " +luke.getString("served-by"))

?????????????????? .put("Leia",leia.getString("message") + " from " +leia.getString("served-by"))

???????? )

???????? .subscribe(

?????????????????? x ->req.response().end(x.encodePrettily()),

?????????????????? t -> {

??????????????????????????? t.printStackTrace();

??????????????????????????? req.response().setStatusCode(500).end(t.getMessage());

?????????????????? }

???????? );

})

.listen(8082);

這段代碼僅僅是打包事件總線的交互到請(qǐng)求處理器(requestHandler)并且處理http響應(yīng)参咙。在失敗的情況下冰更,我們返回一個(gè)包含錯(cuò)誤信息的json對(duì)象。

如果你運(yùn)行這段代碼用

mvn compile vertx:run -Dvertx.runArgs="-cluster-Djava.net.preferIPv4Stack=true"

打開你的瀏覽器訪問http://localhost:8082昂勒,你應(yīng)該會(huì)看到像這樣:

{

"Luke" : "hello Luke from ...HelloMicroservice@39721ab",

"Leia" : "hello Leia from ...HelloMicroservice@39721ab"

}


現(xiàn)在是響應(yīng)式嗎蜀细?

這段代碼與我們前面寫的基于http的微服務(wù)很類似,唯一的不同是我們用事件總線代替http戈盈。這改變了響應(yīng)性奠衔?的確是,讓我們看看為什么塘娶。


彈性

彈性是http版本的微服務(wù)沒有的一個(gè)特性归斤。因?yàn)槲⒎?wù)是被定位到一個(gè)指定的微服務(wù)實(shí)例(用硬編碼的URL),它沒有提供我們需要的彈性刁岸。但是我們現(xiàn)在采用送到一個(gè)地址的消息脏里,這改變了游戲。讓我們看看這個(gè)微服務(wù)系統(tǒng)的表現(xiàn)虹曙。

記得前面執(zhí)行的輸出迫横。返回的json對(duì)象顯示verticle有處理hello消息。輸出總是顯示是同一個(gè)verticle酝碳。這個(gè)信息表明是同一個(gè)實(shí)例矾踱。我們預(yù)計(jì)這是因?yàn)槲覀冎挥幸粋€(gè)實(shí)例在運(yùn)行。現(xiàn)在讓我們看看用兩個(gè)實(shí)例將發(fā)生什么疏哗。

停止hello微服務(wù)的vertx:run窜护,運(yùn)行:

mvn clean package

然后斯稳,打開兩個(gè)不同的終端,在hello-microservice-message目錄里執(zhí)行下面的命令:

java -jar target/hello-microservice-message-1.0-SNAPSHOT.jar \

--cluster -Djava.net.preferIPv4Stack=true

這將啟動(dòng)兩個(gè)Hello微服務(wù)實(shí)例,返回到你的瀏覽器刷新頁面你應(yīng)該看到類似這樣:

{

"Luke" : "hello Luke from...HelloMicroservice@16d0d069",

"Leia" : "hello Leia from...HelloMicroservice@411fc4f"

}

兩個(gè)Hello實(shí)例被調(diào)用滨溉。Vert.X集群連接不同的節(jié)點(diǎn)红选,事件總線也是被集群的腰池。感謝事件總線輪循名眉,Vert.X事件總線分發(fā)消息到可用的實(shí)例、在監(jiān)聽同一地址的不同節(jié)點(diǎn)間均衡負(fù)載哮针。

因此关面,通過用事件總線坦袍,我們有了我們所需要的彈性特征。


可恢復(fù)性

可恢復(fù)性又如何呢等太?在當(dāng)前的代碼中捂齐,如果hello微服務(wù)失敗了,我們將得到一個(gè)失敗缩抡、執(zhí)行這個(gè)代碼:

t -> {

t.printStackTrace();

req.response().setStatusCode(500).end(t.getMessage());

}

盡管用戶得到了錯(cuò)誤信息奠宜,我們沒有崩潰,我們沒有限制伸縮性瞻想,仍然能夠處理請(qǐng)求压真。然而,為了提升用戶體驗(yàn)蘑险,我們應(yīng)該總是在適當(dāng)?shù)臅r(shí)間內(nèi)響應(yīng)滴肿,即使我們沒有從服務(wù)中接收到響應(yīng)。實(shí)現(xiàn)這個(gè)邏輯佃迄,我們可以用timeout增加代碼泼差。

為了展示,讓我們修改hello微服務(wù)呵俏、注入失敗堆缘。這段代碼放在代碼倉庫的microservices/hello-microservice-faulty目錄下。

這個(gè)新的start方法隨機(jī)地選擇3個(gè)策略中的一個(gè):1. 用一個(gè)顯示的失敗響應(yīng)普碎,2.忘了響應(yīng)吼肥,3.發(fā)送正確的結(jié)果:

@Override

public void start() {

vertx.eventBus().consumer("hello", message-> {

double chaos = Math.random();

JsonObject json = new JsonObject().put("served-by",this.toString());

if (chaos < 0.6) {

// Normal behavior

if (message.body().isEmpty()) {

message.reply(json.put("message", "hello"));

} else {

message.reply(json.put("message", "hello "+message.body()));

}

} else if (chaos < 0.9) {

System.out.println("Returning a failure");

// Reply with a failure

message.fail(500, "message processing failure");

} else {

System.out.println("Not replying");

// Just do not reply, leading to a timeout on the

// consumer side.

}

});

}

重新打包并重啟兩個(gè)hello微服務(wù)的實(shí)例。

使用這個(gè)故障注入的服務(wù)麻车,我們需要改進(jìn)消費(fèi)方的容錯(cuò)性缀皱。事實(shí)上,消費(fèi)方可能得到超時(shí)或是接收到一個(gè)顯示的失敗绪氛。在hello消費(fèi)者微服務(wù)里唆鸡,改變請(qǐng)求hello服務(wù)的代碼為:

EventBus bus = vertx.eventBus();

Single obs1 = bus

.rxSend("hello", "Luke")

.subscribeOn(RxHelper.scheduler(vertx))

.timeout(3, TimeUnit.SECONDS)

.retry()

.map(Message::body);

Single obs2 = bus.

rxSend("hello", "Leia")

.subscribeOn(RxHelper.scheduler(vertx))

.timeout(3, TimeUnit.SECONDS)

.retry()

.map(Message::body);

這段代碼放在代碼倉庫的microservices/hello-consumer-microservice-timeout目錄下涝影。如果有給定的時(shí)間內(nèi)沒有接收到響應(yīng)枣察,timeout方法發(fā)出一個(gè)失敗。如果得到一個(gè)超時(shí)失敗或是一個(gè)顯示的失敗燃逻,retry方法將試圖重試去獲取值序目。subScribeOn方法指明請(qǐng)求需要在哪一個(gè)線程上執(zhí)行。我們用Vert.X事件輪循器來調(diào)用callback伯襟。沒有指定的話猿涨,方法將被從缺省的RxJava線程池中取一個(gè)線程來執(zhí)行,破壞了Vert.X的線程模式姆怪。RxHelper類是Vert.X提供的叛赚。盲目地重試服務(wù)調(diào)用不是明智的容錯(cuò)策略澡绩,它甚至可能是有害的。下一章節(jié)闡述不同的方法俺附。

現(xiàn)在你可以重新加載頁面肥卡。你總能獲得一個(gè)結(jié)果,即使是失敗或者超時(shí)事镣。記住當(dāng)調(diào)用服務(wù)時(shí)線程是不阻塞的步鉴,因此,你總是能夠接收新的請(qǐng)求璃哟、在一個(gè)合適的時(shí)間內(nèi)響應(yīng)氛琢。然而,超時(shí)重試經(jīng)常是有害而不是有益随闪,正如我們?cè)谙乱徽鹿?jié)將看到的那樣阳似。


小結(jié)

在這一章節(jié),我們學(xué)習(xí)了怎樣用Vert.X開發(fā)一個(gè)http微服務(wù)铐伴,怎樣消費(fèi)它障般。正如我們所學(xué)的,在代碼里硬編碼被消費(fèi)服務(wù)的URL不是一個(gè)明智的主意盛杰,因?yàn)樗茐牧隧憫?yīng)式特征之一挽荡。在第二部分,我們用消息替換http交互即供,這展示了消息和Vert.X事件總線怎樣構(gòu)建響應(yīng)式微服務(wù)定拟。

那么,現(xiàn)在是yes還是no逗嫡。是的青自,我們知道怎樣構(gòu)建響應(yīng)式微服務(wù),但是驱证,這里仍然有一些我們需要關(guān)注的缺點(diǎn):首先延窜,如果你僅僅有http服務(wù),你怎樣避免硬編碼位置抹锄?可恢復(fù)性呢逆瑞?在這一章節(jié)我們已經(jīng)看到了超時(shí)和重試,但是熔斷器(circuit breaker)伙单、故障轉(zhuǎn)移(failover)获高、隔倉(bulkhead)呢?讓我們繼續(xù)我們的旅程吻育。

如果你想更深入這些topic:

. Vert.X Web文檔(http://vertx.io/docs/vertx-web/java)

. Vert.X Web客戶端文檔(http://vertx.io/docs/vertx-web-client/java)

. Vert.X響應(yīng)式微服務(wù)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末念秧,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子布疼,更是在濱河造成了極大的恐慌摊趾,老刑警劉巖币狠,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異砾层,居然都是意外死亡总寻,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門梢为,熙熙樓的掌柜王于貴愁眉苦臉地迎上來渐行,“玉大人,你說我怎么就攤上這事铸董∷钣。” “怎么了?”我有些...
    開封第一講書人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵粟害,是天一觀的道長(zhǎng)蕴忆。 經(jīng)常有香客問我,道長(zhǎng)悲幅,這世上最難降的妖魔是什么套鹅? 我笑而不...
    開封第一講書人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮汰具,結(jié)果婚禮上卓鹿,老公的妹妹穿的比我還像新娘。我一直安慰自己留荔,他們只是感情好吟孙,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著聚蝶,像睡著了一般杰妓。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上碘勉,一...
    開封第一講書人閱讀 51,125評(píng)論 1 297
  • 那天巷挥,我揣著相機(jī)與錄音,去河邊找鬼验靡。 笑死倍宾,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的晴叨。 我是一名探鬼主播凿宾,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼兼蕊!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起件蚕,我...
    開封第一講書人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤孙技,失蹤者是張志新(化名)和其女友劉穎产禾,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體牵啦,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡亚情,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了哈雏。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片楞件。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖裳瘪,靈堂內(nèi)的尸體忽然破棺而出土浸,到底是詐尸還是另有隱情,我是刑警寧澤彭羹,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布黄伊,位于F島的核電站,受9級(jí)特大地震影響派殷,放射性物質(zhì)發(fā)生泄漏还最。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一毡惜、第九天 我趴在偏房一處隱蔽的房頂上張望拓轻。 院中可真熱鬧,春花似錦经伙、人聲如沸悦即。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽辜梳。三九已至,卻和暖如春泳叠,著一層夾襖步出監(jiān)牢的瞬間作瞄,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來泰國(guó)打工危纫, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留宗挥,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓种蝶,卻偏偏與公主長(zhǎng)得像契耿,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子螃征,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容