When we develop ES applications in Java today, we usually send requests through RestHighLevelClient. In the early days, before RestHighLevelClient existed, requests were sent directly through the TransportClient.
1. ES request flow
First, let's walk through what happens after an HTTP request sent by a client reaches the ES cluster.
1) When the request reaches a cluster node, it is received by Netty4HttpServerTransport, dispatched through the RequestHandler to the Controller, and the Controller then looks up the Action registered for that HTTP route.
2) The TransportXXXAction chosen for the HTTP request checks whether the target shard lives on the current node. If it does, Lucene is accessed directly; if not, the request has to be forwarded to another node.
3) Request forwarding between nodes is based on Netty4Transport, which listens on port 9300 by default (the HTTP layer uses 9200) and can be thought of as Elasticsearch's internal RPC channel; see the sketch after this list.
4) Once the request reaches node2, it is processed by the corresponding XXXHandler, which then accesses node2's Lucene.
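To make the two ports concrete, here is a rough sketch for ES 6.x (the cluster name is made up, and it assumes the now-deprecated transport-client artifact is on the classpath): the legacy TransportClient mentioned in the introduction speaks the internal transport protocol on 9300 directly, while RestHighLevelClient goes through the HTTP layer on 9200.
// Legacy approach: connect straight to the internal transport port 9300
Settings settings = Settings.builder().put("cluster.name", "my-cluster").build();
TransportClient transportClient = new PreBuiltTransportClient(settings)
        .addTransportAddress(new TransportAddress(new InetSocketAddress("localhost", 9300)));

// REST approach: talk to the HTTP layer on 9200
RestHighLevelClient restClient = new RestHighLevelClient(
        RestClient.builder(new HttpHost("localhost", 9200, "http")));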
2. The RestHighLevelClient request flow
2.1 Creating a client
HttpHost httpHost = new HttpHost("localhost", 9200, "http");
RestClientBuilder builder = RestClient.builder(httpHost);

CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
        new UsernamePasswordCredentials("elastic", "123456"));

// Request-level settings, e.g. the connect timeout
builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
    @Override
    public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
        builder.setConnectTimeout(3000);
        return builder;
    }
});

// HTTP-client-level settings, e.g. the connection pool size and credentials
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
    @Override
    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
        httpAsyncClientBuilder.setMaxConnTotal(30);
        httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        return httpAsyncClientBuilder;
    }
});

RestHighLevelClient client = new RestHighLevelClient(builder);
As the code above shows, when creating the client you can specify the nodes to send requests to, authentication, request timeouts, the HTTP client's connection/thread pool, and other parameters. The example uses only a single node; if you want to use several nodes, you can pass multiple HttpHost instances to the builder, as shown below.
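For example, a minimal sketch with several coordinating nodes (the host names here are made up), so the client can rotate between them and fail over:
RestClientBuilder builder = RestClient.builder(
        new HttpHost("es-node-1", 9200, "http"),
        new HttpHost("es-node-2", 9200, "http"),
        new HttpHost("es-node-3", 9200, "http"));
RestHighLevelClient client = new RestHighLevelClient(builder);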
2.2 Building the request body
We'll use the Search request as an example; as with search, the client provides both synchronous and asynchronous variants for most of its methods.
The most important parameter of the call is the SearchRequest, which carries the indices to query, the query conditions, and so on.
A SearchRequest has two key parts: indices and the SearchSourceBuilder. indices specifies which indices the request targets, and the SearchSourceBuilder expresses the query itself.
A big reason for using RestHighLevelClient is that its API maps almost one-to-one onto the ES DSL. For example, SearchSourceBuilder can be thought of as the outermost {} of the DSL; its member fields include QueryBuilder, fetchSourceContext, aggregations, and so on, and the interfaces of those members mirror the DSL syntax closely as well.
Once the SearchRequest has been built, we can call toString() to generate the DSL. You can of course also tune a query as raw DSL in tools such as Kibana and then pass that DSL directly when searching, but that approach makes the query logic harder to maintain, so it is not recommended.
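As a concrete illustration, here is a sketch of building a SearchRequest this way and printing the DSL it generates (the index name and field names are made up, and it assumes a 6.4+/7.x client where search takes RequestOptions):
SearchRequest searchRequest = new SearchRequest("my_index");      // which indices to query
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();    // the outermost {} of the DSL
sourceBuilder.query(QueryBuilders.boolQuery()
        .must(QueryBuilders.termQuery("status", "published"))
        .filter(QueryBuilders.rangeQuery("publishTime").gte("now-7d")));
sourceBuilder.from(0).size(10);
searchRequest.source(sourceBuilder);

System.out.println(sourceBuilder.toString());                     // prints the equivalent DSL
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);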
2.3 Sending the request
After the search method is called, execution eventually reaches RestClient#performRequest:
public Response performRequest(Request request) throws IOException {
    SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
    performRequestAsyncNoCatch(request, listener);
    return listener.get();
}
This method shows that regardless of whether we call the synchronous or the asynchronous variant on the outside, the client handles the request asynchronously internally (the synchronous variant simply blocks on the listener). That is why the thread pool and connection settings described in 2.1 matter: their sizes should be chosen according to the workload.
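For reference, this is roughly what the asynchronous variant looks like from the caller's side (a sketch assuming a 6.4+/7.x RestHighLevelClient; the callback runs on the HTTP client's I/O threads rather than on the calling thread):
client.searchAsync(searchRequest, RequestOptions.DEFAULT, new ActionListener<SearchResponse>() {
    @Override
    public void onResponse(SearchResponse searchResponse) {
        // handle the hits here
    }

    @Override
    public void onFailure(Exception e) {
        // handle the failure here
    }
});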
Let's look at the core logic inside performRequestAsyncNoCatch(request, listener). After some argument validation and request wrapping, it reaches the following call:
performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes,
request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener);
There is a key call here, nextNode(). Let's see what nextNode() does.
/**
 * Returns a non-empty {@link Iterator} of nodes to be used for a request
 * that match the {@link NodeSelector}.
 * <p>
 * If there are no living nodes that match the {@link NodeSelector}
 * this will return the dead node that matches the {@link NodeSelector}
 * that is closest to being revived.
 * @throws IOException if no nodes are available
 */
private NodeTuple<Iterator<Node>> nextNode() throws IOException {
    NodeTuple<List<Node>> nodeTuple = this.nodeTuple;
    Iterable<Node> hosts = selectNodes(nodeTuple, blacklist, lastNodeIndex, nodeSelector);
    return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache);
}
Before that, let's look at a few member fields of RestClient, which are the arguments passed to selectNodes:
private final AtomicInteger lastNodeIndex = new AtomicInteger(0); // index of the node used by the previous request
private final ConcurrentMap<HttpHost, DeadHostState> blacklist = new ConcurrentHashMap<>(); // per-node health state, marking nodes that could not be reached
private final NodeSelector nodeSelector; // node selector, used for load balancing and filtering
private volatile NodeTuple<List<Node>> nodeTuple; // all coordinating nodes configured on this client
With these fields understood, let's look at the selectNodes method.
/**
 * Select nodes to try and sorts them so that the first one will be tried initially, then the following ones
 * if the previous attempt failed and so on. Package private for testing.
 */
static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost, DeadHostState> blacklist,
                                  AtomicInteger lastNodeIndex, NodeSelector nodeSelector) throws IOException {
    /*
     * Sort the nodes into living and dead lists.
     */
    List<Node> livingNodes = new ArrayList<>(nodeTuple.nodes.size() - blacklist.size());
    List<DeadNode> deadNodes = new ArrayList<>(blacklist.size());
    for (Node node : nodeTuple.nodes) { //1
        DeadHostState deadness = blacklist.get(node.getHost());
        if (deadness == null) {
            livingNodes.add(node);
            continue;
        }
        if (deadness.shallBeRetried()) {
            livingNodes.add(node);
            continue;
        }
        deadNodes.add(new DeadNode(node, deadness));
    }

    if (false == livingNodes.isEmpty()) { //2
        /*
         * Normal state: there is at least one living node. If the
         * selector is ok with any over the living nodes then use them
         * for the request.
         */
        List<Node> selectedLivingNodes = new ArrayList<>(livingNodes);
        nodeSelector.select(selectedLivingNodes);
        if (false == selectedLivingNodes.isEmpty()) {
            /*
             * Rotate the list using a global counter as the distance so subsequent
             * requests will try the nodes in a different order.
             */
            Collections.rotate(selectedLivingNodes, lastNodeIndex.getAndIncrement());
            return selectedLivingNodes;
        }
    }

    /*
     * Last resort: there are no good nodes to use, either because
     * the selector rejected all the living nodes or because there aren't
     * any living ones. Either way, we want to revive a single dead node
     * that the NodeSelectors are OK with. We do this by passing the dead
     * nodes through the NodeSelector so it can have its say in which nodes
     * are ok. If the selector is ok with any of the nodes then we will take
     * the one in the list that has the lowest revival time and try it.
     */
    if (false == deadNodes.isEmpty()) { //3
        final List<DeadNode> selectedDeadNodes = new ArrayList<>(deadNodes);
        /*
         * We'd like NodeSelectors to remove items directly from deadNodes
         * so we can find the minimum after it is filtered without having
         * to compare many things. This saves us a sort on the unfiltered
         * list.
         */
        nodeSelector.select(new Iterable<Node>() {
            @Override
            public Iterator<Node> iterator() {
                return new DeadNodeIteratorAdapter(selectedDeadNodes.iterator());
            }
        });
        if (false == selectedDeadNodes.isEmpty()) {
            return singletonList(Collections.min(selectedDeadNodes).node);
        }
    }
    throw new IOException("NodeSelector [" + nodeSelector + "] rejected all nodes, "
        + "living " + livingNodes + " and dead " + deadNodes);
}
(1) The first step iterates over all configured nodes and checks them against the blacklist. If a node is not marked dead, it goes straight into the livingNodes list; if it is, the code checks whether it is due for a retry (based on how long it has been dead). The blacklist itself is updated after every request according to its outcome.
(2) The second step checks whether livingNodes is empty. If it is not, the list is rotated using the lastNodeIndex counter as a simple load-balancing scheme, so that subsequent requests try the nodes in a different order; the request then picks its node from this reordered list.
(3) If livingNodes is empty, the code falls back to deadNodes. If that list is not empty, the dead node closest to being retried is chosen for the request (a last resort). The NodeSelector that filters the candidates in both branches can be customised when building the client, as sketched below.
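For completeness, here is a minimal sketch of plugging in one of the built-in selectors (host names are made up; setNodeSelector is available on the low-level RestClientBuilder from 6.5 on). SKIP_DEDICATED_MASTERS filters out dedicated master-eligible nodes so client traffic only hits data/coordinating nodes:
RestClientBuilder builder = RestClient.builder(
        new HttpHost("es-node-1", 9200, "http"),
        new HttpHost("es-node-2", 9200, "http"));
builder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS); // becomes the nodeSelector used by selectNodes above
RestHighLevelClient client = new RestHighLevelClient(builder);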
3. Summary
RestHighLevelClient looks simple on the surface, but there is quite a bit of complex logic inside; if you are interested, it is worth digging deeper.