2-基于RDD的開發(fā)

RDD(resilient distributed dataset, 彈性分布式數(shù)據(jù)集)

RDD的特點(diǎn)

1、分布式
2福荸、只讀
3蕴坪、血緣關(guān)系
4、緩存(默認(rèn)緩存到內(nèi)存)
5、從外部文件創(chuàng)建
6背传、從父RDD生成子RDD
7呆瞻、支持本地磁盤文件
8、支持整個(gè)目錄径玖、多文件痴脾、通配符
9、支持壓縮文件
10挺狰、支持HDFS
11明郭、容錯(cuò)

RDD核心屬性

調(diào)度和計(jì)算都依賴于這五個(gè)屬性

  1. 分區(qū)列表
    分布在哪臺(tái)機(jī)器上
  2. 依賴列表
    即血緣關(guān)系,需要記錄下來丰泊,因?yàn)樯婕暗饺蒎e(cuò)
  • 寬窄依賴

  • 窄依賴:一(父)對(duì)一(子)映射


    image.png
  • 寬依賴:多(父)對(duì)一(子)映射(會(huì)影響到后面的執(zhí)行計(jì)劃)
    會(huì)發(fā)生一種Shuffle操作----一種數(shù)據(jù)的傳遞(由于下游的RDD需要把上游的好幾個(gè)task的結(jié)果拿過來會(huì)涉及到數(shù)據(jù)的傳輸)


    image.png
  • Shuffle


    image.png
  1. Compute函數(shù)薯定,用于計(jì)算RDD各分區(qū)的值
  2. 分區(qū)策略(可選)
    數(shù)據(jù)是如何分布到各個(gè)節(jié)點(diǎn)上的
  3. 優(yōu)先位置列表(可選,HDFS實(shí)現(xiàn)數(shù)據(jù)本地化瞳购,避免數(shù)據(jù)移動(dòng))

sc = spark.sparkContext
rdd = sc.textFile("d:/app.log")   #新建rdd
rdd.collect()  #查看rdd內(nèi)容

rdd = sc.textFile("d:/app.log,d:/Sid_SAS_20180430.txt")  #讀取多個(gè)文件的話话侄,用逗號(hào)隔開
rdd.collect()

還可以讀取壓縮文件

rdd = sc.textFile("d:/1-1G113144611.zip")
rdd.count() #查看有多少行

集合并行化

x = [1,2,3,4,5]
type(x)
rdd2 = sc.parallelize(x)  #將數(shù)據(jù)轉(zhuǎn)化為RDD
rdd2.collect()
type(rdd2)   #查看是否轉(zhuǎn)化為RDD

RDD支持哪些運(yùn)算

1 Transformations

  • 輸入RDD,輸出RDD
  • 延遲執(zhí)行
  • 常用的Transformation
Transformation 含義
map(func) 返回一個(gè)新的RDD学赛,該RDD由每個(gè)輸入元素經(jīng)過func函數(shù)轉(zhuǎn)換后組成
filter(func) 返回一個(gè)新的RDD年堆,該RDD由經(jīng)過func函數(shù)計(jì)算后返回值為true的輸入元素組成
flatMap(func) 類似于map,但每個(gè)輸入元素可被映射為0或多個(gè)輸出元素(所以func應(yīng)該返回一個(gè)序列盏浇,而不是單一元素)
mapPartitions(func) 類似于map,但獨(dú)立地在RDD的每個(gè)分片上運(yùn)行变丧,因此在類型為T的RDD上運(yùn)行時(shí),func的函數(shù)類型必須是Iterator[T]=>Iterator[U]
mapPartitionsWithIndex(func) 類似于mapPartitions, 但func帶有一個(gè)整數(shù)參數(shù)表示分片的索引值绢掰,因此在類型為T的RDD上運(yùn)行時(shí)痒蓬,func的函數(shù)類型必須是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根據(jù)fraction指定的比例對(duì)數(shù)據(jù)進(jìn)行采樣,可以選擇是否使用隨機(jī)數(shù)進(jìn)行替換滴劲,seed用于指定隨機(jī)數(shù)生成器種子
union(otherDataset) 對(duì)源RDD和參數(shù)RDD求并集后返回一個(gè)新的RDD
intersection(otherDataset) 對(duì)源RDD和參數(shù)RDD求交集后返回一個(gè)新的RDD
distinct([numTasks])) 對(duì)源RDD進(jìn)行去重后返回一個(gè)新的RDD
groupByKey([numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用攻晒,返回一個(gè)(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用,返回一個(gè)(K,V)的RDD班挖,使用指定的reduce函數(shù)鲁捏,將相同key的值聚合到一起,與groupByKey類似萧芙,reduce任務(wù)的個(gè)數(shù)可以通過第二個(gè)可選的參數(shù)來設(shè)置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 先按分區(qū)聚合 再總的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 對(duì)k/y的RDD進(jìn)行操作
sortByKey([ascending], [numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用给梅,K必須實(shí)現(xiàn)Ordered接口,返回一個(gè)按照key進(jìn)行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 與sortByKey類似双揪,但是更靈活 第一個(gè)參數(shù)是根據(jù)什么排序 第二個(gè)是怎么排序 false倒序 第三個(gè)排序后分區(qū)數(shù) 默認(rèn)與原RDD一樣
join(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調(diào)用破喻,返回一個(gè)相同key對(duì)應(yīng)的所有元素對(duì)在一起的(K,(V,W))的RDD 相當(dāng)于內(nèi)連接(求交集)
cogroup(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調(diào)用,返回一個(gè)(K,(Iterable<V>,Iterable<W>))類型的RDD
cartesian(otherDataset) 兩個(gè)RDD的笛卡爾積 的成很多個(gè)K/V
pipe(command, [envVars]) 調(diào)用外部程序
coalesce(numPartitions) 重新分區(qū) 第一個(gè)參數(shù)是要分多少區(qū)盟榴,第二個(gè)參數(shù)是否shuffle 默認(rèn)false 少分區(qū)變多分區(qū) true 多分區(qū)變少分區(qū) false
repartition(numPartitions) 重新分區(qū) 必須shuffle 參數(shù)是要分多少區(qū) 少變多
repartitionAndSortWithinPartitions(partitioner) 重新分區(qū)+排序 比先分區(qū)再排序效率高 對(duì)K/V的RDD進(jìn)行操作
foldByKey(zeroValue)(seqOp) 該函數(shù)用于K/V做折疊,合并處理 婴噩,與aggregate類似 第一個(gè)括號(hào)的參數(shù)應(yīng)用于每個(gè)V值 第二括號(hào)函數(shù)是聚合例如:+
combineByKey 合并相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
partitionBy(partitioner) 對(duì)RDD進(jìn)行分區(qū) partitioner是分區(qū)器 例如new HashPartition(2)
cache擎场,persist RDD緩存羽德,可以避免重復(fù)計(jì)算從而減少時(shí)間,區(qū)別:cache內(nèi)部調(diào)用了persist算子迅办,cache默認(rèn)就一個(gè)緩存級(jí)別MEMORY-ONLY 宅静,而persist則可以選擇緩存級(jí)別
Subtract(rdd) 返回前rdd元素不在后rdd的rdd
leftOuterJoin leftOuterJoin類似于SQL中的左外關(guān)聯(lián)left outer join,返回結(jié)果以前面的RDD為主站欺,關(guān)聯(lián)不上的記錄為空姨夹。只能用于兩個(gè)RDD之間的關(guān)聯(lián),如果要多個(gè)RDD關(guān)聯(lián)矾策,多關(guān)聯(lián)幾次即可磷账。
rightOuterJoin rightOuterJoin類似于SQL中的有外關(guān)聯(lián)right outer join,返回結(jié)果以參數(shù)中的RDD為主贾虽,關(guān)聯(lián)不上的記錄為空逃糟。只能用于兩個(gè)RDD之間的關(guān)聯(lián),如果要多個(gè)RDD關(guān)聯(lián)蓬豁,多關(guān)聯(lián)幾次即可
subtractByKey substractByKey和基本轉(zhuǎn)換操作中的subtract類似只不過這里是針對(duì)K的绰咽,返回在主RDD中出現(xiàn),并且不在otherRDD中出現(xiàn)的元素

2 Actions

  • 輸入RDD地粪,輸出非RDD
  • 立即執(zhí)行
  • 常用的Action
Action 含義
reduce(func) 通過func函數(shù)聚集RDD中的所有元素取募,這個(gè)功能必須是課交換且可并聯(lián)的
collect() 在驅(qū)動(dòng)程序中,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素
count() 返回RDD的元素個(gè)數(shù)
first() 返回RDD的第一個(gè)元素(類似于take(1))
take(n) 返回一個(gè)由數(shù)據(jù)集的前n個(gè)元素組成的數(shù)組
takeSample(withReplacement,num, [seed]) 返回一個(gè)數(shù)組蟆技,該數(shù)組由從數(shù)據(jù)集中隨機(jī)采樣的num個(gè)元素組成玩敏,可以選擇是否用隨機(jī)數(shù)替換不足的部分,seed用于指定隨機(jī)數(shù)生成器種子
saveAsTextFile(path) 將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng)付魔,對(duì)于每個(gè)元素聊品,Spark將會(huì)調(diào)用toString方法,將它裝換為文件中的文本
saveAsSequenceFile(path) 將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下几苍,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)翻屈。
countByKey() 針對(duì)(K,V)類型的RDD,返回一個(gè)(K,Int)的map妻坝,表示每一個(gè)key對(duì)應(yīng)的元素個(gè)數(shù)伸眶。
foreach(func) 在數(shù)據(jù)集的每一個(gè)元素上,運(yùn)行函數(shù)func進(jìn)行更新刽宪。
aggregate 先對(duì)分區(qū)進(jìn)行操作厘贼,在總體操作
from pyspark.sql import SparkSession
master = 'local'
spark = SparkSession.builder. \
        appName('test'). \
        master(master). \
        getOrCreate()
sc = spark.sparkContext
filename = 'file:///tmp/README.md'
logData = sc.textFile(filename)
wordsRDD = logData. \
    flatMap(lambda x:x.split(" ")). \
    map(lambda x:(x,1)). \
    reduceByKey(lambda x,y:x+y)

out_filenmae = 'result'
wordsRDD.saveAsTextFile(out_filename)
words = wordsRDD.collect()
spark.stop()

作業(yè)

1.從身份證號(hào)中提取年齡

2.從身份證號(hào)中提取性別

3.從身份證號(hào)中提取星座

啟動(dòng)Pyspark

from pyspark.sql import SparkSession
form datetime import date
spark = SparkSession.builder.appName('CardAnalysis').getOrCreate()
sd = spark.sparkContext

###讀取文件
logData = sc.textFile('F:/pyspark/data/cardnum.txt')
###讀取身份證號(hào)
cardRDD = logData.map(lambda x:x.split(',')).map(lambda x:x[1])
###1 取身份證7-14位并計(jì)算年齡
birthdayRDD = cardRDD.map(lambda x:date(int(x[6:10]), int(x[10:12]), int(x[12:14])))
ageRDD = birthdayRDD.map(lambda x:int((date.today()-x).days/365))
age_list = ageRDD.collect()
print(age_list)

###2 取身份證第17位并計(jì)算性別
genderRDD = cardRDD.map(lambda x: 'Male' if int(x[17])%2 else 'Female')
gender_list = genderRDD.collect()
print(gender_list)

###3 取身份證11-14位并計(jì)算星座
定義計(jì)算星座函數(shù)
def getConstellation(month,day):
    n = (u'摩羯座', u'水瓶座', u'雙魚座', u'白羊座', u'金牛座', u'雙子座', u'巨蟹座', 
         u'獅子座', u'處女座', u'天秤座', u'天蝎座', u'射手座')
    d = ((1,20), (2,19), (3,21), (4,21), (5,21), (6,22), (7,23),(8,23), (9,23),(10,23),(11,23),(12,23))
    return n[len(list(filter(lambda y: y<=(month,day),d)))%12]

constellationRDD = cardRDD.map(lambda x:getConstellation(int(x[10:12]),int(x[12:14])))
constellation_list = constellationRDD.collect()
print(constellation_list)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市圣拄,隨后出現(xiàn)的幾起案子嘴秸,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,042評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件岳掐,死亡現(xiàn)場(chǎng)離奇詭異凭疮,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)串述,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門执解,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人纲酗,你說我怎么就攤上這事衰腌。” “怎么了觅赊?”我有些...
    開封第一講書人閱讀 156,674評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵右蕊,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我茉兰,道長(zhǎng)尤泽,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,340評(píng)論 1 283
  • 正文 為了忘掉前任规脸,我火速辦了婚禮坯约,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘莫鸭。我一直安慰自己闹丐,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,404評(píng)論 5 384
  • 文/花漫 我一把揭開白布被因。 她就那樣靜靜地躺著卿拴,像睡著了一般。 火紅的嫁衣襯著肌膚如雪梨与。 梳的紋絲不亂的頭發(fā)上堕花,一...
    開封第一講書人閱讀 49,749評(píng)論 1 289
  • 那天,我揣著相機(jī)與錄音粥鞋,去河邊找鬼缘挽。 笑死,一個(gè)胖子當(dāng)著我的面吹牛呻粹,可吹牛的內(nèi)容都是我干的壕曼。 我是一名探鬼主播,決...
    沈念sama閱讀 38,902評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼等浊,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼腮郊!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起筹燕,我...
    開封第一講書人閱讀 37,662評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤轧飞,失蹤者是張志新(化名)和其女友劉穎衅鹿,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體过咬,經(jīng)...
    沈念sama閱讀 44,110評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡塘安,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了援奢。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,577評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡忍捡,死狀恐怖集漾,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情砸脊,我是刑警寧澤具篇,帶...
    沈念sama閱讀 34,258評(píng)論 4 328
  • 正文 年R本政府宣布,位于F島的核電站凌埂,受9級(jí)特大地震影響驱显,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜瞳抓,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,848評(píng)論 3 312
  • 文/蒙蒙 一埃疫、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧孩哑,春花似錦栓霜、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至丛晌,卻和暖如春仅炊,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背澎蛛。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評(píng)論 1 264
  • 我被黑心中介騙來泰國(guó)打工抚垄, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人瓶竭。 一個(gè)月前我還...
    沈念sama閱讀 46,271評(píng)論 2 360
  • 正文 我出身青樓督勺,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親斤贰。 傳聞我的和親對(duì)象是個(gè)殘疾皇子智哀,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,452評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容