- paimon sink 源碼 之 DynamicTableSink
- 上篇得知 sink 的拓?fù)涫窃?DynamicTableSink#getSinkRuntimeProvider 里面定義的
- Paimon 對(duì)于 DynamicTableSink 的實(shí)現(xiàn)就是 org.apache.paimon.flink.sink.FlinkTableSink
- getSinkRuntimeProvider 在父類 org.apache.paimon.flink.sink.FlinkTableSinkBase#getSinkRuntimeProvider 中定義
- 這里只探討主鍵表在流模式下寫入場(chǎng)景
拓?fù)涫崂?/h1>
-
判斷是否有 logStoreTableFactory 在創(chuàng)建 DynamicTableSink 是根據(jù)建表語句是否有配置 log.system 來決定是否有 logStoreTableFactory
1.1 log.system 默認(rèn)值是 none策治,可以配置成 kafka宁仔,配置成 kafka 數(shù)據(jù)不僅僅會(huì)寫入 fileSystem 也會(huì)寫入 kafka 相當(dāng)于是雙寫玩敏,流讀消費(fèi) paimon 表數(shù)據(jù)的時(shí)候 streaming-read-mode 可以配置成 log 從 kafka 進(jìn)行消費(fèi)
1.2 log.system 設(shè)置為 kafka 舉例
CREATE TABLE T (i INT, j INT) WITH (
'log.system'='kafka',
'log.system.partitions'='2',
'kafka.bootstrap.servers'='%s',
'kafka.topic'='Tt',
'connector='paimon'
...
)
1.3 LogSinkFunction 的創(chuàng)建鸭叙,對(duì)于 log.system 是 kafka 實(shí)際上就是利用 kafka connector 的 FlinkKafkaProducer 進(jìn)行數(shù)據(jù)發(fā)送
LogSinkFunction
sink.parallelism 設(shè)置 sink 的并行度
local-merge-buffer-size 如作業(yè)存在主鍵數(shù)據(jù)偏斜可以設(shè)置“l(fā)ocal-merge-buffer-size”霹娄,在數(shù)據(jù)進(jìn)行 shuffle 之前進(jìn)行 buffer 和 merge, 當(dāng)同一主鍵在快照之間頻繁更新時(shí)能犯,這特別有用。建議從“64 mb”開始調(diào)整緩沖區(qū)大小项棠。不適用于 CDC ingestion
3.1 如果設(shè)置了 local-merge-buffer-size 就會(huì)加入一個(gè) LocalMergeOperator 算子
3.2 LocalMergeOperator 算子并行度和上游保持一致悲雳,shuffle 方式是 forward
對(duì) DataStream<RowData> 進(jìn)行一次 map 將 org.apache.flink.table.data.RowData 轉(zhuǎn)化成 org.apache.paimon.data.InternalRow
4.1 算子并行度和上游一致
4.2 paimon InternalRow 只是對(duì) flink RowData 的包裝實(shí)際還是操作的 flink RowData
判斷 BucketMode ,在這篇里有講述 Paimon Table BuketMode 的邏輯
- 對(duì)于主鍵表會(huì)有 FIXED香追、DYNAMIC合瓢、GLOBAL_DYNAMIC 三種 mode
- 對(duì)于 append only 表有 FIXED、UNAWARE 兩種 mode
- 不同的 BucketMode 會(huì)有不同的拓?fù)渫傅洌麄兊牟煌饕w現(xiàn)在 partition 的邏輯不一樣和數(shù)據(jù)寫入的邏輯不一樣晴楔,之后的一些算子就是一樣的顿苇,這次我們先梳理主線不去看算子細(xì)節(jié),主線梳理完成之后在寫每個(gè)算子的細(xì)節(jié)
- 這次面向主鍵表税弃,從 FIXED mode 開始
FIXED
- 添加 FlinkStreamPartitioner 分區(qū)器對(duì)數(shù)據(jù)進(jìn)行 shuffle
- 并行度為 sink.parallelism
- 分區(qū)邏輯為
a. 先抽取 bucket key 的值纪岁,然后將值的 hash 和 bucket 桶數(shù)取模 Math.abs(key_hashcode % numBuckets
得到一個(gè) bucket 序號(hào)
b. 如果有 logFuntion 則直接按照 bucket 序號(hào)和算子并行度取模 recordChannel = bucket % numChannels
c. 如果沒有 logFuntion 則先算出 分區(qū)屬于那一個(gè) channel, 然后再用 分區(qū)的 channel + bucket 序號(hào)再和并行度取模 這個(gè)算法和我在 hudi 提的分區(qū)算法是一樣的
int startChannel = Math.abs(partition.hashCode()) % numChannels;
return (startChannel + bucket) % numChannels;
- 分區(qū)后添加數(shù)據(jù)寫入 RowDataStoreWriteOperator 算子
- 并行度為上游并行度
- 這里有點(diǎn)花原本我以為 logFuntion 會(huì)直接加一個(gè) KafkaSinkFunction 算子,沒想到他是直接集成到了 RowDataStoreWriteOperator 里面则果♂:玻可能是我 out 了遗增。上代碼
public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
private final LogSinkFunction logSinkFunction;
@Override
public void setup() {
super.setup(containingTask, config, output);
if (logSinkFunction != null) {
// 調(diào)用 flink core 強(qiáng)加 function
FunctionUtils.setFunctionRuntimeContext(logSinkFunction, getRuntimeContext());
}
}
public void open() throws Exception {
super.open();
this.sinkContext = new SimpleContext(getProcessingTimeService());
if (logSinkFunction != null) {
// to stay compatible with Flink 1.18-
if (logSinkFunction instanceof RichFunction) {
RichFunction richFunction = (RichFunction) logSinkFunction;
//不僅僅當(dāng)前算子要 open logFuntion 也手動(dòng) open
richFunction.open(new Configuration());
}
}
}
public void processElement(StreamRecord<InternalRow> element) {
SinkRecord record;
try {
record = write.write(element.getValue()); //寫 fileSystem
} catch (Exception e) {
throw new IOException(e);
}
if (record != null && logSinkFunction != null) {
// write to log store, need to preserve original pk (which includes partition fields)
SinkRecord logRecord = write.toLogRecord(record);
logSinkFunction.invoke(logRecord, sinkContext); // 來吧接著干 log
}
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
if (logSinkFunction != null) {
// log 也 snapshotState
StreamingFunctionUtils.snapshotFunctionState(
context, getOperatorStateBackend(), logSinkFunction);
}
}
}
DYNAMIC
- Dynamic bucket 不適用于 log.system 場(chǎng)景校驗(yàn)會(huì)直接報(bào)錯(cuò)不支持, logSinkFunction 必須為 null
- 獲取 dynamic-bucket.assigner-parallelism: 用來定義 assigner 算子的并行度如果沒有定義就是 sink.parallelism抡草。這個(gè)配置的設(shè)置和桶的初始化個(gè)數(shù)相關(guān)康震,太小會(huì)導(dǎo)致 assigner 算子處理速度不夠
- 獲取 dynamic-bucket.initial-buckets: 控制初始化bucket的數(shù)量。
- 添加 FlinkStreamPartitioner 分區(qū)器對(duì)數(shù)據(jù)進(jìn)行 shuffle
- 并行度為 assigner-parallelism
- 分區(qū)邏輯為
a. 他的邏輯和 FIXED 在沒有 logFunction 的邏輯一樣瘫镇,不同點(diǎn)是 FIXED 的 buketSize 是固定的答姥,而 DYNAMIC 模式下 buketSize 是取的 算子并行度和 initial-buckets 的最小值 bucket=MathUtils.min(numAssigners, numChannels)
b. 然后就是 FIXED 邏輯一致了
int start = Math.abs(partitionHash % numChannels);
int id = Math.abs(keyHash % buketSize);
return (start + id) % numChannels;
- 添加 HashBucketAssignerOperator 算子
- 并行度為上游的并行度
- 再添加一個(gè) FlinkStreamPartitioner 對(duì)數(shù)據(jù)再進(jìn)行一次 shuffle
- 并行度為 sink.parallelism
- 分區(qū)邏輯為
a. 上游 HashBucketAssignerOperator 會(huì)給每條記錄打上一個(gè) 編號(hào)(編號(hào)產(chǎn)生的邏輯暫時(shí)不看)
b. 然后就是和 FIX 一樣
int startChannel = Math.abs(partition.hashCode()) % numChannels;
// 這個(gè) bucket 是上游 HashBucketAssignerOperator 打上的一個(gè)編號(hào)
return (startChannel + bucket) % numChannels;
- 分區(qū)后添加數(shù)據(jù)寫入 DynamicBucketRowWriteOperator 算子
- 并行度為上游并行度
GLOBAL_DYNAMIC
- Dynamic bucket 不適用于 log.system 場(chǎng)景校驗(yàn)會(huì)直接報(bào)錯(cuò)不支持, logSinkFunction 必須為 null
- 離線(批模式)compactSink 不能再 GLOBAL_DYNAMIC 下使用處于 TODO 轉(zhuǎn)態(tài)(此條可以忽略)
- 添加一個(gè) IndexBootstrapOperator 算子構(gòu)建索引
- 并行度為上游并行度
- 同上獲取 dynamic-bucket.assigner-parallelism
- 同上獲取 dynamic-bucket.initial-buckets
- 添加 FlinkStreamPartitioner 分區(qū)器對(duì)數(shù)據(jù)進(jìn)行 shuffle
- 并行度為 Max(assigner-parallelism,initial-buckets) 如果為空則是 sink.parallelism
- 分區(qū)邏輯為 主鍵 函數(shù)和并行度取模 Math.abs(主鍵.hashCode() % numChannels)
- 添加 GlobalIndexAssignerOperator 算子
- 并行度為上游并行度
- 再添加一個(gè) FlinkStreamPartitioner 對(duì)數(shù)據(jù)再進(jìn)行一次 shuffle 和 DYNAMIC 第 6 步一樣的
- 分區(qū)后添加數(shù)據(jù)寫入 DynamicBucketRowWriteOperator 算子 和 DYNAMIC 第 7步一樣的
到此在不同 bucketMode 下對(duì)應(yīng)的算子梳理完畢了择卦, 在這些處理完之后還有一些通用的 doCommit 邏輯
- 判斷 sink.savepoint.auto-tag 是否為 true 默認(rèn)為 false , 參數(shù)表示是否自動(dòng)創(chuàng)建 tag 如果開了則添加 AutoTagForSavepointCommitterOperator 算子 并且這個(gè)算子里面是包含 CommitterOperator 的
- 算子并行度 為 1
- 如果沒有開啟則直接是 CommitterOperator 算子 并行度也是 1
- 最后添加一個(gè) 空的 sink 節(jié)點(diǎn) DiscardingSink 并行度為 1
FINAL
-
梳理了一個(gè) DataStream 在 Paimon sink 時(shí)的整個(gè) DataStream 轉(zhuǎn)化拓?fù)淦碓搿2⑶覍?duì)其中的分區(qū)器進(jìn)行了分析整理入下圖
Paimon stream sink 拓?fù)?/div>
- 接下來看具體算子的邏輯
最后編輯于 :?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者- 文/潘曉璐 我一進(jìn)店門缩搅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來硼瓣,“玉大人,你說我怎么就攤上這事亿傅∥疗埽” “怎么了半哟?”我有些...
- 文/不壞的土叔 我叫張陵寓涨,是天一觀的道長戒良。 經(jīng)常有香客問我,道長译打,這世上最難降的妖魔是什么? 我笑而不...
- 正文 為了忘掉前任,我火速辦了婚禮韵洋,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘食拜。我一直安慰自己副编,他們只是感情好,可當(dāng)我...
- 文/花漫 我一把揭開白布呻待。 她就那樣靜靜地躺著蚕捉,像睡著了一般柴淘。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上敛熬,一...
- 文/蒼蘭香墨 我猛地睜開眼形导,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼朵耕!你這毒婦竟也來了淋叶?” 一聲冷哼從身側(cè)響起煞檩,我...
- 序言:老撾萬榮一對(duì)情侶失蹤形娇,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后癣缅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體哄酝,經(jīng)...
- 正文 獨(dú)居荒郊野嶺守林人離奇死亡屡立,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
- 正文 我和宋清朗相戀三年膨俐,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了罩句。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片门烂。...
- 正文 年R本政府宣布惧盹,位于F島的核電站,受9級(jí)特大地震影響钧椰,放射性物質(zhì)發(fā)生泄漏嫡霞。R本人自食惡果不足惜诊沪,卻給世界環(huán)境...
- 文/蒙蒙 一端姚、第九天 我趴在偏房一處隱蔽的房頂上張望渐裸。 院中可真熱鬧昏鹃,春花似錦盆顾、人聲如沸。這莊子的主人今日做“春日...
- 文/蒼蘭香墨 我抬頭看了看天上的太陽宪巨。三九已至,卻和暖如春溜畅,著一層夾襖步出監(jiān)牢的瞬間捏卓,已是汗流浹背。 一陣腳步聲響...
- 正文 我出身青樓浴捆,卻偏偏與公主長得像蒜田,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子选泻,可洞房花燭夜當(dāng)晚...
推薦閱讀更多精彩內(nèi)容
- 接上篇paimon sink 之 dataStream 的拓?fù)涫崂韀http://www.reibang.com...
- Flink 極簡教程: 架構(gòu)及原理 Apache Flink? — Stateful Computations o...
- 在學(xué)習(xí) paimon sink 的過程中本來只想快速梳理下 paimon 的 sink 時(shí)對(duì) DataStream...
- 一道批、基礎(chǔ)信息 1.1 簡介 分布式的大數(shù)據(jù)計(jì)算引擎。支持 無界數(shù)據(jù)流 和 無限數(shù)據(jù)流 進(jìn)行有狀態(tài)計(jì)算判哥。 對(duì)于fli...
判斷是否有 logStoreTableFactory 在創(chuàng)建 DynamicTableSink 是根據(jù)建表語句是否有配置 log.system 來決定是否有 logStoreTableFactory
1.1 log.system 默認(rèn)值是 none策治,可以配置成 kafka宁仔,配置成 kafka 數(shù)據(jù)不僅僅會(huì)寫入 fileSystem 也會(huì)寫入 kafka 相當(dāng)于是雙寫玩敏,流讀消費(fèi) paimon 表數(shù)據(jù)的時(shí)候 streaming-read-mode 可以配置成 log 從 kafka 進(jìn)行消費(fèi)
1.2 log.system 設(shè)置為 kafka 舉例
CREATE TABLE T (i INT, j INT) WITH (
'log.system'='kafka',
'log.system.partitions'='2',
'kafka.bootstrap.servers'='%s',
'kafka.topic'='Tt',
'connector='paimon'
...
)
1.3 LogSinkFunction 的創(chuàng)建鸭叙,對(duì)于 log.system 是 kafka 實(shí)際上就是利用 kafka connector 的 FlinkKafkaProducer 進(jìn)行數(shù)據(jù)發(fā)送
sink.parallelism 設(shè)置 sink 的并行度
local-merge-buffer-size 如作業(yè)存在主鍵數(shù)據(jù)偏斜可以設(shè)置“l(fā)ocal-merge-buffer-size”霹娄,在數(shù)據(jù)進(jìn)行 shuffle 之前進(jìn)行 buffer 和 merge, 當(dāng)同一主鍵在快照之間頻繁更新時(shí)能犯,這特別有用。建議從“64 mb”開始調(diào)整緩沖區(qū)大小项棠。不適用于 CDC ingestion
3.1 如果設(shè)置了 local-merge-buffer-size 就會(huì)加入一個(gè) LocalMergeOperator 算子
3.2 LocalMergeOperator 算子并行度和上游保持一致悲雳,shuffle 方式是 forward
對(duì) DataStream<RowData> 進(jìn)行一次 map 將 org.apache.flink.table.data.RowData 轉(zhuǎn)化成 org.apache.paimon.data.InternalRow
4.1 算子并行度和上游一致
4.2 paimon InternalRow 只是對(duì) flink RowData 的包裝實(shí)際還是操作的 flink RowData
判斷 BucketMode ,在這篇里有講述 Paimon Table BuketMode 的邏輯
FIXED
- 添加 FlinkStreamPartitioner 分區(qū)器對(duì)數(shù)據(jù)進(jìn)行 shuffle
- 并行度為 sink.parallelism
- 分區(qū)邏輯為
a. 先抽取 bucket key 的值纪岁,然后將值的 hash 和 bucket 桶數(shù)取模Math.abs(key_hashcode % numBuckets
得到一個(gè) bucket 序號(hào)
b. 如果有 logFuntion 則直接按照 bucket 序號(hào)和算子并行度取模recordChannel = bucket % numChannels
c. 如果沒有 logFuntion 則先算出 分區(qū)屬于那一個(gè) channel, 然后再用 分區(qū)的 channel + bucket 序號(hào)再和并行度取模 這個(gè)算法和我在 hudi 提的分區(qū)算法是一樣的int startChannel = Math.abs(partition.hashCode()) % numChannels; return (startChannel + bucket) % numChannels;
- 分區(qū)后添加數(shù)據(jù)寫入 RowDataStoreWriteOperator 算子
- 并行度為上游并行度
- 這里有點(diǎn)花原本我以為 logFuntion 會(huì)直接加一個(gè) KafkaSinkFunction 算子,沒想到他是直接集成到了 RowDataStoreWriteOperator 里面则果♂:玻可能是我 out 了遗增。上代碼
public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> { private final LogSinkFunction logSinkFunction; @Override public void setup() { super.setup(containingTask, config, output); if (logSinkFunction != null) { // 調(diào)用 flink core 強(qiáng)加 function FunctionUtils.setFunctionRuntimeContext(logSinkFunction, getRuntimeContext()); } } public void open() throws Exception { super.open(); this.sinkContext = new SimpleContext(getProcessingTimeService()); if (logSinkFunction != null) { // to stay compatible with Flink 1.18- if (logSinkFunction instanceof RichFunction) { RichFunction richFunction = (RichFunction) logSinkFunction; //不僅僅當(dāng)前算子要 open logFuntion 也手動(dòng) open richFunction.open(new Configuration()); } } } public void processElement(StreamRecord<InternalRow> element) { SinkRecord record; try { record = write.write(element.getValue()); //寫 fileSystem } catch (Exception e) { throw new IOException(e); } if (record != null && logSinkFunction != null) { // write to log store, need to preserve original pk (which includes partition fields) SinkRecord logRecord = write.toLogRecord(record); logSinkFunction.invoke(logRecord, sinkContext); // 來吧接著干 log } } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); if (logSinkFunction != null) { // log 也 snapshotState StreamingFunctionUtils.snapshotFunctionState( context, getOperatorStateBackend(), logSinkFunction); } } }
DYNAMIC
- Dynamic bucket 不適用于 log.system 場(chǎng)景校驗(yàn)會(huì)直接報(bào)錯(cuò)不支持, logSinkFunction 必須為 null
- 獲取 dynamic-bucket.assigner-parallelism: 用來定義 assigner 算子的并行度如果沒有定義就是 sink.parallelism抡草。這個(gè)配置的設(shè)置和桶的初始化個(gè)數(shù)相關(guān)康震,太小會(huì)導(dǎo)致 assigner 算子處理速度不夠
- 獲取 dynamic-bucket.initial-buckets: 控制初始化bucket的數(shù)量。
- 添加 FlinkStreamPartitioner 分區(qū)器對(duì)數(shù)據(jù)進(jìn)行 shuffle
- 并行度為 assigner-parallelism
- 分區(qū)邏輯為
a. 他的邏輯和 FIXED 在沒有 logFunction 的邏輯一樣瘫镇,不同點(diǎn)是 FIXED 的 buketSize 是固定的答姥,而 DYNAMIC 模式下 buketSize 是取的 算子并行度和 initial-buckets 的最小值bucket=MathUtils.min(numAssigners, numChannels)
b. 然后就是 FIXED 邏輯一致了int start = Math.abs(partitionHash % numChannels); int id = Math.abs(keyHash % buketSize); return (start + id) % numChannels;
- 添加 HashBucketAssignerOperator 算子
- 并行度為上游的并行度 - 再添加一個(gè) FlinkStreamPartitioner 對(duì)數(shù)據(jù)再進(jìn)行一次 shuffle
- 并行度為 sink.parallelism
- 分區(qū)邏輯為
a. 上游 HashBucketAssignerOperator 會(huì)給每條記錄打上一個(gè) 編號(hào)(編號(hào)產(chǎn)生的邏輯暫時(shí)不看)
b. 然后就是和 FIX 一樣int startChannel = Math.abs(partition.hashCode()) % numChannels; // 這個(gè) bucket 是上游 HashBucketAssignerOperator 打上的一個(gè)編號(hào) return (startChannel + bucket) % numChannels;
- 分區(qū)后添加數(shù)據(jù)寫入 DynamicBucketRowWriteOperator 算子
- 并行度為上游并行度
GLOBAL_DYNAMIC
- Dynamic bucket 不適用于 log.system 場(chǎng)景校驗(yàn)會(huì)直接報(bào)錯(cuò)不支持, logSinkFunction 必須為 null
- 離線(批模式)compactSink 不能再 GLOBAL_DYNAMIC 下使用處于 TODO 轉(zhuǎn)態(tài)(此條可以忽略)
- 添加一個(gè) IndexBootstrapOperator 算子構(gòu)建索引
- 并行度為上游并行度
- 同上獲取 dynamic-bucket.assigner-parallelism
- 同上獲取 dynamic-bucket.initial-buckets
- 添加 FlinkStreamPartitioner 分區(qū)器對(duì)數(shù)據(jù)進(jìn)行 shuffle
- 并行度為 Max(assigner-parallelism,initial-buckets) 如果為空則是 sink.parallelism
- 分區(qū)邏輯為 主鍵 函數(shù)和并行度取模 Math.abs(主鍵.hashCode() % numChannels)
- 添加 GlobalIndexAssignerOperator 算子
- 并行度為上游并行度
- 再添加一個(gè) FlinkStreamPartitioner 對(duì)數(shù)據(jù)再進(jìn)行一次 shuffle 和 DYNAMIC 第 6 步一樣的
- 分區(qū)后添加數(shù)據(jù)寫入 DynamicBucketRowWriteOperator 算子 和 DYNAMIC 第 7步一樣的
到此在不同 bucketMode 下對(duì)應(yīng)的算子梳理完畢了择卦, 在這些處理完之后還有一些通用的 doCommit 邏輯
- 算子并行度 為 1
梳理了一個(gè) DataStream 在 Paimon sink 時(shí)的整個(gè) DataStream 轉(zhuǎn)化拓?fù)淦碓搿2⑶覍?duì)其中的分區(qū)器進(jìn)行了分析整理入下圖
- 文/潘曉璐 我一進(jìn)店門缩搅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來硼瓣,“玉大人,你說我怎么就攤上這事亿傅∥疗埽” “怎么了半哟?”我有些...
- 文/不壞的土叔 我叫張陵寓涨,是天一觀的道長戒良。 經(jīng)常有香客問我,道長译打,這世上最難降的妖魔是什么? 我笑而不...
- 正文 為了忘掉前任,我火速辦了婚禮韵洋,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘食拜。我一直安慰自己副编,他們只是感情好,可當(dāng)我...
- 文/花漫 我一把揭開白布呻待。 她就那樣靜靜地躺著蚕捉,像睡著了一般柴淘。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上敛熬,一...
- 文/蒼蘭香墨 我猛地睜開眼形导,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼朵耕!你這毒婦竟也來了淋叶?” 一聲冷哼從身側(cè)響起煞檩,我...
- 序言:老撾萬榮一對(duì)情侶失蹤形娇,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后癣缅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體哄酝,經(jīng)...
- 正文 獨(dú)居荒郊野嶺守林人離奇死亡屡立,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
- 正文 我和宋清朗相戀三年膨俐,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了罩句。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片门烂。...
- 正文 年R本政府宣布惧盹,位于F島的核電站,受9級(jí)特大地震影響钧椰,放射性物質(zhì)發(fā)生泄漏嫡霞。R本人自食惡果不足惜诊沪,卻給世界環(huán)境...
- 文/蒙蒙 一端姚、第九天 我趴在偏房一處隱蔽的房頂上張望渐裸。 院中可真熱鬧昏鹃,春花似錦盆顾、人聲如沸。這莊子的主人今日做“春日...
- 文/蒼蘭香墨 我抬頭看了看天上的太陽宪巨。三九已至,卻和暖如春溜畅,著一層夾襖步出監(jiān)牢的瞬間捏卓,已是汗流浹背。 一陣腳步聲響...
- 正文 我出身青樓浴捆,卻偏偏與公主長得像蒜田,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子选泻,可洞房花燭夜當(dāng)晚...
推薦閱讀更多精彩內(nèi)容
- 接上篇paimon sink 之 dataStream 的拓?fù)涫崂韀http://www.reibang.com...
- Flink 極簡教程: 架構(gòu)及原理 Apache Flink? — Stateful Computations o...
- 在學(xué)習(xí) paimon sink 的過程中本來只想快速梳理下 paimon 的 sink 時(shí)對(duì) DataStream...
- 一道批、基礎(chǔ)信息 1.1 簡介 分布式的大數(shù)據(jù)計(jì)算引擎。支持 無界數(shù)據(jù)流 和 無限數(shù)據(jù)流 進(jìn)行有狀態(tài)計(jì)算判哥。 對(duì)于fli...