在對(duì)RDDPair(一種特殊的 RDD宾符,即RDD[(key, Row)])進(jìn)行操作時(shí)經(jīng)常會(huì)用到 reduceByKey() 和 groupByKey() 兩個(gè)算子唉擂。下面看看兩者的區(qū)別和使用方法:
一谭胚、reduceByKey(func) 和 groupByKey() 的區(qū)別
reduceByKey(func):顧名思義仙蚜,是針對(duì) RDDPair 中具有
相同 key 的所有 row 做 reduce 操作
匙姜,操作內(nèi)容由函數(shù) func 確定脂倦,可以自定義番宁,比如:形如 (0, BACA) 這樣的 row,現(xiàn)在需要對(duì) key 相同的所有row(即 BACA)使用"-"拼接成一個(gè)長(zhǎng)字符串赖阻,比如(1,TMWTYV-PYSAJV)
蝶押;groupByKey(): 顧名思義,是針對(duì) RDDPair 中具有
相同 key 的所有 row 分組
火欧,相同 key 對(duì)應(yīng)的 row 匯總生成一個(gè)sequence棋电;本身不能自定義函數(shù),只能通過(guò)額外通過(guò)map(func)來(lái)實(shí)現(xiàn)苇侵。比如:(0,CompactBuffer(ZCEXLX, BKSGQD, ICRWVA, PXFBAC, SUBCYR, OMEQVV, TMBPHW))
赶盔。
使用reduceByKey()的時(shí)候,本地的數(shù)據(jù)先進(jìn)行merge
然后再傳輸?shù)讲煌?jié)點(diǎn)再進(jìn)行merge榆浓,最終得到最終結(jié)果于未。
而使用groupByKey()的時(shí)候,并不進(jìn)行本地的merge
陡鹃,全部數(shù)據(jù)傳出烘浦,得到全部數(shù)據(jù)后才會(huì)進(jìn)行聚合成一個(gè)sequence,groupByKey()傳輸速度明顯慢于reduceByKey()萍鲸。
雖然groupByKey().map(func)也能實(shí)現(xiàn)reduceByKey(func)功能闷叉,但是優(yōu)先使用reduceByKey(func)
。
區(qū)別:
區(qū)別項(xiàng) | reduceByKey | groupByKey | 備注 |
---|---|---|---|
功能 | 針對(duì) RDDPair 中具有相同 key 的所有 row 做 reduce 操作 | 針對(duì) RDDPair 中具有相同 key 的所有 row 分組 | 無(wú) |
能自定義函數(shù) | 可以自定義reduce函數(shù) | 否 | 無(wú) |
輸出 | 一個(gè) key 對(duì)應(yīng)一個(gè)row | 一個(gè)key 對(duì)應(yīng)多個(gè)row的sequence | 無(wú) |
性能 | 更高 | 更低 | groupByKey.map(func) 可以實(shí)現(xiàn) reduceByKey脊阴,但是盡量用 reduceByKey握侧,因?yàn)楦咝?/td> |
二蚯瞧、Scala 代碼--使用方法
- rddMap.groupByKey(自定義partitioner);
- rddMap.reduceByKey(自定義reduce函數(shù)) 或者類似 rddMap.reduceByKey(_ + "-" + ) 藕咏,其中 _ + "-" + _ 中的""表示 key 相同的兩個(gè) row
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
object TestSparkShuffle {
class MyPartitioner(partitionNum: Int) extends Partitioner() {
override def numPartitions: Int = partitionNum
override def getPartition(key: Any): Int = {
if (key.asInstanceOf[Int] % 2 == 0) {
0
} else {
1
}
}
}
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local").appName("test").getOrCreate()
val sc = spark.sparkContext
val arr = new ArrayBuffer[String]
genStrArr(36, arr)
val rdd = sc.parallelize(arr)
val rddMap: RDD[(Int, String)] = rdd.mapPartitions(
partition => {
partition.map(str => (getKey(str), str))
}
)
rddMap.foreach(x => println(x))
// 按照 key 進(jìn)行分組状知,且key為奇、偶數(shù)的row各分在0孽查、1分區(qū)內(nèi)
val rddMap2 = rddMap.groupByKey(new MyPartitioner(2))
rddMap2.foreach(x => println(x))
// 對(duì) rddMap 中的row按照row的key饥悴,同樣的key的value相繼使用"-"拼接起來(lái)
val rddMap3 = rddMap.reduceByKey(reduceFunc)
// val rddMap3 = rddMap.reduceByKey(_ + "-" + _) // _ + "-" + _ 中的"_"表示 key 相同的兩個(gè)value
rddMap3.foreach(x => println(x))
println(rddMap.count())
}
// reduce 函數(shù),將兩個(gè)字符串使用"-"拼接
def reduceFunc(x: String, y : String): String = {
x + "-" + y
}
def getKey(str: String): Int = {
Math.abs(str.hashCode % 6)
}
// 生成size為num的字符串?dāng)?shù)組盲再,每個(gè)字符串長(zhǎng)度為6西设,由A~Z隨機(jī)構(gòu)成
def genStrArr(num : Int, arr: ArrayBuffer[String]): Unit = {
val baseChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
val charLen = 6
val rand = new Random()
for (x <- Range(0, num)) {
var subStr = ""
for (i <- Range(0, charLen)) {
val order = rand.nextInt(baseChars.length)
subStr += baseChars.charAt(order)
}
arr.append(subStr)
}
}
}
測(cè)試結(jié)果:
# groupByKey 結(jié)果
(4,CompactBuffer(HCAESV, OZNIQU, WIIWNX, MEFMUZ, TVFPRH, EMSZJC))
(0,CompactBuffer(ZCEXLX, BKSGQD, ICRWVA, PXFBAC, SUBCYR, OMEQVV, TMBPHW))
(2,CompactBuffer(XTAKJH, HOUFFR, KIJCNU, BDILZU, SJFGRN, IZPCHR, RIPRRA, UUGZER))
(1,CompactBuffer(TMWTYV, PYSAJV))
(3,CompactBuffer(UHQTWN, YSLXXE, PNIMWJ, NAYYWU, EYPRPM, SXGUQO, DDSNIY, EXPSPM))
(5,CompactBuffer(ZOGCRZ, VORGBM, CUZZFS, SLFBWC, PFRFRA))
# reduceByKey 結(jié)果
(4,HCAESV-OZNIQU-WIIWNX-MEFMUZ-TVFPRH-EMSZJC)
(0,ZCEXLX-BKSGQD-ICRWVA-PXFBAC-SUBCYR-OMEQVV-TMBPHW)
(1,TMWTYV-PYSAJV)
(3,UHQTWN-YSLXXE-PNIMWJ-NAYYWU-EYPRPM-SXGUQO-DDSNIY-EXPSPM)
(5,ZOGCRZ-VORGBM-CUZZFS-SLFBWC-PFRFRA)
(2,XTAKJH-HOUFFR-KIJCNU-BDILZU-SJFGRN-IZPCHR-RIPRRA-UUGZER)