Datax任務(wù)分配規(guī)則

Datax根首先據(jù)配置文件,確定好channel的并發(fā)數(shù)目。然后將整個job分成一個個小的task,然后劃分成組瑰排。

channel數(shù)目的確定

如果指定字節(jié)數(shù)限速,則計算字節(jié)限速后的并發(fā)數(shù)目暖侨。如果指定記錄數(shù)限速椭住,則計算記錄數(shù)限速后的并發(fā)數(shù)目。再取兩者中最小的channel并發(fā)數(shù)目字逗。如果兩者限速都沒指定京郑,則看是否配置文件指定了channel并發(fā)數(shù)目。

比如以下面這個配置為例:

{
    "core": {
       "transport" : {
          "channel": {
             "speed": {
                "record": 100,
                "byte": 100
             }
          }
       }
    },
    "job": {
      "setting": {
        "speed": {
          "record": 500,
          "byte": 1000,
          "channel" : 1
        }
      }
    }
}

job里的是全局配置葫掉, core里的channel是單個channel的限制些举。首先計算按照字節(jié)數(shù)限速,channel的數(shù)目應(yīng)該為 500 / 100 = 5,然后按照記錄數(shù)限速俭厚, channel的數(shù)目應(yīng)該為 1000 / 100 = 10, 最后返回兩者的最小值 5户魏。雖然指定了channel為1, 但只有在沒有限速的條件下挪挤,才會使用叼丑。

adjustChannelNumber方法,實現(xiàn)了上述功能

private void adjustChannelNumber() {
    int needChannelNumberByByte = Integer.MAX_VALUE;
    int needChannelNumberByRecord = Integer.MAX_VALUE;

    // 是否指定字節(jié)數(shù)限速
    boolean isByteLimit = (this.configuration.getInt(
            CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
    if (isByteLimit) {
        // 總的限速字節(jié)數(shù)
        long globalLimitedByteSpeed = this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);

        // 單個Channel的字節(jié)數(shù)
        Long channelLimitedByteSpeed = this.configuration
                .getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
        if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0) {
            DataXException.asDataXException(
                    FrameworkErrorCode.CONFIG_ERROR,
                    "在有總bps限速條件下扛门,單個channel的bps值不能為空幢码,也不能為非正數(shù)");
        }
        // 計算所需要的channel數(shù)目
        needChannelNumberByByte =
                (int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
        needChannelNumberByByte =
                needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
    }
    // 是否指定記錄數(shù)限速
    boolean isRecordLimit = (this.configuration.getInt(
            CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
    if (isRecordLimit) {
        // 總的限速記錄數(shù)
        long globalLimitedRecordSpeed = this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000);
        // 獲取單個channel的限定的記錄數(shù)
        Long channelLimitedRecordSpeed = this.configuration.getLong(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
        if (channelLimitedRecordSpeed == null || channelLimitedRecordSpeed <= 0) {
            DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
                    "在有總tps限速條件下,單個channel的tps值不能為空尖飞,也不能為非正數(shù)");
        }
        // 計算所需要的channel數(shù)目
        needChannelNumberByRecord =
                (int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
        needChannelNumberByRecord =
                needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
        LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records.");
    }

    // 取較小值
    this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ?
            needChannelNumberByByte : needChannelNumberByRecord;

    // 返回最小值
    if (this.needChannelNumber < Integer.MAX_VALUE) {
        return;
    }

    // 是否指定了channel數(shù)目
    boolean isChannelLimit = (this.configuration.getInt(
            CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
    if (isChannelLimit) {
        this.needChannelNumber = this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);

        LOG.info("Job set Channel-Number to " + this.needChannelNumber
                + " channels.");

        return;
    }

    throw DataXException.asDataXException(
            FrameworkErrorCode.CONFIG_ERROR,
            "Job運行速度必須設(shè)置");
}

切分任務(wù)

JobContainer 的split負(fù)責(zé)將整個job切分成多個task症副,生成task配置的列表。

private int split() {
    // 計算所需的channel數(shù)目
    this.adjustChannelNumber();

    if (this.needChannelNumber <= 0) {
        this.needChannelNumber = 1;
    }
    // 生成任務(wù)的reader配置
    List<Configuration> readerTaskConfigs = this
            .doReaderSplit(this.needChannelNumber);
    int taskNumber = readerTaskConfigs.size();
    // 生成任務(wù)的writer配置
    List<Configuration> writerTaskConfigs = this
            .doWriterSplit(taskNumber);

    // 生成任務(wù)的transformer配置
    List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);

    LOG.debug("transformer configuration: "+ JSON.toJSONString(transformerList));
    
    // 合并任務(wù)的reader政基,writer贞铣,transformer配置
    List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(
            readerTaskConfigs, writerTaskConfigs, transformerList);
    LOG.debug("contentConfig configuration: "+ JSON.toJSONString(contentConfig));
    
    // 將配置結(jié)果保存在job.content路徑下
    this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);

    return contentConfig.size();
}

執(zhí)行reader和writer最細(xì)粒度的切分,需要注意的是沮明,writer的切分結(jié)果要參照reader的切分結(jié)果辕坝,達到切分后數(shù)目相等,才能滿足1:1的通道模型.

reader的切分

doReaderSplit方法荐健, 調(diào)用Reader.Job的split方法酱畅,返回Reader.Task的Configuration列表

/**
* adviceNumber, 建議的數(shù)目
*/
private List<Configuration> doReaderSplit(int adviceNumber) {
    // 切換ClassLoader
    classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
            PluginType.READER, this.readerPluginName));
    // 調(diào)用Job.Reader的split切分
    List<Configuration> readerSlicesConfigs =
            this.jobReader.split(adviceNumber);
    if (readerSlicesConfigs == null || readerSlicesConfigs.size() <= 0) {
        throw DataXException.asDataXException(
                FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                "reader切分的task數(shù)目不能小于等于0");
    }
    LOG.info("DataX Reader.Job [{}] splits to [{}] tasks.",
            this.readerPluginName, readerSlicesConfigs.size());
    classLoaderSwapper.restoreCurrentThreadClassLoader();
    return readerSlicesConfigs;
}
 public List<Configuration> split(int adviceNumber) {

            LOG.info("split() begin...");
            List<Configuration> readerSplitConfigs = new ArrayList<Configuration>();
            // warn:每個slice拖且僅拖一個文件,
            // int splitNumber = adviceNumber;
            int splitNumber = this.sourceFiles.size();
            if (0 == splitNumber) {
//                throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION,
//                        String.format("未能找到待讀取的文件,請確認(rèn)您的配置項path: %s", this.readerOriginConfig.getString(Key.PATH)));
                String message = String.format("未能找到待讀取的文件,請確認(rèn)您的配置項path: %s", this.readerOriginConfig.getString(Key.PATH));
                LOG.info(message);
                return new ArrayList<Configuration>();
            }

            List<List<String>> splitedSourceFiles = this.splitSourceFiles(new ArrayList<String>(this.sourceFiles), splitNumber);
            for (List<String> files : splitedSourceFiles) {
                Configuration splitedConfig = this.readerOriginConfig.clone();
                splitedConfig.set(Constant.SOURCE_FILES, files);
                readerSplitConfigs.add(splitedConfig);
            }

            return readerSplitConfigs;
        }

private <T> List<List<T>> splitSourceFiles(final List<T> sourceList, int adviceNumber) {
            List<List<T>> splitedList = new ArrayList<List<T>>();
            int averageLength = sourceList.size() / adviceNumber;
            averageLength = averageLength == 0 ? 1 : averageLength;

            for (int begin = 0, end = 0; begin < sourceList.size(); begin = end) {
                end = begin + averageLength;
                if (end > sourceList.size()) {
                    end = sourceList.size();
                }
                splitedList.add(sourceList.subList(begin, end));
            }
            return splitedList;
        }

這里的reader是hdfs reader 原文件數(shù),reader task數(shù)等于文件數(shù)江场。

Writer的切分?jǐn)?shù)

doWriterSplit方法纺酸, 調(diào)用Writer.JOb的split方法,返回Writer.Task的Configuration列表

private List<Configuration> doWriterSplit(int readerTaskNumber) {
    // 切換ClassLoader
    classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
            PluginType.WRITER, this.writerPluginName));
    // 調(diào)用Job.Reader的split切分
    List<Configuration> writerSlicesConfigs = this.jobWriter
            .split(readerTaskNumber);
    if (writerSlicesConfigs == null || writerSlicesConfigs.size() <= 0) {
        throw DataXException.asDataXException(
                FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                "writer切分的task不能小于等于0");
    }
    LOG.info("DataX Writer.Job [{}] splits to [{}] tasks.",
            this.writerPluginName, writerSlicesConfigs.size());
    classLoaderSwapper.restoreCurrentThreadClassLoader();

    return writerSlicesConfigs;
}

為了做到Reader址否、Writer任務(wù)數(shù)對等餐蔬,這里要求Writer插件必須按照源端的切分?jǐn)?shù)進行切分。否則框架報錯!
比如rediswriter

 public List<Configuration> split(int mandatoryNumber) {
            List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
            for (int i = 0; i < mandatoryNumber; i++) {
                configurations.add(getPluginJobConf());
            }
            return configurations;
        }
合并配置

合并reader樊诺,writer仗考,transformer配置列表。并將任務(wù)列表词爬,保存在配置job.content的值里秃嗜。

private List<Configuration> mergeReaderAndWriterTaskConfigs(
        List<Configuration> readerTasksConfigs,
        List<Configuration> writerTasksConfigs,
        List<Configuration> transformerConfigs) {
    // reader切分的任務(wù)數(shù)目必須等于writer切分的任務(wù)數(shù)目
    if (readerTasksConfigs.size() != writerTasksConfigs.size()) {
        throw DataXException.asDataXException(
                FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                String.format("reader切分的task數(shù)目[%d]不等于writer切分的task數(shù)目[%d].",
                        readerTasksConfigs.size(), writerTasksConfigs.size())
        );
    }

    List<Configuration> contentConfigs = new ArrayList<Configuration>();
    for (int i = 0; i < readerTasksConfigs.size(); i++) {
        Configuration taskConfig = Configuration.newDefault();
        // 保存reader相關(guān)配置
        taskConfig.set(CoreConstant.JOB_READER_NAME,
                this.readerPluginName);
        taskConfig.set(CoreConstant.JOB_READER_PARAMETER,
                readerTasksConfigs.get(i));
        // 保存writer相關(guān)配置
        taskConfig.set(CoreConstant.JOB_WRITER_NAME,
                this.writerPluginName);
        taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER,
                writerTasksConfigs.get(i));
        // 保存transformer相關(guān)配置
        if(transformerConfigs!=null && transformerConfigs.size()>0){
            taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);
        }

        taskConfig.set(CoreConstant.TASK_ID, i);
        contentConfigs.add(taskConfig);
    }

    return contentConfigs;
}
分配任務(wù)

分配算法

  • 首先根據(jù)指定的channel數(shù)目和每個Taskgroup的擁有channel數(shù)目,計算出Taskgroup的數(shù)目
  • 根據(jù)每個任務(wù)的reader.parameter.loadBalanceResourceMark將任務(wù)分組
  • 根據(jù)每個任務(wù)writer.parameter.loadBalanceResourceMark來講任務(wù)分組
  • 根據(jù)上面兩個任務(wù)分組的組數(shù)顿膨,挑選出大的那個組
  • 輪詢上面步驟的任務(wù)組痪寻,依次輪詢的向各個TaskGroup添加一個,直到所有任務(wù)都被分配完

這里舉個實例:
目前有7個task虽惭,channel有20個橡类,每個Taskgroup擁有5個channel。
首先計算出Taskgroup的數(shù)目芽唇, 20 / 5 = 4 顾画。(實際不會有這種情況,channel數(shù)不會超過task數(shù))

根據(jù)reader.parameter.loadBalanceResourceMark匆笤,將任務(wù)分組如下:

{
    "database_a" : [task_id_1, task_id_2],
    "database_b" : [task_id_3, task_id_4, task_id_5],
    "database_c" : [task_id_6, task_id_7]
}

根據(jù)writer.parameter.loadBalanceResourceMark研侣,將任務(wù)分組如下:

{
    "database_dst_d" : [task_id_1, task_id_2],
    "database_dst_e" : [task_id_3, task_id_4, task_id_5, task_id_6, task_id_7]
}

因為readerResourceMarkAndTaskIdMap有三個組,而writerResourceMarkAndTaskIdMap只有兩個組炮捧。從中選出組數(shù)最多的庶诡,所以這里按照readerResourceMarkAndTaskIdMap將任務(wù)分配。

執(zhí)行過程是咆课,輪詢database_a, database_b, database_c末誓,取出第一個。循環(huán)上一步

1. 取出task_id_1 放入 taskGroup_1
2. 取出task_id_3 放入 taskGroup_2
3. 取出task_id_6 放入 taskGroup_3
4. 取出task_id_2 放入 taskGroup_4
5. 取出task_id_4 放入 taskGroup_1
6. ………

最后返回的結(jié)果為

{
    "taskGroup_1": [task_id_1, task_id_4],
    "taskGroup_2": [task_id_3, task_id_7],
    "taskGroup_3": [task_id_6, task_id_5],
    "taskGroup_4": [task_id_2]
}

代碼解釋
任務(wù)的分配是由JobAssignUtil類負(fù)責(zé)书蚪。使用者調(diào)用assignFairly方法喇澡,傳入?yún)?shù),返回TaskGroup配置列表

public final class JobAssignUtil {
    /**
    * configuration 配置
    * channelNumber殊校, channel總數(shù)
    * channelsPerTaskGroup晴玖, 每個TaskGroup擁有的channel數(shù)目
    */
    public static List<Configuration> assignFairly(Configuration configuration, int channelNumber, int channelsPerTaskGroup) {
        
        List<Configuration> contentConfig = configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
        // 計算TaskGroup的數(shù)目
        int taskGroupNumber = (int) Math.ceil(1.0 * channelNumber / channelsPerTaskGroup);

        ......
        // 任務(wù)分組
        LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap = parseAndGetResourceMarkAndTaskIdMap(contentConfig);
        // 調(diào)用doAssign方法,分配任務(wù)
        List<Configuration> taskGroupConfig = doAssign(resourceMarkAndTaskIdMap, configuration, taskGroupNumber);

        // 調(diào)整 每個 taskGroup 對應(yīng)的 Channel 個數(shù)(屬于優(yōu)化范疇)
        adjustChannelNumPerTaskGroup(taskGroupConfig, channelNumber);
        return taskGroupConfig;
    }
}

任務(wù)分組
按照task配置的reader.parameter.loadBalanceResourceMark和writer.parameter.loadBalanceResourceMark为流,分別對任務(wù)進行分組呕屎,選擇分組數(shù)最高的那組,作為任務(wù)分組的源敬察。

/**
* contentConfig參數(shù)秀睛,task的配置列表
*/
private static LinkedHashMap<String, List<Integer>> parseAndGetResourceMarkAndTaskIdMap(List<Configuration> contentConfig) {
    // reader的任務(wù)分組,key為分組的名稱静汤,value是taskId的列表
    LinkedHashMap<String, List<Integer>> readerResourceMarkAndTaskIdMap = new LinkedHashMap<String, List<Integer>>();
    // writer的任務(wù)分組琅催,key為分組的名稱,value是taskId的列表
    LinkedHashMap<String, List<Integer>> 
    writerResourceMarkAndTaskIdMap = new LinkedHashMap<String, List<Integer>>();

    for (Configuration aTaskConfig : contentConfig) {
        int taskId = aTaskConfig.getInt(CoreConstant.TASK_ID);
        
        // 取出reader.parameter.loadBalanceResourceMark的值虫给,作為分組名
        String readerResourceMark = aTaskConfig.getString(CoreConstant.JOB_READER_PARAMETER + "." + CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
        if (readerResourceMarkAndTaskIdMap.get(readerResourceMark) == null) {
            readerResourceMarkAndTaskIdMap.put(readerResourceMark, new LinkedList<Integer>());
        }
        // 把 readerResourceMark 加到 readerResourceMarkAndTaskIdMap 中
        readerResourceMarkAndTaskIdMap.get(readerResourceMark).add(taskId);

        // 取出writer.parameter.loadBalanceResourceMark的值藤抡,作為分組名
        String writerResourceMark = aTaskConfig.getString(CoreConstant.JOB_WRITER_PARAMETER + "." + CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
        if (writerResourceMarkAndTaskIdMap.get(writerResourceMark) == null) {
            writerResourceMarkAndTaskIdMap.put(writerResourceMark, new LinkedList<Integer>());
        }
        // 把 writerResourceMark 加到 writerResourceMarkAndTaskIdMap 中
        writerResourceMarkAndTaskIdMap.get(writerResourceMark).add(taskId);
    }

    // 選出reader和writer其中最大的
    if (readerResourceMarkAndTaskIdMap.size() >= writerResourceMarkAndTaskIdMap.size()) {
        // 采用 reader 對資源做的標(biāo)記進行 shuffle
        return readerResourceMarkAndTaskIdMap;
    } else {
        // 采用 writer 對資源做的標(biāo)記進行 shuffle
        return writerResourceMarkAndTaskIdMap;
    }
}

分配任務(wù)
將上一部任務(wù)的分組,劃分到每個taskGroup里

private static List<Configuration> doAssign(LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap, Configuration jobConfiguration, int taskGroupNumber) {
    List<Configuration> contentConfig = jobConfiguration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);

    Configuration taskGroupTemplate = jobConfiguration.clone();
    taskGroupTemplate.remove(CoreConstant.DATAX_JOB_CONTENT);

    List<Configuration> result = new LinkedList<Configuration>();

    // 初始化taskGroupConfigList
    List<List<Configuration>> taskGroupConfigList = new ArrayList<List<Configuration>>(taskGroupNumber);
    for (int i = 0; i < taskGroupNumber; i++) {
        taskGroupConfigList.add(new LinkedList<Configuration>());
    }

    // 取得resourceMarkAndTaskIdMap的值的最大個數(shù)
    int mapValueMaxLength = -1;

    List<String> resourceMarks = new ArrayList<String>();
    for (Map.Entry<String, List<Integer>> entry : resourceMarkAndTaskIdMap.entrySet()) {
        resourceMarks.add(entry.getKey());
        if (entry.getValue().size() > mapValueMaxLength) {
            mapValueMaxLength = entry.getValue().size();
        }
    }

    
    int taskGroupIndex = 0;
    // 執(zhí)行mapValueMaxLength次數(shù)抹估,每一次輪詢一遍resourceMarkAndTaskIdMap
    for (int i = 0; i < mapValueMaxLength; i++) {
        // 輪詢resourceMarkAndTaskIdMap
        for (String resourceMark : resourceMarks) {
            if (resourceMarkAndTaskIdMap.get(resourceMark).size() > 0) {
                // 取出第一個
                int taskId = resourceMarkAndTaskIdMap.get(resourceMark).get(0);
                // 輪詢的向taskGroupConfigList插入值
                taskGroupConfigList.get(taskGroupIndex % taskGroupNumber).add(contentConfig.get(taskId));
                // taskGroupIndex自增
                taskGroupIndex++;
                // 刪除第一個
                resourceMarkAndTaskIdMap.get(resourceMark).remove(0);
            }
        }
    }

    Configuration tempTaskGroupConfig;
    for (int i = 0; i < taskGroupNumber; i++) {
        tempTaskGroupConfig = taskGroupTemplate.clone();
        // 設(shè)置TaskGroup的配置
        tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i));
        tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i);

        result.add(tempTaskGroupConfig);
    }
    // 返回結(jié)果
    return result;
}

為組分配channel
上面已經(jīng)把任務(wù)劃分成多個組缠黍,為了每個組能夠均勻的分配channel,還需要調(diào)整药蜻。算法原理是瓷式,當(dāng)channel總的數(shù)目,不能整除TaskGroup的數(shù)目時语泽。多的余數(shù)個channel贸典,從中挑選出余數(shù)個TaskGroup,每個多分配一個踱卵。

比如現(xiàn)在有13個channel廊驼,然后taskgroup確有5個。那么首先每個組先分 13 / 5 = 2 個惋砂。那么還剩下多的3個chanel妒挎,分配給前面?zhèn)€taskgroup。

private static void adjustChannelNumPerTaskGroup(List<Configuration> taskGroupConfig, int channelNumber) {
    int taskGroupNumber = taskGroupConfig.size();
    int avgChannelsPerTaskGroup = channelNumber / taskGroupNumber;
    int remainderChannelCount = channelNumber % taskGroupNumber;
    // 表示有 remainderChannelCount 個 taskGroup,其對應(yīng) Channel 個數(shù)應(yīng)該為:avgChannelsPerTaskGroup + 1西饵;
    // (taskGroupNumber - remainderChannelCount)個 taskGroup,其對應(yīng) Channel 個數(shù)應(yīng)該為:avgChannelsPerTaskGroup

    int i = 0;
    for (; i < remainderChannelCount; i++) {
        taskGroupConfig.get(i).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup + 1);
    }

    for (int j = 0; j < taskGroupNumber - remainderChannelCount; j++) {
        taskGroupConfig.get(i + j).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup);
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末酝掩,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子眷柔,更是在濱河造成了極大的恐慌期虾,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件驯嘱,死亡現(xiàn)場離奇詭異彻消,居然都是意外死亡,警方通過查閱死者的電腦和手機宙拉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進店門宾尚,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人谢澈,你說我怎么就攤上這事煌贴。” “怎么了锥忿?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵牛郑,是天一觀的道長。 經(jīng)常有香客問我敬鬓,道長淹朋,這世上最難降的妖魔是什么笙各? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮础芍,結(jié)果婚禮上杈抢,老公的妹妹穿的比我還像新娘。我一直安慰自己仑性,他們只是感情好惶楼,可當(dāng)我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著诊杆,像睡著了一般歼捐。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上晨汹,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天豹储,我揣著相機與錄音,去河邊找鬼淘这。 笑死颂翼,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的慨灭。 我是一名探鬼主播朦乏,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼氧骤!你這毒婦竟也來了呻疹?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤筹陵,失蹤者是張志新(化名)和其女友劉穎刽锤,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體朦佩,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡并思,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了语稠。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片宋彼。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖仙畦,靈堂內(nèi)的尸體忽然破棺而出输涕,到底是詐尸還是另有隱情,我是刑警寧澤慨畸,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布莱坎,位于F島的核電站,受9級特大地震影響寸士,放射性物質(zhì)發(fā)生泄漏檐什。R本人自食惡果不足惜碴卧,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望乃正。 院中可真熱鬧住册,春花似錦、人聲如沸烫葬。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽搭综。三九已至,卻和暖如春划栓,著一層夾襖步出監(jiān)牢的瞬間兑巾,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工忠荞, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蒋歌,地道東北人。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓委煤,卻偏偏與公主長得像堂油,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子碧绞,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 開篇 ?最早接觸DataX是在前阿里同事在現(xiàn)在的公司引入的時候提到的府框,一直想抽空好好看看這部分代碼,因為DataX...
    晴天哥_王志閱讀 42,087評論 16 45
  • 概覽 DataX 是一個異構(gòu)數(shù)據(jù)源離線同步工具讥邻,致力于實現(xiàn)包括關(guān)系型數(shù)據(jù)庫(MySQL迫靖、Oracle等)、HDFS...
    tracy_668閱讀 699評論 0 1
  • 一兴使、DataX3.0概述 DataX 是一個異構(gòu)數(shù)據(jù)源離線同步工具系宜,致力于實現(xiàn)包括關(guān)系型數(shù)據(jù)庫(MySQL、Ora...
    勤奮的超跑閱讀 11,520評論 1 12
  • 目的這篇教程從用戶的角度出發(fā)发魄,全面地介紹了Hadoop Map/Reduce框架的各個方面盹牧。先決條件請先確認(rèn)Had...
    SeanC52111閱讀 1,725評論 0 1
  • 首夏猶清和,芳草亦未歇励幼。斗指東南欢策,夏風(fēng)初至,薔薇染墻赏淌,萬物繁榮踩寇。今日立夏,明凈馨雅六水,告別春天俺孙,不說再見辣卒。 在麥子開...
    窗花閱讀 383評論 0 2