RDD(resilient distributed dataset, 彈性分布式數(shù)據(jù)集)
RDD的特點(diǎn)
1、分布式
2福荸、只讀
3蕴坪、血緣關(guān)系
4、緩存(默認(rèn)緩存到內(nèi)存)
5、從外部文件創(chuàng)建
6背传、從父RDD生成子RDD
7呆瞻、支持本地磁盤文件
8、支持整個(gè)目錄径玖、多文件痴脾、通配符
9、支持壓縮文件
10挺狰、支持HDFS
11明郭、容錯(cuò)
RDD核心屬性
調(diào)度和計(jì)算都依賴于這五個(gè)屬性
- 分區(qū)列表
分布在哪臺(tái)機(jī)器上 - 依賴列表
即血緣關(guān)系,需要記錄下來丰泊,因?yàn)樯婕暗饺蒎e(cuò)
寬窄依賴
-
窄依賴:一(父)對(duì)一(子)映射
-
寬依賴:多(父)對(duì)一(子)映射(會(huì)影響到后面的執(zhí)行計(jì)劃)
會(huì)發(fā)生一種Shuffle操作----一種數(shù)據(jù)的傳遞(由于下游的RDD需要把上游的好幾個(gè)task的結(jié)果拿過來會(huì)涉及到數(shù)據(jù)的傳輸)
-
Shuffle
- Compute函數(shù)薯定,用于計(jì)算RDD各分區(qū)的值
- 分區(qū)策略(可選)
數(shù)據(jù)是如何分布到各個(gè)節(jié)點(diǎn)上的 - 優(yōu)先位置列表(可選,HDFS實(shí)現(xiàn)數(shù)據(jù)本地化瞳购,避免數(shù)據(jù)移動(dòng))
例
sc = spark.sparkContext
rdd = sc.textFile("d:/app.log") #新建rdd
rdd.collect() #查看rdd內(nèi)容
rdd = sc.textFile("d:/app.log,d:/Sid_SAS_20180430.txt") #讀取多個(gè)文件的話话侄,用逗號(hào)隔開
rdd.collect()
還可以讀取壓縮文件
rdd = sc.textFile("d:/1-1G113144611.zip")
rdd.count() #查看有多少行
集合并行化
x = [1,2,3,4,5]
type(x)
rdd2 = sc.parallelize(x) #將數(shù)據(jù)轉(zhuǎn)化為RDD
rdd2.collect()
type(rdd2) #查看是否轉(zhuǎn)化為RDD
RDD支持哪些運(yùn)算
1 Transformations
- 輸入RDD,輸出RDD
- 延遲執(zhí)行
- 常用的Transformation
Transformation | 含義 |
---|---|
map(func) | 返回一個(gè)新的RDD学赛,該RDD由每個(gè)輸入元素經(jīng)過func函數(shù)轉(zhuǎn)換后組成 |
filter(func) | 返回一個(gè)新的RDD年堆,該RDD由經(jīng)過func函數(shù)計(jì)算后返回值為true的輸入元素組成 |
flatMap(func) | 類似于map,但每個(gè)輸入元素可被映射為0或多個(gè)輸出元素(所以func應(yīng)該返回一個(gè)序列盏浇,而不是單一元素) |
mapPartitions(func) | 類似于map,但獨(dú)立地在RDD的每個(gè)分片上運(yùn)行变丧,因此在類型為T的RDD上運(yùn)行時(shí),func的函數(shù)類型必須是Iterator[T]=>Iterator[U] |
mapPartitionsWithIndex(func) | 類似于mapPartitions, 但func帶有一個(gè)整數(shù)參數(shù)表示分片的索引值绢掰,因此在類型為T的RDD上運(yùn)行時(shí)痒蓬,func的函數(shù)類型必須是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根據(jù)fraction指定的比例對(duì)數(shù)據(jù)進(jìn)行采樣,可以選擇是否使用隨機(jī)數(shù)進(jìn)行替換滴劲,seed用于指定隨機(jī)數(shù)生成器種子 |
union(otherDataset) | 對(duì)源RDD和參數(shù)RDD求并集后返回一個(gè)新的RDD |
intersection(otherDataset) | 對(duì)源RDD和參數(shù)RDD求交集后返回一個(gè)新的RDD |
distinct([numTasks])) | 對(duì)源RDD進(jìn)行去重后返回一個(gè)新的RDD |
groupByKey([numTasks]) | 在一個(gè)(K,V)的RDD上調(diào)用攻晒,返回一個(gè)(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一個(gè)(K,V)的RDD上調(diào)用,返回一個(gè)(K,V)的RDD班挖,使用指定的reduce函數(shù)鲁捏,將相同key的值聚合到一起,與groupByKey類似萧芙,reduce任務(wù)的個(gè)數(shù)可以通過第二個(gè)可選的參數(shù)來設(shè)置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 先按分區(qū)聚合 再總的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 對(duì)k/y的RDD進(jìn)行操作 |
sortByKey([ascending], [numTasks]) | 在一個(gè)(K,V)的RDD上調(diào)用给梅,K必須實(shí)現(xiàn)Ordered接口,返回一個(gè)按照key進(jìn)行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 與sortByKey類似双揪,但是更靈活 第一個(gè)參數(shù)是根據(jù)什么排序 第二個(gè)是怎么排序 false倒序 第三個(gè)排序后分區(qū)數(shù) 默認(rèn)與原RDD一樣 |
join(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W)的RDD上調(diào)用破喻,返回一個(gè)相同key對(duì)應(yīng)的所有元素對(duì)在一起的(K,(V,W))的RDD 相當(dāng)于內(nèi)連接(求交集) |
cogroup(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W)的RDD上調(diào)用,返回一個(gè)(K,(Iterable<V>,Iterable<W>))類型的RDD |
cartesian(otherDataset) | 兩個(gè)RDD的笛卡爾積 的成很多個(gè)K/V |
pipe(command, [envVars]) | 調(diào)用外部程序 |
coalesce(numPartitions) | 重新分區(qū) 第一個(gè)參數(shù)是要分多少區(qū)盟榴,第二個(gè)參數(shù)是否shuffle 默認(rèn)false 少分區(qū)變多分區(qū) true 多分區(qū)變少分區(qū) false |
repartition(numPartitions) | 重新分區(qū) 必須shuffle 參數(shù)是要分多少區(qū) 少變多 |
repartitionAndSortWithinPartitions(partitioner) | 重新分區(qū)+排序 比先分區(qū)再排序效率高 對(duì)K/V的RDD進(jìn)行操作 |
foldByKey(zeroValue)(seqOp) | 該函數(shù)用于K/V做折疊,合并處理 婴噩,與aggregate類似 第一個(gè)括號(hào)的參數(shù)應(yīng)用于每個(gè)V值 第二括號(hào)函數(shù)是聚合例如:+ |
combineByKey | 合并相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) |
partitionBy(partitioner) | 對(duì)RDD進(jìn)行分區(qū) partitioner是分區(qū)器 例如new HashPartition(2) |
cache擎场,persist | RDD緩存羽德,可以避免重復(fù)計(jì)算從而減少時(shí)間,區(qū)別:cache內(nèi)部調(diào)用了persist算子迅办,cache默認(rèn)就一個(gè)緩存級(jí)別MEMORY-ONLY 宅静,而persist則可以選擇緩存級(jí)別 |
Subtract(rdd) | 返回前rdd元素不在后rdd的rdd |
leftOuterJoin | leftOuterJoin類似于SQL中的左外關(guān)聯(lián)left outer join,返回結(jié)果以前面的RDD為主站欺,關(guān)聯(lián)不上的記錄為空姨夹。只能用于兩個(gè)RDD之間的關(guān)聯(lián),如果要多個(gè)RDD關(guān)聯(lián)矾策,多關(guān)聯(lián)幾次即可磷账。 |
rightOuterJoin | rightOuterJoin類似于SQL中的有外關(guān)聯(lián)right outer join,返回結(jié)果以參數(shù)中的RDD為主贾虽,關(guān)聯(lián)不上的記錄為空逃糟。只能用于兩個(gè)RDD之間的關(guān)聯(lián),如果要多個(gè)RDD關(guān)聯(lián)蓬豁,多關(guān)聯(lián)幾次即可 |
subtractByKey | substractByKey和基本轉(zhuǎn)換操作中的subtract類似只不過這里是針對(duì)K的绰咽,返回在主RDD中出現(xiàn),并且不在otherRDD中出現(xiàn)的元素 |
2 Actions
- 輸入RDD地粪,輸出非RDD
- 立即執(zhí)行
- 常用的Action
Action | 含義 |
---|---|
reduce(func) | 通過func函數(shù)聚集RDD中的所有元素取募,這個(gè)功能必須是課交換且可并聯(lián)的 |
collect() | 在驅(qū)動(dòng)程序中,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素 |
count() | 返回RDD的元素個(gè)數(shù) |
first() | 返回RDD的第一個(gè)元素(類似于take(1)) |
take(n) | 返回一個(gè)由數(shù)據(jù)集的前n個(gè)元素組成的數(shù)組 |
takeSample(withReplacement,num, [seed]) | 返回一個(gè)數(shù)組蟆技,該數(shù)組由從數(shù)據(jù)集中隨機(jī)采樣的num個(gè)元素組成玩敏,可以選擇是否用隨機(jī)數(shù)替換不足的部分,seed用于指定隨機(jī)數(shù)生成器種子 |
saveAsTextFile(path) | 將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng)付魔,對(duì)于每個(gè)元素聊品,Spark將會(huì)調(diào)用toString方法,將它裝換為文件中的文本 |
saveAsSequenceFile(path) | 將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下几苍,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)翻屈。 |
countByKey() | 針對(duì)(K,V)類型的RDD,返回一個(gè)(K,Int)的map妻坝,表示每一個(gè)key對(duì)應(yīng)的元素個(gè)數(shù)伸眶。 |
foreach(func) | 在數(shù)據(jù)集的每一個(gè)元素上,運(yùn)行函數(shù)func進(jìn)行更新刽宪。 |
aggregate | 先對(duì)分區(qū)進(jìn)行操作厘贼,在總體操作 |
from pyspark.sql import SparkSession
master = 'local'
spark = SparkSession.builder. \
appName('test'). \
master(master). \
getOrCreate()
sc = spark.sparkContext
filename = 'file:///tmp/README.md'
logData = sc.textFile(filename)
wordsRDD = logData. \
flatMap(lambda x:x.split(" ")). \
map(lambda x:(x,1)). \
reduceByKey(lambda x,y:x+y)
out_filenmae = 'result'
wordsRDD.saveAsTextFile(out_filename)
words = wordsRDD.collect()
spark.stop()
作業(yè)
1.從身份證號(hào)中提取年齡
2.從身份證號(hào)中提取性別
3.從身份證號(hào)中提取星座
啟動(dòng)Pyspark
from pyspark.sql import SparkSession
form datetime import date
spark = SparkSession.builder.appName('CardAnalysis').getOrCreate()
sd = spark.sparkContext
###讀取文件
logData = sc.textFile('F:/pyspark/data/cardnum.txt')
###讀取身份證號(hào)
cardRDD = logData.map(lambda x:x.split(',')).map(lambda x:x[1])
###1 取身份證7-14位并計(jì)算年齡
birthdayRDD = cardRDD.map(lambda x:date(int(x[6:10]), int(x[10:12]), int(x[12:14])))
ageRDD = birthdayRDD.map(lambda x:int((date.today()-x).days/365))
age_list = ageRDD.collect()
print(age_list)
###2 取身份證第17位并計(jì)算性別
genderRDD = cardRDD.map(lambda x: 'Male' if int(x[17])%2 else 'Female')
gender_list = genderRDD.collect()
print(gender_list)
###3 取身份證11-14位并計(jì)算星座
定義計(jì)算星座函數(shù)
def getConstellation(month,day):
n = (u'摩羯座', u'水瓶座', u'雙魚座', u'白羊座', u'金牛座', u'雙子座', u'巨蟹座',
u'獅子座', u'處女座', u'天秤座', u'天蝎座', u'射手座')
d = ((1,20), (2,19), (3,21), (4,21), (5,21), (6,22), (7,23),(8,23), (9,23),(10,23),(11,23),(12,23))
return n[len(list(filter(lambda y: y<=(month,day),d)))%12]
constellationRDD = cardRDD.map(lambda x:getConstellation(int(x[10:12]),int(x[12:14])))
constellation_list = constellationRDD.collect()
print(constellation_list)