什么是Clustering
開門見山蒿叠,Clustering主要有兩個作用:數據小文件合并和重排序明垢。
當數據寫入Hudi表時,為了提高寫入效率和存儲利用率市咽,可能會產生大量小文件痊银。Hudi的Clustering機制允許在后臺周期性地將這些小文件合并成大文件,從而減少存儲碎片和元數據管理開銷施绎,提高查詢性能溯革。
Clustering過程可以重新組織和排序數據,依據用戶指定的列進行排序谷醉,這樣能提升相關查詢的性能致稀,比如范圍掃描或者JOIN操作,通過預排序的數據俱尼,查詢引擎能夠更高效地處理查詢請求抖单。
本篇首先介紹clustering的配置和操作,然后分析clustering的源代碼,包含clustering執(zhí)行計劃的創(chuàng)建和根據計劃執(zhí)行clustering過程兩個部分矛绘。
Clustering分區(qū)過濾策略
Clustering分區(qū)過濾策略按照hoodie.clustering.plan.partition.filter.mode
配置項過濾出所需的partition耍休。有如下選項:
- NONE: 不過濾,返回所有partition path蔑歌。
- RECENT_DAYS: 按照partition path倒序排序羹应。跳過
hoodie.clustering.plan.strategy.daybased.skipfromlatest.partitions
個partition,返回hoodie.clustering.plan.strategy.daybased.lookback.partitions
個partition次屠。如果partition path是日期园匹,可以實現(xiàn)過濾出最近N天的數據。 - SELECTED_PARTITIONS: 獲取
hoodie.clustering.plan.strategy.cluster.begin.partition
和hoodie.clustering.plan.strategy.cluster.end.partition
之間的分區(qū)劫灶。 - DAY_ROLLING: 每次clustering一部分分區(qū)裸违。如果分區(qū)的index對24取余等于排期時候當前時間的小時數,則該分區(qū)需要clustering本昏。
配置項
Flink 配置項
- clustering.schedule.enabled:是否排期clustering供汛。默認值為false。
- clustering.async.enabled:是否異步執(zhí)行clustering涌穆。默認值為false怔昨。
- clustering.delta_commits:每隔多少次commit之后觸發(fā)clustering。默認為4宿稀。
- clustering.tasks:clustering并行度趁舀。默認和寫入并行度相同。
- clustering.plan.strategy.daybased.lookback.partitions:對應RECENT_DAYS策略祝沸,保留多少個分區(qū)參與clustering矮烹。默認值為2。
- clustering.plan.strategy.daybased.skipfromlatest.partitions:對應RECENT_DAYS策略罩锐,跳過最近多少個分區(qū)奉狈,之后的分區(qū)參與clustering。默認值為0涩惑。
- clustering.plan.strategy.cluster.begin.partition:對應SELECTED_PARTITIONS策略仁期,指定參與clustering的開始分區(qū)。無默認值境氢。
- clustering.plan.strategy.cluster.end.partition:對應SELECTED_PARTITIONS策略蟀拷,指定參與clustering的結束分區(qū)。無默認值萍聊。
- clustering.plan.strategy.partition.regex.pattern:被該正則匹配的分區(qū)會參與clustering问芬。無默認值。
- clustering.plan.strategy.partition.selected:指定要參與clustering的分區(qū)寿桨。無默認值此衅。
- clustering.plan.strategy.class:clustering策略類强戴。默認值為FlinkSizeBasedClusteringPlanStrategy。選擇最近N天的分區(qū)挡鞍,選取較小的file slice參與clustering骑歹。
- clustering.plan.partition.filter.mode:分區(qū)過濾策略酌住。默認值為NONE残炮。
- clustering.plan.strategy.target.file.max.bytes:每個clustering group(可理解為并行度)clustering完畢之后生成的文件大小上限。默認為1GB回窘。
- clustering.plan.strategy.small.file.limit:小于該大小的文件會認為是clustering的參與對象翘县。默認值為600MB最域。
- clustering.plan.strategy.sort.columns:clustering排序字段。多個字段使用逗號分隔锈麸。無默認值镀脂。
- clustering.plan.strategy.max.num.groups:clustering plan階段創(chuàng)建出的clustering group數量,對應并行度忘伞。默認為30薄翅。
Spark 配置項
- hoodie.clustering.plan.strategy.daybased.lookback.partitions:對應RECENT_DAYS策略,保留多少個分區(qū)參與clustering氓奈。默認值為2翘魄。
- hoodie.clustering.plan.strategy.daybased.skipfromlatest.partitions:對應RECENT_DAYS策略,跳過最近多少個分區(qū)舀奶,之后的分區(qū)參與clustering熟丸。默認值為0。
- hoodie.clustering.plan.strategy.cluster.begin.partition:對應SELECTED_PARTITIONS策略伪节,指定參與clustering的開始分區(qū)。無默認值绩鸣。
- hoodie.clustering.plan.strategy.cluster.end.partition:對應SELECTED_PARTITIONS策略怀大,指定參與clustering的結束分區(qū)。無默認值呀闻。
- hoodie.clustering.plan.strategy.small.file.limit:小于該大小的文件會認為是clustering的參與對象化借。默認值為300MB。
- hoodie.clustering.plan.partition.regex.pattern:被該正則匹配的分區(qū)會參與clustering捡多。無默認值蓖康。
- hoodie.clustering.plan.partition.selected:指定要參與clustering的分區(qū)。無默認值垒手。
- hoodie.clustering.plan.strategy.class:clustering plan策略蒜焊。默認為org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy。查找小文件(hoodie.clustering.plan.strategy.small.file.limit)科贬,這些小文件參與clustering泳梆。
- hoodie.clustering.execution.strategy.class:clustering執(zhí)行策略鳖悠。默認為org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy。按照指定的列排序优妙,并滿足配置的目標文件大小乘综。
- hoodie.clustering.inline:是否啟用inline clustering。默認為false套硼。
- hoodie.clustering.inline.max.commits:最多多少次commit觸發(fā)inline clustering卡辰,控制clustering的頻率。默認為4邪意。
- hoodie.clustering.async.max.commits:控制async clustering的頻率九妈。默認為4。
- hoodie.clustering.max.parallelism:clustering的最大并行度抄罕。默認為15允蚣。
- hoodie.clustering.group.read.parallelism:Spark從clustering group讀取數據的并行度。默認值為20呆贿。
- hoodie.clustering.plan.partition.filter.mode:分區(qū)過濾策略嚷兔。默認為NONE。
- hoodie.clustering.plan.strategy.max.bytes.per.group:每個clustering group最多產生的數據量做入。默認為2GB冒晰。
- hoodie.clustering.plan.strategy.max.num.groups:最大clustering group數量。每次clustering的最大操作數據量= hoodie.clustering.plan.strategy.max.bytes.per.group * hoodie.clustering.plan.strategy.max.num.groups竟块。
- hoodie.clustering.plan.strategy.target.file.max.bytes:每個clustering group生成hoodie.clustering.plan.strategy.max.bytes.per.group / hoodie.clustering.plan.strategy.target.file.max.bytes個file group壶运。
- hoodie.clustering.plan.strategy.single.group.clustering.enabled:是否能夠生成只有一個file group參與的clustering執(zhí)行計劃。默認為true浪秘。
- hoodie.clustering.plan.strategy.sort.columns:clustering排序字段蒋情。多個字段使用逗號分隔。無默認值耸携。
- hoodie.clustering.updates.strategy:update策略棵癣。默認為org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy。clustering的時候拒絕更新夺衍。配置org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy可允許更新狈谊。
- hoodie.clustering.schedule.inline:是否啟用inline clustering排期。默認為false沟沙。
- hoodie.clustering.async.enabled:是否啟用async clustering河劝。默認為false。
- hoodie.layout.optimize.strategy:布局策略矛紫。使用LINEAR(線性)赎瞎,ZORDER還是HILBERT(希爾伯特曲線)。默認值是LINEAR含衔。
- hoodie.layout.optimize.curve.build.method:可配置DIRECT或者SAMPLE煎娇。
SpatialCurveCompositionStrategyType
中SAMPLE的數據排序分布效果較DIRECT更好二庵,但是執(zhí)行速度更慢。默認配置的是DIRECT類型缓呛。 - hoodie.layout.optimizebuild.curve.sample.size:對應SAMPLE類型催享,默認值為200000。
- hoodie.layout.optimize.data.skipping.enable:是否在布局優(yōu)化完成后收集統(tǒng)計信息來啟用數據跳過功能哟绊。默認為true因妙。
- hoodie.clustering.rollback.pending.replacecommit.on.conflict:默認值為false。如果允許對等待clustering的file group進行更新票髓,則應將此配置設置為回滾失敗或pending的clustering instants攀涵。 僅當插入或更新與pending clustering的file group存在沖突時,pending clustering才會被回滾洽沟。 在設置此配置時請務必謹慎以故,特別是在非常頻繁地執(zhí)行clustering操作時。這在極少數情況下可能導致競態(tài)條件裆操, 例如怒详,在獲取到實例后但回滾完成前clustering操作已完成。
離線觸發(fā)
使用Spark
任務提交命令如下:
spark-submit \
--class org.apache.hudi.utilities.HoodieClusteringJob \
/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.15.0.jar \
--props /path/to/config/clusteringjob.properties \
--mode scheduleAndExecute \
--base-path /path/to/hudi_table/basePath \
--table-name hudi_table_schedule_clustering \
--spark-memory 1g
由于clustering的配置項較多踪区,可以把這些配置項寫在/path/to/config/clusteringjob.properties
文件中昆烁。例如:
hoodie.clustering.async.enabled=true
hoodie.clustering.async.max.commits=4
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
hoodie.clustering.plan.strategy.small.file.limit=629145600
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
hoodie.clustering.plan.strategy.sort.columns=column1,column2
HoodieClusteringJob的參數如下:
參數名 | 是否必須 | 默認值 | 備注 |
---|---|---|---|
--base-path | 是 | - | Hudi表根目錄 |
--table-name | 是 | - | 表名 |
--instant-time | 否 | - | 只在execute模式有效。指定執(zhí)行哪個instant time的clustering缎岗。如果沒有指定静尼。執(zhí)行最早排期的clustering。使用scheduleAndExecute默認該配置項會被忽略传泊。 |
--parallelism | 否 | 1 | clustering并行度 |
--spark-master | 否 | - | Spark master |
--spark-memory | 否 | - | Spark內存 |
--retry | 否 | 0 | 重試次數 |
--skip-clean | 否 | true | clustering完畢之后是否跳過clean |
--retry-last-failed-clustering-job | 否 | false | 使用scheduleAndExecute有效鼠渺。是否重試最近失敗的clustering job |
--mode | 否 | - | schedule表示排期。execute表示執(zhí)行眷细。scheduleAndExecute表示排期并執(zhí)行 |
--help | 否 | false | 打印幫助信息 |
--job-max-processing-time-ms | 否 | 0 | 只有--retry-last-failed-clustering-job和scheduleAndExecute是否有效系冗。如果超過配置時間clustering job仍未完成。Hudi認為該job失敗并重新啟動 |
--props | 否 | - | clustering配置參數所在文件薪鹦。使用properties文件格式 |
--hoodie-conf | 否 | - | 額外的Hudi配置 |
使用Flink
Flink的HoodieFlinkClusteringJob
不僅有clustering,還包含了archive和clean操作惯豆。
任務提交命令如下:
./bin/flink run \
-c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \
lib/hudi-flink1.17-bundle-0.15.0.jar \
--path hdfs://xxx:8020/table
參數解析:
參數名 | 是否必須 | 默認值 | 備注 |
---|---|---|---|
--path | 是 | - | Hudi表的根目錄 |
--clustering-delta-commits | 否 | 1 | 最多多少次commit觸發(fā)clustering池磁,控制clustering的頻率 |
--clustering-tasks | false | -1 | Clustering task 的并發(fā)數 |
--clean-policy | false | KEEP_LATEST_COMMITS | clean策略】蓿可以使用KEEP_LATEST_COMMITS地熄, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS |
--clean-retain-commits | 否 | 10 | 保留最近n個commit不被清理 |
--clean-retain-hours | 否 | 24 | 保留最近n小時的commit不被清理 |
--clean-retain-file-versions | 否 | 5 | 保留最近n個文件版本不被清理 |
--archive-min-commits | 否 | 20 | 歸檔commit前保留的最少commit數量 |
--archive-max-commits | 否 | 30 | 歸檔commit前保留的最多commit數量 |
--schedule | 否 | false | 是否排期clustering plan |
--instant-time | 否 | - | clustering instant time |
--clean-async-enabled | 否 | false | 是否啟用異步clean |
--plan-strategy-class | 否 | FlinkSizeBasedClusteringPlanStrategy | clustering策略類 |
--plan-partition-filter-mode | 否 | NONE | 分區(qū)過濾模式 |
--seq | 否 | FIFO | Clustering plan的執(zhí)行順序芯杀。 LIFO: 從最近的plan開始執(zhí)行端考, FIFO: 從最早的plan開始執(zhí)行 |
--target-file-max-bytes | 否 | 1GB | 最大目標文件 |
--small-file-limit | 否 | 600 | 小于該大小的文件會參與clustering |
--skip-from-latest-partitions | 否 | 0 | clustering跳過最近n個分區(qū) |
--sort-columns | 否 | - | clustering排序字段雅潭。多個字段使用逗號分隔 |
--sort-memory | 否 | 128 | 排序內存大小 |
--max-num-groups | 否 | 30 | Clustering group個數 |
--target-partitions | 否 | 2 | 參與clustering的分區(qū)數 |
--cluster-begin-partition | 否 | - | Clustering開始分區(qū) |
--cluster-end-partition | 否 | - | Clustering結束分區(qū) |
--partition-regex-pattern | 否 | - | 匹配該正則的partition參與clustering |
--partition-selected | 否 | - | 指定參與clustering的分區(qū) |
--service | 否 | false | 是否開啟 service 模式,service模式為常駐作業(yè) |
--min-clustering-interval-seconds | 否 | 600s | 異步clustering服務的最小時間間隔 |
--hoodie-conf | 否 | - | 額外的Hudi配置 |
--props | 否 | - | clustering等參數配置所在文件路徑 |
創(chuàng)建clustering執(zhí)行計劃
創(chuàng)建執(zhí)行計劃位于ClusteringPlanActionExecutor
類的execute
方法却特,代碼如下所示:
@Override
public Option<HoodieClusteringPlan> execute() {
// 創(chuàng)建執(zhí)行計劃
Option<HoodieClusteringPlan> planOption = createClusteringPlan();
// 如果計劃創(chuàng)建成功(可能存在沒有file slice需要cluster的情況)
if (planOption.isPresent()) {
// 創(chuàng)建clustering instant
// clustering instant的類型是replace commit扶供,意味這clustering之后的數據文件替換掉先前的
HoodieInstant clusteringInstant =
new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime);
try {
HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
.setOperationType(WriteOperationType.CLUSTER.name())
.setExtraMetadata(extraMetadata.orElse(Collections.emptyMap()))
.setClusteringPlan(planOption.get())
.build();
// 添加到pending commit中
table.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant,
TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
} catch (IOException ioe) {
throw new HoodieIOException("Exception scheduling clustering", ioe);
}
}
return planOption;
}
}
該方法創(chuàng)建clustering執(zhí)行計劃,然后再創(chuàng)建一個pending replace commit裂明。因為clutering完成之后椿浓,新生成的數據文件會替換掉原有的數據文件,因此對應的commit類型為replace闽晦。
繼續(xù)分析createClusteringPlan
方法扳碍。其中首先判斷是否滿足可執(zhí)行clustering的條件,然后獲取配置的clustering策略類仙蛉,創(chuàng)建clustering計劃笋敞。
Clustering并不是說每次schedule都必須要執(zhí)行。為了效率clustering要求至少要經過N次commit之后荠瘪,才會schedule夯巷。此限制通過配置項hoodie.clustering.inline.max.commits
或hoodie.clustering.async.max.commits
(分別對應inline和異步)來控制。如果滿足clustering條件巧还,通過hoodie.clustering.plan.strategy.class
配置的策略類生成執(zhí)行計劃鞭莽。
代碼如下所示:
protected Option<HoodieClusteringPlan> createClusteringPlan() {
LOG.info("Checking if clustering needs to be run on " + config.getBasePath());
// 獲取上一次clustering對應的instant
Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline().getLastClusterCommit();
// 獲取上次clustering之后提交的次數
int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
.findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE)
.countInstants();
// 讀取hoodie.clustering.inline.max.commits配置,默認為4
// 該配置項表示在上次clustering之后至少需要經歷幾次commit才能schedule下一次clustering
// 這里處理inline clustering的配置
if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
LOG.warn("Not scheduling inline clustering as only " + commitsSinceLastClustering
+ " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
+ config.getInlineClusterMaxCommits());
return Option.empty();
}
// 同上麸祷,但這里處理異步clustering的配置
// 配置項為hoodie.clustering.async.max.commits澎怒,默認值4
if ((config.isAsyncClusteringEnabled() || config.scheduleInlineClustering()) && config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) {
LOG.warn("Not scheduling async clustering as only " + commitsSinceLastClustering
+ " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
+ config.getAsyncClusterMaxCommits());
return Option.empty();
}
LOG.info("Generating clustering plan for table " + config.getBasePath());
// 加載clustering策略類,對應配置項hoodie.clustering.plan.strategy.class
// 默認為SparkSizeBasedClusteringPlanStrategy
ClusteringPlanStrategy strategy = (ClusteringPlanStrategy) ReflectionUtils.loadClass(
ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config),
new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config);
// 生成clustering計劃
return strategy.generateClusteringPlan();
}
接著我們聚焦默認的策略SparkSizeBasedClusteringPlanStrategy
阶牍。該策略根據文件大小來決定文件數據是否參與clustering喷面。分析clustering計劃生成步驟。
generateClusteringPlan
方法位于SparkSizeBasedClusteringPlanStrategy
的父類PartitionAwareClusteringPlanStrategy
中走孽。該方法根據hoodie.clustering.plan.strategy.partition.selected
惧辈,hoodie.clustering.plan.strategy.partition.regex.pattern
和hoodie.clustering.plan.partition.filter.mode
條件過濾出符合要求的partition path。獲取它們包含的file slice磕瓷。從這些file slice中篩選出小文件(小于hoodie.clustering.plan.strategy.small.file.limit
的文件)盒齿。將這些按照clutering要求的group大小(hoodie.clustering.plan.strategy.max.bytes.per.group
),分成若干個group困食。Group數量上限為hoodie.clustering.plan.strategy.max.num.groups
边翁。此步驟對應小文件合并功能。
代碼如下所示:
@Override
public Option<HoodieClusteringPlan> generateClusteringPlan() {
if (!checkPrecondition()) {
return Option.empty();
}
// 獲取metaclient硕盹,用來操作metadata
HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
LOG.info("Scheduling clustering for " + metaClient.getBasePath());
// 獲取寫配置
HoodieWriteConfig config = getWriteConfig();
// 讀取配置項hoodie.clustering.plan.strategy.partition.selected
// 確定在哪些分區(qū)運行clustering
String partitionSelected = config.getClusteringPartitionSelected();
LOG.info("Scheduling clustering partitionSelected: " + partitionSelected);
List<String> partitionPaths;
// 如果沒有配置
if (StringUtils.isNullOrEmpty(partitionSelected)) {
// get matched partitions if set
// 讀取hoodie.clustering.plan.strategy.partition.regex.pattern配置
// 獲取正則表達式匹配的partition path
partitionPaths = getRegexPatternMatchedPartitions(config, FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath()));
// filter the partition paths if needed to reduce list status
} else {
// 如果配置了partitionSelected符匾,優(yōu)先這個配置
partitionPaths = Arrays.asList(partitionSelected.split(","));
}
// 過濾需要clustering的分區(qū)
// 過濾策略對應配置項hoodie.clustering.plan.partition.filter.mode
// 可用策略為NONE,RECENT_DAYS瘩例,SELECTED_PARTITIONS和DAY_ROLLING
partitionPaths = filterPartitionPaths(partitionPaths);
LOG.info("Scheduling clustering partitionPaths: " + partitionPaths);
// 如果所有的分區(qū)都被排除了啊胶,返回空
if (partitionPaths.isEmpty()) {
// In case no partitions could be picked, return no clustering plan
return Option.empty();
}
// 排除掉分區(qū)中已經要做clustering的file group(pending狀態(tài))
// 篩選出小文件
// 決定小文件判斷閾值的配置項為hoodie.clustering.plan.strategy.small.file.limit
// 將其映射為HoodieClusteringGroup
// 映射邏輯后面分析
List<HoodieClusteringGroup> clusteringGroups = getEngineContext()
.flatMap(
partitionPaths,
partitionPath -> {
List<FileSlice> fileSlicesEligible = getFileSlicesEligibleForClustering(partitionPath).collect(Collectors.toList());
return buildClusteringGroupsForPartition(partitionPath, fileSlicesEligible).limit(getWriteConfig().getClusteringMaxNumGroups());
},
partitionPaths.size())
.stream()
.limit(getWriteConfig().getClusteringMaxNumGroups())
.collect(Collectors.toList());
if (clusteringGroups.isEmpty()) {
LOG.warn("No data available to cluster");
return Option.empty();
}
// 構造cluster策略
HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder()
.setStrategyClassName(getWriteConfig().getClusteringExecutionStrategyClass())
.setStrategyParams(getStrategyParams())
.build();
// 構造clustering計劃
return Option.of(HoodieClusteringPlan.newBuilder()
.setStrategy(strategy)
.setInputGroups(clusteringGroups)
.setExtraMetadata(getExtraMetadata())
.setVersion(getPlanVersion())
.setPreserveHoodieMetadata(true)
.build());
}
上面的filterPartitionPaths
通過配置的hoodie.clustering.plan.partition.filter.mode
過濾出所需的partition甸各。具有有如下選項:
- NONE: 不過濾,返回所有partition path焰坪。
- RECENT_DAYS: 按照partition path倒序排序趣倾。跳過
hoodie.clustering.plan.strategy.daybased.skipfromlatest.partitions
個partition,返回hoodie.clustering.plan.strategy.daybased.lookback.partitions
個partition琳彩。如果partition path是日期誊酌,可以實現(xiàn)過濾出最近N天的數據。 - SELECTED_PARTITIONS: 獲取
hoodie.clustering.plan.strategy.cluster.begin.partition
和hoodie.clustering.plan.strategy.cluster.end.partition
之間的分區(qū)露乏。 - DAY_ROLLING: 每次clustering一部分分區(qū)碧浊。如果分區(qū)的index對24取余等于排期時候當前時間的小時數,則該分區(qū)需要clustering瘟仿。
buildClusteringGroupsForPartition
方法將篩選出的file slice按照從小到大排序箱锐。然后按照clustering配置的group size和group數量條件,合并為clustering group劳较。
protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
// 獲取寫入配置
HoodieWriteConfig writeConfig = getWriteConfig();
List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
List<FileSlice> currentGroup = new ArrayList<>();
// Sort fileSlices before dividing, which makes dividing more compact
// file slice按照base file大小排序驹止,如果文件不存在,按照最大大小排序
List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
sortedFileSlices.sort((o1, o2) -> (int)
((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())
- (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
long totalSizeSoFar = 0;
for (FileSlice currentSlice : sortedFileSlices) {
// 遍歷所有file slice
// 獲取當前file slice的大小观蜗,如果文件不存在臊恋,獲取大小上限
long currentSize = currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
// check if max size is reached and create new group, if needed.
// 如果本次累積的文件大小大于hoodie.clustering.plan.strategy.max.bytes.per.group
// 并且當前group不為空
if (totalSizeSoFar + currentSize > writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
// totalSizeSoFar除以hoodie.clustering.plan.strategy.target.file.max.bytes向上取整
// 計算出輸出組編號
int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
+ writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
// 加入到fileSliceGroups集合中,保存結果
// 保存了輸出組組和輸出組編號
fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
// 結果保存之后墓捻,清零currentGroup和totalSizeSoFar
currentGroup = new ArrayList<>();
totalSizeSoFar = 0;
// if fileSliceGroups's size reach the max group, stop loop
// 檢查file group個數是否超過了hoodie.clustering.plan.strategy.max.num.groups
// 超過的話退出循環(huán)抖仅,本次不再處理后面的file slice
if (fileSliceGroups.size() >= writeConfig.getClusteringMaxNumGroups()) {
LOG.info("Having generated the maximum number of groups : " + writeConfig.getClusteringMaxNumGroups());
break;
}
}
// Add to the current file-group
// 加入到當前文件組
currentGroup.add(currentSlice);
// assume each file group size is ~= parquet.max.file.size
// 累積大小到totalSizeSoFar變量
totalSizeSoFar += currentSize;
}
if (!currentGroup.isEmpty()) {
// 處理最后一個output group
// shouldClusteringSingleGroup在下面兩個配置項任意一個啟用的時候為true
// 表示只有一個輸出文件組的話,也clustering
// hoodie.clustering.plan.strategy.sort.columns
// hoodie.clustering.plan.strategy.single.group.clustering.enabled
if (currentGroup.size() > 1 || writeConfig.shouldClusteringSingleGroup()) {
int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
+ writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
}
}
// 構造并返回fileSliceGroups
return fileSliceGroups.stream().map(fileSliceGroup ->
HoodieClusteringGroup.newBuilder()
.setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
.setNumOutputFileGroups(fileSliceGroup.getRight())
.setMetrics(buildMetrics(fileSliceGroup.getLeft()))
.build());
}
到此為止clustering計劃生成部分分析完畢砖第。
根據執(zhí)行計劃執(zhí)行clustering
Clustering的執(zhí)行開始于BaseHoodieWriteClient::cluster
撤卢。
在clustering之前,首先執(zhí)行preWrite
操作梧兼。
public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete) {
// 創(chuàng)建hudi table放吩,根據引擎(Spark/Flink)和表類型(MOR/COW)的不同,有多種實現(xiàn)類
HoodieTable table = createTable(config, context.getHadoopConf().get());
// 執(zhí)行寫入前操作羽杰,包含:
// inflight和requested instant去掉本次instant
// 啟動clean和archive服務(如果開啟的話)
preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
// 執(zhí)行clutering
return tableServiceClient.cluster(clusteringInstant, shouldComplete);
}
接著是BaseHoodieTableServiceClient::cluster
方法渡紫。該方法檢測當前clustering是否已經pending,配置監(jiān)控考赛,執(zhí)行clustering并返回clustering執(zhí)行結果元數據腻惠。
public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete) {
// 同上個方法,獲取table
HoodieTable<?, I, ?, T> table = createTable(config, context.getHadoopConf().get());
HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
// 檢查本次cluster是否已經pending狀態(tài)欲虚。如果是,需要回滾
if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
table.getMetaClient().reloadActiveTimeline();
}
// cluster時長計時器監(jiān)控
clusteringTimer = metrics.getClusteringCtx();
LOG.info("Starting clustering at " + clusteringInstant);
// 調用table的cluster服務
HoodieWriteMetadata<T> writeMetadata = table.cluster(context, clusteringInstant);
// 轉換metadata到對應計算引擎格式
HoodieWriteMetadata<O> clusteringMetadata = convertToOutputMetadata(writeMetadata);
// Validation has to be done after cloning. if not, it could result in referencing the write status twice which means clustering could get executed twice.
// 檢查cluster寫入狀態(tài)不能為空
validateClusteringCommit(clusteringMetadata, clusteringInstant, table);
// Publish file creation metrics for clustering.
// 讀取并返回監(jiān)控信息
if (config.isMetricsOn()) {
clusteringMetadata.getWriteStats()
.ifPresent(hoodieWriteStats -> hoodieWriteStats.stream()
.filter(hoodieWriteStat -> hoodieWriteStat.getRuntimeStats() != null)
.map(hoodieWriteStat -> hoodieWriteStat.getRuntimeStats().getTotalCreateTime())
.forEach(metrics::updateClusteringFileCreationMetrics));
}
// TODO : Where is shouldComplete used ?
if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
completeClustering((HoodieReplaceCommitMetadata) clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant, Option.ofNullable(convertToWriteStatus(writeMetadata)));
}
return clusteringMetadata;
}
以Spark為例悔雹,我們查看COW表的HoodieSparkCopyOnWriteTable::cluster
邏輯复哆。
public HoodieWriteMetadata<HoodieData<WriteStatus>> cluster(HoodieEngineContext context,
String clusteringInstantTime) {
return new SparkExecuteClusteringCommitActionExecutor<>(context, config, this, clusteringInstantTime).execute();
}
此邏輯交由SparkExecuteClusteringCommitActionExecutor
執(zhí)行欣喧。繼續(xù)分析SparkExecuteClusteringCommitActionExecutor::execute
方法,它調用了BaseCommitActionExecutor::executeClustering
方法梯找。
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
return executeClustering(clusteringPlan);
}
BaseCommitActionExecutor::executeClustering
該方法反射加載hoodie.clustering.execution.strategy.class
配置項對應的clustering策略(默認為SparkSortAndSizeExecutionStrategy
)唆阿,然后執(zhí)行clustering。
protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieClusteringPlan clusteringPlan) {
// 創(chuàng)建instant
HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
// Mark instant as clustering inflight
// 標記instant為inflight狀態(tài)
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
table.getMetaClient().reloadActiveTimeline();
// Disable auto commit. Strategy is only expected to write data in new files.
// 禁用自動commit
config.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE, Boolean.FALSE.toString());
// 添加_hoodie_commit_time等5個元數據字段到schema中
final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
// 加載hoodie.clustering.execution.strategy.class配置項對應的clustering策略類
// 執(zhí)行它的performClustering方法
// 對于默認的配置锈锤,clustering策略類為SparkSortAndSizeExecutionStrategy
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = (
(ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>>)
ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))
.performClustering(clusteringPlan, schema, instantTime);
// 獲取寫入狀態(tài)
HoodieData<WriteStatus> writeStatusList = writeMetadata.getWriteStatuses();
// 更新表索引驯鳖,更新數據所在位置
HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
// 持久化保存
statuses.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE), context, HoodieData.HoodieDataCacheKey.of(config.getBasePath(), instantTime));
// triggers clustering.
// 更新writeMetadata中的writestats
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
// 獲取clustering操作的數據文件file id和partition path
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
// 提交修改的writeMetadata,clustering對后續(xù)操作生效
commitOnAutoCommit(writeMetadata);
if (!writeMetadata.getCommitMetadata().isPresent()) {
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
writeMetadata.setCommitMetadata(Option.of(commitMetadata));
}
return writeMetadata;
}
Clustering的執(zhí)行細節(jié)位于策略類中久免。我們這里分析默認的策略類SparkSortAndSizeExecutionStrategy::performClustering
方法浅辙。該方法位于父類MultipleSparkJobExecutionStrategy::performClustering
中。該方法使用線程池阎姥,一個線程處理一個input group(對應執(zhí)行計劃中提到的clustering group)记舆,但線程數不能超過配置的最大值。
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {
JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
// 是否保留元數據呼巴,默認為true
boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(true);
// 使用專門的線程執(zhí)行clustering泽腮。創(chuàng)建clustering線程池
// 取InputGroups數量(plan中clustering生成file group的數量)
// 最大值為hoodie.clustering.max.parallelism,最大值默認15
ExecutorService clusteringExecutorService = Executors.newFixedThreadPool(
Math.min(clusteringPlan.getInputGroups().size(), writeConfig.getClusteringMaxParallelism()),
new CustomizedThreadFactory("clustering-job-group", true));
try {
// execute clustering for each group async and collect WriteStatus
// 在線程池中執(zhí)行clustering衣赶,獲取執(zhí)行結果
Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
clusteringPlan.getInputGroups().stream()
.map(inputGroup -> {
// hoodie.datasource.write.row.writer.enable如果為true诊赊,使用Spark原生的Row類型,避免類型轉換引發(fā)的額外代價
if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", true)) {
return runClusteringForGroupAsyncAsRow(inputGroup,
clusteringPlan.getStrategy().getStrategyParams(),
shouldPreserveMetadata,
instantTime,
clusteringExecutorService);
}
return runClusteringForGroupAsync(inputGroup,
clusteringPlan.getStrategy().getStrategyParams(),
shouldPreserveMetadata,
instantTime,
clusteringExecutorService);
})
.collect(Collectors.toList()))
.join()
.stream();
JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));
return writeMetadata;
} finally {
clusteringExecutorService.shutdown();
}
}
我們繼續(xù)分析默認配置的執(zhí)行路線ClusteringPlanActionExecutor::runClusteringForGroupAsyncAsRow
府瞄。該方法獲取到所有需要clustering的數據到Spark的dataset碧磅,讀取表schema和各個file id從屬的partition path的對應關系。然后執(zhí)行clustering摘能。
private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(HoodieClusteringGroup clusteringGroup,
Map<String, String> strategyParams,
boolean shouldPreserveHoodieMetadata,
String instantTime,
ExecutorService clusteringExecutorService) {
return CompletableFuture.supplyAsync(() -> {
// 獲取spark context
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
// 轉換所有clustering涉及到的數據為Spark DataSet
Dataset<Row> inputRecords = readRecordsForGroupAsRow(jsc, clusteringGroup, instantTime);
// 獲取帶有元數據字段的schema
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
// 轉換clustering的file slice為HoodieFileGroupId
// 保存的是partition path和file id的對應關系
List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
.map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))
.collect(Collectors.toList());
// 執(zhí)行clustering
return performClusteringWithRecordsAsRow(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, shouldPreserveHoodieMetadata,
clusteringGroup.getExtraMetadata());
}, clusteringExecutorService);
}
SparkSortAndSizeExecutionStrategy::performClusteringWithRecordsAsRow
方法獲取分區(qū)器续崖,將數據重新排序,最后使用批量插入的方式团搞,寫回parquet文件严望。
@Override
public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> inputRecords,
int numOutputGroups,
String instantTime, Map<String, String> strategyParams,
Schema schema,
List<HoodieFileGroupId> fileGroupIdList,
boolean shouldPreserveHoodieMetadata,
Map<String, String> extraMetadata) {
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
// 生成寫入配置,clustering輸出多少個file group就配置多少個bulk insert并行度
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
.withBulkInsertParallelism(numOutputGroups)
.withProps(getWriteConfig().getProps()).build();
// 配置最大parquet文件大小為clustering目標文件最大上限
// 對應配置項為hoodie.clustering.plan.strategy.target.file.max.bytes
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
// 獲取分區(qū)器
BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema);
// 使用分區(qū)器分區(qū)數據(數據重新排序)
Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);
// 將重排序之后的數據批量插入
return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,
partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata);
}
接下來分析的重點是clustering的另一個功能:將數據重排序逻恐。因此重點是分區(qū)器和分區(qū)器重排序的邏輯像吻。獲取分區(qū)器的邏輯位于它的父類MultipleSparkJobExecutionStrategy::getRowPartitioner
中。代碼如下:
private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategyParams,
Schema schema,
boolean isRowPartitioner) {
// 獲取排序字段配置項
// 對應的配置項為hoodie.clustering.plan.strategy.sort.columns
// 使用逗號分隔
Option<String[]> orderByColumnsOpt =
Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
.map(listStr -> listStr.split(","));
return orderByColumnsOpt.map(orderByColumns -> {
// 獲取hoodie.layout.optimize.strategy配置复隆,字段可使用zorder或者hilbert曲線排序或者linear線性排序
HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = getWriteConfig().getLayoutOptimizationStrategy();
switch (layoutOptStrategy) {
case ZORDER:
case HILBERT:
return isRowPartitioner
? new RowSpatialCurveSortPartitioner(getWriteConfig())
: new RDDSpatialCurveSortPartitioner((HoodieSparkEngineContext) getEngineContext(), orderByColumns, layoutOptStrategy,
getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema), recordType);
case LINEAR:
return isRowPartitioner
? new RowCustomColumnsSortPartitioner(orderByColumns, getWriteConfig())
: new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema), getWriteConfig());
default:
throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
}
}).orElseGet(() -> isRowPartitioner
? BulkInsertInternalPartitionerWithRowsFactory.get(getWriteConfig(), getHoodieTable().isPartitioned(), true)
: BulkInsertInternalPartitionerFactory.get(getHoodieTable(), getWriteConfig(), true));
}
對于使用Spark原生Row類型的情況拨匆,isRowPartitioner
為true
。如果使用ZORDER
或者HILBERT
排序策略挽拂,使用RowSpatialCurveSortPartitioner
惭每,LINEAR排序策略對應的是RowCustomColumnsSortPartitioner
。
接下來我們分別分析這兩個partitioner是如何對數據重排序的。
首先是RowSpatialCurveSortPartitioner::repartitionRecords
台腥,代碼如下:
@Override
public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputPartitions) {
return reorder(records, outputPartitions);
}
repartitionRecords
調用了reorder
方法宏赘。
protected Dataset<Row> reorder(Dataset<Row> dataset, int numOutputGroups) {
// 檢查排序字段配置
if (orderByColumns.length == 0) {
// No-op
return dataset;
}
List<String> orderedCols = Arrays.asList(orderByColumns);
// curveCompositionStrategyType默認為DIRECT
switch (curveCompositionStrategyType) {
case DIRECT:
return SpaceCurveSortingHelper.orderDataFrameByMappingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);
case SAMPLE:
return SpaceCurveSortingHelper.orderDataFrameBySamplingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);
default:
throw new UnsupportedOperationException(String.format("Unsupported space-curve curve building strategy (%s)", curveCompositionStrategyType));
}
}
SpatialCurveCompositionStrategyType
中SAMPLE的數據排序分布效果較DIRECT更好,但是執(zhí)行速度更慢黎侈。默認配置的是DIRECT類型察署。
接下來分析DIRECT類型處理方式,對應的是SpaceCurveSortingHelper::orderDataFrameByMappingValues
峻汉。該方法首先判斷排序字段配置的合法性贴汪。然后將數據按照排序字段,使用Z曲線或者是Hilbert曲線重排序休吠。
public static Dataset<Row> orderDataFrameByMappingValues(
Dataset<Row> df,
HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
List<String> orderByCols,
int targetPartitionCount
) {
// 獲取字段名稱和StructField的對應關系
Map<String, StructField> columnsMap =
Arrays.stream(df.schema().fields())
.collect(Collectors.toMap(StructField::name, Function.identity()));
// 檢查排序字段是否出現(xiàn)在schema中
List<String> checkCols =
orderByCols.stream()
.filter(columnsMap::containsKey)
.collect(Collectors.toList());
// 如果沒有扳埂,說明排序字段配置有誤,跳過不再繼續(xù)執(zhí)行
if (orderByCols.size() != checkCols.size()) {
LOG.error(String.format("Trying to ordering over a column(s) not present in the schema (%s); skipping", CollectionUtils.diff(orderByCols, checkCols)));
return df;
}
// In case when there's just one column to be ordered by, we can skip space-curve
// ordering altogether (since it will match linear ordering anyway)
// 如果排序字段只有一個蛛碌,沒必要使用空間曲線方式排序聂喇,直接使用Spark排序
if (orderByCols.size() == 1) {
String orderByColName = orderByCols.get(0);
LOG.debug(String.format("Single column to order by (%s), skipping space-curve ordering", orderByColName));
// TODO validate if we need Spark to re-partition
return df.repartitionByRange(targetPartitionCount, new Column(orderByColName));
}
// 字段個數
int fieldNum = df.schema().fields().length;
// 返回排序字段對應的index和字段信息對應關系
Map<Integer, StructField> fieldMap =
orderByCols.stream()
.collect(
Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), columnsMap::get));
JavaRDD<Row> sortedRDD;
// 根據布局優(yōu)化策略,排序RDD
switch (layoutOptStrategy) {
case ZORDER:
sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, targetPartitionCount);
break;
case HILBERT:
sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, targetPartitionCount);
break;
default:
throw new UnsupportedOperationException(String.format("Not supported layout-optimization strategy (%s)", layoutOptStrategy));
}
// Compose new {@code StructType} for ordered RDDs
// 為排序后的RDD創(chuàng)建StructType(schema)
StructType newStructType = composeOrderedRDDStructType(df.schema());
// 返回dataset
return df.sparkSession()
.createDataFrame(sortedRDD, newStructType)
.drop("Index");
}
我們先看第一種情況蔚携,Z曲線排序希太。方法位于SpaceCurveSortingHelper::createZCurveSortedRDD
。
該方法將多個排序字段的值映射為8字節(jié)內容(多的截取少的補充)酝蜒,然后每個字段的字節(jié)內容各取一位拼接到一起誊辉,然后再各取第二位拼接……一直循環(huán),這個步驟稱之為二進制數據交織(interleaving)亡脑。將交織之后的值作為一個字段堕澄,拼接在數據中。然后按照該字段的內容排序霉咨。
private static JavaRDD<Row> createZCurveSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
return originRDD.map(row -> {
// 將數據中每個排序字段的值填充為8字節(jié)內容
// 多的截取少的補充
byte[][] zBytes = fieldMap.entrySet().stream()
.map(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
return mapColumnValueTo8Bytes(row, index, field.dataType());
})
.toArray(byte[][]::new);
// Interleave received bytes to produce Z-curve ordinal
// 將這些排序字段的值交織起來
// 比如有A蛙紫,B兩個排序字段。A字段值取1位途戒,然后取B字段值1位坑傅,然后A再取下一位,B取下一位喷斋,以此類推
byte[] zOrdinalBytes = BinaryUtil.interleaving(zBytes, 8);
// 追加zOrdinalBytes到Row
return appendToRow(row, zOrdinalBytes);
})
// 按照該字段的值(zOrdinalBytes唁毒,位于row的末尾,index正好是fieldNum)排序
.sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);
}
第二種情況為Hilbert曲線星爪,對應方法為SpaceCurveSortingHelper::createHilbertSortedRDD
浆西。和ZOrder曲線排序處理邏輯基本相同,只是將Z曲線替換成了Hilbert曲線顽腾。
private static JavaRDD<Row> createHilbertSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
// NOTE: Here {@code mapPartitions} is used to make sure Hilbert curve instance is initialized
// only once per partition
return originRDD.mapPartitions(rows -> {
// 創(chuàng)建hilbert fieldMap個數維度曲線
HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size());
return new Iterator<Row>() {
@Override
public boolean hasNext() {
return rows.hasNext();
}
@Override
public Row next() {
Row row = rows.next();
// 將row中的排序字段值映射為long類型
long[] longs = fieldMap.entrySet().stream()
.mapToLong(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
return mapColumnValueToLong(row, index, field.dataType());
})
.toArray();
// Map N-dimensional coordinates into position on the Hilbert curve
// 使用hilbert曲線索引上面的long值近零,結果作為后面的排序依據
byte[] hilbertCurvePosBytes = HilbertCurveUtils.indexBytes(hilbertCurve, longs, 63);
return appendToRow(row, hilbertCurvePosBytes);
}
};
})
.sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);
}
和空間曲線的方式相比,LINEAR線性排序顯得較為簡單。代碼位于RowCustomColumnsSortPartitioner::repartitionRecords
久信。通過spark的sort算子按照配置的column排序猪瞬。
@Override
public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
return records
.sort(Arrays.stream(sortColumnNames).map(Column::new).toArray(Column[]::new))
.coalesce(outputSparkPartitions);
}
到這里為止分區(qū)器的邏輯分析完畢。