本文僅為筆者平日學(xué)習(xí)記錄之用彤敛,侵刪
原文:https://mp.weixin.qq.com/s/t9Z2d1Pay4EiJBIU1dNrsw
Flink任務(wù)執(zhí)行模式包含了yarn-session鳖宾、standalone称鳞、per-job、local, 在1.10中又增加k8s的執(zhí)行模式蹲姐,那么在任務(wù)提交過程中如何根據(jù)不同的執(zhí)行模式進行任務(wù)提交呢扛芽?主要通過兩個接口來實現(xiàn):PipelineExecutorFactory 與PipelineExecutor。PipelineExecutorFactory用于在不同模式下創(chuàng)建不同的PipelineExecutor, 用于提交任務(wù)帆精,PipelineExecutorFactory表示的一個創(chuàng)建執(zhí)行器工廠接口,PipelineExecutor 表示一個執(zhí)行器接口隧魄,正如你所想這里使用的就是經(jīng)典的工廠設(shè)計模式卓练,在任務(wù)提交過程中會根據(jù)不同的提交模式, 使用不同的PipelineExecutorFactory創(chuàng)建不同的PipelineExecutor购啄。
public interface PipelineExecutorFactory {
/**
* Returns the name of the executor that this factory creates.
*/
String getName();
/**
根據(jù)configuration判斷是否滿足當(dāng)前的factory
*/
boolean isCompatibleWith(final Configuration configuration);
/**
* 獲取對應(yīng)模式下的executor
*/
PipelineExecutor getExecutor(final Configuration configuration);
}
PipelineExecutorFactory幾個實現(xiàn)分別為:
- LocalExecutorFactory(local)
- RemoteExecutorFactory(standalone)
- YarnJobClusterExecutorFactory(per-job)
- YarnSessionClusterExecutorFactory(yarn-session)
public interface PipelineExecutor {
/**
* 執(zhí)行任務(wù)
*/
CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception;
}
PipelineExecutor對應(yīng)實現(xiàn):
RemoteExecutor(standalone)
LocalExecutor(local)
YarnJobClusterExecutor(per-job)
YarnSessionClusterExecutor(yarn-session)
那么具體是如何選擇factory呢襟企?由PipelineExecutorServiceLoader接口來完成,其只有一個實現(xiàn)類DefaultExecutorServiceLoader狮含, 透過命名你可能會才想到這里面用到了ServiceLoader顽悼,你的猜想是正確的,它就是通過SPI機制去加載flink所提供的不同factory辉川,在META-INF.services 下可以找到其對應(yīng)的配置:
DefaultExecutorServiceLoader.java部分源碼
//SPI機制
private static final ServiceLoader<PipelineExecutorFactory> defaultLoader = ServiceLoader.load(PipelineExecutorFactory.class);
//獲取對應(yīng)的factory
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
checkNotNull(configuration);
final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();
final Iterator<PipelineExecutorFactory> factories = defaultLoader.iterator();
while (factories.hasNext()) {
try {
final PipelineExecutorFactory factory = factories.next();
//判斷標(biāo)準(zhǔn) 根據(jù)任務(wù)啟動配置
if (factory != null && factory.isCompatibleWith(configuration)) {
compatibleFactories.add(factory);
}
} catch (Throwable e) {
if (e.getCause() instanceof NoClassDefFoundError) {
LOG.info("Could not load factory due to missing dependencies.");
} else {
throw e;
}
}
}
//只能有一個factory符合要求
if (compatibleFactories.size() > 1) {
final String configStr =
configuration.toMap().entrySet().stream()
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.joining("\n"));
throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
}
return compatibleFactories.isEmpty() ? null : compatibleFactories.get(0);
}
ServiceLoader.load(PipelineExecutorFactory.class) 會從類路徑的META-INF.services下找到PipelineExecutorFactory的全路徑文件拴测,然后實例化出所有的factory集索,通過PipelineExecutorFactory.isCompatibleWith找到匹配的factory妆距。