1.Flink的TypeInformation類
TypeInformation是flink中所有類型的基類癣诱,其作為生產(chǎn)序列化器和比較的一個工具。它包括了類型的一些基本屬性剪个,并可以通過它來生產(chǎn)序列化器(serializer)食店,特殊情況下還可以生成類型比較器材义。(Flink中的比較器不僅僅是定義大小順序青责,更是處理keys的基本輔助工具)
- 基本類型:所有Java基本數(shù)據(jù)類型和對應(yīng)的裝箱類型挺据,加上void取具,String,Date扁耐,BigDecimal和BigInteger
- 基本數(shù)組和對象數(shù)組
- 復(fù)合類型:
- Flink Java Tuples (Flink Java API的一部分): 最多25個成員暇检,不支持null成員
- Scala case 類 (包括 Scala tuples): 最多25個成員, 不支持null成員
- Row: 包含任意多個字段的元組并且支持null成員
- POJOs: 遵循類bean模式的類
- 輔助類型 (Option, Either, Lists, Maps, …)
- 泛型: Flink自身不會序列化泛型,而是借助Kryo進(jìn)行序列化.
POJO類非常有意思婉称,因為POJO類可以支持復(fù)雜類型的創(chuàng)建块仆,并且在定義keys時可以使用成員的名字:dataSet.join(another).where("name").equalTo("personName")。同時王暗,POJO類對于運行時(runtime)是透明的悔据,這使得Flink可以非常高效地處理它們。
1.1 POJO類型的規(guī)則
在滿足如下條件時瘫筐,F(xiàn)link會將這種數(shù)據(jù)類型識別成POJO類型(并允許以成員名引用字段):
- 該類是public的并且是獨立的(即沒有非靜態(tài)的內(nèi)部類)
- 該類有一個public的無參構(gòu)造方法
- 該類(及該類的父類)的所有成員要么是public的蜜暑,要么是擁有按照標(biāo)準(zhǔn)java bean命名規(guī)則命名的public getter和 public setter方法铐姚。
1.2 創(chuàng)建一個TypeInformation對象或序列化器###
創(chuàng)建一個TypeInformation對象時如下:
在Scala中策肝,F(xiàn)link使用在編譯時運行的宏,在宏可供調(diào)用時去捕獲所有泛型信息隐绵。
// 重要: 為了能夠訪問'createTypeInformation' 的宏方法之众,這個import是必須的
import org.apache.flink.streaming.api.scala._
val stringInfo: TypeInformation[String] = createTypeInformation[String]
val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]
你也可以在Java使用相同的方法作為備選。
為了創(chuàng)建一個序列化器(TypeSerializer)依许,只需要在TypeInformation 對象上調(diào)用typeInfo.createSerializer(config)方法棺禾。
config參數(shù)的類型是ExecutionConfig,它保留了程序的注冊的自定義序列化器的相關(guān)信息峭跳。在可能用到TypeSerializer的地方膘婶,盡量傳入程序的ExecutionConfig,你可以調(diào)用DataStream 或 DataSet的 getExecutionConfig()方法獲取ExecutionConfig蛀醉。一些內(nèi)部方法(如:MapFunction)中悬襟,你可以通過將該方法變成一個Rich Function,然后調(diào)用getRuntimeContext().getExecutionConfig()獲取ExecutionConfig.
2 基本類型實現(xiàn)示例
以String為例:
//BasicTypeInfo.java
public static final BasicTypeInfo<String> STRING_TYPE_INFO = new BasicTypeInfo<>(String.class, new Class<?>[]{}, StringSerializer.INSTANCE, StringComparator.class);
StringSerializer如下
//StringSerializer.java
public final class StringSerializer extends TypeSerializerSingleton<String> {
private static final long serialVersionUID = 1L;
public static final StringSerializer INSTANCE = new StringSerializer();
private static final String EMPTY = "";
@Override
public boolean isImmutableType() {
return true;
}
@Override
public String createInstance() {
return EMPTY;
}
@Override
public String copy(String from) {
return from;
}
@Override
public String copy(String from, String reuse) {
return from;
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(String record, DataOutputView target) throws IOException {
StringValue.writeString(record, target);
}
@Override
public String deserialize(DataInputView source) throws IOException {
return StringValue.readString(source);
}
@Override
public String deserialize(String record, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
StringValue.copyString(source, target);
}
@Override
public boolean canEqual(Object obj) {
return obj instanceof StringSerializer;
}
@Override
protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
return super.isCompatibleSerializationFormatIdentifier(identifier)
|| identifier.equals(StringValue.class.getCanonicalName());
}
}
上面代碼中出現(xiàn)的StringValue是真正進(jìn)行input以及output序列化過程操作拯刁,基本類型都有相應(yīng)的方法脊岳,后面會單獨說明下多字段Record序列化形式。
StringComparator如下
public final class StringComparator extends BasicTypeComparator<String> {
private static final long serialVersionUID = 1L;
private static final int HIGH_BIT = 0x1 << 7;
private static final int HIGH_BIT2 = 0x1 << 13;
private static final int HIGH_BIT2_MASK = 0x3 << 6;
public StringComparator(boolean ascending) {
super(ascending);
}
@Override
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
String s1 = StringValue.readString(firstSource);
String s2 = StringValue.readString(secondSource);
int comp = s1.compareTo(s2);
return ascendingComparison ? comp : -comp;
}
@Override
public boolean supportsNormalizedKey() {
return true;
}
@Override
public boolean supportsSerializationWithKeyNormalization() {
return false;
}
@Override
public int getNormalizeKeyLen() {
return Integer.MAX_VALUE;
}
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
return true;
}
@Override
public void putNormalizedKey(String record, MemorySegment target, int offset, int len) {;
final int limit = offset + len;
final int end = record.length();
int pos = 0;
while (pos < end && offset < limit) {
char c = record.charAt(pos++);
if (c < HIGH_BIT) {
target.put(offset++, (byte) c);
}
else if (c < HIGH_BIT2) {
target.put(offset++, (byte) ((c >>> 7) | HIGH_BIT));
if (offset < limit) {
target.put(offset++, (byte) c);
}
}
else {
target.put(offset++, (byte) ((c >>> 10) | HIGH_BIT2_MASK));
if (offset < limit) {
target.put(offset++, (byte) (c >>> 2));
}
if (offset < limit) {
target.put(offset++, (byte) c);
}
}
}
while (offset < limit) {
target.put(offset++, (byte) 0);
}
}
@Override
public StringComparator duplicate() {
return new StringComparator(ascendingComparison);
}
}
3 多字段Record示例
在開始這部分原理分析之前可以先看個示例代碼
//RecordTest.java
public void testAddField() {
try {
// Add a value to an empty record
Record record = new Record();
assertTrue(record.getNumFields() == 0);
record.addField(this.origVal1);
assertTrue(record.getNumFields() == 1);
assertTrue(origVal1.getValue().equals(record.getField(0, StringValue.class).getValue()));
// Add 100 random integers to the record
record = new Record();
for (int i = 0; i < 100; i++) {
IntValue orig = new IntValue(this.rand.nextInt());
record.addField(orig);
IntValue rec = record.getField(i, IntValue.class);
assertTrue(record.getNumFields() == i + 1);
assertTrue(orig.getValue() == rec.getValue());
}
// Add 3 values of different type to the record
record = new Record(this.origVal1, this.origVal2);
record.addField(this.origVal3);
assertTrue(record.getNumFields() == 3);
StringValue recVal1 = record.getField(0, StringValue.class);
DoubleValue recVal2 = record.getField(1, DoubleValue.class);
IntValue recVal3 = record.getField(2, IntValue.class);
assertTrue("The value of the first field has changed", recVal1.equals(this.origVal1));
assertTrue("The value of the second field changed", recVal2.equals(this.origVal2));
assertTrue("The value of the third field has changed", recVal3.equals(this.origVal3));
} catch (Throwable t) {
Assert.fail("Test failed due to an exception: " + t.getMessage());
}
}
Record代表多個數(shù)值的記錄垛玻,其可以包含多個字段(可空并不體現(xiàn)在該記錄中)割捅,內(nèi)部有一個bitmap標(biāo)記字段是否被賦值。為了數(shù)據(jù)交換方便帚桩,Record中的數(shù)據(jù)都以bytes方式存儲亿驾,字段在訪問時才被進(jìn)行反序列化。當(dāng)字段被修改時首先是放在cache中账嚎,并在下次序列化時合入或者顯式調(diào)用updateBinaryRepresenation()方法莫瞬。
Notes:
- 該record必須是一個可變的對象参淹,這樣才可以被多個自定義方法使用來提升性能(后面單獨分析)。該record是一個比較中的對象乏悄,為了減少對每個字段的序列化浙值、反序列化操作,其保存了比較大的狀態(tài)檩小,需要有多個指針以及數(shù)組开呐,從而要占用相對比較大的內(nèi)存空間,在64位的JVM中要占用超過200bytes规求。
- 該類是非線程安全的
4 存放Record的數(shù)據(jù)結(jié)構(gòu)
針對上面提出的存放數(shù)據(jù)結(jié)構(gòu)的疑問筐付,這里繼續(xù)深入分析下。
- 將record放在一個迭代器中阻肿,當(dāng)前存在一個叫BlockResettableMutableObjectIterator瓦戚,其包含如下一些方法,讀寫都是在這個迭代器中進(jìn)行丛塌。
Record迭代器.png
其中以無參數(shù)next()方法為示例走讀存儲或者讀取流程较解,代碼如下:
public T next() throws IOException {
// check for the left over element
if (this.readPhase) {
return getNextRecord();
} else {
// writing phase. check for leftover first
T result = null;
if (this.leftOverReturned) {
// get next record
if ((result = this.input.next()) != null) {
if (writeNextRecord(result)) {
return result;
} else {
// did not fit into memory, keep as leftover
this.leftOverRecord = this.serializer.copy(result);
this.leftOverReturned = false;
this.fullWriteBuffer = true;
return null;
}
} else {
this.noMoreBlocks = true;
return null;
}
} else if (this.fullWriteBuffer) {
return null;
} else {
this.leftOverReturned = true;
return this.leftOverRecord;
}
}
}
通過源碼可以看出,在方法執(zhí)行時根據(jù)標(biāo)記判斷是讀取還是寫入流程赴邻,同時方法對應(yīng)getNextRecord和writeNextRecord兩個方法印衔,都在抽象類AbstractBlockResettableIterator中,兩個方法源碼如下:
protected T getNextRecord() throws IOException {
if (this.numRecordsReturned < this.numRecordsInBuffer) {
this.numRecordsReturned++;
return this.serializer.deserialize(this.readView);
} else {
return null;
}
}
protected boolean writeNextRecord(T record) throws IOException {
try {
this.serializer.serialize(record, this.collectingView);
this.numRecordsInBuffer++;
return true;
} catch (EOFException eofex) {
return false;
}
}
其中存放數(shù)據(jù)是基于Flink內(nèi)存管理部分進(jìn)行申請以及維護(hù)大小等姥敛,相關(guān)初始化源碼如下:
memoryManager.allocatePages(ownerTask, emptySegments, numPages);
this.collectingView = new SimpleCollectingOutputView(this.fullSegments,
new ListMemorySegmentSource(this.emptySegments), memoryManager.getPageSize());
this.readView = new RandomAccessInputView(this.fullSegments, memoryManager.getPageSize());
5 Flink 如何直接操作二進(jìn)制數(shù)據(jù)
Flink 提供了如 group奸焙、sort、join 等操作彤敛,這些操作都需要訪問海量數(shù)據(jù)与帆。這里,我們以sort為例墨榄,這是一個在 Flink 中使用非常頻繁的操作玄糟。
首先,F(xiàn)link 會從 MemoryManager 中申請一批 MemorySegment渠概,我們把這批 MemorySegment 稱作 sort buffer茶凳,用來存放排序的數(shù)據(jù)。
我們會把 sort buffer 分成兩塊區(qū)域播揪。一個區(qū)域是用來存放所有對象完整的二進(jìn)制數(shù)據(jù)贮喧。另一個區(qū)域用來存放指向完整二進(jìn)制數(shù)據(jù)的指針以及定長的序列化后的key(key+pointer)。如果需要序列化的key是個變長類型猪狈,如String箱沦,則會取其前綴序列化。如上圖所示雇庙,當(dāng)一個對象要加到 sort buffer 中時谓形,它的二進(jìn)制數(shù)據(jù)會被加到第一個區(qū)域灶伊,指針(可能還有key)會被加到第二個區(qū)域。
將實際的數(shù)據(jù)和指針加定長key分開存放有兩個目的寒跳。第一聘萨,交換定長塊(key+pointer)更高效,不用交換真實的數(shù)據(jù)也不用移動其他key和pointer童太。第二米辐,這樣做是緩存友好的,因為key都是連續(xù)存儲在內(nèi)存中的书释,可以大大減少 cache miss(后面會詳細(xì)解釋)翘贮。
排序的關(guān)鍵是比大小和交換。Flink 中爆惧,會先用 key 比大小狸页,這樣就可以直接用二進(jìn)制的key比較而不需要反序列化出整個對象。因為key是定長的扯再,所以如果key相同(或者沒有提供二進(jìn)制key)芍耘,那就必須將真實的二進(jìn)制數(shù)據(jù)反序列化出來,然后再做比較叔收。之后齿穗,只需要交換key+pointer就可以達(dá)到排序的效果傲隶,真實的數(shù)據(jù)不用移動饺律。
最后,訪問排序后的數(shù)據(jù)跺株,可以沿著排好序的key+pointer區(qū)域順序訪問复濒,通過pointer找到對應(yīng)的真實數(shù)據(jù),并寫到內(nèi)存或外部(更多細(xì)節(jié)可以看這篇文章 Joins in Flink)乒省。
5.1 緩存友好的數(shù)據(jù)結(jié)構(gòu)和算法
隨著磁盤IO和網(wǎng)絡(luò)IO越來越快巧颈,CPU逐漸成為了大數(shù)據(jù)領(lǐng)域的瓶頸。從 L1/L2/L3 緩存讀取數(shù)據(jù)的速度比從主內(nèi)存讀取數(shù)據(jù)的速度快好幾個量級袖扛。通過性能分析可以發(fā)現(xiàn)砸泛,CPU時間中的很大一部分都是浪費在等待數(shù)據(jù)從主內(nèi)存過來上。如果這些數(shù)據(jù)可以從 L1/L2/L3 緩存過來蛆封,那么這些等待時間可以極大地降低唇礁,并且所有的算法會因此而受益。
在上面討論中我們談到的惨篱,F(xiàn)link 通過定制的序列化框架將算法中需要操作的數(shù)據(jù)(如sort中的key)連續(xù)存儲盏筐,而完整數(shù)據(jù)存儲在其他地方。因為對于完整的數(shù)據(jù)來說砸讳,key+pointer更容易裝進(jìn)緩存琢融,這大大提高了緩存命中率界牡,從而提高了基礎(chǔ)算法的效率。這對于上層應(yīng)用是完全透明的漾抬,可以充分享受緩存友好帶來的性能提升宿亡。