最近項(xiàng)目上的需要按傅,要在某些場(chǎng)景下攔截ES的Request 和Response梆暮,進(jìn)而把ES的整個(gè)plugin 的機(jī)制原理分析了一遍服协。但是并不等于說(shuō)會(huì)寫(xiě)plugin就可以為所欲為,實(shí)操下來(lái)還是發(fā)現(xiàn)不少問(wèn)題啦粹,這篇文章主要還是圍繞一個(gè)目的:如何通過(guò)寫(xiě)plugin來(lái)想辦法攔截ES的Request 和Response 的思路來(lái)寫(xiě)偿荷。
這篇文章不是詳細(xì)介紹如何寫(xiě)一個(gè)plugin的,如果有這個(gè)需求我建議閱讀下面任何一篇文章就可以了
Creating a Plugin for Elasticsearch 5.0 Using Maven
這里簡(jiǎn)單幾句話說(shuō)說(shuō)Plugin唠椭,在Elasticsearch 5.x 針對(duì)不同的需求跳纳,如果想對(duì)某個(gè)模塊做擴(kuò)展性開(kāi)發(fā)和修改,可以通過(guò)實(shí)現(xiàn)這個(gè)模塊開(kāi)放出來(lái)的Plugin 接口贪嫂,然后寫(xiě)好完整的邏輯并打包上傳寺庄,重啟ES 就可以加載你寫(xiě)的Plugin,可實(shí)現(xiàn)的Plugin 都在org.elasticsearch.plugin
包下
從這個(gè)圖上看,首先猜測(cè)斗塘,如果我們要寫(xiě)Plugin來(lái)攔截Request 和Response饶深,那么無(wú)非就是實(shí)現(xiàn)ActionPlugin 和SearchPlugin,那下面一步步來(lái)逛拱。
用ActionPlugin 實(shí)現(xiàn)
首先回顧一下ES整個(gè)端到端的調(diào)用敌厘,這里都是以Rest 請(qǐng)求為例,因?yàn)門(mén)ransport Client調(diào)用方式就簡(jiǎn)單很多了朽合,直接在 client.search()的時(shí)候塞一個(gè)回調(diào)函數(shù)就可以了俱两。(下面每個(gè)類的含義不懂的可以回看之前的文章Elasticsearch 5.x 源碼分析(6)Request 和Response 在ES 中的傳輸和解析
)
圖畫(huà)的差別介意,再結(jié)合一下流程圖來(lái)看
OK曹步,先看ActionPlugin接口描述和結(jié)合上面兩幅圖對(duì)比宪彩,我們可以得到下面三個(gè)總結(jié):
- 可以通過(guò)實(shí)現(xiàn)Plugin的RestHandlerWrapper (待會(huì)會(huì)解釋這個(gè)是什么東西)來(lái)攔截RestAction的調(diào)用,也就是說(shuō)Request 應(yīng)該是可以攔截得到的
- 可以通過(guò)實(shí)現(xiàn)getActionFilters() 來(lái)攔截具體的Action的調(diào)用讲婚,那么在這一層應(yīng)該可以攔截所有的內(nèi)部調(diào)用(理解一下一次的請(qǐng)求可能會(huì)由非常多次的內(nèi)部調(diào)用完成)
- 看看上圖最后一條線尿孔,意思就是,別指望在這些wrapper或者filters里面可以抓取到Response筹麸,因?yàn)槔锩嫒慷际钱惒秸{(diào)用活合,所以,你能做的就是想辦法在Wrapper 或者Filters 那里去添加一個(gè)onResponse() 回調(diào)來(lái)實(shí)現(xiàn)了
RestHandlerWrapper
那什么是RestHandlerWrapper 呢物赶,在Elasticsearch 5.2之前白指,其實(shí)你是可以在Plugin 里注冊(cè)一個(gè)或多個(gè)RestFilters 的,就像這樣
據(jù)ES開(kāi)發(fā)團(tuán)隊(duì)的說(shuō)法就是酵紫,這種方式太危險(xiǎn)告嘲,它是可以控制調(diào)用鏈的(在里面獲取到
restFilterChain
也就是可以得到別人的plugin的filter),因此從5.2 開(kāi)始他們把這個(gè)機(jī)制改了奖地,整個(gè)ES全局只允許有一個(gè)RestHandlerWrapper 類橄唬,并且里面只有一個(gè)方法,是獲取不到RestFilterChain的参歹。
想了解更多這個(gè)需求可以鏈進(jìn)去
Plugins: Replace Rest filters with RestHandler wrapper #21905
RestFilters are a complex way of allowing plugins to add extra code
before rest actions are executed. This change removes rest filters, and
replaces with a wrapper which a single plugin may provide.
下面是我的debug 代碼
@Override
public UnaryOperator<RestHandler> getRestHandlerWrapper(ThreadContext threadContext) {
return (RestHandler r) ->{
threadContext.putHeader("request_time", System.currentTimeMillis() + "");
threadContext.putHeader("response_time", System.currentTimeMillis() + "");
LOGGER.error(Thread.currentThread().getId() + "===getRestHandlerWrapper ====" + "request" + System.currentTimeMillis() + "====" + r.toString());
//LOGGER.error(r.getClass().getClassLoader().toString());
//LOGGER.error(PallasPlugin.class.getClassLoader().toString());
String className = r.getClass().getName();
if (className.contains(".RestSearchAction") || className.contains(".RestSearchTemplateAction")) {
return new MyRestHandler(r);
} else {
return r;
}
};
}
public static class MyRestHandler implements RestHandler {
private RestHandler r;
public MyRestHandler(RestHandler r) {
this.r = r;
}
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
//request.params().put("UUID", UUID.randomUUID().toString());
LOGGER.error(request.params());
LOGGER.error(request.getHeaders());
r.handleRequest(request, channel, client);
}
}
雖然調(diào)試信息很簡(jiǎn)單仰楚,但是這里已經(jīng)可以取得
結(jié)論一
- threadContext put的那些Header 在Response是拿不到的
- request是不允許亂塞params 的,想插個(gè)自定義屬性泽示,直接報(bào)錯(cuò)
- handleRequest()是一個(gè)void方法缸血,所以想非入侵地去給Response 添加回調(diào),很難很難
- 從上面一點(diǎn)推導(dǎo)來(lái)講就是械筛,這里可以攔截到RestRequest捎泻,但是攔截不到RestResponse
- 細(xì)心的人留意到上面為什么我要
r.getClass()
沒(méi)有而不是直接instance of
沒(méi),這里最最重要的是埋哟,Plugin是獨(dú)立ClassLoader 來(lái)加載的笆豁,在Plugin里是讀取不到其他Plugin/Modules 的類的郎汪,只能讀core包里面的類
ActionFilters
再看看我的測(cè)試代碼
public static class MyActionFilter implements ActionFilter {
@Inject
public MyActionFilter() {
}
@Override
public int order() {
return 0;
}
@Override
public <Request extends ActionRequest, Response extends ActionResponse> void apply(Task task, String action, Request request, ActionListener<Response> listener, ActionFilterChain<Request, Response> chain) {
if(action.equals("indices:data/read/search") || action.equals("indices:data/read/search/template")) {
LOGGER.error(Thread.currentThread().getId() + "==" + action + "===" + request.toString() + ":" + listener.toString());
LOGGER.error("===replacing Action Lisener====");
LOGGER.error(task.getId() + "," + task.getParentTaskId().getId() + "," + task.getStatus() + "," + task.getType());
LOGGER.error(request.remoteAddress());
chain.proceed(task, action, request, new MyActionListener(listener,task));
} else {
LOGGER.error("===NOT replacing Action Lisener====");
chain.proceed(task, action, request, listener);
}
}
}
public static class MyActionListener implements ActionListener {
private ActionListener listener;
private Task task;
public MyActionListener(ActionListener listener, Task task) {
this.listener = listener;
this.task = task;
}
@Override
public void onResponse(Object o) {
LOGGER.error(Thread.currentThread().getId() + "==onResponse()===" + o.toString());
LOGGER.error(task.getStartTime());
LOGGER.error(System.currentTimeMillis());
listener.onResponse(o);
}
@Override
public void onFailure(Exception e) {
LOGGER.error(Thread.currentThread().getId() + "==onError()===");
listener.onFailure(e);
}
}
看到onRespone() 是否異常興奮,是的闯狱,ActionFilters 就nice 很多煞赢,request 、Response都可以給你哄孤,通過(guò)這個(gè)測(cè)試得到了
結(jié)論二
- ActionFilters確實(shí)可以攔截所有的內(nèi)部Action調(diào)用照筑,不管你是用RestClient 過(guò)來(lái)還是TransportClient 過(guò)來(lái)。
- 這里攔截的是SearchRequest 和SearchResponse瘦陈, 而不是RestRequest 和RestResponse
- 在chain.proceed() 包裝一個(gè)listener凝危,就可以在onResponse() 里做邏輯了,而不影響它本身邏輯
這里有一點(diǎn)尤其需要注意晨逝,剛剛說(shuō)了蛾默,這里是攔截所有調(diào)用,也就是說(shuō)捉貌,就算你是一個(gè)SearchTemplateAction調(diào)用支鸡,其實(shí)也可能進(jìn)來(lái)了多次,因?yàn)樵赟earchTemplateAction內(nèi)部還會(huì)發(fā)起多個(gè)并發(fā)的SearchAction請(qǐng)求趁窃,一個(gè)是給本機(jī)牧挣,一個(gè)是給其他nodes,而其他nodes也會(huì)發(fā)送多個(gè)請(qǐng)求過(guò)來(lái)棚菊,同樣會(huì)被攔截浸踩,這樣對(duì)于有目的地去監(jiān)聽(tīng)某一個(gè)具體的request和Response 是不利的叔汁。
這里我想到兩個(gè)辦法可以解決這個(gè)問(wèn)題:
- 因?yàn)檫@個(gè)調(diào)用鏈里ActionFilter的重復(fù)進(jìn)入其實(shí)是同一個(gè)線程完成的统求,因此可以通過(guò)ThreadLocal來(lái)實(shí)現(xiàn)信息的保存,比如首先進(jìn)入時(shí)保存?zhèn)€信息据块,那再次進(jìn)入就可以識(shí)別了
- 更加暴力點(diǎn)的做法是直接再封裝一個(gè)SearchRequest來(lái)記錄信息
用SearchPlugin 實(shí)現(xiàn)
到這里我們知道用ActionPlugin的方式是勉強(qiáng)可以實(shí)現(xiàn)的码邻,雖然道路比較曲折,那么SearchPlugin 是否也可以實(shí)現(xiàn)呢另假?從這個(gè)Plugin中我第一眼就瞄中了List<BiConsumer<SearchRequest, SearchResponse>> getSearchResponseListeners()
這個(gè)方法像屋,結(jié)果一試,無(wú)果边篮,百思不得其解己莺,再次點(diǎn)進(jìn)去源碼想看看它如何調(diào)用,結(jié)果一點(diǎn)戈轿,真想罵人凌受!
這個(gè)接口是完全沒(méi)有地方用到,所以實(shí)現(xiàn)了也沒(méi)用思杯,那么
總結(jié)就是SearchPlugin是實(shí)現(xiàn)不了攔截Request或者Response的胜蛉。
好了,我的調(diào)研就基本到這里了,最后給多篇擴(kuò)展閱讀
Search Plugin to intercept search response
How to intercept search request (in a pluign) without creating separate RestAction