不借助于Flink集群,如何讓任務本地執(zhí)行

前言:
在做flink實時計算平臺的時候羡儿,我遇到過這樣一個問題礼患,在執(zhí)行sql任務的時候,我們需要預先創(chuàng)建一些Table,View掠归,甚至是functions缅叠。這些其實flink-sql-client已經(jīng)提供了,但不支持yarn的per-job模式虏冻。所以我就弄了一個任務肤粱,專門執(zhí)行ddl的sql,但是這過程中有一個問題厨相,其實這個job不需要借助于yarn的資源领曼,直接本地跑就行了鸥鹉,只要能連接到你的catalog。

MysqlCatalog

Flink官方是沒有實現(xiàn)基于Mysql的Catalog的庶骄,最新版本的Flink1.11中毁渗,雖然有Jdbc的Catalog,但它的實現(xiàn)的本意并不是一個元數(shù)據(jù)管理单刁,而是把flink的schema映射到數(shù)據(jù)的對應的表中灸异,從而實現(xiàn)可以直接往表里寫數(shù)據(jù),顯然不是我想要的羔飞。

如何實現(xiàn)一個Mysql的Catalog不是本文的重點肺樟,后續(xù)我會專門寫一篇基于Mysql的Catalog的實現(xiàn)的文章,敬請期待逻淌。

DDL執(zhí)行任務

Flink1.11開始已經(jīng)全面支持 Flink DDL 的SQL了儡嘶,包括創(chuàng)建catalog,創(chuàng)建 database,創(chuàng)建view等。使用streamTableEnv.executeSql就能輕松搞定恍风,無需再去自己解析蹦狂。下面是我用于專門執(zhí)行ddl的 flink任務的代碼

public static void executeSql(String sqlContent) throws Exception {
        ParameterTool active = ParameterTool.fromPropertiesFile(LocalStreamingJob.class.getClassLoader().getResourceAsStream("app.properties"));
        String activeFile = String.format("app-%s.properties", active.get("profiles.active"));
        ParameterTool tool = ParameterTool.fromPropertiesFile(LocalStreamingJob.class.getClassLoader().getResourceAsStream(activeFile));
        Configuration conf = tool.getConfiguration();

        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, settings);

        // 注冊默認的catalog
        MysqlCatalog catalog = new MysqlCatalog(
                conf.get(CATALOG_DEFAULT_NAME),
                conf.get(CATALOG_DEFAULT_DATABASE),
                conf.get(CATALOG_DEFAULT_USER),
                conf.get(CATALOG_DEFAULT_PASSWORD),
                conf.get(CATALOG_DEFAULT_URL));

        streamTableEnv.registerCatalog(conf.get(CATALOG_DEFAULT_NAME), catalog);

        //使用默認的catalog,在sql里顯示 使用'use catalog xxx'語句,可以覆蓋
        streamTableEnv.executeSql(String.format("USE CATALOG `%s`", conf.get(CATALOG_DEFAULT_NAME)));
        // 使用默認的database,在sql里顯示 使用'use xxx'語句,可以覆蓋
        streamTableEnv.executeSql(String.format("USE `%s`", conf.get(CATALOG_DEFAULT_DATABASE)));

        String[] contentArr = StringUtils.split(sqlContent, ";");
        List<String> sqls = new ArrayList<>(contentArr.length);
        Collections.addAll(sqls, contentArr);
        for(String sql : sqls) {
            streamTableEnv.executeSql(sql).print();
        }
    }

如何本地執(zhí)行

所謂的本地執(zhí)行就是在線上執(zhí)行時,無需提交到集群朋贬;就像在IDEA里直接運行一樣凯楔,首先假設你的平臺實現(xiàn)了一個提交ddl的rest接口,通過調用這個接口锦募,傳入待執(zhí)行的sql摆屯,就能在catalog中創(chuàng)建一張表。那如何能做到糠亩?在我閱讀Flink-Clients的源代碼的時候虐骑,我發(fā)現(xiàn)ClientUtils里有這樣一段代碼:

public static void executeProgram(
            PipelineExecutorServiceLoader executorServiceLoader,
            Configuration configuration,
            PackagedProgram program,
            boolean enforceSingleJobExecution,
            boolean suppressSysout) throws ProgramInvocationException {
        checkNotNull(executorServiceLoader);
        // jar包的classloader
        final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
        //當前線程的classloader
        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            //把當前的classloader設置成jar包的classloader
            Thread.currentThread().setContextClassLoader(userCodeClassLoader);

            LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));

            ContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

            StreamContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

            try {
                program.invokeInteractiveModeForExecution();
            } finally {
                ContextEnvironment.unsetAsContext();
                StreamContextEnvironment.unsetAsContext();
            }
        } finally {
            //最后還原classloader
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }
    }

顯然,當你需要在一個線程里執(zhí)行其它classloader里的代碼時赎线,只需要設置成代碼的classloader廷没,執(zhí)行完后,再還原classloader就可以了垂寥,使用套路就是:

try {
    Thread.currentThread().setContextClassLoader(userCodeClassLoader);
    .....
} finally {
    Thread.currentThread().setContextClassLoader(contextClassLoader);
}

有了上面的思路颠黎,那么本地執(zhí)行executeSql就完美解決了,代碼如下:

public void executeSql(String sqlContent) {
    File flinkFile = new File(flinkHome + "/lib");
    if(!flinkFile.exists()) {
      throw new ServiceException(String.format("file:[%s] not exists", flinkHome + "/lib"));
    }
    if(!flinkFile.isDirectory()) {
      throw new ServiceException(String.format("file:[%s] is not a directory", flinkHome + "/lib"));
    }

    List<URL> urls = new ArrayList<>();
    File[] files = flinkFile.listFiles();
    try {
      for (File file : files) {
        urls.add(file.toURI().toURL());
      }
      File localEnvFile = new File(sqlJarFile);
      urls.add(localEnvFile.toURI().toURL());
    } catch (MalformedURLException e) {
      throw new ServiceException(e.getMessage());
    }
    
    ClassLoader flinkClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]));
    Objects.requireNonNull(flinkClassLoader, "flink classloader can not be null");
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
      Thread.currentThread().setContextClassLoader(flinkClassLoader);
       //加載遠端的類
      Class<?> clazz = flinkClassLoader.loadClass("com.shizhengchao.github.local.env.LocalStreamingJob");
       //反射調用執(zhí)行
      Method method = clazz.getMethod("executeSql", String.class);
      method.invoke(null, sqlContent);
    } catch (Exception e) {
      if(e instanceof InvocationTargetException) {
        throw new ServiceException(e.getCause());
      } else {
        throw new ServiceException(e.getMessage());
      }
    } finally {
      Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
  }

最后滞项,數(shù)據(jù)庫里也有對應的ddl的信息了:


metastore

小插曲

在這期間出一了一個小問題:我的平臺使用了ebean作為我的ORM框架狭归,而我的mysqlcatalo的實現(xiàn),我最初也是用ebean文判。這會導致在設置ClassLoader時过椎,兩邊的Mapper沖突,出現(xiàn)catalog那端的mapper直接把實時平臺的mapper覆蓋掉了戏仓,導致一直報相應的Bean沒有注冊到EbeanServer疚宇。最后亡鼠,我不得不把Catalog的實現(xiàn)換成了原生JDBC。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末灰嫉,一起剝皮案震驚了整個濱河市拆宛,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌讼撒,老刑警劉巖浑厚,帶你破解...
    沈念sama閱讀 222,104評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異根盒,居然都是意外死亡钳幅,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評論 3 399
  • 文/潘曉璐 我一進店門炎滞,熙熙樓的掌柜王于貴愁眉苦臉地迎上來敢艰,“玉大人,你說我怎么就攤上這事册赛∧频迹” “怎么了?”我有些...
    開封第一講書人閱讀 168,697評論 0 360
  • 文/不壞的土叔 我叫張陵森瘪,是天一觀的道長牡属。 經(jīng)常有香客問我,道長扼睬,這世上最難降的妖魔是什么逮栅? 我笑而不...
    開封第一講書人閱讀 59,836評論 1 298
  • 正文 為了忘掉前任,我火速辦了婚禮窗宇,結果婚禮上措伐,老公的妹妹穿的比我還像新娘。我一直安慰自己军俊,他們只是感情好侥加,可當我...
    茶點故事閱讀 68,851評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著蝇完,像睡著了一般官硝。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上短蜕,一...
    開封第一講書人閱讀 52,441評論 1 310
  • 那天,我揣著相機與錄音傻咖,去河邊找鬼朋魔。 笑死,一個胖子當著我的面吹牛卿操,可吹牛的內容都是我干的警检。 我是一名探鬼主播孙援,決...
    沈念sama閱讀 40,992評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼扇雕!你這毒婦竟也來了拓售?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,899評論 0 276
  • 序言:老撾萬榮一對情侶失蹤镶奉,失蹤者是張志新(化名)和其女友劉穎础淤,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體哨苛,經(jīng)...
    沈念sama閱讀 46,457評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡鸽凶,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,529評論 3 341
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了建峭。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片啃炸。...
    茶點故事閱讀 40,664評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡幔虏,死狀恐怖,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情雳窟,我是刑警寧澤,帶...
    沈念sama閱讀 36,346評論 5 350
  • 正文 年R本政府宣布辕坝,位于F島的核電站坠敷,受9級特大地震影響,放射性物質發(fā)生泄漏砚蓬。R本人自食惡果不足惜矢门,卻給世界環(huán)境...
    茶點故事閱讀 42,025評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望灰蛙。 院中可真熱鬧祟剔,春花似錦、人聲如沸摩梧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽仅父。三九已至叛薯,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間笙纤,已是汗流浹背耗溜。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留省容,地道東北人抖拴。 一個月前我還...
    沈念sama閱讀 49,081評論 3 377
  • 正文 我出身青樓,卻偏偏與公主長得像腥椒,于是被迫代替她去往敵國和親阿宅。 傳聞我的和親對象是個殘疾皇子候衍,可洞房花燭夜當晚...
    茶點故事閱讀 45,675評論 2 359