Vert.x的核心是一組我們稱之為Vert.x Core的Java API略荡。
知識(shí)庫(kù)雄卷。
Vert.x核心庫(kù)提供了以下功能:
- TCP客戶端和服務(wù)器
- HTTP客戶端和服務(wù)器同時(shí)支持WebSocket
- 事件總線
- 共享數(shù)據(jù)-局部的map和集群下的分布式map
- 定時(shí)和延遲的任務(wù)
- 部署卸載Verticles
- 數(shù)據(jù)報(bào)套接字
- DNS客戶端
- 訪問(wèn)文件系統(tǒng)
- 高可用
- 本地傳輸
- 集群
核心API的功能功能相對(duì)較底層以至于你可能找不到你想要的組件蕉堰,例如數(shù)據(jù)庫(kù)訪問(wèn)、權(quán)限授權(quán)或者高級(jí)Web瞬项,雖然我們核心包中不提供岁经,但是不代表我們擴(kuò)展包沒有提供朋沮,
所有有此需要的開發(fā)人員可以在Vert.x ext(擴(kuò)展)包中找到你所想要的組件。
Vert.x核心包很小而且很輕量級(jí)缀壤。你可以只使用你想要的部分即可樊拓。它能夠很輕松的集成到你現(xiàn)有的項(xiàng)目中-我們不會(huì)強(qiáng)制要求你的應(yīng)用結(jié)構(gòu)以我們指定的方式使用。
你可以使用Vert.x核心支持的任何語(yǔ)言塘慕。但是這里有一個(gè)很酷的地方-我們并不強(qiáng)迫你直接從JavaScript或Ruby中使用Java API筋夏,畢竟,不同的語(yǔ)言有不同的約定和習(xí)慣用法图呢,在Ruby開發(fā)者(例如)上強(qiáng)制Java習(xí)慣用法是很奇怪的条篷。 與之不同的是骗随,我們會(huì)自動(dòng)為每一種語(yǔ)言生成和Java API相等的約定和習(xí)慣用法。(idiomatic)
從現(xiàn)在開始赴叹,我們將使用 核心(core) 來(lái)指代 Vert.x core鸿染。
如果你使用Maven或者Gradle,在你的項(xiàng)目描述文件中添加以下依賴便可以輕松訪問(wèn)Vert.x 核心API了稚瘾。
- Maven (在你的pom.xml文件中添加以下內(nèi)容)
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.5.0</version>
</dependency>
- Gradle (在你的build.gradle文件添加以下內(nèi)容):
dependencies {
compile 'io.vertx:vertx-core:3.5.0'
}
廢話不多說(shuō)牡昆,開車?yán)瞺
Vert.x 之千里之行始于足下
在Vert.x-land大陸你能做的不多摊欠,除非你用Vertx對(duì)象進(jìn)行操作丢烘!(ps:沒有Vertx對(duì)象意味著你啥也干不了,哈哈)
Vertx對(duì)象是Vert.x的控制中心些椒,你幾乎可以用它來(lái)做任何事播瞳。(感覺有點(diǎn)像上帝(God))
包括創(chuàng)建客戶端和服務(wù)器,從事件總線中獲取一個(gè)引用免糕,設(shè)置定時(shí)器赢乓,以及許多其他的事情。
你要如何獲取一個(gè)Vert.x的實(shí)例呢?
如果你想要集成Vert.x并獲取一個(gè)簡(jiǎn)單的實(shí)例石窑,那么你可以像下面這么做:
Vertx vertx = Vertx.vertx();
注意 :大多數(shù)應(yīng)用可能只需要一個(gè)Vert.x實(shí)例牌芋,但是如果你有需要它也可以創(chuàng)建多個(gè)Vert.x實(shí)例,舉個(gè)栗子松逊,服務(wù)器和客戶端可以隔離成不同的事件總線或者不同的組躺屁。
當(dāng)創(chuàng)建Vertx對(duì)象時(shí)指定配置選項(xiàng)
如果默認(rèn)的配置選項(xiàng)你覺得不適用,那你可以在創(chuàng)建Vertx對(duì)象的時(shí)候指定配置選項(xiàng):
Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40));
VertxOptions 對(duì)象有很多設(shè)置選項(xiàng)供你選擇,允許你配置集群经宏、高可用犀暑、池大小和其它各種設(shè)置。了解更多配置細(xì)節(jié)請(qǐng)參考Java文檔烁兰。
創(chuàng)建Vert.x集群
如果你想創(chuàng)建 Vert.x集群 (獲取更多信息請(qǐng)參考 事件總線 集群章節(jié))耐亏,然后通常你會(huì)使用異步變量創(chuàng)建 Vert.x 對(duì)象。
這是因?yàn)樵谝粋€(gè)集群組里面創(chuàng)建不同的 Vert.x 實(shí)例對(duì)象通常會(huì)花費(fèi)一些時(shí)間(也許幾秒鐘)沪斟。在此期間广辰,我們不想要阻塞調(diào)用的線程,因此我們會(huì)給你一個(gè)異步的結(jié)果币喧。
你是鏈?zhǔn)降膯幔?/h2>
眼尖的朋友可能已經(jīng)注意到了我們上面的栗子中使用了鏈?zhǔn)降腁PI轨域。
鏈?zhǔn)紸PI能夠在多個(gè)方法中以鏈的方式互相調(diào)用 。舉個(gè)栗子:
request().response().putHeader("Content-Type", "text/plain").write("some text").end();
在 Vert.x 的API中這是很常見的模式杀餐,因此要習(xí)慣使用這種模式。
通常使用鏈?zhǔn)斤L(fēng)格來(lái)寫代碼能讓你的代碼更加簡(jiǎn)潔朱巨,當(dāng)然我們又不是流氓史翘,光天化日之下哪敢調(diào)戲程序員啊,如果你不喜歡鏈?zhǔn)椒椒▉?lái)書寫代碼我們也不會(huì)強(qiáng)制要求你使用這種方法,你完全可以無(wú)視它并以自己喜歡的風(fēng)格來(lái)書寫代碼琼讽,舉下面的一個(gè)栗子:
HttpServerResponse response = request.response();
response.putHeader("Content-Type", "text/plain");
response.write("some text");
response.end();
不要調(diào)用我們必峰,我們會(huì)調(diào)用你
Vert.x 的API主要是 事件驅(qū)動(dòng) 的。這意味著當(dāng) Vert.x 中發(fā)生您感興趣的事情時(shí)钻蹬,Vert.x 會(huì)通過(guò)發(fā)送事件調(diào)用你吼蚁。
下面舉了一些事件的栗子:
- 定時(shí)器觸發(fā)
- 數(shù)據(jù)到達(dá)套接字
- 從磁盤讀取數(shù)據(jù)完成
- 發(fā)生異常
- HTTP服務(wù)器接收了一個(gè)請(qǐng)求
你可以通過(guò)提供一個(gè)處理器給 Vert.x 的API來(lái)處理事件。下面舉個(gè)栗子來(lái)說(shuō)明接收一個(gè)定時(shí)器事件每秒打印輸出:
vertx.setPeriodic(1000, id -> {
// This handler will get called every second'
System.out.println("timer fired!");
});
或者接收一個(gè)HTTP的請(qǐng)求:
server.requestHandler(request -> {
// This handler will be called every time an HTTP request is received at the server
request.response().end("Hello,World!");
});
當(dāng) Vert.x 傳遞一個(gè)事件給你的處理程序问欠,一段時(shí)間后肝匆,Vert.x 并會(huì)異步的調(diào)用它。
所以在 Vert.x 中我們有一些重要的概念需要注意:
不要阻塞我
除去少有的情況(例如:以"Sync"結(jié)尾的文件系統(tǒng)操作)顺献,Vert.x 的API都是不會(huì)阻塞調(diào)用線程的旗国。
如果結(jié)果可以立即提供,它將會(huì)立即返回注整,通常情況下就需要提供一個(gè)處理器在一段時(shí)間后來(lái)接收事件能曾。
因?yàn)?Vert.x 的API是沒有阻塞的,這意味著你可以用 Vert.x 處理大量并發(fā)但僅使用少量的線程肿轨。
當(dāng)下面的情況發(fā)生時(shí)調(diào)用的線程可能會(huì)被阻塞:
從套接字上讀取數(shù)據(jù)
將數(shù)據(jù)寫入磁盤
發(fā)送一個(gè)消息給收信者然后等待回信
…? 其它多數(shù)情況
在上述所有情況下, 當(dāng)您的線程等待結(jié)果時(shí), 它不能做任何其他事情-它實(shí)際上是無(wú)用的寿冕。
這意味著, 如果您希望使用阻塞 API 進(jìn)行大量的并發(fā), 那么您需要大量的線程來(lái)防止應(yīng)用程序被停止。
線程在所需內(nèi)存和其它方面都有開銷(例如:它們的棧內(nèi)存數(shù)據(jù))和上下文切換椒袍。
對(duì)于許多現(xiàn)代應(yīng)用程序所需的并發(fā)級(jí)別, 阻塞方法是不可擴(kuò)展的驼唱。
反應(yīng)堆和多反應(yīng)堆
在此之前我們已經(jīng)注意到 Vert.x 的API都是事件驅(qū)動(dòng)的 - 當(dāng)它們可用時(shí),Vert.x 會(huì)傳遞事件給處理器處理槐沼。
在大多數(shù)情況下 Vert.x 使用一個(gè)事件循環(huán)線程調(diào)用你的處理器曙蒸。
因?yàn)閂ert.x和你的應(yīng)用中不存在阻塞,所以事件循環(huán)線程可以在事件到達(dá)時(shí)持續(xù)不斷地將其分發(fā)給不同的處理器岗钩。
因?yàn)闆]有阻塞代碼纽窟,事件循環(huán)線程能夠在短時(shí)間內(nèi)分發(fā)大量的事件。例如兼吓,單個(gè)事件循環(huán)線程能夠快速地處理成上千個(gè)HTTP請(qǐng)求臂港。
我們將這個(gè)模式稱之為 Reactor Pattern (反應(yīng)堆模式)。
在此之前你可能聽說(shuō)過(guò)這個(gè) - 例如视搏,Node.js(實(shí)現(xiàn)了反應(yīng)堆模式)
在一個(gè)標(biāo)準(zhǔn)的反應(yīng)堆模式實(shí)現(xiàn)中审孽,有一個(gè)單獨(dú)的事件循環(huán)線程 (single event loop),它在一個(gè)循環(huán)中運(yùn)行浑娜,處理事件到達(dá)時(shí)傳遞所有的事件到所有的處理器佑力。
問(wèn)題是單線程任何時(shí)候都只能運(yùn)行在單核心上面,因此如果你想要單線程反應(yīng)堆應(yīng)用(例如:你的Node.js應(yīng)用)在您的多核服務(wù)器上擴(kuò)展筋遭,則必須啟動(dòng)并管理許多不同的進(jìn)程打颤。
Vert.x 不同之處并在這里暴拄。每個(gè) Vartx 實(shí)例不是一個(gè)事件循環(huán)而是維護(hù)好多個(gè)事件循環(huán)。默認(rèn)情況下我們選擇機(jī)器可用的內(nèi)核數(shù)量作為默認(rèn)數(shù)量编饺,但是這是可以覆寫的乖篷。
這意味著與Node.js不同,單個(gè) Vertx 進(jìn)程可以跨服務(wù)器進(jìn)行擴(kuò)展透且。
我們將這種模式稱為多反應(yīng)堆模式 (Multi-Reactor Pattern)撕蔼,將其與單線程反應(yīng)堆模式區(qū)分開來(lái)。
注意 :雖然Vertx實(shí)例維護(hù)了多個(gè)事件循環(huán)秽誊,任何特定的處理程序也不會(huì)同時(shí)執(zhí)行鲸沮,并且在大多數(shù)情況下(除了wokrer verticle)將始終使用完全相同的事件循環(huán)來(lái)調(diào)用。
黃金法則-不要阻塞事件循環(huán)線程
我們已經(jīng)知道 Vert.x API是非阻塞同時(shí)也不會(huì)阻塞事件循環(huán)养距,但是如果你在你自己的處理器中阻塞了事件循環(huán)這將毫無(wú)用處诉探。
如果你要這樣做,事件循環(huán)被阻塞時(shí)它將不能處理任何事情棍厌,如果在 Vertx 實(shí)例中阻塞了所有的事件循環(huán)肾胯,那么你的應(yīng)用將會(huì)完全停止!
因此不要這樣搞耘纱,小伙子敬肚!在此已經(jīng)警告過(guò)你啦。
舉幾個(gè)阻塞的栗子:
Thread.sleep()
等待一個(gè)鎖
正在等待互斥或監(jiān)視器(例如 synchronized 部分)
做一個(gè)長(zhǎng)時(shí)間的數(shù)據(jù)庫(kù)操作束析,并等待一個(gè)結(jié)果
需要長(zhǎng)時(shí)間來(lái)做一個(gè)復(fù)雜的計(jì)算
死循環(huán)
以上任何的一種情況都會(huì)在很長(zhǎng)一段時(shí)間內(nèi)阻塞事件循環(huán)線程(event loop)艳馒,你應(yīng)經(jīng)立即去下一步,并等待進(jìn)一步的指示员寇。
那么……什么是相當(dāng)長(zhǎng)的時(shí)間呢弄慰?
這段時(shí)間是多久呢?這實(shí)際上取決于您的應(yīng)用程序和您需要的并發(fā)量蝶锋。
如果你有一個(gè)事件循環(huán)陆爽,并且你想每秒處理10000個(gè)HTTP請(qǐng)求,那很明顯每個(gè)請(qǐng)求的處理時(shí)間不能超過(guò)0.1ms扳缕,所以阻塞時(shí)間不能超過(guò)這個(gè)時(shí)間慌闭。
這道數(shù)學(xué)題目又不難,就留給讀者來(lái)計(jì)算吧躯舔。
如果您的應(yīng)用程序沒有響應(yīng)驴剔,則可能是您在某處阻塞了事件循環(huán)而導(dǎo)致的。為了幫助您診斷這些問(wèn)題粥庄,如果檢測(cè)到事件循環(huán)在一段時(shí)間未返回丧失,Vert.x 將會(huì)自動(dòng)記錄警告。如果您在日志中看到這樣的警告惜互,那么您應(yīng)該進(jìn)行代碼檢查了利花。
線程 vertx-eventloop-thread-3 已被阻塞20458毫秒
Vert.x 也會(huì)提供堆棧跟蹤來(lái)準(zhǔn)確確定阻塞發(fā)生的位置科侈。
如果想要關(guān)閉這些警告或設(shè)置载佳,可以在創(chuàng)建Vert.x對(duì)象之前通過(guò)設(shè)置 VertxOptions 對(duì)象來(lái)更改炒事。
運(yùn)行阻塞代碼
在一個(gè)完美的世界里面,不會(huì)有戰(zhàn)爭(zhēng)或饑餓蔫慧,所有的操作都是異步的挠乳,小兔子會(huì)在陽(yáng)光明媚的綠色草地上和小羊羔手拉手。
但是現(xiàn)實(shí)不是這樣的姑躲。(你最近有看新聞嗎睡扬?(ps:發(fā)生了什么?))
事實(shí)上黍析,大多數(shù)的庫(kù)都是同步操作卖怜,尤其是JVM生態(tài)系統(tǒng)中有許多同步API,并且許多方法都可能阻塞阐枣。一個(gè)很好的栗子就是JDBC API-它本質(zhì)上是同步的马靠,不管多么努力,Vert.x 都不可能使用魔法在上面撒鹽使其變?yōu)楫惒健?/p>
我們不會(huì)把所有的東西都改為異步蔼两,因此我們需要為您提供一種在 Vert.x 應(yīng)用程序中安全的使用 “傳統(tǒng)” 阻塞API的方法甩鳄。
正如前面所討論的,你不能直接從事件循環(huán)中調(diào)用阻塞操作额划,因?yàn)檫@將阻塞它做其它有用的工作妙啃。那么你怎么做到這一點(diǎn)呢?
這是通過(guò)調(diào)用 executeBlocking 指定要執(zhí)行阻塞代碼以及在阻塞代碼執(zhí)行返回異步結(jié)果處理程序來(lái)完成的:
vertx.executeBlocking(future -> {
// Call some blocking API that takes a significant amount of time to return
String result = someAPI.blockingMethod("hello");
future.complete(result);
}, res -> {
System.out.println("The result is: " + res.result());
});
默認(rèn)情況下俊戳,如果從相同的上下文中多次調(diào)用 executeBlocking(例如:相同的Verticle實(shí)例)揖赴,則不同的 executeBlocking 被會(huì)被串行執(zhí)行(即一個(gè)接一個(gè)地執(zhí)行 one by one)。
如果你不關(guān)心執(zhí)行順序抑胎,你可以調(diào)用 executeBlocking 時(shí)指定 false 作為 ordered 的參數(shù) 燥滑。在這種設(shè)置下,工作池上就可以并發(fā)執(zhí)行任何 executeBlocking 了圆恤。
運(yùn)行阻塞代碼的另一種方法是使用 Worker Verticle 突倍。
Worker Verticle 始終使用來(lái)自工作池的線程來(lái)執(zhí)行的。
默認(rèn)情況下盆昙,阻塞代碼都在 Vert.x 工作池上執(zhí)行羽历,需要配置 setWorkerPoolSize。
可以為不同的業(yè)務(wù)創(chuàng)建額外的線程池:
WorkerExecutor executot = vertx.createSharedWorkerExecutor("my-worker-pool");
executot.executeBlocking(future -> {
// Call some blocking API that takes a significant amount of time to return
String result = someAPI.blockingMethod("hello");
future.complete(result);
}, res -> {
System.out.println("The result is: " + res.result());
});
當(dāng)worker executor不再需要的時(shí)候必須要將其關(guān)閉掉淡喜。
executor.close();
當(dāng)幾個(gè)工作者使用相同的名稱創(chuàng)建線程池時(shí)秕磷,它們將會(huì)共享同一個(gè)線程池。工作者線程池關(guān)閉的時(shí)候所有工作者線程也會(huì)被關(guān)閉掉炼团。
當(dāng)在 Verticle 中創(chuàng)建一個(gè)執(zhí)行器時(shí)澎嚣,Verticle 卸載的時(shí)候它也會(huì)自動(dòng)將執(zhí)行器關(guān)閉掉疏尿。
Worker executor 可以在被創(chuàng)建時(shí)候進(jìn)行配置:
int poolSize = 10;
// 2 minutes
long maxExecuteTime = 120000;
WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool", poolSize, maxExecuteTime);
注意: 這個(gè)需要在創(chuàng)建工作池的時(shí)候進(jìn)行配置。
異步協(xié)調(diào)
Vert.x 可以實(shí)現(xiàn)多個(gè)異步結(jié)果future的協(xié)調(diào)易桃。它還支持并發(fā)組合(并發(fā)運(yùn)行好幾個(gè)異步操作)和順序組合(異步鏈操作)褥琐。
并發(fā)組合
CompositeFuture.all 接受多個(gè)future參數(shù)(最多6個(gè));當(dāng)所有的future都成功了晤郑,就返回成功的future敌呈,否則只要有一個(gè)失敗就會(huì)返回失敗(failed)的future:
Future<HttpServer> httpServerFuture = Future.future();
httpServerFuture.listen(httpServerFuture.completer());
Future<NetServer> netServerFuture = Future.future();
netServer.listen(netServerFuture.complete());
CompositeFuture.all(httpServerFuture, netServerFuture).setHandler(ar -> {
if (ar.succeeded()) {
// All servers started
} else {
// At least one server failed
}
});
這個(gè)操作時(shí)同時(shí)運(yùn)行的,在組合完成后造寝,處理器 (Handler) 將會(huì)追加到返回的future上磕洪。
handler:處理器
invoke:調(diào)用
upon:在...之上
當(dāng)其中的一個(gè)操作失敗(被傳遞的future會(huì)標(biāo)記為失斀肓)同時(shí)結(jié)果future也會(huì)標(biāo)記為失敗析显。當(dāng)所有的操作都成功了,結(jié)果future也會(huì)是成功完成的签赃。
同時(shí)谷异,你還可以傳遞一個(gè)List集合future(可能我空):
CompositeFuture.all(Arrays.asList(future1, future2, future3));
CompositeFuture.all 是當(dāng)所有的future都成功了才返回成功,其中的一個(gè)future失敗就代表著失敗姊舵。(要俺們都成功了那才是成功晰绎,不然都算是失敗)括丁。CompositeFuture.any 與此不同的是只要有一個(gè)成功了荞下,并返回成功的future,只有當(dāng)所有的future都失敗那就表示失敗了史飞。(也就是說(shuō)只要有一個(gè)成功尖昏,那咱們就是成功的)。
CompositeFuture.any(future1, future2).setHandler(ar -> {
if (ar.succeeded()) {
// At least one is succeeded
} else {
// All failed
}
});
同樣你也可以使用一個(gè)list列表的方式:
CompositeFuture.any(Arrays.asList(f1, f2, f2));
CompositeFuture.join 是等待所有的future完成构资,不論成功失敵樗摺(不以成功失敗論英雄),可以支持多個(gè)參數(shù)(至多6個(gè))吐绵。當(dāng)所有的future都成功了才返回一個(gè)成功的future迹淌,
CompositeFuture.join 需要等待所有的future完成,無(wú)論是成功還是失敿旱ァ(不以成功失敗論英雄)唉窃。 CompositeFuture.join 可以有好幾個(gè)future參數(shù)(最多6個(gè)),當(dāng)所有future成功時(shí)并返回成功的future纹笼,當(dāng)所有future都完成并且但是其中一個(gè)失敗纹份,那就代表著失敗:
CompositeFuture.join(future1, future2, future3).setHandler(ar -> {
if (ar.succeeded()) {
// All succeeded
} else {
// All Complete and at least one failed
}
});
同樣你也可以使用list集合的方式:
CompositeFuture.join(Arrays.asList(future1, future2, future3));
順序組合
all 和 any 都實(shí)現(xiàn)了并發(fā)組合, compose 可以使用鏈的方式設(shè)置組合future(因此這種方式叫順序組合)蔓涧。
FileSystem fs = vertx.fileSystem();
Future<Void> startFuture = Future.future();
Future<Void fut1 = Future.future();
fs.createFile("/foo", fut1.completer());
fut1.compose(v -> {
// What the file is created (fut1), execute this:
Future<Void> fut2 = Future.future();
fs.writeFile("/foo", Buffer.buffer(), fut2.complter());
return fut2;
}).compose(v -> {
// When the file is written (fut2), execute this:
fs.remove("/foo", "/bar", startFuture.completer());
},
// mark startFuture it as failed if any step fails.
startFuture);
在上面的例子中件已,這3個(gè)操作都是鏈?zhǔn)降模?/p>
- 創(chuàng)建文件(fut1 )
- 寫入數(shù)據(jù)(fut2 )
- 刪除文件(startFuture )
當(dāng)這3個(gè)步驟都成功了,最終的future (startFuture)就成功了元暴。然而篷扩,如果其中的一個(gè)步驟失敗了,最終的future并也是失敗的昨寞。
這個(gè)例子使用:
compose:當(dāng)現(xiàn)有的future完成時(shí)瞻惋,運(yùn)行給定的方法會(huì)返回一個(gè)future。當(dāng)返回的future完成時(shí)援岩,它并完成了這個(gè)組合操作。
compose:當(dāng)現(xiàn)有的future完成時(shí)掏导,運(yùn)行給定的處理器完成給定的 future (下一個(gè))享怀。
在第二種情況下, 處理程序應(yīng)完成下一個(gè)future, 以報(bào)告其成功或失敗。
你可以使用 completer 來(lái)完成一個(gè)future操作結(jié)果成功還是失敗趟咆。它避免了傳統(tǒng)的不得不的寫操作:如果成功了則返回完成的future否則返回失敗的future添瓷。