MapReduce源碼分析——ReduceTask流程分析

前言

Reduce會(huì)從Mapper任務(wù)中拉取很多小文件,小文件內(nèi)部有序哲银,但是整體是沒序的,Reduce會(huì)合并小文件呻惕,然后套個(gè)歸并算法荆责,變成一個(gè)整體有序的文件。

Reducer 主要有3個(gè)基本的過程:

1.Shuffle階段
Reducer會(huì)通過網(wǎng)絡(luò)IO將Mapper端的排序輸出給復(fù)制過來亚脆。

2.Sort階段

  • 按key對(duì)reducer輸入進(jìn)行排序(因?yàn)椴煌膍apper可能輸出相同的key)
  • shuffle和sort階段同時(shí)發(fā)生做院,即在拉去mapper輸出時(shí),它們被合并濒持。

3.Reduce階段
在此階段中键耕,對(duì)排序輸入中的每個(gè)group調(diào)用reduce(object,iterable柑营,reducer.context)方法郁竟。reduce任務(wù)的輸出通常通過reducer.context.write(object棚亩,object)寫入記錄編寫器。reduce的輸出沒有重新排序勒虾。

源碼解析

1.Shuffle階段源碼分析

@Override
  @SuppressWarnings("unchecked")
  public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

    if (isMapOrReduce()) {
      copyPhase = getProgress().addPhase("copy");
      sortPhase  = getProgress().addPhase("sort");
      reducePhase = getProgress().addPhase("reduce");
    }
    //發(fā)送task任務(wù)報(bào)告修然,與父進(jìn)程做交流
    TaskReporter reporter = startReporter(umbilical);
    //判斷用的是新的MapReduceAPI還是舊的API
    boolean useNewApi = job.getUseNewReducer();
    //核心代碼愕宋,初始化任務(wù)
    initialize(job, getJobID(), reporter, useNewApi);

    //Reduce任務(wù)有4種结榄,Job-setup Task, Job-cleanup Task, Task-cleanup Task和ReduceTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }
    
    // Initialize the codec
    codec = initCodec();
    RawKeyValueIterator rIter = null;
    //使用的shuffle插件
    ShuffleConsumerPlugin shuffleConsumerPlugin = null;
    
    Class combinerClass = conf.getCombinerClass();
    CombineOutputCollector combineCollector = 
      (null != combinerClass) ? 
     new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
    
    Class<? extends ShuffleConsumerPlugin> clazz =
          job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
                    
    shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
    LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

    ShuffleConsumerPlugin.Context shuffleContext = 
      new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
                  super.lDirAlloc, reporter, codec, 
                  combinerClass, combineCollector, 
                  spilledRecordsCounter, reduceCombineInputCounter,
                  shuffledMapsCounter,
                  reduceShuffleBytes, failedShuffleCounter,
                  mergedMapOutputsCounter,
                  taskStatus, copyPhase, sortPhase, this,
                  mapOutputFile, localMapFiles);
    //初始化shuffle插件角塑,核心代碼
    shuffleConsumerPlugin.init(shuffleContext);
    //跑shuflle核心代碼,此步驟蝙昙,會(huì)通過網(wǎng)絡(luò)IO將Map端的輸出給拉過來,并且進(jìn)行合并操作~~~
    rIter = shuffleConsumerPlugin.run();

    // free up the data structures
    mapOutputFilesOnDisk.clear();

    // sort is complete
    sortPhase.complete();                         
    setPhase(TaskStatus.Phase.REDUCE); 
    statusUpdate(umbilical);
    Class keyClass = job.getMapOutputKeyClass();
    Class valueClass = job.getMapOutputValueClass();

    //分組比較
    RawComparator comparator = job.getOutputValueGroupingComparator();
     //如果前面3個(gè)任務(wù)都不是蒜撮,執(zhí)行的就是最主要的ReduceTask,根據(jù)新老API調(diào)用不同的方法
    if (useNewApi) {
      runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    } else {
      runOldReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    }

    shuffleConsumerPlugin.close();
    done(umbilical, reporter);
  }

首先是shuffle階段淀弹,Reduce進(jìn)程啟動(dòng)一些數(shù)據(jù)copy線程,通過HTTP方式請(qǐng)求MapTask所在的NodeManager以獲取輸出文件菌赖。

Reducer是如何知道要去哪些機(jī)器取數(shù)據(jù)呢琉用?

一旦map任務(wù)完成之后,就會(huì)通過常規(guī)心跳通知應(yīng)用程序的Application Master奴紧。reduce的一個(gè)線程會(huì)周期性地向master詢問黍氮,直到提取完所有數(shù)據(jù)沫浆。數(shù)據(jù)被reduce提走之后专执,map機(jī)器不會(huì)立刻刪除數(shù)據(jù)本股,這是為了預(yù)防reduce任務(wù)失敗需要重做。因此map輸出數(shù)據(jù)是在整個(gè)作業(yè)完成之后才被刪除掉的苟径。


NodeManager需要為分區(qū)文件運(yùn)行reduce任務(wù)涩笤。并且reduce任務(wù)需要集群上若干個(gè)map任務(wù)的map輸出作為其特殊的分區(qū)文件蹬碧。而每個(gè)map任務(wù)的完成時(shí)間可能不同恩沽,因此只要有一個(gè)任務(wù)完成罗心,reduce任務(wù)就開始復(fù)制其輸出城瞎。

reduce任務(wù)有少量復(fù)制線程脖镀,因此能夠并行取得map輸出蜒灰。默認(rèn)線程數(shù)為5,但這個(gè)默認(rèn)值可以通過mapreduce.reduce.shuffle.parallelcopies屬性進(jìn)行設(shè)置凸椿。

由于job的每一個(gè)map都會(huì)根據(jù)reduce(n)數(shù)將數(shù)據(jù)分成map 輸出結(jié)果分成n個(gè)partition脑漫,所以map的中間結(jié)果中是有可能包含每一個(gè)reduce需要處理的部分?jǐn)?shù)據(jù)的启昧。所以密末,為了優(yōu)化reduce的執(zhí)行時(shí)間严里,hadoop中是等job的第一個(gè)map結(jié)束后追城,所有的reduce就開始嘗試從完成的mapper中下載該reduce對(duì)應(yīng)的partition的部分?jǐn)?shù)據(jù)(shuffle)座柱,因此map和reduce是交叉進(jìn)行的色洞。

 @Override
  public RawKeyValueIterator run() throws IOException, InterruptedException {
    // Scale the maximum events we fetch per RPC call to mitigate OOM issues
    // on the ApplicationMaster when a thundering herd of reducers fetch events
    // TODO: This should not be necessary after HADOOP-8942
    int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
        MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
    int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);

    // Start the map-completion events fetcher thread
    final EventFetcher<K,V> eventFetcher = 
      new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
          maxEventsToFetch);
    eventFetcher.start();
    
    // Start the map-output fetcher threads
    boolean isLocal = localMapFiles != null;
    final int numFetchers = isLocal ? 1 :
      jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
    Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
    if (isLocal) {
      fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
          merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
          localMapFiles);
      fetchers[0].start();
    } else {
      for (int i=0; i < numFetchers; ++i) {
        fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
                                       reporter, metrics, this, 
                                       reduceTask.getShuffleSecret());
        fetchers[i].start();
      }
    }
    
    // Wait for shuffle to complete successfully
    while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
      reporter.progress();
      
      synchronized (this) {
        if (throwable != null) {
          throw new ShuffleError("error in shuffle in " + throwingThreadName,
                                 throwable);
        }
      }
    }

    // Stop the event-fetcher thread
    eventFetcher.shutDown();
    
    // Stop the map-output fetcher threads
    for (Fetcher<K,V> fetcher : fetchers) {
      fetcher.shutDown();
    }
    
    // stop the scheduler
    scheduler.close();

    copyPhase.complete(); // copy is already complete
    taskStatus.setPhase(TaskStatus.Phase.SORT);
    reduceTask.statusUpdate(umbilical);

    // Finish the on-going merges...
    RawKeyValueIterator kvIter = null;
    try {
      kvIter = merger.close();
    } catch (Throwable e) {
      throw new ShuffleError("Error while doing final merge " , e);
    }

    // Sanity check
    synchronized (this) {
      if (throwable != null) {
        throw new ShuffleError("error in shuffle in " + throwingThreadName,
                               throwable);
      }
    }
    
    return kvIter;
  }

將Map端復(fù)制過來的數(shù)據(jù)先放入內(nèi)存緩沖區(qū)中
Merge有3種形式,分別是內(nèi)存到內(nèi)存奈搜,內(nèi)存到磁盤馋吗,磁盤到磁盤秋秤。默認(rèn)情況下第一種形式不啟用,第二種Merge方式一直在運(yùn)行(spill階段)直到結(jié)束堰怨,然后啟用第三種磁盤到磁盤的Merge方式生成最終的文件。(注意:為了合并灿巧,壓縮的map輸出都必須在內(nèi)存中被解壓縮)

一旦內(nèi)存緩沖區(qū)達(dá)到緩存溢出到磁盤的閾值時(shí)(默認(rèn)0.66)抠藕,或達(dá)到Map任務(wù)在緩存溢出前能夠保留在內(nèi)存中的輸出個(gè)數(shù)的閾值(默認(rèn)1000)盾似,則合并后溢出寫到磁盤中。如果指定combiner溉跃,則在合并期間運(yùn)行它以降低寫入硬盤的數(shù)據(jù)量撰茎。

隨著磁盤上副本的增多龄糊,后臺(tái)線程會(huì)將它們合并為更大的炫惩、排好序的文件诡必。這會(huì)為后面的合并節(jié)省一些時(shí)間爸舒。
復(fù)制完所有map輸出后扭勉,reduce任務(wù)進(jìn)入sort排序階段(更恰當(dāng)?shù)恼f法是merge合并階段涂炎,因?yàn)檫@個(gè)階段的主要工作是執(zhí)行了歸并排序)唱捣,循環(huán)進(jìn)行歸并排序(維持其順序排序)震缭。同時(shí)合并的文件流的數(shù)量由mapreduce.task.io.sort.factor屬性決定(默認(rèn)10)拣宰。

這里的merge和map端的merge動(dòng)作類似巡社,只是數(shù)組中存放的是不同map端copy來的數(shù)值。Copy過來的數(shù)據(jù)會(huì)先放入內(nèi)存緩沖區(qū)中肥荔,然后當(dāng)使用內(nèi)存達(dá)到一定量的時(shí)候才spill磁盤次企。這里的緩沖區(qū)大小要比map端的更為靈活缸棵,它基于JVM的heap size設(shè)置堵第。

這個(gè)內(nèi)存大小的控制就不像map一樣可以通過io.sort.mb來設(shè)定了隧出,而是通過另外一個(gè)參數(shù) mapreduce.reduce.shuffle.input.buffer.percent(default 0.7f 源碼里面寫死了) 來設(shè)置针余,這個(gè)參數(shù)其實(shí)是一個(gè)百分比凄诞,意思是說,shuffile在reduce內(nèi)存中的數(shù)據(jù)最多使用內(nèi)存量為:0.7 × maxHeap of reduce task伪朽。JVM的heapsize的70%汛蝙。內(nèi)存到磁盤merge的啟動(dòng)門限可以通過mapreduce.reduce.shuffle.merge.percent(default0.66)配置烈涮。

也就是說,如果該reduce task的最大heap使用量(通常通過mapreduce.admin.reduce.child.java.opts來設(shè)置窖剑,比如設(shè)置為-Xmx1024m)的一定比例用來緩存數(shù)據(jù)坚洽。默認(rèn)情況下,reduce會(huì)使用其heapsize的70%來在內(nèi)存中緩存數(shù)據(jù)西土。假設(shè) mapreduce.reduce.shuffle.input.buffer.percent 為0.7讶舰,reducetask的max heapsize為1G,那么用來做下載數(shù)據(jù)緩存的內(nèi)存就為大概700MB左右。這700M的內(nèi)存绘雁,跟map端一樣,也不是要等到全部寫滿才會(huì)往磁盤刷的,而是當(dāng)這700M中被使用到了一定的限度(通常是一個(gè)百分比),就會(huì)開始往磁盤刷(刷磁盤前會(huì)先做sortMerge)杠娱。

這個(gè)限度閾值也是可以通過參數(shù) mapreduce.reduce.shuffle.merge.percent(default0.66)來設(shè)定刘离。與map 端類似恼除,這也是溢寫的過程,這個(gè)過程中如果你設(shè)置有Combiner,也是會(huì)啟用的堵幽,然后在磁盤中生成了眾多的溢寫文件殴胧。這種merge方式一直在運(yùn)行,直到?jīng)]有map端的數(shù)據(jù)時(shí)才結(jié)束,然后啟動(dòng)磁盤到磁盤的merge方式生成最終的那個(gè)文件。

runNewReducer源碼解讀

void runNewReducer(JobConf job,
                     final TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator<INKEY> comparator,
                     Class<INKEY> keyClass,
                     Class<INVALUE> valueClass
                     ) throws IOException,InterruptedException, 
                              ClassNotFoundException {
    // wrap value iterator to report progress.
    //真正的迭代器
    final RawKeyValueIterator rawIter = rIter;
    rIter = new RawKeyValueIterator() {
      public void close() throws IOException {
        rawIter.close();
      }
      public DataInputBuffer getKey() throws IOException {
        return rawIter.getKey();
      }
      public Progress getProgress() {
        return rawIter.getProgress();
      }
      public DataInputBuffer getValue() throws IOException {
        return rawIter.getValue();
      }
      public boolean next() throws IOException {
        boolean ret = rawIter.next();
        reporter.setProgress(rawIter.getProgress().getProgress());
        return ret;
      }
    };
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
          getTaskID(), reporter);
    // make a reducer
    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = 
      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
    job.setBoolean("mapred.skip.on", isSkipping());
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.Reducer.Context 
         reducerContext = createReduceContext(reducer, job, getTaskID(),
                                               //構(gòu)建上下文的時(shí)候把迭代器傳進(jìn)來
                                               rIter, reduceInputKeyCounter, 
                                               reduceInputValueCounter, 
                                               trackedRW,
                                               committer,
                                               reporter, comparator, keyClass,//比較器
                                               valueClass);
    try {
      //構(gòu)建完上下文之后運(yùn)行Redude的Run方法
      reducer.run(reducerContext);
    } finally {
      trackedRW.close(reducerContext);
    }
  }

首先這里會(huì)將茎匠,一些屬性封裝到reducerContext這個(gè)對(duì)象中汽馋,其中包括了最重要的rIter這個(gè)對(duì)象,這個(gè)迭代器對(duì)象復(fù)制真正的讀取數(shù)據(jù)的工作。之后調(diào)用的就是run方法,開始執(zhí)行setup方法以及我們自定義的reduce方法,下面先看看ReduceContextImpl中是如何完成迭代的习蓬!

public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
                           //把迭代器傳給輸入對(duì)象Input
                           RawKeyValueIterator input, 
                           Counter inputKeyCounter,
                           Counter inputValueCounter,
                           RecordWriter<KEYOUT,VALUEOUT> output,
                           OutputCommitter committer,
                           StatusReporter reporter,
                           RawComparator<KEYIN> comparator,
                           Class<KEYIN> keyClass,
                           Class<VALUEIN> valueClass
                          ) throws InterruptedException, IOException{
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    this.serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(buffer);
    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(buffer);
    hasMore = input.next();
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.conf = conf;
    this.taskid = taskid;
  }
/** Start processing next unique key. */
  //實(shí)際上Reduce中run方法中的contect.netKey調(diào)用的邏輯
  public boolean nextKey() throws IOException,InterruptedException {
   ///第一次假 放空
    while (hasMore && nextKeyIsSame) {
      nextKeyValue();
    }
    if (hasMore) {
      if (inputKeyCounter != null) {
        inputKeyCounter.increment(1);
      }
      return nextKeyValue();
    } else {
      return false;
    }
  }

  /**
   * Advance to the next key/value pair.
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!hasMore) {
      key = null;
      value = null;
      return false;
    }
    firstValue = !nextKeyIsSame;
    DataInputBuffer nextKey = input.getKey();
    currentRawKey.set(nextKey.getData(), nextKey.getPosition(), 
                      nextKey.getLength() - nextKey.getPosition());
    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
    key = keyDeserializer.deserialize(key);
    DataInputBuffer nextVal = input.getValue();
    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
        - nextVal.getPosition());
    value = valueDeserializer.deserialize(value);

    currentKeyLength = nextKey.getLength() - nextKey.getPosition();
    currentValueLength = nextVal.getLength() - nextVal.getPosition();

    if (isMarked) {
      backupStore.write(nextKey, nextVal);
    }

    hasMore = input.next();
    if (hasMore) {
      nextKey = input.getKey();
     //判斷當(dāng)前key和下一個(gè)Key是否相等罗丰。
      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
                                     currentRawKey.getLength(),
                                     nextKey.getData(),
                                     nextKey.getPosition(),
                                     nextKey.getLength() - nextKey.getPosition()
                                         ) == 0;
    } else {
      nextKeyIsSame = false;
    }
    inputValueCounter.increment(1);
    return true;
  }

  public KEYIN getCurrentKey() {
    return key;
  }

  @Override
  public VALUEIN getCurrentValue() {
    return value;
  }

接下來是Reduce中的run的方法

 public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
     //實(shí)際上在這一步里實(shí)際上調(diào)用了NextKeyValue的值更新了 hasmore栖疑,nextKeyisSame,Key锻霎,Value的值
      while (context.nextKey()) {       
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      cleanup(context);
    }
  }

nextKey()會(huì)調(diào)用真的迭代器的方法奄容,會(huì)對(duì)nextKeyIsSame進(jìn)行判斷,還對(duì)hasMore進(jìn)行判斷。
getValues()會(huì)返回一個(gè)痴荐,可迭代的對(duì)象,ValueIterable類型的员淫。ValueIterable中iterator又會(huì)返回一個(gè)ValueIterator迭代器對(duì)象拴事,下面看了看ValueIterator的源碼徘公。該類是ReduceContextImpl的內(nèi)部類关面。

protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {

    private boolean inReset = false;
    private boolean clearMarkFlag = false;

    @Override
    public boolean hasNext() {
      try {
        if (inReset && backupStore.hasNext()) {
          return true;
        } 
      } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException("hasNext failed", e);
      }
      return firstValue || nextKeyIsSame;
    }

    @Override
    public VALUEIN next() {
      if (inReset) {
        try {
          if (backupStore.hasNext()) {
            backupStore.next();
            DataInputBuffer next = backupStore.nextValue();
            buffer.reset(next.getData(), next.getPosition(), next.getLength()
                - next.getPosition());
            value = valueDeserializer.deserialize(value);
            return value;
          } else {
            inReset = false;
            backupStore.exitResetMode();
            if (clearMarkFlag) {
              clearMarkFlag = false;
              isMarked = false;
            }
          }
        } catch (IOException e) {
          e.printStackTrace();
          throw new RuntimeException("next value iterator failed", e);
        }
      } 

      // if this is the first record, we don't need to advance
      if (firstValue) {
        firstValue = false;
        return value;
      }
      // if this isn't the first record and the next key is different, they
      // can't advance it here.
      if (!nextKeyIsSame) {
        throw new NoSuchElementException("iterate past last value");
      }
      // otherwise, go to the next key/value pair
      try {
       //這個(gè)迭代器自身是沒有數(shù)據(jù)的内边,在Next中調(diào)用的還是 nextKeyValue,在這個(gè)NextKeyValue中調(diào)用的是Input的輸入數(shù)據(jù)
        nextKeyValue();
        return value;
      } catch (IOException ie) {
        throw new RuntimeException("next value iterator failed", ie);
      } catch (InterruptedException ie) {
        // this is bad, but we can't modify the exception list of java.util
        throw new RuntimeException("next value iterator interrupted", ie);        
      }
    }

可以看到這個(gè)迭代器自身對(duì)數(shù)據(jù)的迭代是通過之前的真實(shí)迭代器對(duì)象rIter來完成的竿音,在Next中調(diào)用的還是rIter的 nextKeyValue方法萄涯,在這個(gè)NextKeyValue中調(diào)用的是Input的輸入數(shù)據(jù)燃乍。

真正迭代器中有一個(gè)重要的標(biāo)識(shí)NextKeyisSame,這個(gè)標(biāo)識(shí)會(huì)被hasNext方法用到然后判斷下一個(gè)key是否 相同蛮浑,直到一組數(shù)據(jù)唠叛。

nextKeyValue該方法會(huì)對(duì)key和value進(jìn)行賦值,同時(shí)調(diào)用hasmore對(duì)nextKeyIsSame進(jìn)行判定是否是true沮稚,之后調(diào)用分組比較器艺沼,返回0則為true。

這里需要注意蕴掏,自定義的reduce方法障般,如果迭代了value调鲸,每個(gè)value對(duì)應(yīng)的key也是會(huì)隨之迭代的。因?yàn)閗ey這塊是按引用傳遞剩拢。會(huì)改變同一塊內(nèi)存中的數(shù)據(jù)线得。也就是說通過值的迭代,也迭代key徐伐。

上面還需要注意的地方是贯钩。

如果用戶設(shè)置了比較器會(huì)用用戶自定義的分組比較器,如果用戶沒設(shè)置就用排序比較器當(dāng)做分組比較器办素,否則用默認(rèn)的key自帶的比較器角雷。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市性穿,隨后出現(xiàn)的幾起案子勺三,更是在濱河造成了極大的恐慌,老刑警劉巖需曾,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件吗坚,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡呆万,警方通過查閱死者的電腦和手機(jī)商源,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來谋减,“玉大人牡彻,你說我怎么就攤上這事〕龅” “怎么了庄吼?”我有些...
    開封第一講書人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長严就。 經(jīng)常有香客問我总寻,道長,這世上最難降的妖魔是什么梢为? 我笑而不...
    開封第一講書人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任废菱,我火速辦了婚禮,結(jié)果婚禮上抖誉,老公的妹妹穿的比我還像新娘殊轴。我一直安慰自己,他們只是感情好袒炉,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開白布旁理。 她就那樣靜靜地躺著,像睡著了一般我磁。 火紅的嫁衣襯著肌膚如雪孽文。 梳的紋絲不亂的頭發(fā)上驻襟,一...
    開封第一講書人閱讀 51,125評(píng)論 1 297
  • 那天,我揣著相機(jī)與錄音芋哭,去河邊找鬼沉衣。 笑死,一個(gè)胖子當(dāng)著我的面吹牛减牺,可吹牛的內(nèi)容都是我干的豌习。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼拔疚,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼肥隆!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起稚失,我...
    開封第一講書人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤栋艳,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后句各,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體吸占,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年凿宾,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了矾屯。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡菌湃,死狀恐怖问拘,靈堂內(nèi)的尸體忽然破棺而出遍略,到底是詐尸還是另有隱情惧所,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布绪杏,位于F島的核電站下愈,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏蕾久。R本人自食惡果不足惜势似,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望僧著。 院中可真熱鬧履因,春花似錦、人聲如沸盹愚。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽皆怕。三九已至毅舆,卻和暖如春西篓,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背憋活。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來泰國打工岂津, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人悦即。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓吮成,卻偏偏與公主長得像,于是被迫代替她去往敵國和親盐欺。 傳聞我的和親對(duì)象是個(gè)殘疾皇子赁豆,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容

  • 往往一個(gè)人的時(shí)候,靜靜的腦海中會(huì)浮現(xiàn)出很多回憶冗美,模模糊糊的回想起魔种,不禁嘴角浮現(xiàn)出一絲微笑。 記得小時(shí)候粉洼,家里還沒有...
    豆砸_e499閱讀 308評(píng)論 1 2
  • “信息孤島”是指相互之間在功能上不關(guān)聯(lián)互助节预、信息不共享互換以及信息與業(yè)務(wù)流程和應(yīng)用相互脫節(jié)的計(jì)算機(jī)應(yīng)用系統(tǒng)∈羧停“孤島...
    互加計(jì)劃_陳嘉誼閱讀 2,405評(píng)論 11 15
  • S君的侄兒小時(shí)候安拟,在姥姥家里,有一天宵喂,陽光明媚糠赦,姥姥心情顯得好了些,于是一個(gè)早上姥爺都沒有被責(zé)怪過锅棕。姥爺開心地告訴...
    佳佳的寶瓶子閱讀 318評(píng)論 0 0