【Spark Java API】Action(4)—sortBy让禀、takeOrdered、takeSample

sortBy


官方文檔描述:

Return this RDD sorted by the given key function.

函數(shù)原型:

def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T]

**
sortBy根據(jù)給定的f函數(shù)將RDD中的元素進(jìn)行排序呵俏。
**

源碼分析:

def sortBy[K](   
   f: (T) => K,    
  ascending: Boolean = true,    
  numPartitions: Int = this.partitions.length)    
  (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {  
    this.keyBy[K](f)      
    .sortByKey(ascending, numPartitions)      
    .values
}
/** 
* Creates tuples of the elements in this RDD by applying `f`. 
*/
def keyBy[K](f: T => K): RDD[(K, T)] = withScope {  
  val cleanedF = sc.clean(f)  
  map(x => (cleanedF(x), x))
}

**
從源碼中可以看出堆缘,sortBy函數(shù)的實(shí)現(xiàn)依賴于sortByKey函數(shù)。該函數(shù)接受三個(gè)參數(shù)普碎,第一參數(shù)是一個(gè)函數(shù)吼肥,該函數(shù)帶有泛型參數(shù)T,返回類型與RDD中的元素類型一致麻车,主要是用keyBy函數(shù)的map轉(zhuǎn)化骚秦,將每個(gè)元素轉(zhuǎn)化為tuples類型的元素扳抽;第二個(gè)參數(shù)是ascending,該參數(shù)是可選參數(shù),主要用于RDD中的元素的排序方式金赦,默認(rèn)是true掌唾,是升序今野;第三個(gè)參數(shù)是numPartitions照藻,該參數(shù)也是可選參數(shù),主要使用對(duì)排序后的RDD進(jìn)行分區(qū)彼水,默認(rèn)的分區(qū)個(gè)數(shù)與排序前一致是partitions.length崔拥。
**

實(shí)例:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
final Random random = new Random(100);
//對(duì)RDD進(jìn)行轉(zhuǎn)換,每個(gè)元素有兩部分組成
JavaRDD<String> javaRDD1 = javaRDD.map(new Function<Integer, String>() {    
  @Override    
  public String call(Integer v1) throws Exception {        
    return v1.toString() + "_" + random.nextInt(100);    
  }
});
System.out.println(javaRDD1.collect());
//按RDD中每個(gè)元素的第二部分進(jìn)行排序
JavaRDD<String> resultRDD = javaRDD1.sortBy(new Function<String, Object>() {    
  @Override    
  public Object call(String v1) throws Exception {        
    return v1.split("_")[1];    
  }
},false,3);
System.out.println("result--------------" + resultRDD.collect());

takeOrdered


官方文檔描述:

Returns the first k (smallest) elements from this RDD using the 
natural ordering for T while maintain the order.

函數(shù)原型:

def takeOrdered(num: Int): JList[T]
def takeOrdered(num: Int, comp: Comparator[T]): JList[T]

**
takeOrdered函數(shù)用于從RDD中凤覆,按照默認(rèn)(升序)或指定排序規(guī)則链瓦,返回前num個(gè)元素。
**

源碼分析:

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {  
  if (num == 0) {    
    Array.empty  
  } else {    
    val mapRDDs = mapPartitions { items =>      
    // Priority keeps the largest elements, so let's reverse the ordering.      
    val queue = new BoundedPriorityQueue[T](num)(ord.reverse)      
    queue ++= util.collection.Utils.takeOrdered(items, num)(ord)      
    Iterator.single(queue)    
  }    
  if (mapRDDs.partitions.length == 0) {      
    Array.empty    
  } else {      
    mapRDDs.reduce { (queue1, queue2) =>        
      queue1 ++= queue2        
      queue1      
  }.toArray.sorted(ord)    
  }  
 }
}

**
從源碼分析可以看出盯桦,利用mapPartitions在每個(gè)分區(qū)里面進(jìn)行分區(qū)排序慈俯,每個(gè)分區(qū)局部排序只返回num個(gè)元素,這里注意返回的mapRDDs的元素是BoundedPriorityQueue優(yōu)先隊(duì)列拥峦,再針對(duì)mapRDDs進(jìn)行reduce函數(shù)操作贴膘,轉(zhuǎn)化為數(shù)組進(jìn)行全局排序。
**

實(shí)例:

//注意comparator需要序列化
public static class TakeOrderedComparator implements Serializable,Comparator<Integer>{    
    @Override    
    public int compare(Integer o1, Integer o2) {        
      return -o1.compareTo(o2);    
    }
}
List<Integer> data = Arrays.asList(5, 1, 0, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
System.out.println("takeOrdered-----1-------------" + javaRDD.takeOrdered(2));
List<Integer> list = javaRDD.takeOrdered(2, new TakeOrderedComparator());
System.out.println("takeOrdered----2--------------" + list);

takeSample


官方文檔描述:

Return a fixed-size sampled subset of this RDD in an array

函數(shù)原型:

def takeSample(withReplacement: Boolean, num: Int): JList[T]
def takeSample(withReplacement: Boolean, num: Int, seed: Long): JList[T] 

**
takeSample函數(shù)返回一個(gè)數(shù)組事镣,在數(shù)據(jù)集中隨機(jī)采樣 num 個(gè)元素組成步鉴。
**

源碼分析:

def takeSample(    
  withReplacement: Boolean,    
  num: Int,    
  seed: Long = Utils.random.nextLong): Array[T] = 
{  
    val numStDev = 10.0  
    if (num < 0) {    
      throw new IllegalArgumentException("Negative number of elements requested")  
    } else if (num == 0) {    
      return new Array[T](0)  
    }  
    val initialCount = this.count()  
    if (initialCount == 0) {    
      return new Array[T](0)  
    }
    val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt  
    if (num > maxSampleSize) {    
      throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " +      s"$numStDev * math.sqrt(Int.MaxValue)")  
    }  
    val rand = new Random(seed)    
    if (!withReplacement && num >= initialCount) {    
      return Utils.randomizeInPlace(this.collect(), rand)  
    }  
    val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,    withReplacement)  
    var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()  
    // If the first sample didn't turn out large enough, keep trying to take samples;  
    // this shouldn't happen often because we use a big multiplier for the initial size  
    var numIters = 0  
    while (samples.length < num) {    
      logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")    
      samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()    
      numIters += 1  
  }  
  Utils.randomizeInPlace(samples, rand).take(num)
}

**
從源碼中可以看出揪胃,takeSample函數(shù)類似于sample函數(shù)璃哟,該函數(shù)接受三個(gè)參數(shù)氛琢,第一個(gè)參數(shù)withReplacement ,表示采樣是否放回随闪,true表示有放回的采樣阳似,false表示無(wú)放回采樣;第二個(gè)參數(shù)num铐伴,表示返回的采樣數(shù)據(jù)的個(gè)數(shù)撮奏,這個(gè)也是takeSample函數(shù)和sample函數(shù)的區(qū)別;第三個(gè)參數(shù)seed当宴,表示用于指定的隨機(jī)數(shù)生成器種子畜吊。另外,takeSample函數(shù)先是計(jì)算fraction户矢,也就是采樣比例玲献,然后調(diào)用sample函數(shù)進(jìn)行采樣,并對(duì)采樣后的數(shù)據(jù)進(jìn)行collect()梯浪,最后調(diào)用take函數(shù)返回num個(gè)元素捌年。注意,如果采樣個(gè)數(shù)大于RDD的元素個(gè)數(shù)挂洛,且選擇的無(wú)放回采樣礼预,則返回RDD的元素的個(gè)數(shù)。
**

實(shí)例:

List<Integer> data = Arrays.asList(5, 1, 0, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
System.out.println("takeSample-----1-------------" + javaRDD.takeSample(true,2));
System.out.println("takeSample-----2-------------" + javaRDD.takeSample(true,2,100));
//返回20個(gè)元素
System.out.println("takeSample-----3-------------" + javaRDD.takeSample(true,20,100));
//返回7個(gè)元素
System.out.println("takeSample-----4-------------" + javaRDD.takeSample(false,20,100));
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末虏劲,一起剝皮案震驚了整個(gè)濱河市托酸,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌柒巫,老刑警劉巖励堡,帶你破解...
    沈念sama閱讀 221,198評(píng)論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異吻育,居然都是意外死亡念秧,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門布疼,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)摊趾,“玉大人,你說(shuō)我怎么就攤上這事游两±悖” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 167,643評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵贱案,是天一觀的道長(zhǎng)肛炮。 經(jīng)常有香客問(wèn)我,道長(zhǎng),這世上最難降的妖魔是什么侨糟? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,495評(píng)論 1 296
  • 正文 為了忘掉前任碍扔,我火速辦了婚禮,結(jié)果婚禮上秕重,老公的妹妹穿的比我還像新娘不同。我一直安慰自己,他們只是感情好溶耘,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,502評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布二拐。 她就那樣靜靜地躺著,像睡著了一般凳兵。 火紅的嫁衣襯著肌膚如雪百新。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 52,156評(píng)論 1 308
  • 那天庐扫,我揣著相機(jī)與錄音饭望,去河邊找鬼。 笑死聚蝶,一個(gè)胖子當(dāng)著我的面吹牛杰妓,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播碘勉,決...
    沈念sama閱讀 40,743評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼巷挥,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了验靡?” 一聲冷哼從身側(cè)響起倍宾,我...
    開(kāi)封第一講書(shū)人閱讀 39,659評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎胜嗓,沒(méi)想到半個(gè)月后高职,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,200評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡辞州,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,282評(píng)論 3 340
  • 正文 我和宋清朗相戀三年怔锌,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片变过。...
    茶點(diǎn)故事閱讀 40,424評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡埃元,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出媚狰,到底是詐尸還是另有隱情岛杀,我是刑警寧澤,帶...
    沈念sama閱讀 36,107評(píng)論 5 349
  • 正文 年R本政府宣布崭孤,位于F島的核電站类嗤,受9級(jí)特大地震影響糊肠,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜遗锣,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,789評(píng)論 3 333
  • 文/蒙蒙 一货裹、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧黄伊,春花似錦泪酱、人聲如沸派殷。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,264評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)毡惜。三九已至拓轻,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間经伙,已是汗流浹背扶叉。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,390評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留帕膜,地道東北人枣氧。 一個(gè)月前我還...
    沈念sama閱讀 48,798評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像垮刹,于是被迫代替她去往敵國(guó)和親达吞。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,435評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容