sortBy
Official documentation:
Return this RDD sorted by the given key function.
Function prototype:
def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T]
**
sortBy sorts the elements of the RDD by the key that the given function f computes for each element.
**
Source code analysis:
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
.values
}
/**
* Creates tuples of the elements in this RDD by applying `f`.
*/
def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
val cleanedF = sc.clean(f)
map(x => (cleanedF(x), x))
}
**
The source shows that sortBy is implemented on top of sortByKey. It accepts three parameters. The first is a function f that takes an element of type T (the RDD's element type) and returns a sort key of type K; keyBy applies it through map, turning each element x into the tuple (f(x), x). The second, ascending, sets the sort direction and defaults to true (ascending). The third, numPartitions, sets the number of partitions of the sorted RDD and defaults to this.partitions.length, the partition count before sorting. The defaults exist only in the Scala API; the Java prototype above takes all three arguments explicitly.
**
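The keyBy, sortByKey, values chain described above can be modelled on an ordinary list. The following is only a minimal local sketch in plain Java, not Spark code: the class name `SortBySketch` and its `sortBy` helper are hypothetical, there is no partitioning or shuffle, and `numPartitions` is left out because a local list has no partitions.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical local stand-in for RDD.sortBy; it only mirrors the three steps.
class SortBySketch {
    static <T, K extends Comparable<K>> List<T> sortBy(
            List<T> elements, Function<T, K> f, boolean ascending) {
        // Step 1 (keyBy): pair every element x with its key f(x).
        List<Map.Entry<K, T>> keyed = new ArrayList<>();
        for (T element : elements) {
            keyed.add(new SimpleEntry<>(f.apply(element), element));
        }
        // Step 2 (sortByKey): order the pairs by key only.
        Comparator<Map.Entry<K, T>> byKey = Map.Entry.comparingByKey();
        keyed.sort(ascending ? byKey : byKey.reversed());
        // Step 3 (values): drop the keys and keep the original elements.
        return keyed.stream().map(Map.Entry::getValue).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("5_15", "1_50", "4_74", "2_9");
        // The key is the number after the underscore; false means descending.
        System.out.println(sortBy(data, s -> Integer.parseInt(s.split("_")[1]), false));
    }
}
```

The key function returns an Integer here, so "9" and "15" are compared as numbers rather than as strings.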
Example:
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
final Random random = new Random(100);
// Transform the RDD so that each element consists of two parts
JavaRDD<String> javaRDD1 = javaRDD.map(new Function<Integer, String>() {
@Override
public String call(Integer v1) throws Exception {
return v1.toString() + "_" + random.nextInt(100);
}
});
System.out.println(javaRDD1.collect());
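// Sort by the second part of each element, compared as an integer
// (a String key would compare lexicographically, so "9" would rank above "15")
JavaRDD<String> resultRDD = javaRDD1.sortBy(new Function<String, Integer>() {
@Override
public Integer call(String v1) throws Exception {
return Integer.parseInt(v1.split("_")[1]);
}
},false,3);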
System.out.println("result--------------" + resultRDD.collect());
takeOrdered
Official documentation:
Returns the first k (smallest) elements from this RDD using the
natural ordering for T while maintain the order.
Function prototype:
def takeOrdered(num: Int): JList[T]
def takeOrdered(num: Int, comp: Comparator[T]): JList[T]
**
takeOrdered returns the first num elements of the RDD, ordered either by the default (ascending) ordering or by a specified comparator.
**
Source code analysis:
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
if (num == 0) {
Array.empty
} else {
val mapRDDs = mapPartitions { items =>
// Priority keeps the largest elements, so let's reverse the ordering.
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}
if (mapRDDs.partitions.length == 0) {
Array.empty
} else {
mapRDDs.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord)
}
}
}
**
The source shows that mapPartitions sorts within each partition, and each partition's local sort keeps only num elements. Note that the elements of the resulting mapRDDs are BoundedPriorityQueue priority queues. reduce then merges these queues, and the merged queue is converted to an array and sorted globally.
**
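The per-partition bounded queue followed by a merge can be sketched without Spark. This is a minimal plain-Java sketch under stated assumptions: `TakeOrderedSketch`, `topOfPartition`, and `offerBounded` are hypothetical names, partitions are modelled as nested lists, and a `java.util.PriorityQueue` with a manual size check stands in for Spark's BoundedPriorityQueue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical local model of takeOrdered; partitions are plain lists.
class TakeOrderedSketch {
    // Adds item only if the queue has room or item beats the worst kept element.
    static <T> void offerBounded(PriorityQueue<T> queue, T item, int num, Comparator<T> ord) {
        if (queue.size() < num) {
            queue.add(item);
        } else if (ord.compare(item, queue.peek()) < 0) {
            queue.poll();   // evict the largest element kept so far
            queue.add(item);
        }
    }

    // Keeps at most num smallest elements (by ord) of one partition.
    // The heap uses ord.reversed(), so its head is the largest kept element.
    static <T> PriorityQueue<T> topOfPartition(List<T> partition, int num, Comparator<T> ord) {
        PriorityQueue<T> queue = new PriorityQueue<>(ord.reversed());
        for (T item : partition) {
            offerBounded(queue, item, num, ord);
        }
        return queue;
    }

    static <T> List<T> takeOrdered(List<List<T>> partitions, int num, Comparator<T> ord) {
        List<T> result = new ArrayList<>();
        if (num <= 0 || partitions.isEmpty()) {
            return result;
        }
        // Merge step (the reduce in the Scala source): fold every partition's
        // queue into one bounded queue.
        PriorityQueue<T> merged = new PriorityQueue<>(ord.reversed());
        for (List<T> partition : partitions) {
            for (T item : topOfPartition(partition, num, ord)) {
                offerBounded(merged, item, num, ord);
            }
        }
        // Final global sort of the at most num survivors.
        result.addAll(merged);
        result.sort(ord);
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(5, 1), Arrays.asList(0, 4), Arrays.asList(4, 2, 2));
        System.out.println(takeOrdered(partitions, 2, Comparator.naturalOrder()));
        System.out.println(takeOrdered(partitions, 2, Comparator.reverseOrder()));
    }
}
```

Because every partition forwards at most num elements, the merge step handles at most num elements per partition no matter how large the partitions are.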
Example:
// Note: the comparator must be serializable
public static class TakeOrderedComparator implements Serializable,Comparator<Integer>{
@Override
public int compare(Integer o1, Integer o2) {
return -o1.compareTo(o2);
}
}
List<Integer> data = Arrays.asList(5, 1, 0, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
System.out.println("takeOrdered-----1-------------" + javaRDD.takeOrdered(2));
List<Integer> list = javaRDD.takeOrdered(2, new TakeOrderedComparator());
System.out.println("takeOrdered----2--------------" + list);
takeSample
Official documentation:
Return a fixed-size sampled subset of this RDD in an array
Function prototype:
def takeSample(withReplacement: Boolean, num: Int): JList[T]
def takeSample(withReplacement: Boolean, num: Int, seed: Long): JList[T]
**
takeSample returns an array of num elements sampled at random from the dataset.
**
Source code analysis:
def takeSample(
withReplacement: Boolean,
num: Int,
seed: Long = Utils.random.nextLong): Array[T] =
{
val numStDev = 10.0
if (num < 0) {
throw new IllegalArgumentException("Negative number of elements requested")
} else if (num == 0) {
return new Array[T](0)
}
val initialCount = this.count()
if (initialCount == 0) {
return new Array[T](0)
}
val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt
if (num > maxSampleSize) {
throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " + s"$numStDev * math.sqrt(Int.MaxValue)")
}
val rand = new Random(seed)
if (!withReplacement && num >= initialCount) {
return Utils.randomizeInPlace(this.collect(), rand)
}
val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount, withReplacement)
var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
// If the first sample didn't turn out large enough, keep trying to take samples;
// this shouldn't happen often because we use a big multiplier for the initial size
var numIters = 0
while (samples.length < num) {
logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
numIters += 1
}
Utils.randomizeInPlace(samples, rand).take(num)
}
**
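The source shows that takeSample is similar to sample. It accepts three parameters. The first, withReplacement, says whether sampling is done with replacement: true means with replacement, false means without. The second, num, is the number of sampled elements to return; this is what distinguishes takeSample from sample, which takes a fraction instead. The third, seed, is the seed for the random number generator. Internally, takeSample first computes fraction, the sampling ratio, then calls sample and collect()s the result, repeating the sampling if fewer than num elements came back, and finally shuffles the samples and calls take to return num elements. Note that when sampling without replacement and num is greater than or equal to the number of elements in the RDD, it returns all elements of the RDD in random order.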
**
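The control flow described above (argument checks, the shortcut that returns the whole dataset, oversampling with retries, then shuffle and take) can be illustrated on a local list. This is only a rough plain-Java sketch: `TakeSampleSketch` and its helpers are hypothetical, the fixed oversampling factor of 2 is an assumption standing in for SamplingUtils.computeFractionForSampleSize, and the with-replacement pass simply draws a fixed number of uniform picks rather than reproducing Spark's sampler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical local model of takeSample's control flow; not Spark's sampler.
class TakeSampleSketch {
    // One sampling pass over the data at the given fraction.
    static <T> List<T> sample(List<T> data, boolean withReplacement, double fraction, Random rand) {
        List<T> out = new ArrayList<>();
        if (withReplacement) {
            // Simplification: draw round(fraction * size) uniform picks.
            long draws = Math.round(fraction * data.size());
            for (long i = 0; i < draws; i++) {
                out.add(data.get(rand.nextInt(data.size())));
            }
        } else {
            // Bernoulli sampling: keep each element with probability fraction.
            for (T item : data) {
                if (rand.nextDouble() < fraction) {
                    out.add(item);
                }
            }
        }
        return out;
    }

    static <T> List<T> takeSample(List<T> data, boolean withReplacement, int num, long seed) {
        if (num < 0) {
            throw new IllegalArgumentException("Negative number of elements requested");
        }
        if (num == 0 || data.isEmpty()) {
            return new ArrayList<>();
        }
        Random rand = new Random(seed);
        // Without replacement we can never return more than the data holds.
        if (!withReplacement && num >= data.size()) {
            List<T> all = new ArrayList<>(data);
            Collections.shuffle(all, rand);
            return all;
        }
        // Assumed oversampling factor of 2, so a single pass is usually enough.
        double fraction = 2.0 * num / data.size();
        if (!withReplacement) {
            fraction = Math.min(1.0, fraction);
        }
        List<T> samples = sample(data, withReplacement, fraction, rand);
        while (samples.size() < num) {
            // Too few elements came back: sample again.
            samples = sample(data, withReplacement, fraction, rand);
        }
        Collections.shuffle(samples, rand);
        return new ArrayList<>(samples.subList(0, num));
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(5, 1, 0, 4, 4, 2, 2);
        System.out.println(takeSample(data, true, 20, 100L).size());
        System.out.println(takeSample(data, false, 20, 100L).size());
    }
}
```

With replacement the sketch can return more elements than the list holds, while without replacement the result is capped at the list size, which matches the 20 and 7 element cases in the example that follows.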
Example:
List<Integer> data = Arrays.asList(5, 1, 0, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
System.out.println("takeSample-----1-------------" + javaRDD.takeSample(true,2));
System.out.println("takeSample-----2-------------" + javaRDD.takeSample(true,2,100));
// Returns 20 elements
System.out.println("takeSample-----3-------------" + javaRDD.takeSample(true,20,100));
// Returns 7 elements
System.out.println("takeSample-----4-------------" + javaRDD.takeSample(false,20,100));