在進行Spark開發(fā)算法時,最有用的一個函數(shù)就是reduceByKey临扮。
reduceByKey的作用對像是(key, value)形式的rdd者蠕,而reduce有減少、壓縮之意桩了,reduceByKey的作用就是對相同key的數(shù)據(jù)進行處理,最終每個key只保留一條記錄埠戳。
保留一條記錄通常有兩種結(jié)果井誉。一種是只保留我們希望的信息,比如每個key出現(xiàn)的次數(shù)整胃。第二種是把value聚合在一起形成列表颗圣,這樣后續(xù)可以對value做進一步的操作,比如排序屁使。
常用方式舉例
比如現(xiàn)在我們有數(shù)據(jù)goodsSale:RDD[(String, String)]在岂,兩個字段分別是goodsid、單個訂單中的銷售額蛮寂,現(xiàn)在我們需要統(tǒng)計每個goodsid的銷售額蔽午。
我們只需要保留每個goodsid的累記銷售額,可以使用如下語句來實現(xiàn):
val goodsSaleSum = goodsSale.reduceByKey((x,y) => x+y)
熟悉之后你可以使用更簡潔的方式:
val goodsSaleSum = goodsSale.reduceByKey(_+_)
reduceByKey會尋找相同key的數(shù)據(jù)酬蹋,當(dāng)找到這樣的兩條記錄時會對其value(分別記為x,y)做(x,y) => x+y
的處理及老,即只保留求和之后的數(shù)據(jù)作為value。反復(fù)執(zhí)行這個操作直至每個key只留下一條記錄范抓。
現(xiàn)在假設(shè)goodsSaleSum還有一個字段類目id骄恶,即 RDD[(String, String, String)] 形式,三個字段分別是類目id匕垫、goodsid叠蝇、總銷量,現(xiàn)在我們要獲得第個類目id下銷量最高的一個商品年缎。
上一步聚是保留value求和之后的數(shù)據(jù)悔捶,而這里其實我們只需要保留銷量更高的那條記錄。不過我們不能直接對RDD[(String, String, String)]類型的數(shù)據(jù)使用reduceByKey方法单芜,因為這并不是一個(key, value)形式的數(shù)據(jù)蜕该,所以需要使用map方法轉(zhuǎn)化一下類型。
val catGmvTopGoods = goodsSaleSum.map(x => (x._1, (x._2, x._3)))
.reduceByKey((x, y) => if (x._2.toDouble > y._2.toDouble) x else y)
.map(x => (x._1, x._2._1, x._2._2)
再進一步洲鸠,假設(shè)現(xiàn)在我們有一個任務(wù):推薦5個銷售額最高的類目堂淡,并為每個類目推薦一個銷售額最高的商品,而我們的數(shù)據(jù)就是上述RDD[(String, String, String)類型的goodsSaleSum扒腕。
這需要兩步绢淀,一是計算每個類目的銷售額,這和舉的第一個例子一樣瘾腰。二是找出每個類目下銷量最高的商品皆的,這和第二個例子一樣。實際上蹋盆,我們可以只實用一個reduceByKey就達到上面的目的费薄。
val catIdGmvTopGoods = goodsSaleSum.map(x => (x._1, (x._2, x._3, x._3)))
.reduceByKey((x, y) => if (x._2 > y._2) (x._1, x._2, x._3+y._3) else (y._1, y._2, x._3+y._3))
.map( x => (x._1, x._2._1, x._2._2, x._2._3)
.sortBy(_._3, false)
.take(5)
由于我們需要計算每個類目的總銷售額硝全,同時需要保留商品的銷售額,所以先使用map增加一個字段用來記錄類目的總銷售額楞抡。這樣一來伟众,我們就可以使用reduceByKey同時完成前兩個例子的操作。
剩下的就是進行排序并獲取前5條記錄召廷。
聚合方式舉例
上述的三個例子都是只保留需要的信息凳厢,但有時我們需要將value聚合在一起進行排序操作,比如對每個類目下的商品按銷售額進行排序竞慢。
假設(shè)我們的數(shù)據(jù)是 RDD[(String, String, String)]数初,三個字段分別是類目id、goodsid梗顺、銷售額泡孩。
若是使用sql,那我們直接用row_number函數(shù)就可以很簡單的使用分類目排序這個任務(wù)寺谤。
但由于spark-sql占用的資源會比RDD多不少仑鸥,在開發(fā)任務(wù)時并不建議使用spark-sql。
我們的方法是通過reduceByKey把商品聚合成一個List变屁,然后對這個List進行排序眼俊,再使用flatMapValues攤平數(shù)據(jù)。
我們在使用reduceyByKey時會注意到粟关,兩個value聚合后的數(shù)據(jù)類型必須和之前一致疮胖。
所以在聚合商品時我們也需要保證這點,通常有兩種方法闷板,一是使用ListBuffer澎灸,即可變長度的List。二是使用String遮晚,以分隔符來區(qū)分商品和銷售額性昭。下面我們使用第一種方式完成這個任務(wù)。
val catIdGoodsIdSorted = goodsGmvSum.map(x => (x._1, ListBuffer(x._2, x._3.toDouble)))
.reduceByKey((x, y) => x++y)
.flatMapValues( x => x.toList.sortBy(_._2).reverse.zipWithIndex)
上述zipWithIndex給列表增加一個字段县遣,用來記錄元素的位置信息糜颠。而flatMapValues可以把List的每個元素單獨拆成一條記錄,詳細的說明可以參考我寫的另一篇文章Spark入門-常用函數(shù)匯總
小結(jié)
我在本文中介紹了reduceByKey的三種作用:
求和匯總
獲得每個key下value最大的記錄
聚合value形成一個List之后進行排序