Preface:
While building a Flink-based real-time computing platform, I ran into the following problem: when executing SQL jobs, we need to pre-create tables, views, and even functions. flink-sql-client already provides this, but it does not support YARN's per-job mode. So I built a job dedicated to executing DDL SQL. One issue came up along the way: this job does not need any YARN resources at all; it can simply run locally, as long as it can connect to your catalog.
MysqlCatalog
Flink does not ship an official MySQL-based catalog. The latest release, Flink 1.11, does include a JDBC catalog, but its purpose is not metadata management: it maps Flink schemas onto the corresponding database tables so that you can write data into them directly, which is clearly not what I wanted.
How to implement a MySQL catalog is not the focus of this article; I will cover that in a dedicated follow-up post, so stay tuned.
The DDL execution job
Since 1.11, Flink has full SQL support for DDL, including creating catalogs, databases, views, and so on. A single call to streamTableEnv.executeSql handles it, with no need to parse the statements yourself. Below is the code of the Flink job I use exclusively for executing DDL:
public static void executeSql(String sqlContent) throws Exception {
    ParameterTool active = ParameterTool.fromPropertiesFile(
            LocalStreamingJob.class.getClassLoader().getResourceAsStream("app.properties"));
    String activeFile = String.format("app-%s.properties", active.get("profiles.active"));
    ParameterTool tool = ParameterTool.fromPropertiesFile(
            LocalStreamingJob.class.getClassLoader().getResourceAsStream(activeFile));
    Configuration conf = tool.getConfiguration();
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .inStreamingMode()
            .useBlinkPlanner()
            .build();
    StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, settings);
    // Register the default catalog
    MysqlCatalog catalog = new MysqlCatalog(
            conf.get(CATALOG_DEFAULT_NAME),
            conf.get(CATALOG_DEFAULT_DATABASE),
            conf.get(CATALOG_DEFAULT_USER),
            conf.get(CATALOG_DEFAULT_PASSWORD),
            conf.get(CATALOG_DEFAULT_URL));
    streamTableEnv.registerCatalog(conf.get(CATALOG_DEFAULT_NAME), catalog);
    // Use the default catalog; an explicit 'USE CATALOG xxx' in the SQL overrides this
    streamTableEnv.executeSql(String.format("USE CATALOG `%s`", conf.get(CATALOG_DEFAULT_NAME)));
    // Use the default database; an explicit 'USE xxx' in the SQL overrides this
    streamTableEnv.executeSql(String.format("USE `%s`", conf.get(CATALOG_DEFAULT_DATABASE)));
    String[] contentArr = StringUtils.split(sqlContent, ";");
    List<String> sqls = new ArrayList<>(contentArr.length);
    Collections.addAll(sqls, contentArr);
    for (String sql : sqls) {
        streamTableEnv.executeSql(sql).print();
    }
}
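One caveat in the code above: splitting the script with a plain `split(";")` breaks as soon as a statement contains a semicolon inside a string literal (for example in a connector option). A minimal, stdlib-only splitter that skips semicolons inside single-quoted literals could look like this; `SqlSplitter` is a hypothetical helper name, not part of Flink:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a SQL script into statements on ';',
// ignoring semicolons that appear inside single-quoted string literals.
public class SqlSplitter {

    public static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuote = false;
        for (char c : script.toCharArray()) {
            if (c == '\'') {
                inQuote = !inQuote; // toggle literal state on single quotes
            }
            if (c == ';' && !inQuote) {
                String stmt = current.toString().trim();
                if (!stmt.isEmpty()) {
                    statements.add(stmt);
                }
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        String tail = current.toString().trim();
        if (!tail.isEmpty()) {
            statements.add(tail); // last statement may lack a trailing ';'
        }
        return statements;
    }
}
```

This is only a sketch (it does not handle escaped quotes or comments), but it covers the common case of semicolons inside `WITH (...)` option values.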
How to run it locally
By "local execution" I mean that in production there is no need to submit the job to the cluster; it runs just as it would directly inside IDEA. Suppose your platform exposes a REST endpoint for submitting DDL: you call it with the SQL to execute, and a table gets created in the catalog. How can that be done? While reading the source code of flink-clients, I found the following code in ClientUtils:
public static void executeProgram(
        PipelineExecutorServiceLoader executorServiceLoader,
        Configuration configuration,
        PackagedProgram program,
        boolean enforceSingleJobExecution,
        boolean suppressSysout) throws ProgramInvocationException {
    checkNotNull(executorServiceLoader);
    // the classloader of the user jar
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
    // the current thread's classloader
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        // swap the context classloader for the jar's classloader
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);
        LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));
        ContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);
        StreamContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);
        try {
            program.invokeInteractiveModeForExecution();
        } finally {
            ContextEnvironment.unsetAsContext();
            StreamContextEnvironment.unsetAsContext();
        }
    } finally {
        // finally, restore the original classloader
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}
Clearly, when you need to run code from another classloader inside a thread, you just set the thread's context classloader to that code's classloader, and restore the original one once execution finishes. The pattern is:
try {
    Thread.currentThread().setContextClassLoader(userCodeClassLoader);
    .....
} finally {
    Thread.currentThread().setContextClassLoader(contextClassLoader);
}
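The swap-and-restore pattern above can be wrapped in a small, self-contained helper using only the JDK; `ClassLoaderSwap` and `runWith` are names I am inventing here for illustration:

```java
// Minimal sketch of the swap-and-restore pattern. The Runnable stands in
// for user code that must see a specific context classloader.
public class ClassLoaderSwap {

    // Runs 'task' with 'targetLoader' as the thread context classloader,
    // restoring the previous loader afterwards, even if the task throws.
    public static void runWith(ClassLoader targetLoader, Runnable task) {
        ClassLoader previous = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(targetLoader);
            task.run();
        } finally {
            Thread.currentThread().setContextClassLoader(previous);
        }
    }
}
```

The `finally` block is the important part: without it, an exception in the task would leave the thread permanently pointing at the wrong classloader, which is especially dangerous when the thread comes from a pool.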
With that idea in place, running executeSql locally is solved. The code is as follows:
public void executeSql(String sqlContent) {
    File flinkFile = new File(flinkHome + "/lib");
    if (!flinkFile.exists()) {
        throw new ServiceException(String.format("file:[%s] not exists", flinkHome + "/lib"));
    }
    if (!flinkFile.isDirectory()) {
        throw new ServiceException(String.format("file:[%s] is not a directory", flinkHome + "/lib"));
    }
    List<URL> urls = new ArrayList<>();
    File[] files = flinkFile.listFiles();
    try {
        for (File file : files) {
            urls.add(file.toURI().toURL());
        }
        File localEnvFile = new File(sqlJarFile);
        urls.add(localEnvFile.toURI().toURL());
    } catch (MalformedURLException e) {
        throw new ServiceException(e.getMessage());
    }
    ClassLoader flinkClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]));
    Objects.requireNonNull(flinkClassLoader, "flink classloader can not be null");
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(flinkClassLoader);
        // load the class from the remote jar
        Class<?> clazz = flinkClassLoader.loadClass("com.shizhengchao.github.local.env.LocalStreamingJob");
        // invoke it via reflection
        Method method = clazz.getMethod("executeSql", String.class);
        method.invoke(null, sqlContent);
    } catch (Exception e) {
        if (e instanceof InvocationTargetException) {
            throw new ServiceException(e.getCause());
        } else {
            throw new ServiceException(e.getMessage());
        }
    } finally {
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}
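The classpath assembly at the top of that method can be factored into a small stdlib-only helper. This is a sketch mirroring the directory scan above; `ClasspathBuilder` is a hypothetical name, and the lib-directory layout is an assumption about your Flink installation:

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: collects every file under a lib directory, plus
// one extra jar (the DDL job jar), into the URL array a URLClassLoader needs.
public class ClasspathBuilder {

    public static URL[] buildClasspath(File libDir, File extraJar) throws MalformedURLException {
        if (!libDir.isDirectory()) {
            throw new IllegalArgumentException(libDir + " is not a directory");
        }
        List<URL> urls = new ArrayList<>();
        File[] files = libDir.listFiles();
        if (files != null) { // listFiles returns null on I/O error
            for (File file : files) {
                urls.add(file.toURI().toURL());
            }
        }
        urls.add(extraJar.toURI().toURL());
        return urls.toArray(new URL[0]);
    }
}
```

Checking `listFiles()` for null avoids a NullPointerException that the original loop would hit if the directory becomes unreadable between the `isDirectory()` check and the scan.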
最后滞项,數(shù)據(jù)庫里也有對應的ddl的信息了:
A side note
Along the way I hit one small problem: my platform uses Ebean as its ORM framework, and my initial MysqlCatalog implementation used Ebean as well. When the classloader was swapped, the mappers on the two sides conflicted: the catalog's mappers overrode the platform's, and the platform kept complaining that the corresponding beans were not registered with the EbeanServer. In the end I had to rewrite the catalog implementation with plain JDBC.