RDD操作—— 行動(Action)操作

行動操作是真正觸發(fā)計算的地方揩尸。Spark程序執(zhí)行到行動操作時所禀,才會執(zhí)行真正的計算,從文件中加載數(shù)據(jù)放钦,完成一次又一次轉(zhuǎn)換操作色徘,最終,完成行動操作得到結(jié)果操禀。

操作 說明
count() 返回數(shù)據(jù)集中的元素個數(shù)
collect() 以數(shù)組的形式返回數(shù)據(jù)集中的所有元素
first() 返回數(shù)據(jù)集中的第一個元素
take(n) 以數(shù)組的形式返回數(shù)據(jù)集中的前n個元素
reduce(func) 通過函數(shù)func(輸入兩個參數(shù)并返回一個值)聚合數(shù)據(jù)集中的元素
foreach(func) 將數(shù)據(jù)集中的每個元素傳遞到函數(shù)func中運行

惰性機制

在當前的spark目錄下面創(chuàng)建input目錄

cd $SPARK_HOME
mkdir input
vim word.txt
hello world
hello spark
hello hadoop
hello scala

由于textFile()方法只是一個轉(zhuǎn)換操作褂策,因此,這行代碼執(zhí)行后颓屑,不會立即把data.txt文件加載到內(nèi)存中斤寂,這時的lines只是一個指向這個文件的指針。

scala> val lines = sc.textFile("word.txt")
lines: org.apache.spark.rdd.RDD[String] = word.txt MapPartitionsRDD[13] at textFile at <console>:24

下面代碼用來計算每行的長度(即每行包含多少個單詞)揪惦,同樣遍搞,由于map()方法只是一個轉(zhuǎn)換操作,這行代碼執(zhí)行后器腋,不會立即計算每行的長度溪猿。

scala> val lineLengths = lines.map(s=>s.length)
lineLengths: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[14] at map at <console>:25

reduce()方法是一個“動作”類型的操作,這時纫塌,就會觸發(fā)真正的計算诊县。這時,Spark會把計算分解成多個任務在不同的機器上執(zhí)行措左,每臺機器運行位于屬于它自己的map和reduce依痊,最后把結(jié)果返回給Driver Program。

scala> val totalLength = lineLengths.reduce((a,b)=> a+b)
totalLength: Int = 45

count

lines就是一個RDD怎披。lines.filter()會遍歷lines中的每行文本胸嘁,并對每行文本執(zhí)行括號中的匿名函數(shù),也就是執(zhí)行Lamda表達式:line => line.contains(“spark”)钳枕,在執(zhí)行Lamda表達式時,會把當前遍歷到的這行文本內(nèi)容賦值給參數(shù)line赏壹,然后鱼炒,執(zhí)行處理邏輯line.contains(“spark”),也就是只有當改行文本包含“spark”才滿足條件蝌借,才會被放入到結(jié)果集中昔瞧。最后,等到lines集合遍歷結(jié)束后菩佑,就會得到一個結(jié)果集自晰,這個結(jié)果集中包含了所有包含“Spark”的行。最后稍坯,對這個結(jié)果集調(diào)用count()酬荞,這是一個行動操作搓劫,會計算出結(jié)果集中的元素個數(shù)。

scala> val lines = sc.textFile("file:///root/app/spark/input/word.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///root/app/spark/input/word.txt MapPartitionsRDD[8] at textFile at <console>:24

scala> lines.filter(_.contains("spark")).count
res3: Long = 1

scala> lines.filter(_.contains("hello")).count
res4: Long = 4

持久化

在Spark中混巧,RDD采用惰性求值的機制枪向,每次遇到行動操作,都會從頭開始執(zhí)行計算咧党。如果整個Spark程序中只有一次行動操作秘蛔,這當然不會有什么問題。但是傍衡,在一些情形下深员,我們需要多次調(diào)用不同的行動操作,這就意味著蛙埂,每次調(diào)用行動操作倦畅,都會觸發(fā)一次從頭開始的計算。這對于迭代計算而言箱残,代價是很大的滔迈,迭代計算經(jīng)常需要多次重復使用同一組數(shù)據(jù)。

scala> val list  = List("hadoop","spark","hive")
list: List[String] = List(hadoop, spark, hive)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:26

scala> rdd.count()
res5: Long = 3

scala> rdd.collect().mkString(",")
res6: String = hadoop,spark,hive

前后共觸發(fā)了兩次從頭到尾的計算被辑。
實際上燎悍,可以通過持久化(緩存)機制避免這種重復計算的開銷∨卫恚可以使用persist()方法對一個RDD標記為持久化谈山,之所以說“標記為持久化”,是因為出現(xiàn)persist()語句的地方宏怔,并不會馬上計算生成RDD并把它持久化奏路,而是要等到遇到第一個行動操作觸發(fā)真正計算以后,才會把計算結(jié)果進行持久化臊诊,持久化后的RDD將會被保留在計算節(jié)點的內(nèi)存中被后面的行動操作重復使用鸽粉。
persist()的圓括號中包含的是持久化級別參數(shù),
persist(MEMORY_ONLY)表示將RDD作為反序列化的對象存儲于JVM中抓艳,如果內(nèi)存不足触机,就要按照LRU原則替換緩存中的內(nèi)容。
persist(MEMORY_AND_DISK)表示將RDD作為反序列化的對象存儲在JVM中玷或,如果內(nèi)存不足儡首,超出的分區(qū)將會被存放在硬盤上。
一般而言偏友,使用cache()方法時蔬胯,會調(diào)用persist(MEMORY_ONLY)。

scala> val list  = List("hadoop","spark","hive")
list: List[String] = List(hadoop, spark, hive)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at <console>:26

scala> rdd.cache
res7: rdd.type = ParallelCollectionRDD[12] at parallelize at <console>:26
 //會調(diào)用persist(MEMORY_ONLY)位他,但是氛濒,語句執(zhí)行到這里产场,并不會緩存rdd,這是rdd還沒有被計算生成
scala> rdd.count //第一次行動操作泼橘,觸發(fā)一次真正從頭到尾的計算涝动,這時才會執(zhí)行上面的rdd.cache(),把這個rdd放到緩存中
3
scala> rdd.collect.mkString(",") //第二次行動操作炬灭,不需要觸發(fā)從頭到尾的計算醋粟,只需要重復使用上面緩存中的rdd
res9: String = hadoop,spark,hive

可以使用unpersist()方法手動地把持久化的RDD從緩存中移除。

分區(qū)

RDD是彈性分布式數(shù)據(jù)集重归,通常RDD很大米愿,會被分成很多個分區(qū),分別保存在不同的節(jié)點上鼻吮。RDD分區(qū)的一個分區(qū)原則是使得分區(qū)的個數(shù)盡量等于集群中的CPU核心(core)數(shù)目育苟。
對于不同的Spark部署模式而言(本地模式、Standalone模式椎木、YARN模式违柏、Mesos模式),都可以通過設置spark.default.parallelism這個參數(shù)的值香椎,來配置默認的分區(qū)數(shù)目漱竖,一般而言:
*本地模式:默認為本地機器的CPU數(shù)目,若設置了local[N],則默認為N畜伐;
*Apache Mesos:默認的分區(qū)數(shù)為8馍惹;
*Standalone或YARN:在“集群中所有CPU核心數(shù)目總和”和“2”二者中取較大值作為默認值;
因此玛界,對于parallelize而言万矾,如果沒有在方法中指定分區(qū)數(shù),則默認為spark.default.parallelism慎框,比如:

scala>val array = Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)
scala>val rdd = sc.parallelize(array,2) #設置兩個分區(qū)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:29

對于textFile而言良狈,如果沒有在方法中指定分區(qū)數(shù),則默認為min(defaultParallelism,2)笨枯,其中薪丁,defaultParallelism對應的就是spark.default.parallelism。
如果是從HDFS中讀取文件猎醇,則分區(qū)數(shù)為文件分片數(shù)(比如窥突,128MB/片)努溃。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末硫嘶,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子梧税,更是在濱河造成了極大的恐慌沦疾,老刑警劉巖称近,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異哮塞,居然都是意外死亡刨秆,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進店門忆畅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來衡未,“玉大人,你說我怎么就攤上這事家凯』捍祝” “怎么了?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵绊诲,是天一觀的道長送粱。 經(jīng)常有香客問我,道長掂之,這世上最難降的妖魔是什么抗俄? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮世舰,結(jié)果婚禮上动雹,老公的妹妹穿的比我還像新娘。我一直安慰自己冯乘,他們只是感情好洽胶,可當我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著裆馒,像睡著了一般姊氓。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上喷好,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天翔横,我揣著相機與錄音,去河邊找鬼梗搅。 笑死禾唁,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的无切。 我是一名探鬼主播荡短,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼哆键!你這毒婦竟也來了掘托?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤籍嘹,失蹤者是張志新(化名)和其女友劉穎闪盔,沒想到半個月后弯院,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡泪掀,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年听绳,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片异赫。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡椅挣,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出塔拳,到底是詐尸還是另有隱情贴妻,我是刑警寧澤,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布蝙斜,位于F島的核電站名惩,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏孕荠。R本人自食惡果不足惜娩鹉,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望稚伍。 院中可真熱鬧弯予,春花似錦、人聲如沸个曙。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽垦搬。三九已至呼寸,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間猴贰,已是汗流浹背对雪。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留米绕,地道東北人瑟捣。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像栅干,于是被迫代替她去往敵國和親迈套。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,573評論 2 359