解析命令行參數(shù)和在Flink應(yīng)用程序中傳遞參數(shù)
幾乎所有的Flink應(yīng)用程序,包括批處理和流處理疲迂,都依賴于外部配置參數(shù),這些參數(shù)被用來指定輸入和輸出源(如路徑或者地址),系統(tǒng)參數(shù)(并發(fā)數(shù)糕珊,運(yùn)行時配置)和應(yīng)用程序的可配參數(shù)(通常用在自定義函數(shù)中)。
Flink提供了一個簡單的叫做ParameterTool
的utility
,ParameterTool
提供了一些基礎(chǔ)的工具來解決這些問題芽偏,當(dāng)然你也可以不用這里所有描述的ParameterTool
密幔,其他框架如:Commons CLI
和argparse4j
在Flink中也是支持的。
獲取你的配置值并傳入ParameterTool中
ParameterTool
提供了一系列預(yù)定義的靜態(tài)方法來讀取配置信息喇肋,ParameterTool
內(nèi)部是一個Map<String, String>
,所以很容易與你自己的配置形式相集成坟乾。
從.properties文件中獲取
下面方法將去讀取一個Properties文件,并返回若干key/value對:
String propertiesFile = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
從命令行參數(shù)中獲取
下面會從命令行中獲取像--input hdfs:///mydata --elements 42
這種形式的參數(shù):
public static void main(String[] args) {
ParameterTool parameter = ParameterTool.fromArgs(args);
// .. regular code ..
從系統(tǒng)屬性中獲取
當(dāng)啟動一個JVM時蝶防,你可以給它傳遞一些系統(tǒng)屬性如:-Dinput=hdfs:///mydata
甚侣,你也可以用這些系統(tǒng)屬性來初始化ParameterTool:
ParameterTool parameter = ParameterTool.fromSystemProperties();
在程序中使用ParameterTool的參數(shù)
既然我們已經(jīng)從其他地方(方法如上)拿到了配置參數(shù),我們就可以以各種形式來使用它們了间学。
直接從ParameterTool中獲取
ParameterTool本身有方法來獲取這些值:
ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters()
// .. there are more methods available.
你可以在提交應(yīng)用程序的客戶端main()方法中直接使用這些方法返回的值殷费,例如:你可以按如下方法來設(shè)置一個算子的并發(fā)度:
ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);
因?yàn)镻arameterTool是可序列化的,所以你可以將它傳遞給函數(shù)本身低葫;
ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));
之后在函數(shù)內(nèi)部使用ParameterTool來獲取命令行參數(shù)详羡。
將參數(shù)以Configuration對象的形式傳遞給函數(shù)
下面的例子展示了如何將參數(shù)以Configuration對象的形式傳遞給用戶自定義函數(shù)。
ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())
在Tokenizer內(nèi)部嘿悬,Configuration對象可以通過open(Configuration conf)方法來獲仁的;
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void open(Configuration parameters) throws Exception {
parameters.getInteger("myInt", -1);
// .. do
注冊全局參數(shù)
在ExecutionConfig中注冊為全作業(yè)參數(shù)的參數(shù)鹊漠,可以被JobManager的web端以及用戶自定義函數(shù)中以配置值的形式訪問.
注冊全局參數(shù):
ParameterTool parameters = ParameterTool.fromArgs(args);
// 創(chuàng)建一個執(zhí)行環(huán)境
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
在用戶自定義的富函數(shù)中獲取它們:
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
ParameterTool parameters = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
parameters.getRequired("input");
// .. do more ..
命名大型的TupleX類型
對于有很多字段的數(shù)據(jù)類型我們建議采用POJOs
(普通Java
對象)主到,而不是TupleX
;同時躯概,POJOs
還可以用來為大型的Tuple
命名登钥。、
例如:
不使用:Tuple11<String, String, ..., String> var = new ...;
因?yàn)槔^承大型Tuple類來創(chuàng)建一個自定義的類型會比直接使用大型Tuple簡單得多:
CustomType var = new ...;
public static class CustomType extends Tuple11<String, String, ..., String> {
// constructor matching super
}
使用Logback而不是Log4j
注意:本文檔適用于Flink 0.10之后的版本
Apache Flink使用slf4j來作為logging 抽象娶靡,也建議用戶在他們自定義的方法中也使用slf4j牧牢。Slf4j是一個編譯時的logging接口,在運(yùn)行時可以使用不同的logging實(shí)現(xiàn)姿锭,例如:log4j或者Logback塔鳍。
默認(rèn)情況下Flink依賴Log4j,本也描述了如何在Flink中使用Logback呻此。有用戶反饋它們通過本指南轮纫,可以使用Graylog來建立集中式日志收集。
使用下面的代碼來獲取一個logger實(shí)例:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyClass implements MapFunction {
private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
// ...
在IDE之外運(yùn)行Flink時使用Logback或者在一個Java應(yīng)用程序中使用Logback
在所有情況下類會在一個由依賴管理器如Maven創(chuàng)建的classpath中執(zhí)行焚鲜,F(xiàn)link會將log4j推到classpath中掌唾。
因此,你需要將log4j從Flink的依賴中剔除忿磅,下面的描述假定有一個跟Maven工程:
按如下方式來修改你的工程:
<dependencies>
<!-- Add the two required logback dependencies -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.3</version>
</dependency>
<!-- Add the log4j -> sfl4j (-> logback) bridge into the classpath
Hadoop is logging to log4j! -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.3.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.3.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>1.3.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
在<dependencies>部分進(jìn)行如下修改糯彬;
1、將所有的log4j依賴從所有的Flink dependencies
中剔除:這會導(dǎo)致Maven忽略Flink對log4j的傳遞依賴葱她。
2撩扒、將slf4j-log4j12 artifact從Flink依賴中剔除:因?yàn)槲覀儗⒂胹lf4j到logback的綁定,所以我們需要刪除slf4j到log4j的綁定吨些。
3搓谆、添加Logback依賴:logback-core和logback-classic
4炒辉、添加log4j-over-slf4j依賴:log4j-over-slf4j是一種允許舊應(yīng)用程序直接使用Log4j API來調(diào)用Slf4j接口的工具。Flink依賴Hadoop挽拔,而Hadoop是使用Log4j來記錄日志的
請注意:你需要手動添加exclusion到所有你添加到pom文件中的Flink依賴辆脸。
你可能還需要檢查一下其他非Flink依賴是否也是log4j的綁定但校,你可以使用mvn dependency:true
來分析你的工程依賴螃诅。
當(dāng)Flink運(yùn)行在集群中是使用Logback
本指南適用于當(dāng)Flink運(yùn)行在YARN或者standalong集群時。
為了在Flink中使用Logback而不是Log4j状囱,你需要將 log4j-1.2.xx.jar 和 sfl4j-log4j12-xxx.jar從 lib/目錄中刪除
接下來术裸,你需要將下面的jar文件添加到 lib/目錄下:
logback-classic.jar
logback-core.jar
log4j-over-slf4j.jar
注意:當(dāng)使用單任務(wù)的YARN集群時,你需要明確的設(shè)置 lib/目錄!
將自定義logger的Flink提交到Y(jié)ARN中的命令是:./bin/flink run -yt $FLINK_HOME/lib <... remaining arguments ...>