Flink最佳實(shí)踐

解析命令行參數(shù)和在Flink應(yīng)用程序中傳遞參數(shù)

幾乎所有的Flink應(yīng)用程序,包括批處理和流處理疲迂,都依賴于外部配置參數(shù),這些參數(shù)被用來指定輸入和輸出源(如路徑或者地址),系統(tǒng)參數(shù)(并發(fā)數(shù)糕珊,運(yùn)行時配置)和應(yīng)用程序的可配參數(shù)(通常用在自定義函數(shù)中)。

Flink提供了一個簡單的叫做ParameterToolutilityParameterTool提供了一些基礎(chǔ)的工具來解決這些問題芽偏,當(dāng)然你也可以不用這里所有描述的ParameterTool密幔,其他框架如:Commons CLIargparse4j在Flink中也是支持的。

獲取你的配置值并傳入ParameterTool中

ParameterTool提供了一系列預(yù)定義的靜態(tài)方法來讀取配置信息喇肋,ParameterTool內(nèi)部是一個Map<String, String>,所以很容易與你自己的配置形式相集成坟乾。

從.properties文件中獲取

下面方法將去讀取一個Properties文件,并返回若干key/value對:

String propertiesFile = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
從命令行參數(shù)中獲取

下面會從命令行中獲取像--input hdfs:///mydata --elements 42這種形式的參數(shù):

public static void main(String[] args) {
    ParameterTool parameter = ParameterTool.fromArgs(args);
    // .. regular code ..
從系統(tǒng)屬性中獲取

當(dāng)啟動一個JVM時蝶防,你可以給它傳遞一些系統(tǒng)屬性如:-Dinput=hdfs:///mydata甚侣,你也可以用這些系統(tǒng)屬性來初始化ParameterTool:

ParameterTool parameter = ParameterTool.fromSystemProperties();

在程序中使用ParameterTool的參數(shù)

既然我們已經(jīng)從其他地方(方法如上)拿到了配置參數(shù),我們就可以以各種形式來使用它們了间学。

直接從ParameterTool中獲取

ParameterTool本身有方法來獲取這些值:

ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters()
// .. there are more methods available.

你可以在提交應(yīng)用程序的客戶端main()方法中直接使用這些方法返回的值殷费,例如:你可以按如下方法來設(shè)置一個算子的并發(fā)度:

ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

因?yàn)镻arameterTool是可序列化的,所以你可以將它傳遞給函數(shù)本身低葫;

ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));

之后在函數(shù)內(nèi)部使用ParameterTool來獲取命令行參數(shù)详羡。

將參數(shù)以Configuration對象的形式傳遞給函數(shù)

下面的例子展示了如何將參數(shù)以Configuration對象的形式傳遞給用戶自定義函數(shù)。

ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text
        .flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())

在Tokenizer內(nèi)部嘿悬,Configuration對象可以通過open(Configuration conf)方法來獲仁的;

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void open(Configuration parameters) throws Exception {
  parameters.getInteger("myInt", -1);
  // .. do
注冊全局參數(shù)

在ExecutionConfig中注冊為全作業(yè)參數(shù)的參數(shù)鹊漠,可以被JobManager的web端以及用戶自定義函數(shù)中以配置值的形式訪問.
注冊全局參數(shù):

ParameterTool parameters = ParameterTool.fromArgs(args);

// 創(chuàng)建一個執(zhí)行環(huán)境
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

在用戶自定義的富函數(shù)中獲取它們:

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
  ParameterTool parameters = (ParameterTool)
      getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
  parameters.getRequired("input");
  // .. do more ..

命名大型的TupleX類型

對于有很多字段的數(shù)據(jù)類型我們建議采用POJOs(普通Java對象)主到,而不是TupleX;同時躯概,POJOs還可以用來為大型的Tuple命名登钥。、
例如:
不使用:Tuple11<String, String, ..., String> var = new ...;
因?yàn)槔^承大型Tuple類來創(chuàng)建一個自定義的類型會比直接使用大型Tuple簡單得多:

CustomType var = new ...;

public static class CustomType extends Tuple11<String, String, ..., String> {
    // constructor matching super
}

使用Logback而不是Log4j

注意:本文檔適用于Flink 0.10之后的版本
Apache Flink使用slf4j來作為logging 抽象娶靡,也建議用戶在他們自定義的方法中也使用slf4j牧牢。Slf4j是一個編譯時的logging接口,在運(yùn)行時可以使用不同的logging實(shí)現(xiàn)姿锭,例如:log4j或者Logback塔鳍。
默認(rèn)情況下Flink依賴Log4j,本也描述了如何在Flink中使用Logback呻此。有用戶反饋它們通過本指南轮纫,可以使用Graylog來建立集中式日志收集。
使用下面的代碼來獲取一個logger實(shí)例:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyClass implements MapFunction {
    private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
    // ...
在IDE之外運(yùn)行Flink時使用Logback或者在一個Java應(yīng)用程序中使用Logback

在所有情況下類會在一個由依賴管理器如Maven創(chuàng)建的classpath中執(zhí)行焚鲜,F(xiàn)link會將log4j推到classpath中掌唾。
因此,你需要將log4j從Flink的依賴中剔除忿磅,下面的描述假定有一個跟Maven工程:
按如下方式來修改你的工程:

<dependencies>
  <!-- Add the two required logback dependencies -->
  <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.1.3</version>
  </dependency>
  <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.1.3</version>
  </dependency>

  <!-- Add the log4j -> sfl4j (-> logback) bridge into the classpath
   Hadoop is logging to log4j! -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>log4j-over-slf4j</artifactId>
    <version>1.7.7</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.3.0</version>
    <exclusions>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.0</version>
    <exclusions>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.3.0</version>
    <exclusions>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
</dependencies>

在<dependencies>部分進(jìn)行如下修改糯彬;
  1、將所有的log4j依賴從所有的Flink dependencies中剔除:這會導(dǎo)致Maven忽略Flink對log4j的傳遞依賴葱她。
  2撩扒、將slf4j-log4j12 artifact從Flink依賴中剔除:因?yàn)槲覀儗⒂胹lf4j到logback的綁定,所以我們需要刪除slf4j到log4j的綁定吨些。
  3搓谆、添加Logback依賴:logback-core和logback-classic
  4炒辉、添加log4j-over-slf4j依賴:log4j-over-slf4j是一種允許舊應(yīng)用程序直接使用Log4j API來調(diào)用Slf4j接口的工具。Flink依賴Hadoop挽拔,而Hadoop是使用Log4j來記錄日志的

請注意:你需要手動添加exclusion到所有你添加到pom文件中的Flink依賴辆脸。
你可能還需要檢查一下其他非Flink依賴是否也是log4j的綁定但校,你可以使用mvn dependency:true來分析你的工程依賴螃诅。

當(dāng)Flink運(yùn)行在集群中是使用Logback

本指南適用于當(dāng)Flink運(yùn)行在YARN或者standalong集群時。

為了在Flink中使用Logback而不是Log4j状囱,你需要將 log4j-1.2.xx.jar 和 sfl4j-log4j12-xxx.jar從 lib/目錄中刪除
接下來术裸,你需要將下面的jar文件添加到 lib/目錄下:
  logback-classic.jar
  logback-core.jar
  log4j-over-slf4j.jar

注意:當(dāng)使用單任務(wù)的YARN集群時,你需要明確的設(shè)置 lib/目錄!

將自定義logger的Flink提交到Y(jié)ARN中的命令是:./bin/flink run -yt $FLINK_HOME/lib <... remaining arguments ...>

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末亭枷,一起剝皮案震驚了整個濱河市袭艺,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌叨粘,老刑警劉巖猾编,帶你破解...
    沈念sama閱讀 216,843評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異升敲,居然都是意外死亡答倡,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,538評論 3 392
  • 文/潘曉璐 我一進(jìn)店門驴党,熙熙樓的掌柜王于貴愁眉苦臉地迎上來瘪撇,“玉大人,你說我怎么就攤上這事港庄【蠹龋” “怎么了?”我有些...
    開封第一講書人閱讀 163,187評論 0 353
  • 文/不壞的土叔 我叫張陵鹏氧,是天一觀的道長渤涌。 經(jīng)常有香客問我,道長把还,這世上最難降的妖魔是什么实蓬? 我笑而不...
    開封第一講書人閱讀 58,264評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮笨篷,結(jié)果婚禮上瞳秽,老公的妹妹穿的比我還像新娘。我一直安慰自己率翅,他們只是感情好练俐,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,289評論 6 390
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著冕臭,像睡著了一般腺晾。 火紅的嫁衣襯著肌膚如雪燕锥。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,231評論 1 299
  • 那天悯蝉,我揣著相機(jī)與錄音归形,去河邊找鬼。 笑死鼻由,一個胖子當(dāng)著我的面吹牛暇榴,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蕉世,決...
    沈念sama閱讀 40,116評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼蔼紧,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了狠轻?” 一聲冷哼從身側(cè)響起奸例,我...
    開封第一講書人閱讀 38,945評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎向楼,沒想到半個月后查吊,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,367評論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡湖蜕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,581評論 2 333
  • 正文 我和宋清朗相戀三年逻卖,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片重荠。...
    茶點(diǎn)故事閱讀 39,754評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡箭阶,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出戈鲁,到底是詐尸還是另有隱情仇参,我是刑警寧澤,帶...
    沈念sama閱讀 35,458評論 5 344
  • 正文 年R本政府宣布婆殿,位于F島的核電站诈乒,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏婆芦。R本人自食惡果不足惜怕磨,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,068評論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望消约。 院中可真熱鬧肠鲫,春花似錦、人聲如沸或粮。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,692評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至渣锦,卻和暖如春硝岗,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背袋毙。 一陣腳步聲響...
    開封第一講書人閱讀 32,842評論 1 269
  • 我被黑心中介騙來泰國打工型檀, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人听盖。 一個月前我還...
    沈念sama閱讀 47,797評論 2 369
  • 正文 我出身青樓胀溺,卻偏偏與公主長得像,于是被迫代替她去往敵國和親媳溺。 傳聞我的和親對象是個殘疾皇子月幌,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,654評論 2 354

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理碍讯,服務(wù)發(fā)現(xiàn)悬蔽,斷路器,智...
    卡卡羅2017閱讀 134,652評論 18 139
  • 歷史 log4j可以當(dāng)之無愧地說是Java日志框架的元老捉兴,1999年發(fā)布首個版本蝎困,2012年發(fā)布最后一個版本,20...
    kelgon閱讀 10,155評論 3 53
  • Spring Boot 參考指南 介紹 轉(zhuǎn)載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 46,806評論 6 342
  • 概述 在項目開發(fā)中倍啥,為了跟蹤代碼的運(yùn)行情況禾乘,常常要使用日志來記錄信息。在Java世界虽缕,有很多的日志工具庫來實(shí)現(xiàn)日志...
    靜默虛空閱讀 1,854評論 1 9
  • spring官方文檔:http://docs.spring.io/spring/docs/current/spri...
    牛馬風(fēng)情閱讀 1,670評論 0 3