I've been tinkering with ClickHouse for the past couple of days and will write that up once I'm done, so today let's cover a small piece of Flink knowledge first.
DataStream Partitioning
Spark's RDD has the concept of partitions, and Flink's DataStream has it too, just less explicitly than RDD. Flink uses the stream partitioner, StreamPartitioner, to control how elements in a DataStream flow to downstream operators. The class diagram centered on the StreamPartitioner abstract class is shown below.
In Flink's Web UI, the partitioner type used between operators is labeled on the connecting arrows, as shown below.
StreamPartitioner implements the ChannelSelector interface. The notion of a channel here is different from Netty's; it is simply Flink's abstraction for the destination that data is written to, and we can think of it as a parallel instance of the downstream operator (i.e., a physical partition). Every StreamPartitioner subclass must implement the selectChannel() method, which chooses the partition (channel) index. Let's walk through the source code of the eight StreamPartitioner implementations that Flink provides to deepen our understanding.
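For reference, all of the partitioners below implement this interface, which looks roughly like the following (a simplified sketch based on the methods that appear in this article; the exact shape varies across Flink versions):

import org.apache.flink.core.io.IOReadableWritable;

// Simplified sketch of org.apache.flink.runtime.io.network.api.writer.ChannelSelector
public interface ChannelSelector<T extends IOReadableWritable> {
    // Called once with the number of output channels, i.e. downstream parallel instances.
    void setup(int numberOfChannels);

    // Returns the index of the channel this record should be written to.
    int selectChannel(T record);

    // Whether records are broadcast to all channels instead of one selected channel.
    boolean isBroadcast();
}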
GlobalPartitioner
// dataStream.global()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return 0;
}
GlobalPartitioner always sends data to the first parallel instance of the downstream operator. Simple and brute-force.
ShufflePartitioner
private Random random = new Random();

// dataStream.shuffle()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return random.nextInt(numberOfChannels);
}
ShufflePartitioner sends each record to a randomly chosen parallel instance of the downstream operator. Since java.util.Random produces uniformly distributed numbers, the load across instances is approximately even.
RebalancePartitioner
private int nextChannelToSendTo;

@Override
public void setup(int numberOfChannels) {
    super.setup(numberOfChannels);
    nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
}

// dataStream.rebalance()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
    return nextChannelToSendTo;
}
RebalancePartitioner first picks a random downstream instance, then cycles through the instances in round-robin fashion starting from that one. This guarantees fully balanced load on the downstream operator, so it is often used to handle naturally skewed input streams, such as a Kafka topic whose partitions differ significantly in data volume.
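As a minimal, hedged usage sketch (the source here is just a stand-in; in practice it would be, say, a skewed Kafka topic), explicitly calling rebalance() redistributes records evenly before the downstream operator:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RebalanceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for a skewed source such as an unevenly filled Kafka topic.
        DataStream<String> raw = env.fromElements("a", "bb", "ccc", "dddd");

        raw.rebalance()                 // round-robin every record across all downstream instances
           .map(new MapFunction<String, Integer>() {
               @Override
               public Integer map(String value) {
                   return value.length();
               }
           })
           .setParallelism(4)
           .print();

        env.execute("rebalance example");
    }
}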
KeyGroupStreamPartitioner
private final KeySelector<T, K> keySelector;
private int maxParallelism;

// dataStream.keyBy()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    K key;
    try {
        key = keySelector.getKey(record.getInstance().getValue());
    } catch (Exception e) {
        throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
    }
    return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
}

public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}

public static int assignToKeyGroup(Object key, int maxParallelism) {
    return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}

public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    return MathUtils.murmurHash(keyHash) % maxParallelism;
}

public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
    return keyGroupId * parallelism / maxParallelism;
}
This is the StreamPartitioner that keyBy() uses under the hood. As you can see, the key is hashed twice: first with Java's built-in hashCode(), then with MurmurHash. The resulting hash, taken modulo the maximum parallelism, gives the key group ID; multiplying the key group ID by the operator parallelism and dividing by the maximum parallelism yields the final partition (channel) index.
Readers may find the code above a bit familiar; it appeared earlier when we discussed the key group mechanism. See that article for details.
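As a small, hedged illustration of the arithmetic, the KeyGroupRangeAssignment methods shown above can be called directly (the key and the two parallelism values below are made-up examples):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupExample {
    public static void main(String[] args) {
        int maxParallelism = 128;   // example value
        int parallelism = 4;        // example operator parallelism
        String key = "user_42";     // hypothetical key

        // murmurHash(key.hashCode()) % maxParallelism
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
        // keyGroup * parallelism / maxParallelism
        int operatorIndex = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism);

        System.out.println("key group = " + keyGroup + ", operator index = " + operatorIndex);
    }
}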
BroadcastPartitioner
// dataStream.broadcast()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
}

@Override
public boolean isBroadcast() {
    return true;
}
BroadcastPartitioner is the partitioner dedicated to broadcast streams. Since a broadcast stream only takes effect when connected to a regular data stream via the DataStream.connect() method, there is no real need for BroadcastPartitioner to select a partition (broadcast data is always delivered to every parallel instance of the downstream operator), so the selectChannel() method need not be implemented. For the details, see the BroadcastStream-related source code in Flink, which I won't list here.
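Since connect() was just mentioned, here is a minimal, hedged sketch of how a broadcast stream is usually wired up (the stream contents, state descriptor name, and rule semantics are all made up; only the broadcast()/connect() pattern is the point):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> mainStream = env.fromElements("a", "bb", "ccc");     // stand-in for the real data stream
        DataStream<String> ruleStream = env.fromElements("rule-1", "rule-2");   // stand-in for the control/rule stream

        MapStateDescriptor<String, String> ruleStateDesc = new MapStateDescriptor<>(
                "rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

        // broadcast() puts the rule stream behind a BroadcastPartitioner...
        BroadcastStream<String> ruleBroadcast = ruleStream.broadcast(ruleStateDesc);

        // ...and connect() joins it with the main stream; every parallel instance receives every rule.
        mainStream.connect(ruleBroadcast)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) {
                        out.collect(value);
                    }

                    @Override
                    public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
                        ctx.getBroadcastState(ruleStateDesc).put(rule, rule);
                    }
                })
                .print();

        env.execute("broadcast example");
    }
}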
RescalePartitioner
private int nextChannelToSendTo = -1;

// dataStream.rescale()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    if (++nextChannelToSendTo >= numberOfChannels) {
        nextChannelToSendTo = 0;
    }
    return nextChannelToSendTo;
}
This looks almost too simple, and seemingly identical in logic to RebalancePartitioner? Not quite. Let's look at the StreamingJobGraphGenerator class, which is responsible for translating the StreamGraph (the logical execution plan) into the JobGraph (the optimized logical execution plan). Its connect() method contains the following code.
if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
    jobEdge = downStreamVertex.connectNewDataSetAsInput(
        headVertex,
        DistributionPattern.POINTWISE,
        resultPartitionType);
} else {
    jobEdge = downStreamVertex.connectNewDataSetAsInput(
        headVertex,
        DistributionPattern.ALL_TO_ALL,
        resultPartitionType);
}
Roughly speaking, if the partitioning logic is RescalePartitioner or ForwardPartitioner (more on the latter below), the upstream and downstream vertices are connected in POINTWISE mode; for all other partitioners, ALL_TO_ALL mode is used. The two figures below make this easier to understand.
In other words, when intermediate results are sent downstream, the POINTWISE RescalePartitioner round-robins only within a subset of the downstream instances, the subset being determined by the ratio of the two parallelisms, which gives better locality from the TaskManager's perspective. The ALL_TO_ALL RebalancePartitioner, by contrast, performs a truly global round-robin and is better balanced, but it inevitably exchanges data between nodes; with large data volumes, the resulting network traffic can be considerable.
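A minimal sketch of where rescale() typically sits (the parallelism values are made-up examples, and fromSequence assumes a reasonably recent Flink version): with a source at parallelism 2 and a mapper at parallelism 4, each source subtask only ever feeds its "own" two mapper subtasks.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RescaleExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromSequence(0, 999).setParallelism(2)       // source with parallelism 2
           .rescale()                                    // POINTWISE: each source subtask feeds 2 of the 4 mappers
           .map(new MapFunction<Long, Long>() {
               @Override
               public Long map(Long value) {
                   return value * 2;
               }
           })
           .setParallelism(4)
           .print();

        env.execute("rescale example");
    }
}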
ForwardPartitioner
// dataStream.forward()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return 0;
}
Its implementation is identical to GlobalPartitioner's. But given the explanation of the POINTWISE and ALL_TO_ALL connection modes above, we know that it sends data to the first instance of the locally running downstream operator, not the globally first one. When the upstream and downstream operators have the same parallelism, ForwardPartitioner is used by default; conversely, when their parallelisms differ, the aforementioned RebalancePartitioner is the default.
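A minimal, hedged sketch (the parallelism values are just examples): forward() can also be requested explicitly, but only when the two parallelisms match; if they differ, Flink rejects the topology when building the graph.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ForwardExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromSequence(0, 99).setParallelism(2)
           .forward()                                    // keep records within the same subtask chain
           .map(new MapFunction<Long, Long>() {
               @Override
               public Long map(Long value) {
                   return value + 1;
               }
           })
           .setParallelism(2)                            // must equal the upstream parallelism
           .print();

        env.execute("forward example");
    }
}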
CustomPartitionerWrapper
Partitioner<K> partitioner;
KeySelector<T, K> keySelector;

// dataStream.partitionCustom()
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    K key;
    try {
        key = keySelector.getKey(record.getInstance().getValue());
    } catch (Exception e) {
        throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
    }
    return partitioner.partition(key, numberOfChannels);
}
This is where custom partitioning logic comes in: we can implement the Partitioner interface ourselves and pass it to the partitionCustom() method. Here's a simple example that partitions by the length of the key:
sourceStream.partitionCustom(new Partitioner<String>() {
    @Override
    public int partition(String key, int numPartitions) {
        return key.length() % numPartitions;
    }
}, 0);
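An alternative form passes a KeySelector instead of a field position (the overload newer Flink versions favor). A hedged sketch, assuming sourceStream is a DataStream<String> and using the element itself as the key:

// Same length-based partitioning, but with an explicit KeySelector.
sourceStream.partitionCustom(new Partitioner<String>() {
    @Override
    public int partition(String key, int numPartitions) {
        return key.length() % numPartitions;
    }
}, new KeySelector<String, String>() {
    @Override
    public String getKey(String value) {
        return value;   // the whole element serves as the key
    }
});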
The End
Up early tomorrow to keep grinding. Good night, everyone.