https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html
Create Project
Use one of the following commands to create a project:
1. Using Maven
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.3.2
2. Using the quickstart script
$ curl https://flink.apache.org/q/quickstart.sh | bash
Inspect Project
There will be a new directory in your working directory. If you’ve used the curl approach, the directory is called quickstart. Otherwise, it has the name of your artifactId:
$ tree quickstart/
quickstart/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── org
        │       └── myorg
        │           └── quickstart
        │               ├── BatchJob.java
        │               ├── SocketTextStreamWordCount.java
        │               ├── StreamingJob.java
        │               └── WordCount.java
        └── resources
            └── log4j.properties
This sample project is a Maven project and contains four classes. StreamingJob and BatchJob are basic skeleton programs, SocketTextStreamWordCount is a working streaming example, and WordCount is a working batch example. All of the Flink examples can be run directly in a local environment.
We recommend you import this project into your IDE to develop and test it. If you use Eclipse, the m2e plugin allows to import Maven projects. Some Eclipse bundles include that plugin by default, others require you to install it manually. The IntelliJ IDE supports Maven projects out of the box.
A note to Mac OS X users: The default JVM heap size for Java is too small for Flink. You have to manually increase it. In Eclipse, choose Run Configurations -> Arguments and write into the VM Arguments box: -Xmx800m.
Build Project
Run mvn clean install -Pbuild-jar to build the project. This produces two jars: target/original-your-artifact-id-your-version.jar is a thin jar without dependencies, while target/your-artifact-id-your-version.jar is a fat jar, meaning all dependencies are bundled inside it.
Next Steps
Write Your Application
The quickstart project contains a WordCount implementation, the “Hello World” of Big Data processing systems. The goal of WordCount is to determine the frequencies of words in a text, e.g., how often do the terms “the” or “house” occur in all Wikipedia texts.
For example:
Sample Input:
big data is big
Sample Output:
big 2
data 1
is 1
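The counting logic itself can be sketched without any Flink dependency. A minimal plain-Java version (the class name WordCountSketch is ours, not part of the quickstart) that reproduces the sample above:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

// A plain-Java sketch of the word-count logic (no Flink involved).
public class WordCountSketch {

    // Lower-case the line, split on non-word characters, and count each token.
    public static Map<String, Long> count(String line) {
        return Arrays.stream(line.toLowerCase().split("\\W+"))
            .filter(token -> !token.isEmpty())
            .collect(Collectors.groupingBy(t -> t, Collectors.counting()));
    }

    public static void main(String[] args) {
        // "big data is big" -> big appears twice, data and is once each
        System.out.println(WordCountSketch.count("big data is big"));
    }
}
```

The Flink program below performs the same normalize-split-count steps, but distributed over a DataSet.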
The following code shows the WordCount implementation. It processes each line of input with two operations (a FlatMap and a Reduce operation that aggregates via sum), and then emits the resulting (word, count) pairs.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class WordCount {

    public static void main(String[] args) throws Exception {

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // get input data
        DataSet<String> text = env.fromElements(
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,"
        );

        DataSet<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word, 1)
            text.flatMap(new LineSplitter())
            // group by the tuple field "0" and sum up tuple field "1"
            .groupBy(0)
            .sum(1);

        // execute and print result
        counts.print();
    }
}
The operations are defined by specialized classes, here the LineSplitter class.
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}
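The token.length() > 0 check matters because String.split keeps a leading empty string when a line starts with a non-word character (trailing empty strings, by contrast, are discarded). A small standalone sketch to illustrate:

```java
import java.util.Arrays;

public class SplitDemo {
    public static void main(String[] args) {
        // The leading "--" produces an empty first token, which LineSplitter filters out.
        String[] tokens = "--that is the question".toLowerCase().split("\\W+");
        System.out.println(Arrays.toString(tokens)); // [, that, is, the, question]
    }
}
```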