DataX first determines the channel concurrency from the configuration file, then splits the whole job into small tasks and divides them into groups.
Determining the channel count
If a byte-rate limit is specified, compute the concurrency implied by that limit; if a record-rate limit is specified, compute the concurrency implied by that limit; then take the smaller of the two channel counts. If neither limit is specified, use the channel count given in the configuration, if any.
Take the following configuration as an example:
{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "record": 100,
                    "byte": 100
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "record": 500,
                "byte": 1000,
                "channel": 1
            }
        }
    }
}
The values under job.setting.speed are global limits; core.transport.channel.speed limits a single channel. By the record limit the channel count would be 500 / 100 = 5; by the byte limit it would be 1000 / 100 = 10; the smaller of the two, 5, is returned. Although channel is set to 1, that value is only used when neither rate limit is specified.
The adjustChannelNumber method implements this logic:
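This arithmetic can be checked with a minimal standalone sketch; the class and method names are invented for illustration, and the values are hardcoded from the sample configuration above:

```java
public class ChannelCountSketch {
    // Derive the channel count from global and per-channel limits:
    // the count needed for each limit, then the smaller of the two.
    static int neededChannels(long globalByte, long channelByte,
                              long globalRecord, long channelRecord) {
        int byByte = (int) Math.max(1, globalByte / channelByte);
        int byRecord = (int) Math.max(1, globalRecord / channelRecord);
        return Math.min(byByte, byRecord);
    }

    public static void main(String[] args) {
        // byte limit: 1000 / 100 = 10; record limit: 500 / 100 = 5
        System.out.println(neededChannels(1000, 100, 500, 100)); // prints 5
    }
}
```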
private void adjustChannelNumber() {
    int needChannelNumberByByte = Integer.MAX_VALUE;
    int needChannelNumberByRecord = Integer.MAX_VALUE;

    // is a global byte-rate limit specified?
    boolean isByteLimit = (this.configuration.getInt(
            CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
    if (isByteLimit) {
        // global byte-rate limit
        long globalLimitedByteSpeed = this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);
        // byte-rate limit of a single channel
        Long channelLimitedByteSpeed = this.configuration
                .getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
        if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.CONFIG_ERROR,
                    "When a global bps limit is set, the per-channel bps must be a positive number");
        }
        // number of channels needed to reach the global byte rate
        needChannelNumberByByte =
                (int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
        needChannelNumberByByte =
                needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
    }

    // is a global record-rate limit specified?
    boolean isRecordLimit = (this.configuration.getInt(
            CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
    if (isRecordLimit) {
        // global record-rate limit
        long globalLimitedRecordSpeed = this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000);
        // record-rate limit of a single channel
        Long channelLimitedRecordSpeed = this.configuration.getLong(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
        if (channelLimitedRecordSpeed == null || channelLimitedRecordSpeed <= 0) {
            throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
                    "When a global tps limit is set, the per-channel tps must be a positive number");
        }
        // number of channels needed to reach the global record rate
        needChannelNumberByRecord =
                (int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
        needChannelNumberByRecord =
                needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
        LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records.");
    }

    // take the smaller of the two
    this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ?
            needChannelNumberByByte : needChannelNumberByRecord;
    // if either limit produced a channel count, we are done
    if (this.needChannelNumber < Integer.MAX_VALUE) {
        return;
    }

    // otherwise, fall back to an explicitly configured channel count
    boolean isChannelLimit = (this.configuration.getInt(
            CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
    if (isChannelLimit) {
        this.needChannelNumber = this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);
        LOG.info("Job set Channel-Number to " + this.needChannelNumber
                + " channels.");
        return;
    }

    throw DataXException.asDataXException(
            FrameworkErrorCode.CONFIG_ERROR,
            "The job speed must be configured");
}
Splitting into tasks
JobContainer's split method splits the whole job into multiple tasks and produces the list of task configurations.
private int split() {
    // compute the needed channel count
    this.adjustChannelNumber();
    if (this.needChannelNumber <= 0) {
        this.needChannelNumber = 1;
    }
    // generate the reader configuration of each task
    List<Configuration> readerTaskConfigs = this
            .doReaderSplit(this.needChannelNumber);
    int taskNumber = readerTaskConfigs.size();
    // generate the writer configuration of each task
    List<Configuration> writerTaskConfigs = this
            .doWriterSplit(taskNumber);
    // generate the transformer configuration of each task
    List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);
    LOG.debug("transformer configuration: " + JSON.toJSONString(transformerList));
    // merge the reader, writer, and transformer configurations
    List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(
            readerTaskConfigs, writerTaskConfigs, transformerList);
    LOG.debug("contentConfig configuration: " + JSON.toJSONString(contentConfig));
    // store the result under the job.content path
    this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);
    return contentConfig.size();
}
This performs the finest-grained split of the reader and the writer. Note that the writer's split must follow the reader's split so the two produce equal task counts, satisfying the 1:1 channel model.
Splitting the reader
The doReaderSplit method calls Reader.Job's split method and returns the list of Reader.Task configurations.
/**
 * adviceNumber: the suggested number of splits
 */
private List<Configuration> doReaderSplit(int adviceNumber) {
    // switch the ClassLoader
    classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
            PluginType.READER, this.readerPluginName));
    // call Reader.Job's split
    List<Configuration> readerSlicesConfigs =
            this.jobReader.split(adviceNumber);
    if (readerSlicesConfigs == null || readerSlicesConfigs.size() <= 0) {
        throw DataXException.asDataXException(
                FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                "The number of tasks split by the reader cannot be less than or equal to 0");
    }
    LOG.info("DataX Reader.Job [{}] splits to [{}] tasks.",
            this.readerPluginName, readerSlicesConfigs.size());
    classLoaderSwapper.restoreCurrentThreadClassLoader();
    return readerSlicesConfigs;
}
public List<Configuration> split(int adviceNumber) {
    LOG.info("split() begin...");
    List<Configuration> readerSplitConfigs = new ArrayList<Configuration>();
    // warn: each slice reads one and only one file
    // int splitNumber = adviceNumber;
    int splitNumber = this.sourceFiles.size();
    if (0 == splitNumber) {
        // throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION,
        //         String.format("No files to read were found; please check your path setting: %s", this.readerOriginConfig.getString(Key.PATH)));
        String message = String.format("No files to read were found; please check your path setting: %s",
                this.readerOriginConfig.getString(Key.PATH));
        LOG.info(message);
        return new ArrayList<Configuration>();
    }
    List<List<String>> splitedSourceFiles = this.splitSourceFiles(new ArrayList<String>(this.sourceFiles), splitNumber);
    for (List<String> files : splitedSourceFiles) {
        Configuration splitedConfig = this.readerOriginConfig.clone();
        splitedConfig.set(Constant.SOURCE_FILES, files);
        readerSplitConfigs.add(splitedConfig);
    }
    return readerSplitConfigs;
}
private <T> List<List<T>> splitSourceFiles(final List<T> sourceList, int adviceNumber) {
    List<List<T>> splitedList = new ArrayList<List<T>>();
    int averageLength = sourceList.size() / adviceNumber;
    averageLength = averageLength == 0 ? 1 : averageLength;
    for (int begin = 0, end = 0; begin < sourceList.size(); begin = end) {
        end = begin + averageLength;
        if (end > sourceList.size()) {
            end = sourceList.size();
        }
        splitedList.add(sourceList.subList(begin, end));
    }
    return splitedList;
}
The reader shown here is the HDFS reader: the number of reader tasks equals the number of source files.
Splitting the writer
The doWriterSplit method calls Writer.Job's split method and returns the list of Writer.Task configurations.
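The slicing rule above can be exercised with a standalone sketch (the class name is invented; the loop is the same as splitSourceFiles, re-implemented generically):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitSketch {
    // Slice the source list into sublists of averageLength items each;
    // the last slice absorbs the remainder.
    static <T> List<List<T>> split(List<T> source, int adviceNumber) {
        List<List<T>> out = new ArrayList<>();
        int avg = Math.max(1, source.size() / adviceNumber);
        for (int begin = 0, end; begin < source.size(); begin = end) {
            end = Math.min(begin + avg, source.size());
            out.add(source.subList(begin, end));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("a", "b", "c", "d", "e");
        // adviceNumber == file count: one file per slice, as the HDFS reader does
        System.out.println(split(files, 5).size()); // prints 5
        // fewer advised splits: average length 2, remainder in its own slice
        System.out.println(split(files, 2)); // prints [[a, b], [c, d], [e]]
    }
}
```

Note that with a smaller adviceNumber the method can still return more slices than advised, since every averageLength-sized run becomes its own slice.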
private List<Configuration> doWriterSplit(int readerTaskNumber) {
    // switch the ClassLoader
    classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
            PluginType.WRITER, this.writerPluginName));
    // call Writer.Job's split
    List<Configuration> writerSlicesConfigs = this.jobWriter
            .split(readerTaskNumber);
    if (writerSlicesConfigs == null || writerSlicesConfigs.size() <= 0) {
        throw DataXException.asDataXException(
                FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                "The number of tasks split by the writer cannot be less than or equal to 0");
    }
    LOG.info("DataX Writer.Job [{}] splits to [{}] tasks.",
            this.writerPluginName, writerSlicesConfigs.size());
    classLoaderSwapper.restoreCurrentThreadClassLoader();
    return writerSlicesConfigs;
}
To keep the reader and writer task counts equal, a writer plugin must split itself by the source-side split count; otherwise the framework reports an error.
For example, the redis writer:
public List<Configuration> split(int mandatoryNumber) {
    List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
    for (int i = 0; i < mandatoryNumber; i++) {
        configurations.add(getPluginJobConf());
    }
    return configurations;
}
Merging the configurations
The reader, writer, and transformer configuration lists are merged, and the resulting task list is stored under the job.content key.
private List<Configuration> mergeReaderAndWriterTaskConfigs(
        List<Configuration> readerTasksConfigs,
        List<Configuration> writerTasksConfigs,
        List<Configuration> transformerConfigs) {
    // the reader's task count must equal the writer's task count
    if (readerTasksConfigs.size() != writerTasksConfigs.size()) {
        throw DataXException.asDataXException(
                FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                String.format("The reader task count [%d] does not equal the writer task count [%d].",
                        readerTasksConfigs.size(), writerTasksConfigs.size())
        );
    }
    List<Configuration> contentConfigs = new ArrayList<Configuration>();
    for (int i = 0; i < readerTasksConfigs.size(); i++) {
        Configuration taskConfig = Configuration.newDefault();
        // store the reader configuration
        taskConfig.set(CoreConstant.JOB_READER_NAME,
                this.readerPluginName);
        taskConfig.set(CoreConstant.JOB_READER_PARAMETER,
                readerTasksConfigs.get(i));
        // store the writer configuration
        taskConfig.set(CoreConstant.JOB_WRITER_NAME,
                this.writerPluginName);
        taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER,
                writerTasksConfigs.get(i));
        // store the transformer configuration
        if (transformerConfigs != null && transformerConfigs.size() > 0) {
            taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);
        }
        taskConfig.set(CoreConstant.TASK_ID, i);
        contentConfigs.add(taskConfig);
    }
    return contentConfigs;
}
Task assignment
The assignment algorithm
- First, compute the number of task groups from the requested channel count and the number of channels per task group
- Group the tasks by each task's reader.parameter.loadBalanceResourceMark
- Group the tasks by each task's writer.parameter.loadBalanceResourceMark
- Of those two groupings, pick the one with more groups
- Round-robin over that grouping, adding one task at a time to each TaskGroup until every task has been assigned
An example:
Suppose there are 7 tasks and 20 channels, with 5 channels per task group.
First the number of task groups is computed: 20 / 5 = 4. (In practice this situation does not arise, since the channel count never exceeds the task count.)
Grouped by reader.parameter.loadBalanceResourceMark, the tasks are:
{
"database_a" : [task_id_1, task_id_2],
"database_b" : [task_id_3, task_id_4, task_id_5],
"database_c" : [task_id_6, task_id_7]
}
Grouped by writer.parameter.loadBalanceResourceMark, the tasks are:
{
"database_dst_d" : [task_id_1, task_id_2],
"database_dst_e" : [task_id_3, task_id_4, task_id_5, task_id_6, task_id_7]
}
Since readerResourceMarkAndTaskIdMap has three groups while writerResourceMarkAndTaskIdMap has only two, the grouping with more groups wins, so the tasks are assigned according to readerResourceMarkAndTaskIdMap.
Execution proceeds by cycling over database_a, database_b, database_c, taking the first task from each, and repeating:
1. take task_id_1, put it into taskGroup_1
2. take task_id_3, put it into taskGroup_2
3. take task_id_6, put it into taskGroup_3
4. take task_id_2, put it into taskGroup_4
5. take task_id_4, put it into taskGroup_1
6. ...
The final result is:
{
"taskGroup_1": [task_id_1, task_id_4],
"taskGroup_2": [task_id_3, task_id_7],
"taskGroup_3": [task_id_6, task_id_5],
"taskGroup_4": [task_id_2]
}
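The walk-through above can be reproduced with a standalone sketch of the same round-robin rule (class and method names are invented for illustration; task ids and group names are taken from the example):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;

public class AssignSketch {
    // Cycle over the resource-mark groups, taking the head task of each
    // non-empty group and placing it into task groups round-robin.
    static List<List<Integer>> assign(LinkedHashMap<String, List<Integer>> marks,
                                      int taskGroupNumber) {
        List<List<Integer>> groups = new ArrayList<>();
        for (int i = 0; i < taskGroupNumber; i++) groups.add(new ArrayList<>());
        int maxLen = 0;
        for (List<Integer> ids : marks.values()) maxLen = Math.max(maxLen, ids.size());
        int idx = 0;
        for (int i = 0; i < maxLen; i++) {
            for (List<Integer> ids : marks.values()) {
                if (!ids.isEmpty()) {
                    groups.get(idx % taskGroupNumber).add(ids.remove(0));
                    idx++;
                }
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, List<Integer>> marks = new LinkedHashMap<>();
        marks.put("database_a", new ArrayList<>(Arrays.asList(1, 2)));
        marks.put("database_b", new ArrayList<>(Arrays.asList(3, 4, 5)));
        marks.put("database_c", new ArrayList<>(Arrays.asList(6, 7)));
        System.out.println(assign(marks, 4)); // prints [[1, 4], [3, 7], [6, 5], [2]]
    }
}
```

A LinkedHashMap is essential here: its insertion-order iteration makes the round-robin deterministic.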
Code walkthrough
Task assignment is handled by the JobAssignUtil class. The caller invokes the assignFairly method with its parameters and gets back the list of TaskGroup configurations.
public final class JobAssignUtil {
    /**
     * configuration: the job configuration
     * channelNumber: the total channel count
     * channelsPerTaskGroup: the number of channels per TaskGroup
     */
    public static List<Configuration> assignFairly(Configuration configuration, int channelNumber, int channelsPerTaskGroup) {
        List<Configuration> contentConfig = configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
        // compute the number of task groups
        int taskGroupNumber = (int) Math.ceil(1.0 * channelNumber / channelsPerTaskGroup);
        ......
        // group the tasks
        LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap = parseAndGetResourceMarkAndTaskIdMap(contentConfig);
        // call doAssign to assign the tasks
        List<Configuration> taskGroupConfig = doAssign(resourceMarkAndTaskIdMap, configuration, taskGroupNumber);
        // adjust the channel count of each taskGroup (an optimization)
        adjustChannelNumPerTaskGroup(taskGroupConfig, channelNumber);
        return taskGroupConfig;
    }
}
Grouping the tasks
Tasks are grouped by both reader.parameter.loadBalanceResourceMark and writer.parameter.loadBalanceResourceMark; the grouping with more groups is chosen as the source of the task assignment.
/**
 * contentConfig: the list of task configurations
 */
private static LinkedHashMap<String, List<Integer>> parseAndGetResourceMarkAndTaskIdMap(List<Configuration> contentConfig) {
    // reader-side grouping: key is the group name, value is the list of task ids
    LinkedHashMap<String, List<Integer>> readerResourceMarkAndTaskIdMap = new LinkedHashMap<String, List<Integer>>();
    // writer-side grouping: key is the group name, value is the list of task ids
    LinkedHashMap<String, List<Integer>> writerResourceMarkAndTaskIdMap = new LinkedHashMap<String, List<Integer>>();
    for (Configuration aTaskConfig : contentConfig) {
        int taskId = aTaskConfig.getInt(CoreConstant.TASK_ID);
        // use the value of reader.parameter.loadBalanceResourceMark as the group name
        String readerResourceMark = aTaskConfig.getString(CoreConstant.JOB_READER_PARAMETER + "." + CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
        if (readerResourceMarkAndTaskIdMap.get(readerResourceMark) == null) {
            readerResourceMarkAndTaskIdMap.put(readerResourceMark, new LinkedList<Integer>());
        }
        // record the task id under readerResourceMark
        readerResourceMarkAndTaskIdMap.get(readerResourceMark).add(taskId);
        // use the value of writer.parameter.loadBalanceResourceMark as the group name
        String writerResourceMark = aTaskConfig.getString(CoreConstant.JOB_WRITER_PARAMETER + "." + CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
        if (writerResourceMarkAndTaskIdMap.get(writerResourceMark) == null) {
            writerResourceMarkAndTaskIdMap.put(writerResourceMark, new LinkedList<Integer>());
        }
        // record the task id under writerResourceMark
        writerResourceMarkAndTaskIdMap.get(writerResourceMark).add(taskId);
    }
    // pick whichever of the reader and writer groupings has more groups
    if (readerResourceMarkAndTaskIdMap.size() >= writerResourceMarkAndTaskIdMap.size()) {
        // shuffle using the reader's resource marks
        return readerResourceMarkAndTaskIdMap;
    } else {
        // shuffle using the writer's resource marks
        return writerResourceMarkAndTaskIdMap;
    }
}
Assigning the tasks
The grouping from the previous step is distributed across the task groups.
private static List<Configuration> doAssign(LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap, Configuration jobConfiguration, int taskGroupNumber) {
    List<Configuration> contentConfig = jobConfiguration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
    Configuration taskGroupTemplate = jobConfiguration.clone();
    taskGroupTemplate.remove(CoreConstant.DATAX_JOB_CONTENT);
    List<Configuration> result = new LinkedList<Configuration>();
    // initialize taskGroupConfigList
    List<List<Configuration>> taskGroupConfigList = new ArrayList<List<Configuration>>(taskGroupNumber);
    for (int i = 0; i < taskGroupNumber; i++) {
        taskGroupConfigList.add(new LinkedList<Configuration>());
    }
    // find the length of the longest value list in resourceMarkAndTaskIdMap
    int mapValueMaxLength = -1;
    List<String> resourceMarks = new ArrayList<String>();
    for (Map.Entry<String, List<Integer>> entry : resourceMarkAndTaskIdMap.entrySet()) {
        resourceMarks.add(entry.getKey());
        if (entry.getValue().size() > mapValueMaxLength) {
            mapValueMaxLength = entry.getValue().size();
        }
    }
    int taskGroupIndex = 0;
    // loop mapValueMaxLength times, cycling once over resourceMarkAndTaskIdMap each time
    for (int i = 0; i < mapValueMaxLength; i++) {
        // cycle over resourceMarkAndTaskIdMap
        for (String resourceMark : resourceMarks) {
            if (resourceMarkAndTaskIdMap.get(resourceMark).size() > 0) {
                // take the first task id
                int taskId = resourceMarkAndTaskIdMap.get(resourceMark).get(0);
                // insert into taskGroupConfigList round-robin
                taskGroupConfigList.get(taskGroupIndex % taskGroupNumber).add(contentConfig.get(taskId));
                // advance taskGroupIndex
                taskGroupIndex++;
                // remove the first task id
                resourceMarkAndTaskIdMap.get(resourceMark).remove(0);
            }
        }
    }
    Configuration tempTaskGroupConfig;
    for (int i = 0; i < taskGroupNumber; i++) {
        tempTaskGroupConfig = taskGroupTemplate.clone();
        // set the TaskGroup configuration
        tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i));
        tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i);
        result.add(tempTaskGroupConfig);
    }
    // return the result
    return result;
}
Assigning channels to groups
The tasks are now divided into groups; to spread the channels evenly across the groups, one more adjustment is needed. When the total channel count is not divisible by the number of task groups, the remainder is handed out by giving one extra channel to each of the first remainder task groups.
For example, with 13 channels and 5 task groups, each group first gets 13 / 5 = 2 channels, and the remaining 3 channels go to the first 3 task groups, one each.
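A minimal sketch of this distribution rule (class and method names are invented for illustration):

```java
import java.util.Arrays;

public class ChannelAdjustSketch {
    // Distribute channelNumber channels over taskGroupNumber groups:
    // every group gets the integer average, and the first
    // (channelNumber % taskGroupNumber) groups get one extra channel.
    static int[] channelsPerGroup(int channelNumber, int taskGroupNumber) {
        int avg = channelNumber / taskGroupNumber;
        int remainder = channelNumber % taskGroupNumber;
        int[] result = new int[taskGroupNumber];
        for (int i = 0; i < taskGroupNumber; i++) {
            result[i] = avg + (i < remainder ? 1 : 0);
        }
        return result;
    }

    public static void main(String[] args) {
        // 13 channels over 5 groups: 2 each, plus one extra for the first 3
        System.out.println(Arrays.toString(channelsPerGroup(13, 5))); // prints [3, 3, 3, 2, 2]
    }
}
```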
private static void adjustChannelNumPerTaskGroup(List<Configuration> taskGroupConfig, int channelNumber) {
    int taskGroupNumber = taskGroupConfig.size();
    int avgChannelsPerTaskGroup = channelNumber / taskGroupNumber;
    int remainderChannelCount = channelNumber % taskGroupNumber;
    // remainderChannelCount task groups get avgChannelsPerTaskGroup + 1 channels;
    // the other (taskGroupNumber - remainderChannelCount) task groups get avgChannelsPerTaskGroup
    int i = 0;
    for (; i < remainderChannelCount; i++) {
        taskGroupConfig.get(i).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup + 1);
    }
    for (int j = 0; j < taskGroupNumber - remainderChannelCount; j++) {
        taskGroupConfig.get(i + j).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup);
    }
}