Elasticsearch 5.x 源碼分析(8)用plugin來(lái)攔截Request、Response 的可行性調(diào)研

最近項(xiàng)目上的需要按傅,要在某些場(chǎng)景下攔截ES的Request 和Response梆暮,進(jìn)而把ES的整個(gè)plugin 的機(jī)制原理分析了一遍服协。但是并不等于說(shuō)會(huì)寫(xiě)plugin就可以為所欲為,實(shí)操下來(lái)還是發(fā)現(xiàn)不少問(wèn)題啦粹,這篇文章主要還是圍繞一個(gè)目的:如何通過(guò)寫(xiě)plugin來(lái)想辦法攔截ES的Request 和Response 的思路來(lái)寫(xiě)偿荷。


這篇文章不是詳細(xì)介紹如何寫(xiě)一個(gè)plugin的,如果有這個(gè)需求我建議閱讀下面任何一篇文章就可以了

Creating a Plugin for Elasticsearch 5.0 Using Maven

Adding a New REST Endpoint to Elasticsearch

elasticsearch源碼分析之plugin的開(kāi)發(fā)

這里簡(jiǎn)單幾句話說(shuō)說(shuō)Plugin唠椭,在Elasticsearch 5.x 針對(duì)不同的需求跳纳,如果想對(duì)某個(gè)模塊做擴(kuò)展性開(kāi)發(fā)和修改,可以通過(guò)實(shí)現(xiàn)這個(gè)模塊開(kāi)放出來(lái)的Plugin 接口贪嫂,然后寫(xiě)好完整的邏輯并打包上傳寺庄,重啟ES 就可以加載你寫(xiě)的Plugin,可實(shí)現(xiàn)的Plugin 都在org.elasticsearch.plugin包下

Elasticsearch 5 可以實(shí)現(xiàn)的plugin的接口

從這個(gè)圖上看,首先猜測(cè)斗塘,如果我們要寫(xiě)Plugin來(lái)攔截Request 和Response饶深,那么無(wú)非就是實(shí)現(xiàn)ActionPlugin 和SearchPlugin,那下面一步步來(lái)逛拱。


用ActionPlugin 實(shí)現(xiàn)

首先回顧一下ES整個(gè)端到端的調(diào)用敌厘,這里都是以Rest 請(qǐng)求為例,因?yàn)門(mén)ransport Client調(diào)用方式就簡(jiǎn)單很多了朽合,直接在 client.search()的時(shí)候塞一個(gè)回調(diào)函數(shù)就可以了俱两。(下面每個(gè)類的含義不懂的可以回看之前的文章Elasticsearch 5.x 源碼分析(6)Request 和Response 在ES 中的傳輸和解析

ES查詢端到端示意圖

圖畫(huà)的差別介意,再結(jié)合一下流程圖來(lái)看

ES調(diào)用流程圖

OK曹步,先看ActionPlugin接口描述和結(jié)合上面兩幅圖對(duì)比宪彩,我們可以得到下面三個(gè)總結(jié):

  • 可以通過(guò)實(shí)現(xiàn)Plugin的RestHandlerWrapper (待會(huì)會(huì)解釋這個(gè)是什么東西)來(lái)攔截RestAction的調(diào)用,也就是說(shuō)Request 應(yīng)該是可以攔截得到的
  • 可以通過(guò)實(shí)現(xiàn)getActionFilters() 來(lái)攔截具體的Action的調(diào)用讲婚,那么在這一層應(yīng)該可以攔截所有的內(nèi)部調(diào)用(理解一下一次的請(qǐng)求可能會(huì)由非常多次的內(nèi)部調(diào)用完成)
  • 看看上圖最后一條線尿孔,意思就是,別指望在這些wrapper或者filters里面可以抓取到Response筹麸,因?yàn)槔锩嫒慷际钱惒秸{(diào)用活合,所以,你能做的就是想辦法在Wrapper 或者Filters 那里去添加一個(gè)onResponse() 回調(diào)來(lái)實(shí)現(xiàn)了
ActionPlugin

RestHandlerWrapper

那什么是RestHandlerWrapper 呢物赶,在Elasticsearch 5.2之前白指,其實(shí)你是可以在Plugin 里注冊(cè)一個(gè)或多個(gè)RestFilters 的,就像這樣

在Plugin里注冊(cè)一個(gè)RestFilter

據(jù)ES開(kāi)發(fā)團(tuán)隊(duì)的說(shuō)法就是酵紫,這種方式太危險(xiǎn)告嘲,它是可以控制調(diào)用鏈的(在里面獲取到restFilterChain 也就是可以得到別人的plugin的filter),因此從5.2 開(kāi)始他們把這個(gè)機(jī)制改了奖地,整個(gè)ES全局只允許有一個(gè)RestHandlerWrapper 類橄唬,并且里面只有一個(gè)方法,是獲取不到RestFilterChain的参歹。

想了解更多這個(gè)需求可以鏈進(jìn)去
Plugins: Replace Rest filters with RestHandler wrapper #21905
RestFilters are a complex way of allowing plugins to add extra code
before rest actions are executed. This change removes rest filters, and
replaces with a wrapper which a single plugin may provide.

下面是我的debug 代碼

@Override
    public UnaryOperator<RestHandler> getRestHandlerWrapper(ThreadContext threadContext) {
        return (RestHandler r) ->{
            threadContext.putHeader("request_time", System.currentTimeMillis() + "");
            threadContext.putHeader("response_time", System.currentTimeMillis() + "");
            LOGGER.error(Thread.currentThread().getId() + "===getRestHandlerWrapper ====" + "request" + System.currentTimeMillis() + "====" + r.toString());
            //LOGGER.error(r.getClass().getClassLoader().toString());
            //LOGGER.error(PallasPlugin.class.getClassLoader().toString());
            String className = r.getClass().getName();
            if (className.contains(".RestSearchAction") || className.contains(".RestSearchTemplateAction")) {
                return new MyRestHandler(r);
            } else {
                return r;
            }
        };
    }

public static class MyRestHandler implements RestHandler {

        private RestHandler r;

        public MyRestHandler(RestHandler r) {
            this.r = r;
        }

        @Override
        public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
            //request.params().put("UUID", UUID.randomUUID().toString());
            LOGGER.error(request.params());
            LOGGER.error(request.getHeaders());
            r.handleRequest(request, channel, client);
        }
    }

雖然調(diào)試信息很簡(jiǎn)單仰楚,但是這里已經(jīng)可以取得

結(jié)論一

  • threadContext put的那些Header 在Response是拿不到的
  • request是不允許亂塞params 的,想插個(gè)自定義屬性泽示,直接報(bào)錯(cuò)
  • handleRequest()是一個(gè)void方法缸血,所以想非入侵地去給Response 添加回調(diào),很難很難
  • 從上面一點(diǎn)推導(dǎo)來(lái)講就是械筛,這里可以攔截到RestRequest捎泻,但是攔截不到RestResponse
  • 細(xì)心的人留意到上面為什么我要 r.getClass() 沒(méi)有而不是直接instance of 沒(méi),這里最最重要的是埋哟,Plugin是獨(dú)立ClassLoader 來(lái)加載的笆豁,在Plugin里是讀取不到其他Plugin/Modules 的類的郎汪,只能讀core包里面的類

ActionFilters

再看看我的測(cè)試代碼

public static class MyActionFilter implements ActionFilter {

        @Inject
        public MyActionFilter() {
        }


        @Override
        public int order() {
            return 0;
        }

        @Override
        public <Request extends ActionRequest, Response extends ActionResponse> void apply(Task task, String action, Request request, ActionListener<Response> listener, ActionFilterChain<Request, Response> chain) {

            if(action.equals("indices:data/read/search") || action.equals("indices:data/read/search/template")) {

                LOGGER.error(Thread.currentThread().getId() + "==" + action + "===" + request.toString() + ":" + listener.toString());
                LOGGER.error("===replacing Action Lisener====");
                LOGGER.error(task.getId() + "," + task.getParentTaskId().getId() + "," + task.getStatus() + "," + task.getType());
                LOGGER.error(request.remoteAddress());

                chain.proceed(task, action, request, new MyActionListener(listener,task));
            } else {
                LOGGER.error("===NOT replacing Action Lisener====");
                chain.proceed(task, action, request, listener);
            }

        }
    }

    public static class MyActionListener implements ActionListener {

        private ActionListener listener;

        private Task task;

        public MyActionListener(ActionListener listener, Task task) {
            this.listener = listener;
            this.task = task;
        }

        @Override
        public void onResponse(Object o) {
            LOGGER.error(Thread.currentThread().getId() + "==onResponse()===" + o.toString());
            LOGGER.error(task.getStartTime());
            LOGGER.error(System.currentTimeMillis());
            listener.onResponse(o);
        }

        @Override
        public void onFailure(Exception e) {
            LOGGER.error(Thread.currentThread().getId() + "==onError()===");
            listener.onFailure(e);
        }
    }

看到onRespone() 是否異常興奮,是的闯狱,ActionFilters 就nice 很多煞赢,request 、Response都可以給你哄孤,通過(guò)這個(gè)測(cè)試得到了

結(jié)論二
  • ActionFilters確實(shí)可以攔截所有的內(nèi)部Action調(diào)用照筑,不管你是用RestClient 過(guò)來(lái)還是TransportClient 過(guò)來(lái)。
  • 這里攔截的是SearchRequest 和SearchResponse瘦陈, 而不是RestRequest 和RestResponse
  • 在chain.proceed() 包裝一個(gè)listener凝危,就可以在onResponse() 里做邏輯了,而不影響它本身邏輯

這里有一點(diǎn)尤其需要注意晨逝,剛剛說(shuō)了蛾默,這里是攔截所有調(diào)用,也就是說(shuō)捉貌,就算你是一個(gè)SearchTemplateAction調(diào)用支鸡,其實(shí)也可能進(jìn)來(lái)了多次,因?yàn)樵赟earchTemplateAction內(nèi)部還會(huì)發(fā)起多個(gè)并發(fā)的SearchAction請(qǐng)求趁窃,一個(gè)是給本機(jī)牧挣,一個(gè)是給其他nodes,而其他nodes也會(huì)發(fā)送多個(gè)請(qǐng)求過(guò)來(lái)棚菊,同樣會(huì)被攔截浸踩,這樣對(duì)于有目的地去監(jiān)聽(tīng)某一個(gè)具體的request和Response 是不利的叔汁。
這里我想到兩個(gè)辦法可以解決這個(gè)問(wèn)題:

  1. 因?yàn)檫@個(gè)調(diào)用鏈里ActionFilter的重復(fù)進(jìn)入其實(shí)是同一個(gè)線程完成的统求,因此可以通過(guò)ThreadLocal來(lái)實(shí)現(xiàn)信息的保存,比如首先進(jìn)入時(shí)保存?zhèn)€信息据块,那再次進(jìn)入就可以識(shí)別了
  2. 更加暴力點(diǎn)的做法是直接再封裝一個(gè)SearchRequest來(lái)記錄信息

用SearchPlugin 實(shí)現(xiàn)

到這里我們知道用ActionPlugin的方式是勉強(qiáng)可以實(shí)現(xiàn)的码邻,雖然道路比較曲折,那么SearchPlugin 是否也可以實(shí)現(xiàn)呢另假?從這個(gè)Plugin中我第一眼就瞄中了List<BiConsumer<SearchRequest, SearchResponse>> getSearchResponseListeners()這個(gè)方法像屋,結(jié)果一試,無(wú)果边篮,百思不得其解己莺,再次點(diǎn)進(jìn)去源碼想看看它如何調(diào)用,結(jié)果一點(diǎn)戈轿,真想罵人凌受!

SearchPlugin接口

這個(gè)接口是完全沒(méi)有地方用到,所以實(shí)現(xiàn)了也沒(méi)用思杯,那么

總結(jié)就是SearchPlugin是實(shí)現(xiàn)不了攔截Request或者Response的胜蛉。

好了,我的調(diào)研就基本到這里了,最后給多篇擴(kuò)展閱讀

Search Plugin to intercept search response

How to intercept search request (in a pluign) without creating separate RestAction

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末誊册,一起剝皮案震驚了整個(gè)濱河市领突,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌案怯,老刑警劉巖君旦,帶你破解...
    沈念sama閱讀 206,602評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異嘲碱,居然都是意外死亡于宙,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)悍汛,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)捞魁,“玉大人,你說(shuō)我怎么就攤上這事离咐∑准螅” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,878評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵宵蛀,是天一觀的道長(zhǎng)昆著。 經(jīng)常有香客問(wèn)我,道長(zhǎng)术陶,這世上最難降的妖魔是什么凑懂? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,306評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮梧宫,結(jié)果婚禮上接谨,老公的妹妹穿的比我還像新娘。我一直安慰自己塘匣,他們只是感情好脓豪,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,330評(píng)論 5 373
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著忌卤,像睡著了一般扫夜。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上驰徊,一...
    開(kāi)封第一講書(shū)人閱讀 49,071評(píng)論 1 285
  • 那天笤闯,我揣著相機(jī)與錄音,去河邊找鬼棍厂。 笑死颗味,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的勋桶。 我是一名探鬼主播脱衙,決...
    沈念sama閱讀 38,382評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼侥猬,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了捐韩?” 一聲冷哼從身側(cè)響起退唠,我...
    開(kāi)封第一講書(shū)人閱讀 37,006評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎荤胁,沒(méi)想到半個(gè)月后瞧预,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,512評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡仅政,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,965評(píng)論 2 325
  • 正文 我和宋清朗相戀三年垢油,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片圆丹。...
    茶點(diǎn)故事閱讀 38,094評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡滩愁,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出辫封,到底是詐尸還是另有隱情硝枉,我是刑警寧澤,帶...
    沈念sama閱讀 33,732評(píng)論 4 323
  • 正文 年R本政府宣布倦微,位于F島的核電站妻味,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏欣福。R本人自食惡果不足惜责球,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,283評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望拓劝。 院中可真熱鬧雏逾,春花似錦、人聲如沸凿将。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,286評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)牧抵。三九已至,卻和暖如春侨把,著一層夾襖步出監(jiān)牢的瞬間犀变,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,512評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工秋柄, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留获枝,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,536評(píng)論 2 354
  • 正文 我出身青樓骇笔,卻偏偏與公主長(zhǎng)得像省店,于是被迫代替她去往敵國(guó)和親嚣崭。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,828評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容