spark中的RDD是一個(gè)核心概念,RDD是一種彈性分布式數(shù)據(jù)集捌木,spark計(jì)算操作都是基于RDD進(jìn)行的,本文介紹RDD的基本操作免钻。
Spark 初始化
Spark初始化主要是要?jiǎng)?chuàng)建一個(gè)SprakContext
實(shí)例,該實(shí)例表示與spark集群的連接》锔玻可以通過多種方式創(chuàng)建。
SparkContext
直接使用SparkContext
類創(chuàng)建一個(gè)spark上下文盯桦,主要參數(shù)是指定master
和appName
。
from pyspark import SparkContext
sc = SprakContext(master = 'local[*]',appName='test')
SprakContext的屬性
# spark版本
sc.version
'2.4.5'
# python版本
sc.pythonVer
'3.7'
# master地址
sc.master
'local[*]'
# 應(yīng)用名字
sc.appName
'test'
# 應(yīng)用id
sc.applicationId
'local-1596522649115'
SparkConf
還可以通過調(diào)用SparkConf
配置類來生成spark上下文贴膘。
from pyspark import SparkConf, SprakContext
conf = SparkConf().setMaster('local').setAppName('test')
sc = SparkContext(conf=conf)
創(chuàng)建RDD
RDD是spark中的主要數(shù)據(jù)格式略号,名稱為彈性分布式數(shù)據(jù)集刑峡,可以序列化python對(duì)象來得到RDD玄柠,或者讀取文件。
序列化
# parallelize方法序列化python對(duì)象為RDD
rdd = sc.parallelize([('a', 7), ('a', 2), ('b', 2)])
rdd1 = sc.parallelize([2,5,1,8])
rdd2 = sc.parallelize([('a', 2), ('d', 1), ('b', 1)])
rdd3 = sc.parallelize(range(100))
rdd4 = sc.parallelize([('a', ['x', 'y', 'z']), ('b', ['p', 'r'])])
讀取文件
# 讀取本地json文件羽利,返回RDD
text_file = sc.textFile("e:/a.json")
獲取RDD信息
基本信息
# 獲取rdd的分區(qū)數(shù)
rdd.getNumPartitions()
12
# 獲取rdd的key
rdd.keys().collect()
['a', 'a', 'b']
# 獲取rdd的value
rdd.values().collect()
[7, 2, 2]
# 判斷rdd是否為空
rdd.isEmpty()
False
sc.parallelize([]).isEmpty()
True
統(tǒng)計(jì)信息
統(tǒng)計(jì)信息包含了基本的統(tǒng)計(jì)計(jì)算值,如最大值娃闲、最小值、平均數(shù)皇帮、描述統(tǒng)計(jì)等。
# 求和
rdd3.sum()
4950
# 最大值
rdd3.max()
99
# 最小值
rdd3.min()
0
# 均值
rdd3.mean()
49.5
# 標(biāo)準(zhǔn)差
rdd3.stdev()
28.86607004772212
# 方差
rdd3.variance()
833.25
# 分區(qū)間計(jì)數(shù)
rdd3.histogram(3)
([0, 33, 66, 99], [33, 33, 34])
# 描述統(tǒng)計(jì)
rdd3.stats()
(count: 100, mean: 49.5, stdev: 28.86607004772212, max: 99.0, min: 0.0)
處理RDD
切片/collect
# 獲取rdd里的所有元素玲献,返回list
rdd.collect()
[('a', 7), ('a', 2), ('b', 2)]
# 獲取rdd里的元素,返回字典
rdd.collectAsMap()
{'a': 2, 'd': 1, 'b': 1}
# 獲取開始的2個(gè)元素
rdd.take(2)
[('a', 7), ('a', 2)]
# 獲取第一個(gè)位置的元素
rdd.first()
('a', 7)
# 獲取降序排序的前3個(gè)元素
rdd3.top(3)
[99, 98, 97]
計(jì)數(shù)/count
# 統(tǒng)計(jì)rdd里的元素個(gè)數(shù)
rdd.count()
3
# 按key統(tǒng)計(jì)rdd里的元素個(gè)數(shù)
rdd.countByKey()
defaultdict(<class 'int'>, {'a': 2, 'b': 1})
# 按value統(tǒng)計(jì)rdd里的元素個(gè)數(shù)
rdd.countByValue()
defaultdict(<class 'int'>, {('a', 7): 1, ('a', 2): 1, ('b', 2): 1})
重采樣/sample
# 對(duì)rdd進(jìn)行重采樣
rdd3.sample(False,0.1,81).collect()
[4, 27, 28, 41, 49, 53, 58, 85, 93]
過濾/filter
# 根據(jù)key過濾
rdd.filter(lambda x:'a' in x).collect()
[('a', 7), ('a', 2)]
去重/distinct
# 對(duì)rdd元素去重
rdd5.distinct().collect()
['a', 7, 2, 'b']
排序/sortBy
# 升序排序(默認(rèn))
rdd1.sortBy(lambda x:x).collect()
[1, 2, 5, 8]
# 降序排序
rdd1.sortBy(lambda x:x,ascending=False).collect()
[8, 5, 2, 1]
# 對(duì)鍵值對(duì)rdd按照key排序
rdd2.sortByKey().collect()
[('a', 2), ('b', 1), ('d', 1)]
映射/map
# map方法對(duì)每個(gè)元素應(yīng)用函數(shù)
rdd.map(lambda x: x+(x[0],x[1])).collect()
[('a', 7, 'a', 7), ('a', 2, 'a', 2), ('b', 2, 'b', 2)]
# flatMap方法瓢娜,返回的結(jié)果會(huì)扁平化
rdd5 = rdd.flatMap(lambda x: x+(x[0],x[1]))
rdd5.collect()
['a', 7, 'a', 7, 'a', 2, 'a', 2, 'b', 2, 'b', 2]
# flatMapValues方法
rdd4.flatMapValues(lambda x:x).collect()
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
迭代/foreach
def g(x):print(x)
# foreach方法對(duì)所有元素應(yīng)用函數(shù)
rdd.foreach(x)
('a', 7)
('a', 2)
('b', 2)
簡(jiǎn)化/reduce
# reduce方法對(duì)rdd進(jìn)行合并
rdd.reduce(lambda x,y:x+y)
('a', 7, 'a', 2, 'b', 2)
# reduceByKey方法根據(jù)key對(duì)value進(jìn)行合并
rdd.reduceByKey(lambda v1,v2:v1+v2).collect()
[('a', 9), ('b', 2)]
分組/groupBy
# groupBy方法對(duì)rdd的元素分組
rdd1.groupBy(lambda x:x%2).mapValues(list).collect()
[(0, [2, 8]), (1, [5, 1])]
# groupByKey方法對(duì)rdd的元素根據(jù)key分組
rdd.groupByKey().mapValues(list).collect()
[('a', [7, 2]), ('b', [2])]
聚合/aggregate
# 定義兩個(gè)聚合函數(shù)
seq_op=lambda x,y:(x[0]+y,x[1]+1)
comb_op=lambda x,y:(x[0]+y[0],x[1]+y[1])
# aggregate方法聚合rdd
rdd1.aggregate((0,0),seq_op,comb_op)
(16, 4)
# aggregateByKey方法根據(jù)key聚合rdd
rdd.aggregateByKey((0,0),seq_op,comb_op).collect()
[('a', (9, 2)), ('b', (2, 1))]
# fold方法聚合rdd
rdd1.fold(0,lambda x,y:x+y)
16
# foldByKey方法根據(jù)key聚合rdd
rdd.foldByKey(0,lambda x,y:x+y).collect()
[('a', 9), ('b', 2)]
合并/union
# 調(diào)用sc的union方法按順序合并多個(gè)rdd
sc.union([rdd,rdd2]).collect()
[('a', 7), ('a', 2), ('b', 2), ('a', 2), ('d', 1), ('b', 1)]
集合/intersection,union,subtract
# 兩個(gè)rdd的交集
rdd.intersection(rdd2).collect()
[('a', 2)]
# 兩個(gè)rdd的并集(包含重復(fù)元素)
rdd.union(rdd2).collect()
[('a', 7), ('a', 2), ('b', 2), ('a', 2), ('d', 1), ('b', 1)]
# rdd對(duì)rdd2的補(bǔ)集
rdd.subtract(rdd2).collect()
[('a', 7), ('b', 2)]
# 根據(jù)key求rdd2對(duì)rdd的補(bǔ)集)
rdd2.subtractByKey(rdd).collect()
[('d', 1)]
# 兩個(gè)rdd計(jì)算笛卡爾積
rdd1.cartesian(rdd1).collect()
[(2, 2), (2, 5), (2, 1), (2, 8), (5, 2), (5, 5), (5, 1), (5, 8), (1, 2), (1, 5), (1, 1), (1, 8), (8, 2), (8, 5), (8, 1), (8, 8)]
保存RDD
# 保存rdd到本地
rdd.saveAsTextFile('rdd.txt')
關(guān)閉spark
# 使用stop方法關(guān)閉spark context實(shí)例
sc.stop()
運(yùn)行
進(jìn)入spark安裝目錄下眠砾,通過sprak-submit
命令運(yùn)行py文件托酸。
./bin/spark-submit example/src/main/python/pi.py
另外,本地開發(fā)励堡,可直接通過pyCharm運(yùn)行。