Question list
- What are the two kinds of Client that ES offers?
- How many connections does each kind establish, and to how many Nodes?
- Can both kinds of Client discover all the Nodes in the cluster?
- Do the other Nodes become aware of this Client's existence? (not answered yet, to be updated)
This post is about the Client. We first walk through the source code of TransportClient; if you read the previous article you will notice that its initialization is very similar to that of a Node. Then we look at the RestClient newly introduced in ES 5.x, a simpler and more lightweight client (the key points being that it does not need to keep long-lived connections to the nodes, and that it has no hard dependency on the ES version).
This post focuses on how the Client is initialized and how it maintains its relationship with the other Nodes; how to actually invoke the Client to issue queries is outside the scope of this post.
TransportClient
First, let's look at the key parameters the official Elasticsearch documentation shows for constructing a client:
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));
// on shutdown
client.close();
Settings settings = Settings.builder()
.put("cluster.name", "myClusterName").build();
TransportClient client = new PreBuiltTransportClient(settings);
//Add transport addresses and do something with the client...
Settings settings = Settings.builder()
.put("client.transport.sniff", true).build();
TransportClient client = new PreBuiltTransportClient(settings);
The most important things when creating a Client are exactly what is listed above (a combined sketch follows the list):
- the addresses to connect to
- the cluster to connect to
- whether to enable sniffing
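Putting the three together, a minimal sketch; the host names "es-node-1"/"es-node-2", port 9300 and the cluster name "my-cluster" are made-up values for illustration:
// Hypothetical hosts and cluster name, combining the documented settings above.
Settings settings = Settings.builder()
        .put("cluster.name", "my-cluster")          // the cluster to connect to
        .put("client.transport.sniff", true)        // whether to enable sniffing
        .build();
TransportClient client = new PreBuiltTransportClient(settings)
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("es-node-1"), 9300))
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("es-node-2"), 9300));
// ... use the client ...
client.close();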
PreBuiltTransportClient
It is really just a builder class for TransportClient, with essentially no methods of its own:
public class PreBuiltTransportClient extends TransportClient {
...
/**
* Creates a new transport client with pre-installed plugins.
*
* @param settings the settings passed to this transport client
* @param plugins a collection of additional plugins to run with this client
* @param hostFailureListener a failure listener that is invoked if a node is disconnected; this can be <code>null</code>
*/
public PreBuiltTransportClient(
Settings settings,
Collection<Class<? extends Plugin>> plugins,
HostFailureListener hostFailureListener) {
super(settings, Settings.EMPTY, addPlugins(plugins, PRE_INSTALLED_PLUGINS), hostFailureListener);
}
Going straight into the TransportClient class, the first things to look at are four important settings:
public static final Setting<TimeValue> CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL =
Setting.positiveTimeSetting("client.transport.nodes_sampler_interval", timeValueSeconds(5), Setting.Property.NodeScope);
public static final Setting<TimeValue> CLIENT_TRANSPORT_PING_TIMEOUT =
Setting.positiveTimeSetting("client.transport.ping_timeout", timeValueSeconds(5), Setting.Property.NodeScope);
public static final Setting<Boolean> CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME =
Setting.boolSetting("client.transport.ignore_cluster_name", false, Setting.Property.NodeScope);
public static final Setting<Boolean> CLIENT_TRANSPORT_SNIFF =
Setting.boolSetting("client.transport.sniff", false, Setting.Property.NodeScope);
Their defaults are both 5s. The constructor itself is very similar to a Node's initialization:
private static ClientTemplate buildTemplate(Settings providedSettings, Settings defaultSettings,
Collection<Class<? extends Plugin>> plugins, HostFailureListener failureListner) {
if (Node.NODE_NAME_SETTING.exists(providedSettings) == false) {
providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
}
final PluginsService pluginsService = newPluginService(providedSettings, plugins);
final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).build();
final List<Closeable> resourcesToClose = new ArrayList<>();
final ThreadPool threadPool = new ThreadPool(settings);
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
final NetworkService networkService = new NetworkService(settings, Collections.emptyList());
try {
final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
additionalSettings.addAll(builder.getRegisteredSettings());
}
SettingsModule settingsModule = new SettingsModule(settings, additionalSettings, additionalSettingsFilter);
SearchModule searchModule = new SearchModule(settings, true, pluginsService.filterPlugins(SearchPlugin.class));
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
entries.addAll(NetworkModule.getNamedWriteables());
entries.addAll(searchModule.getNamedWriteables());
entries.addAll(ClusterModule.getNamedWriteables());
entries.addAll(pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getNamedWriteables().stream())
.collect(Collectors.toList()));
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(
searchModule.getNamedXContents().stream(),
pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getNamedXContent().stream())
).flatMap(Function.identity()).collect(toList()));
ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before others or we can get crazy injection errors...
for (Module pluginModule : pluginsService.createGuiceModules()) {
modules.add(pluginModule);
}
modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
ActionModule actionModule = new ActionModule(true, settings, null, settingsModule.getIndexScopedSettings(),
settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool,
pluginsService.filterPlugins(ActionPlugin.class), null, null);
modules.add(actionModule);
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
settingsModule.getClusterSettings());
resourcesToClose.add(circuitBreakerService);
BigArrays bigArrays = new BigArrays(settings, circuitBreakerService);
resourcesToClose.add(bigArrays);
modules.add(settingsModule);
NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = new TransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(),
boundTransportAddress -> DiscoveryNode.createLocal(settings, new TransportAddress(TransportAddress.META_ADDRESS, 0),
UUIDs.randomBase64UUID()), null);
modules.add((b -> {
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
b.bind(Transport.class).toInstance(transport);
b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService);
}));
Injector injector = modules.createInjector();
final TransportClientNodesService nodesService =
new TransportClientNodesService(settings, transportService, threadPool, failureListner == null
? (t, e) -> {} : failureListner);
final TransportProxyClient proxy = new TransportProxyClient(settings, transportService, nodesService,
actionModule.getActions().values().stream().map(x -> x.getAction()).collect(Collectors.toList()));
List<LifecycleComponent> pluginLifecycleComponents = new ArrayList<>(pluginsService.getGuiceServiceClasses().stream()
.map(injector::getInstance).collect(Collectors.toList()));
resourcesToClose.addAll(pluginLifecycleComponents);
transportService.start();
transportService.acceptIncomingRequests();
ClientTemplate transportClient = new ClientTemplate(injector, pluginLifecycleComponents, nodesService, proxy, namedWriteableRegistry);
resourcesToClose.clear();
return transportClient;
} finally {
IOUtils.closeWhileHandlingException(resourcesToClose);
}
}
Of course the Client will not initialize everything a Node does, but none of the important pieces are missing, for example the threadPool, the NetworkModule, the TransportService and so on. At the end a ClientTemplate is newed up; this class wraps five fields:
private static final class ClientTemplate {
final Injector injector;
private final List<LifecycleComponent> pluginLifecycleComponents;
private final TransportClientNodesService nodesService;
private final TransportProxyClient proxy;
private final NamedWriteableRegistry namedWriteableRegistry;
private ClientTemplate(Injector injector, List<LifecycleComponent> pluginLifecycleComponents,
TransportClientNodesService nodesService, TransportProxyClient proxy, NamedWriteableRegistry namedWriteableRegistry) {
this.injector = injector;
this.pluginLifecycleComponents = pluginLifecycleComponents;
this.nodesService = nodesService;
this.proxy = proxy;
this.namedWriteableRegistry = namedWriteableRegistry;
}
Settings getSettings() {
return injector.getInstance(Settings.class);
}
ThreadPool getThreadPool() {
return injector.getInstance(ThreadPool.class);
}
}
The injector needs no introduction: it manages bean dependencies. The proxy will be covered further down. As for namedWriteableRegistry, I do not yet know what it is; it seems to be something about serializing requests and responses, and I will come back and update this part once I have read through it. For now let's look at nodesService, which is a TransportClientNodesService. Put simply, it is what the Client uses to discover and manage communication with the surrounding Nodes, presumably playing a role similar to NodeService:
TransportClientNodesService(Settings settings, TransportService transportService,
ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {
super(settings);
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.transportService = transportService;
this.threadPool = threadPool;
this.minCompatibilityVersion = Version.CURRENT.minimumCompatibilityVersion();
this.nodesSamplerInterval = TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.get(this.settings);
this.pingTimeout = TransportClient.CLIENT_TRANSPORT_PING_TIMEOUT.get(this.settings).millis();
this.ignoreClusterName = TransportClient.CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME.get(this.settings);
if (logger.isDebugEnabled()) {
logger.debug("node_sampler_interval[{}]", nodesSamplerInterval);
}
if (TransportClient.CLIENT_TRANSPORT_SNIFF.get(this.settings)) {
this.nodesSampler = new SniffNodesSampler();
} else {
this.nodesSampler = new SimpleNodeSampler();
}
this.hostFailureListener = hostFailureListener;
this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());
}
This is where the important sniff setting is used: depending on the configuration, either a SniffNodesSampler or a SimpleNodeSampler is created. Also note that a scheduled task is set up at the end, which pings the other nodes at the configured interval, e.g. every 5s.
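The scheduled task itself is tiny; roughly (a simplified sketch, not the verbatim source) it runs one sampling round and then reschedules itself:
class ScheduledNodeSampler implements Runnable {
    @Override
    public void run() {
        try {
            nodesSampler.sample();               // one round of doSample() on the configured sampler
            if (!closed) {                       // keep rescheduling only while the client is open
                nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, this);
            }
        } catch (Exception e) {
            logger.warn("failed to sample", e);
        }
    }
}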
class SimpleNodeSampler extends NodeSampler {
@Override
protected void doSample() {
HashSet<DiscoveryNode> newNodes = new HashSet<>();
HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
for (DiscoveryNode listedNode : listedNodes) {
try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)){
final PlainTransportFuture<LivenessResponse> handler = new PlainTransportFuture<>(
new FutureTransportResponseHandler<LivenessResponse>() {
@Override
public LivenessResponse newInstance() {
return new LivenessResponse();
}
});
transportService.sendRequest(connection, TransportLivenessAction.NAME, new LivenessRequest(),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
handler);
final LivenessResponse livenessResponse = handler.txGet();
if (!ignoreClusterName && !clusterName.equals(livenessResponse.getClusterName())) {
logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
newFilteredNodes.add(listedNode);
} else {
// use discovered information but do keep the original transport address,
// so people can control which address is exactly used.
DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
newNodes.add(new DiscoveryNode(nodeWithInfo.getName(), nodeWithInfo.getId(), nodeWithInfo.getEphemeralId(),
nodeWithInfo.getHostName(), nodeWithInfo.getHostAddress(), listedNode.getAddress(),
nodeWithInfo.getAttributes(), nodeWithInfo.getRoles(), nodeWithInfo.getVersion()));
}
} catch (ConnectTransportException e) {
logger.debug(
(Supplier<?>)
() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", listedNode), e);
hostFailureListener.onNodeDisconnected(listedNode, e);
} catch (Exception e) {
logger.info(
(Supplier<?>) () -> new ParameterizedMessage("failed to get node info for {}, disconnecting...", listedNode), e);
}
}
nodes = validateNewNodes(newNodes);
filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
}
}
SimpleNodeSampler is very simple: it just sends a STATE-type request to each of the configured listedNodes. Note that the connection is obtained through the TransportService (Netty4Transport underneath), and the work runs on the GENERIC thread pool. All Nodes that were connected successfully are kept, and only one connection is maintained per Node.
Now let's look at SniffNodesSampler:
class SniffNodesSampler extends NodeSampler {
@Override
protected void doSample() {
// the nodes we are going to ping include the core listed nodes that were added
// and the last round of discovered nodes
Set<DiscoveryNode> nodesToPing = new HashSet<>();
for (DiscoveryNode node : listedNodes) {
nodesToPing.add(node);
}
for (DiscoveryNode node : nodes) {
nodesToPing.add(node);
}
final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();
try {
for (final DiscoveryNode nodeToPing : nodesToPing) {
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
It also starts by pinging every node in the list (the configured listedNodes plus the nodes discovered in the previous round); note that here the MANAGEMENT thread pool is used:
@Override
protected void doRun() throws Exception {
Transport.Connection pingConnection = null;
if (nodes.contains(nodeToPing)) {
try {
pingConnection = transportService.getConnection(nodeToPing);
} catch (NodeNotConnectedException e) {
// will use a temp connection
}
}
if (pingConnection == null) {
logger.trace("connecting to cluster node [{}]", nodeToPing);
connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);
pingConnection = connectionToClose;
}
transportService.sendRequest(pingConnection, ClusterStateAction.NAME,
Requests.clusterStateRequest().clear().nodes(true).local(true),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
.withTimeout(pingTimeout).build(),
new TransportResponseHandler<ClusterStateResponse>() {
@Override
public ClusterStateResponse newInstance() {
return new ClusterStateResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(ClusterStateResponse response) {
clusterStateResponses.put(nodeToPing, response);
onDone();
}
@Override
public void handleException(TransportException e) {
logger.info(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to get local cluster state for {}, disconnecting...", nodeToPing), e);
try {
hostFailureListener.onNodeDisconnected(nodeToPing, e);
} finally {
onDone();
}
}
});
}
Connections are again opened through the TransportService. Pay special attention here: in this mode a whole bundle of connections actually ends up being established (connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);), i.e. a certain number of connections per connection type, so in this mode a Client keeps a group of connections to each Node. The callbacks are simple: successes and failures are each sorted into place, and the cluster state returned by every node is saved (clusterStateResponses.put(nodeToPing, response);).
HashSet<DiscoveryNode> newNodes = new HashSet<>();
HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {
if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {
logger.warn("node {} not part of the cluster {}, ignoring...",
entry.getValue().getState().nodes().getLocalNode(), clusterName);
newFilteredNodes.add(entry.getKey());
continue;
}
for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().getDataNodes().values()) {
newNodes.add(cursor.value);
}
}
nodes = validateNewNodes(newNodes);
filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
最后匯總再確認(rèn)一遍所有的nodes,校驗(yàn)完后維護(hù)邦投,其實(shí)這里的nodes就是整個集群的所有的nodes了伤锚,剩下的就交給那個調(diào)度器去每間隔時間去ping了。
從這里我們也可以看出志衣,其實(shí)這里我們已經(jīng)建立好了連接了屯援,因此以后有什么請求的話其實(shí)client向一個node取一個連接或者一個類型的連接就可以發(fā)起請求了。這就是之前說的proxy的事念脯,我們回去看看proxy是什么東西玄呛。
proxy也是在ClientTemplate里面被初始化:final TransportProxyClient proxy = new TransportProxyClient(settings, transportService, nodesService,actionModule.getActions().values().stream().map(x -> x.getAction()).collect(Collectors.toList()));
It keeps only two fields, nodesService and proxies:
private final TransportClientNodesService nodesService;
private final Map<Action, TransportActionNodeProxy> proxies;
TransportProxyClient(Settings settings, TransportService transportService,
TransportClientNodesService nodesService, List<GenericAction> actions) {
this.nodesService = nodesService;
Map<Action, TransportActionNodeProxy> proxies = new HashMap<>();
for (GenericAction action : actions) {
if (action instanceof Action) {
proxies.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
}
}
this.proxies = unmodifiableMap(proxies);
}
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends
ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action,
final Request request, ActionListener<Response> listener) {
final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
nodesService.execute((n, l) -> proxy.execute(n, request, l), listener);
}
Invocation is just as simple: get the proxy for an action, wrap proxy.execute() in a callback, and hand that callback over to nodesService to run. In plain terms, proxies records how each kind of action executes its request, and nodesService then picks one of the nodes to send it to:
public <Response> void execute(NodeListenerCallback<Response> callback, ActionListener<Response> listener) {
// we first read nodes before checking the closed state; this
// is because otherwise we could be subject to a race where we
// read the state as not being closed, and then the client is
// closed and the nodes list is cleared, and then a
// NoNodeAvailableException is thrown
// it is important that the order of first setting the state of
// closed and then clearing the list of nodes is maintained in
// the close method
final List<DiscoveryNode> nodes = this.nodes;
if (closed) {
throw new IllegalStateException("transport client is closed");
}
ensureNodesAreAvailable(nodes);
int index = getNodeNumber();
RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index, hostFailureListener);
DiscoveryNode node = retryListener.getNode(0);
try {
callback.doWithNode(node, retryListener);
} catch (Exception e) {
try {
//this exception can't come from the TransportService as it doesn't throw exception at all
listener.onFailure(e);
} finally {
retryListener.maybeNodeFailed(node, e);
}
}
}
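To connect this back to everyday usage: any call made on the client, for example the hedged sketch below (the index name "articles" and field "title" are made up), ends up going through this execute() path, with the action's proxy serializing the request and the nodes service choosing which node receives it.
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;

// "articles" and "title" are hypothetical; the call is routed through
// TransportProxyClient.execute() and then TransportClientNodesService.execute().
SearchResponse response = client.prepareSearch("articles")
        .setQuery(QueryBuilders.matchQuery("title", "elasticsearch"))
        .get();
System.out.println("hits: " + response.getHits().getTotalHits());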
RestClient
RestClient could hardly be simpler; reading the first hundred or so lines of its source is essentially reading all of it:
public class RestClient implements Closeable {
private static final Log logger = LogFactory.getLog(RestClient.class);
private final CloseableHttpAsyncClient client;
//we don't rely on default headers supported by HttpAsyncClient as those cannot be replaced
private final Header[] defaultHeaders;
private final long maxRetryTimeoutMillis;
private final String pathPrefix;
private final AtomicInteger lastHostIndex = new AtomicInteger(0);
private volatile HostTuple<Set<HttpHost>> hostTuple;
private final ConcurrentMap<HttpHost, DeadHostState> blacklist = new ConcurrentHashMap<>();
private final FailureListener failureListener;
RestClient(CloseableHttpAsyncClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders,
HttpHost[] hosts, String pathPrefix, FailureListener failureListener) {
this.client = client;
this.maxRetryTimeoutMillis = maxRetryTimeoutMillis;
this.defaultHeaders = defaultHeaders;
this.failureListener = failureListener;
this.pathPrefix = pathPrefix;
setHosts(hosts);
}
/**
* Returns a new {@link RestClientBuilder} to help with {@link RestClient} creation.
*/
public static RestClientBuilder builder(HttpHost... hosts) {
return new RestClientBuilder(hosts);
}
/**
* Replaces the hosts that the client communicates with.
* @see HttpHost
*/
public synchronized void setHosts(HttpHost... hosts) {
if (hosts == null || hosts.length == 0) {
throw new IllegalArgumentException("hosts must not be null nor empty");
}
Set<HttpHost> httpHosts = new HashSet<>();
AuthCache authCache = new BasicAuthCache();
for (HttpHost host : hosts) {
Objects.requireNonNull(host, "host cannot be null");
httpHosts.add(host);
authCache.put(host, new BasicScheme());
}
this.hostTuple = new HostTuple<>(Collections.unmodifiableSet(httpHosts), authCache);
this.blacklist.clear();
}
這里與TransportClient最大不一樣就是這里不會啟用sniff废登,僅負(fù)責(zé)維護(hù)配置進(jìn)去的所有的hosts淹魄,需要簡要的話都把a(bǔ)uthCahe 保存起來,剩下就是所有的請求都交給apache 的httpClient去做了堡距。
The Sniffer component
So what if I genuinely prefer RestClient but still want sniffing? The thoughtful ES team ships a small add-on for that: bind it to your client and it will sniff out all the machines in the cluster and call the client's setHosts() for you.
void sniff(HttpHost excludeHost, long nextSniffDelayMillis) {
if (running.compareAndSet(false, true)) {
try {
List<HttpHost> sniffedHosts = hostsSniffer.sniffHosts();
logger.debug("sniffed hosts: " + sniffedHosts);
if (excludeHost != null) {
sniffedHosts.remove(excludeHost);
}
if (sniffedHosts.isEmpty()) {
logger.warn("no hosts to set, hosts will be updated at the next sniffing round");
} else {
this.restClient.setHosts(sniffedHosts.toArray(new HttpHost[sniffedHosts.size()]));
}
} catch (Exception e) {
logger.error("error while sniffing nodes", e);
} finally {
scheduleNextRun(nextSniffDelayMillis);
running.set(false);
}
}
}
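Hooking it up is straightforward; a minimal sketch using the restClient built in the previous section (the 60-second interval is an arbitrary assumption):
import org.elasticsearch.client.sniff.Sniffer;

// Periodically runs a sniff round (hostsSniffer.sniffHosts()) and feeds the result
// back into restClient.setHosts(); 60000 ms is just an example interval.
Sniffer sniffer = Sniffer.builder(restClient)
        .setSniffIntervalMillis(60000)
        .build();
// on shutdown, close the sniffer before the client
sniffer.close();
restClient.close();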
That's all for this post.