Presto調(diào)度模塊源碼閱讀(2)-服務(wù)端響應(yīng)-資源組選擇

當用戶提交一個SQL作業(yè)時羊瘩,Presto客戶端會封裝一個Request通過Restful接口將請求發(fā)送到服務(wù)端,下面就詳細講解一下服務(wù)端的處理過程窑业。

Client端發(fā)送請求的地址是/v1/statement钦幔,對應(yīng)到StatementResource的createQuery方法。在該方法中會調(diào)用Query的static方法create常柄,在create方法中new了一個Query對象鲤氢,然后會調(diào)用SqlQueryManager的createQuery方法。

在createQuery方法中首先會創(chuàng)建QueryId西潘,生成規(guī)則是:

return new QueryId(String.format("%s_%05d_%s", lastTimestamp, counter++, coordinatorId));

然后presto會判斷集群是否有可用節(jié)點卷玉,其中isIncludeCoordinator變量對應(yīng)config.properties配置文件中的node-scheduler.include-coordinator配置項,表示是否允許調(diào)度task到coordinator節(jié)點進行計算喷市。

如果集群可用節(jié)點小于最小值1(參數(shù)query-manager.initialization-required-workers)相种,則給出熟悉的報錯信息“Cluster is still initializing……”。

            if (!acceptQueries.get()) {
                int activeWorkerCount = internalNodeManager.getNodes(ACTIVE).size();
                if (!isIncludeCoordinator) {
                    activeWorkerCount--;
                }
                if (nanosSince(initialNanos).compareTo(initializationTimeout) < 0 && activeWorkerCount < initializationRequiredWorkers) {
                    throw new PrestoException(
                            SERVER_STARTING_UP,
                            String.format("Cluster is still initializing, there are insufficient active worker nodes (%s) to run query", activeWorkerCount));
                }
                acceptQueries.set(true);
            }

除此之外presto還對sql長度做了限制品姓,要求不能超過query.max-length(默認1_000_000_000寝并,表示10億)。

然后presto會根據(jù)提交作業(yè)的客戶端信息選擇資源組腹备。

            Optional<String> queryType = getQueryType(query);
            selectionContext = resourceGroupManager.selectGroup(new SelectionCriteria(
                    sessionContext.getIdentity().getPrincipal().isPresent(),
                    sessionContext.getIdentity().getUser(),
                    Optional.ofNullable(sessionContext.getSource()),
                    sessionContext.getClientTags(),
                    sessionContext.getResourceEstimates(),
                    queryType));

上圖代碼中selectGroup方法對應(yīng)到InternalResourceGroupManager的selectGroup方法衬潦,其中configurationManager的類型是AtomicReference<ResourceGroupConfigurationManager<C>>。selectGroup方法實現(xiàn)如下:

    @Override
    public SelectionContext<C> selectGroup(SelectionCriteria criteria)
    {
        return configurationManager.get().match(criteria)
                .orElseThrow(() -> new PrestoException(QUERY_REJECTED, "Query did not match any selection rule"));
    }

然后我們點進match方法植酥,來到了ResourceGroupConfigurationManager接口中镀岛,我們看到這個方法的實現(xiàn)類有如下三個:


ResourceGroupConfigurationManager接口實現(xiàn)類

那么問題來了,當我們調(diào)用match方法時友驮,執(zhí)行的是這三個實現(xiàn)類中的哪一個呢漂羊?

我們首先看一下configurationManager初始化時的值,如下圖所示初始化時其類型為LegacyResourceGroupConfigurationManager:

    @Inject
    public InternalResourceGroupManager(LegacyResourceGroupConfigurationManager legacyManager, ClusterMemoryPoolManager memoryPoolManager, NodeInfo nodeInfo, MBeanExporter exporter)
    {
        this.exporter = requireNonNull(exporter, "exporter is null");
        this.configurationManagerContext = new ResourceGroupConfigurationManagerContextInstance(memoryPoolManager, nodeInfo.getEnvironment());
        this.legacyManager = requireNonNull(legacyManager, "legacyManager is null");
        this.configurationManager = new AtomicReference<>(cast(legacyManager));
    }

然后我們搜一下configurationManager的引用卸留,發(fā)現(xiàn)在InternalResourceGroupManager類的setConfigurationManager方法中修改了他的值走越。如下圖:

    @VisibleForTesting
    public void setConfigurationManager(String name, Map<String, String> properties)
    {
        requireNonNull(name, "name is null");
        requireNonNull(properties, "properties is null");

        log.info("-- Loading resource group configuration manager --");

        ResourceGroupConfigurationManagerFactory configurationManagerFactory = configurationManagerFactories.get(name);
        checkState(configurationManagerFactory != null, "Resource group configuration manager %s is not registered", name);

        ResourceGroupConfigurationManager<C> configurationManager = cast(configurationManagerFactory.create(ImmutableMap.copyOf(properties), configurationManagerContext));
        checkState(this.configurationManager.compareAndSet(cast(legacyManager), configurationManager), "configurationManager already set");

        log.info("-- Loaded resource group configuration manager %s --", name);
    }

該方法在同一個類的loadConfigurationManager方法中被調(diào)用。loadConfigurationManager方法會判斷常量RESOURCE_GROUPS_CONFIGURATION對應(yīng)的etc/resource-groups.properties文件是否存在耻瑟,如果存在會讀取文件中配置的resource-groups.configuration-manager參數(shù)為Key值买喧,到configurationManagerFactories中取出對應(yīng)的ResourceGroupConfigurationManagerFactory對象,然后調(diào)用其create方法構(gòu)造一個ResourceGroupConfigurationManager對象匆赃,最終賦值給configurationManager淤毛。方法的實現(xiàn)如下:

    @Override
    public void loadConfigurationManager()
            throws Exception
    {
        if (RESOURCE_GROUPS_CONFIGURATION.exists()) {
            Map<String, String> properties = new HashMap<>(loadProperties(RESOURCE_GROUPS_CONFIGURATION));

            String configurationManagerName = properties.remove(CONFIGURATION_MANAGER_PROPERTY_NAME);
            checkArgument(!isNullOrEmpty(configurationManagerName),
                    "Resource groups configuration %s does not contain %s", RESOURCE_GROUPS_CONFIGURATION.getAbsoluteFile(), CONFIGURATION_MANAGER_PROPERTY_NAME);

            setConfigurationManager(configurationManagerName, properties);
        }
    }

而loadConfigurationManager方法又在PrestoServer類的初始化方法中被調(diào)用。

injector.getInstance(ResourceGroupManager.class).loadConfigurationManager();

(PS:ResourceGroupManager的實現(xiàn)類型是在CoordinatorModule這個類中被注入的:

binder.bind(ResourceGroupManager.class).to(InternalResourceGroupManager.class);

也就是說算柳,當PrestoServer通過其main方法調(diào)用run方法進行初始化時低淡, 會讀取etc/resource-groups.properties文件中的配置項resource-groups.configuration-manager,再以它為Key值讀取configurationManagerFactories中對應(yīng)的ResourceGroupConfigurationManagerFactory,然后調(diào)用讀取出來的工廠類的create方法構(gòu)建ResourceGroupConfigurationManager對象蔗蹋,最后賦值給InternalResourceGroupManager類的configurationManager何荚。

另一個問題出現(xiàn)了,configurationManagerFactories這個Map類型的全局變量是在什么時候賦值的猪杭,里邊都有哪些值呢餐塘?

我們還是搜索一下它的引用,發(fā)現(xiàn)在InternalResourceGroupManager的addConfigurationManagerFactory方法中對其進行了putIfAbsent操作(不存在則put)皂吮。

    @Override
    public void addConfigurationManagerFactory(ResourceGroupConfigurationManagerFactory factory)
    {
        if (configurationManagerFactories.putIfAbsent(factory.getName(), factory) != null) {
            throw new IllegalArgumentException(format("Resource group configuration manager '%s' is already registered", factory.getName()));
        }
    }

搜索引用發(fā)現(xiàn)戒傻,在PluginManager的installPlugin方法中調(diào)用了這個方法:

        for (ResourceGroupConfigurationManagerFactory configurationManagerFactory : plugin.getResourceGroupConfigurationManagerFactories()) {
            log.info("Registering resource group configuration manager %s", configurationManagerFactory.getName());
            resourceGroupManager.addConfigurationManagerFactory(configurationManagerFactory);
        }

然后我們看一下plugin.getResourceGroupConfigurationManagerFactories方法的定義,發(fā)現(xiàn)他有兩個實現(xiàn)類蜂筹,


Plugin接口的兩個實現(xiàn)類

ResourceGroupManagerPlugin的實現(xiàn)如下:

public class ResourceGroupManagerPlugin
        implements Plugin
{
    @Override
    public Iterable<ResourceGroupConfigurationManagerFactory> getResourceGroupConfigurationManagerFactories()
    {
        return ImmutableList.of(
                new FileResourceGroupConfigurationManagerFactory(getClassLoader()),
                new DbResourceGroupConfigurationManagerFactory(getClassLoader()));
    }

H2ResourceGroupManagerPlugin的實現(xiàn)如下:

public class H2ResourceGroupManagerPlugin
        implements Plugin
{
    @Override
    public Iterable<ResourceGroupConfigurationManagerFactory> getResourceGroupConfigurationManagerFactories()
    {
        return ImmutableList.of(
                new H2ResourceGroupConfigurationManagerFactory(getClassLoader()));
    }

我們在addConfigurationManagerFactory方法中可以看到需纳,添加到configurationManagerFactories這個Map中時,是以factory的name作為Key值艺挪,factory為Value的:

    @Override
    public void addConfigurationManagerFactory(ResourceGroupConfigurationManagerFactory factory)
    {
        if (configurationManagerFactories.putIfAbsent(factory.getName(), factory) != null) {
            throw new IllegalArgumentException(format("Resource group configuration manager '%s' is already registered", factory.getName()));
        }
    }

所以我們看一下這三個實現(xiàn)類對應(yīng)的name值不翩,也就是resource-groups.configuration-manager參數(shù)的可選值:
db:DbResourceGroupConfigurationManagerFactory
h2:H2ResourceGroupConfigurationManagerFactory
file:FileResourceGroupConfigurationManagerFactory

然后,我們回過頭來看一下PluginManager的installPlugin方法麻裳,該方法在同類的loadPlugin方法中被調(diào)用口蝠,

    private void loadPlugin(URLClassLoader pluginClassLoader)
    {
        ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader);
        List<Plugin> plugins = ImmutableList.copyOf(serviceLoader);

        if (plugins.isEmpty()) {
            log.warn("No service providers of type %s", Plugin.class.getName());
        }

        for (Plugin plugin : plugins) {
            log.info("Installing %s", plugin.getClass().getName());
            installPlugin(plugin);
        }
    }

loadPlugin方法又在該類中再次被調(diào)用:

    private void loadPlugin(String plugin)
            throws Exception
    {
        log.info("-- Loading plugin %s --", plugin);
        URLClassLoader pluginClassLoader = buildClassLoader(plugin);
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(pluginClassLoader)) {
            loadPlugin(pluginClassLoader);
        }
        log.info("-- Finished loading plugin %s --", plugin);
    }

再往上是loadPlugins方法:

    public void loadPlugins()
            throws Exception
    {
        if (!pluginsLoading.compareAndSet(false, true)) {
            return;
        }

        for (File file : listFiles(installedPluginsDir)) {
            if (file.isDirectory()) {
                loadPlugin(file.getAbsolutePath());
            }
        }

        for (String plugin : plugins) {
            loadPlugin(plugin);
        }

        metadata.verifyComparableOrderableContract();

        pluginsLoaded.set(true);
    }

再次向上查找,原來loadPlugins方法是在PrestoServer的run方法中津坑,先與loadConfigurationManager方法被調(diào)用的:

injector.getInstance(PluginManager.class).loadPlugins();

也就是說妙蔗,Presto默認是按照LegacyResourceGroupConfigurationManager進行資源組管理的。

在PrestoServer調(diào)用run方法進行初始化時国瓮,首先會執(zhí)行PluginManager的loadPlugins方法灭必,向InternalResourceGroupManager中一個存放ResourceGroupManagerFactory類型元素的Map添加可用的資源組管理工廠類狞谱。

然后會調(diào)用InternalResourceGroupManager的loadConfigurationManager方法乃摹,判斷是否配置了參數(shù)resource-groups.configuration-manager,如果配置了則會按照配置的manager類型從這個Map中根據(jù)ResourceGroupFactory的name取出相應(yīng)的factory跟衅。

最后會根據(jù)取出的factory對象create一個ResourceGroupConfigurationManager孵睬,并將其賦值給configurationManager。

在Presto的官方文檔中我們看到伶跷,presto只描述了一種name為file的ResourceGroupManagerFactory掰读,對應(yīng)FileResourceGroupConfigurationManagerFactory“饶看來這是官方比較推薦的類型蹈集。

===============
Resource Groups
===============

Resource groups place limits on resource usage, and can enforce queueing policies on
queries that run within them or divide their resources among sub groups. A query
belongs to a single resource group, and consumes resources from that group (and its ancestors).
Except for the limit on queued queries, when a resource group runs out of a resource
it does not cause running queries to fail; instead new queries become queued.
A resource group may have sub groups or may accept queries, but may not do both.

The resource groups and associated selection rules are configured by a manager which is pluggable.
Add an ``etc/resource-groups.properties`` file with the following contents to enable
the built-in manager that reads a JSON config file:

.. code-block:: none

    resource-groups.configuration-manager=file
    resource-groups.config-file=etc/resource_groups.json

Change the value of ``resource-groups.config-file`` to point to a JSON config file,
which can be an absolute path, or a path relative to the Presto data directory.

Resource Group Properties
-------------------------

接下來我們看一下FileResourceGroupConfigurationManager類的match方法,如下圖:

    @Override
    public Optional<SelectionContext<VariableMap>> match(SelectionCriteria criteria)
    {
        return selectors.stream()
                .map(s -> s.match(criteria))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst();
    }

入?yún)electionCriteria是從session中取得的用戶信息雇初,如下圖:

            selectionContext = resourceGroupManager.selectGroup(new SelectionCriteria(
                    sessionContext.getIdentity().getPrincipal().isPresent(),
                    sessionContext.getIdentity().getUser(),
                    Optional.ofNullable(sessionContext.getSource()),
                    sessionContext.getClientTags(),
                    sessionContext.getResourceEstimates(),
                    queryType));

從match方法可以看到他會從selectors中找到跟session中用戶信息相匹配的ResourceGroupSelector拢肆,如果得到的Optional對象不存在value,則給出熟悉的異常信息Query did not match any selection rule,如果存在value則作業(yè)繼續(xù)向下執(zhí)行郭怪。

selectors對象是從resource-groups.config-file配置項指定的文件中解析得到的ResourceGroup配置信息支示。其初始化的代碼是在FileResourceGroupConfigurationManager的構(gòu)造函數(shù)中,如下:

    @Inject
    public FileResourceGroupConfigurationManager(ClusterMemoryPoolManager memoryPoolManager, FileResourceGroupConfig config)
    {
        super(memoryPoolManager);
        requireNonNull(config, "config is null");

        ManagerSpec managerSpec;
        try {
            managerSpec = CODEC.fromJson(Files.readAllBytes(Paths.get(config.getConfigFile())));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (IllegalArgumentException e) {
            Throwable cause = e.getCause();
            if (cause instanceof UnrecognizedPropertyException) {
                UnrecognizedPropertyException ex = (UnrecognizedPropertyException) cause;
                String message = format("Unknown property at line %s:%s: %s",
                        ex.getLocation().getLineNr(),
                        ex.getLocation().getColumnNr(),
                        ex.getPropertyName());
                throw new IllegalArgumentException(message, e);
            }
            if (cause instanceof JsonMappingException) {
                // remove the extra "through reference chain" message
                if (cause.getCause() != null) {
                    cause = cause.getCause();
                }
                throw new IllegalArgumentException(cause.getMessage(), e);
            }
            throw e;
        }

        this.rootGroups = managerSpec.getRootGroups();
        this.cpuQuotaPeriod = managerSpec.getCpuQuotaPeriod();
        validateRootGroups(managerSpec);
        this.selectors = buildSelectors(managerSpec);
    }

其中鄙才,config.getConfigFile方法對應(yīng)配置項resource-groups.config-file:

    @NotNull
    public String getConfigFile()
    {
        return configFile;
    }

    @Config("resource-groups.config-file")
    public FileResourceGroupConfig setConfigFile(String configFile)
    {
        this.configFile = configFile;
        return this;
    }

在buildSelectors方法中可以看到selectors中添加的對象類型是StaticSelector颂鸿,這樣在match方法的lambda表達式s -> s.match中,s對象就是StaticSelector類型的了攒庵。

    protected List<ResourceGroupSelector> buildSelectors(ManagerSpec managerSpec)
    {
        ImmutableList.Builder<ResourceGroupSelector> selectors = ImmutableList.builder();
        for (SelectorSpec spec : managerSpec.getSelectors()) {
            validateSelectors(managerSpec.getRootGroups(), spec);
            selectors.add(new StaticSelector(
                    spec.getUserRegex(),
                    spec.getSourceRegex(),
                    spec.getClientTags(),
                    spec.getResourceEstimate(),
                    spec.getQueryType(),
                    spec.getGroup()));
        }
        return selectors.build();
    }

在StaticSelector的match方法中我們看到嘴纺,它會根據(jù)json文件中讀取到的信息與客戶端信息依次做校驗,如校驗不通過則返回一個沒有值的Optional對象叙甸,以便selectGroup方法拋出異常颖医。如果全部校驗通過,最終會封裝一個SelectionContext類型的Optional對象返回裆蒸。

    @Override
    public Optional<SelectionContext<VariableMap>> match(SelectionCriteria criteria)
    {
        Map<String, String> variables = new HashMap<>();

        if (userRegex.isPresent()) {
            Matcher userMatcher = userRegex.get().matcher(criteria.getUser());
            if (!userMatcher.matches()) {
                return Optional.empty();
            }

            addVariableValues(userRegex.get(), criteria.getUser(), variables);
        }

        if (sourceRegex.isPresent()) {
            String source = criteria.getSource().orElse("");
            if (!sourceRegex.get().matcher(source).matches()) {
                return Optional.empty();
            }

            addVariableValues(sourceRegex.get(), source, variables);
        }

        if (!clientTags.isEmpty() && !criteria.getTags().containsAll(clientTags)) {
            return Optional.empty();
        }

        if (selectorResourceEstimate.isPresent() && !selectorResourceEstimate.get().match(criteria.getResourceEstimates())) {
            return Optional.empty();
        }

        if (queryType.isPresent()) {
            String contextQueryType = criteria.getQueryType().orElse("");
            if (!queryType.get().equalsIgnoreCase(contextQueryType)) {
                return Optional.empty();
            }
        }

        variables.putIfAbsent(USER_VARIABLE, criteria.getUser());

        // Special handling for source, which is an optional field that is part of the standard variables
        variables.putIfAbsent(SOURCE_VARIABLE, criteria.getSource().orElse(""));

        VariableMap map = new VariableMap(variables);
        ResourceGroupId id = group.expandTemplate(map);

        return Optional.of(new SelectionContext<>(id, map));
    }

以上就是Presto資源組校驗的代碼熔萧,后續(xù)將繼續(xù)整理服務(wù)端響應(yīng)作業(yè)提交請求的代碼。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末僚祷,一起剝皮案震驚了整個濱河市佛致,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌辙谜,老刑警劉巖俺榆,帶你破解...
    沈念sama閱讀 211,561評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異装哆,居然都是意外死亡罐脊,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,218評論 3 385
  • 文/潘曉璐 我一進店門蜕琴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來萍桌,“玉大人,你說我怎么就攤上這事凌简∩涎祝” “怎么了?”我有些...
    開封第一講書人閱讀 157,162評論 0 348
  • 文/不壞的土叔 我叫張陵雏搂,是天一觀的道長藕施。 經(jīng)常有香客問我,道長凸郑,這世上最難降的妖魔是什么裳食? 我笑而不...
    開封第一講書人閱讀 56,470評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮芙沥,結(jié)果婚禮上诲祸,老公的妹妹穿的比我還像新娘尘盼。我一直安慰自己,他們只是感情好烦绳,可當我...
    茶點故事閱讀 65,550評論 6 385
  • 文/花漫 我一把揭開白布卿捎。 她就那樣靜靜地躺著,像睡著了一般径密。 火紅的嫁衣襯著肌膚如雪午阵。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,806評論 1 290
  • 那天享扔,我揣著相機與錄音底桂,去河邊找鬼。 笑死惧眠,一個胖子當著我的面吹牛籽懦,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播氛魁,決...
    沈念sama閱讀 38,951評論 3 407
  • 文/蒼蘭香墨 我猛地睜開眼暮顺,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了秀存?” 一聲冷哼從身側(cè)響起捶码,我...
    開封第一講書人閱讀 37,712評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎或链,沒想到半個月后惫恼,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,166評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡澳盐,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,510評論 2 327
  • 正文 我和宋清朗相戀三年祈纯,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片叼耙。...
    茶點故事閱讀 38,643評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡腕窥,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出旬蟋,到底是詐尸還是另有隱情油昂,我是刑警寧澤革娄,帶...
    沈念sama閱讀 34,306評論 4 330
  • 正文 年R本政府宣布倾贰,位于F島的核電站,受9級特大地震影響拦惋,放射性物質(zhì)發(fā)生泄漏匆浙。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,930評論 3 313
  • 文/蒙蒙 一厕妖、第九天 我趴在偏房一處隱蔽的房頂上張望首尼。 院中可真熱鬧,春花似錦、人聲如沸软能。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,745評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽查排。三九已至凳枝,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間跋核,已是汗流浹背岖瑰。 一陣腳步聲響...
    開封第一講書人閱讀 31,983評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留砂代,地道東北人蹋订。 一個月前我還...
    沈念sama閱讀 46,351評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像刻伊,于是被迫代替她去往敵國和親露戒。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,509評論 2 348