paimon sink 源碼之 dataStream 的拓?fù)涫崂?/h1>
  • paimon sink 源碼 之 DynamicTableSink
  • 上篇得知 sink 的拓?fù)涫窃?DynamicTableSink#getSinkRuntimeProvider 里面定義的
  • Paimon 對(duì)于 DynamicTableSink 的實(shí)現(xiàn)就是 org.apache.paimon.flink.sink.FlinkTableSink
  • getSinkRuntimeProvider 在父類 org.apache.paimon.flink.sink.FlinkTableSinkBase#getSinkRuntimeProvider 中定義
  • 這里只探討主鍵表在流模式下寫入場(chǎng)景

拓?fù)涫崂?/h1>
  1. 判斷是否有 logStoreTableFactory 在創(chuàng)建 DynamicTableSink 是根據(jù)建表語句是否有配置 log.system 來決定是否有 logStoreTableFactory
    1.1 log.system 默認(rèn)值是 none策治,可以配置成 kafka宁仔,配置成 kafka 數(shù)據(jù)不僅僅會(huì)寫入 fileSystem 也會(huì)寫入 kafka 相當(dāng)于是雙寫玩敏,流讀消費(fèi) paimon 表數(shù)據(jù)的時(shí)候 streaming-read-mode 可以配置成 log 從 kafka 進(jìn)行消費(fèi)

    1.2 log.system 設(shè)置為 kafka 舉例

    CREATE TABLE T (i INT, j INT) WITH (
       'log.system'='kafka', 
       'log.system.partitions'='2', 
       'kafka.bootstrap.servers'='%s', 
       'kafka.topic'='Tt',
      'connector='paimon'
      ...
    )
    

    1.3 LogSinkFunction 的創(chuàng)建鸭叙,對(duì)于 log.system 是 kafka 實(shí)際上就是利用 kafka connector 的 FlinkKafkaProducer 進(jìn)行數(shù)據(jù)發(fā)送


    LogSinkFunction
  2. sink.parallelism 設(shè)置 sink 的并行度

  3. local-merge-buffer-size 如作業(yè)存在主鍵數(shù)據(jù)偏斜可以設(shè)置“l(fā)ocal-merge-buffer-size”霹娄,在數(shù)據(jù)進(jìn)行 shuffle 之前進(jìn)行 buffer 和 merge, 當(dāng)同一主鍵在快照之間頻繁更新時(shí)能犯,這特別有用。建議從“64 mb”開始調(diào)整緩沖區(qū)大小项棠。不適用于 CDC ingestion
    3.1 如果設(shè)置了 local-merge-buffer-size 就會(huì)加入一個(gè) LocalMergeOperator 算子
    3.2 LocalMergeOperator 算子并行度和上游保持一致悲雳,shuffle 方式是 forward

  4. 對(duì) DataStream<RowData> 進(jìn)行一次 map 將 org.apache.flink.table.data.RowData 轉(zhuǎn)化成 org.apache.paimon.data.InternalRow
    4.1 算子并行度和上游一致
    4.2 paimon InternalRow 只是對(duì) flink RowData 的包裝實(shí)際還是操作的 flink RowData

  5. 判斷 BucketMode ,在這篇里有講述 Paimon Table BuketMode 的邏輯

  • 對(duì)于主鍵表會(huì)有 FIXED香追、DYNAMIC合瓢、GLOBAL_DYNAMIC 三種 mode
  • 對(duì)于 append only 表有 FIXED、UNAWARE 兩種 mode
  • 不同的 BucketMode 會(huì)有不同的拓?fù)渫傅洌麄兊牟煌饕w現(xiàn)在 partition 的邏輯不一樣和數(shù)據(jù)寫入的邏輯不一樣晴楔,之后的一些算子就是一樣的顿苇,這次我們先梳理主線不去看算子細(xì)節(jié),主線梳理完成之后在寫每個(gè)算子的細(xì)節(jié)
  • 這次面向主鍵表税弃,從 FIXED mode 開始

    FIXED

    1. 添加 FlinkStreamPartitioner 分區(qū)器對(duì)數(shù)據(jù)進(jìn)行 shuffle
      • 并行度為 sink.parallelism
      • 分區(qū)邏輯為
        a. 先抽取 bucket key 的值纪岁,然后將值的 hash 和 bucket 桶數(shù)取模 Math.abs(key_hashcode % numBuckets 得到一個(gè) bucket 序號(hào)
        b. 如果有 logFuntion 則直接按照 bucket 序號(hào)和算子并行度取模 recordChannel = bucket % numChannels
        c. 如果沒有 logFuntion 則先算出 分區(qū)屬于那一個(gè) channel, 然后再用 分區(qū)的 channel + bucket 序號(hào)再和并行度取模 這個(gè)算法和我在 hudi 提的分區(qū)算法是一樣的
        int startChannel = Math.abs(partition.hashCode()) % numChannels;
        return (startChannel + bucket) % numChannels;
        
    2. 分區(qū)后添加數(shù)據(jù)寫入 RowDataStoreWriteOperator 算子
      • 并行度為上游并行度
      • 這里有點(diǎn)花原本我以為 logFuntion 會(huì)直接加一個(gè) KafkaSinkFunction 算子,沒想到他是直接集成到了 RowDataStoreWriteOperator 里面则果♂:玻可能是我 out 了遗增。上代碼
           public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
              private final LogSinkFunction logSinkFunction;
              @Override
              public void setup() {
                   super.setup(containingTask, config, output);
                   if (logSinkFunction != null) {
                      // 調(diào)用 flink core 強(qiáng)加 function
                       FunctionUtils.setFunctionRuntimeContext(logSinkFunction, getRuntimeContext());
                   }
                }
               public void open() throws Exception {
                   super.open();
                   this.sinkContext = new SimpleContext(getProcessingTimeService());
                   if (logSinkFunction != null) {
                       // to stay compatible with Flink 1.18-
                       if (logSinkFunction instanceof RichFunction) {
                           RichFunction richFunction = (RichFunction) logSinkFunction;
                           //不僅僅當(dāng)前算子要 open  logFuntion 也手動(dòng) open 
                           richFunction.open(new Configuration());
                       }
                   }
               }
        
               public void processElement(StreamRecord<InternalRow> element) {
                   SinkRecord record;
                   try {
                       record = write.write(element.getValue()); //寫 fileSystem
                   } catch (Exception e) {
                       throw new IOException(e);
                   }
        
                  if (record != null && logSinkFunction != null) {
                       // write to log store, need to preserve original pk (which includes partition fields)
                       SinkRecord logRecord = write.toLogRecord(record);
                       logSinkFunction.invoke(logRecord, sinkContext); // 來吧接著干 log
                   }
               }
        
               @Override
               public void snapshotState(StateSnapshotContext context) throws Exception {
                   super.snapshotState(context);
                    if (logSinkFunction != null) {
                         // log 也 snapshotState
                       StreamingFunctionUtils.snapshotFunctionState(
                               context, getOperatorStateBackend(), logSinkFunction);
                   }
               }
           }
        

    DYNAMIC

    1. Dynamic bucket 不適用于 log.system 場(chǎng)景校驗(yàn)會(huì)直接報(bào)錯(cuò)不支持, logSinkFunction 必須為 null
    2. 獲取 dynamic-bucket.assigner-parallelism: 用來定義 assigner 算子的并行度如果沒有定義就是 sink.parallelism抡草。這個(gè)配置的設(shè)置和桶的初始化個(gè)數(shù)相關(guān)康震,太小會(huì)導(dǎo)致 assigner 算子處理速度不夠
    3. 獲取 dynamic-bucket.initial-buckets: 控制初始化bucket的數(shù)量。
    4. 添加 FlinkStreamPartitioner 分區(qū)器對(duì)數(shù)據(jù)進(jìn)行 shuffle
      • 并行度為 assigner-parallelism
      • 分區(qū)邏輯為
        a. 他的邏輯和 FIXED 在沒有 logFunction 的邏輯一樣瘫镇,不同點(diǎn)是 FIXED 的 buketSize 是固定的答姥,而 DYNAMIC 模式下 buketSize 是取的 算子并行度和 initial-buckets 的最小值 bucket=MathUtils.min(numAssigners, numChannels)
        b. 然后就是 FIXED 邏輯一致了
           int start = Math.abs(partitionHash % numChannels);
           int id = Math.abs(keyHash % buketSize);
           return (start + id)  % numChannels;
        
    5. 添加 HashBucketAssignerOperator 算子
      - 并行度為上游的并行度
    6. 再添加一個(gè) FlinkStreamPartitioner 對(duì)數(shù)據(jù)再進(jìn)行一次 shuffle
      - 并行度為 sink.parallelism
      - 分區(qū)邏輯為
      a. 上游 HashBucketAssignerOperator 會(huì)給每條記錄打上一個(gè) 編號(hào)(編號(hào)產(chǎn)生的邏輯暫時(shí)不看)
      b. 然后就是和 FIX 一樣
       int startChannel = Math.abs(partition.hashCode()) % numChannels;
       // 這個(gè) bucket 是上游 HashBucketAssignerOperator 打上的一個(gè)編號(hào)
       return (startChannel + bucket) % numChannels;
      
    7. 分區(qū)后添加數(shù)據(jù)寫入 DynamicBucketRowWriteOperator 算子
      • 并行度為上游并行度

    GLOBAL_DYNAMIC

    1. Dynamic bucket 不適用于 log.system 場(chǎng)景校驗(yàn)會(huì)直接報(bào)錯(cuò)不支持, logSinkFunction 必須為 null
    2. 離線(批模式)compactSink 不能再 GLOBAL_DYNAMIC 下使用處于 TODO 轉(zhuǎn)態(tài)(此條可以忽略)
    3. 添加一個(gè) IndexBootstrapOperator 算子構(gòu)建索引
      • 并行度為上游并行度
    4. 同上獲取 dynamic-bucket.assigner-parallelism
    5. 同上獲取 dynamic-bucket.initial-buckets
    6. 添加 FlinkStreamPartitioner 分區(qū)器對(duì)數(shù)據(jù)進(jìn)行 shuffle
      • 并行度為 Max(assigner-parallelism,initial-buckets) 如果為空則是 sink.parallelism
      • 分區(qū)邏輯為 主鍵 函數(shù)和并行度取模 Math.abs(主鍵.hashCode() % numChannels)
    7. 添加 GlobalIndexAssignerOperator 算子
    - 并行度為上游并行度
    
    1. 再添加一個(gè) FlinkStreamPartitioner 對(duì)數(shù)據(jù)再進(jìn)行一次 shuffle 和 DYNAMIC 第 6 步一樣的
    2. 分區(qū)后添加數(shù)據(jù)寫入 DynamicBucketRowWriteOperator 算子 和 DYNAMIC 第 7步一樣的

到此在不同 bucketMode 下對(duì)應(yīng)的算子梳理完畢了择卦, 在這些處理完之后還有一些通用的 doCommit 邏輯

  1. 判斷 sink.savepoint.auto-tag 是否為 true 默認(rèn)為 false , 參數(shù)表示是否自動(dòng)創(chuàng)建 tag 如果開了則添加 AutoTagForSavepointCommitterOperator 算子 并且這個(gè)算子里面是包含 CommitterOperator 的
    • 算子并行度 為 1
  2. 如果沒有開啟則直接是 CommitterOperator 算子 并行度也是 1
  3. 最后添加一個(gè) 空的 sink 節(jié)點(diǎn) DiscardingSink 并行度為 1

FINAL

  • 梳理了一個(gè) DataStream 在 Paimon sink 時(shí)的整個(gè) DataStream 轉(zhuǎn)化拓?fù)淦碓搿2⑶覍?duì)其中的分區(qū)器進(jìn)行了分析整理入下圖


    Paimon stream sink 拓?fù)?/div>
  • 接下來看具體算子的邏輯
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者

  • 序言:七十年代末辑鲤,一起剝皮案震驚了整個(gè)濱河市月褥,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌宁赤,老刑警劉巖决左,帶你破解...
    沈念sama閱讀 217,406評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異链烈,居然都是意外死亡强衡,警方通過查閱死者的電腦和手機(jī)漩勤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門缩搅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來硼瓣,“玉大人,你說我怎么就攤上這事亿傅∥疗埽” “怎么了半哟?”我有些...
    開封第一講書人閱讀 163,711評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵寓涨,是天一觀的道長戒良。 經(jīng)常有香客問我,道長译打,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,380評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮韵洋,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘食拜。我一直安慰自己副编,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評(píng)論 6 392
  • 文/花漫 我一把揭開白布呻待。 她就那樣靜靜地躺著蚕捉,像睡著了一般柴淘。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上敛熬,一...
    開封第一講書人閱讀 51,301評(píng)論 1 301
  • 那天荸型,我揣著相機(jī)與錄音炸茧,去河邊找鬼梭冠。 笑死控漠,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的偶翅。 我是一名探鬼主播聚谁,決...
    沈念sama閱讀 40,145評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼形导,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼朵耕!你這毒婦竟也來了淋叶?” 一聲冷哼從身側(cè)響起煞檩,我...
    開封第一講書人閱讀 39,008評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤形娇,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后癣缅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體哄酝,經(jīng)...
    沈念sama閱讀 45,443評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡屡立,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評(píng)論 3 334
  • 正文 我和宋清朗相戀三年膨俐,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了罩句。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片门烂。...
    茶點(diǎn)故事閱讀 39,795評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖捕虽,靈堂內(nèi)的尸體忽然破棺而出坡脐,到底是詐尸還是另有隱情挨措,我是刑警寧澤浅役,帶...
    沈念sama閱讀 35,501評(píng)論 5 345
  • 正文 年R本政府宣布惧盹,位于F島的核電站,受9級(jí)特大地震影響钧椰,放射性物質(zhì)發(fā)生泄漏嫡霞。R本人自食惡果不足惜诊沪,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評(píng)論 3 328
  • 文/蒙蒙 一端姚、第九天 我趴在偏房一處隱蔽的房頂上張望渐裸。 院中可真熱鬧昏鹃,春花似錦盆顾、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽宪巨。三九已至,卻和暖如春溜畅,著一層夾襖步出監(jiān)牢的瞬間捏卓,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評(píng)論 1 269
  • 我被黑心中介騙來泰國打工慈格, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留怠晴,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,899評(píng)論 2 370
  • 正文 我出身青樓浴捆,卻偏偏與公主長得像蒜田,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子选泻,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容