Hudi 源碼之 Clustering

什么是Clustering

開門見山蒿叠,Clustering主要有兩個作用:數據小文件合并和重排序明垢。
當數據寫入Hudi表時,為了提高寫入效率和存儲利用率市咽,可能會產生大量小文件痊银。Hudi的Clustering機制允許在后臺周期性地將這些小文件合并成大文件,從而減少存儲碎片和元數據管理開銷施绎,提高查詢性能溯革。
Clustering過程可以重新組織和排序數據,依據用戶指定的列進行排序谷醉,這樣能提升相關查詢的性能致稀,比如范圍掃描或者JOIN操作,通過預排序的數據俱尼,查詢引擎能夠更高效地處理查詢請求抖单。
本篇首先介紹clustering的配置和操作,然后分析clustering的源代碼,包含clustering執(zhí)行計劃的創(chuàng)建和根據計劃執(zhí)行clustering過程兩個部分矛绘。

Clustering分區(qū)過濾策略

Clustering分區(qū)過濾策略按照hoodie.clustering.plan.partition.filter.mode配置項過濾出所需的partition耍休。有如下選項:

  • NONE: 不過濾,返回所有partition path蔑歌。
  • RECENT_DAYS: 按照partition path倒序排序羹应。跳過hoodie.clustering.plan.strategy.daybased.skipfromlatest.partitions個partition,返回hoodie.clustering.plan.strategy.daybased.lookback.partitions個partition次屠。如果partition path是日期园匹,可以實現(xiàn)過濾出最近N天的數據。
  • SELECTED_PARTITIONS: 獲取hoodie.clustering.plan.strategy.cluster.begin.partitionhoodie.clustering.plan.strategy.cluster.end.partition之間的分區(qū)劫灶。
  • DAY_ROLLING: 每次clustering一部分分區(qū)裸违。如果分區(qū)的index對24取余等于排期時候當前時間的小時數,則該分區(qū)需要clustering本昏。

配置項

Flink 配置項

  • clustering.schedule.enabled:是否排期clustering供汛。默認值為false。
  • clustering.async.enabled:是否異步執(zhí)行clustering涌穆。默認值為false怔昨。
  • clustering.delta_commits:每隔多少次commit之后觸發(fā)clustering。默認為4宿稀。
  • clustering.tasks:clustering并行度趁舀。默認和寫入并行度相同。
  • clustering.plan.strategy.daybased.lookback.partitions:對應RECENT_DAYS策略祝沸,保留多少個分區(qū)參與clustering矮烹。默認值為2。
  • clustering.plan.strategy.daybased.skipfromlatest.partitions:對應RECENT_DAYS策略罩锐,跳過最近多少個分區(qū)奉狈,之后的分區(qū)參與clustering。默認值為0涩惑。
  • clustering.plan.strategy.cluster.begin.partition:對應SELECTED_PARTITIONS策略仁期,指定參與clustering的開始分區(qū)。無默認值境氢。
  • clustering.plan.strategy.cluster.end.partition:對應SELECTED_PARTITIONS策略蟀拷,指定參與clustering的結束分區(qū)。無默認值萍聊。
  • clustering.plan.strategy.partition.regex.pattern:被該正則匹配的分區(qū)會參與clustering问芬。無默認值。
  • clustering.plan.strategy.partition.selected:指定要參與clustering的分區(qū)寿桨。無默認值此衅。
  • clustering.plan.strategy.class:clustering策略類强戴。默認值為FlinkSizeBasedClusteringPlanStrategy。選擇最近N天的分區(qū)挡鞍,選取較小的file slice參與clustering骑歹。
  • clustering.plan.partition.filter.mode:分區(qū)過濾策略酌住。默認值為NONE残炮。
  • clustering.plan.strategy.target.file.max.bytes:每個clustering group(可理解為并行度)clustering完畢之后生成的文件大小上限。默認為1GB回窘。
  • clustering.plan.strategy.small.file.limit:小于該大小的文件會認為是clustering的參與對象翘县。默認值為600MB最域。
  • clustering.plan.strategy.sort.columns:clustering排序字段。多個字段使用逗號分隔锈麸。無默認值镀脂。
  • clustering.plan.strategy.max.num.groups:clustering plan階段創(chuàng)建出的clustering group數量,對應并行度忘伞。默認為30薄翅。

Spark 配置項

  • hoodie.clustering.plan.strategy.daybased.lookback.partitions:對應RECENT_DAYS策略,保留多少個分區(qū)參與clustering氓奈。默認值為2翘魄。
  • hoodie.clustering.plan.strategy.daybased.skipfromlatest.partitions:對應RECENT_DAYS策略,跳過最近多少個分區(qū)舀奶,之后的分區(qū)參與clustering熟丸。默認值為0。
  • hoodie.clustering.plan.strategy.cluster.begin.partition:對應SELECTED_PARTITIONS策略伪节,指定參與clustering的開始分區(qū)。無默認值绩鸣。
  • hoodie.clustering.plan.strategy.cluster.end.partition:對應SELECTED_PARTITIONS策略怀大,指定參與clustering的結束分區(qū)。無默認值呀闻。
  • hoodie.clustering.plan.strategy.small.file.limit:小于該大小的文件會認為是clustering的參與對象化借。默認值為300MB。
  • hoodie.clustering.plan.partition.regex.pattern:被該正則匹配的分區(qū)會參與clustering捡多。無默認值蓖康。
  • hoodie.clustering.plan.partition.selected:指定要參與clustering的分區(qū)。無默認值垒手。
  • hoodie.clustering.plan.strategy.class:clustering plan策略蒜焊。默認為org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy。查找小文件(hoodie.clustering.plan.strategy.small.file.limit)科贬,這些小文件參與clustering泳梆。
  • hoodie.clustering.execution.strategy.class:clustering執(zhí)行策略鳖悠。默認為org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy。按照指定的列排序优妙,并滿足配置的目標文件大小乘综。
  • hoodie.clustering.inline:是否啟用inline clustering。默認為false套硼。
  • hoodie.clustering.inline.max.commits:最多多少次commit觸發(fā)inline clustering卡辰,控制clustering的頻率。默認為4邪意。
  • hoodie.clustering.async.max.commits:控制async clustering的頻率九妈。默認為4。
  • hoodie.clustering.max.parallelism:clustering的最大并行度抄罕。默認為15允蚣。
  • hoodie.clustering.group.read.parallelism:Spark從clustering group讀取數據的并行度。默認值為20呆贿。
  • hoodie.clustering.plan.partition.filter.mode:分區(qū)過濾策略嚷兔。默認為NONE。
  • hoodie.clustering.plan.strategy.max.bytes.per.group:每個clustering group最多產生的數據量做入。默認為2GB冒晰。
  • hoodie.clustering.plan.strategy.max.num.groups:最大clustering group數量。每次clustering的最大操作數據量= hoodie.clustering.plan.strategy.max.bytes.per.group * hoodie.clustering.plan.strategy.max.num.groups竟块。
  • hoodie.clustering.plan.strategy.target.file.max.bytes:每個clustering group生成hoodie.clustering.plan.strategy.max.bytes.per.group / hoodie.clustering.plan.strategy.target.file.max.bytes個file group壶运。
  • hoodie.clustering.plan.strategy.single.group.clustering.enabled:是否能夠生成只有一個file group參與的clustering執(zhí)行計劃。默認為true浪秘。
  • hoodie.clustering.plan.strategy.sort.columns:clustering排序字段蒋情。多個字段使用逗號分隔。無默認值耸携。
  • hoodie.clustering.updates.strategy:update策略棵癣。默認為org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy。clustering的時候拒絕更新夺衍。配置org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy可允許更新狈谊。
  • hoodie.clustering.schedule.inline:是否啟用inline clustering排期。默認為false沟沙。
  • hoodie.clustering.async.enabled:是否啟用async clustering河劝。默認為false。
  • hoodie.layout.optimize.strategy:布局策略矛紫。使用LINEAR(線性)赎瞎,ZORDER還是HILBERT(希爾伯特曲線)。默認值是LINEAR含衔。
  • hoodie.layout.optimize.curve.build.method:可配置DIRECT或者SAMPLE煎娇。SpatialCurveCompositionStrategyType中SAMPLE的數據排序分布效果較DIRECT更好二庵,但是執(zhí)行速度更慢。默認配置的是DIRECT類型缓呛。
  • hoodie.layout.optimizebuild.curve.sample.size:對應SAMPLE類型催享,默認值為200000。
  • hoodie.layout.optimize.data.skipping.enable:是否在布局優(yōu)化完成后收集統(tǒng)計信息來啟用數據跳過功能哟绊。默認為true因妙。
  • hoodie.clustering.rollback.pending.replacecommit.on.conflict:默認值為false。如果允許對等待clustering的file group進行更新票髓,則應將此配置設置為回滾失敗或pending的clustering instants攀涵。 僅當插入或更新與pending clustering的file group存在沖突時,pending clustering才會被回滾洽沟。 在設置此配置時請務必謹慎以故,特別是在非常頻繁地執(zhí)行clustering操作時。這在極少數情況下可能導致競態(tài)條件裆操, 例如怒详,在獲取到實例后但回滾完成前clustering操作已完成。

離線觸發(fā)

使用Spark

任務提交命令如下:

spark-submit \  
--class org.apache.hudi.utilities.HoodieClusteringJob \  
/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.15.0.jar \  
--props /path/to/config/clusteringjob.properties \  
--mode scheduleAndExecute \  
--base-path /path/to/hudi_table/basePath \  
--table-name hudi_table_schedule_clustering \  
--spark-memory 1g

由于clustering的配置項較多踪区,可以把這些配置項寫在/path/to/config/clusteringjob.properties文件中昆烁。例如:

hoodie.clustering.async.enabled=true  
hoodie.clustering.async.max.commits=4  
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824  
hoodie.clustering.plan.strategy.small.file.limit=629145600  
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy  
hoodie.clustering.plan.strategy.sort.columns=column1,column2

HoodieClusteringJob的參數如下:

參數名 是否必須 默認值 備注
--base-path - Hudi表根目錄
--table-name - 表名
--instant-time - 只在execute模式有效。指定執(zhí)行哪個instant time的clustering缎岗。如果沒有指定静尼。執(zhí)行最早排期的clustering。使用scheduleAndExecute默認該配置項會被忽略传泊。
--parallelism 1 clustering并行度
--spark-master - Spark master
--spark-memory - Spark內存
--retry 0 重試次數
--skip-clean true clustering完畢之后是否跳過clean
--retry-last-failed-clustering-job false 使用scheduleAndExecute有效鼠渺。是否重試最近失敗的clustering job
--mode - schedule表示排期。execute表示執(zhí)行眷细。scheduleAndExecute表示排期并執(zhí)行
--help false 打印幫助信息
--job-max-processing-time-ms 0 只有--retry-last-failed-clustering-job和scheduleAndExecute是否有效系冗。如果超過配置時間clustering job仍未完成。Hudi認為該job失敗并重新啟動
--props - clustering配置參數所在文件薪鹦。使用properties文件格式
--hoodie-conf - 額外的Hudi配置

使用Flink

Flink的HoodieFlinkClusteringJob不僅有clustering,還包含了archive和clean操作惯豆。

任務提交命令如下:

./bin/flink run \
    -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \
    lib/hudi-flink1.17-bundle-0.15.0.jar \
    --path hdfs://xxx:8020/table

參數解析:

參數名 是否必須 默認值 備注
--path - Hudi表的根目錄
--clustering-delta-commits 1 最多多少次commit觸發(fā)clustering池磁,控制clustering的頻率
--clustering-tasks false -1 Clustering task 的并發(fā)數
--clean-policy false KEEP_LATEST_COMMITS clean策略】蓿可以使用KEEP_LATEST_COMMITS地熄, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS
--clean-retain-commits 10 保留最近n個commit不被清理
--clean-retain-hours 24 保留最近n小時的commit不被清理
--clean-retain-file-versions 5 保留最近n個文件版本不被清理
--archive-min-commits 20 歸檔commit前保留的最少commit數量
--archive-max-commits 30 歸檔commit前保留的最多commit數量
--schedule false 是否排期clustering plan
--instant-time - clustering instant time
--clean-async-enabled false 是否啟用異步clean
--plan-strategy-class FlinkSizeBasedClusteringPlanStrategy clustering策略類
--plan-partition-filter-mode NONE 分區(qū)過濾模式
--seq FIFO Clustering plan的執(zhí)行順序芯杀。
LIFO: 從最近的plan開始執(zhí)行端考,
FIFO: 從最早的plan開始執(zhí)行
--target-file-max-bytes 1GB 最大目標文件
--small-file-limit 600 小于該大小的文件會參與clustering
--skip-from-latest-partitions 0 clustering跳過最近n個分區(qū)
--sort-columns - clustering排序字段雅潭。多個字段使用逗號分隔
--sort-memory 128 排序內存大小
--max-num-groups 30 Clustering group個數
--target-partitions 2 參與clustering的分區(qū)數
--cluster-begin-partition - Clustering開始分區(qū)
--cluster-end-partition - Clustering結束分區(qū)
--partition-regex-pattern - 匹配該正則的partition參與clustering
--partition-selected - 指定參與clustering的分區(qū)
--service false 是否開啟 service 模式,service模式為常駐作業(yè)
--min-clustering-interval-seconds 600s 異步clustering服務的最小時間間隔
--hoodie-conf - 額外的Hudi配置
--props - clustering等參數配置所在文件路徑

創(chuàng)建clustering執(zhí)行計劃

創(chuàng)建執(zhí)行計劃位于ClusteringPlanActionExecutor類的execute方法却特,代碼如下所示:

  @Override  
  public Option<HoodieClusteringPlan> execute() {  
    // 創(chuàng)建執(zhí)行計劃
    Option<HoodieClusteringPlan> planOption = createClusteringPlan();  
    // 如果計劃創(chuàng)建成功(可能存在沒有file slice需要cluster的情況)
    if (planOption.isPresent()) {  
      // 創(chuàng)建clustering instant
      // clustering instant的類型是replace commit扶供,意味這clustering之后的數據文件替換掉先前的
      HoodieInstant clusteringInstant =  
          new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime);  
      try {  
        HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()  
            .setOperationType(WriteOperationType.CLUSTER.name())  
            .setExtraMetadata(extraMetadata.orElse(Collections.emptyMap()))  
            .setClusteringPlan(planOption.get())  
            .build();  
        // 添加到pending commit中
        table.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant,  
            TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));  
      } catch (IOException ioe) {  
        throw new HoodieIOException("Exception scheduling clustering", ioe);  
      }  
    }  
  
    return planOption;  
  }  
}

該方法創(chuàng)建clustering執(zhí)行計劃,然后再創(chuàng)建一個pending replace commit裂明。因為clutering完成之后椿浓,新生成的數據文件會替換掉原有的數據文件,因此對應的commit類型為replace闽晦。
繼續(xù)分析createClusteringPlan方法扳碍。其中首先判斷是否滿足可執(zhí)行clustering的條件,然后獲取配置的clustering策略類仙蛉,創(chuàng)建clustering計劃笋敞。
Clustering并不是說每次schedule都必須要執(zhí)行。為了效率clustering要求至少要經過N次commit之后荠瘪,才會schedule夯巷。此限制通過配置項hoodie.clustering.inline.max.commitshoodie.clustering.async.max.commits(分別對應inline和異步)來控制。如果滿足clustering條件巧还,通過hoodie.clustering.plan.strategy.class配置的策略類生成執(zhí)行計劃鞭莽。
代碼如下所示:

protected Option<HoodieClusteringPlan> createClusteringPlan() {  
  LOG.info("Checking if clustering needs to be run on " + config.getBasePath()); 
  // 獲取上一次clustering對應的instant 
  Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline().getLastClusterCommit();  
  // 獲取上次clustering之后提交的次數
  int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()  
      .findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE)  
      .countInstants();  
  // 讀取hoodie.clustering.inline.max.commits配置,默認為4
  // 該配置項表示在上次clustering之后至少需要經歷幾次commit才能schedule下一次clustering
  // 這里處理inline clustering的配置
  if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {  
    LOG.warn("Not scheduling inline clustering as only " + commitsSinceLastClustering  
        + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "  
        + config.getInlineClusterMaxCommits());  
    return Option.empty();  
  }  
  // 同上麸祷,但這里處理異步clustering的配置
  // 配置項為hoodie.clustering.async.max.commits澎怒,默認值4
  if ((config.isAsyncClusteringEnabled() || config.scheduleInlineClustering()) && config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) {  
    LOG.warn("Not scheduling async clustering as only " + commitsSinceLastClustering  
        + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "  
        + config.getAsyncClusterMaxCommits());  
    return Option.empty();  
  }  
  
  LOG.info("Generating clustering plan for table " + config.getBasePath());  
  // 加載clustering策略類,對應配置項hoodie.clustering.plan.strategy.class
  // 默認為SparkSizeBasedClusteringPlanStrategy
  ClusteringPlanStrategy strategy = (ClusteringPlanStrategy) ReflectionUtils.loadClass(  
      ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config),  
          new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config);  
  // 生成clustering計劃
  return strategy.generateClusteringPlan();  
}

接著我們聚焦默認的策略SparkSizeBasedClusteringPlanStrategy阶牍。該策略根據文件大小來決定文件數據是否參與clustering喷面。分析clustering計劃生成步驟。
generateClusteringPlan方法位于SparkSizeBasedClusteringPlanStrategy的父類PartitionAwareClusteringPlanStrategy中走孽。該方法根據hoodie.clustering.plan.strategy.partition.selected惧辈,hoodie.clustering.plan.strategy.partition.regex.patternhoodie.clustering.plan.partition.filter.mode條件過濾出符合要求的partition path。獲取它們包含的file slice磕瓷。從這些file slice中篩選出小文件(小于hoodie.clustering.plan.strategy.small.file.limit的文件)盒齿。將這些按照clutering要求的group大小(hoodie.clustering.plan.strategy.max.bytes.per.group),分成若干個group困食。Group數量上限為hoodie.clustering.plan.strategy.max.num.groups边翁。此步驟對應小文件合并功能。
代碼如下所示:

@Override  
public Option<HoodieClusteringPlan> generateClusteringPlan() {  
  if (!checkPrecondition()) {  
    return Option.empty();  
  }  
  // 獲取metaclient硕盹,用來操作metadata
  HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();  
  LOG.info("Scheduling clustering for " + metaClient.getBasePath());  
  // 獲取寫配置
  HoodieWriteConfig config = getWriteConfig();  
  // 讀取配置項hoodie.clustering.plan.strategy.partition.selected
  // 確定在哪些分區(qū)運行clustering
  String partitionSelected = config.getClusteringPartitionSelected();  
  LOG.info("Scheduling clustering partitionSelected: " + partitionSelected);  
  List<String> partitionPaths;  

  // 如果沒有配置
  if (StringUtils.isNullOrEmpty(partitionSelected)) {  
    // get matched partitions if set  
    // 讀取hoodie.clustering.plan.strategy.partition.regex.pattern配置
    // 獲取正則表達式匹配的partition path
    partitionPaths = getRegexPatternMatchedPartitions(config, FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath()));  
    // filter the partition paths if needed to reduce list status  
  } else {  
    // 如果配置了partitionSelected符匾,優(yōu)先這個配置
    partitionPaths = Arrays.asList(partitionSelected.split(","));  
  }  
  // 過濾需要clustering的分區(qū)
  // 過濾策略對應配置項hoodie.clustering.plan.partition.filter.mode
  // 可用策略為NONE,RECENT_DAYS瘩例,SELECTED_PARTITIONS和DAY_ROLLING
  partitionPaths = filterPartitionPaths(partitionPaths);  
  LOG.info("Scheduling clustering partitionPaths: " + partitionPaths);  

  // 如果所有的分區(qū)都被排除了啊胶,返回空
  if (partitionPaths.isEmpty()) {  
    // In case no partitions could be picked, return no clustering plan  
    return Option.empty();  
  }  

  // 排除掉分區(qū)中已經要做clustering的file group(pending狀態(tài))
  // 篩選出小文件
  // 決定小文件判斷閾值的配置項為hoodie.clustering.plan.strategy.small.file.limit
  // 將其映射為HoodieClusteringGroup
  // 映射邏輯后面分析
  List<HoodieClusteringGroup> clusteringGroups = getEngineContext()  
      .flatMap(  
          partitionPaths,  
          partitionPath -> {  
            List<FileSlice> fileSlicesEligible = getFileSlicesEligibleForClustering(partitionPath).collect(Collectors.toList());  
            return buildClusteringGroupsForPartition(partitionPath, fileSlicesEligible).limit(getWriteConfig().getClusteringMaxNumGroups());  
          },  
          partitionPaths.size())  
      .stream()  
      .limit(getWriteConfig().getClusteringMaxNumGroups())  
      .collect(Collectors.toList());  
  
  if (clusteringGroups.isEmpty()) {  
    LOG.warn("No data available to cluster");  
    return Option.empty();  
  }  
  // 構造cluster策略
  HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder()  
      .setStrategyClassName(getWriteConfig().getClusteringExecutionStrategyClass())  
      .setStrategyParams(getStrategyParams())  
      .build();  

  // 構造clustering計劃
  return Option.of(HoodieClusteringPlan.newBuilder()  
      .setStrategy(strategy)  
      .setInputGroups(clusteringGroups)  
      .setExtraMetadata(getExtraMetadata())  
      .setVersion(getPlanVersion())  
      .setPreserveHoodieMetadata(true)  
      .build());  
}

上面的filterPartitionPaths通過配置的hoodie.clustering.plan.partition.filter.mode過濾出所需的partition甸各。具有有如下選項:

  • NONE: 不過濾,返回所有partition path焰坪。
  • RECENT_DAYS: 按照partition path倒序排序趣倾。跳過hoodie.clustering.plan.strategy.daybased.skipfromlatest.partitions個partition,返回hoodie.clustering.plan.strategy.daybased.lookback.partitions個partition琳彩。如果partition path是日期誊酌,可以實現(xiàn)過濾出最近N天的數據。
  • SELECTED_PARTITIONS: 獲取hoodie.clustering.plan.strategy.cluster.begin.partitionhoodie.clustering.plan.strategy.cluster.end.partition之間的分區(qū)露乏。
  • DAY_ROLLING: 每次clustering一部分分區(qū)碧浊。如果分區(qū)的index對24取余等于排期時候當前時間的小時數,則該分區(qū)需要clustering瘟仿。

buildClusteringGroupsForPartition方法將篩選出的file slice按照從小到大排序箱锐。然后按照clustering配置的group size和group數量條件,合并為clustering group劳较。

protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {  
  // 獲取寫入配置
  HoodieWriteConfig writeConfig = getWriteConfig();  
  
  List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();  
  List<FileSlice> currentGroup = new ArrayList<>();  
  
  // Sort fileSlices before dividing, which makes dividing more compact  
  // file slice按照base file大小排序驹止,如果文件不存在,按照最大大小排序
  List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);  
  sortedFileSlices.sort((o1, o2) -> (int)  
      ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())  
          - (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));  
  
  long totalSizeSoFar = 0;  
  
  for (FileSlice currentSlice : sortedFileSlices) {  
    // 遍歷所有file slice
    // 獲取當前file slice的大小观蜗,如果文件不存在臊恋,獲取大小上限
    long currentSize = currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();  
    // check if max size is reached and create new group, if needed.  
    // 如果本次累積的文件大小大于hoodie.clustering.plan.strategy.max.bytes.per.group
    // 并且當前group不為空
    if (totalSizeSoFar + currentSize > writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {  
      // totalSizeSoFar除以hoodie.clustering.plan.strategy.target.file.max.bytes向上取整
      // 計算出輸出組編號
      int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());  
      LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "  
          + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);  
      // 加入到fileSliceGroups集合中,保存結果
      // 保存了輸出組組和輸出組編號
      fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));  
      // 結果保存之后墓捻,清零currentGroup和totalSizeSoFar
      currentGroup = new ArrayList<>();  
      totalSizeSoFar = 0;  
  
      // if fileSliceGroups's size reach the max group, stop loop  
      // 檢查file group個數是否超過了hoodie.clustering.plan.strategy.max.num.groups
      // 超過的話退出循環(huán)抖仅,本次不再處理后面的file slice
      if (fileSliceGroups.size() >= writeConfig.getClusteringMaxNumGroups()) {  
        LOG.info("Having generated the maximum number of groups : " + writeConfig.getClusteringMaxNumGroups());  
        break;  
      }  
    }  
  
    // Add to the current file-group  
    // 加入到當前文件組
    currentGroup.add(currentSlice);  
    // assume each file group size is ~= parquet.max.file.size  
    // 累積大小到totalSizeSoFar變量
    totalSizeSoFar += currentSize;  
  }  
  
  if (!currentGroup.isEmpty()) {  
    // 處理最后一個output group
    // shouldClusteringSingleGroup在下面兩個配置項任意一個啟用的時候為true
    // 表示只有一個輸出文件組的話,也clustering
    // hoodie.clustering.plan.strategy.sort.columns
    // hoodie.clustering.plan.strategy.single.group.clustering.enabled
    if (currentGroup.size() > 1 || writeConfig.shouldClusteringSingleGroup()) {  
      int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());  
      LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "  
          + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);  
      fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));  
    }  
  }
  // 構造并返回fileSliceGroups
  return fileSliceGroups.stream().map(fileSliceGroup ->  
    HoodieClusteringGroup.newBuilder()  
        .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))  
        .setNumOutputFileGroups(fileSliceGroup.getRight())  
        .setMetrics(buildMetrics(fileSliceGroup.getLeft()))  
        .build());
}

到此為止clustering計劃生成部分分析完畢砖第。

根據執(zhí)行計劃執(zhí)行clustering

Clustering的執(zhí)行開始于BaseHoodieWriteClient::cluster撤卢。
在clustering之前,首先執(zhí)行preWrite操作梧兼。

public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete) {  
  // 創(chuàng)建hudi table放吩,根據引擎(Spark/Flink)和表類型(MOR/COW)的不同,有多種實現(xiàn)類
  HoodieTable table = createTable(config, context.getHadoopConf().get());
  // 執(zhí)行寫入前操作羽杰,包含:
  // inflight和requested instant去掉本次instant
  // 啟動clean和archive服務(如果開啟的話)
  preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
  // 執(zhí)行clutering
  return tableServiceClient.cluster(clusteringInstant, shouldComplete);
}

接著是BaseHoodieTableServiceClient::cluster方法渡紫。該方法檢測當前clustering是否已經pending,配置監(jiān)控考赛,執(zhí)行clustering并返回clustering執(zhí)行結果元數據腻惠。

public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete) {  
  // 同上個方法,獲取table
  HoodieTable<?, I, ?, T> table = createTable(config, context.getHadoopConf().get());  
  HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();  
  HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);  
  // 檢查本次cluster是否已經pending狀態(tài)欲虚。如果是,需要回滾
  if (pendingClusteringTimeline.containsInstant(inflightInstant)) {  
    table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));  
    table.getMetaClient().reloadActiveTimeline();  
  }  
  // cluster時長計時器監(jiān)控
  clusteringTimer = metrics.getClusteringCtx();  
  LOG.info("Starting clustering at " + clusteringInstant);  
  // 調用table的cluster服務
  HoodieWriteMetadata<T> writeMetadata = table.cluster(context, clusteringInstant);
  // 轉換metadata到對應計算引擎格式
  HoodieWriteMetadata<O> clusteringMetadata = convertToOutputMetadata(writeMetadata);  
  // Validation has to be done after cloning. if not, it could result in referencing the write status twice which means clustering could get executed twice.  
  // 檢查cluster寫入狀態(tài)不能為空
  validateClusteringCommit(clusteringMetadata, clusteringInstant, table);  
  
  // Publish file creation metrics for clustering.  
  // 讀取并返回監(jiān)控信息
  if (config.isMetricsOn()) {  
    clusteringMetadata.getWriteStats()  
        .ifPresent(hoodieWriteStats -> hoodieWriteStats.stream()  
            .filter(hoodieWriteStat -> hoodieWriteStat.getRuntimeStats() != null)  
            .map(hoodieWriteStat -> hoodieWriteStat.getRuntimeStats().getTotalCreateTime())  
            .forEach(metrics::updateClusteringFileCreationMetrics));  
  }  
  
  // TODO : Where is shouldComplete used ?  
  if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {  
    completeClustering((HoodieReplaceCommitMetadata) clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant, Option.ofNullable(convertToWriteStatus(writeMetadata)));  
  }  
  return clusteringMetadata;  
}

以Spark為例悔雹,我們查看COW表的HoodieSparkCopyOnWriteTable::cluster邏輯复哆。

public HoodieWriteMetadata<HoodieData<WriteStatus>> cluster(HoodieEngineContext context,  
                                                         String clusteringInstantTime) {  
  return new SparkExecuteClusteringCommitActionExecutor<>(context, config, this, clusteringInstantTime).execute();  
}

此邏輯交由SparkExecuteClusteringCommitActionExecutor執(zhí)行欣喧。繼續(xù)分析SparkExecuteClusteringCommitActionExecutor::execute方法,它調用了BaseCommitActionExecutor::executeClustering方法梯找。

@Override  
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {  
  return executeClustering(clusteringPlan);  
}

BaseCommitActionExecutor::executeClustering該方法反射加載hoodie.clustering.execution.strategy.class配置項對應的clustering策略(默認為SparkSortAndSizeExecutionStrategy)唆阿,然后執(zhí)行clustering。

protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieClusteringPlan clusteringPlan) {  
  // 創(chuàng)建instant
  HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);  
  // Mark instant as clustering inflight  
  // 標記instant為inflight狀態(tài)
  table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());  
  table.getMetaClient().reloadActiveTimeline();  
  
  // Disable auto commit. Strategy is only expected to write data in new files.  
  // 禁用自動commit
  config.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE, Boolean.FALSE.toString());  

  // 添加_hoodie_commit_time等5個元數據字段到schema中
  final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));  
  // 加載hoodie.clustering.execution.strategy.class配置項對應的clustering策略類
  // 執(zhí)行它的performClustering方法
  // 對于默認的配置锈锤,clustering策略類為SparkSortAndSizeExecutionStrategy
  HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = (  
      (ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>>)  
          ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),  
              new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))  
      .performClustering(clusteringPlan, schema, instantTime);  
  // 獲取寫入狀態(tài)
  HoodieData<WriteStatus> writeStatusList = writeMetadata.getWriteStatuses();  
  // 更新表索引驯鳖,更新數據所在位置
  HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);  
  // 持久化保存
  statuses.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE), context, HoodieData.HoodieDataCacheKey.of(config.getBasePath(), instantTime));  
  // triggers clustering.  
  // 更新writeMetadata中的writestats
  writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());  
// 獲取clustering操作的數據文件file id和partition path
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata)); 
// 提交修改的writeMetadata,clustering對后續(xù)操作生效
  commitOnAutoCommit(writeMetadata);  
  if (!writeMetadata.getCommitMetadata().isPresent()) {  
    HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),  
        extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());  
    writeMetadata.setCommitMetadata(Option.of(commitMetadata));  
  }  
  return writeMetadata;  
}

Clustering的執(zhí)行細節(jié)位于策略類中久免。我們這里分析默認的策略類SparkSortAndSizeExecutionStrategy::performClustering方法浅辙。該方法位于父類MultipleSparkJobExecutionStrategy::performClustering中。該方法使用線程池阎姥,一個線程處理一個input group(對應執(zhí)行計劃中提到的clustering group)记舆,但線程數不能超過配置的最大值。

@Override  
public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {  
  JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());  
  // 是否保留元數據呼巴,默認為true
  boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(true);  
  // 使用專門的線程執(zhí)行clustering泽腮。創(chuàng)建clustering線程池
  // 取InputGroups數量(plan中clustering生成file group的數量)
  // 最大值為hoodie.clustering.max.parallelism,最大值默認15
  ExecutorService clusteringExecutorService = Executors.newFixedThreadPool(  
      Math.min(clusteringPlan.getInputGroups().size(), writeConfig.getClusteringMaxParallelism()),  
      new CustomizedThreadFactory("clustering-job-group", true));  
  try {  
    // execute clustering for each group async and collect WriteStatus  
    // 在線程池中執(zhí)行clustering衣赶,獲取執(zhí)行結果
    Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(  
            clusteringPlan.getInputGroups().stream()  
                .map(inputGroup -> {  
 // hoodie.datasource.write.row.writer.enable如果為true诊赊,使用Spark原生的Row類型,避免類型轉換引發(fā)的額外代價
                  if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", true)) {  
                    return runClusteringForGroupAsyncAsRow(inputGroup,  
                        clusteringPlan.getStrategy().getStrategyParams(),  
                        shouldPreserveMetadata,  
                        instantTime,  
                        clusteringExecutorService);  
                  }  
                  return runClusteringForGroupAsync(inputGroup,  
                      clusteringPlan.getStrategy().getStrategyParams(),  
                      shouldPreserveMetadata,  
                      instantTime,  
                      clusteringExecutorService);  
                })  
                .collect(Collectors.toList()))  
        .join()  
        .stream();  
    JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));  
    JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);  
  
    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();  
    writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));  
    return writeMetadata;  
  } finally {  
    clusteringExecutorService.shutdown();  
  }  
}

我們繼續(xù)分析默認配置的執(zhí)行路線ClusteringPlanActionExecutor::runClusteringForGroupAsyncAsRow府瞄。該方法獲取到所有需要clustering的數據到Spark的dataset碧磅,讀取表schema和各個file id從屬的partition path的對應關系。然后執(zhí)行clustering摘能。

private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(HoodieClusteringGroup clusteringGroup,  
                                                                                   Map<String, String> strategyParams,  
                                                                                   boolean shouldPreserveHoodieMetadata,  
                                                                                   String instantTime,  
                                                                                   ExecutorService clusteringExecutorService) {  
  return CompletableFuture.supplyAsync(() -> {  
    // 獲取spark context
    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());  
    // 轉換所有clustering涉及到的數據為Spark DataSet
    Dataset<Row> inputRecords = readRecordsForGroupAsRow(jsc, clusteringGroup, instantTime);  
    // 獲取帶有元數據字段的schema
    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));  
    // 轉換clustering的file slice為HoodieFileGroupId
    // 保存的是partition path和file id的對應關系
    List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()  
        .map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))  
        .collect(Collectors.toList());  
    // 執(zhí)行clustering
    return performClusteringWithRecordsAsRow(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, shouldPreserveHoodieMetadata,  
        clusteringGroup.getExtraMetadata());  
  }, clusteringExecutorService);  
}

SparkSortAndSizeExecutionStrategy::performClusteringWithRecordsAsRow方法獲取分區(qū)器续崖,將數據重新排序,最后使用批量插入的方式团搞,寫回parquet文件严望。

@Override  
public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> inputRecords,  
                                                                 int numOutputGroups,  
                                                                 String instantTime, Map<String, String> strategyParams,  
                                                                 Schema schema,  
                                                                 List<HoodieFileGroupId> fileGroupIdList,  
                                                                 boolean shouldPreserveHoodieMetadata,  
                                                                 Map<String, String> extraMetadata) {  
  LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);  
  // 生成寫入配置,clustering輸出多少個file group就配置多少個bulk insert并行度
  HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()  
      .withBulkInsertParallelism(numOutputGroups)  
      .withProps(getWriteConfig().getProps()).build();  
  // 配置最大parquet文件大小為clustering目標文件最大上限
  // 對應配置項為hoodie.clustering.plan.strategy.target.file.max.bytes
  newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));  

  // 獲取分區(qū)器
  BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema);  
  // 使用分區(qū)器分區(qū)數據(數據重新排序)
  Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);  
  // 將重排序之后的數據批量插入
  return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,  
      partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata);  
}

接下來分析的重點是clustering的另一個功能:將數據重排序逻恐。因此重點是分區(qū)器和分區(qū)器重排序的邏輯像吻。獲取分區(qū)器的邏輯位于它的父類MultipleSparkJobExecutionStrategy::getRowPartitioner中。代碼如下:

private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategyParams,  
                                                    Schema schema,  
                                                    boolean isRowPartitioner) { 
  // 獲取排序字段配置項
  // 對應的配置項為hoodie.clustering.plan.strategy.sort.columns 
  // 使用逗號分隔
  Option<String[]> orderByColumnsOpt =  
      Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))  
          .map(listStr -> listStr.split(","));  
  
  return orderByColumnsOpt.map(orderByColumns -> {  
    // 獲取hoodie.layout.optimize.strategy配置复隆,字段可使用zorder或者hilbert曲線排序或者linear線性排序
    HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = getWriteConfig().getLayoutOptimizationStrategy();  
    switch (layoutOptStrategy) {  
      case ZORDER:  
      case HILBERT:  
        return isRowPartitioner  
            ? new RowSpatialCurveSortPartitioner(getWriteConfig())  
            : new RDDSpatialCurveSortPartitioner((HoodieSparkEngineContext) getEngineContext(), orderByColumns, layoutOptStrategy,  
            getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema), recordType);  
      case LINEAR:  
        return isRowPartitioner  
            ? new RowCustomColumnsSortPartitioner(orderByColumns, getWriteConfig())  
            : new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema), getWriteConfig());  
      default:  
        throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));  
    }  
  }).orElseGet(() -> isRowPartitioner  
      ? BulkInsertInternalPartitionerWithRowsFactory.get(getWriteConfig(), getHoodieTable().isPartitioned(), true)  
      : BulkInsertInternalPartitionerFactory.get(getHoodieTable(), getWriteConfig(), true));  
}

對于使用Spark原生Row類型的情況拨匆,isRowPartitionertrue。如果使用ZORDER或者HILBERT排序策略挽拂,使用RowSpatialCurveSortPartitioner惭每,LINEAR排序策略對應的是RowCustomColumnsSortPartitioner
接下來我們分別分析這兩個partitioner是如何對數據重排序的。
首先是RowSpatialCurveSortPartitioner::repartitionRecords台腥,代碼如下:

@Override  
public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputPartitions) {  
  return reorder(records, outputPartitions);  
}

repartitionRecords調用了reorder方法宏赘。

protected Dataset<Row> reorder(Dataset<Row> dataset, int numOutputGroups) { 
  // 檢查排序字段配置
  if (orderByColumns.length == 0) {  
    // No-op  
    return dataset;  
  }  
  
  List<String> orderedCols = Arrays.asList(orderByColumns);  
  // curveCompositionStrategyType默認為DIRECT
  switch (curveCompositionStrategyType) {  
    case DIRECT:  
      return SpaceCurveSortingHelper.orderDataFrameByMappingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);  
    case SAMPLE:  
      return SpaceCurveSortingHelper.orderDataFrameBySamplingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);  
    default:  
      throw new UnsupportedOperationException(String.format("Unsupported space-curve curve building strategy (%s)", curveCompositionStrategyType));  
  }  
}

SpatialCurveCompositionStrategyType中SAMPLE的數據排序分布效果較DIRECT更好,但是執(zhí)行速度更慢黎侈。默認配置的是DIRECT類型察署。
接下來分析DIRECT類型處理方式,對應的是SpaceCurveSortingHelper::orderDataFrameByMappingValues峻汉。該方法首先判斷排序字段配置的合法性贴汪。然后將數據按照排序字段,使用Z曲線或者是Hilbert曲線重排序休吠。

public static Dataset<Row> orderDataFrameByMappingValues(  
    Dataset<Row> df,  
    HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,  
    List<String> orderByCols,  
    int targetPartitionCount  
) {  
  // 獲取字段名稱和StructField的對應關系
  Map<String, StructField> columnsMap =  
      Arrays.stream(df.schema().fields())  
          .collect(Collectors.toMap(StructField::name, Function.identity()));  
  // 檢查排序字段是否出現(xiàn)在schema中
  List<String> checkCols =  
      orderByCols.stream()  
          .filter(columnsMap::containsKey)  
          .collect(Collectors.toList());  
  // 如果沒有扳埂,說明排序字段配置有誤,跳過不再繼續(xù)執(zhí)行
  if (orderByCols.size() != checkCols.size()) {  
    LOG.error(String.format("Trying to ordering over a column(s) not present in the schema (%s); skipping", CollectionUtils.diff(orderByCols, checkCols)));  
    return df;  
  }  
  
  // In case when there's just one column to be ordered by, we can skip space-curve  
  // ordering altogether (since it will match linear ordering anyway) 
  // 如果排序字段只有一個蛛碌,沒必要使用空間曲線方式排序聂喇,直接使用Spark排序
  if (orderByCols.size() == 1) {  
    String orderByColName = orderByCols.get(0);  
    LOG.debug(String.format("Single column to order by (%s), skipping space-curve ordering", orderByColName));  
  
    // TODO validate if we need Spark to re-partition  
    return df.repartitionByRange(targetPartitionCount, new Column(orderByColName));  
  }  
  // 字段個數
  int fieldNum = df.schema().fields().length;  

  // 返回排序字段對應的index和字段信息對應關系
  Map<Integer, StructField> fieldMap =  
      orderByCols.stream()  
          .collect(  
              Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), columnsMap::get));  
  
  JavaRDD<Row> sortedRDD;  
  // 根據布局優(yōu)化策略,排序RDD
  switch (layoutOptStrategy) {  
    case ZORDER:  
      sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, targetPartitionCount);  
      break;  
    case HILBERT:  
      sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, targetPartitionCount);  
      break;  
    default:  
      throw new UnsupportedOperationException(String.format("Not supported layout-optimization strategy (%s)", layoutOptStrategy));  
  }  
  
  // Compose new {@code StructType} for ordered RDDs  
  // 為排序后的RDD創(chuàng)建StructType(schema)
  StructType newStructType = composeOrderedRDDStructType(df.schema());  

  // 返回dataset
  return df.sparkSession()  
      .createDataFrame(sortedRDD, newStructType)  
      .drop("Index");  
}

我們先看第一種情況蔚携,Z曲線排序希太。方法位于SpaceCurveSortingHelper::createZCurveSortedRDD
該方法將多個排序字段的值映射為8字節(jié)內容(多的截取少的補充)酝蜒,然后每個字段的字節(jié)內容各取一位拼接到一起誊辉,然后再各取第二位拼接……一直循環(huán),這個步驟稱之為二進制數據交織(interleaving)亡脑。將交織之后的值作為一個字段堕澄,拼接在數據中。然后按照該字段的內容排序霉咨。

private static JavaRDD<Row> createZCurveSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {  
  return originRDD.map(row -> {  
    // 將數據中每個排序字段的值填充為8字節(jié)內容
    // 多的截取少的補充
    byte[][] zBytes = fieldMap.entrySet().stream()  
      .map(entry -> {  
        int index = entry.getKey();  
        StructField field = entry.getValue();  
        return mapColumnValueTo8Bytes(row, index, field.dataType());  
      })  
      .toArray(byte[][]::new);  

    // Interleave received bytes to produce Z-curve ordinal  
    // 將這些排序字段的值交織起來
    // 比如有A蛙紫,B兩個排序字段。A字段值取1位途戒,然后取B字段值1位坑傅,然后A再取下一位,B取下一位喷斋,以此類推
    byte[] zOrdinalBytes = BinaryUtil.interleaving(zBytes, 8);  
    // 追加zOrdinalBytes到Row
    return appendToRow(row, zOrdinalBytes);  
  })  
    // 按照該字段的值(zOrdinalBytes唁毒,位于row的末尾,index正好是fieldNum)排序
    .sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);  
}

第二種情況為Hilbert曲線星爪,對應方法為SpaceCurveSortingHelper::createHilbertSortedRDD浆西。和ZOrder曲線排序處理邏輯基本相同,只是將Z曲線替換成了Hilbert曲線顽腾。

private static JavaRDD<Row> createHilbertSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {  
  // NOTE: Here {@code mapPartitions} is used to make sure Hilbert curve instance is initialized  
  //       only once per partition  
  return originRDD.mapPartitions(rows -> {  
    // 創(chuàng)建hilbert fieldMap個數維度曲線
    HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size());  
    return new Iterator<Row>() {  
  
      @Override  
      public boolean hasNext() {  
        return rows.hasNext();  
      }  
  
      @Override  
      public Row next() {  
        Row row = rows.next();  
        // 將row中的排序字段值映射為long類型
        long[] longs = fieldMap.entrySet().stream()  
            .mapToLong(entry -> {  
              int index = entry.getKey();  
              StructField field = entry.getValue();  
              return mapColumnValueToLong(row, index, field.dataType());  
            })  
            .toArray();  
  
        // Map N-dimensional coordinates into position on the Hilbert curve  
        // 使用hilbert曲線索引上面的long值近零,結果作為后面的排序依據
        byte[] hilbertCurvePosBytes = HilbertCurveUtils.indexBytes(hilbertCurve, longs, 63);  
        return appendToRow(row, hilbertCurvePosBytes);  
      }  
    };  
  })  
      .sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);  
}

和空間曲線的方式相比,LINEAR線性排序顯得較為簡單。代碼位于RowCustomColumnsSortPartitioner::repartitionRecords
久信。通過spark的sort算子按照配置的column排序猪瞬。

@Override  
public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {  
  return records  
      .sort(Arrays.stream(sortColumnNames).map(Column::new).toArray(Column[]::new))  
      .coalesce(outputSparkPartitions);  
}

到這里為止分區(qū)器的邏輯分析完畢。

參考文獻

Clustering | Apache Hudi

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末入篮,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子幌甘,更是在濱河造成了極大的恐慌潮售,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,376評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件锅风,死亡現(xiàn)場離奇詭異酥诽,居然都是意外死亡,警方通過查閱死者的電腦和手機皱埠,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,126評論 2 385
  • 文/潘曉璐 我一進店門肮帐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人边器,你說我怎么就攤上這事训枢。” “怎么了忘巧?”我有些...
    開封第一講書人閱讀 156,966評論 0 347
  • 文/不壞的土叔 我叫張陵恒界,是天一觀的道長。 經常有香客問我砚嘴,道長十酣,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,432評論 1 283
  • 正文 為了忘掉前任际长,我火速辦了婚禮耸采,結果婚禮上,老公的妹妹穿的比我還像新娘工育。我一直安慰自己,他們只是感情好翅娶,可當我...
    茶點故事閱讀 65,519評論 6 385
  • 文/花漫 我一把揭開白布文留。 她就那樣靜靜地躺著,像睡著了一般竭沫。 火紅的嫁衣襯著肌膚如雪燥翅。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,792評論 1 290
  • 那天蜕提,我揣著相機與錄音森书,去河邊找鬼。 笑死,一個胖子當著我的面吹牛凛膏,可吹牛的內容都是我干的杨名。 我是一名探鬼主播,決...
    沈念sama閱讀 38,933評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼猖毫,長吁一口氣:“原來是場噩夢啊……” “哼台谍!你這毒婦竟也來了?” 一聲冷哼從身側響起吁断,我...
    開封第一講書人閱讀 37,701評論 0 266
  • 序言:老撾萬榮一對情侶失蹤趁蕊,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后仔役,有當地人在樹林里發(fā)現(xiàn)了一具尸體掷伙,經...
    沈念sama閱讀 44,143評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,488評論 2 327
  • 正文 我和宋清朗相戀三年又兵,在試婚紗的時候發(fā)現(xiàn)自己被綠了任柜。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,626評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡沛厨,死狀恐怖宙地,靈堂內的尸體忽然破棺而出彤守,到底是詐尸還是另有隱情迁沫,我是刑警寧澤慨丐,帶...
    沈念sama閱讀 34,292評論 4 329
  • 正文 年R本政府宣布叮趴,位于F島的核電站答朋,受9級特大地震影響希停,放射性物質發(fā)生泄漏下隧。R本人自食惡果不足惜咨演,卻給世界環(huán)境...
    茶點故事閱讀 39,896評論 3 313
  • 文/蒙蒙 一辰企、第九天 我趴在偏房一處隱蔽的房頂上張望风纠。 院中可真熱鬧,春花似錦牢贸、人聲如沸竹观。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽臭增。三九已至,卻和暖如春竹习,著一層夾襖步出監(jiān)牢的瞬間誊抛,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工整陌, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留拗窃,地道東北人瞎领。 一個月前我還...
    沈念sama閱讀 46,324評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像随夸,于是被迫代替她去往敵國和親九默。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,494評論 2 348

推薦閱讀更多精彩內容