Flink流應用程序處理的是以數(shù)據(jù)對象表示的事件流唧喉。所以在Flink內(nèi)部呆躲,我們需要能夠處理這些對象异逐。它們需要被序列化和反序列化,以便通過網(wǎng)絡(luò)傳送它們插掂;或者從狀態(tài)后端灰瞻、檢查點和保存點讀取它們。為了有效地做到這一點辅甥,F(xiàn)link需要明確知道應用程序所處理的數(shù)據(jù)類型箩祥。Flink使用類型信息的概念來表示數(shù)據(jù)類型,并為每個數(shù)據(jù)類型生成特定的序列化器肆氓、反序列化器和比較器。
Flink還具有一個類型提取系統(tǒng)底瓣,該系統(tǒng)分析函數(shù)的輸入和返回類型谢揪,以自動獲取類型信息蕉陋,從而獲得序列化器和反序列化器。但是拨扶,在某些情況下凳鬓,例如lambda函數(shù)或泛型類型,需要顯式地提供類型信息患民,才能使應用程序正常工作或提高其性能缩举。
Flink支持Java和Scala中所有常見數(shù)據(jù)類型。使用最廣泛的類型有以下幾種匹颤。
1.基礎(chǔ)數(shù)據(jù)類型
Flink支持所有的Java和Scala基礎(chǔ)數(shù)據(jù)類型仅孩,Int, Double, Long, String, …?
val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
numbers.map( n => n + 1 )
2.Java和Scala元組(Tuples)
val persons: DataStream[(String, Integer)] = env.fromElements(
("Adam", 17),
("Sarah", 23) )
persons.filter(p => p._2 > 18)
3.Scala樣例類(case classes)
case class Person(name: String, age: Int)
val persons: DataStream[Person] = env.fromElements(
Person("Adam", 17),
Person("Sarah", 23) )
persons.filter(p => p.age > 18)
4.Java簡單對象(POJOs)
public class Person {
public String name;
public int age;
public Person() {}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}
DataStream<Person> persons = env.fromElements(
new Person("Alex", 42),
new Person("Wendy", 23));
5.其它(Arrays, Lists, Maps, Enums, 等等)
Flink對Java和Scala中的一些特殊目的的類型也都是支持的,比如Java的ArrayList印蓖,HashMap辽慕,Enum等等。