Spark之RDD算子-創(chuàng)建算子

RDD算子是Spark計(jì)算框架中定義的對RDD進(jìn)行操作的各種函數(shù),從RDD算子的功能可將RDD算子分為四類镜会,創(chuàng)建算子虑稼、轉(zhuǎn)換算子琳钉、緩存算子和行動算子。

RDD算子

創(chuàng)建算子

創(chuàng)建RDD有兩種方式:一種是將基于Scala的集合類型數(shù)據(jù)(如List或Set類型)分布到集群中生成RDD蛛倦,另一種則是加載外部數(shù)據(jù)源(如本地文本文件或HDFS文件)生成RDD歌懒。上面所提到的兩種方式都是通過SparkContext的接口函數(shù)提供的,前者有兩種方法:makeRDDparallelize溯壶,后者則因?yàn)槠渲С植煌问胶筒煌袷降奈募霸恚休^多的函數(shù)。

基于集合類型數(shù)據(jù)創(chuàng)建RDD

SparkContext.makeRDD:創(chuàng)建RDD

# 輸入?yún)?shù)seq為一個(gè)集合數(shù)據(jù)集且改,參數(shù)String序列指定了希望將該數(shù)據(jù)集產(chǎn)生的RDD分區(qū)希望放置的節(jié)點(diǎn)验烧,
# 可以用Spark節(jié)點(diǎn)的主機(jī)名(hostname)描述
def makeRDD[T](seq: Seq[(T, Seq[String])])(implicit arg0: ClassTag[T]): [RDD]

# 輸入?yún)?shù)seq為一個(gè)數(shù)據(jù)集,numSlices是分區(qū)數(shù)量又跛,若不指定數(shù)量碍拆,
# 將使用Spark配置中的spark.default.parallelism參數(shù)所生成的defaultParallelism數(shù)值,為默認(rèn)的分區(qū)數(shù)量慨蓝。
def makeRDD[T](seq: Seq[T], numSlices: Int = [defaultParallelism])(implicit arg0: ClassTag[T]): [RDD]
makeRDD

示例代碼:

your-spark-path/bin# ./spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
17/01/23 09:20:56 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/01/23 09:20:58 WARN spark.SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://192.168.1.150:4040
Spark context available as 'sc' (master = local[*], app id = local-1485134457902).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.2
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val rdd = sc.makeRDD(1 to 6, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6)                                      

scala> rdd.partitions
res1: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@691, org.apache.spark.rdd.ParallelCollectionPartition@692)

scala> val data = Seq((1 to 6, Seq("spark-master", "hadoop-node1")), (7 to 10, Seq("hadoop-node2")))
data: Seq[(scala.collection.immutable.Range.Inclusive, Seq[String])] = List((Range(1, 2, 3, 4, 5, 6),List(spark-master, hadoop-node1)), (Range(7, 8, 9, 10),List(hadoop-node2)))

scala> val rdd1 = sc.makeRDD(data)
rdd1: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[1] at makeRDD at <console>:26

scala> rdd1.collect
res2: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(1, 2, 3, 4, 5, 6), Range(7, 8, 9, 10))

scala> rdd1.preferredLocations(rdd1.partitions(1))
res6: Seq[String] = List(hadoop-node2)

scala> rdd1.preferredLocations(rdd1.partitions(0))
res7: Seq[String] = List(spark-master, hadoop-node1)

SparkContext.parallelize:數(shù)據(jù)并行化生成RDD

# 將集合數(shù)據(jù)seq分布到節(jié)點(diǎn)上形成RDD感混,并返回生成的RDD。numSlices是分區(qū)數(shù)量礼烈,
# 若不指定數(shù)量弧满,將使用Spark配置中的spark.default.parallelism參數(shù)所生成的defaultParallelism數(shù)值,為默認(rèn)的分區(qū)數(shù)量此熬。
def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
parallelize
scala> val rdd = sc.parallelize(1 to 6, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> rdd.collect
res8: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd.partitions
res9: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@6e3, org.apache.spark.rdd.ParallelCollectionPartition@6e4)

基于外部數(shù)據(jù)創(chuàng)建RDD

SparkContext.textFile——基于文本文件創(chuàng)建RDD

# 從HDFS庭呜、本地文件系統(tǒng)或者其他Hadoop支持的文件系統(tǒng),按行讀入指定路徑下的文本文件摹迷,并返回生成的RDD疟赊。
# path是待讀入的文本文件的路徑,minPartitions是分區(qū)數(shù)量峡碉,不給定由spark配置中參數(shù)生成默認(rèn)值
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
textFile
# 若讀取的數(shù)據(jù)是來自HDFS時(shí),路徑地址:"hdfs://...."
scala> val textFile = sc.textFile("file:/usr/local/spark/spark202/README.md")
textFile: org.apache.spark.rdd.RDD[String] = file:/usr/local/spark/spark202/README.md MapPartitionsRDD[6] at textFile at <console>:24

scala> textFile.count()
res11: Long = 99

scala> textFile.first()
res12: String = # Apache Spark

SparkContext.wholeTextFiles——基于一個(gè)目錄下的全部文本文件創(chuàng)建RDD

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.

基于Hadoop API從Hadoop文件數(shù)據(jù)創(chuàng)建RDD

詳見spark官方文檔

Hadoop-RDD

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末驮审,一起剝皮案震驚了整個(gè)濱河市鲫寄,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌疯淫,老刑警劉巖地来,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異熙掺,居然都是意外死亡未斑,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進(jìn)店門币绩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蜡秽,“玉大人府阀,你說我怎么就攤上這事⊙客唬” “怎么了试浙?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長寞蚌。 經(jīng)常有香客問我田巴,道長,這世上最難降的妖魔是什么挟秤? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任壹哺,我火速辦了婚禮,結(jié)果婚禮上艘刚,老公的妹妹穿的比我還像新娘管宵。我一直安慰自己,他們只是感情好昔脯,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布啄糙。 她就那樣靜靜地躺著,像睡著了一般云稚。 火紅的嫁衣襯著肌膚如雪隧饼。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天静陈,我揣著相機(jī)與錄音燕雁,去河邊找鬼。 笑死鲸拥,一個(gè)胖子當(dāng)著我的面吹牛拐格,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播刑赶,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼捏浊,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了撞叨?” 一聲冷哼從身側(cè)響起金踪,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎牵敷,沒想到半個(gè)月后胡岔,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡枷餐,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年靶瘸,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,779評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡怨咪,死狀恐怖屋剑,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情惊暴,我是刑警寧澤饼丘,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站辽话,受9級特大地震影響肄鸽,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜油啤,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一典徘、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧益咬,春花似錦逮诲、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至冗锁,卻和暖如春齐唆,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背冻河。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工箍邮, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人叨叙。 一個(gè)月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓锭弊,卻偏偏與公主長得像,于是被迫代替她去往敵國和親擂错。 傳聞我的和親對象是個(gè)殘疾皇子味滞,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容