RDD可以通過兩種方式創(chuàng)建:
1:讀取一個外部數(shù)據(jù)接戳吝,比如從本地文件加載數(shù)據(jù)集片橡,或者從HDFS文件系統(tǒng)合武,HBase等外部數(shù)據(jù)系統(tǒng)加載數(shù)據(jù)前硫。Spark也可以支持文本文件胞得,SequenceFile文件和其他符合Hadoop InputFormat 格式的文件
2:調(diào)用SparkContext 的parallelize方法,在Driver中一個已經(jīng)存在的集合(數(shù)組)上創(chuàng)建
從文件系統(tǒng)中加載數(shù)據(jù)創(chuàng)建RDD
Spark 采用textFile() 方法來從文件系統(tǒng)中加載數(shù)據(jù)創(chuàng)建RDD屹电,該方法把文件的URL作為參數(shù)阶剑,這個URL可以本地文件系統(tǒng)的地址跃巡,或者是分布式文件系統(tǒng)HDFS的地址
???? val =sc.textFile("file:///home/data/word.txt")
在使用Spark讀取文件時,需要說明以下幾點:
(1)如果使用了本地文件系統(tǒng)的路徑牧愁,那么素邪,必須要保證在所有的worker節(jié)點上,也都能夠采用相同的路徑訪問到該文件猪半,比如兔朦,可以把該文件拷貝到每個worker節(jié)點上,或者也可以使用網(wǎng)絡掛載共享文件系統(tǒng)办龄。
(2)textFile()方法的輸入?yún)?shù)烘绽,可以是文件名,也可以是目錄俐填,也可以是壓縮文件等安接。比如,textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").
(3)textFile()方法也可以接受第2個輸入?yún)?shù)(可選)英融,用來指定分區(qū)的數(shù)目盏檐。默認情況下,Spark會為HDFS的每個block創(chuàng)建一個分區(qū)(HDFS中每個block默認是128MB)驶悟。你也可以提供一個比block數(shù)量更大的值作為分區(qū)數(shù)目胡野,但是,你不能提供一個小于block數(shù)量的值作為分區(qū)數(shù)目痕鳍。
通過并行集合(數(shù)組)創(chuàng)建RDD
調(diào)用 SparkContext的parallelize方法硫豆,在Driver中一個已經(jīng)存在的集合【數(shù)組】上創(chuàng)建
代碼:val array=Array(1,2,3,4,5)
????????? val rdd =sc.parallenlize(array)
RDD操作
RDD 被創(chuàng)建好之后,在后續(xù)使用過程中一般會發(fā)生兩種操作:
?????????? 1....... 轉(zhuǎn)換【Transformation】:基于現(xiàn)在的數(shù)據(jù)集創(chuàng)建一個新的數(shù)據(jù)集
?????????? 2 .......行動【Action】: 在數(shù)據(jù)集上運行計算笼呆,返回計算值
轉(zhuǎn)換操作
對于RDD而言熊响,每一次轉(zhuǎn)換操作都會產(chǎn)生不同的RDD,供給下一個“轉(zhuǎn)換”使用诗赌。轉(zhuǎn)換得到的RDD是惰性求值的汗茄,也就是說,整個轉(zhuǎn)換過程只是記錄了轉(zhuǎn)換的軌跡铭若,并不會發(fā)生真正的計算洪碳,只有遇到行動操作時,才會發(fā)生真正的計算叼屠,開始從血緣關系源頭開始瞳腌,進行物理的轉(zhuǎn)換操作。
下面列出一些常見的轉(zhuǎn)換操作(Transformation API):
* filter(func):篩選出滿足函數(shù)func的元素镜雨,并返回一個新的數(shù)據(jù)集
* map(func):將每個元素傳遞到函數(shù)func中纯趋,并將結(jié)果返回為一個新的數(shù)據(jù)集
* flatMap(func):與map()相似,但每個輸入元素都可以映射到0或多個輸出結(jié)果
* groupByKey():應用于(K,V)鍵值對的數(shù)據(jù)集時,返回一個新的(K, Iterable)形式的數(shù)據(jù)集
* reduceByKey(func):應用于(K,V)鍵值對的數(shù)據(jù)集時吵冒,返回一個新的(K, V)形式的數(shù)據(jù)集,其中的每個值是將每個key傳遞到函數(shù)func中進行聚合
行動操作
行動操作是真正觸發(fā)計算的地方西剥。Spark程序執(zhí)行到行動操作時痹栖,才會執(zhí)行真正的計算,從文件中加載數(shù)據(jù)瞭空,完成一次又一次轉(zhuǎn)換操作,最終,完成行動操作得到結(jié)果扼雏。
下面列出一些常見的行動操作(Action API):
* count()? 返回數(shù)據(jù)集中的元素個數(shù)
* collect() 以數(shù)組的形式返回數(shù)據(jù)集中的所有元素
* first()? 返回數(shù)據(jù)集中的第一個元素
* take(n)? 以數(shù)組的形式返回數(shù)據(jù)集中的前n個元素
* reduce(func)? 通過函數(shù)func(輸入兩個參數(shù)并返回一個值)聚合數(shù)據(jù)集中的元素
* foreach(func) 將數(shù)據(jù)集中的每個元素傳遞到函數(shù)func中運行*
持久化
前面我們已經(jīng)說過攒砖,在Spark中,RDD采用惰性求值的機制旧找,每次遇到行動操作溺健,都會從頭開始執(zhí)行計算。如果整個Spark程序中只有一次行動操作钮蛛,這當然不會有什么問題鞭缭。但是,在一些情形下魏颓,我們需要多次調(diào)用不同的行動操作岭辣,這就意味著,每次調(diào)用行動操作甸饱,都會觸發(fā)一次從頭開始的計算沦童。這對于迭代計算而言,代價是很大的叹话,迭代計算經(jīng)常需要多次重復使用同一組數(shù)據(jù)偷遗。
下面就是多次計算同一個DD的例子
實際上,可以通過持久化(緩存)機制避免這種重復計算的開銷渣刷○兄祝可以使用persist()方法對一個RDD標記為持久化,之所以說“標記為持久化”辅柴,是因為出現(xiàn)persist()語句的地方箩溃,并不會馬上計算生成RDD并把它持久化,而是要等到遇到第一個行動操作觸發(fā)真正計算以后碌嘀,才會把計算結(jié)果進行持久化涣旨,持久化后的RDD將會被保留在計算節(jié)點的內(nèi)存中被后面的行動操作重復使用。
persist()的圓括號中包含的是持久化級別參數(shù)股冗,比如霹陡,persist(MEMORY_ONLY)表示將RDD作為反序列化的對象存儲于JVM中,如果內(nèi)存不足,就要按照LRU原則替換緩存中的內(nèi)容烹棉。persist(MEMORY_AND_DISK)表示將RDD作為反序列化的對象存儲在JVM中攒霹,如果內(nèi)存不足,超出的分區(qū)將會被存放在硬盤上浆洗。一般而言催束,使用cache()方法時,會調(diào)用persist(MEMORY_ONLY)伏社。
分區(qū)
RDD是彈性分布式數(shù)據(jù)集抠刺,通常RDD很大,會被分成很多個分區(qū)摘昌,分別保存在不同的節(jié)點上速妖。RDD分區(qū)的一個分區(qū)原則是使得分區(qū)的個數(shù)盡量等于集群中的CPU核心(core)數(shù)目。
對于不同的Spark部署模式而言(本地模式聪黎、Standalone模式罕容、YARN模式、Mesos模式)挺举,都可以通過設置spark.default.parallelism這個參數(shù)的值杀赢,來配置默認的分區(qū)數(shù)目,一般而言:
*本地模式:默認為本地機器的CPU數(shù)目湘纵,若設置了local[N],則默認為N脂崔;
*Apache Mesos:默認的分區(qū)數(shù)為8;
*Standalone或YARN:在“集群中所有CPU核心數(shù)目總和”和“2”二者中取較大值作為默認值梧喷;
因此砌左,對于parallelize而言,如果沒有在方法中指定分區(qū)數(shù)铺敌,則默認為spark.default.parallelism汇歹,比如:
scala>val array=Array(1,2,3,4,5)
array:Array[Int]=Array(1,2,3,4,5)
scala>val rdd=sc.parallelize(array,2)#設置兩個分區(qū)
rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[13]at parallelize at:29
對于textFile而言,如果沒有在方法中指定分區(qū)數(shù)偿凭,則默認為min(defaultParallelism,2)产弹,其中,defaultParallelism對應的就是spark.default.parallelism弯囊。
如果是從HDFS中讀取文件痰哨,則分區(qū)數(shù)為文件分片數(shù)(比如,128MB/片)匾嘱。
打印元素
在實際編程中斤斧,我們經(jīng)常需要把RDD中的元素打印輸出到屏幕上(標準輸出stdout),一般會采用語句rdd.foreach(println)或者rdd.map(println)霎烙。當采用本地模式(local)在單機上執(zhí)行時撬讽,這些語句會打印出一個RDD中的所有元素蕊连。但是,當采用集群模式執(zhí)行時游昼,在worker節(jié)點上執(zhí)行打印語句是輸出到worker節(jié)點的stdout中甘苍,而不是輸出到任務控制節(jié)點Driver
Program中,因此烘豌,任務控制節(jié)點Driver
Program中的stdout是不會顯示打印語句的這些輸出內(nèi)容的羊赵。為了能夠把所有worker節(jié)點上的打印輸出信息也顯示到Driver
Program中,可以使用collect()方法扇谣,比如,rdd.collect().foreach(println)闲昭,但是罐寨,由于collect()方法會把各個worker節(jié)點上的所有RDD元素都抓取到Driver
Program中,因此序矩,這可能會導致內(nèi)存溢出鸯绿。因此,當你只需要打印RDD的部分元素時簸淀,可以采用語句rdd.take(100).foreach(println)瓶蝴。