前言
今天 Review 了一下同事的代碼仿粹,
發(fā)現(xiàn)其代碼中有非常多的 mapPartitions
,
問其原因眷篇,他說性能比 map
更好例诀。
我說為什么性能好呢朽肥?
于是就有了這篇文章
網(wǎng)上推崇 mapPartitions 的原因
按照某些文章的原話來說
一次函數(shù)調(diào)用會(huì)處理一個(gè)partition所有的數(shù)據(jù)菇民,而不是一次函數(shù)調(diào)用處理一條尽楔,性能相對(duì)來說會(huì)高一些。
又比如說:
如果是普通的map第练,比如一個(gè)partition中有1萬條數(shù)據(jù)阔馋;
那么你的function要執(zhí)行和計(jì)算1萬次。
但是娇掏,使用MapPartitions操作之后呕寝,
一個(gè)task僅僅會(huì)執(zhí)行一次function,
function一次接收所有的partition數(shù)據(jù)婴梧。
只要執(zhí)行一次就可以了下梢,性能比較高
這種說法如果按照上面的方式來理解其實(shí)也是那么一回事,
但是也很容易讓一些新人理解為:
map要執(zhí)行1萬次塞蹭,而 MapPartitions 只需要一次孽江,這速度杠杠的提升了啊
實(shí)際上,你使用MapPartitions迭代的時(shí)候番电,
還是是一條條數(shù)據(jù)處理的竟坛,這個(gè)次數(shù)其實(shí)完全沒變。
其實(shí)這個(gè)問題我們可以來看看源碼
map算子源碼
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
接受用戶傳入的一個(gè)函數(shù),
new 一個(gè) MapPartitionsRDD 對(duì)象担汤,
我們的函數(shù)是作用在 MapPartitionsRDD 的迭代器 iter 上涎跨。
mapPartition算子源碼
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
接受一個(gè)迭代器,
new 一個(gè) MapPartitionsRDD 對(duì)象崭歧,
傳入的迭代器是作為 MapPartitionsRDD 的迭代器隅很。
說白了,這個(gè)兩算子真沒什么差率碾,
map 算子可以理解為 mapPartitions 的一個(gè)高級(jí)封裝而已叔营。
mapPartitions 帶來的問題
其實(shí)就我個(gè)人經(jīng)驗(yàn)來看,
mapPartitions 的正確使用其實(shí)并不會(huì)造成什么大的問題所宰,
當(dāng)然我也沒看出普通場(chǎng)景 mapPartitions 比 map 有什么優(yōu)勢(shì)绒尊,
所以 完全沒必要刻意使用 mapPartitions
反而,mapPartitions 會(huì)帶來一些問題仔粥。
- 使用起來并不是很方便婴谱,這個(gè)寫過代碼的人應(yīng)該都知道。
當(dāng)然這個(gè)問題并不是不能解決躯泰,我們可以寫類似下面的代碼谭羔,
確實(shí)也變的和 map 簡(jiǎn)潔性也差不太多,
恩麦向,我不會(huì)告訴你可以嘗試在生產(chǎn)環(huán)境中用用噢瘟裸。//抽象出一個(gè)函數(shù),以后所有的 mapPartitions 都可以用 def mapFunc[T, U](iterator: Iterator[T], f: T => U) = { iterator.map(x => { f(x) }) } //使用 rdd.mapPartitions(x => { mapFunc(x, line => { s"${line}轉(zhuǎn)換數(shù)據(jù)" }) })
- 容易造成 OOM诵竭,這個(gè)也是很多博客提到的問題话告,
他們大致會(huì)寫出如下的代碼來做測(cè)試,
如果你的代碼是上面那樣卵慰,那OOM也就不足為奇了沙郭,rdd.mapPartitions(x => { xxxx操作 while (x.hasNext){ val next = x.next() } xxx操作 })
不知道你注意到了沒有,mapPartitions 是接受一個(gè)迭代器呵燕,
再返回一個(gè)迭代器的棠绘,
如果你這么寫代碼,就完全沒有使用到迭代器的懶執(zhí)行特性再扭。
將數(shù)據(jù)都堆積到了內(nèi)存氧苍,
真就變成了一次處理一個(gè)partition的數(shù)據(jù)了,
在某種程度上已經(jīng)破壞了 Spark Pipeline 的計(jì)算模式了泛范。
mapPartitions 到底該怎么用
一對(duì)一的普通使用
存在即是道理让虐,
雖然上面一直在吐槽,
但是其確實(shí)有存在的理由罢荡。
其一個(gè)分區(qū)只會(huì)被調(diào)用一次的特性赡突,
在一些寫數(shù)據(jù)庫的時(shí)候確實(shí)很有幫助对扶,
因?yàn)槲覀兊?Spark 是分布式執(zhí)行的,
所以連接數(shù)據(jù)庫的操作必須放到算子內(nèi)部才能正確的被Executor執(zhí)行惭缰,
那么 mapPartitions 就顯示比 map 要有優(yōu)勢(shì)的多了浪南。
比如下面這段偽代碼
rdd.mapPartitions(x => {
println("連接數(shù)據(jù)庫")
val res = x.map(line=>{
print("寫入數(shù)據(jù):" + line)
line
})
res
})
這樣我就一個(gè)分區(qū)只要連接一次數(shù)據(jù)庫,
而如果是 map 算子漱受,那可能就要連接 n 多次了络凿。
不過上面這種就沒法關(guān)閉數(shù)據(jù)庫連接了,
所以可以換另外一種方式:
rdd1.mapPartitions(x => {
println("連接數(shù)據(jù)庫")
new Iterator[Any] {
override def hasNext: Boolean = {
if (x.hasNext) {
true
} else {
println("關(guān)閉數(shù)據(jù)庫")
false
}
}
override def next(): Any = "寫入數(shù)據(jù):" + x.next()
}
})
自定義一個(gè)迭代器昂羡,
這樣雖然麻煩了一點(diǎn)絮记,
但是無疑才是正確的。
當(dāng)然還有一些復(fù)雜的處理虐先,
比如類似 flatMap那種要輸出多條怎么辦怨愤?
這個(gè)時(shí)候可以去參考下 Iterator 的源碼是怎么實(shí)現(xiàn)的,
同樣不難蛹批,這里就不贅述了撰洗。
一對(duì)多的的高級(jí)使用
本來是想偷點(diǎn)懶的,
不過既然有人問起這個(gè)般眉,
這里就補(bǔ)充說下輸出多條的方式了赵。
思路其實(shí)很簡(jiǎn)單潜支,
我們可以查看迭代器的源碼甸赃,
他是有一個(gè) flatMap
算子的,
我們仿照一下就ok啦。
下面我們來解讀下 Iterator.flatMap
算子這段的源碼吧冗酿。
// f 函數(shù)是 傳入每一條數(shù)據(jù)都需要返回一個(gè)迭代器
// 也就是說一條記錄可以返回多個(gè)值
def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
//定義當(dāng)前的迭代器是空的
private var cur: Iterator[B] = empty
//這是源碼埠对,為了方便理解,我稍微改寫了下
// def hasNext: Boolean =
// cur.hasNext || self.hasNext && {
// cur = f(self.next).toIterator;
// hasNext
// }
def hasNext: Boolean ={
if(cur.hasNext){
//如果當(dāng)前迭代器還有值裁替,
//則返回true
return true
}
if(self.hasNext){
//如果cur已經(jīng)沒有值了
//但是本身的迭代器還有值
//則我們把本身迭代器的一個(gè)值拿出來
//通過 f函數(shù) 構(gòu)造一個(gè)迭代器放到當(dāng)前的迭代器
cur = f(self.next).toIterator;
//再遞歸一次本函數(shù)來看是否還有值
return hasNext
}
}
//這個(gè)就沒什么好說的了
def next(): B = (if (hasNext) cur else empty).next()
}
上面的代碼為了方便理解项玛,
我修改了下,并加了注釋弱判,
應(yīng)該是很好理解了襟沮。
這里如果你如果要做伸手黨的話,
我也給出一個(gè)實(shí)例代碼
val conf = new SparkConf()
.setMaster("local[1]")
.setAppName("test")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
sc.parallelize(Seq("a,a,a,a,a"))
.mapPartitions(iter => {
new AbstractIterator[String] {
def myF(data: String): Iterable[String] = {
println(data)
data.split(",").toIterable
}
var cur: Iterator[String] = Iterator.empty
override def hasNext: Boolean = {
cur.hasNext || iter.hasNext && {
cur = myF(iter.next).toIterator
hasNext
}
}
override def next(): String = (if (hasNext) cur else Iterator.empty).next()
}
})
.foreach(println)
這里捎帶提一下就是昌腰,
其實(shí)迭代器本身就有 Map flatMap 等算子开伏,
之所以還要去自定義,
就是因?yàn)樽远x提供了更加自由的一些操作遭商,
比如開啟和關(guān)閉數(shù)據(jù)庫等固灵,
但是大部分情況下,
還是能不自定義劫流,誰想去折騰呢巫玻?
其他
另外一點(diǎn)就是 mapPartitions 提供給了我們更加強(qiáng)大的數(shù)據(jù)控制力丛忆,
怎么理解呢?我們可以一次拿到一個(gè)分區(qū)的數(shù)據(jù)仍秤,
那么我們就可以對(duì)一個(gè)分區(qū)的數(shù)據(jù)進(jìn)行統(tǒng)一處理熄诡,
雖然會(huì)加大內(nèi)存的開銷,但是在某些場(chǎng)景下還是很有用的诗力,
比如一些矩陣的乘法粮彤。
后記
不管你要使用哪個(gè)算子,其實(shí)都是可以的姜骡,
但是大多數(shù)時(shí)候导坟,我還是推薦你使用 map 算子,
當(dāng)然遇到一些map算子不合適的場(chǎng)景圈澈,
那就沒辦法了...
不過就算你是真的要使用 mapPartitions惫周,
那么請(qǐng)記得充分發(fā)揮一下 迭代器的 懶執(zhí)行特性。