0 背景
隨著訂單量的持續(xù)上升,美團(tuán)外賣各系統(tǒng)服務(wù)面臨的壓力也越來越大条霜。作為外賣鏈路的核心環(huán)節(jié)催什,商家端提供了商家接單、配送等一系列核心功能宰睡,業(yè)務(wù)對(duì)系統(tǒng)吞吐量的要求也越來越高蒲凶。而商家端API服務(wù)是流量入口,所有商家端流量都會(huì)由其調(diào)度拆内、聚合旋圆,對(duì)外面向商家提供功能接口,對(duì)內(nèi)調(diào)度各個(gè)下游服務(wù)獲取數(shù)據(jù)進(jìn)行聚合麸恍,具有鮮明的I/O密集型(I/O Bound)特點(diǎn)灵巧。在當(dāng)前日訂單規(guī)模已達(dá)千萬級(jí)的情況下,使用同步加載方式的弊端逐漸顯現(xiàn)抹沪,因此我們開始考慮將同步加載改為并行加載的可行性刻肄。
1 為何需要并行加載
外賣商家端API服務(wù)是典型的I/O密集型(I/O Bound)服務(wù)。除此之外融欧,美團(tuán)外賣商家端交易業(yè)務(wù)還有兩個(gè)比較大的特點(diǎn):
- 服務(wù)端必須一次返回訂單卡片所有內(nèi)容:根據(jù)商家端和服務(wù)端的“增量同步協(xié)議注1”敏弃,服務(wù)端必須一次性返回訂單的所有信息,包含訂單主信息噪馏、商品麦到、結(jié)算绿饵、配送、用戶信息瓶颠、騎手信息拟赊、餐損、退款步清、客服賠付(參照下面訂單卡片截圖)等要门,需要從下游三十多個(gè)服務(wù)中獲取數(shù)據(jù)。在特定條件下廓啊,如第一次登錄和長(zhǎng)時(shí)間沒登錄的情況下欢搜,客戶端會(huì)分頁(yè)拉取多個(gè)訂單,這樣發(fā)起的遠(yuǎn)程調(diào)用會(huì)更多谴轮。
- 商家端和服務(wù)端交互頻繁:商家對(duì)訂單狀態(tài)變化敏感炒瘟,多種推拉機(jī)制保證每次變更能夠觸達(dá)商家,導(dǎo)致App和服務(wù)端的交互頻繁第步,每次變更需要拉取訂單最新的全部?jī)?nèi)容疮装。
在外賣交易鏈路如此大的流量下,為了保證商家的用戶體驗(yàn)粘都,保證接口的高性能廓推,并行從下游獲取數(shù)據(jù)就成為必然。
圖1 訂單卡片
2 并行加載的實(shí)現(xiàn)方式
并行從下游獲取數(shù)據(jù)翩隧,從IO模型上來講分為同步模型和異步模型樊展。
2.1 同步模型
從各個(gè)服務(wù)獲取數(shù)據(jù)最常見的是同步調(diào)用,如下圖所示:
圖2 同步調(diào)用
在同步調(diào)用的場(chǎng)景下堆生,接口耗時(shí)長(zhǎng)专缠、性能差,接口響應(yīng)時(shí)長(zhǎng)T > T1+T2+T3+……+Tn淑仆,這時(shí)為了縮短接口的響應(yīng)時(shí)間涝婉,一般會(huì)使用線程池的方式并行獲取數(shù)據(jù),商家端訂單卡片的組裝正是使用了這種方式蔗怠。
圖3 并行之線程池
這種方式由于以下兩個(gè)原因墩弯,導(dǎo)致資源利用率比較低:
- CPU資源大量浪費(fèi)在阻塞等待上,導(dǎo)致CPU資源利用率低寞射。在Java 8之前最住,一般會(huì)通過回調(diào)的方式來減少阻塞,但是大量使用回調(diào)怠惶,又引發(fā)臭名昭著的回調(diào)地獄問題,導(dǎo)致代碼可讀性和可維護(hù)性大大降低轧粟。
- 為了增加并發(fā)度策治,會(huì)引入更多額外的線程池脓魏,隨著CPU調(diào)度線程數(shù)的增加,會(huì)導(dǎo)致更嚴(yán)重的資源爭(zhēng)用通惫,寶貴的CPU資源被損耗在上下文切換上茂翔,而且線程本身也會(huì)占用系統(tǒng)資源,且不能無限增加履腋。
同步模型下珊燎,會(huì)導(dǎo)致硬件資源無法充分利用,系統(tǒng)吞吐量容易達(dá)到瓶頸遵湖。
2.2 NIO異步模型
我們主要通過以下兩種方式來減少線程池的調(diào)度開銷和阻塞時(shí)間:
- 通過RPC NIO異步調(diào)用的方式可以降低線程數(shù)悔政,從而降低調(diào)度(上下文切換)開銷,如Dubbo的異步調(diào)用可以參考《dubbo調(diào)用端異步》一文延旧。
- 通過引入CompletableFuture(下文簡(jiǎn)稱CF)對(duì)業(yè)務(wù)流程進(jìn)行編排谋国,降低依賴之間的阻塞。本文主要講述CompletableFuture的使用和原理迁沫。
2.3 為什么會(huì)選擇CompletableFuture芦瘾?
我們首先對(duì)業(yè)界廣泛流行的解決方案做了橫向調(diào)研,主要包括Future集畅、CompletableFuture注2近弟、RxJava、Reactor挺智。它們的特性對(duì)比如下:
- 可組合:可以將多個(gè)依賴操作通過不同的方式進(jìn)行編排祷愉,例如CompletableFuture提供thenCompose、thenCombine等各種then開頭的方法逃贝,這些方法就是對(duì)“可組合”特性的支持谣辞。
- 操作融合:將數(shù)據(jù)流中使用的多個(gè)操作符以某種方式結(jié)合起來,進(jìn)而降低開銷(時(shí)間沐扳、內(nèi)存)泥从。
- 延遲執(zhí)行:操作不會(huì)立即執(zhí)行,當(dāng)收到明確指示時(shí)操作才會(huì)觸發(fā)沪摄。例如Reactor只有當(dāng)有訂閱者訂閱時(shí)躯嫉,才會(huì)觸發(fā)操作。
- 回壓:某些異步階段的處理速度跟不上杨拐,直接失敗會(huì)導(dǎo)致大量數(shù)據(jù)的丟失祈餐,對(duì)業(yè)務(wù)來說是不能接受的,這時(shí)需要反饋上游生產(chǎn)者降低調(diào)用量哄陶。
RxJava與Reactor顯然更加強(qiáng)大帆阳,它們提供了更多的函數(shù)調(diào)用方式,支持更多特性屋吨,但同時(shí)也帶來了更大的學(xué)習(xí)成本蜒谤。而我們本次整合最需要的特性就是“異步”山宾、“可組合”,綜合考慮后鳍徽,我們選擇了學(xué)習(xí)成本相對(duì)較低的CompletableFuture资锰。
3 CompletableFuture使用與原理
3.1 CompletableFuture的背景和定義
3.1.1 CompletableFuture解決的問題
CompletableFuture是由Java 8引入的,在Java8之前我們一般通過Future實(shí)現(xiàn)異步阶祭。
- Future用于表示異步計(jì)算的結(jié)果绷杜,只能通過阻塞或者輪詢的方式獲取結(jié)果,而且不支持設(shè)置回調(diào)方法濒募,Java 8之前若要設(shè)置回調(diào)一般會(huì)使用guava的ListenableFuture鞭盟,回調(diào)的引入又會(huì)導(dǎo)致臭名昭著的回調(diào)地獄(下面的例子會(huì)通過ListenableFuture的使用來具體進(jìn)行展示)。
- CompletableFuture對(duì)Future進(jìn)行了擴(kuò)展萨咳,可以通過設(shè)置回調(diào)的方式處理計(jì)算結(jié)果懊缺,同時(shí)也支持組合操作,支持進(jìn)一步的編排培他,同時(shí)一定程度解決了回調(diào)地獄的問題鹃两。
下面將舉例來說明,我們通過ListenableFuture舀凛、CompletableFuture來實(shí)現(xiàn)異步的差異俊扳。假設(shè)有三個(gè)操作step1、step2猛遍、step3存在依賴關(guān)系馋记,其中step3的執(zhí)行依賴step1和step2的結(jié)果。
Future(ListenableFuture)的實(shí)現(xiàn)(回調(diào)地獄)如下:
ExecutorService executor = Executors.newFixedThreadPool(5);
ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(executor);
ListenableFuture<String> future1 = guavaExecutor.submit(() -> {
//step 1
System.out.println("執(zhí)行step 1");
return "step1 result";
});
ListenableFuture<String> future2 = guavaExecutor.submit(() -> {
//step 2
System.out.println("執(zhí)行step 2");
return "step2 result";
});
ListenableFuture<List<String>> future1And2 = Futures.allAsList(future1, future2);
Futures.addCallback(future1And2, new FutureCallback<List<String>>() {
@Override
public void onSuccess(List<String> result) {
System.out.println(result);
ListenableFuture<String> future3 = guavaExecutor.submit(() -> {
System.out.println("執(zhí)行step 3");
return "step3 result";
});
Futures.addCallback(future3, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println(result);
}
@Override
public void onFailure(Throwable t) {
}
}, guavaExecutor);
}
@Override
public void onFailure(Throwable t) {
}}, guavaExecutor);
CompletableFuture的實(shí)現(xiàn)如下:
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("執(zhí)行step 1");
return "step1 result";
}, executor);
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println("執(zhí)行step 2");
return "step2 result";
});
cf1.thenCombine(cf2, (result1, result2) -> {
System.out.println(result1 + " , " + result2);
System.out.println("執(zhí)行step 3");
return "step3 result";
}).thenAccept(result3 -> System.out.println(result3));
顯然懊烤,CompletableFuture的實(shí)現(xiàn)更為簡(jiǎn)潔梯醒,可讀性更好。
3.1.2 CompletableFuture的定義
CompletableFuture實(shí)現(xiàn)了兩個(gè)接口(如上圖所示):Future腌紧、CompletionStage茸习。Future表示異步計(jì)算的結(jié)果,CompletionStage用于表示異步執(zhí)行過程中的一個(gè)步驟(Stage)壁肋,這個(gè)步驟可能是由另外一個(gè)CompletionStage觸發(fā)的号胚,隨著當(dāng)前步驟的完成,也可能會(huì)觸發(fā)其他一系列CompletionStage的執(zhí)行浸遗。從而我們可以根據(jù)實(shí)際業(yè)務(wù)對(duì)這些步驟進(jìn)行多樣化的編排組合猫胁,CompletionStage接口正是定義了這樣的能力,我們可以通過其提供的thenAppy跛锌、thenCompose等函數(shù)式編程方法來組合編排這些步驟弃秆。
3.2 CompletableFuture的使用
下面我們通過一個(gè)例子來講解CompletableFuture如何使用,使用CompletableFuture也是構(gòu)建依賴樹的過程。一個(gè)CompletableFuture的完成會(huì)觸發(fā)另外一系列依賴它的CompletableFuture的執(zhí)行:
如上圖所示菠赚,這里描繪的是一個(gè)業(yè)務(wù)接口的流程盼樟,其中包括CF1\CF2\CF3\CF4\CF5共5個(gè)步驟,并描繪了這些步驟之間的依賴關(guān)系锈至,每個(gè)步驟可以是一次RPC調(diào)用、一次數(shù)據(jù)庫(kù)操作或者是一次本地方法調(diào)用等译秦,在使用CompletableFuture進(jìn)行異步化編程時(shí)峡捡,圖中的每個(gè)步驟都會(huì)產(chǎn)生一個(gè)CompletableFuture對(duì)象,最終結(jié)果也會(huì)用一個(gè)CompletableFuture來進(jìn)行表示筑悴。
根據(jù)CompletableFuture依賴數(shù)量们拙,可以分為以下幾類:零依賴、一元依賴阁吝、二元依賴和多元依賴砚婆。
3.2.1 零依賴:CompletableFuture的創(chuàng)建
我們先看下如何不依賴其他CompletableFuture來創(chuàng)建新的CompletableFuture:
如上圖紅色鏈路所示,接口接收到請(qǐng)求后突勇,首先發(fā)起兩個(gè)異步調(diào)用CF1装盯、CF2,主要有三種方式:
ExecutorService executor = Executors.newFixedThreadPool(5);
//1甲馋、使用runAsync或supplyAsync發(fā)起異步調(diào)用
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
return "result1";
}, executor);
//2埂奈、CompletableFuture.completedFuture()直接創(chuàng)建一個(gè)已完成狀態(tài)的CompletableFuture
CompletableFuture<String> cf2 = CompletableFuture.completedFuture("result2");
//3、先初始化一個(gè)未完成的CompletableFuture定躏,然后通過complete()账磺、completeExceptionally(),完成該CompletableFuture
CompletableFuture<String> cf = new CompletableFuture<>();
cf.complete("success");
第三種方式的一個(gè)典型使用場(chǎng)景痊远,就是將回調(diào)方法轉(zhuǎn)為CompletableFuture垮抗,然后再依賴CompletableFure的能力進(jìn)行調(diào)用編排,示例如下:
@FunctionalInterface
public interface ThriftAsyncCall {
void invoke() throws TException;
}
/**
* 該方法為美團(tuán)內(nèi)部rpc注冊(cè)監(jiān)聽的封裝碧聪,可以作為其他實(shí)現(xiàn)的參照
* OctoThriftCallback 為thrift回調(diào)方法
* ThriftAsyncCall 為自定義函數(shù)冒版,用來表示一次thrift調(diào)用(定義如上)
*/
public static <T> CompletableFuture<T> toCompletableFuture(final OctoThriftCallback<?,T> callback , ThriftAsyncCall thriftCall) {
//新建一個(gè)未完成的CompletableFuture
CompletableFuture<T> resultFuture = new CompletableFuture<>();
//監(jiān)聽回調(diào)的完成,并且與CompletableFuture同步狀態(tài)
callback.addObserver(new OctoObserver<T>() {
@Override
public void onSuccess(T t) {
resultFuture.complete(t);
}
@Override
public void onFailure(Throwable throwable) {
resultFuture.completeExceptionally(throwable);
}
});
if (thriftCall != null) {
try {
thriftCall.invoke();
} catch (TException e) {
resultFuture.completeExceptionally(e);
}
}
return resultFuture;
}
3.2.2 一元依賴:依賴一個(gè)CF
如上圖紅色鏈路所示矾削,CF3壤玫,CF5分別依賴于CF1和CF2,這種對(duì)于單個(gè)CompletableFuture的依賴可以通過thenApply哼凯、thenAccept欲间、thenCompose等方法來實(shí)現(xiàn),代碼如下所示:
CompletableFuture<String> cf3 = cf1.thenApply(result1 -> {
//result1為CF1的結(jié)果
//......
return "result3";
});
CompletableFuture<String> cf5 = cf2.thenApply(result2 -> {
//result2為CF2的結(jié)果
//......
return "result5";
});
3.2.3 二元依賴:依賴兩個(gè)CF
如上圖紅色鏈路所示断部,CF4同時(shí)依賴于兩個(gè)CF1和CF2猎贴,這種二元依賴可以通過thenCombine等回調(diào)來實(shí)現(xiàn),如下代碼所示:
CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (result1, result2) -> {
//result1和result2分別為cf1和cf2的結(jié)果
return "result4";
});
3.2.4 多元依賴:依賴多個(gè)CF
如上圖紅色鏈路所示,整個(gè)流程的結(jié)束依賴于三個(gè)步驟CF3她渴、CF4达址、CF5,這種多元依賴可以通過allOf或anyOf方法來實(shí)現(xiàn)趁耗,區(qū)別是當(dāng)需要多個(gè)依賴全部完成時(shí)使用allOf沉唠,當(dāng)多個(gè)依賴中的任意一個(gè)完成即可時(shí)使用anyOf,如下代碼所示:
CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5);
CompletableFuture<String> result = cf6.thenApply(v -> {
//這里的join并不會(huì)阻塞苛败,因?yàn)閭鹘othenApply的函數(shù)是在CF3满葛、CF4、CF5全部完成時(shí)罢屈,才會(huì)執(zhí)行 嘀韧。
result3 = cf3.join();
result4 = cf4.join();
result5 = cf5.join();
//根據(jù)result3、result4缠捌、result5組裝最終result;
return "result";
});
3.3 CompletableFuture原理
CompletableFuture中包含兩個(gè)字段:result和stack锄贷。result用于存儲(chǔ)當(dāng)前CF的結(jié)果绢慢,stack(Completion)表示當(dāng)前CF完成后需要觸發(fā)的依賴動(dòng)作(Dependency Actions)射赛,去觸發(fā)依賴它的CF的計(jì)算鲁沥,依賴動(dòng)作可以有多個(gè)(表示有多個(gè)依賴它的CF)猫缭,以棧(Treiber stack)的形式存儲(chǔ)误续,stack表示棧頂元素箭养。
這種方式類似“觀察者模式”蒿褂,依賴動(dòng)作(Dependency Action)都封裝在一個(gè)單獨(dú)Completion子類中潘拨。下面是Completion類關(guān)系結(jié)構(gòu)圖绩衷。CompletableFuture中的每個(gè)方法都對(duì)應(yīng)了圖中的一個(gè)Completion的子類蹦魔,Completion本身是觀察者的基類。
- UniCompletion繼承了Completion咳燕,是一元依賴的基類勿决,例如thenApply的實(shí)現(xiàn)類UniApply就繼承自UniCompletion。
- BiCompletion繼承了UniCompletion招盲,是二元依賴的基類低缩,同時(shí)也是多元依賴的基類。例如thenCombine的實(shí)現(xiàn)類BiRelay就繼承自BiCompletion曹货。
3.3.1 CompletableFuture的設(shè)計(jì)思想
按照類似“觀察者模式”的設(shè)計(jì)思想咆繁,原理分析可以從“觀察者”和“被觀察者”兩個(gè)方面著手。由于回調(diào)種類多顶籽,但結(jié)構(gòu)差異不大玩般,所以這里單以一元依賴中的thenApply為例,不再枚舉全部回調(diào)類型礼饱。如下圖所示:
3.3.1.1 被觀察者
- 每個(gè)CompletableFuture都可以被看作一個(gè)被觀察者坏为,其內(nèi)部有一個(gè)Completion類型的鏈表成員變量stack究驴,用來存儲(chǔ)注冊(cè)到其中的所有觀察者。當(dāng)被觀察者執(zhí)行完成后會(huì)彈棧stack屬性匀伏,依次通知注冊(cè)到其中的觀察者洒忧。上面例子中步驟fn2就是作為觀察者被封裝在UniApply中。
- 被觀察者CF中的result屬性够颠,用來存儲(chǔ)返回結(jié)果數(shù)據(jù)熙侍。這里可能是一次RPC調(diào)用的返回值,也可能是任意對(duì)象履磨,在上面的例子中對(duì)應(yīng)步驟fn1的執(zhí)行結(jié)果核行。
3.3.1.2 觀察者
CompletableFuture支持很多回調(diào)方法,例如thenAccept蹬耘、thenApply、exceptionally等减余,這些方法接收一個(gè)函數(shù)類型的參數(shù)f综苔,生成一個(gè)Completion類型的對(duì)象(即觀察者),并將入?yún)⒑瘮?shù)f賦值給Completion的成員變量fn位岔,然后檢查當(dāng)前CF是否已處于完成狀態(tài)(即result != null)如筛,如果已完成直接觸發(fā)fn,否則將觀察者Completion加入到CF的觀察者鏈stack中抒抬,再次嘗試觸發(fā)杨刨,如果被觀察者未執(zhí)行完則其執(zhí)行完畢之后通知觸發(fā)。
- 觀察者中的dep屬性:指向其對(duì)應(yīng)的CompletableFuture擦剑,在上面的例子中dep指向CF2妖胀。
- 觀察者中的src屬性:指向其依賴的CompletableFuture,在上面的例子中src指向CF1惠勒。
- 觀察者Completion中的fn屬性:用來存儲(chǔ)具體的等待被回調(diào)的函數(shù)赚抡。這里需要注意的是不同的回調(diào)方法(thenAccept、thenApply纠屋、exceptionally等)接收的函數(shù)類型也不同涂臣,即fn的類型有很多種,在上面的例子中fn指向fn2售担。
3.3.2 整體流程
3.3.2.1 一元依賴
這里仍然以thenApply為例來說明一元依賴的流程:
- 將觀察者Completion注冊(cè)到CF1赁遗,此時(shí)CF1將Completion壓棧。
- 當(dāng)CF1的操作運(yùn)行完成時(shí)族铆,會(huì)將結(jié)果賦值給CF1中的result屬性岩四。
- 依次彈棧,通知觀察者嘗試運(yùn)行骑素。
初步流程設(shè)計(jì)如上圖所示炫乓,這里有幾個(gè)關(guān)于注冊(cè)與通知的并發(fā)問題刚夺,大家可以思考下:
Q1:在觀察者注冊(cè)之前,如果CF已經(jīng)執(zhí)行完成末捣,并且已經(jīng)發(fā)出通知侠姑,那么這時(shí)觀察者由于錯(cuò)過了通知是不是將永遠(yuǎn)不會(huì)被觸發(fā)呢 ? A1:不會(huì)箩做。在注冊(cè)時(shí)檢查依賴的CF是否已經(jīng)完成莽红。如果未完成(即result == null)則將觀察者入棧,如果已完成(result != null)則直接觸發(fā)觀察者操作邦邦。
Q2:在”入棸灿酰“前會(huì)有”result == null“的判斷,這兩個(gè)操作為非原子操作燃辖,CompletableFufure的實(shí)現(xiàn)也沒有對(duì)兩個(gè)操作進(jìn)行加鎖鬼店,完成時(shí)間在這兩個(gè)操作之間,觀察者仍然得不到通知黔龟,是不是仍然無法觸發(fā)妇智?
A2:不會(huì)。入棧之后再次檢查CF是否完成氏身,如果完成則觸發(fā)巍棱。
Q3:當(dāng)依賴多個(gè)CF時(shí),觀察者會(huì)被壓入所有依賴的CF的棧中蛋欣,每個(gè)CF完成的時(shí)候都會(huì)進(jìn)行航徙,那么會(huì)不會(huì)導(dǎo)致一個(gè)操作被多次執(zhí)行呢 ?如下圖所示陷虎,即當(dāng)CF1到踏、CF2同時(shí)完成時(shí),如何避免CF3被多次觸發(fā)尚猿。
A3:CompletableFuture的實(shí)現(xiàn)是這樣解決該問題的:觀察者在執(zhí)行之前會(huì)先通過CAS操作設(shè)置一個(gè)狀態(tài)位夭禽,將status由0改為1。如果觀察者已經(jīng)執(zhí)行過了谊路,那么CAS操作將會(huì)失敗讹躯,取消執(zhí)行。
通過對(duì)以上3個(gè)問題的分析可以看出缠劝,CompletableFuture在處理并行問題時(shí)潮梯,全程無加鎖操作,極大地提高了程序的執(zhí)行效率惨恭。我們將并行問題考慮納入之后秉馏,可以得到完善的整體流程圖如下所示:
CompletableFuture支持的回調(diào)方法十分豐富,但是正如上一章節(jié)的整體流程圖所述脱羡,他們的整體流程是一致的萝究。所有回調(diào)復(fù)用同一套流程架構(gòu)免都,不同的回調(diào)監(jiān)聽通過策略模式實(shí)現(xiàn)差異化。
3.3.2.2 二元依賴
我們以thenCombine為例來說明二元依賴:
thenCombine操作表示依賴兩個(gè)CompletableFuture帆竹。其觀察者實(shí)現(xiàn)類為BiApply绕娘,如上圖所示,BiApply通過src和snd兩個(gè)屬性關(guān)聯(lián)被依賴的兩個(gè)CF栽连,fn屬性的類型為BiFunction险领。與單個(gè)依賴不同的是,在依賴的CF未完成的情況下秒紧,thenCombine會(huì)嘗試將BiApply壓入這兩個(gè)被依賴的CF的棧中绢陌,每個(gè)被依賴的CF完成時(shí)都會(huì)嘗試觸發(fā)觀察者BiApply,BiApply會(huì)檢查兩個(gè)依賴是否都完成熔恢,如果完成則開始執(zhí)行脐湾。這里為了解決重復(fù)觸發(fā)的問題,同樣用的是上一章節(jié)提到的CAS操作叙淌,執(zhí)行時(shí)會(huì)先通過CAS設(shè)置狀態(tài)位沥割,避免重復(fù)觸發(fā)。
3.3.2.3 多元依賴
依賴多個(gè)CompletableFuture的回調(diào)方法包括allOf凿菩、anyOf,區(qū)別在于allOf觀察者實(shí)現(xiàn)類為BiRelay帜讲,需要所有被依賴的CF完成后才會(huì)執(zhí)行回調(diào)衅谷;而anyOf觀察者實(shí)現(xiàn)類為OrRelay,任意一個(gè)被依賴的CF完成后就會(huì)觸發(fā)似将。二者的實(shí)現(xiàn)方式都是將多個(gè)被依賴的CF構(gòu)建成一棵平衡二叉樹获黔,執(zhí)行結(jié)果層層通知,直到根節(jié)點(diǎn)在验,觸發(fā)回調(diào)監(jiān)聽玷氏。
3.3.3 小結(jié)
本章節(jié)為CompletableFuture實(shí)現(xiàn)原理的科普,旨在嘗試不粘貼源碼腋舌,而通過結(jié)構(gòu)圖盏触、流程圖以及搭配文字描述把CompletableFuture的實(shí)現(xiàn)原理講述清楚。把晦澀的源碼翻譯為“整體流程”章節(jié)的流程圖块饺,并且將并發(fā)處理的邏輯融入赞辩,便于大家理解。
4 實(shí)踐總結(jié)
在商家端API異步化的過程中授艰,我們遇到了一些問題辨嗽,這些問題有的會(huì)比較隱蔽,下面把這些問題的處理經(jīng)驗(yàn)整理出來淮腾。希望能幫助到更多的同學(xué)糟需,大家可以少踩一些坑屉佳。
4.1 線程阻塞問題
4.1.1 代碼執(zhí)行在哪個(gè)線程上?
要合理治理線程資源洲押,最基本的前提條件就是要在寫代碼時(shí)武花,清楚地知道每一行代碼都將執(zhí)行在哪個(gè)線程上。下面我們看一下CompletableFuture的執(zhí)行線程情況诅诱。
CompletableFuture實(shí)現(xiàn)了CompletionStage接口髓堪,通過豐富的回調(diào)方法,支持各種組合操作娘荡,每種組合場(chǎng)景都有同步和異步兩種方法干旁。
同步方法(即不帶Async后綴的方法)有兩種情況。
- 如果注冊(cè)時(shí)被依賴的操作已經(jīng)執(zhí)行完成炮沐,則直接由當(dāng)前線程執(zhí)行争群。
- 如果注冊(cè)時(shí)被依賴的操作還未執(zhí)行完,則由回調(diào)線程執(zhí)行大年。
異步方法(即帶Async后綴的方法):可以選擇是否傳遞線程池參數(shù)Executor運(yùn)行在指定線程池中换薄;當(dāng)不傳遞Executor時(shí),會(huì)使用ForkJoinPool中的共用線程池CommonPool(CommonPool的大小是CPU核數(shù)-1翔试,如果是IO密集的應(yīng)用轻要,線程數(shù)可能成為瓶頸)。
例如:
ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync 執(zhí)行線程:" + Thread.currentThread().getName());
//業(yè)務(wù)操作
return "";
}, threadPool1);
//此時(shí)垦缅,如果future1中的業(yè)務(wù)操作已經(jīng)執(zhí)行完畢并返回冲泥,則該thenApply直接由當(dāng)前main線程執(zhí)行;否則壁涎,將會(huì)由執(zhí)行以上業(yè)務(wù)操作的threadPool1中的線程執(zhí)行凡恍。
future1.thenApply(value -> {
System.out.println("thenApply 執(zhí)行線程:" + Thread.currentThread().getName());
return value + "1";
});
//使用ForkJoinPool中的共用線程池CommonPool
future1.thenApplyAsync(value -> {
//do something
return value + "1";
});
//使用指定線程池
future1.thenApplyAsync(value -> {
//do something
return value + "1";
}, threadPool1);
4.2 線程池須知
4.2.1 異步回調(diào)要傳線程池
前面提到,異步回調(diào)方法可以選擇是否傳遞線程池參數(shù)Executor怔球,這里我們建議強(qiáng)制傳線程池嚼酝,且根據(jù)實(shí)際情況做線程池隔離。
當(dāng)不傳遞線程池時(shí)竟坛,會(huì)使用ForkJoinPool中的公共線程池CommonPool闽巩,這里所有調(diào)用將共用該線程池,核心線程數(shù)=處理器數(shù)量-1(單核核心線程數(shù)為1)担汤,所有異步回調(diào)都會(huì)共用該CommonPool又官,核心與非核心業(yè)務(wù)都競(jìng)爭(zhēng)同一個(gè)池中的線程,很容易成為系統(tǒng)瓶頸漫试。手動(dòng)傳遞線程池參數(shù)可以更方便的調(diào)節(jié)參數(shù)六敬,并且可以給不同的業(yè)務(wù)分配不同的線程池,以求資源隔離驾荣,減少不同業(yè)務(wù)之間的相互干擾外构。
4.2.2 線程池循環(huán)引用會(huì)導(dǎo)致死鎖
public Object doGet() {
ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
//do sth
return CompletableFuture.supplyAsync(() -> {
System.out.println("child");
return "child";
}, threadPool1).join();//子任務(wù)
}, threadPool1);
return cf1.join();
}
如上代碼塊所示普泡,doGet方法第三行通過supplyAsync向threadPool1請(qǐng)求線程,并且內(nèi)部子任務(wù)又向threadPool1請(qǐng)求線程审编。threadPool1大小為10撼班,當(dāng)同一時(shí)刻有10個(gè)請(qǐng)求到達(dá),則threadPool1被打滿垒酬,子任務(wù)請(qǐng)求線程時(shí)進(jìn)入阻塞隊(duì)列排隊(duì)砰嘁,但是父任務(wù)的完成又依賴于子任務(wù),這時(shí)由于子任務(wù)得不到線程勘究,父任務(wù)無法完成矮湘。主線程執(zhí)行cf1.join()進(jìn)入阻塞狀態(tài),并且永遠(yuǎn)無法恢復(fù)口糕。
為了修復(fù)該問題缅阳,需要將父任務(wù)與子任務(wù)做線程池隔離,兩個(gè)任務(wù)請(qǐng)求不同的線程池景描,避免循環(huán)依賴導(dǎo)致的阻塞十办。
4.2.3 異步RPC調(diào)用注意不要阻塞IO線程池
服務(wù)異步化后很多步驟都會(huì)依賴于異步RPC調(diào)用的結(jié)果,這時(shí)需要特別注意一點(diǎn)超棺,如果是使用基于NIO(比如Netty)的異步RPC向族,則返回結(jié)果是由IO線程負(fù)責(zé)設(shè)置的,即回調(diào)方法由IO線程觸發(fā)棠绘,CompletableFuture同步回調(diào)(如thenApply件相、thenAccept等無Async后綴的方法)如果依賴的異步RPC調(diào)用的返回結(jié)果,那么這些同步回調(diào)將運(yùn)行在IO線程上弄唧,而整個(gè)服務(wù)只有一個(gè)IO線程池,這時(shí)需要保證同步回調(diào)中不能有阻塞等耗時(shí)過長(zhǎng)的邏輯霍衫,否則在這些邏輯執(zhí)行完成前候引,IO線程將一直被占用,影響整個(gè)服務(wù)的響應(yīng)敦跌。
4.3 其他
4.3.1 異常處理
由于異步執(zhí)行的任務(wù)在其他線程上執(zhí)行澄干,而異常信息存儲(chǔ)在線程棧中,因此當(dāng)前線程除非阻塞等待返回結(jié)果柠傍,否則無法通過try\catch捕獲異常麸俘。CompletableFuture提供了異常捕獲回調(diào)exceptionally,相當(dāng)于同步調(diào)用中的try\catch惧笛。使用方法如下所示:
@Autowired
private WmOrderAdditionInfoThriftService wmOrderAdditionInfoThriftService;//內(nèi)部接口
public CompletableFuture<Integer> getCancelTypeAsync(long orderId) {
CompletableFuture<WmOrderOpRemarkResult> remarkResultFuture = wmOrderAdditionInfoThriftService.findOrderCancelledRemarkByOrderIdAsync(orderId);//業(yè)務(wù)方法从媚,內(nèi)部會(huì)發(fā)起異步rpc調(diào)用
return remarkResultFuture
.exceptionally(err -> {//通過exceptionally 捕獲異常,打印日志并返回默認(rèn)值
log.error("WmOrderRemarkService.getCancelTypeAsync Exception orderId={}", orderId, err);
return 0;
});
}
有一點(diǎn)需要注意患整,CompletableFuture在回調(diào)方法中對(duì)異常進(jìn)行了包裝拜效。大部分異常會(huì)封裝成CompletionException后拋出喷众,真正的異常存儲(chǔ)在cause屬性中,因此如果調(diào)用鏈中經(jīng)過了回調(diào)方法處理那么就需要用Throwable.getCause()方法提取真正的異常紧憾。但是到千,有些情況下會(huì)直接返回真正的異常(Stack Overflow的討論),最好使用工具類提取異常赴穗,如下代碼所示:
@Autowired
private WmOrderAdditionInfoThriftService wmOrderAdditionInfoThriftService;//內(nèi)部接口
public CompletableFuture<Integer> getCancelTypeAsync(long orderId) {
CompletableFuture<WmOrderOpRemarkResult> remarkResultFuture = wmOrderAdditionInfoThriftService.findOrderCancelledRemarkByOrderIdAsync(orderId);//業(yè)務(wù)方法憔四,內(nèi)部會(huì)發(fā)起異步rpc調(diào)用
return remarkResultFuture
.thenApply(result -> {//這里增加了一個(gè)回調(diào)方法thenApply,如果發(fā)生異常thenApply內(nèi)部會(huì)通過new CompletionException(throwable) 對(duì)異常進(jìn)行包裝
//這里是一些業(yè)務(wù)操作
})
.exceptionally(err -> {//通過exceptionally 捕獲異常般眉,這里的err已經(jīng)被thenApply包裝過了赵,因此需要通過Throwable.getCause()提取異常
log.error("WmOrderRemarkService.getCancelTypeAsync Exception orderId={}", orderId, ExceptionUtils.extractRealException(err));
return 0;
});
}
上面代碼中用到了一個(gè)自定義的工具類ExceptionUtils,用于CompletableFuture的異常提取煤篙,在使用CompletableFuture做異步編程時(shí)斟览,可以直接使用該工具類處理異常。實(shí)現(xiàn)代碼如下:
public class ExceptionUtils {
public static Throwable extractRealException(Throwable throwable) {
//這里判斷異常類型是否為CompletionException辑奈、ExecutionException苛茂,如果是則進(jìn)行提取,否則直接返回鸠窗。
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
if (throwable.getCause() != null) {
return throwable.getCause();
}
}
return throwable;
}
}
4.3.2 沉淀的工具方法介紹
在實(shí)踐過程中我們沉淀了一些通用的工具方法妓羊,在使用CompletableFuture開發(fā)時(shí)可以直接拿來使用,詳情參見“附錄”稍计。
5 異步化收益
通過異步化改造躁绸,美團(tuán)商家端API系統(tǒng)的性能得到明顯提升,與改造前對(duì)比的收益如下:
- 核心接口吞吐量大幅提升臣嚣,其中訂單輪詢接口改造前TP99為754ms净刮,改造后降為408ms。
- 服務(wù)器數(shù)量減少1/3硅则。
6 參考文獻(xiàn)
- CompletableFuture (Java Platform SE 8 )
- java - Does CompletionStage always wrap exceptions in CompletionException? - Stack Overflow
- exception - Surprising behavior of Java 8 CompletableFuture exceptionally method - Stack Overflow
- 文檔 | Apache Dubbo
7 名詞解釋及備注
注1:“增量同步”是指商家客戶端與服務(wù)端之間的訂單增量數(shù)據(jù)同步協(xié)議淹父,客戶端使用該協(xié)議獲取新增訂單以及狀態(tài)發(fā)生變化的訂單。
注2:本文涉及到的所有技術(shù)點(diǎn)依賴的Java版本為JDK 8怎虫,CompletableFuture支持的特性分析也是基于該版本暑认。
附錄
自定義函數(shù)
@FunctionalInterface
public interface ThriftAsyncCall {
void invoke() throws TException ;
}
CompletableFuture處理工具類
/**
* CompletableFuture封裝工具類
*/
@Slf4j
public class FutureUtils {
/**
* 該方法為美團(tuán)內(nèi)部rpc注冊(cè)監(jiān)聽的封裝,可以作為其他實(shí)現(xiàn)的參照
* OctoThriftCallback 為thrift回調(diào)方法
* ThriftAsyncCall 為自定義函數(shù)大审,用來表示一次thrift調(diào)用(定義如上)
*/
public static <T> CompletableFuture<T> toCompletableFuture(final OctoThriftCallback<?,T> callback , ThriftAsyncCall thriftCall) {
CompletableFuture<T> thriftResultFuture = new CompletableFuture<>();
callback.addObserver(new OctoObserver<T>() {
@Override
public void onSuccess(T t) {
thriftResultFuture.complete(t);
}
@Override
public void onFailure(Throwable throwable) {
thriftResultFuture.completeExceptionally(throwable);
}
});
if (thriftCall != null) {
try {
thriftCall.invoke();
} catch (TException e) {
thriftResultFuture.completeExceptionally(e);
}
}
return thriftResultFuture;
}
/**
* 設(shè)置CF狀態(tài)為失敗
*/
public static <T> CompletableFuture<T> failed(Throwable ex) {
CompletableFuture<T> completableFuture = new CompletableFuture<>();
completableFuture.completeExceptionally(ex);
return completableFuture;
}
/**
* 設(shè)置CF狀態(tài)為成功
*/
public static <T> CompletableFuture<T> success(T result) {
CompletableFuture<T> completableFuture = new CompletableFuture<>();
completableFuture.complete(result);
return completableFuture;
}
/**
* 將List<CompletableFuture<T>> 轉(zhuǎn)為 CompletableFuture<List<T>>
*/
public static <T> CompletableFuture<List<T>> sequence(Collection<CompletableFuture<T>> completableFutures) {
return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
/**
* 將List<CompletableFuture<List<T>>> 轉(zhuǎn)為 CompletableFuture<List<T>>
* 多用于分頁(yè)查詢的場(chǎng)景
*/
public static <T> CompletableFuture<List<T>> sequenceList(Collection<CompletableFuture<List<T>>> completableFutures) {
return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream()
.flatMap( listFuture -> listFuture.join().stream())
.collect(Collectors.toList())
);
}
/*
* 將List<CompletableFuture<Map<K, V>>> 轉(zhuǎn)為 CompletableFuture<Map<K, V>>
* @Param mergeFunction 自定義key沖突時(shí)的merge策略
*/
public static <K, V> CompletableFuture<Map<K, V>> sequenceMap(
Collection<CompletableFuture<Map<K, V>>> completableFutures, BinaryOperator<V> mergeFunction) {
return CompletableFuture
.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream().map(CompletableFuture::join)
.flatMap(map -> map.entrySet().stream())
.collect(Collectors.toMap(Entry::getKey, Entry::getValue, mergeFunction)));
}
/**
* 將List<CompletableFuture<T>> 轉(zhuǎn)為 CompletableFuture<List<T>>蘸际,并過濾調(diào)null值
*/
public static <T> CompletableFuture<List<T>> sequenceNonNull(Collection<CompletableFuture<T>> completableFutures) {
return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream()
.map(CompletableFuture::join)
.filter(e -> e != null)
.collect(Collectors.toList())
);
}
/**
* 將List<CompletableFuture<List<T>>> 轉(zhuǎn)為 CompletableFuture<List<T>>,并過濾調(diào)null值
* 多用于分頁(yè)查詢的場(chǎng)景
*/
public static <T> CompletableFuture<List<T>> sequenceListNonNull(Collection<CompletableFuture<List<T>>> completableFutures) {
return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream()
.flatMap( listFuture -> listFuture.join().stream().filter(e -> e != null))
.collect(Collectors.toList())
);
}
/**
* 將List<CompletableFuture<Map<K, V>>> 轉(zhuǎn)為 CompletableFuture<Map<K, V>>
* @Param filterFunction 自定義過濾策略
*/
public static <T> CompletableFuture<List<T>> sequence(Collection<CompletableFuture<T>> completableFutures,
Predicate<? super T> filterFunction) {
return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream()
.map(CompletableFuture::join)
.filter(filterFunction)
.collect(Collectors.toList())
);
}
/**
* 將List<CompletableFuture<List<T>>> 轉(zhuǎn)為 CompletableFuture<List<T>>
* @Param filterFunction 自定義過濾策略
*/
public static <T> CompletableFuture<List<T>> sequenceList(Collection<CompletableFuture<List<T>>> completableFutures,
Predicate<? super T> filterFunction) {
return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream()
.flatMap( listFuture -> listFuture.join().stream().filter(filterFunction))
.collect(Collectors.toList())
);
}
/**
* 將CompletableFuture<Map<K,V>>的list轉(zhuǎn)為 CompletableFuture<Map<K,V>>徒扶。 多個(gè)map合并為一個(gè)map粮彤。 如果key沖突,采用新的value覆蓋。
*/
public static <K, V> CompletableFuture<Map<K, V>> sequenceMap(
Collection<CompletableFuture<Map<K, V>>> completableFutures) {
return CompletableFuture
.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream().map(CompletableFuture::join)
.flatMap(map -> map.entrySet().stream())
.collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> b)));
}}
異常提取工具類
public class ExceptionUtils {
/**
* 提取真正的異常
*/
public static Throwable extractRealException(Throwable throwable) {
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
if (throwable.getCause() != null) {
return throwable.getCause();
}
}
return throwable;
}
}
打印日志
@Slf4j
public abstract class AbstractLogAction<R> {
protected final String methodName;
protected final Object[] args;
public AbstractLogAction(String methodName, Object... args) {
this.methodName = methodName;
this.args = args;
}
protected void logResult(R result, Throwable throwable) {
if (throwable != null) {
boolean isBusinessError = throwable instanceof TBase || (throwable.getCause() != null && throwable
.getCause() instanceof TBase);
if (isBusinessError) {
logBusinessError(throwable);
} else if (throwable instanceof DegradeException || throwable instanceof DegradeRuntimeException) {//這里為內(nèi)部rpc框架拋出的異常驾诈,使用時(shí)可以酌情修改
if (RhinoSwitch.getBoolean("isPrintDegradeLog", false)) {
log.error("{} degrade exception, param:{} , error:{}", methodName, args, throwable);
}
} else {
log.error("{} unknown error, param:{} , error:{}", methodName, args, ExceptionUtils.extractRealException(throwable));
}
} else {
if (isLogResult()) {
log.info("{} param:{} , result:{}", methodName, args, result);
} else {
log.info("{} param:{}", methodName, args);
}
}
}
private void logBusinessError(Throwable throwable) {
log.error("{} business error, param:{} , error:{}", methodName, args, throwable.toString(), ExceptionUtils.extractRealException(throwable));
}
private boolean isLogResult() {
//這里是動(dòng)態(tài)配置開關(guān)缠诅,用于動(dòng)態(tài)控制日志打印,開源動(dòng)態(tài)配置中心可以使用nacos乍迄、apollo等管引,如果項(xiàng)目沒有使用配置中心則可以刪除
return RhinoSwitch.getBoolean(methodName + "_isLogResult", false);
}}
日志處理實(shí)現(xiàn)類
/**
* 發(fā)生異常時(shí),根據(jù)是否為業(yè)務(wù)異常打印日志闯两。
* 跟CompletableFuture.whenComplete配合使用褥伴,不改變completableFuture的結(jié)果(正常OR異常)
*/
@Slf4j
public class LogErrorAction<R> extends AbstractLogAction<R> implements BiConsumer<R, Throwable> {
public LogErrorAction(String methodName, Object... args) {
super(methodName, args);
}
@Override
public void accept(R result, Throwable throwable) {
logResult(result, throwable);
}
}
打印日志方式
completableFuture
.whenComplete(
new LogErrorAction<>("orderService.getOrder", params));
異常情況返回默認(rèn)值
/**
* 當(dāng)發(fā)生異常時(shí)返回自定義的值
*/
public class DefaultValueHandle<R> extends AbstractLogAction<R> implements BiFunction<R, Throwable, R> {
private final R defaultValue;
/**
* 當(dāng)返回值為空的時(shí)候是否替換為默認(rèn)值
*/
private final boolean isNullToDefault;
/**
* @param methodName 方法名稱
* @param defaultValue 當(dāng)異常發(fā)生時(shí)自定義返回的默認(rèn)值
* @param args 方法入?yún)? */
public DefaultValueHandle(String methodName, R defaultValue, Object... args) {
super(methodName, args);
this.defaultValue = defaultValue;
this.isNullToDefault = false;
}
/**
* @param isNullToDefault
* @param defaultValue 當(dāng)異常發(fā)生時(shí)自定義返回的默認(rèn)值
* @param methodName 方法名稱
* @param args 方法入?yún)? */
public DefaultValueHandle(boolean isNullToDefault, R defaultValue, String methodName, Object... args) {
super(methodName, args);
this.defaultValue = defaultValue;
this.isNullToDefault = isNullToDefault;
}
@Override
public R apply(R result, Throwable throwable) {
logResult(result, throwable);
if (throwable != null) {
return defaultValue;
}
if (result == null && isNullToDefault) {
return defaultValue;
}
return result;
}
public static <R> DefaultValueHandle.DefaultValueHandleBuilder<R> builder() {
return new DefaultValueHandle.DefaultValueHandleBuilder<>();
}
public static class DefaultValueHandleBuilder<R> {
private boolean isNullToDefault;
private R defaultValue;
private String methodName;
private Object[] args;
DefaultValueHandleBuilder() {
}
public DefaultValueHandle.DefaultValueHandleBuilder<R> isNullToDefault(final boolean isNullToDefault) {
this.isNullToDefault = isNullToDefault;
return this;
}
public DefaultValueHandle.DefaultValueHandleBuilder<R> defaultValue(final R defaultValue) {
this.defaultValue = defaultValue;
return this;
}
public DefaultValueHandle.DefaultValueHandleBuilder<R> methodName(final String methodName) {
this.methodName = methodName;
return this;
}
public DefaultValueHandle.DefaultValueHandleBuilder<R> args(final Object... args) {
this.args = args;
return this;
}
public DefaultValueHandle<R> build() {
return new DefaultValueHandle<R>(this.isNullToDefault, this.defaultValue, this.methodName, this.args);
}
public String toString() {
return "DefaultValueHandle.DefaultValueHandleBuilder(isNullToDefault=" + this.isNullToDefault + ", defaultValue=" + this.defaultValue + ", methodName=" + this.methodName + ", args=" + Arrays.deepToString(this.args) + ")";
}
}
默認(rèn)返回值應(yīng)用示例
completableFuture.handle(new DefaultValueHandle<>("orderService.getOrder", Collections.emptyMap(), params));