Elasticsearch源碼分析-架構(gòu)設(shè)計之Action

1 請求URL 與對應(yīng)Action

如果我們發(fā)送如下的http請求测秸,elasticsearch是如何匹配對應(yīng)Action的呢?

curl -XGET 'http://elasticsearch_ip:port/xxx/url/path' -d '{
    "請求的content"
}'

在org.elasticsearch.rest.action包下面觉壶,存放了處理各種請求的rest action類
在每個action的構(gòu)造方法中,會將當(dāng)前action對象注冊到指定的url上

public class RestXxxAction extends BaseRestHandler {

    @Inject
    public RestXxxAction(Settings settings, RestController controller, Client client) {
        super(settings, controller, client);
        controller.registerHandler(GET, "/xxx/url/path", this);
        controller.registerHandler(POST, "/xxx/url/path", this);
    }
}

在registerHandler()方法中,會將url path和對應(yīng)handler添加到http method對象上

public class RestController extends AbstractLifecycleComponent<RestController> {
    public void registerHandler(RestRequest.Method method, String path, RestHandler handler) {
        switch (method) {
            case GET:
                getHandlers.insert(path, handler);
                break;
            case DELETE:
                deleteHandlers.insert(path, handler);
                break;
            case POST:
                postHandlers.insert(path, handler);
                break;
            case PUT:
                putHandlers.insert(path, handler);
                break;
            case OPTIONS:
                optionsHandlers.insert(path, handler);
                break;
            case HEAD:
                headHandlers.insert(path, handler);
                break;
            default:
                throw new ElasticsearchIllegalArgumentException("Can't handle [" + method + "] for path [" + path + "]");
        }
    }
}

elasticsearch使用HttpRequestHandler接收http請求消息敛瓷,最終會在executeHandler()方法中調(diào)用getHandler()方法獲取path對應(yīng)的handler

public class RestController extends AbstractLifecycleComponent<RestController> {
    void executeHandler(RestRequest request, RestChannel channel) throws Exception {
        final RestHandler handler = getHandler(request);
        if (handler != null) {
            handler.handleRequest(request, channel);
        } else {
            if (request.method() == RestRequest.Method.OPTIONS) {
                channel.sendResponse(new BytesRestResponse(OK));
            } else {
                channel.sendResponse(new BytesRestResponse(BAD_REQUEST, 
  "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]"));
            }
        }
    }
}

在getHandler()方法中学赛,主要是根據(jù)method和path獲取在RestXxxAction中添加的handler

public class RestController extends AbstractLifecycleComponent<RestController> {
    private RestHandler getHandler(RestRequest request) {
        String path = getPath(request);
        RestRequest.Method method = request.method();
        if (method == RestRequest.Method.GET) {
            return getHandlers.retrieve(path, request.params());
        } else if (method == RestRequest.Method.POST) {
            return postHandlers.retrieve(path, request.params());
        } else if (method == RestRequest.Method.PUT) {
            return putHandlers.retrieve(path, request.params());
        } else if (method == RestRequest.Method.DELETE) {
            return deleteHandlers.retrieve(path, request.params());
        } else if (method == RestRequest.Method.HEAD) {
            return headHandlers.retrieve(path, request.params());
        } else if (method == RestRequest.Method.OPTIONS) {
            return optionsHandlers.retrieve(path, request.params());
        } else {
            return null;
        }
    }
}

elasticsearch在path對應(yīng)獲取Handler后年堆,就執(zhí)行該handler的handleRequest()方法,作為rest邏輯入口

2 Action與對應(yīng)TransportAction

在ActionModule中盏浇,會將Action和對應(yīng)的TransportAction注冊到actions對象中

public class ActionModule extends AbstractModule {
    @Override
    protected void configure() {
        registerAction(XxxAction.INSTANCE, TransportXxxAction.class);
    }

    public <Request extends ActionRequest, Response extends ActionResponse> void registerAction(
                        GenericAction<Request, 
                        Response> action, Class<? extends TransportAction<Request, Response>> transportAction, 
                        Class... supportTransportActions) {
        actions.put(action.name(), new ActionEntry<>(action, transportAction, supportTransportActions));
    }
}

在RestXxxAction中变丧,最終會調(diào)用AbstractClient的xxx方法,并傳入Action參數(shù)為XxxAction.INSTANCE绢掰,即為在ActionModule中注冊的Action類

public abstract class AbstractClient implements Client {
    @Override
    public void xxx(final SuggestRequest request, final ActionListener<SuggestResponse> listener) {
        execute(XxxAction.INSTANCE, request, listener);
    }
}

在NodeClient中會根據(jù)傳入的Action參數(shù)從actions獲取在ActionModule中注冊的TransportAction痒蓬,并執(zhí)行其execute()方法

public class NodeClient extends AbstractClient {
    @Override
    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
        headers.applyTo(request);
        TransportAction<Request, Response> transportAction = actions.get((ClientAction)action); 
        transportAction.execute(request, listener);
    }
}

在TransportAction類圖個execute()方法中可以看出:
① TransportXxxAction都繼承自TransportAction類
② TransportAction的execute()方法僅調(diào)用了doExecute()方法
④ TransportXxxAction重寫了TransportAction的doExecute()方法
因此童擎,在最終的對應(yīng)關(guān)系為在ActionModule中注冊的Action調(diào)用的為TransportXxxAction的doExecute()方法

TransportAction類圖

TransportAction的execute()方法僅調(diào)用了需要子類重寫的抽象方法doExecute()

public abstract class TransportAction<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {
    
    protected abstract void doExecute(Request request, ActionListener<Response> listener);

    public final void execute(Request request, ActionListener<Response> listener) {
        // ...
        if (filters.length == 0) {
            try {
                // TransportAction 子類都要重寫這個方法
                doExecute(request, listener);
            } catch(Throwable t) {
                logger.trace("Error during transport action execution.", t);
                listener.onFailure(t);
            }
        } else {
            RequestFilterChain requestFilterChain = new RequestFilterChain<>(this, logger);
            requestFilterChain.proceed(actionName, request, listener);
        }
    }
}

3 節(jié)點間用Action匹配Handler

如果elasticsearch要請求的節(jié)點不是當(dāng)前節(jié)點,則需要將請求發(fā)送到對應(yīng)的節(jié)點上執(zhí)行
在TransportXxxAction中會將ACTION和對應(yīng)的Handler注冊到對應(yīng)的serverHandlers對象中攻晒,在使用transportService發(fā)送請求后顾复,MessageChannelHandler.messageReceived()會接受到信息,然后在handleRequest()方法中獲取注冊的Handler炎辨,執(zhí)行其messageReceived()方法或者交給線程池處理

public class TransportXxxAction {
    private final TransportService transportService;
    private final SearchService searchService;

    @Inject
    public TransportXxxAction(Settings settings, 
                        ThreadPool threadPool, 
                        TransportService transportService, 
                        ClusterService clusterService, 
                        XxxService xxxService) {
        this.transportService = transportService;
        transportService.registerHandler(XXX_ACTION_NAME, new XxxTransportHandler());
        this.xxxService= xxxService;
    }

    public void executeXxx(DiscoveryNode node, final XxxTransportRequest request, final XxxListener listener) {
        // 如果shard所在的節(jié)點id和當(dāng)前節(jié)點id相同
        if (clusterService.state().nodes().localNodeId().equals(node.id())) {
            // 當(dāng)前節(jié)點執(zhí)行邏輯
            XxxService.execute(request);
        } else {
            // shard 所在節(jié)點不是當(dāng)前節(jié)點, 發(fā)送請求執(zhí)行遠(yuǎn)程節(jié)點搜索
            // node是要發(fā)送的節(jié)點
            transportService.sendRequest(node, XXX_ACTION_NAME, request, new BaseTransportResponseHandler() {
                @Override
                public QueryXxxResult newInstance() {
                    return new QueryXxxResult();
                }
                @Override
                public void handleResponse(XxxProvider response) {
                    listener.onResult(response);
                }
                @Override
                public void handleException(TransportException exp) {
                    listener.onFailure(exp);
                }
                @Override
                public String executor() {
                    return ThreadPool.Names.SAME;
                }
            });
        }
    }

    private class XxxTransportHandler extends BaseTransportRequestHandler {
        /**
         * 接收遠(yuǎn)程端口的tcp 請求, 執(zhí)行Xxx 邏輯
         * @param request   TransportRequest
         * @param channel   TransportChannel
         * @throws Exception    Exception
         */
        @Override
        public void messageReceived(TransportRequest request, TransportChannel channel) throws Exception {
            // 遠(yuǎn)程節(jié)點待執(zhí)行邏輯
            XxxProvider result = XxxService.execute(request);
            // 將Query結(jié)果響應(yīng)發(fā)送給調(diào)用節(jié)點
            channel.sendResponse(result);
        }

        @Override
        public String executor() {
            return ThreadPool.Names.Xxx;
        }
    }
}

在transportServiceAdapter.handler()方法中捕透,即根據(jù)注冊的action獲取對應(yīng)的TransportHandler,如果executor是same則執(zhí)行handler的messageReceived()方法碴萧,否則交給線程池執(zhí)行

public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
    protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
        final String action = buffer.readString();
        transportServiceAdapter.onRequestReceived(requestId, action);
        final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);
        try {
            final TransportRequestHandler handler = transportServiceAdapter.handler(action, version);
            if (handler == null) {
                throw new ActionNotFoundTransportException(action);
            }
            final TransportRequest request = handler.newInstance();
            request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
            request.readFrom(buffer);
            if (ThreadPool.Names.SAME.equals(handler.executor())) {
                //noinspection unchecked
                handler.messageReceived(request, transportChannel);
            } else {
                threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
            }
        } catch (Throwable e) {
            try {
                transportChannel.sendResponse(e);
            } catch (IOException e1) {
                logger.warn("Failed to send error message back to client for action [" + action + "]", e);
                logger.warn("Actual Exception", e1);
            }
        }
        return action;
    }
}

下面講另外一種情況乙嘀,在ActionModule中注冊的Action也都會注冊到handler中

public class TransportXxxAction extends HandledTransportAction {
    @Inject
    public TransportXxxAction (Settings settings, ThreadPool threadPool) {
        super(settings, XxxAction.NAME, threadPool);
}

在TransportAction類中同樣使用transportService.registerHandler()方法注冊handler

public abstract class HandledTransportAction<Request extends ActionRequest, Response extends ActionResponse> extends TransportAction<Request,Response>{
    protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters){
        super(settings, actionName, threadPool, actionFilters);
        transportService.registerHandler(actionName, new TransportHandler() {
            @Override
            public Request newInstance(){
                return newRequestInstance();
            }
        });
    }
}

因此在InternalTransportClient中,主要是回調(diào)proxy.execute()

public class InternalTransportClient extends AbstractClient {
   @Inject
    public InternalTransportClient(
                                   TransportClientNodesService nodesService, 
                                   InternalTransportAdminClient,
                                   Map<String, GenericAction> actions, 
                                   Headers headers) {
        MapBuilder<Action, TransportActionNodeProxy> actionsBuilder = new MapBuilder<>();
        for (GenericAction action : actions.values()) {
            if (action instanceof Action) {
                actionsBuilder.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
            }
        }
        this.actions = actionsBuilder.immutableMap();
    }
    @Override
    public void execute(final Action action, final Request request, ActionListener listener) {
        headers.applyTo(request);
        final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
        nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
            @Override
            public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
                proxy.execute(node, request, listener);
            }
        }, listener);
    }
}

transportService.sendRequest()中的action.name()即為Action的Name參數(shù)破喻,已被注冊到對應(yīng)的handler中了

public class TransportActionNodeProxy extends AbstractComponent {
    public void execute(DiscoveryNode node, 
                            final Request request, 
                            final ActionListener<Response> listener) {
        ActionRequestValidationException validationException = request.validate();
        if (validationException != null) {
            listener.onFailure(validationException);
            return;
        }
        transportService.sendRequest(node, action.name(), 
                            request, transportOptions, 
                            new BaseTransportResponseHandler<Response>() {
            @Override
            public Response newInstance() {
                return action.newResponse();
            }

            @Override
            public String executor() {
                if (request.listenerThreaded()) {
                    return ThreadPool.Names.LISTENER;
                }
                return ThreadPool.Names.SAME;
            }

            @Override
            public void handleResponse(Response response) {
                listener.onResponse(response);
            }

            @Override
            public void handleException(TransportException exp) {
                listener.onFailure(exp);
            }
        });
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末虎谢,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子曹质,更是在濱河造成了極大的恐慌婴噩,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,496評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件羽德,死亡現(xiàn)場離奇詭異几莽,居然都是意外死亡,警方通過查閱死者的電腦和手機宅静,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評論 3 392
  • 文/潘曉璐 我一進(jìn)店門章蚣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人姨夹,你說我怎么就攤上這事纤垂。” “怎么了磷账?”我有些...
    開封第一講書人閱讀 162,632評論 0 353
  • 文/不壞的土叔 我叫張陵峭沦,是天一觀的道長。 經(jīng)常有香客問我逃糟,道長吼鱼,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,180評論 1 292
  • 正文 為了忘掉前任绰咽,我火速辦了婚禮蛉抓,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘剃诅。我一直安慰自己,他們只是感情好驶忌,可當(dāng)我...
    茶點故事閱讀 67,198評論 6 388
  • 文/花漫 我一把揭開白布矛辕。 她就那樣靜靜地躺著笑跛,像睡著了一般。 火紅的嫁衣襯著肌膚如雪聊品。 梳的紋絲不亂的頭發(fā)上飞蹂,一...
    開封第一講書人閱讀 51,165評論 1 299
  • 那天,我揣著相機與錄音翻屈,去河邊找鬼陈哑。 笑死,一個胖子當(dāng)著我的面吹牛伸眶,可吹牛的內(nèi)容都是我干的惊窖。 我是一名探鬼主播,決...
    沈念sama閱讀 40,052評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼厘贼,長吁一口氣:“原來是場噩夢啊……” “哼界酒!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起嘴秸,我...
    開封第一講書人閱讀 38,910評論 0 274
  • 序言:老撾萬榮一對情侶失蹤毁欣,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后岳掐,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體凭疮,經(jīng)...
    沈念sama閱讀 45,324評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,542評論 2 332
  • 正文 我和宋清朗相戀三年串述,在試婚紗的時候發(fā)現(xiàn)自己被綠了执解。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,711評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡剖煌,死狀恐怖材鹦,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情耕姊,我是刑警寧澤桶唐,帶...
    沈念sama閱讀 35,424評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站茉兰,受9級特大地震影響尤泽,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜规脸,卻給世界環(huán)境...
    茶點故事閱讀 41,017評論 3 326
  • 文/蒙蒙 一坯约、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧莫鸭,春花似錦闹丐、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,668評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽衫仑。三九已至,卻和暖如春堕花,著一層夾襖步出監(jiān)牢的瞬間文狱,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,823評論 1 269
  • 我被黑心中介騙來泰國打工缘挽, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留瞄崇,地道東北人。 一個月前我還...
    沈念sama閱讀 47,722評論 2 368
  • 正文 我出身青樓壕曼,卻偏偏與公主長得像苏研,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子窝稿,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,611評論 2 353

推薦閱讀更多精彩內(nèi)容