什么是數(shù)據(jù)傾斜江掩?
Spark 的計(jì)算抽象如下
數(shù)據(jù)傾斜指的是:并行處理的數(shù)據(jù)集中芬萍,某一部分(如 Spark 或 Kafka 的一個 Partition)的數(shù)據(jù)顯著多于其它部分释簿,從而使得該部分的處理速度成為整個數(shù)據(jù)集處理的瓶頸。
如果數(shù)據(jù)傾斜不能解決应媚,其他的優(yōu)化手段再逆天都白搭扣讼,如同短板效應(yīng),任務(wù)完成的效率不是看最快的task,而是最慢的那一個垃帅。
數(shù)據(jù)傾導(dǎo)致的后果:
- 數(shù)據(jù)傾斜直接可能會導(dǎo)致一種情況:Out Of Memory 或者GC 超時延届。
- 任務(wù)不一定失敗,但是極端慢贸诚。(但是目前我遇到的數(shù)據(jù)傾斜幾乎都失敗了)
數(shù)據(jù)傾斜示意圖
如上圖所示
個別 ShuffleMapTask2 (98 億條數(shù)據(jù))處理過度大量數(shù)據(jù)方庭。導(dǎo)致拖慢了整個 Job 的執(zhí)行時間。
這可能導(dǎo)致該 Task 所在的機(jī)器 OOM酱固,或者運(yùn)行速度非常慢械念。
傾斜原理:
在進(jìn)行 shuffle 的時候,必須將各個節(jié)點(diǎn)上相同的 key 拉取到某個節(jié)點(diǎn)上的一個 task 來進(jìn)行處理运悲,比如按照 key 進(jìn)行聚合或 join 等操作龄减。此時如果某個 key 對應(yīng)的數(shù)據(jù)量特別大的話,就會發(fā)生數(shù)據(jù)傾斜班眯。
比如大部分 key 對應(yīng) 幾十上百 條數(shù)據(jù)希停,但是個別 key 卻對應(yīng)了 成千上萬 條數(shù)據(jù)烁巫,那么大部分 task 可能就只會分配到 少量 數(shù)據(jù),然后 1 秒鐘就運(yùn)行完了宠能;
但是個別 task 可能分配到了 海量數(shù)據(jù)亚隙,要運(yùn)行一兩個小時。
因此违崇,整個 Spark 作業(yè)的運(yùn)行進(jìn)度是由運(yùn)行時間最長的那個 task 決定的阿弃。
注:由于同一個 Stage 內(nèi)的所有 Task 執(zhí)行相同的計(jì)算,在排除不同計(jì)算節(jié)點(diǎn)計(jì)算能力差異的前提下羞延,不同 Task 之間耗時的差異主要由該 Task 所處理的數(shù)據(jù)量決定恤浪。
定位傾斜位置
可能觸發(fā)的算子
可能觸發(fā)的算子(不完全) |
---|
distinct |
groupByKey |
reduceByKey |
aggregateByKey |
join |
cogroup |
repartition |
count |
task 內(nèi)存溢出
這種情況下去定位出問題的代碼(觸發(fā)JOb的Action位)就比較容易了。
可以直接看 yarn-client 模式下本地 log 的異常棧肴楷,或者是通過 YARN 查看 yarn-cluster 模式下的 log 中的異常棧水由。
一般來說,通過異常棧信息就可以定位到你的代碼中哪一行(觸發(fā)JOb的Action位置)發(fā)生了內(nèi)存溢出和溢出的Stage是哪一個赛蔫。然后在那行代碼附近找找砂客,一般也會有 shuffle類算子,此時很可能就是這個算子導(dǎo)致了數(shù)據(jù)傾斜呵恢,但是是經(jīng)工作中發(fā)現(xiàn)鞠值,這個定位具體行數(shù)還是比較困難,因?yàn)槿罩局粫霈F(xiàn)觸發(fā)JOb的Action算子的代碼行數(shù)渗钉,而一個Job可能有多可shuffle階段彤恶,你要很了解任務(wù)的劃分才有可能找對位置。
要注意的是鳄橘,出現(xiàn)內(nèi)存溢出不一定就是傾斜声离。這只是一種可能而已。
task 執(zhí)行特別慢的情況
與上面類似瘫怜,雖然不報(bào)錯术徊,但是程序就在這里停住了,某部分task一直沒有完成鲸湃。
為了進(jìn)一步確定是否傾斜赠涮,最好的辦法是去看web ui,查看當(dāng)前task 所在Stage的所有task,看看執(zhí)行特別慢的task 運(yùn)行時間、所處理的數(shù)據(jù)量暗挑、GC等信息笋除。
如果與其他task差異較大,說明出現(xiàn)了傾斜問題炸裆,那我們接下來就該去解決問題了垃它。
key 的數(shù)據(jù)分布情況
我工作中因?yàn)闄?quán)限、環(huán)境等各種問題,無法查看Web UI 所以對于定位GC嗤瞎、OOM的問題特別難受~~~墙歪。
所以有時候采用很笨的方法來確定一下是否數(shù)據(jù)傾斜
上述表格中列舉了可能出現(xiàn)傾斜的算子,那么這些我們可以抽樣統(tǒng)計(jì)一下該算子操作的key對應(yīng)的數(shù)據(jù)量贝奇。如果key 的分布及不均勻虹菲,某種程度上也可以判定是出現(xiàn)了傾斜
df(dataFrame) 部分?jǐn)?shù)據(jù)如下
+--------+-----------+------------+------+--------+----+
| userid| zubo_nums| total_nums| nums| day|hour|
+--------+-----------+------------+------+--------+----+
| userid1| zubo_nums1| total_nums1| nums1|20190101| 00|
| userid2| zubo_nums2| total_nums2| nums2|20190101| 00|
| userid3| zubo_nums3| total_nums3| nums3|20190101| 00|
| userid4| zubo_nums4| total_nums4| nums4|20190101| 00|
| userid5| zubo_nums5| total_nums5| nums5|20190101| 00|
| userid6| zubo_nums6| total_nums6| nums6|20190101| 00|
| userid7| zubo_nums7| total_nums7| nums7|20190101| 00|
| userid8| zubo_nums8| total_nums8| nums8|20190101| 00|
| userid9| zubo_nums9| total_nums9| nums9|20190101| 00|
|userid10|zubo_nums10|total_nums10|nums10|20190101| 00|
+--------+-----------+------------+------+--------+----+
logger.info("\n df count=" +df.count())
df.sample(false,0.1).rdd.keyBy(row=>row.getAs("userid").toString).countByKey().foreach(println _)
df count=2058
多次抽樣對比
(userid88,3)
(userid99,1)
(userid61,2)
(userid50,2)
(userid34,2)
(userid1,33)
(userid39,4)
(userid83,3)
--------------------
(userid61,1)
(userid50,1)
(userid34,1)
(userid1,35)
(userid83,2)
(userid17,1)
(userid69,2)
---------
(userid99,2)
(userid61,1)
(userid50,2)
(userid34,2)
(userid1,25)
(userid39,1)
(userid83,1)
(userid94,2)
(userid17,1)
從上述抽樣結(jié)果接可以看出,userid1這個key數(shù)量明顯多余其他key掉瞳。
多次抽樣也可以看出毕源,這樣統(tǒng)計(jì)一定程度上可以反應(yīng)傾斜的問題并且可以確定傾斜的key,這樣對于我們后續(xù)解決傾斜問題有一定的幫助。
解決數(shù)據(jù)傾斜
從源端數(shù)據(jù)解決
下面距兩個例子說明:
kafka數(shù)據(jù)源
我們一般通過 DirectStream 方式讀取 Kafka數(shù)據(jù)陕习。
由于 Kafka 的每一個 Partition 對應(yīng) Spark 的一個Task(Partition)霎褐,所以 Kafka 內(nèi)相關(guān) Topic 的各Partition 之間數(shù)據(jù)是否平衡,直接決定 Spark處理該數(shù)據(jù)時是否會產(chǎn)生數(shù)據(jù)傾斜该镣。
Kafka 某一 Topic 內(nèi)消息在不同 Partition之間的分布冻璃,主要由 Producer 端所使用的Partition 實(shí)現(xiàn)類決定。
如果使用隨機(jī) Partitioner损合,則每條消息會隨機(jī)發(fā)送到一個 Partition 中省艳,從而從概率上來講,各Partition間的數(shù)據(jù)會達(dá)到平衡嫁审。此時源 Stage(直接讀取 Kafka 數(shù)據(jù)的 Stage)不會產(chǎn)生數(shù)據(jù)傾斜跋炕。
所以如果業(yè)務(wù)沒有特別需求,我們可以在Producer端的 Partitioner 采用隨機(jī)的方式律适,并且可以每個批次數(shù)據(jù)量適當(dāng)增加 Partition 的數(shù)量辐烂,達(dá)到增加task目的。
但是很多業(yè)務(wù)要求將具備同一特征的數(shù)據(jù)順序消費(fèi)捂贿,此時就需要將具有相同特征的數(shù)據(jù)放于同一個 Partition 中纠修。比如某個地市、區(qū)域的數(shù)據(jù)需要放在一個Partition 中眷蜓,如果此時出現(xiàn)了數(shù)據(jù)傾斜分瘾,就只能采用其他的辦法解決了。
hive數(shù)據(jù)源
如果數(shù)據(jù)源是來自hive,那么我們可以考慮在hive端就針對該key一次etl處理吁系。
如果評估可行,那我們在Spark就可以在Spark端使用etl后的數(shù)據(jù)了白魂,也就不用Spark中執(zhí)行之前傾斜的部分的邏輯了汽纤。
優(yōu)點(diǎn):實(shí)現(xiàn)起來簡單便捷,效果不錯福荸,完全規(guī)避掉了數(shù)據(jù)傾斜蕴坪,Spark 作業(yè)的性能會大幅度提升。
缺點(diǎn):治標(biāo)不治本,我們只是把數(shù)據(jù)傾斜提前到了hive端背传,Hive ETL 中還是會發(fā)生數(shù)據(jù)傾斜呆瞻,所以我們還是避免不了要在hive端處理傾斜問題。
適用情況:
因?yàn)楸举|(zhì)上沒有解決數(shù)據(jù)傾斜的問題径玖,我們只有解決了Hive端數(shù)據(jù)傾斜的問題才算真正的解決這個問題痴脾。
所以當(dāng)hive端的數(shù)據(jù)僅僅被調(diào)用一兩次的時候,這樣做性能提升不大梳星;
但是當(dāng)頻繁的調(diào)用相關(guān)數(shù)據(jù)的時候赞赖,如果在Spark調(diào)用Hive etl后的數(shù)據(jù)是就不會出現(xiàn)數(shù)據(jù)傾斜的問題,這樣性能的提升就非吃┰郑可觀了
調(diào)整并行度
原理:調(diào)整并行度前域,分散同一個 Task 的不同 Key 到更多的Task
注意:調(diào)整并行度不一定是增加,也可能是減少韵吨,目的是為了匿垄,分散同一個 Task 中傾斜 Key 到更多的Task,所以如果減少并行度可以實(shí)現(xiàn)归粉,也是可以的
對于Spark Sql配置下列參數(shù)spark.sql.shuffle.partitions
對于RDD,可以對shuflle算子設(shè)置并行度年堆,如
rdd.map(p=>(p._1,1)).reduceByKey( (c1, c2)=>(c1+c2),1000)
默認(rèn)使用HashPartitioner,并行度默認(rèn)為 spark.default.parallelism
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
reduceByKey(new HashPartitioner(numPartitions), func)
}
優(yōu)點(diǎn):實(shí)現(xiàn)起來比較簡單盏浇,理論上可以有效緩解和減輕數(shù)據(jù)傾斜的影響变丧。
方案缺點(diǎn):只是緩解了數(shù)據(jù)傾斜而已,沒有徹底根除問題绢掰,對于某個key傾斜的情況毫無辦法痒蓬,因?yàn)闊o論你設(shè)置并行度為多少,相同的key總會在同一個partition中
一般如果出現(xiàn)數(shù)據(jù)傾斜滴劲,都可以通過這種方法先試驗(yàn)幾次攻晒,如果問題未解決,再嘗試其它方法班挖。
適用場景少鲁捏,只能將分配到同一 Task 的不同 Key 分散開,但對于同一 Key 傾斜嚴(yán)重的情況該方法并不適用萧芙。
并且該方法一般只能緩解數(shù)據(jù)傾斜给梅,沒有徹底消除問題。
根據(jù)我工作遇到傾斜問題的來看双揪,這方法有一定效果但是作用不大动羽,還沒試過只調(diào)整并行度就直接解決的案例。
自定義分區(qū)函數(shù)
原理:使用自定義的 Partitioner(默認(rèn)為 HashPartitioner)渔期,將原本被分配到同一個 Task 的不同 Key 分配到不同 Task运吓。
class CustomerPartitioner(numParts: Int) extends Partitioner{
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int = {
//自定義分區(qū)
val id: Int = key.toString.toInt
//這里自定義分區(qū)的方式比較靈活渴邦,可以根據(jù)key的分布設(shè)計(jì)不同的計(jì)算方式
if (id <= 10000) //10000 以內(nèi)的id容易出現(xiàn)傾斜
return new java.util.Random().nextInt(10000) % numPartitions
else
return id % numPartitions
}
}
rdd.map(p=>(p._1,1)).groupByKey(new CustomerPartitioner(10))
適用場景:大量不同的 Key 被分配到了相同的 Task 造成該 Task 數(shù)據(jù)量過大。
優(yōu)點(diǎn):不影響原有的并行度設(shè)計(jì)拘哨。如果改變并行度谋梭,后續(xù) Stage 的并行度也會默認(rèn)改變,可能會影響后續(xù) Stage倦青。
缺點(diǎn):適用場景有限瓮床,只能將不同 Key 分散開,對于同一 Key 對應(yīng)數(shù)據(jù)集非常大的場景不適用姨夹。
效果與調(diào)整并行度類似纤垂,只能緩解數(shù)據(jù)傾斜而不能完全消除數(shù)據(jù)傾斜。
而且不夠靈活磷账,需要根據(jù)數(shù)據(jù)特點(diǎn)自定義專用的 Partitioner(即需要非常了解key的分分布)峭沦。
ReduceJoin轉(zhuǎn)MapJoin(Broadcast )
原理:如果一個 RDD 是比較小的,則可以采用廣播小 RDD 全量數(shù)據(jù) +map 算子來實(shí)現(xiàn)與 join 同樣的效果逃糟,也就是 map join吼鱼,此時就不會發(fā)生 shuffle 操作,也就不會發(fā)生數(shù)據(jù)傾斜绰咽。
示意圖
優(yōu)點(diǎn):對 join 操作導(dǎo)致的數(shù)據(jù)傾斜菇肃,效果非常好,因?yàn)楦揪筒粫l(fā)生 shuffle取募,也就根本不會發(fā)生數(shù)據(jù)傾斜琐谤。
缺點(diǎn):要求參與 Join的一側(cè)數(shù)據(jù)集足夠小,并且主要適用于 Join 的場景玩敏,不適合聚合的場景斗忌,適用條件有限。
通過 Spark 的 Broadcast 機(jī)制旺聚,將 Reduce 側(cè) Join 轉(zhuǎn)化為 Map 側(cè) Join织阳,避免 Shuffle 從而完全消除 Shuffle 帶來的數(shù)據(jù)傾斜。
Web UI的DAG圖如下
MapJoin
相關(guān)參數(shù):
將 Broadcast 的閾值設(shè)置得足夠大
SET spark.sql.autoBroadcastJoinThreshold=10485760
局部聚合+全局聚合
原理:將原本相同的 key 通過附加隨機(jī)前綴的方式砰粹,變成多個不同的 key唧躲,就可以讓原本被一個 task 處理的數(shù)據(jù)分散到多個 task 上去做局部聚合,進(jìn)而解決單個 task 處理數(shù)據(jù)量過多的問題碱璃。接著去除掉隨機(jī)前綴弄痹,再次進(jìn)行全局聚合,就可以得到最終的結(jié)果厘贼。
rdd1
.map(s=>(new Random().nextInt(100)+"_"+s._1,s._2))//添加前綴
.reduceByKey(_+_)//局部聚合
.map(s=>(s._1.split("_")(1),s._2))//去除前綴
.reduceByKey(_+_)//全局聚合
適用場景:對 RDD 執(zhí)行 reduceByKey 等聚合類 shuffle 算子或者在 Spark SQL 中使用 group by 語句進(jìn)行分組聚合時界酒,比較適用這種方案。
優(yōu)點(diǎn):對于聚合類的 shuffle 操作導(dǎo)致的數(shù)據(jù)傾斜嘴秸,效果是非常不錯的。通常都可以解決掉數(shù)據(jù)傾斜,或者至少是大幅度緩解數(shù)據(jù)傾斜岳掐,將 Spark 作業(yè)的性能提升數(shù)倍以上凭疮。
缺點(diǎn):僅僅適用于聚合類的 shuffle 操作,適用范圍相對較窄串述。如果是 join 類的 shuffle 操作执解,還得用其他的解決方案。
傾斜 key 增加隨機(jī)前/后綴
實(shí)現(xiàn)原理:將傾斜的key 與非傾斜的key 分別與右表join纲酗,得到skewedJoinRDD和unskewedJoinRDD最后unoin得到最終結(jié)果
skewedJoinRDD部分實(shí)現(xiàn)步驟:
- 將 rddLeft 中傾斜的 key(即 userid1 與 userid2)對應(yīng)的數(shù)據(jù)單獨(dú)過濾出來衰腌,且加上 1 到 n 的隨機(jī)前綴)形成單獨(dú)的 left: RDD[(String, Int)]。
- 將 rddRight 中傾斜 key 對應(yīng)的數(shù)據(jù)抽取出來觅赊,并通過 flatMap 操作將該數(shù)據(jù)集中每條數(shù)據(jù)均轉(zhuǎn)換為 n 條數(shù)據(jù)(每條分別加上 1 到 n 的隨機(jī)前綴)右蕊,形成單獨(dú)的 right: RDD[(String, String)]。
- 將 left 與 right 進(jìn)行 Join吮螺,并將并行度設(shè)置為 n饶囚,且在 Join 過程中將隨機(jī)前綴去掉,得到傾斜數(shù)據(jù)集的 Join 結(jié)果 skewedJoinRDD鸠补。
unskewedJoinRDD部分實(shí)現(xiàn)步驟:
- 將 rddLeft: RDD[(String, Int)] 中不包含傾斜 Key 的數(shù)據(jù)抽取出來作為單獨(dú)的 leftUnSkewRDD萝风。
- 對 leftUnSkewRDD 與原始的 rddRight: RDD[(String, String)] 進(jìn)行Join,并行度也設(shè)置為 n紫岩,得到 Join 結(jié)果 unskewedJoinRDD规惰。
- 通過 union 算子將 skewedJoinRDD 與 unskewedJoinRDD 進(jìn)行合并,從而得到完整的 Join 結(jié)果集泉蝌。
實(shí)現(xiàn)代碼
def prix(): Unit = {
val sc = spark.sparkContext
val rddLeft: RDD[(String, Int)] = srdd.rdd.keyBy(row => row.getAs("userid").toString).map(p => (p._1, 1))
val rddRight: RDD[(String, String)] = srdd.rdd.keyBy(row => row.getAs("userid").toString).map(p => (p._1, p._2.getAs("nums").toString))
val skewedKeySet = Set("userid1", "userid2") //傾斜的key
val addList: Seq[Int] = 1 to 24 //右表前綴
val skewedKey: Broadcast[Set[String]] = sc.broadcast(skewedKeySet) //廣播傾斜key
val addLisPrix: Broadcast[Seq[Int]] = sc.broadcast(addList) //廣播右表前綴
val left: RDD[(String, Int)] = rddLeft
.filter(kv => skewedKey.value.contains(kv._1)) //左表篩選傾斜key
.map(kv => (new Random().nextInt(24) + "," + kv._1, kv._2)) //傾斜key增加前綴
val leftUnSkewRDD: RDD[(String, Int)] = rddLeft
.filter(kv => !skewedKey.value.contains(kv._1)) //左表篩選非傾斜key
val right: RDD[(String, String)] = rddRight
.filter(kv => skewedKey.value.contains(kv._1)) //右表篩選傾斜key
.map(kv => (addLisPrix.value.map(str => (str + "," + kv._1, kv._2)))) //右表傾斜key每個增加1 to 24 的前綴
.flatMap(kv => kv.iterator)
val skewedJoinRDD: RDD[(String, String)] = left
.join(right, 100) //關(guān)聯(lián)操作
.mapPartitions(kv => kv.map(str => (str._1.split(",")(1), str._2._2))) //去除前綴
val unskewedJoinRDD: RDD[(String, String)] = leftUnSkewRDD
.join(rddRight, 100) //非傾斜關(guān)聯(lián)操作
.mapPartitions(kv => kv.map(str => (str._1, str._2._2)))
//合并傾斜與非傾斜key
skewedJoinRDD.union(unskewedJoinRDD).collect().foreach(println _)
}
用場景:兩張表都比較大歇万,無法使用 Map 側(cè) Join。其中一個 RDD 有少數(shù)幾個 Key 的數(shù)據(jù)量過大梨与,另外一個 RDD 的 Key 分布較為均勻堕花。
優(yōu)點(diǎn):相對于 Map 側(cè) Join,更能適應(yīng)大數(shù)據(jù)集的 Join粥鞋。
如果資源充足缘挽,傾斜部分?jǐn)?shù)據(jù)集與非傾斜部分?jǐn)?shù)據(jù)集可并行進(jìn)行,效率提升明顯呻粹。
且只針對傾斜部分的數(shù)據(jù)做數(shù)據(jù)擴(kuò)展壕曼,增加的資源消耗有限。
缺點(diǎn):如果傾斜 Key 非常多等浊,則另一側(cè)數(shù)據(jù)膨脹非常大腮郊,此方案不適用。
而且此時對傾斜 Key 與非傾斜 Key 分開處理筹燕,需要掃描數(shù)據(jù)集兩遍轧飞,增加了開銷衅鹿。
傾斜表隨機(jī)添加n種隨機(jī)前綴,小表擴(kuò)大n倍
原理:將包含傾斜 key 的rdd通過附加隨機(jī)前綴 1 to n 變成不一樣的 key过咬,然后就可以將這些處理后的 “不同key” 分散到多個 task 中去處理大渤。通過每條記錄增加前綴 1 to n 擴(kuò)容非傾斜 rdd ,然后再join
(此方法還有一個變體掸绞,就是將傾斜的key拉出來添加n種隨機(jī)前綴泵三,小表擴(kuò)大n倍,傾斜與非傾斜分開來衔掸,類似上一個例子)
實(shí)現(xiàn)原理
def prixAndMul(): Unit = {
val sc = spark.sparkContext
val rddLeft: RDD[(String, Int)] = srdd.rdd.keyBy(row => row.getAs("userid").toString).map(p => (p._1, 1))
val rddRight: RDD[(String, String)] = srdd.rdd.keyBy(row => row.getAs("userid").toString).map(p => (p._1, p._2.getAs("nums").toString))
val skewedKeySet = Set("userid1", "userid2") //傾斜的key
val addList: Seq[Int] = 1 to 24 //右表前綴
val addLisPrix: Broadcast[Seq[Int]] = sc.broadcast(addList) //廣播右表前綴
val left: RDD[(String, Int)] = rddLeft
.map(kv => (new Random().nextInt(24) + "," + kv._1, kv._2)) //傾斜key增加前綴
val right: RDD[(String, String)] = rddRight
.map(kv => (addLisPrix.value.map(str => (str + "," + kv._1, kv._2)))) //右表傾斜key每個增加1 to 24 的前綴
.flatMap(kv => kv.iterator)
val resultRDD: RDD[(String, String)] = left
.join(right, 100) //關(guān)聯(lián)操作
.mapPartitions(kv => kv.map(str => (str._1.split(",")(1), str._2._2))) //去除前綴
resultRDD.collect().foreach(println _)
}
優(yōu)點(diǎn):對 join 類型的數(shù)據(jù)傾斜基本都可以處理烫幕,而且效果也相對比較顯著,性能提升效果非常不錯敞映。
缺點(diǎn):該方案更多的是緩解數(shù)據(jù)傾斜较曼,而不是徹底避免數(shù)據(jù)傾斜。而且需要對整個 RDD 進(jìn)行擴(kuò)容驱显,對內(nèi)存資源要求很高诗芜。
該方案至少能保證程序能夠運(yùn)行完成,速度的話看實(shí)際情況了埃疫,畢竟先跑通再優(yōu)化伏恐。
過濾少數(shù)導(dǎo)致傾斜的 key
對于數(shù)據(jù)要求不是很嚴(yán)謹(jǐn)?shù)那闆r,可以通過抽樣獲取傾斜key 栓霜,然后直接過濾掉
關(guān)于數(shù)據(jù)傾斜翠桦,沒有一個固定的解決辦法,要根據(jù)數(shù)據(jù)的實(shí)際情況胳蛮,靈活采用各種方案解決