mapPartitions
Official documentation:
Return a new RDD by applying a function to each partition of this RDD.
mapPartitions calls the partition function once for each partition of the RDD, and the results (one Iterator per partition) make up the new RDD.
mapPartitions is similar to map, but when the mapping has to create expensive extra objects repeatedly, mapPartitions is more efficient than map. For example, when writing all the data in an RDD to a database over JDBC: with map you may end up creating a connection for every element, which is very costly, whereas with mapPartitions only one connection per partition is needed (see the sketch below).
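Below is a minimal sketch of the one-connection-per-partition pattern described above, written against the same Spark 1.x Java API as the examples in this post. It assumes a JavaRDD<Integer> named javaRDD (as in the example further down), a JDBC driver and java.sql on the classpath; the URL, credentials, and table name are placeholders:
JavaRDD<Integer> rowsWritten = javaRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {
    @Override
    public Iterable<Integer> call(Iterator<Integer> integerIterator) throws Exception {
        //one JDBC connection per partition instead of one per element
        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password");
        PreparedStatement statement = connection.prepareStatement("INSERT INTO t_value(v) VALUES (?)");
        int count = 0;
        while (integerIterator.hasNext()) {
            statement.setInt(1, integerIterator.next());
            statement.executeUpdate();
            count++;
        }
        statement.close();
        connection.close();
        //the returned Iterable becomes this partition's contribution to the new RDD
        return Collections.singletonList(count);
    }
});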
Function signatures:
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U]
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
    preservesPartitioning: Boolean): JavaRDD[U]
The first overload is implemented in terms of the second, with preservesPartitioning set to false. With the second overload we can specify preservesPartitioning ourselves; it indicates whether the partitioner of the parent RDD should be kept. The Iterator passed to the FlatMapFunction contains all the elements of one partition of this RDD. A sketch of what preservesPartitioning does follows.
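The effect of preservesPartitioning is easiest to see on a pair RDD. The sketch below is a hypothetical example on the same Spark 1.x Java API, reusing the javaRDD variable from the example further down (mapPartitionsToPair is the pair-returning variant that accepts the same flag; imports of scala.Tuple2, org.apache.spark.HashPartitioner, and the Spark function interfaces are assumed). Because the keys are left untouched, passing true keeps the parent's HashPartitioner on the result:
//hash-partition the data by key (even/odd), then transform only the values
JavaPairRDD<Integer, Integer> pairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
    @Override
    public Tuple2<Integer, Integer> call(Integer v) throws Exception {
        return new Tuple2<Integer, Integer>(v % 2, v);
    }
}).partitionBy(new HashPartitioner(2));
JavaPairRDD<Integer, Integer> doubled = pairRDD.mapPartitionsToPair(
    new PairFlatMapFunction<Iterator<Tuple2<Integer, Integer>>, Integer, Integer>() {
        @Override
        public Iterable<Tuple2<Integer, Integer>> call(Iterator<Tuple2<Integer, Integer>> it) throws Exception {
            LinkedList<Tuple2<Integer, Integer>> out = new LinkedList<Tuple2<Integer, Integer>>();
            while (it.hasNext()) {
                Tuple2<Integer, Integer> t = it.next();
                out.add(new Tuple2<Integer, Integer>(t._1(), t._2() * 2)); //keys are not changed
            }
            return out;
        }
    }, true); //preservesPartitioning = true: the HashPartitioner of pairRDD is kept
System.out.println(doubled.partitioner()); //a present Optional; with false it would be absent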
Example:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
//the RDD has two partitions
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 2);
//compute the sum of each partition
JavaRDD<Integer> mapPartitionsRDD = javaRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {
    @Override
    public Iterable<Integer> call(Iterator<Integer> integerIterator) throws Exception {
        int isum = 0;
        while (integerIterator.hasNext())
            isum += integerIterator.next();
        LinkedList<Integer> linkedList = new LinkedList<Integer>();
        linkedList.add(isum);
        return linkedList;
    }
});
//the partitions are [1, 2, 4] and [3, 5, 6, 7], so this prints [7, 21]
System.out.println("mapPartitionsRDD~~~~~~~~~~~~~~~~~~~~~~" + mapPartitionsRDD.collect());
mapPartitionsWithIndex
Official documentation:
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
mapPartitionsWithIndex is essentially the same as mapPartitions, except that the handler function takes two arguments: the first is the index of the partition currently being processed, and the second is an Iterator over the elements of that partition. Note that although the signature below gives preservesPartitioning a default value of false, Scala default arguments are not usable from Java, so Java callers pass the flag explicitly (as in the example below).
Function signature:
def mapPartitionsWithIndex[R](f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],
    preservesPartitioning: Boolean = false): JavaRDD[R]
Source code analysis:
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
    preservesPartitioning)
}
The source shows that mapPartitions already receives the index of the partition being processed; it simply never passes it to the partition handler, whereas mapPartitionsWithIndex passes it through.
Example:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
//the RDD has two partitions
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 2);
//emit "partition index | element value | element number within the partition"
JavaRDD<String> mapPartitionsWithIndexRDD = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
    @Override
    public Iterator<String> call(Integer v1, Iterator<Integer> v2) throws Exception {
        LinkedList<String> linkedList = new LinkedList<String>();
        int i = 0;
        while (v2.hasNext())
            linkedList.add(Integer.toString(v1) + "|" + v2.next().toString() + "|" + Integer.toString(i++));
        return linkedList.iterator();
    }
}, false);
//prints [0|1|0, 0|2|1, 0|4|2, 1|3|0, 1|5|1, 1|6|2, 1|7|3]
System.out.println("mapPartitionsWithIndexRDD~~~~~~~~~~~~~~~~~~~~~~" + mapPartitionsWithIndexRDD.collect());