Transformations(轉(zhuǎn)換)
下表列出了一些 Spark 常用的 transformations(轉(zhuǎn)換)。詳情請(qǐng)參考 RDD API 文檔(Scala,Java鹰服,Python罢浇,R)和 pair RDD 函數(shù)文檔(Scala片橡,Java)搪缨。
Transformation(轉(zhuǎn)換) | Meaning(含義) |
---|---|
map(func) | 返回一個(gè)新的 distributed dataset(分布式數(shù)據(jù)集)逆趋,它由每個(gè) source(數(shù)據(jù)源)中的元素應(yīng)用一個(gè)函數(shù) func 來(lái)生成哮伟。 |
filter(func) | 返回一個(gè)新的 distributed dataset(分布式數(shù)據(jù)集)枉昏,它由每個(gè) source(數(shù)據(jù)源)中應(yīng)用一個(gè)函數(shù) func 且返回值為 true 的元素來(lái)生成陈肛。 |
flatMap(func) | 與 map 類似,但是每一個(gè)輸入的 item 可以被映射成 0 個(gè)或多個(gè)輸出的 items(所以 func 應(yīng)該返回一個(gè) Seq 而不是一個(gè)單獨(dú)的 item)兄裂。 |
mapPartitions(func) | 與 map 類似句旱,但是單獨(dú)的運(yùn)行在在每個(gè) RDD 的 partition(分區(qū),block)上晰奖,所以在一個(gè)類型為 T 的 RDD 上運(yùn)行時(shí) func 必須是 Iterator<T> => Iterator<U> 類型谈撒。 |
mapPartitionsWithIndex(func) | 與 mapPartitions 類似,但是也需要提供一個(gè)代表 partition 的 index(索引)的 interger value(整型值)作為參數(shù)的 func匾南,所以在一個(gè)類型為 T 的 RDD 上運(yùn)行時(shí) func 必須是 (Int, Iterator<T>) => Iterator<U> 類型啃匿。 |
sample(withReplacement, fraction, seed) | 樣本數(shù)據(jù),設(shè)置是否放回(withReplacement)蛆楞,采樣的百分比(fraction)溯乒、使用指定的隨機(jī)數(shù)生成器的種子(seed)。 |
union(otherDataset) | 反回一個(gè)新的 dataset豹爹,它包含了 source dataset(源數(shù)據(jù)集)和 otherDataset(其它數(shù)據(jù)集)的并集裆悄。 |
intersection(otherDataset) | 返回一個(gè)新的 RDD,它包含了 source dataset(源數(shù)據(jù)集)和 otherDataset(其它數(shù)據(jù)集)的交集帅戒。 |
distinct([numTasks])) | 返回一個(gè)新的 dataset灯帮,它包含了 source dataset(源數(shù)據(jù)集)中去重的元素。 |
groupByKey([numTasks]) | 在一個(gè) (K, V) pair 的 dataset 上調(diào)用時(shí)逻住,返回一個(gè) (K, Iterable<V>) . |
Note: 如果分組是為了在每一個(gè) key 上執(zhí)行聚合操作(例如钟哥,sum 或 average),此時(shí)使用 reduceByKey 或 aggregateByKey 來(lái)計(jì)算性能會(huì)更好. |
|
Note: 默認(rèn)情況下瞎访,并行度取決于父 RDD 的分區(qū)數(shù)腻贰。可以傳遞一個(gè)可選的 numTasks 參數(shù)來(lái)設(shè)置不同的任務(wù)數(shù)扒秸。 |
|
reduceByKey(func, [numTasks]) | 在 (K, V) pairs 的 dataset 上調(diào)用時(shí)播演,返回 dataset of (K, V) pairs 的 dataset,其中的 values 是針對(duì)每個(gè) key 使用給定的函數(shù) func來(lái)進(jìn)行聚合的伴奥,它必須是 type (V,V) => V 的類型写烤。像 groupByKey 一樣,reduce tasks 的數(shù)量是可以通過(guò)第二個(gè)可選的參數(shù)來(lái)配置的拾徙。 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 在 (K, V) pairs 的 dataset 上調(diào)用時(shí)洲炊,返回 (K, U) pairs 的 dataset,其中的 values 是針對(duì)每個(gè) key 使用給定的 combine 函數(shù)以及一個(gè) neutral "0" 值來(lái)進(jìn)行聚合的。允許聚合值的類型與輸入值的類型不一樣暂衡,同時(shí)避免不必要的配置询微。像 groupByKey 一樣,reduce tasks 的數(shù)量是可以通過(guò)第二個(gè)可選的參數(shù)來(lái)配置的狂巢。 |
sortByKey([ascending], [numTasks]) | 在一個(gè) (K, V) pair 的 dataset 上調(diào)用時(shí)撑毛,其中的 K 實(shí)現(xiàn)了 Ordered,返回一個(gè)按 keys 升序或降序的 (K, V) pairs 的 dataset唧领,由 boolean 類型的 ascending 參數(shù)來(lái)指定藻雌。 |
join(otherDataset, [numTasks]) | 在一個(gè) (K, V) 和 (K, W) 類型的 dataset 上調(diào)用時(shí),返回一個(gè) (K, (V, W)) pairs 的 dataset疹吃,它擁有每個(gè) key 中所有的元素對(duì)蹦疑。Outer joins 可以通過(guò) leftOuterJoin , rightOuterJoin 和 fullOuterJoin 來(lái)實(shí)現(xiàn)。 |
cogroup(otherDataset, [numTasks]) | 在一個(gè) (K, V) 和的 dataset 上調(diào)用時(shí)萨驶,返回一個(gè) (K, (Iterable<V>, Iterable<W>)) tuples 的 dataset歉摧。這個(gè)操作也調(diào)用了 groupWith 。 |
cartesian(otherDataset) | 在一個(gè) T 和 U 類型的 dataset 上調(diào)用時(shí)腔呜,返回一個(gè) (T, U) pairs 類型的 dataset(所有元素的 pairs叁温,即笛卡爾積)。 |
pipe(command, [envVars]) | 通過(guò)使用 shell 命令來(lái)將每個(gè) RDD 的分區(qū)給 Pipe核畴。例如膝但,一個(gè) Perl 或 bash 腳本。RDD 的元素會(huì)被寫(xiě)入進(jìn)程的標(biāo)準(zhǔn)輸入(stdin)谤草,并且 lines(行)輸出到它的標(biāo)準(zhǔn)輸出(stdout)被作為一個(gè)字符串型 RDD 的 string 返回跟束。 |
coalesce(numPartitions) | Decrease(降低)RDD 中 partitions(分區(qū))的數(shù)量為 numPartitions。對(duì)于執(zhí)行過(guò)濾后一個(gè)大的 dataset 操作是更有效的丑孩。 |
repartition(numPartitions) | Reshuffle(重新洗牌)RDD 中的數(shù)據(jù)以創(chuàng)建或者更多的 partitions(分區(qū))并將每個(gè)分區(qū)中的數(shù)據(jù)盡量保持均勻冀宴。該操作總是通過(guò)網(wǎng)絡(luò)來(lái) shuffles 所有的數(shù)據(jù)。 |
repartitionAndSortWithinPartitions(partitioner) | 根據(jù)給定的 partitioner(分區(qū)器)對(duì) RDD 進(jìn)行重新分區(qū)温学,并在每個(gè)結(jié)果分區(qū)中略贮,按照 key 值對(duì)記錄排序。這比每一個(gè)分區(qū)中先調(diào)用 repartition 然后再 sorting(排序)效率更高仗岖,因?yàn)樗梢詫⑴判蜻^(guò)程推送到 shuffle 操作的機(jī)器上進(jìn)行逃延。 |
Actions(動(dòng)作)
下表列出了一些 Spark 常用的 actions 操作。詳細(xì)請(qǐng)參考 RDD API 文檔(Scala轧拄,Java揽祥,Python,R)
和 pair RDD 函數(shù)文檔(Scala檩电,Java)拄丰。
Action(動(dòng)作) | Meaning(含義) |
---|---|
reduce(func) | 使用函數(shù) func 聚合 dataset 中的元素桅打,這個(gè)函數(shù) func 輸入為兩個(gè)元素,返回為一個(gè)元素愈案。這個(gè)函數(shù)應(yīng)該是可交換(commutative)和關(guān)聯(lián)(associative)的,這樣才能保證它可以被并行地正確計(jì)算鹅搪。 |
collect() | 在 driver 程序中站绪,以一個(gè) array 數(shù)組的形式返回 dataset 的所有元素。這在過(guò)濾器(filter)或其他操作(other operation)之后返回足夠欣鍪痢(sufficiently small)的數(shù)據(jù)子集通常是有用的恢准。 |
count() | 返回 dataset 中元素的個(gè)數(shù)。 |
first() | 返回 dataset 中的第一個(gè)元素(類似于 take(1)甫题。 |
take(n) | 將數(shù)據(jù)集中的前 n 個(gè)元素作為一個(gè) array 數(shù)組返回馁筐。 |
takeSample(withReplacement, num, [seed]) | 對(duì)一個(gè) dataset 進(jìn)行隨機(jī)抽樣,返回一個(gè)包含 num 個(gè)隨機(jī)抽樣(random sample)元素的數(shù)組坠非,參數(shù) withReplacement 指定是否有放回抽樣敏沉,參數(shù) seed 指定生成隨機(jī)數(shù)的種子。 |
takeOrdered(n, [ordering]) | 返回 RDD 按自然順序(natural order)或自定義比較器(custom comparator)排序后的前 n 個(gè)元素炎码。 |
saveAsTextFile(path) | 將 dataset 中的元素以文本文件(或文本文件集合)的形式寫(xiě)入本地文件系統(tǒng)盟迟、HDFS 或其它 Hadoop 支持的文件系統(tǒng)中的給定目錄中。Spark 將對(duì)每個(gè)元素調(diào)用 toString 方法潦闲,將數(shù)據(jù)元素轉(zhuǎn)換為文本文件中的一行記錄攒菠。 |
saveAsSequenceFile(path) | |
(Java and Scala) | 將 dataset 中的元素以 Hadoop SequenceFile 的形式寫(xiě)入到本地文件系統(tǒng)、HDFS 或其它 Hadoop 支持的文件系統(tǒng)指定的路徑中歉闰。該操作可以在實(shí)現(xiàn)了 Hadoop 的 Writable 接口的鍵值對(duì)(key-value pairs)的 RDD 上使用辖众。在 Scala 中,它還可以隱式轉(zhuǎn)換為 Writable 的類型(Spark 包括了基本類型的轉(zhuǎn)換和敬,例如 Int凹炸,Double,String 等等)概龄。 |
saveAsObjectFile(path) | |
(Java and Scala) | 使用 Java 序列化(serialization)以簡(jiǎn)單的格式(simple format)編寫(xiě)數(shù)據(jù)集的元素还惠,然后使用 SparkContext.objectFile() 進(jìn)行加載。 |
countByKey() | 僅適用于(K,V)類型的 RDD私杜。返回具有每個(gè) key 的計(jì)數(shù)的(K , Int)pairs 的 hashmap蚕键。 |
foreach(func) | 對(duì) dataset 中每個(gè)元素運(yùn)行函數(shù) func。這通常用于副作用(side effects)衰粹,例如更新一個(gè) Accumulator(累加器)或與外部存儲(chǔ)系統(tǒng)(external storage systems)進(jìn)行交互锣光。Note:修改除 foreach() 之外的累加器以外的變量(variables)可能會(huì)導(dǎo)致未定義的行為(undefined behavior)。詳細(xì)介紹請(qǐng)閱讀 Understanding closures(理解閉包) 部分铝耻。 |
該 Spark RDD API 還暴露了一些 actions(操作)的異步版本誊爹,例如針對(duì) foreach
的 foreachAsync
蹬刷,它們會(huì)立即返回一個(gè)FutureAction
到調(diào)用者,而不是在完成 action 時(shí)阻塞频丘。這可以用于管理或等待 action 的異步執(zhí)行办成。.