定義SparkContext
val conf=new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
Map算子
@Test
def mapTest(): Unit ={
// 創(chuàng)建本地集合 RDD ,最小分區(qū)數(shù)為4
val list=List(1,2,3,4,5,6,7,8)
val rdd1=sc.makeRDD(list,4)
// 使用 map 算子
val mapRdd: RDD[Unit] = rdd1.map(i => {
println(i)
})
// 調(diào)用
mapRdd.collect()
}
運(yùn)行結(jié)果疲眷;以為是分區(qū)計(jì)算,所以結(jié)果是無序的柔滔。
5
3
7
1
8
4
6
2
查看每個(gè)分區(qū)處理哪些數(shù)據(jù)
rdd1.mapPartitionsWithIndex((index,it)=>{
println(s"index=$index data=${it.toList}")
it
}).collect
運(yùn)行結(jié)果
index=3 data=List(7, 8)
index=0 data=List(1, 2)
index=2 data=List(5, 6)
index=1 data=List(3, 4)
獲取map算子返回
@Test
def mapTest(): Unit ={
// 創(chuàng)建本地集合 RDD ,最小分區(qū)數(shù)為4
val list=List(1,2,3,4,5,6,7,8)
val rdd1=sc.makeRDD(list,4)
// 使用 map 算子
val mapRdd: RDD[Int] = rdd1.map(i =>i*i)
// 調(diào)用
println(mapRdd.collect().toList)
}
List(1, 4, 9, 16, 25, 36, 49, 64)
其基本操作和scala的map一致泡徙。
map 算子 源碼
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
// clean 用于清理函數(shù)閉包,以為閉包無法進(jìn)行序列化葛假。
val cleanF = sc.clean(f)
// 創(chuàng)建 MapPartitionsRDD 對象;
// this:創(chuàng)建新的RDD時(shí)綁定當(dāng)前RDD的依賴關(guān)系
// 需要傳入一個(gè)函數(shù)滋恬,有三個(gè)參數(shù)
// 第一個(gè)_:指定TaskContext
// 第二個(gè)_:指定TaskContext
// iter:當(dāng)前分區(qū)的迭代器(內(nèi)容如下)聊训;然后調(diào)用scala中的map而不是spark的map算子。
//index=3 data=List(7, 8)
//index=0 data=List(1, 2)
//index=2 data=List(5, 6)
//index=1 data=List(3, 4)
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}
思考一個(gè)問題恢氯?map算子并沒有指定分區(qū)带斑,但是卻是還是4個(gè)分區(qū)?
首先 map的數(shù)據(jù)來源于rdd1
;rdd1指定了分區(qū)勋拟。
val rdd1=sc.makeRDD(list,4)
然后map綁定當(dāng)前rdd的關(guān)聯(lián)關(guān)系
// 由rdd1 調(diào)用
val mapRdd: RDD[Int] = rdd1.map(i =>i*i)
// 返回一個(gè)新的 rdd時(shí)綁定了當(dāng)前的rdd 也就是this
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
最重要的還是 MapPartitionsRDD 繼承的RDD就是rdd1
,可以通過 prev看到勋磕。
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U],
其他參數(shù)
) extends RDD[U](prev) {
所以map算子的分區(qū)大小是其父類指定的分區(qū)大小。
mapPartitions 算子
案例:使用mapPartitions
指黎,通過id查詢用戶信息
@Test
def mysqlQueryByMapPartitions(): Unit ={
// 讀取文件
val lines=sc.textFile("file:///C:/Users/123456/Desktop/userId.txt",4)
val arr=lines.mapPartitions(it => {
// 加載驅(qū)動(dòng)類
Class.forName("com.mysql.jdbc.Driver")
//連接數(shù)據(jù)庫
val connection= DriverManager.getConnection("jdbc:mysql://hadoop102:3306/gmall?characterEncoding=UTF-8", "root", "123321")
val statement = connection.prepareStatement("select * from user_info where id=?")
try{
while (it.hasNext) {
//主鍵id
val id: String = it.next()
// 設(shè)置參數(shù)
statement.setInt(1,id.toInt)
// 執(zhí)行查詢
val result: ResultSet = statement.executeQuery()
while (result.next()) {
val id = result.getInt("id")
val nickName = result.getString("nick_name")
val name = result.getString("name")
println(s"id=$id,nickName=$nickName,name=$name")
}
}
}catch {
case e:Exception => {
println("數(shù)據(jù)庫查詢失敗")
println(e.printStackTrace())
}
}finally {
//關(guān)閉資源
if(statement!=null) statement.close()
if(connection!=null) connection.close()
}
it
}).collect()
println(arr.toList)
}
我是將id 一行行存儲(chǔ)到該文件中的朋凉。數(shù)據(jù)量不大,一百個(gè)id醋安。
val lines=sc.textFile("file:///C:/Users/123456/Desktop/userId.txt",4)
數(shù)據(jù)就不復(fù)制出來了杂彭,就看看數(shù)據(jù)庫資源關(guān)閉了幾次∠啪荆總共有四個(gè)分區(qū)亲怠,所以最終數(shù)據(jù)庫關(guān)閉了4次。
關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....
將mapPartitions
換成 map
@Test
def mysqlQueryByMap(): Unit ={
// 讀取文件
val lines=sc.textFile("file:///C:/Users/123456/Desktop/userId.txt",4)
val arr=lines.map(id => {
// 加載驅(qū)動(dòng)類
Class.forName("com.mysql.jdbc.Driver")
//連接數(shù)據(jù)庫
val connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/gmall?characterEncoding=UTF-8", "root", "123321")
val statement = connection.prepareStatement("select * from user_info where id=?")
try {
// 設(shè)置參數(shù)
statement.setInt(1, id.toInt)
// 執(zhí)行查詢
val result: ResultSet = statement.executeQuery()
while (result.next()) {
val id = result.getInt("id")
val nickName = result.getString("nick_name")
val name = result.getString("name")
println(s"id=$id,nickName=$nickName,name=$name")
}
}catch {
case e:Exception => {
println("數(shù)據(jù)庫查詢失敗")
println(e.printStackTrace())
}
}finally {
println("關(guān)閉數(shù)據(jù)庫資源....")
//關(guān)閉資源
if(statement!=null) statement.close()
if(connection!=null) connection.close()
}
}).collect()
}
數(shù)據(jù)庫的建立與銷毀來來回回一百次(可以自己試試柠辞。
關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....
...
數(shù)據(jù)量少所以团秽,沒有問題,但是若數(shù)據(jù)不是一百而是上百萬叭首,千萬呢习勤?肯定是不行的。
可能將資源鏈接丟入map中才會(huì)造成這樣的原因焙格。
如果把connection
提出去會(huì)怎么樣图毕?
@Test
def mysqlQueryByMap(): Unit ={
// 讀取文件
val lines=sc.textFile("file:///C:/Users/123456/Desktop/userId.txt",4)
//連接數(shù)據(jù)庫
val connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/gmall?characterEncoding=UTF-8", "root", "123321")
val statement = connection.prepareStatement("select * from user_info where id=?")
try {
val arr=lines.map(id => {
// 加載驅(qū)動(dòng)類
Class.forName("com.mysql.jdbc.Driver")
// 設(shè)置參數(shù)
statement.setInt(1, id.toInt)
// 執(zhí)行查詢
val result: ResultSet = statement.executeQuery()
while (result.next()) {
val id = result.getInt("id")
val nickName = result.getString("nick_name")
val name = result.getString("name")
println(s"id=$id,nickName=$nickName,name=$name")
}
}).collect()
}catch {
case e:Exception => {
println("數(shù)據(jù)庫查詢失敗")
println(e.printStackTrace())
}
}finally {
println("關(guān)閉數(shù)據(jù)庫資源....")
//關(guān)閉資源
if(statement!=null) statement.close()
if(connection!=null) connection.close()
}
}
報(bào)了一個(gè)錯(cuò),該錯(cuò)誤的原因是jdbc
眷唉,并沒有實(shí)現(xiàn)序列化予颤,無法進(jìn)行傳輸。
org.apache.spark.SparkException: Task not serializable
該案例除了說明 map
與 mapPartitions
的區(qū)別外冬阳,更想表達(dá)的意思是蛤虐。
rdd 會(huì)將數(shù)據(jù)進(jìn)行分區(qū),每個(gè)分區(qū)的計(jì)算邏輯或數(shù)據(jù)可能不在同一個(gè)節(jié)點(diǎn)上肝陪。即使是local
模式驳庭,分區(qū)之間也是并行處理。
mapPartitions 與 map 的區(qū)別:
- map里面的函數(shù)是針對分區(qū)里面的
每個(gè)元素
進(jìn)行計(jì)算氯窍,mapPartitions里面的函數(shù)是針對每個(gè)分區(qū)的所有數(shù)據(jù)的迭代器
進(jìn)行計(jì)算 - map里面的函數(shù)是計(jì)算一個(gè)元素返回一個(gè)結(jié)果,所以map生成的新的RDD里面的元素個(gè)數(shù) = 原來RDD元素個(gè)數(shù)
mapPartitions里面的函數(shù)是計(jì)算一個(gè)分區(qū)的所有數(shù)據(jù)的迭代器然后返回一個(gè)新的迭代器,所以mapPartitions生成的新的RDD里面的元素的個(gè)數(shù)與原來RDD元素個(gè)數(shù)可能不相同 - map是針對
單個(gè)元素
操作,元素操作完成之后可以進(jìn)行回收內(nèi)存
了
mapPartitions是針對一個(gè)迭代器操作嚷掠,操作完成迭代器一個(gè)元素之后,該元素不能回收必須等到整個(gè)迭代器都處理完成之后才能回收捏检。如果一個(gè)分區(qū)中數(shù)據(jù)量很大,可能導(dǎo)致內(nèi)存溢出。如果出現(xiàn)內(nèi)存溢出可以用map代替不皆」岢牵【完成比完美更重要】
mapPartitions源碼賞析
// f:傳入一個(gè)函數(shù),參為迭代器
// preservesPartitioning:是否保留分區(qū)霹娄,默認(rèn)為false
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] = withScope {
// 清除閉包能犯,保證數(shù)據(jù)可以進(jìn)行序列化傳輸
val cleanedF = sc.clean(f)
// 創(chuàng)建 MapPartitionsRDD
// this 綁定當(dāng)前RDD,
// iter 迭代器
new MapPartitionsRDD(this,(_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),preservesPartitioning)
}
點(diǎn)擊進(jìn)入MapPartitionsRDD
它會(huì)執(zhí)行一個(gè)compute
函數(shù)
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
mapPartitionsWithIndex
和 mapPartitions類似,但是它可以指定分區(qū)號
@Test
def mapPartitionsWithIndexTest(): Unit ={
val list=List(1,2,3,4,5,6,7,8)
val rdd1=sc.makeRDD(list,4)
rdd1.mapPartitionsWithIndex((index,it)=>{
while (it.hasNext) {
println(s"index=$index, ${it.next()}")
}
it
}).collect()
}
結(jié)果
index=2, 5
index=1, 3
index=0, 1
index=1, 4
index=2, 6
index=0, 2
index=3, 7
index=3, 8
flatMap()
與map操作類似犬耻,將RDD中的每一個(gè)元素通過應(yīng)用f函數(shù)依次轉(zhuǎn)換為新的元素踩晶,并封裝到RDD中。
區(qū)別:在flatMap操作中枕磁,f函數(shù)的返回值是一個(gè)集合渡蜻,并且會(huì)將每一個(gè)該集合中的元素拆分出來放到新的RDD中。
@Test
def flatMapTest(): Unit ={
val list=List(
"hello,java,scala",
"python,html,xml",
"xpath,js,vue",
"linux,windows"
)
// 創(chuàng)建本地集合RDD
val rdd1= sc.parallelize(list, 4)
// flatMap
val rdd2= rdd1.flatMap(_.split(","))
// 計(jì)算计济,匯總
println(rdd2.collect.toList)
}
結(jié)果
List(hello, java, scala, python, html, xml, xpath, js, vue, linux, windows)
glom()
該操作將RDD中每一個(gè)分區(qū)變成一個(gè)數(shù)組茸苇,并放置在新的RDD中,數(shù)組中元素的類型與原分區(qū)中元素類型一致
@Test
def glomTest(): Unit ={
val list: Seq[Int] =List(1,2,3,4,5,6,7,8)
val rdd1: RDD[Int] =sc.parallelize(list,4)
val rdd2: RDD[Array[Int]] =rdd1.glom()
// 獲取
val arrList: List[Array[Int]] = rdd2.collect.toList
for (arr<- arrList){
println(arr.toList)
}
}
List(1, 2)
List(3, 4)
List(5, 6)
List(7, 8)
groupBy
對數(shù)據(jù)進(jìn)行分組
@Test
def groupBy(): Unit ={
// 生成一百個(gè)數(shù)
val range = Range(0, 100)
val rdd1: RDD[Int] = sc.parallelize(range, 4)
// 將一百以內(nèi)的數(shù)據(jù)按照 2的倍數(shù)和3的倍數(shù) 進(jìn)行分類沦寂。
val f=(i:Int)=>{
if(i%2==0 && i%3==0){0}
else if(i%2==0){1}
else if(i%3==0){2}
else -1
}
val rdd2: RDD[(Int, Iterable[Int])] = rdd1.groupBy(f)
val result: List[(Int, Iterable[Int])] = rdd2.collect.toList
for(r <- result){
r match {
case (k,v) => println(k,v)
}
}
}
結(jié)果
(0,CompactBuffer(0, 6, 12, 18, 24, 30, 36, 42, 48, 54, 60, 66, 72, 78, 84, 90, 96))
(1,CompactBuffer(2, 4, 8, 10, 14, 16, 20, 22, 26, 28, 32, 34, 38, 40, 44, 46, 50, 52, 56, 58, 62, 64, 68, 70, 74, 76, 80, 82, 86, 88, 92, 94, 98))
(2,CompactBuffer(3, 9, 15, 21, 27, 33, 39, 45, 51, 57, 63, 69, 75, 81, 87, 93, 99))
(-1,CompactBuffer(1, 5, 7, 11, 13, 17, 19, 23, 25, 29, 31, 35, 37, 41, 43, 47, 49, 53, 55, 59, 61, 65, 67, 71, 73, 77, 79, 83, 85, 89, 91, 95, 97))
使用groupby
完成worldCount
作業(yè)
@Test
def worldCount():Unit={
//讀取文件
val lines=sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",4)
// 內(nèi)容扁平化
val worldList: RDD[String] = lines.flatMap(_.split(" "))
// 內(nèi)容分組
val groupList: RDD[(String, Iterable[String])] = worldList.groupBy(s => s)
// 統(tǒng)計(jì)單詞數(shù)量
val result=groupList.map(x=>(x._1,x._2.size))
println(result.collect().toList)
}
數(shù)據(jù)結(jié)果
List((shell,4), (wahaha,1), (hello,2), (python,1), (java,5))
filter 過濾
接收一個(gè)返回值為布爾類型的函數(shù)作為參數(shù)学密。當(dāng)某個(gè)RDD調(diào)用filter方法時(shí),會(huì)對該RDD中每一個(gè)元素應(yīng)用f函數(shù)传藏,如果返回值類型為true腻暮,則該元素會(huì)被添加到新的RDD中。
案例:找到一百中所有的偶數(shù)
@Test
def filterTest(): Unit ={
// 生成0-100的數(shù)
val range=Range(0,100)
val rdd1: RDD[Int] = sc.parallelize(range, 4)
//過濾奇數(shù)
val value: RDD[Int] = rdd1.filter(_%2==0)
println(value.collect.toList)
}
List(0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98)
sample 采樣
def sample(withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T] = {...}
- withReplacement:
是否放回[true-代表放回,意味著同一個(gè)數(shù)據(jù)可能被多次采樣 false-不返回,意味著同一條數(shù)據(jù)最多被采樣一次] 【工作中設(shè)置為false】 - fraction: 【工作中一般設(shè)置為 0.1-0.2 】:
如果withReplacement=false, fraction代表每個(gè)元素被采樣的概率[0,1]
如果withReplacement=true, fraction代表每個(gè)元素期望被采樣的次數(shù) - seed: 隨機(jī)數(shù)種子
對應(yīng) 0-100的數(shù)字進(jìn)行采樣毯侦;100%
@Test
def sampleTest(): Unit ={
// 生成0-100的數(shù)
val range=Range(0,100)
val rdd1: RDD[Int] = sc.parallelize(range, 4)
// withReplacement=false ;fraction=1
val value: RDD[Int] = rdd1.sample(false, 1)
val list=value.collect.toList
println(list)
}
List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
對應(yīng) 0-100的數(shù)字進(jìn)行采樣哭靖;50%
@Test
def sampleTest(): Unit ={
// 生成0-100的數(shù)
val range=Range(0,100)
val rdd1: RDD[Int] = sc.parallelize(range, 4)
// withReplacement=false ;fraction=0.5
val value: RDD[Int] = rdd1.sample(false, 0.5)
val list=value.collect.toList
println(list)
}
隨機(jī)抽查50個(gè)不重樣數(shù)據(jù)
List(0, 1, 3, 4, 6, 7, 9, 11, 15, 16, 17, 20, 22, 23, 25, 27, 30, 32, 33, 34, 35, 36, 37, 38, 40, 42, 44, 45, 46, 48, 50, 54, 56, 60, 63, 65, 66, 67, 70, 71, 73, 75, 77, 79, 80, 81, 82, 83, 85, 89, 91, 92, 97)
對應(yīng) 0-100的數(shù)字進(jìn)行采樣;10%
@Test
def sampleTest(): Unit ={
// 生成0-100的數(shù)
val range=Range(0,100)
val rdd1: RDD[Int] = sc.parallelize(range, 4)
// withReplacement=false ;fraction=0.1
val value: RDD[Int] = rdd1.sample(false, 0.1)
val list=value.collect.toList
println(list)
}
這樣看起方便點(diǎn)
List(15, 23, 44, 49, 50, 83, 85, 86, 96)
重復(fù)采樣侈离,單個(gè)元素可能被采樣多次
@Test
def sampleTest(): Unit ={
// 生成0-100的數(shù)
val range=Range(0,100)
val rdd1: RDD[Int] = sc.parallelize(range, 4)
// withReplacement=true ;fraction=0.2
val value: RDD[Int] = rdd1.sample(true, 0.2)
val list=value.collect.toList
println(list)
}
withReplacement 設(shè)置為true试幽,有些數(shù)據(jù)可能會(huì)被多次采樣(如88,95)霍狰。
List(3, 19, 25, 26, 27, 30, 30, 33, 38, 55, 60, 62, 63, 66, 72, 77, 88, 88, 93, 94, 95, 95)
將withReplacement 依舊為true,fraction改為整數(shù)
這次采用0-10的數(shù)據(jù)
@Test
def sampleTest(): Unit ={
// 生成0-100的數(shù)
val range=Range(0,10)
val rdd1: RDD[Int] = sc.parallelize(range, 4)
// withReplacement=true ;fraction=2
val value: RDD[Int] = rdd1.sample(true,2)
val list=value.collect.toList
println(list)
}
如果withReplacement=true, fraction代表每個(gè)元素期望被采樣的次數(shù);
這個(gè)期望值是不確定的饰及,如上我期望值是2但是有的卻只有1個(gè)如(7)蔗坯;但也比2高的。
List(0, 0, 0, 0, 0, 1, 1, 1, 3, 3, 4, 4, 5, 5, 5, 5, 5, 5, 7, 8, 8, 8, 9, 9, 9)
最后再說說 seed
燎含;默認(rèn)是一個(gè)偽隨機(jī)數(shù)宾濒,用來決定采樣的隨機(jī)率。
一般不會(huì)更改
Long = Utils.random.nextLong
若seed
固定多次采用的結(jié)果也是一樣
@Test
def sampleTest(): Unit ={
// 生成0-100的數(shù)
val range=Range(0,10)
val rdd1: RDD[Int] = sc.parallelize(range, 4)
// withReplacement=true ;fraction=2;seed=10
val value: RDD[Int] = rdd1.sample(true,2,10)
val list=value.collect.toList
println(list)
}
第一次運(yùn)行
List(1, 2, 2, 2, 4, 4, 4, 4, 4, 5, 5, 5, 5, 6, 6, 7, 7, 7, 8, 9, 9, 9, 9)
第二次運(yùn)行
List(1, 2, 2, 2, 4, 4, 4, 4, 4, 5, 5, 5, 5, 6, 6, 7, 7, 7, 8, 9, 9, 9, 9)
distinct 去重
@Test
def distinctTest(): Unit ={
// 設(shè)置一些重復(fù)的元素
val list=List(1,2,2,3,3,4,3,5,6,7,9,8,9,4,7)
// 創(chuàng)建本地集合RDD
val rdd1: RDD[Int] = sc.parallelize(list, 4)
// 去重
val value: RDD[Int] = rdd1.distinct()
// 調(diào)用屏箍,打印
println(value.collect.toList)
}
結(jié)果
List(4, 8, 1, 9, 5, 6, 2, 3, 7)
除了使用distinct
也可使用groupBy
實(shí)現(xiàn)去重功能
@Test
def distinctTest(): Unit ={
// 設(shè)置一些重復(fù)的元素
val list=List(1,2,2,3,3,4,3,5,6,7,9,8,9,4,7)
// 創(chuàng)建本地集合RDD
val rdd1: RDD[Int] = sc.parallelize(list, 4)
// 去重
val value: RDD[Int] = rdd1.groupBy(x=>x).map(_._1)
// 調(diào)用绘梦,打印
println(value.collect.toList)
}
結(jié)果
List(4, 8, 1, 9, 5, 6, 2, 3, 7)
coalesce 合并分區(qū)
@Test
def coalesceTest(): Unit ={
// 生成0-100的數(shù)
val range=Range(0,100)
// 創(chuàng)建本地集合RDD
val rdd1: RDD[Int] = sc.parallelize(range, 4)
}
查看分區(qū)情況
rdd1.mapPartitionsWithIndex((index,it)=>{
println(s"index=$index;it=${it.toList}")
it
}).collect
各個(gè)分區(qū)情況
index=0;it=List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24)
index=1;it=List(25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49)
index=3;it=List(75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
index=2;it=List(50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74)
合并分區(qū)
numPartitions:合并分區(qū)的個(gè)數(shù)
shuffle:默認(rèn)為false橘忱,并不會(huì)觸發(fā)shuffle
,設(shè)置為true卸奉,可以重新擴(kuò)大分區(qū)钝诚,但是會(huì)進(jìn)行shuffle
操作。
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null): RDD[T] = withScope {...}
合并成兩個(gè)分區(qū)榄棵,并查看合并分區(qū)后的數(shù)據(jù)情況
//合并成兩個(gè)分區(qū)
val value: RDD[Int] = rdd1.coalesce(2)
println(s"分區(qū)數(shù)${value.getNumPartitions}")
//查看合并之后的分區(qū)情況
value.mapPartitionsWithIndex((index,it)=>{
println(s"index=$index;it=${it.toList}")
it
}).collect
結(jié)果
分區(qū)數(shù)2
index=0;it=List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49)
index=1;it=List(50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
默認(rèn)情況下凝颇,無法增加分區(qū)
//合并成兩個(gè)分區(qū)
val value: RDD[Int] = rdd1.coalesce(5)
println(s"分區(qū)數(shù)${value.getNumPartitions}")
//查看合并之后的分區(qū)情況
value.mapPartitionsWithIndex((index,it)=>{
println(s"index=$index;it=${it.toList}")
it
}).collect
就打印分區(qū)數(shù)就行了,無法擴(kuò)展分區(qū)疹鳄。
分區(qū)數(shù)2
coalesce
默認(rèn)情況下只能合并分區(qū),如果想要增大分區(qū)數(shù),需要設(shè)置shuffle
=true
//合并成兩個(gè)分區(qū)
val value: RDD[Int] = rdd1.coalesce(5,true)
println(s"分區(qū)數(shù)${value.getNumPartitions}")
//查看合并之后的分區(qū)情況
value.mapPartitionsWithIndex((index,it)=>{
println(s"index=$index;it=${it.toList}")
it
}).collect
數(shù)據(jù)結(jié)果
分區(qū)數(shù)5
index=0;it=List(4, 9, 14, 19, 24, 27, 32, 37, 42, 47, 53, 58, 63, 68, 73, 78, 83, 88, 93, 98)
index=1;it=List(0, 5, 10, 15, 20, 28, 33, 38, 43, 48, 54, 59, 64, 69, 74, 79, 84, 89, 94, 99)
index=3;it=List(2, 7, 12, 17, 22, 25, 30, 35, 40, 45, 51, 56, 61, 66, 71, 76, 81, 86, 91, 96)
index=2;it=List(1, 6, 11, 16, 21, 29, 34, 39, 44, 49, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95)
index=4;it=List(3, 8, 13, 18, 23, 26, 31, 36, 41, 46, 52, 57, 62, 67, 72, 77, 82, 87, 92, 97)
源碼解析
def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null): RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
// 判斷 shuffle 是否為true
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}
先分析shuffle
=false情況
// this 當(dāng)前調(diào)用coalesce的rdd
// numPartitions 分區(qū)數(shù) 拧略;上面設(shè)置的是5
new CoalescedRDD(this, numPartitions, partitionCoalescer)
partitionCoalescer 我們并沒有指定,所以使用的是默認(rèn)的DefaultPartitionCoalescer
override def getPartitions: Array[Partition] = {
val pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer())
pc.coalesce(maxPartitions, prev).zipWithIndex.map {
case (pg, i) =>
val ids = pg.partitions.map(_.index).toArray
CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
}
}