1 請求URL 與對應(yīng)Action
如果我們發(fā)送如下的http請求测秸,elasticsearch是如何匹配對應(yīng)Action的呢?
curl -XGET 'http://elasticsearch_ip:port/xxx/url/path' -d '{
"請求的content"
}'
在org.elasticsearch.rest.action包下面觉壶,存放了處理各種請求的rest action類
在每個action的構(gòu)造方法中,會將當(dāng)前action對象注冊到指定的url上
public class RestXxxAction extends BaseRestHandler {
@Inject
public RestXxxAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
controller.registerHandler(GET, "/xxx/url/path", this);
controller.registerHandler(POST, "/xxx/url/path", this);
}
}
在registerHandler()方法中,會將url path和對應(yīng)handler添加到http method對象上
public class RestController extends AbstractLifecycleComponent<RestController> {
public void registerHandler(RestRequest.Method method, String path, RestHandler handler) {
switch (method) {
case GET:
getHandlers.insert(path, handler);
break;
case DELETE:
deleteHandlers.insert(path, handler);
break;
case POST:
postHandlers.insert(path, handler);
break;
case PUT:
putHandlers.insert(path, handler);
break;
case OPTIONS:
optionsHandlers.insert(path, handler);
break;
case HEAD:
headHandlers.insert(path, handler);
break;
default:
throw new ElasticsearchIllegalArgumentException("Can't handle [" + method + "] for path [" + path + "]");
}
}
}
elasticsearch使用HttpRequestHandler接收http請求消息敛瓷,最終會在executeHandler()方法中調(diào)用getHandler()方法獲取path對應(yīng)的handler
public class RestController extends AbstractLifecycleComponent<RestController> {
void executeHandler(RestRequest request, RestChannel channel) throws Exception {
final RestHandler handler = getHandler(request);
if (handler != null) {
handler.handleRequest(request, channel);
} else {
if (request.method() == RestRequest.Method.OPTIONS) {
channel.sendResponse(new BytesRestResponse(OK));
} else {
channel.sendResponse(new BytesRestResponse(BAD_REQUEST,
"No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]"));
}
}
}
}
在getHandler()方法中学赛,主要是根據(jù)method和path獲取在RestXxxAction中添加的handler
public class RestController extends AbstractLifecycleComponent<RestController> {
private RestHandler getHandler(RestRequest request) {
String path = getPath(request);
RestRequest.Method method = request.method();
if (method == RestRequest.Method.GET) {
return getHandlers.retrieve(path, request.params());
} else if (method == RestRequest.Method.POST) {
return postHandlers.retrieve(path, request.params());
} else if (method == RestRequest.Method.PUT) {
return putHandlers.retrieve(path, request.params());
} else if (method == RestRequest.Method.DELETE) {
return deleteHandlers.retrieve(path, request.params());
} else if (method == RestRequest.Method.HEAD) {
return headHandlers.retrieve(path, request.params());
} else if (method == RestRequest.Method.OPTIONS) {
return optionsHandlers.retrieve(path, request.params());
} else {
return null;
}
}
}
elasticsearch在path對應(yīng)獲取Handler后年堆,就執(zhí)行該handler的handleRequest()方法,作為rest邏輯入口
2 Action與對應(yīng)TransportAction
在ActionModule中盏浇,會將Action和對應(yīng)的TransportAction注冊到actions對象中
public class ActionModule extends AbstractModule {
@Override
protected void configure() {
registerAction(XxxAction.INSTANCE, TransportXxxAction.class);
}
public <Request extends ActionRequest, Response extends ActionResponse> void registerAction(
GenericAction<Request,
Response> action, Class<? extends TransportAction<Request, Response>> transportAction,
Class... supportTransportActions) {
actions.put(action.name(), new ActionEntry<>(action, transportAction, supportTransportActions));
}
}
在RestXxxAction中变丧,最終會調(diào)用AbstractClient的xxx方法,并傳入Action參數(shù)為XxxAction.INSTANCE绢掰,即為在ActionModule中注冊的Action類
public abstract class AbstractClient implements Client {
@Override
public void xxx(final SuggestRequest request, final ActionListener<SuggestResponse> listener) {
execute(XxxAction.INSTANCE, request, listener);
}
}
在NodeClient中會根據(jù)傳入的Action參數(shù)從actions獲取在ActionModule中注冊的TransportAction痒蓬,并執(zhí)行其execute()方法
public class NodeClient extends AbstractClient {
@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
headers.applyTo(request);
TransportAction<Request, Response> transportAction = actions.get((ClientAction)action);
transportAction.execute(request, listener);
}
}
在TransportAction類圖個execute()方法中可以看出:
① TransportXxxAction都繼承自TransportAction類
② TransportAction的execute()方法僅調(diào)用了doExecute()方法
④ TransportXxxAction重寫了TransportAction的doExecute()方法
因此童擎,在最終的對應(yīng)關(guān)系為在ActionModule中注冊的Action調(diào)用的為TransportXxxAction的doExecute()方法
TransportAction的execute()方法僅調(diào)用了需要子類重寫的抽象方法doExecute()
public abstract class TransportAction<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {
protected abstract void doExecute(Request request, ActionListener<Response> listener);
public final void execute(Request request, ActionListener<Response> listener) {
// ...
if (filters.length == 0) {
try {
// TransportAction 子類都要重寫這個方法
doExecute(request, listener);
} catch(Throwable t) {
logger.trace("Error during transport action execution.", t);
listener.onFailure(t);
}
} else {
RequestFilterChain requestFilterChain = new RequestFilterChain<>(this, logger);
requestFilterChain.proceed(actionName, request, listener);
}
}
}
3 節(jié)點間用Action匹配Handler
如果elasticsearch要請求的節(jié)點不是當(dāng)前節(jié)點,則需要將請求發(fā)送到對應(yīng)的節(jié)點上執(zhí)行
在TransportXxxAction中會將ACTION和對應(yīng)的Handler注冊到對應(yīng)的serverHandlers對象中攻晒,在使用transportService發(fā)送請求后顾复,MessageChannelHandler.messageReceived()會接受到信息,然后在handleRequest()方法中獲取注冊的Handler炎辨,執(zhí)行其messageReceived()方法或者交給線程池處理
public class TransportXxxAction {
private final TransportService transportService;
private final SearchService searchService;
@Inject
public TransportXxxAction(Settings settings,
ThreadPool threadPool,
TransportService transportService,
ClusterService clusterService,
XxxService xxxService) {
this.transportService = transportService;
transportService.registerHandler(XXX_ACTION_NAME, new XxxTransportHandler());
this.xxxService= xxxService;
}
public void executeXxx(DiscoveryNode node, final XxxTransportRequest request, final XxxListener listener) {
// 如果shard所在的節(jié)點id和當(dāng)前節(jié)點id相同
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
// 當(dāng)前節(jié)點執(zhí)行邏輯
XxxService.execute(request);
} else {
// shard 所在節(jié)點不是當(dāng)前節(jié)點, 發(fā)送請求執(zhí)行遠(yuǎn)程節(jié)點搜索
// node是要發(fā)送的節(jié)點
transportService.sendRequest(node, XXX_ACTION_NAME, request, new BaseTransportResponseHandler() {
@Override
public QueryXxxResult newInstance() {
return new QueryXxxResult();
}
@Override
public void handleResponse(XxxProvider response) {
listener.onResult(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
}
private class XxxTransportHandler extends BaseTransportRequestHandler {
/**
* 接收遠(yuǎn)程端口的tcp 請求, 執(zhí)行Xxx 邏輯
* @param request TransportRequest
* @param channel TransportChannel
* @throws Exception Exception
*/
@Override
public void messageReceived(TransportRequest request, TransportChannel channel) throws Exception {
// 遠(yuǎn)程節(jié)點待執(zhí)行邏輯
XxxProvider result = XxxService.execute(request);
// 將Query結(jié)果響應(yīng)發(fā)送給調(diào)用節(jié)點
channel.sendResponse(result);
}
@Override
public String executor() {
return ThreadPool.Names.Xxx;
}
}
}
在transportServiceAdapter.handler()方法中捕透,即根據(jù)注冊的action獲取對應(yīng)的TransportHandler,如果executor是same則執(zhí)行handler的messageReceived()方法碴萧,否則交給線程池執(zhí)行
public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
final String action = buffer.readString();
transportServiceAdapter.onRequestReceived(requestId, action);
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);
try {
final TransportRequestHandler handler = transportServiceAdapter.handler(action, version);
if (handler == null) {
throw new ActionNotFoundTransportException(action);
}
final TransportRequest request = handler.newInstance();
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
request.readFrom(buffer);
if (ThreadPool.Names.SAME.equals(handler.executor())) {
//noinspection unchecked
handler.messageReceived(request, transportChannel);
} else {
threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
}
} catch (Throwable e) {
try {
transportChannel.sendResponse(e);
} catch (IOException e1) {
logger.warn("Failed to send error message back to client for action [" + action + "]", e);
logger.warn("Actual Exception", e1);
}
}
return action;
}
}
下面講另外一種情況乙嘀,在ActionModule中注冊的Action也都會注冊到handler中
public class TransportXxxAction extends HandledTransportAction {
@Inject
public TransportXxxAction (Settings settings, ThreadPool threadPool) {
super(settings, XxxAction.NAME, threadPool);
}
在TransportAction類中同樣使用transportService.registerHandler()方法注冊handler
public abstract class HandledTransportAction<Request extends ActionRequest, Response extends ActionResponse> extends TransportAction<Request,Response>{
protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters){
super(settings, actionName, threadPool, actionFilters);
transportService.registerHandler(actionName, new TransportHandler() {
@Override
public Request newInstance(){
return newRequestInstance();
}
});
}
}
因此在InternalTransportClient中,主要是回調(diào)proxy.execute()
public class InternalTransportClient extends AbstractClient {
@Inject
public InternalTransportClient(
TransportClientNodesService nodesService,
InternalTransportAdminClient,
Map<String, GenericAction> actions,
Headers headers) {
MapBuilder<Action, TransportActionNodeProxy> actionsBuilder = new MapBuilder<>();
for (GenericAction action : actions.values()) {
if (action instanceof Action) {
actionsBuilder.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
}
}
this.actions = actionsBuilder.immutableMap();
}
@Override
public void execute(final Action action, final Request request, ActionListener listener) {
headers.applyTo(request);
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
@Override
public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
proxy.execute(node, request, listener);
}
}, listener);
}
}
transportService.sendRequest()中的action.name()即為Action的Name參數(shù)破喻,已被注冊到對應(yīng)的handler中了
public class TransportActionNodeProxy extends AbstractComponent {
public void execute(DiscoveryNode node,
final Request request,
final ActionListener<Response> listener) {
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
listener.onFailure(validationException);
return;
}
transportService.sendRequest(node, action.name(),
request, transportOptions,
new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return action.newResponse();
}
@Override
public String executor() {
if (request.listenerThreaded()) {
return ThreadPool.Names.LISTENER;
}
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(Response response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
});
}
}