Elasticsearch 5.x 源碼分析(2)TransportClient和RestClient

問題列表

  • ES有哪兩種Client
  • 兩種需要建立的連接數(shù)和需要建立連接的Nodes數(shù)是怎么樣的
  • 兩種Client都有辦法取得Cluster的所有的Nodes節(jié)點(diǎn)嗎
  • 其他Node是否會感知這個Client的存在?(未答焙蚓,待更新)

這章來聊聊Client,首先我們看一下TransportClient的源代碼,其實(shí)看了之前一篇文章的話可以看到TransportClient的初始化過程和一個Node的過程是非常類似的扶踊,然后我們再看一下ES 5.x 新弄的RestClient,一個稍微簡單,輕量的Client(重點(diǎn)是不需要和Node之間保持長連接呀潭,和RestClient 不會和ES版本保持強(qiáng)依賴。
本篇主要是看一下Client是如何初始化的至非,以及它是如何和其他Node保持關(guān)系的钠署,至于怎么調(diào)用Client發(fā)起查詢則不在本篇討論范圍之內(nèi)。


TransportClient

首先我們看看Elasticsearch 官網(wǎng)給出的new 一個Client的關(guān)鍵參數(shù):

TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));

// on shutdown

client.close();
Settings settings = Settings.builder()
        .put("cluster.name", "myClusterName").build();
TransportClient client = new PreBuiltTransportClient(settings);
//Add transport addresses and do something with the client...
Settings settings = Settings.builder()
        .put("client.transport.sniff", true).build();
TransportClient client = new PreBuiltTransportClient(settings);

創(chuàng)建一個Client最最重要就是上面列的:

  • 要連接的IP地址
  • 要連接的集群
  • 是否采用嗅探
    PreBuiltTransportClient其實(shí)就是一個TransportClient的 builder類荒椭,完全沒后自己方法
public class PreBuiltTransportClient extends TransportClient {
...
    /**
     * Creates a new transport client with pre-installed plugins.
     *
     * @param settings            the settings passed to this transport client
     * @param plugins             a collection of additional plugins to run with this client
     * @param hostFailureListener a failure listener that is invoked if a node is disconnected; this can be <code>null</code>
     */
    public PreBuiltTransportClient(
        Settings settings,
        Collection<Class<? extends Plugin>> plugins,
        HostFailureListener hostFailureListener) {
        super(settings, Settings.EMPTY, addPlugins(plugins, PRE_INSTALLED_PLUGINS), hostFailureListener);
    }

直接進(jìn)去TransportClient的類看谐鼎,首先是4個重要參數(shù)。

    public static final Setting<TimeValue> CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL =
        Setting.positiveTimeSetting("client.transport.nodes_sampler_interval", timeValueSeconds(5), Setting.Property.NodeScope);
    public static final Setting<TimeValue> CLIENT_TRANSPORT_PING_TIMEOUT =
        Setting.positiveTimeSetting("client.transport.ping_timeout", timeValueSeconds(5), Setting.Property.NodeScope);
    public static final Setting<Boolean> CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME =
        Setting.boolSetting("client.transport.ignore_cluster_name", false, Setting.Property.NodeScope);
    public static final Setting<Boolean> CLIENT_TRANSPORT_SNIFF =
        Setting.boolSetting("client.transport.sniff", false, Setting.Property.NodeScope);

nodes_sampler_intervalping_timeout兩個參數(shù)默認(rèn)值都是5s趣惠。構(gòu)造函數(shù)的話和Node的初始化是非常相似的:

private static ClientTemplate buildTemplate(Settings providedSettings, Settings defaultSettings,
                                                Collection<Class<? extends Plugin>> plugins, HostFailureListener failureListner) {
        if (Node.NODE_NAME_SETTING.exists(providedSettings) == false) {
            providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
        }
        final PluginsService pluginsService = newPluginService(providedSettings, plugins);
        final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).build();
        final List<Closeable> resourcesToClose = new ArrayList<>();
        final ThreadPool threadPool = new ThreadPool(settings);
        resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
        final NetworkService networkService = new NetworkService(settings, Collections.emptyList());
        try {
            final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
            final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
            for (final ExecutorBuilder<?> builder : threadPool.builders()) {
                additionalSettings.addAll(builder.getRegisteredSettings());
            }
            SettingsModule settingsModule = new SettingsModule(settings, additionalSettings, additionalSettingsFilter);

            SearchModule searchModule = new SearchModule(settings, true, pluginsService.filterPlugins(SearchPlugin.class));
            List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
            entries.addAll(NetworkModule.getNamedWriteables());
            entries.addAll(searchModule.getNamedWriteables());
            entries.addAll(ClusterModule.getNamedWriteables());
            entries.addAll(pluginsService.filterPlugins(Plugin.class).stream()
                                         .flatMap(p -> p.getNamedWriteables().stream())
                                         .collect(Collectors.toList()));
            NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
            NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(
                    searchModule.getNamedXContents().stream(),
                    pluginsService.filterPlugins(Plugin.class).stream()
                            .flatMap(p -> p.getNamedXContent().stream())
                    ).flatMap(Function.identity()).collect(toList()));

            ModulesBuilder modules = new ModulesBuilder();
            // plugin modules must be added here, before others or we can get crazy injection errors...
            for (Module pluginModule : pluginsService.createGuiceModules()) {
                modules.add(pluginModule);
            }
            modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
            ActionModule actionModule = new ActionModule(true, settings, null, settingsModule.getIndexScopedSettings(),
                    settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool,
                    pluginsService.filterPlugins(ActionPlugin.class), null, null);
            modules.add(actionModule);

            CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
                settingsModule.getClusterSettings());
            resourcesToClose.add(circuitBreakerService);
            BigArrays bigArrays = new BigArrays(settings, circuitBreakerService);
            resourcesToClose.add(bigArrays);
            modules.add(settingsModule);
            NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
                bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
            final Transport transport = networkModule.getTransportSupplier().get();
            final TransportService transportService = new TransportService(settings, transport, threadPool,
                networkModule.getTransportInterceptor(),
                boundTransportAddress -> DiscoveryNode.createLocal(settings, new TransportAddress(TransportAddress.META_ADDRESS, 0),
                    UUIDs.randomBase64UUID()), null);
            modules.add((b -> {
                b.bind(BigArrays.class).toInstance(bigArrays);
                b.bind(PluginsService.class).toInstance(pluginsService);
                b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
                b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
                b.bind(Transport.class).toInstance(transport);
                b.bind(TransportService.class).toInstance(transportService);
                b.bind(NetworkService.class).toInstance(networkService);
            }));

            Injector injector = modules.createInjector();
            final TransportClientNodesService nodesService =
                new TransportClientNodesService(settings, transportService, threadPool, failureListner == null
                    ? (t, e) -> {} : failureListner);
            final TransportProxyClient proxy = new TransportProxyClient(settings, transportService, nodesService,
                actionModule.getActions().values().stream().map(x -> x.getAction()).collect(Collectors.toList()));

            List<LifecycleComponent> pluginLifecycleComponents = new ArrayList<>(pluginsService.getGuiceServiceClasses().stream()
                .map(injector::getInstance).collect(Collectors.toList()));
            resourcesToClose.addAll(pluginLifecycleComponents);

            transportService.start();
            transportService.acceptIncomingRequests();

            ClientTemplate transportClient = new ClientTemplate(injector, pluginLifecycleComponents, nodesService, proxy, namedWriteableRegistry);
            resourcesToClose.clear();
            return transportClient;
        } finally {
            IOUtils.closeWhileHandlingException(resourcesToClose);
        }
    }

當(dāng)然Client相比起來肯定不會初始化Node節(jié)點(diǎn)所有的東西狸棍,但是一些重要的東西是不會缺少的,比如threadPool味悄,NetworkModule草戈,TransportService等等。最后會new出一個ClientTemplate侍瑟,這個類封裝了5個變量

private static final class ClientTemplate {
        final Injector injector;
        private final List<LifecycleComponent> pluginLifecycleComponents;
        private final TransportClientNodesService nodesService;
        private final TransportProxyClient proxy;
        private final NamedWriteableRegistry namedWriteableRegistry;

        private ClientTemplate(Injector injector, List<LifecycleComponent> pluginLifecycleComponents,
                TransportClientNodesService nodesService, TransportProxyClient proxy, NamedWriteableRegistry namedWriteableRegistry) {
            this.injector = injector;
            this.pluginLifecycleComponents = pluginLifecycleComponents;
            this.nodesService = nodesService;
            this.proxy = proxy;
            this.namedWriteableRegistry = namedWriteableRegistry;
        }

        Settings getSettings() {
            return injector.getInstance(Settings.class);
        }

        ThreadPool getThreadPool() {
            return injector.getInstance(ThreadPool.class);
        }
    }

injector不用說了唐片,就是管理bean依賴的,proxy我在后面會說涨颜,namedWritableRegistry 我現(xiàn)在不知道是什么東西來的费韭,我感覺好像是一些request和response的一些序列化的東西,以后讀懂了會回來更新這里庭瑰。這里先看一下這個nodesService星持,它是一個TransportClientNodesService類,通俗講就是給Client用來感知和管理與周邊的Node通訊用见擦,應(yīng)該和NodeService是做類似的東西

   TransportClientNodesService(Settings settings, TransportService transportService,
                                       ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {
        super(settings);
        this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
        this.transportService = transportService;
        this.threadPool = threadPool;
        this.minCompatibilityVersion = Version.CURRENT.minimumCompatibilityVersion();

        this.nodesSamplerInterval = TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.get(this.settings);
        this.pingTimeout = TransportClient.CLIENT_TRANSPORT_PING_TIMEOUT.get(this.settings).millis();
        this.ignoreClusterName = TransportClient.CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME.get(this.settings);

        if (logger.isDebugEnabled()) {
            logger.debug("node_sampler_interval[{}]", nodesSamplerInterval);
        }

        if (TransportClient.CLIENT_TRANSPORT_SNIFF.get(this.settings)) {
            this.nodesSampler = new SniffNodesSampler();
        } else {
            this.nodesSampler = new SimpleNodeSampler();
        }
        this.hostFailureListener = hostFailureListener;
        this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());
    }

重要參數(shù)Sniff 就用在這里钉汗,根絕不同的配置定義了SniffNodesSampler或者是SimpleNodeSampler羹令,留意最后還初始化了一個定時器鲤屡,就是按配置的如每5s去ping一下其他nodes。

    class SimpleNodeSampler extends NodeSampler {

        @Override
        protected void doSample() {
            HashSet<DiscoveryNode> newNodes = new HashSet<>();
            HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
            for (DiscoveryNode listedNode : listedNodes) {
                try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)){
                    final PlainTransportFuture<LivenessResponse> handler = new PlainTransportFuture<>(
                        new FutureTransportResponseHandler<LivenessResponse>() {
                            @Override
                            public LivenessResponse newInstance() {
                                return new LivenessResponse();
                            }
                        });
                    transportService.sendRequest(connection, TransportLivenessAction.NAME, new LivenessRequest(),
                        TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
                        handler);
                    final LivenessResponse livenessResponse = handler.txGet();
                    if (!ignoreClusterName && !clusterName.equals(livenessResponse.getClusterName())) {
                        logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
                        newFilteredNodes.add(listedNode);
                    } else {
                        // use discovered information but do keep the original transport address,
                        // so people can control which address is exactly used.
                        DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
                        newNodes.add(new DiscoveryNode(nodeWithInfo.getName(), nodeWithInfo.getId(), nodeWithInfo.getEphemeralId(),
                            nodeWithInfo.getHostName(), nodeWithInfo.getHostAddress(), listedNode.getAddress(),
                            nodeWithInfo.getAttributes(), nodeWithInfo.getRoles(), nodeWithInfo.getVersion()));
                    }
                } catch (ConnectTransportException e) {
                    logger.debug(
                        (Supplier<?>)
                            () -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", listedNode), e);
                    hostFailureListener.onNodeDisconnected(listedNode, e);
                } catch (Exception e) {
                    logger.info(
                        (Supplier<?>) () -> new ParameterizedMessage("failed to get node info for {}, disconnecting...", listedNode), e);
                }
            }

            nodes = validateNewNodes(newNodes);
            filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
        }
    }

SampleNodeSampler很簡單福侈,其實(shí)就是把配置進(jìn)去的listedNodes去請求一個STATE的request酒来,注意,這里用的是TransportService去拿connection(底層是用netty4Transport)肪凛,而線程是用GENERIC的線程池堰汉。把成功建立連接的所有的Nodes保存起來辽社,而與每個Node也只保持1條連接。
再來看看SniffNodesSampler

class SniffNodesSampler extends NodeSampler {

        @Override
        protected void doSample() {
            // the nodes we are going to ping include the core listed nodes that were added
            // and the last round of discovered nodes
            Set<DiscoveryNode> nodesToPing = new HashSet<>();
            for (DiscoveryNode node : listedNodes) {
                nodesToPing.add(node);
            }
            for (DiscoveryNode node : nodes) {
                nodesToPing.add(node);
            }

            final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
            final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();
            try {
                for (final DiscoveryNode nodeToPing : nodesToPing) {
                    threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {

首先也是先向所有的listedNode都ping一遍翘鸭,注意這里用的是MANAGEMENTthreadPool

                       @Override
                        protected void doRun() throws Exception {
                            Transport.Connection pingConnection = null;
                            if (nodes.contains(nodeToPing)) {
                                try {
                                    pingConnection = transportService.getConnection(nodeToPing);
                                } catch (NodeNotConnectedException e) {
                                    // will use a temp connection
                                }
                            }
                            if (pingConnection == null) {
                                logger.trace("connecting to cluster node [{}]", nodeToPing);
                                connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);
                                pingConnection = connectionToClose;
                            }
                            transportService.sendRequest(pingConnection, ClusterStateAction.NAME,
                                Requests.clusterStateRequest().clear().nodes(true).local(true),
                                TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
                                    .withTimeout(pingTimeout).build(),
                                new TransportResponseHandler<ClusterStateResponse>() {

                                    @Override
                                    public ClusterStateResponse newInstance() {
                                        return new ClusterStateResponse();
                                    }

                                    @Override
                                    public String executor() {
                                        return ThreadPool.Names.SAME;
                                    }

                                    @Override
                                    public void handleResponse(ClusterStateResponse response) {
                                        clusterStateResponses.put(nodeToPing, response);
                                        onDone();
                                    }

                                    @Override
                                    public void handleException(TransportException e) {
                                        logger.info(
                                            (Supplier<?>) () -> new ParameterizedMessage(
                                                "failed to get local cluster state for {}, disconnecting...", nodeToPing), e);
                                        try {
                                            hostFailureListener.onNodeDisconnected(nodeToPing, e);
                                        } finally {
                                            onDone();
                                        }
                                    }
                                });
                        }

同樣也是調(diào)用TransportService發(fā)起連接滴铅,這里要特別注意,這種方式下其實(shí)是建立了一堆連接connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);如每個類型多少條連接這樣就乓,所以這種模式一個Client會和一個Node保持一堆連接汉匙。回調(diào)函數(shù)都很簡單生蚁,成功和失敗的都?xì)w類噩翠,同時拿到了每個送回來的cluster的state保存下來clusterStateResponses.put(nodeToPing, response);

HashSet<DiscoveryNode> newNodes = new HashSet<>();
            HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
            for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {
                if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {
                    logger.warn("node {} not part of the cluster {}, ignoring...",
                            entry.getValue().getState().nodes().getLocalNode(), clusterName);
                    newFilteredNodes.add(entry.getKey());
                    continue;
                }
                for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().getDataNodes().values()) {
                    newNodes.add(cursor.value);
                }
            }

            nodes = validateNewNodes(newNodes);
            filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));

最后匯總再確認(rèn)一遍所有的nodes,校驗(yàn)完后維護(hù)邦投,其實(shí)這里的nodes就是整個集群的所有的nodes了伤锚,剩下的就交給那個調(diào)度器去每間隔時間去ping了。
從這里我們也可以看出志衣,其實(shí)這里我們已經(jīng)建立好了連接了屯援,因此以后有什么請求的話其實(shí)client向一個node取一個連接或者一個類型的連接就可以發(fā)起請求了。這就是之前說的proxy的事念脯,我們回去看看proxy是什么東西玄呛。
proxy也是在ClientTemplate里面被初始化:final TransportProxyClient proxy = new TransportProxyClient(settings, transportService, nodesService,actionModule.getActions().values().stream().map(x -> x.getAction()).collect(Collectors.toList()));
它只保存兩個變量,nodesService和二,proxies

private final TransportClientNodesService nodesService;
    private final Map<Action, TransportActionNodeProxy> proxies;

    TransportProxyClient(Settings settings, TransportService transportService,
                                TransportClientNodesService nodesService, List<GenericAction> actions) {
        this.nodesService = nodesService;
        Map<Action, TransportActionNodeProxy> proxies = new HashMap<>();
        for (GenericAction action : actions) {
            if (action instanceof Action) {
                proxies.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
            }
        }
        this.proxies = unmodifiableMap(proxies);
    }

    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends
        ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action,
                                                                              final Request request, ActionListener<Response> listener) {
        final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
        nodesService.execute((n, l) -> proxy.execute(n, request, l), listener);
    }

調(diào)用也很簡單徘铝,就是得到一個action的proxy,指定一個函數(shù)proxy.execute()然后把這個函數(shù)扔給nodesService去執(zhí)行它惯吕。說白就是proxies里記錄了每種action如何執(zhí)行請求惕它,然后讓nodesService隨機(jī)選一個node來發(fā)送:

    public <Response> void execute(NodeListenerCallback<Response> callback, ActionListener<Response> listener) {
        // we first read nodes before checking the closed state; this
        // is because otherwise we could be subject to a race where we
        // read the state as not being closed, and then the client is
        // closed and the nodes list is cleared, and then a
        // NoNodeAvailableException is thrown
        // it is important that the order of first setting the state of
        // closed and then clearing the list of nodes is maintained in
        // the close method
        final List<DiscoveryNode> nodes = this.nodes;
        if (closed) {
            throw new IllegalStateException("transport client is closed");
        }
        ensureNodesAreAvailable(nodes);
        int index = getNodeNumber();
        RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index, hostFailureListener);
        DiscoveryNode node = retryListener.getNode(0);
        try {
            callback.doWithNode(node, retryListener);
        } catch (Exception e) {
            try {
                //this exception can't come from the TransportService as it doesn't throw exception at all
                listener.onFailure(e);
            } finally {
                retryListener.maybeNodeFailed(node, e);
            }
        }
    }

RestClient

RestClient 其實(shí)再簡單不過了,看頭100行的源代碼就基本看完了

public class RestClient implements Closeable {

    private static final Log logger = LogFactory.getLog(RestClient.class);

    private final CloseableHttpAsyncClient client;
    //we don't rely on default headers supported by HttpAsyncClient as those cannot be replaced
    private final Header[] defaultHeaders;
    private final long maxRetryTimeoutMillis;
    private final String pathPrefix;
    private final AtomicInteger lastHostIndex = new AtomicInteger(0);
    private volatile HostTuple<Set<HttpHost>> hostTuple;
    private final ConcurrentMap<HttpHost, DeadHostState> blacklist = new ConcurrentHashMap<>();
    private final FailureListener failureListener;

    RestClient(CloseableHttpAsyncClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders,
               HttpHost[] hosts, String pathPrefix, FailureListener failureListener) {
        this.client = client;
        this.maxRetryTimeoutMillis = maxRetryTimeoutMillis;
        this.defaultHeaders = defaultHeaders;
        this.failureListener = failureListener;
        this.pathPrefix = pathPrefix;
        setHosts(hosts);
    }

    /**
     * Returns a new {@link RestClientBuilder} to help with {@link RestClient} creation.
     */
    public static RestClientBuilder builder(HttpHost... hosts) {
        return new RestClientBuilder(hosts);
    }

    /**
     * Replaces the hosts that the client communicates with.
     * @see HttpHost
     */
    public synchronized void setHosts(HttpHost... hosts) {
        if (hosts == null || hosts.length == 0) {
            throw new IllegalArgumentException("hosts must not be null nor empty");
        }
        Set<HttpHost> httpHosts = new HashSet<>();
        AuthCache authCache = new BasicAuthCache();
        for (HttpHost host : hosts) {
            Objects.requireNonNull(host, "host cannot be null");
            httpHosts.add(host);
            authCache.put(host, new BasicScheme());
        }
        this.hostTuple = new HostTuple<>(Collections.unmodifiableSet(httpHosts), authCache);
        this.blacklist.clear();
    }

這里與TransportClient最大不一樣就是這里不會啟用sniff废登,僅負(fù)責(zé)維護(hù)配置進(jìn)去的所有的hosts淹魄,需要簡要的話都把a(bǔ)uthCahe 保存起來,剩下就是所有的請求都交給apache 的httpClient去做了堡距。


Sniffer組件

image.png

那如果我真心覺得RestClient好用而我又想用sniff那咋整甲锡,貼心的ES團(tuán)隊(duì)給個小插件給你,用它來綁定一下你的client羽戒,它會幫你去嗅探出這個cluster的所有機(jī)器并回調(diào)client的setHosts()

void sniff(HttpHost excludeHost, long nextSniffDelayMillis) {
            if (running.compareAndSet(false, true)) {
                try {
                    List<HttpHost> sniffedHosts = hostsSniffer.sniffHosts();
                    logger.debug("sniffed hosts: " + sniffedHosts);
                    if (excludeHost != null) {
                        sniffedHosts.remove(excludeHost);
                    }
                    if (sniffedHosts.isEmpty()) {
                        logger.warn("no hosts to set, hosts will be updated at the next sniffing round");
                    } else {
                        this.restClient.setHosts(sniffedHosts.toArray(new HttpHost[sniffedHosts.size()]));
                    }
                } catch (Exception e) {
                    logger.error("error while sniffing nodes", e);
                } finally {
                    scheduleNextRun(nextSniffDelayMillis);
                    running.set(false);
                }
            }
        }

本篇完

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末缤沦,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子易稠,更是在濱河造成了極大的恐慌缸废,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,188評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異企量,居然都是意外死亡测萎,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評論 3 395
  • 文/潘曉璐 我一進(jìn)店門届巩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來硅瞧,“玉大人,你說我怎么就攤上這事恕汇×憷遥” “怎么了?”我有些...
    開封第一講書人閱讀 165,562評論 0 356
  • 文/不壞的土叔 我叫張陵拇勃,是天一觀的道長四苇。 經(jīng)常有香客問我,道長方咆,這世上最難降的妖魔是什么月腋? 我笑而不...
    開封第一講書人閱讀 58,893評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮瓣赂,結(jié)果婚禮上榆骚,老公的妹妹穿的比我還像新娘。我一直安慰自己煌集,他們只是感情好妓肢,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,917評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著苫纤,像睡著了一般碉钠。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上卷拘,一...
    開封第一講書人閱讀 51,708評論 1 305
  • 那天喊废,我揣著相機(jī)與錄音,去河邊找鬼栗弟。 笑死污筷,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的乍赫。 我是一名探鬼主播瓣蛀,決...
    沈念sama閱讀 40,430評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼雷厂!你這毒婦竟也來了惋增?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,342評論 0 276
  • 序言:老撾萬榮一對情侶失蹤罗侯,失蹤者是張志新(化名)和其女友劉穎器腋,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體钩杰,經(jīng)...
    沈念sama閱讀 45,801評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡纫塌,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,976評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了讲弄。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片措左。...
    茶點(diǎn)故事閱讀 40,115評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖避除,靈堂內(nèi)的尸體忽然破棺而出怎披,到底是詐尸還是另有隱情,我是刑警寧澤瓶摆,帶...
    沈念sama閱讀 35,804評論 5 346
  • 正文 年R本政府宣布凉逛,位于F島的核電站,受9級特大地震影響群井,放射性物質(zhì)發(fā)生泄漏状飞。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,458評論 3 331
  • 文/蒙蒙 一书斜、第九天 我趴在偏房一處隱蔽的房頂上張望诬辈。 院中可真熱鬧,春花似錦荐吉、人聲如沸焙糟。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,008評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽穿撮。三九已至,卻和暖如春痪欲,著一層夾襖步出監(jiān)牢的瞬間混巧,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,135評論 1 272
  • 我被黑心中介騙來泰國打工勤揩, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留咧党,地道東北人。 一個月前我還...
    沈念sama閱讀 48,365評論 3 373
  • 正文 我出身青樓陨亡,卻偏偏與公主長得像傍衡,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子负蠕,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,055評論 2 355

推薦閱讀更多精彩內(nèi)容