Dataset和RDD中的coalesce和repartition

Dataset

 /**
   * Returns a new Dataset that has exactly `numPartitions` partitions, when the fewer partitions
   * are requested. If a larger number of partitions is requested, it will stay at the current
   * number of partitions. Similar to coalesce defined on an `RDD`, this operation results in
   * a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not
   * be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions.
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can call repartition. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   *
   * @group typedrel
   * @since 1.6.0
   */
  def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan {
    Repartition(numPartitions, shuffle = false, logicalPlan)
  }
  /**
   * Returns a new Dataset that has exactly `numPartitions` partitions.
   *
   * @group typedrel
   * @since 1.6.0
   */
  def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
    Repartition(numPartitions, shuffle = true, logicalPlan)
  }

coalesce不進(jìn)行shuffle朝捆,分區(qū)從大變小谢谦,可以用來解決小文件問題
repartition進(jìn)行shuffle孩饼,增大分區(qū)數(shù)亲铡,提高程序并行度

RDD

/**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   *
   * This results in a narrow dependency, e.g. if you go from 1000 partitions
   * to 100 partitions, there will not be a shuffle, instead each of the 100
   * new partitions will claim 10 of the current partitions. If a larger number
   * of partitions is requested, it will stay at the current number of partitions.
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can pass shuffle = true. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   *
   * @note With shuffle = true, you can actually coalesce to a larger number
   * of partitions. This is useful if you have a small number of partitions,
   * say 100, potentially with a few partitions being abnormally large. Calling
   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
   * data distributed using a hash partitioner. The optional partition coalescer
   * passed in must be serializable.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions,
        partitionCoalescer).values
    } else {
      new CoalescedRDD(this, numPartitions, partitionCoalescer)
    }
  }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末才写,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子奴愉,更是在濱河造成了極大的恐慌琅摩,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,290評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件锭硼,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡蜕劝,警方通過查閱死者的電腦和手機(jī)檀头,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來岖沛,“玉大人暑始,你說我怎么就攤上這事∮は鳎” “怎么了廊镜?”我有些...
    開封第一講書人閱讀 156,872評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長唉俗。 經(jīng)常有香客問我嗤朴,道長配椭,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,415評(píng)論 1 283
  • 正文 為了忘掉前任雹姊,我火速辦了婚禮股缸,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘吱雏。我一直安慰自己敦姻,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評(píng)論 6 385
  • 文/花漫 我一把揭開白布歧杏。 她就那樣靜靜地躺著镰惦,像睡著了一般。 火紅的嫁衣襯著肌膚如雪犬绒。 梳的紋絲不亂的頭發(fā)上陨献,一...
    開封第一講書人閱讀 49,784評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音懂更,去河邊找鬼眨业。 笑死,一個(gè)胖子當(dāng)著我的面吹牛沮协,可吹牛的內(nèi)容都是我干的龄捡。 我是一名探鬼主播,決...
    沈念sama閱讀 38,927評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼慷暂,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼聘殖!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起行瑞,我...
    開封第一講書人閱讀 37,691評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤奸腺,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后血久,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體突照,經(jīng)...
    沈念sama閱讀 44,137評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評(píng)論 2 326
  • 正文 我和宋清朗相戀三年氧吐,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了讹蘑。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,622評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡筑舅,死狀恐怖座慰,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情翠拣,我是刑警寧澤版仔,帶...
    沈念sama閱讀 34,289評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站,受9級(jí)特大地震影響蛮粮,放射性物質(zhì)發(fā)生泄漏益缎。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評(píng)論 3 312
  • 文/蒙蒙 一蝉揍、第九天 我趴在偏房一處隱蔽的房頂上張望链峭。 院中可真熱鬧,春花似錦又沾、人聲如沸弊仪。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽励饵。三九已至,卻和暖如春滑燃,著一層夾襖步出監(jiān)牢的瞬間役听,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來泰國打工表窘, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留典予,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,316評(píng)論 2 360
  • 正文 我出身青樓乐严,卻偏偏與公主長得像瘤袖,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子昂验,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容