- 相較于rdd勤讽,在數(shù)據(jù)挖掘中更常用的數(shù)據(jù)格式是DataFrame蟋座,由于Catalyst優(yōu)化器的原因,DataFrame在python上并不比scala上慢多少
# 引入必要包
from pyspark.sql import SparkSession
from pyspark.sql import types
spark = SparkSession.builder.master("local").appName("learnsparkdf").enableHiveSupport().getOrCreate()
sc = spark.sparkContext
創(chuàng)建DataFrame
# 使用sc創(chuàng)建df
#方法一:通過(guò)json創(chuàng)建
stringJSONRDD = sc.parallelize(["""
{ "id": "123",
"name": "Katie",
"age": 19,
"eyeColor": "brown"
}""",
"""{
"id": "234",
"name": "Michael",
"age": 22,
"eyeColor": "green"
}""",
"""{
"id": "345",
"name": "Simone",
"age": 23,
"eyeColor": "blue"
}"""]
)
df = spark.read.json(stringJSONRDD)
df.show()
[out]:
+---+--------+---+-------+
|age|eyeColor| id| name|
+---+--------+---+-------+
| 19| brown|123| Katie|
| 22| green|234|Michael|
| 23| blue|345| Simone|
+---+--------+---+-------+
# 方法二脚牍,通過(guò)sc創(chuàng)建向臀,通常要指定列名不然會(huì)變成[-1,-2]
list_rdd = sc.parallelize([('TOM', 23), ('JIM', 18), ('BOSE', 50), ('JAME',23), ('JAM')],4)
df2 = spark.createDataFrame(list_rdd)
df2.show()
print(df2.schema) # ***默認(rèn)int數(shù)據(jù)類型為L(zhǎng)ongType
[out]:
+----+---+
| _1| _2|
+----+---+
| TOM| 23|
| JIM| 18|
|BOSE| 50|
|JAME| 23|
+----+---+
StructType(List(StructField(_1,StringType,true),StructField(_2,LongType,true)))
# 需要注意的是整數(shù)位LongType,與FloatType和DoubleType不能隱式轉(zhuǎn)換
schema = types.StructType([
types.StructField('Name', types.StringType(), True), # 列名诸狭,數(shù)據(jù)類型券膀,能否為空
types.StructField('Age', types.ShortType(), True),
])
df2 = spark.createDataFrame(list_rdd, schema)
df2.show()
print(df2.schema)
[out]:
+----+---+
|Name|Age|
+----+---+
| TOM| 23|
| JIM| 18|
|BOSE| 50|
|JAME| 23|
+----+---+
StructType(List(StructField(Name,StringType,true),StructField(Age,ShortType,true)))
數(shù)據(jù)類型總共有一下類型
"DataType", "NullType", "StringType", "BinaryType", "BooleanType", "DateType",
"TimestampType", "DecimalType", "DoubleType", "FloatType", "ByteType", "IntegerType",
"LongType", "ShortType", "ArrayType", "MapType", "StructField", "StructType"
創(chuàng)建臨時(shí)表
dataframe的操作通常會(huì)使用sql語(yǔ)句完成,下面有四個(gè)創(chuàng)建表的方法
#df.createGlobalTempView("tempViewName") 創(chuàng)建一個(gè)全局臨時(shí)表驯遇,生命周期為程序的生命周期 **使用的時(shí)候 global_temp.tempViewName
#df.createOrReplaceGlobalTempView("tempViewName") 創(chuàng)建或者替換一個(gè)全局臨時(shí)表芹彬,生命周期為程序的生命周期
#df.createOrReplaceTempView("tempViewName") 創(chuàng)建一個(gè)臨時(shí)表,生命周期為當(dāng)前SparkSession的生命周期
#df.createTempView("tempViewName") 創(chuàng)建或者替換一個(gè)臨時(shí)表叉庐,生命周期為當(dāng)前SparkSession的生命周期
# 刪除臨時(shí)表
# spark.catalog.dropTempView("tempViewName")
# spark.catalog.dropGlobalTempView("tempViewName")
創(chuàng)建表之后舒帮,剩余的操作就和sql基本一樣,一般來(lái)說(shuō)sql操作都會(huì)返回一個(gè)新的dataframe
查看臨時(shí)表
# 創(chuàng)建臨時(shí)表,查看信息,
df2.createOrReplaceTempView('df')
spark.sql("select * from df").show() #***注意返回的也是一個(gè)dataframe
[out]:
+----+---+
|Name|Age|
+----+---+
| TOM| 23|
| JIM| 18|
|BOSE| 50|
|JAME| 23|
+----+---+
[Row(Name='TOM', Age=23),
Row(Name='JIM', Age=18),
Row(Name='BOSE', Age=50),
Row(Name='JAME', Age=23)]
# 取出數(shù)據(jù)后
ret= spark.sql("select * from df").collect() # 也可以直接取出來(lái)
ret[0]['name'] 類似于字典和列表綜合陡叠,這兩種方法都可以獲取元素
ret[0][0] 類似于字典和列表綜合会前,這兩種方法都可以獲取元素
數(shù)據(jù)結(jié)構(gòu)
printSchema()函數(shù)
輸出dataframe schema結(jié)構(gòu)
若不指定dataframe結(jié)構(gòu),系統(tǒng)會(huì)自動(dòng)推斷數(shù)據(jù)類型
df.printSchema()
[out]:
root
|-- age: long (nullable = true)
|-- eyeColor: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
API操作
dataframe API查詢
dataframe可以通過(guò)take(),show()展示結(jié)果
可以使用select filter選擇過(guò)濾數(shù)據(jù),還有很多函數(shù)
df2.select("*").filter("age>18").show()
[out]:
+----+---+
|Name|Age|
+----+---+
| TOM| 23|
|BOSE| 50|
|JAME| 23|
+----+---+
保存文件
# 保存文件
# 一般來(lái)說(shuō)可以存儲(chǔ)為csv,json匾竿,不過(guò)更常見的是使用parquet存儲(chǔ)
# Parquet僅僅是一種存儲(chǔ)格式瓦宜,它是語(yǔ)言、平臺(tái)無(wú)關(guān)的岭妖,并且不需要和任何一種數(shù)據(jù)處理框架綁定临庇,目前能夠和Parquet適配的組件包括下面這些,可以看出基本上通常使用的查詢引擎和計(jì)算框架都已適配昵慌,并且可以很方便的將其它序列化工具生成的數(shù)據(jù)轉(zhuǎn)換成Parquet格式
df.rdd.getNumPartitions() # 獲取分區(qū)數(shù)目
df.write.parquet('/FileStore/tables/testpar', mode = 'overwrite') # 以parquet格式保存數(shù)據(jù)
df.toPandas().to_csv('/FileStore/tables/testpar2.csv')# 以csv格式保存數(shù)據(jù)
from sklearn.externals import joblib
joblib.dump(df.toJSON().collect(), '/FileStore/tables/testpar3.json') # 以json格式保存
# 保存后的/FileStore/tables/testpar是一個(gè)文件加假夺,有多少個(gè)分區(qū)數(shù)目就有多少個(gè)文件***********
df2 = spark.read.parquet('/FileStore/tables/testpar')# 讀取parquet格式文件
jupyter 代碼