故事從 Vert.x 開始
它是 Vert.x 的控制中心风题,也是您做幾乎一切事情的基礎(chǔ),包括創(chuàng)建客戶端和服務(wù)器嫉父、 獲取事件總線的引用沛硅、設(shè)置定時(shí)器等等。
創(chuàng)建實(shí)例的兩種方式
1.Vertxvertx=Vertx.vertx(); 大部分應(yīng)用將只會(huì)需要一個(gè)vert.x實(shí)例绕辖,
2.Vertxvertx=Vertx.vertx(new VertxOptions().setWorkerPoolSize(40));如果默認(rèn)缺省的配置不適合您摇肌,可以創(chuàng)建vertx對象的同時(shí)指定配置項(xiàng)。VertxOptions
對象有很多配置仪际,包括集群围小,高可用昵骤,池大小等,在javadoc中描述了所有配置的細(xì)節(jié)肯适。
創(chuàng)建集群模式的 Vert.x 對象
// 注意要添加對應(yīng)的集群管理器依賴变秦,詳情見集群管理器章節(jié)VertxOptionsoptions=newVertxOptions();Vertx.clusteredVertx(options, res -> {if(res.succeeded()) {Vertxvertx=res.result();// 獲取到了集群模式下的 Vertx 對象// 做一些其他的事情}else{// 獲取失敗,可能是集群管理器出現(xiàn)了問題}});
Vert.x API調(diào)用您提供的?處理器?來處理事件框舔。 例如每隔一秒發(fā)送一個(gè)事件的計(jì)時(shí)器:
vertx.setPeriodic(1000, id -> {// 這個(gè)處理器將會(huì)每隔一秒被調(diào)用一次System.out.println("timer fired!");});
又比如收到一個(gè) HTTP 請求:
server.requestHandler(request -> {// 服務(wù)器每次收到一個(gè)HTTP請求時(shí)這個(gè)處理器將被調(diào)用request.response().end("hello world!");});
對象有很多配置蹦玫,包括集群,高可用刘绣,池大小等樱溉,在javadoc中描述了所有配置的細(xì)節(jié)。
Future的異步結(jié)果
vert.x 4使用future承載異步結(jié)果
異步的方法會(huì)返回一個(gè)future對象额港,包含成功或者失敗的異步結(jié)果饺窿,我們不能直接操作future的異步結(jié)果歧焦,而應(yīng)該設(shè)置future的handler移斩,黨future執(zhí)行完畢,結(jié)果可用時(shí)绢馍,會(huì)調(diào)用handler進(jìn)行處理向瓷,
FileSystemfs=vertx.fileSystem();Future future = fs.props("/my_file.txt");future.onComplete((AsyncResult ar) -> {if(ar.succeeded()) {FilePropsprops=ar.result(); System.out.println("File size = "+ props.size()); }else{ System.out.println("Failure: "+ ar.cause().getMessage()); }});
futures代表的是異步結(jié)果的讀取端,
promise代表的是異步結(jié)果的寫入端舰涌。在大多數(shù)情況vert.x程序不需要自信創(chuàng)建promise對象猖任,
future組合和future協(xié)作 提供了轉(zhuǎn)換和合并異步結(jié)果的工具。
Promise<String> promise = Promise.promise();
legacyGreetAsync(promise);
Future<String> greeting = promise.future();
小心:onSuccess,onFailure和onComplete之類的終端操作并不能保證回調(diào)的調(diào)用順序瓷耙。
future.onComplete(ar -> {// 做些什么});
future.onComplete(ar -> {// 可能先被調(diào)用});
第二個(gè)回調(diào)完全有可能在第一個(gè)回調(diào)之前被調(diào)用朱躺。
如果需要保證順序調(diào)用,可以將future組合與andThen一起使用搁痛。
Future組合
compose方法作用順序組合future:
若當(dāng)前future成功长搀,執(zhí)行compose方法指定的方法,該方法返回新的future鸡典,當(dāng)返回的新future完成時(shí)源请,future組合成功。若future失敗則組合失敗彻况。
FileSystemfs=vertx.fileSystem();
Future future = fs .createFile("/foo") .compose(v -> {// createFile文件創(chuàng)建完成后執(zhí)行returnfs.writeFile("/foo", Buffer.buffer()); }) .compose(v -> {// writeFile文件寫入完成后執(zhí)行returnfs.move("/foo","/bar"); });
這里例子中谁尸,有三個(gè)操作被串起來了,
1.一個(gè)文件被創(chuàng)建 createFile
2.一些東西被寫入到文件 writefile
3文件被移走move
如果這三個(gè)步驟全部成功纽甘,則最終的future會(huì)是成功的良蛮,其中任何一步失敗,則最終future就是失敗的悍赢。
除了上訴方法决瞳,future還提供了更多方法咬展,map,recover,otherwise,andThen及flatMap(等同compose方法)
future協(xié)作
vert.x中的futures支持協(xié)調(diào)多個(gè)future。支持并發(fā)組合(并執(zhí)行多個(gè)異步調(diào)用)和順序組合(依次執(zhí)行異步調(diào)用)
CompositeFuture.all?方法接受多個(gè)future對象作為參數(shù)(最多6個(gè)瞒斩,可以傳入一個(gè)list)破婆,當(dāng)所有的future都成功完成,該方法將返回一個(gè)成功的future胸囱,當(dāng)人一個(gè)future執(zhí)行失敗祷舀,則返回一個(gè)失敗的future。
Future httpServerFuture = httpServer.listen();
Future netServerFuture = netServer.listen();
CompositeFuture.all(httpServerFuture, netServerFuture).onComplete(ar -> {if(ar.succeeded()) {
// 所有服務(wù)器啟動(dòng)完成}else{// 有一個(gè)服務(wù)器啟動(dòng)失敗}});
所有被合并的future中的操作同時(shí)運(yùn)行烹笔,當(dāng)組合的處理操作完成時(shí)裳扯,該方法返回的future上綁定的處理器handler會(huì)被調(diào)用,只要有一個(gè)操作失敗(其中的某一個(gè)future的狀態(tài)被標(biāo)記成失敗)谤职,則返回的future會(huì)被標(biāo)記為失敗饰豺,如果所有的操作都成功,則返回的future將會(huì)成功完成允蜈。
當(dāng)操作成功時(shí)冤吨,需要按照?CompositeFuture.all 的參數(shù)順序來調(diào)用resultAt方法以獲取操作的執(zhí)行結(jié)果,以上面的代碼為力饶套,無論是那個(gè)操作先完成漩蟆,通過resultAt(0)獲取到的都是httpServer?的結(jié)果,而通過resultAt(1)獲取到netService的結(jié)果?
CompositeFuture.all(Arrays.asList(future1, future2, future3));傳入list也是如此
all?方法的合并會(huì)?等待?所有的 Future 成功執(zhí)行(或任一失敿寺)怠李,而?any?方法的合并會(huì)?等待?第一個(gè)成功執(zhí)行的 Future。CompositeFuture.any?方法接受多個(gè)future作為參數(shù)最多6個(gè)可以傳入list蛤克,當(dāng)任意一個(gè)future成功得到結(jié)果捺癞,則future成功,黨所有future都執(zhí)行失敗构挤,則future失敗髓介。
CompositeFuture.any(future1, future2).onComplete(ar -> {if(ar.succeeded()) {// 至少一個(gè)成功}else{// 所有的都失敗}});
CompositeFuture.any(Arrays.asList(f1, f2, f3));
join方法的合并會(huì) 等待所有future完成,無論成敗儿倒,
composeiteFuture.join方法接受多個(gè)future作為參數(shù)最多6個(gè)版保,并將結(jié)果歸并成一個(gè)future,當(dāng)全部future成功執(zhí)行完成夫否,得到的future是成功狀態(tài)的彻犁,黨至少一個(gè)future執(zhí)行失敗,得到的future是失敗狀態(tài)凰慈。
CompositeFuture.join(future1, future2, future3).onComplete(ar -> {if(ar.succeeded()) {// 所有都成功}else{// 全部完成(無論成功還是失敼薄),且至少一個(gè)失敗}});
CompositeFuture.join(Arrays.asList(future1, future2, future3));
兼容completionStage
jdk的completionStage接口用于組合異步操作微谓,vert.x的future Api可兼容completionState,
我們可以用toCOmpletionStage方法將vert.x的future對象轉(zhuǎn)成completionStage對象森篷,如:
Future future = vertx.createDnsClient().lookup("vertx.io");future.toCompletionStage().whenComplete((ip, err) -> {if(err !=null) { System.err.println("Could not resolve vertx.io"); err.printStackTrace(); }else{ System.out.println("vertx.io => "+ ip); }});
相應(yīng)地输钩,可使用?Future.fromCompletionStage方法將CompletionStage對象轉(zhuǎn)唯vert.x的future對象,
Future.fromCompletionStage?有兩個(gè)重載方法
第一個(gè)重載方法只接收一個(gè)?CompletionStage?參數(shù)仲智,會(huì)在執(zhí)行?CompletionStage?實(shí)例的線程中調(diào)用?Future?的方法买乃;
第二個(gè)重載方法額外多接收一個(gè)?Context?參數(shù),會(huì)在Vert.x的Context中調(diào)用?Future?的方法钓辆。
重要:由于vert.x的future通常會(huì)與vert.x的代碼剪验,庫以及客戶端等一起使用,為了與vert.x的線程模型更好地配合前联,大部分場景下使用:
Future.fromCompletionStage(CompletionStage, Context)?方法功戚。
下面的例子展示了如何將?CompletionStage?對象轉(zhuǎn)為Vert.x的?Future?對象,這里選擇使用Vert.x的Context執(zhí)行:
Future.fromCompletionStage(completionStage, vertx.getOrCreateContext()) .
flatMap(str -> {Stringkey=UUID.randomUUID().toString();returnstoreInDb(key, str); }) .
onSuccess(str -> { System.out.println("We have a result: "+ str); }) .
onFailure(err -> { System.err.println("We have a problem"); err.printStackTrace(); });
verticles
vert.x通過開箱即用的方式提供了一個(gè)簡單便捷的似嗤,可擴(kuò)展的部署和并發(fā)模型機(jī)制啸臀,可以用次模型機(jī)制來保管代碼組件。
vertical的實(shí)現(xiàn)必須實(shí)現(xiàn)verticle接口或者實(shí)現(xiàn)抽象類abstractVerticle繼承更簡單
public class MyVerticle extends AbstractVerticle {// Verticle部署時(shí)調(diào)用public void start() {}
// 可選 - Verticle撤銷時(shí)調(diào)用public void stop({}}
vertical類分2種
1.standard verticals 這是最常用的一類vertical它們永遠(yuǎn)運(yùn)行在event loop線程上烁落,
2.worker verticles會(huì)運(yùn)行在worker pool種的線程種
standard verticals被創(chuàng)建時(shí)乘粒,它會(huì)分派給一個(gè)event loop線程,并在這個(gè)event loop種執(zhí)行它的start方法顽馋,當(dāng)您在一個(gè)event loop上調(diào)用了core api中的方法并傳入處理器時(shí)谓厘,vert.x將保證用于調(diào)用該方法時(shí)相同的event loop來執(zhí)行這些處理器
Worker verticles和standard vertical很像,但它并不是由一個(gè)event loop來執(zhí)行寸谜,而是由vert.x中的worker poo;中線程執(zhí)行。
部署vertical
方式1
VerticlemyVerticle=new MyVerticle();
vertx.deployVerticle(myVerticle);
方式2
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle");
// 部署JavaScript的Verticlevertx.deployVerticle("verticles/myverticle.js");
// 部署Ruby的Verticlevertx.deployVerticle("verticles/my_verticle.rb");
等待部署完成
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", res -> {
if(res.succeeded()) { System.out.println("Deployment id is: "+ res.result()); }
else{ System.out.println("Deployment failed!"); }});
如果部署成功属桦,這個(gè)完成處理器的結(jié)果中將包含部署ID的字符串熊痴。
撤銷vertical
可以通過underlay方法來撤銷部署好的vertical
撤銷操作也是異步的,想要在撤銷完成后收到通知聂宾,可以指定令一個(gè)完成處理器果善。
vertx.undeploy(deploymentID, res -> {if(res.succeeded()) { System.out.println("Undeployed ok"); }else{ System.out.println("Undeploy failed!"); }});
設(shè)置vertical實(shí)例數(shù)量:
使用名稱部署vertical時(shí),可以指定需要部署的vertical實(shí)例數(shù)量系谐。
DeploymentOptionsoptions=newDeploymentOptions().setInstances(16);
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);
以上功能對于跨多核擴(kuò)展時(shí)很有用巾陕,例如:有一個(gè)帶web服務(wù)的vertical需要部署在多核的機(jī)器上,可以部署多個(gè)實(shí)例來利用所有的核纪他。
向vertical傳入配置
可在部署時(shí)傳給vertical一個(gè)json格式的配置
JsonObjectconfig=newJsonObject().put("name","tim").put("directory","/blah");
DeploymentOptionsoptions=newDeploymentOptions().setConfig(config);
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);
傳入之后鄙煤,這個(gè)配置可以通過context對象或使用config方法訪問,這個(gè)配置會(huì)以json對象的形式返回茶袒,可以通過一下代碼讀取數(shù)據(jù)
System.out.println("Configuration: "+ config().getString("name"));
在vertical中訪問環(huán)境變量
環(huán)境變量和系統(tǒng)屬性可以中介通過java api訪問
System.getProperty("prop");
System.getenv("HOME");
高可用性
vertical可以啟用高可以方式部署梯刚,在這種方式下,當(dāng)其中一個(gè)部署在vert.x實(shí)例中的vertical突然掛掉薪寓,這個(gè)vertical可以在集群環(huán)境中的另一個(gè)vert.x實(shí)例中重新部署亡资。
若要啟用高可用方式運(yùn)行一個(gè)vertical澜共,僅需要追加-ha參數(shù)
當(dāng)啟用高可用方式時(shí),不需要追加- cluster參數(shù)
從命令行運(yùn)行vertical
可以從命令行直接運(yùn)行 Vert.x 的 Verticle锥腻。
注意:在path設(shè)置jdk是為了支持java代碼運(yùn)行時(shí)編譯 on the fly compilation
退出vert.x環(huán)境?
您可以調(diào)用?close?方法關(guān)閉它嗦董。
這將關(guān)閉所有內(nèi)部線程池并關(guān)閉其他資源,允許JVM退出瘦黑。
Context 對象
Contextcontext=vertx.getOrCreateContext();
Verticle 中自動(dòng)清除定時(shí)器
如果您在 Verticle 中創(chuàng)建了計(jì)時(shí)器展懈, 當(dāng)這個(gè) Verticle 被撤銷時(shí)這個(gè)計(jì)時(shí)器會(huì)被自動(dòng)關(guān)閉。
Verticle worker pool
Verticle 使用 Vert.x 中的 Worker Pool 來執(zhí)行阻塞式行為供璧,例如?executeBlocking?或 Worker Verticle存崖。
可以在部署配置項(xiàng)中指定不同的 Worker 線程池:
vertx.deployVerticle("the-verticle",newDeploymentOptions().setWorkerPoolName("the-specific-pool"));