1. 什么樣的數(shù)據(jù)可以流化
對于Java和Scala來說累魔,凡是可以被序列化的對象都可以流化士嚎。Flink自己的序列化器可以用于:
- 基本數(shù)據(jù)類型:String, Long, Integer, Boolean, Array等
- 組合數(shù)據(jù)類型:Tuples, POJOs and Scala case classes
在Java里组底,F(xiàn)link提供了Tuple0到Tuple25共26種Tuple類型戈稿。
Flink將滿足以下三點(diǎn)要求的對象都看作是POFO: - public且獨(dú)立(非靜態(tài)內(nèi)部類)的類
- 有public的無參構(gòu)造方法
- 類里所有非靜態(tài)非transient屬性都要么是public非final的喻犁,要么有public的get/set方法
2. 一個完整的例子
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;
public class Example {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Person> flintstones = env.fromElements(
new Person("Fred", 35),
new Person("Wilma", 35),
new Person("Pebbles", 2));
DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
@Override
public boolean filter(Person person) throws Exception {
return person.age >= 18;
}
});
adults.print();
env.execute();
}
public static class Person {
public String name;
public Integer age;
public Person() {};
public Person(String name, Integer age) {
this.name = name;
this.age = age;
}
public String toString() {
return this.name.toString() + ": age " + this.age.toString();
}
}
}
-
Stream Execution Environment
每一個Flink應(yīng)用程序都需要一個執(zhí)行環(huán)境蔓搞,在本例中是一個StreamExecutionEnvironment瑞筐。DataStream API的調(diào)用會生成一個job graph, 并添加到StreamExecutionEnvironment上凄鼻。env.execute()調(diào)用之后,graph被打包發(fā)送到j(luò)obManager, 有jobManager并行化job并分配給不同的Task Managers來執(zhí)行聚假。每一個job的parallel slice都會在一個task slot執(zhí)行块蚌。如果不調(diào)用execute, 程序永遠(yuǎn)不會執(zhí)行。
Basic Stream Sources
stream的source可以如上例的elements, 也可以是集合膘格,socket或文件
List<Person> people = new ArrayList<Person>();
people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));
DataStream<Person> flintstones = env.fromCollection(people);
DataStream<String> lines = env.socketTextStream("localhost", 9999);
DataStream<String> lines = env.readTextFile("file:///path");
但在實(shí)際應(yīng)用中峭范,通常要選取支持低延遲高吞吐并發(fā)讀取,并能夠倒退重播的source, 比如kafka瘪贱,Kinesis及不同的文件系統(tǒng)纱控。REST API和數(shù)據(jù)庫也可以使用。
-
Basic Stream Sinks
例子里調(diào)用print方法輸出結(jié)果到task manager的log, 會調(diào)用每個element的toString()方法政敢,輸出結(jié)果如下:
1> Fred: age 35
2> Wilma: age 35
其中1>, 2>表示是哪一個sub task輸出的結(jié)果其徙。
在生產(chǎn)中,常用StreamingFileSink, 各種數(shù)據(jù)庫和發(fā)布訂閱系統(tǒng)喷户。
-
Debugging
生產(chǎn)環(huán)境中唾那,應(yīng)用通常跑在集群上或者容器里,如果fail了褪尝,就是遠(yuǎn)程的闹获,這時我們可以查看JobManager和TaskManager的log.同時也可以在本地IDE上調(diào)試代碼。