【Spark Java API】Transformation(1)—mapPartitions、mapPartitionsWithIndex

mapPartitions


官方文檔描述:

 Return a new RDD by applying a function to each partition of this RDD.

**
mapPartitions函數(shù)會(huì)對(duì)每個(gè)分區(qū)依次調(diào)用分區(qū)函數(shù)處理,然后將處理的結(jié)果(若干個(gè)Iterator)生成新的RDDs艺玲。
mapPartitions與map類似肋僧,但是如果在映射的過(guò)程中需要頻繁創(chuàng)建額外的對(duì)象斑胜,使用mapPartitions要比map高效的過(guò)。比如嫌吠,將RDD中的所有數(shù)據(jù)通過(guò)JDBC連接寫入數(shù)據(jù)庫(kù)止潘,如果使用map函數(shù),可能要為每一個(gè)元素都創(chuàng)建一個(gè)connection辫诅,這樣開(kāi)銷很大凭戴,如果使用mapPartitions,那么只需要針對(duì)每一個(gè)分區(qū)建立一個(gè)connection泥栖。
**

函數(shù)原型:

def mapPartitions[U](f:FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U]
def mapPartitions[U](f:FlatMapFunction[java.util.Iterator[T], U],
preservesPartitioning: Boolean): JavaRDD[U]

**
第一個(gè)函數(shù)是基于第二個(gè)函數(shù)實(shí)現(xiàn)的簇宽,使用的是preservesPartitioning為false。而第二個(gè)函數(shù)我們可以指定preservesPartitioning吧享,preservesPartitioning表示是否保留父RDD的partitioner分區(qū)信息魏割;FlatMapFunction中的Iterator是這個(gè)rdd的一個(gè)分區(qū)的所有element組成的Iterator。
**

實(shí)例

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
//RDD有兩個(gè)分區(qū)
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,2);
//計(jì)算每個(gè)分區(qū)的合計(jì)
JavaRDD<Integer> mapPartitionsRDD = javaRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {   
 @Override
 public Iterable<Integer> call(Iterator<Integer> integerIterator) throws Exception {
        int isum = 0;
        while(integerIterator.hasNext())
            isum += integerIterator.next();
        LinkedList<Integer> linkedList = new LinkedList<Integer>();
        linkedList.add(isum);
        return linkedList;    }
});

System.out.println("mapPartitionsRDD~~~~~~~~~~~~~~~~~~~~~~" + mapPartitionsRDD.collect());

mapPartitionsWithIndex


官方文檔說(shuō)明:

 Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

**
mapPartitionsWithIndex與mapPartitions基本相同钢颂,只是在處理函數(shù)的參數(shù)是一個(gè)二元元組钞它,元組的第一個(gè)元素是當(dāng)前處理的分區(qū)的index,元組的第二個(gè)元素是當(dāng)前處理的分區(qū)元素組成的Iterator
**

函數(shù)原型:

def mapPartitionsWithIndex[R]( f:JFunction2[jl.Integer, java.util.Iterator[T], 
java.util.Iterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R]

源碼分析:

def mapPartitions[U: ClassTag](f:Iterator[T] => Iterator[U],  
preservesPartitioning:Boolean = false): RDD[U] = withScope {  
val cleanedF = sc.clean(f)  
new MapPartitionsRDD(this,  (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter), 
preservesPartitioning)
}
def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {  
val cleanedF = sc.clean(f)  
new MapPartitionsRDD(this,  (context: TaskContext, index: Int, iter: Iterator[T]) => 
cleanedF(index, iter),    
preservesPartitioning)
}

**
從源碼中可以看到其實(shí)mapPartitions已經(jīng)獲得了當(dāng)前處理的分區(qū)的index殊鞭,只是沒(méi)有傳入分區(qū)處理函數(shù)遭垛,而mapPartitionsWithIndex將其傳入分區(qū)處理函數(shù)。
**

實(shí)例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
//RDD有兩個(gè)分區(qū)
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,2);
//分區(qū)index操灿、元素值锯仪、元素編號(hào)輸出
JavaRDD<String> mapPartitionsWithIndexRDD = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
 @Override 
public Iterator<String> call(Integer v1, Iterator<Integer> v2) throws Exception {        
  LinkedList<String> linkedList = new LinkedList<String>();        
  int i = 0;        
  while (v2.hasNext())            
   linkedList.add(Integer.toString(v1) + "|" + v2.next().toString() + Integer.toString(i++));        
  return linkedList.iterator();    
  }
},false);

System.out.println("mapPartitionsWithIndexRDD~~~~~~~~~~~~~~~~~~~~~~" + mapPartitionsWithIndexRDD.collect());
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市趾盐,隨后出現(xiàn)的幾起案子庶喜,更是在濱河造成了極大的恐慌小腊,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,198評(píng)論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件久窟,死亡現(xiàn)場(chǎng)離奇詭異秩冈,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)斥扛,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門入问,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人稀颁,你說(shuō)我怎么就攤上這事芬失。” “怎么了峻村?”我有些...
    開(kāi)封第一講書人閱讀 167,643評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵麸折,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我粘昨,道長(zhǎng)垢啼,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 59,495評(píng)論 1 296
  • 正文 為了忘掉前任张肾,我火速辦了婚禮芭析,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘吞瞪。我一直安慰自己馁启,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,502評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布芍秆。 她就那樣靜靜地躺著惯疙,像睡著了一般。 火紅的嫁衣襯著肌膚如雪妖啥。 梳的紋絲不亂的頭發(fā)上霉颠,一...
    開(kāi)封第一講書人閱讀 52,156評(píng)論 1 308
  • 那天,我揣著相機(jī)與錄音荆虱,去河邊找鬼蒿偎。 笑死,一個(gè)胖子當(dāng)著我的面吹牛怀读,可吹牛的內(nèi)容都是我干的诉位。 我是一名探鬼主播,決...
    沈念sama閱讀 40,743評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼菜枷,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼苍糠!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起啤誊,我...
    開(kāi)封第一講書人閱讀 39,659評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤椿息,失蹤者是張志新(化名)和其女友劉穎歹袁,沒(méi)想到半個(gè)月后坷衍,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體寝优,經(jīng)...
    沈念sama閱讀 46,200評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,282評(píng)論 3 340
  • 正文 我和宋清朗相戀三年枫耳,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了乏矾。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,424評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡迁杨,死狀恐怖钻心,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情铅协,我是刑警寧澤捷沸,帶...
    沈念sama閱讀 36,107評(píng)論 5 349
  • 正文 年R本政府宣布,位于F島的核電站狐史,受9級(jí)特大地震影響痒给,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜骏全,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,789評(píng)論 3 333
  • 文/蒙蒙 一苍柏、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧姜贡,春花似錦试吁、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 32,264評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至母怜,卻和暖如春余耽,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背糙申。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,390評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工宾添, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人柜裸。 一個(gè)月前我還...
    沈念sama閱讀 48,798評(píng)論 3 376
  • 正文 我出身青樓缕陕,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親疙挺。 傳聞我的和親對(duì)象是個(gè)殘疾皇子扛邑,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,435評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容