Elasticsearch RestHighLevelClient發(fā)起請求的過程分析

現(xiàn)在我們在用JAVA做ES應(yīng)用開發(fā)的時候,通常會使用RestHighLevelClient來進(jìn)行發(fā)送請求宁改,早期沒有RestHighLevelClient的時候,是直接使用Transport進(jìn)行轉(zhuǎn)發(fā)請求叹阔。

1.ES請求流轉(zhuǎn)

首先我們來看下叮姑,從client發(fā)出http請求到ES集群后的整個流程贱傀。
1)首先請求到達(dá)集群節(jié)點(diǎn)后押桃,由Netty4HttpServerTransport接受請求况毅,通過RequestHandler類轉(zhuǎn)到Controller恤批,再有Controller根據(jù)http請求位隶,找打注冊在上面的Action。
2)根據(jù)Http請求選擇的TransportXXXAction會判斷當(dāng)前請求的shard是否在當(dāng)前節(jié)點(diǎn)开皿,如果在涧黄,直接訪問lucene,如果不在赋荆,則需要隊(duì)請求轉(zhuǎn)發(fā)
3)Node內(nèi)部的請求轉(zhuǎn)發(fā)都是基于Netty4Transpor的笋妥,默認(rèn)是9200端口,可以理解為Elasticsearch內(nèi)部的RPC通訊
4)請求到達(dá)node2之后窄潭,經(jīng)過對應(yīng)的XXXHandler處理后春宣,會訪問node2的lucene


image.png

2.RestHighLevelClient的請求流程

2.1新建client

HttpHost httpHost = new HttpHost("localhost", 9200, "http");
RestClientBuilder builder =  RestClient.builder(httpHost);
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
        new UsernamePasswordCredentials("elastic", "123456"));
builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
    @Override
    public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
        builder.setConnectTimeout(3000);
        return builder;
    }
});
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
    @Override
    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
        httpAsyncClientBuilder.setMaxConnTotal(30);
        httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        return httpAsyncClientBuilder;
    }
});
client = new RestHighLevelClient(builder);

如上代碼所示,在新建client的時候,可以指定請求的節(jié)點(diǎn)月帝,鑒權(quán)躏惋,請求超時,http線程池等參數(shù)嚷辅。示例只是用了一個節(jié)點(diǎn)簿姨,如果才用多個節(jié)點(diǎn)的,在builder的時候可以傳入多個HttpHost

2.2請求體構(gòu)造

我們以Search請求為例簸搞,來進(jìn)行舉例扁位,和search類似,client對很多方法都提供了同步和異步的方法


image.png

請求最主要的參數(shù)就是SearchRequest趁俊,這里需要放入當(dāng)前請求的索引域仇,查詢條件等

SearchRequest最主要的兩個參數(shù),一個是indices寺擂,還有一個就是SearchSourceBuilder暇务,indices表示請求的索引,SearchSourceBuilder表示請求的語法

用RestHighLevelClient的一個很重要的原因怔软,就是它的語法很大程度上和ES的DSL是一一對應(yīng)的般卑,比如SearchSourceBuilder我們可以理解為DSL最外層的{}, SearchSourceBuilder內(nèi)部的成員變量有,
QueryBuilder爽雄,fetchSourceContext, aggregations等,這些成員變量內(nèi)部的接口也和DSL語法基本差不多沐鼠。

在SearchRequest構(gòu)造完成之后挚瘟,我們可以調(diào)用toString()方法生成DSL,當(dāng)然在使用過程中我們也可以在kibina等工具內(nèi)用DSL調(diào)好請求饲梭,然后在Search的時候直接使用DSL乘盖,但是這種方式不利于維護(hù)查詢的語法。不建議使用憔涉。

2.3請求發(fā)送

執(zhí)行search方法后订框,最終會執(zhí)行到RestClient:performRequest

public Response performRequest(Request request) throws IOException {
    SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
    performRequestAsyncNoCatch(request, listener);
    return listener.get();
}

通過該方法,我們看到無論我們在外面選擇的是同步或者異步的方法兜叨,其實(shí)clinet內(nèi)部都是按照異步處理的穿扳。所以2.1中介紹的線程池配置就很關(guān)鍵,需要根據(jù)不同的業(yè)務(wù)選擇不同的線程池大小国旷。

我們來看下這段代碼的主要邏輯performRequestAsyncNoCatch(request, listener)矛物,該方法中經(jīng)過一些參數(shù)校驗(yàn)和請求封裝后,進(jìn)入方法

performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes,
        request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener);

這里有個關(guān)鍵的方法跪但,nextNode()履羞,我們看一下nextNode()做了些什么

/**
 * Returns a non-empty {@link Iterator} of nodes to be used for a request
 * that match the {@link NodeSelector}.
 * <p>
 * If there are no living nodes that match the {@link NodeSelector}
 * this will return the dead node that matches the {@link NodeSelector}
 * that is closest to being revived.
 * @throws IOException if no nodes are available
 */
private NodeTuple<Iterator<Node>> nextNode() throws IOException {
    NodeTuple<List<Node>> nodeTuple = this.nodeTuple;
    Iterable<Node> hosts = selectNodes(nodeTuple, blacklist, lastNodeIndex, nodeSelector);
    return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache);
}

我們先來看一下RestClient的幾個成員變量,即selectNodes的幾個入?yún)?/p>

private final AtomicInteger lastNodeIndex = new AtomicInteger(0);  //上一次請求的node編號
private final ConcurrentMap<HttpHost, DeadHostState> blacklist = new ConcurrentHashMap<>();  //node的狀態(tài)map,表示某個node是否連不上
private final NodeSelector nodeSelector; //node 選擇器忆首,用于負(fù)載均衡
private volatile NodeTuple<List<Node>> nodeTuple; //當(dāng)前client配置的所有協(xié)調(diào)節(jié)點(diǎn)信息

理解了這幾個變量之后爱榔,我們再看selectNodes方法

/**
 * Select nodes to try and sorts them so that the first one will be tried initially, then the following ones
 * if the previous attempt failed and so on. Package private for testing.
 */
static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost, DeadHostState> blacklist,
                                  AtomicInteger lastNodeIndex, NodeSelector nodeSelector) throws IOException {
    /*
     * Sort the nodes into living and dead lists.
     */
    List<Node> livingNodes = new ArrayList<>(nodeTuple.nodes.size() - blacklist.size());
    List<DeadNode> deadNodes = new ArrayList<>(blacklist.size());
    for (Node node : nodeTuple.nodes) { //1
        DeadHostState deadness = blacklist.get(node.getHost());
        if (deadness == null) {
            livingNodes.add(node);
            continue;
        }
        if (deadness.shallBeRetried()) {
            livingNodes.add(node);
            continue;
        }
        deadNodes.add(new DeadNode(node, deadness));
    }

    if (false == livingNodes.isEmpty()) {//2
        /*
         * Normal state: there is at least one living node. If the
         * selector is ok with any over the living nodes then use them
         * for the request.
         */
        List<Node> selectedLivingNodes = new ArrayList<>(livingNodes);
        nodeSelector.select(selectedLivingNodes);
        if (false == selectedLivingNodes.isEmpty()) {
            /*
             * Rotate the list using a global counter as the distance so subsequent
             * requests will try the nodes in a different order.
             */
            Collections.rotate(selectedLivingNodes, lastNodeIndex.getAndIncrement());
            return selectedLivingNodes;
        }
    }

    /*
     * Last resort: there are no good nodes to use, either because
     * the selector rejected all the living nodes or because there aren't
     * any living ones. Either way, we want to revive a single dead node
     * that the NodeSelectors are OK with. We do this by passing the dead
     * nodes through the NodeSelector so it can have its say in which nodes
     * are ok. If the selector is ok with any of the nodes then we will take
     * the one in the list that has the lowest revival time and try it.
     */
    if (false == deadNodes.isEmpty()) {//3
        final List<DeadNode> selectedDeadNodes = new ArrayList<>(deadNodes);
        /*
         * We'd like NodeSelectors to remove items directly from deadNodes
         * so we can find the minimum after it is filtered without having
         * to compare many things. This saves us a sort on the unfiltered
         * list.
         */
        nodeSelector.select(new Iterable<Node>() {
            @Override
            public Iterator<Node> iterator() {
                return new DeadNodeIteratorAdapter(selectedDeadNodes.iterator());
            }
        });
        if (false == selectedDeadNodes.isEmpty()) {
            return singletonList(Collections.min(selectedDeadNodes).node);
        }
    }
    throw new IOException("NodeSelector [" + nodeSelector + "] rejected all nodes, "
            + "living " + livingNodes + " and dead " + deadNodes);
}

(1)該方法的第一步,就是從節(jié)點(diǎn)狀態(tài)map內(nèi)選出所有的node糙及,如果不是dead node详幽,則直接加入到livingNodes列表,如果是的話丁鹉,判斷一下是否需要充實(shí)(根據(jù)dead的時間)妒潭。這里每次請求結(jié)束后會根據(jù)請求結(jié)果更新blacklist的值
(2)第二步,判斷l(xiāng)iveingNodes是否為空揣钦,不為空雳灾,則配合負(fù)載算法,重排序livingNodes冯凹,在后面使用過程中谎亩,從livingNodes中選擇Node
(3)如果,liveingNodes為空宇姚,判斷deadNodes是否為空匈庭,不為空的話,從deadNodes中選擇一個最快被解禁的node浑劳,作為請求的Node(死馬當(dāng)活馬醫(yī))

3總結(jié)

看起RestHighLevelClient很簡單阱持,其實(shí)內(nèi)部還是有很多復(fù)雜邏輯的,有興趣的可以深入了解下

更多精彩內(nèi)容魔熏,請關(guān)注公眾號


公眾號二維碼.jpg
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末衷咽,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子蒜绽,更是在濱河造成了極大的恐慌镶骗,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件躲雅,死亡現(xiàn)場離奇詭異鼎姊,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)相赁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進(jìn)店門相寇,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人钮科,你說我怎么就攤上這事裆赵。” “怎么了跺嗽?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵战授,是天一觀的道長页藻。 經(jīng)常有香客問我,道長植兰,這世上最難降的妖魔是什么份帐? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮楣导,結(jié)果婚禮上废境,老公的妹妹穿的比我還像新娘。我一直安慰自己筒繁,他們只是感情好噩凹,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著毡咏,像睡著了一般驮宴。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上呕缭,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天堵泽,我揣著相機(jī)與錄音,去河邊找鬼恢总。 笑死迎罗,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的片仿。 我是一名探鬼主播纹安,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼砂豌!你這毒婦竟也來了厢岂?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤奸鸯,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后可帽,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體娄涩,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年映跟,在試婚紗的時候發(fā)現(xiàn)自己被綠了蓄拣。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡努隙,死狀恐怖球恤,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情荸镊,我是刑警寧澤咽斧,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布堪置,位于F島的核電站,受9級特大地震影響张惹,放射性物質(zhì)發(fā)生泄漏舀锨。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一宛逗、第九天 我趴在偏房一處隱蔽的房頂上張望坎匿。 院中可真熱鬧,春花似錦雷激、人聲如沸替蔬。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽承桥。三九已至,卻和暖如春恭垦,著一層夾襖步出監(jiān)牢的瞬間快毛,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工番挺, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留唠帝,地道東北人。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓玄柏,卻偏偏與公主長得像襟衰,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子粪摘,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,864評論 2 354