Spark系列——關(guān)于 mapPartitions的誤區(qū)

前言

今天 Review 了一下同事的代碼仿粹,
發(fā)現(xiàn)其代碼中有非常多的 mapPartitions,
問其原因眷篇,他說性能比 map 更好例诀。
我說為什么性能好呢朽肥?
于是就有了這篇文章

網(wǎng)上推崇 mapPartitions 的原因

按照某些文章的原話來說
一次函數(shù)調(diào)用會(huì)處理一個(gè)partition所有的數(shù)據(jù)菇民,而不是一次函數(shù)調(diào)用處理一條尽楔,性能相對(duì)來說會(huì)高一些。
又比如說:
如果是普通的map第练,比如一個(gè)partition中有1萬條數(shù)據(jù)阔馋;
那么你的function要執(zhí)行和計(jì)算1萬次。
但是娇掏,使用MapPartitions操作之后呕寝,
一個(gè)task僅僅會(huì)執(zhí)行一次function,
function一次接收所有的partition數(shù)據(jù)婴梧。
只要執(zhí)行一次就可以了下梢,性能比較高
這種說法如果按照上面的方式來理解其實(shí)也是那么一回事,
但是也很容易讓一些新人理解為:
map要執(zhí)行1萬次塞蹭,而 MapPartitions 只需要一次孽江,這速度杠杠的提升了啊
實(shí)際上,你使用MapPartitions迭代的時(shí)候番电,
還是是一條條數(shù)據(jù)處理的竟坛,這個(gè)次數(shù)其實(shí)完全沒變。

其實(shí)這個(gè)問題我們可以來看看源碼
map算子源碼

  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

接受用戶傳入的一個(gè)函數(shù),
new 一個(gè) MapPartitionsRDD 對(duì)象担汤,
我們的函數(shù)是作用在 MapPartitionsRDD 的迭代器 iter 上涎跨。

mapPartition算子源碼

  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
  }

接受一個(gè)迭代器,
new 一個(gè) MapPartitionsRDD 對(duì)象崭歧,
傳入的迭代器是作為 MapPartitionsRDD 的迭代器隅很。

說白了,這個(gè)兩算子真沒什么差率碾,
map 算子可以理解為 mapPartitions 的一個(gè)高級(jí)封裝而已叔营。

mapPartitions 帶來的問題

其實(shí)就我個(gè)人經(jīng)驗(yàn)來看,
mapPartitions 的正確使用其實(shí)并不會(huì)造成什么大的問題所宰,
當(dāng)然我也沒看出普通場(chǎng)景 mapPartitions 比 map 有什么優(yōu)勢(shì)绒尊,
所以 完全沒必要刻意使用 mapPartitions
反而,mapPartitions 會(huì)帶來一些問題仔粥。

  1. 使用起來并不是很方便婴谱,這個(gè)寫過代碼的人應(yīng)該都知道。
    當(dāng)然這個(gè)問題并不是不能解決躯泰,我們可以寫類似下面的代碼谭羔,
    確實(shí)也變的和 map 簡(jiǎn)潔性也差不太多,
    恩麦向,我不會(huì)告訴你可以嘗試在生產(chǎn)環(huán)境中用用噢瘟裸。
    //抽象出一個(gè)函數(shù),以后所有的 mapPartitions 都可以用
    def mapFunc[T, U](iterator: Iterator[T], f: T => U) = {
      iterator.map(x => {
        f(x)
      })
    }
    //使用    
    rdd.mapPartitions(x => {
        mapFunc(x, line => {
            s"${line}轉(zhuǎn)換數(shù)據(jù)"
        })
      })
    
    
  2. 容易造成 OOM诵竭,這個(gè)也是很多博客提到的問題话告,
    他們大致會(huì)寫出如下的代碼來做測(cè)試,
    rdd.mapPartitions(x => {
        xxxx操作
       while (x.hasNext){
         val next = x.next()
       }
        xxx操作
      })
    
    如果你的代碼是上面那樣卵慰,那OOM也就不足為奇了沙郭,
    不知道你注意到了沒有,mapPartitions 是接受一個(gè)迭代器呵燕,
    再返回一個(gè)迭代器的棠绘,
    如果你這么寫代碼,就完全沒有使用到迭代器的懶執(zhí)行特性再扭。
    將數(shù)據(jù)都堆積到了內(nèi)存氧苍,
    真就變成了一次處理一個(gè)partition的數(shù)據(jù)了,
    在某種程度上已經(jīng)破壞了 Spark Pipeline 的計(jì)算模式了泛范。

mapPartitions 到底該怎么用

一對(duì)一的普通使用

存在即是道理让虐,
雖然上面一直在吐槽,
但是其確實(shí)有存在的理由罢荡。
其一個(gè)分區(qū)只會(huì)被調(diào)用一次的特性赡突,
在一些寫數(shù)據(jù)庫的時(shí)候確實(shí)很有幫助对扶,
因?yàn)槲覀兊?Spark 是分布式執(zhí)行的,
所以連接數(shù)據(jù)庫的操作必須放到算子內(nèi)部才能正確的被Executor執(zhí)行惭缰,
那么 mapPartitions 就顯示比 map 要有優(yōu)勢(shì)的多了浪南。
比如下面這段偽代碼

rdd.mapPartitions(x => {
        println("連接數(shù)據(jù)庫")
        val res = x.map(line=>{
          print("寫入數(shù)據(jù):" + line)
          line
        })
        res
      })

這樣我就一個(gè)分區(qū)只要連接一次數(shù)據(jù)庫,
而如果是 map 算子漱受,那可能就要連接 n 多次了络凿。
不過上面這種就沒法關(guān)閉數(shù)據(jù)庫連接了,
所以可以換另外一種方式:

rdd1.mapPartitions(x => {
      println("連接數(shù)據(jù)庫")
      new Iterator[Any] {
        override def hasNext: Boolean = {
          if (x.hasNext) {
            true
          } else {
            println("關(guān)閉數(shù)據(jù)庫")
            false
          }
        }
        override def next(): Any = "寫入數(shù)據(jù):" + x.next()
      }
    })

自定義一個(gè)迭代器昂羡,
這樣雖然麻煩了一點(diǎn)絮记,
但是無疑才是正確的。
當(dāng)然還有一些復(fù)雜的處理虐先,
比如類似 flatMap那種要輸出多條怎么辦怨愤?
這個(gè)時(shí)候可以去參考下 Iterator 的源碼是怎么實(shí)現(xiàn)的,
同樣不難蛹批,這里就不贅述了撰洗。

一對(duì)多的的高級(jí)使用

本來是想偷點(diǎn)懶的,
不過既然有人問起這個(gè)般眉,
這里就補(bǔ)充說下輸出多條的方式了赵。

思路其實(shí)很簡(jiǎn)單潜支,
我們可以查看迭代器的源碼甸赃,
他是有一個(gè) flatMap 算子的,
我們仿照一下就ok啦。
下面我們來解讀下 Iterator.flatMap算子這段的源碼吧冗酿。

        // f 函數(shù)是 傳入每一條數(shù)據(jù)都需要返回一個(gè)迭代器
        // 也就是說一條記錄可以返回多個(gè)值
        def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
          //定義當(dāng)前的迭代器是空的
          private var cur: Iterator[B] = empty
        //這是源碼埠对,為了方便理解,我稍微改寫了下
//          def hasNext: Boolean =
//            cur.hasNext || self.hasNext && {
//              cur = f(self.next).toIterator;
//              hasNext
//           }
        def hasNext: Boolean ={
          if(cur.hasNext){
            //如果當(dāng)前迭代器還有值裁替,
            //則返回true
            return true
          }
          if(self.hasNext){
            //如果cur已經(jīng)沒有值了
            //但是本身的迭代器還有值
            //則我們把本身迭代器的一個(gè)值拿出來
            //通過 f函數(shù) 構(gòu)造一個(gè)迭代器放到當(dāng)前的迭代器
            cur = f(self.next).toIterator;
            //再遞歸一次本函數(shù)來看是否還有值
            return hasNext
          }
        }
          //這個(gè)就沒什么好說的了
          def next(): B = (if (hasNext) cur else empty).next()
        }

上面的代碼為了方便理解项玛,
我修改了下,并加了注釋弱判,
應(yīng)該是很好理解了襟沮。

這里如果你如果要做伸手黨的話,
我也給出一個(gè)實(shí)例代碼

 val conf = new SparkConf()
      .setMaster("local[1]")
      .setAppName("test")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    sc.parallelize(Seq("a,a,a,a,a"))
      .mapPartitions(iter => {

        new AbstractIterator[String] {
          def myF(data: String): Iterable[String] = {
            println(data)
            data.split(",").toIterable
          }

          var cur: Iterator[String] = Iterator.empty

          override def hasNext: Boolean = {
            cur.hasNext || iter.hasNext && {
              cur = myF(iter.next).toIterator
              hasNext
            }
          }

          override def next(): String = (if (hasNext) cur else Iterator.empty).next()
        }
      })
      .foreach(println)

這里捎帶提一下就是昌腰,
其實(shí)迭代器本身就有 Map flatMap 等算子开伏,
之所以還要去自定義,
就是因?yàn)樽远x提供了更加自由的一些操作遭商,
比如開啟和關(guān)閉數(shù)據(jù)庫等固灵,
但是大部分情況下,
還是能不自定義劫流,誰想去折騰呢巫玻?

其他

另外一點(diǎn)就是 mapPartitions 提供給了我們更加強(qiáng)大的數(shù)據(jù)控制力丛忆,
怎么理解呢?我們可以一次拿到一個(gè)分區(qū)的數(shù)據(jù)仍秤,
那么我們就可以對(duì)一個(gè)分區(qū)的數(shù)據(jù)進(jìn)行統(tǒng)一處理熄诡,
雖然會(huì)加大內(nèi)存的開銷,但是在某些場(chǎng)景下還是很有用的诗力,
比如一些矩陣的乘法粮彤。

后記

不管你要使用哪個(gè)算子,其實(shí)都是可以的姜骡,
但是大多數(shù)時(shí)候导坟,我還是推薦你使用 map 算子,
當(dāng)然遇到一些map算子不合適的場(chǎng)景圈澈,
那就沒辦法了...
不過就算你是真的要使用 mapPartitions惫周,
那么請(qǐng)記得充分發(fā)揮一下 迭代器的 懶執(zhí)行特性。

最后康栈,如果本文對(duì)你有幫助递递,幫忙點(diǎn)個(gè)贊唄

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市啥么,隨后出現(xiàn)的幾起案子登舞,更是在濱河造成了極大的恐慌,老刑警劉巖悬荣,帶你破解...
    沈念sama閱讀 217,907評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件菠秒,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡氯迂,警方通過查閱死者的電腦和手機(jī)践叠,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來嚼蚀,“玉大人禁灼,你說我怎么就攤上這事〗问铮” “怎么了弄捕?”我有些...
    開封第一講書人閱讀 164,298評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)导帝。 經(jīng)常有香客問我守谓,道長(zhǎng),這世上最難降的妖魔是什么舟扎? 我笑而不...
    開封第一講書人閱讀 58,586評(píng)論 1 293
  • 正文 為了忘掉前任分飞,我火速辦了婚禮,結(jié)果婚禮上睹限,老公的妹妹穿的比我還像新娘譬猫。我一直安慰自己讯檐,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,633評(píng)論 6 392
  • 文/花漫 我一把揭開白布染服。 她就那樣靜靜地躺著别洪,像睡著了一般。 火紅的嫁衣襯著肌膚如雪柳刮。 梳的紋絲不亂的頭發(fā)上挖垛,一...
    開封第一講書人閱讀 51,488評(píng)論 1 302
  • 那天,我揣著相機(jī)與錄音秉颗,去河邊找鬼痢毒。 笑死,一個(gè)胖子當(dāng)著我的面吹牛蚕甥,可吹牛的內(nèi)容都是我干的哪替。 我是一名探鬼主播,決...
    沈念sama閱讀 40,275評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼菇怀,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼凭舶!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起爱沟,我...
    開封第一講書人閱讀 39,176評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤帅霜,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后呼伸,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體身冀,經(jīng)...
    沈念sama閱讀 45,619評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,819評(píng)論 3 336
  • 正文 我和宋清朗相戀三年蜂大,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了闽铐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蝶怔。...
    茶點(diǎn)故事閱讀 39,932評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡奶浦,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出踢星,到底是詐尸還是另有隱情澳叉,我是刑警寧澤,帶...
    沈念sama閱讀 35,655評(píng)論 5 346
  • 正文 年R本政府宣布沐悦,位于F島的核電站成洗,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏藏否。R本人自食惡果不足惜瓶殃,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,265評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望副签。 院中可真熱鬧遥椿,春花似錦基矮、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至碴裙,卻和暖如春钢悲,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背舔株。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評(píng)論 1 269
  • 我被黑心中介騙來泰國打工莺琳, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人载慈。 一個(gè)月前我還...
    沈念sama閱讀 48,095評(píng)論 3 370
  • 正文 我出身青樓芦昔,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國和親娃肿。 傳聞我的和親對(duì)象是個(gè)殘疾皇子咕缎,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,884評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容