Source: https://blog.csdn.net/suzyu12345/article/details/79673473
1. Connect to Spark
2. Create a DataFrame
2.1. From a variable (explicit schema)
2.2. From a variable (automatic type inference)
2.3. Read JSON
2.4. Read CSV
2.5. Read from MySQL
2.6. From a pandas.DataFrame
2.7. Read from columnar Parquet storage
2.8. Read from Hive
3. Save data
3.1. Write to CSV
3.2. Save to Parquet
3.3. Write to Hive
3.4. Write to HDFS
3.5. Write to MySQL
1. Connect to Spark
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName('my_first_app_name') \
    .getOrCreate()
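If you are experimenting on a single machine rather than submitting to a cluster, the same builder can also pin the master and extra config; a minimal sketch (the shuffle-partition setting is only an illustration, not a required value):
# Local-mode session with explicit master and one example config entry
spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName('my_first_app_name') \
    .config('spark.sql.shuffle.partitions', '4') \
    .getOrCreate()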
2. Create a DataFrame
2.1. From a variable (explicit schema)
# Build an RDD from some sample rows
stringCSVRDD = spark.sparkContext.parallelize([
    (123, "Katie", 19, "brown"),
    (234, "Michael", 22, "green"),
    (345, "Simone", 23, "blue")
])
# Specify the schema: StructField(name, dataType, nullable)
# where:
#   name: the field name
#   dataType: the field's data type
#   nullable: whether the field may contain null values
from pyspark.sql.types import StructType, StructField, LongType, StringType  # import the type classes
schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])
# Apply the schema to the RDD and create a DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)
# Register the DataFrame as a temporary view (registerTempTable is deprecated since Spark 2.0)
swimmers.createOrReplaceTempView("swimmers")
# Count the rows in the DataFrame
swimmers.count()
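With the temporary view registered, the same data can be queried through plain SQL, which is the main reason for registering it:
# Query the temporary view with SQL
spark.sql("select id, name, age from swimmers where age >= 20").show()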
2.2. From a variable (automatic type inference)
# Create a DataFrame using automatic type inference
data = [(123, "Katie", 19, "brown"),
        (234, "Michael", 22, "green"),
        (345, "Simone", 23, "blue")]
df = spark.createDataFrame(data, schema=['id', 'name', 'age', 'eyeColor'])
df.show()
df.count()
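To see what types the inference actually produced, print the schema:
# Inspect the inferred column types
df.printSchema()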
2.3. Read JSON
# Read the sample data shipped with Spark
file = r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\people.json"
df = spark.read.json(file)
df.show()
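By default the JSON reader infers the schema with an extra pass over the data; if the fields are known up front (Spark's people.json only has age and name), the schema can be supplied explicitly. A minimal sketch:
from pyspark.sql.types import StructType, StructField, LongType, StringType
people_schema = StructType([
    StructField("age", LongType(), True),
    StructField("name", StringType(), True)
])
df = spark.read.schema(people_schema).json(file)
df.show()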
2.4. Read CSV
# First create a CSV file with pandas
import pandas as pd
import numpy as np
df = pd.DataFrame(np.random.rand(5, 5), columns=['a', 'b', 'c', 'd', 'e']) \
    .applymap(lambda x: int(x * 10))
file = r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\random.csv"
df.to_csv(file, index=False)
# Then read the CSV file back with Spark
monthlySales = spark.read.csv(file, header=True, inferSchema=True)
monthlySales.show()
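inferSchema also costs an extra pass over the file; for anything large you can hand the reader a schema instead. A sketch reusing the integer columns created above:
from pyspark.sql.types import StructType, StructField, IntegerType
csv_schema = StructType([StructField(c, IntegerType(), True) for c in ['a', 'b', 'c', 'd', 'e']])
monthlySales = spark.read.csv(file, header=True, schema=csv_schema)
monthlySales.show()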
2.5. Read from MySQL
# The MySQL JDBC driver jar needs to be placed under spark-2.2.0-bin-hadoop2.7\jars
# (this works on a single machine; on a cluster the jar must be available to every node),
# and the Spark session restarted before re-running.
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    dbtable='mysql.db',
    user='root',
    password='123456'
).load()
df.show()
# A SQL query can also be passed in place of a table name (note the required subquery alias)
sql = "(select * from mysql.db where db='wp230') t"
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    dbtable=sql,
    user='root',
    password='123456'
).load()
df.show()
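An equivalent form uses DataFrameReader.jdbc with a properties dict; the driver class name below is an assumption and depends on your MySQL Connector/J version (com.mysql.jdbc.Driver for 5.x, com.mysql.cj.jdbc.Driver for 8.x):
props = {'user': 'root', 'password': '123456', 'driver': 'com.mysql.jdbc.Driver'}
df = spark.read.jdbc(url='jdbc:mysql://127.0.0.1', table='mysql.db', properties=props)
df.show()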
2.6. From a pandas.DataFrame
# If no schema is given, the pandas column names are used
df = pd.DataFrame(np.random.random((4, 4)))
spark_df = spark.createDataFrame(df, schema=['a', 'b', 'c', 'd'])
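Going the other direction, toPandas() pulls a Spark DataFrame back into pandas; it collects every row to the driver, so only use it on small results:
# Convert back to pandas (collects to the driver)
pandas_df = spark_df.toPandas()
print(pandas_df.head())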
2.7. Read from columnar Parquet storage
# Read the parquet file under Spark's examples directory
file = r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\users.parquet"
df = spark.read.parquet(file)
df.show()
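Because Parquet is columnar, selecting only the columns you need lets Spark skip the rest on disk; the column names below assume the fields in Spark's bundled users.parquet:
# Read only two columns from the columnar file
df.select("name", "favorite_color").show()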
2.8. Read from Hive
# If Spark is already configured to connect to Hive, Hive tables can be queried directly
spark = SparkSession \
    .builder \
    .enableHiveSupport() \
    .master("spark://172.31.100.170:7077") \
    .appName("my_first_app_name") \
    .getOrCreate()
df = spark.sql("select * from hive_tb_name")
df.show()
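For a full-table read, spark.table is an equivalent shorthand for the select * query:
df = spark.table("hive_tb_name")
df.show()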
3. Save data
3.1. Write to CSV
# Create a DataFrame
import numpy as np
df = pd.DataFrame(np.random.random((4, 4)), columns=['a', 'b', 'c', 'd'])
spark_df = spark.createDataFrame(df)
# Write it out as CSV
file = r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.csv"
spark_df.write.csv(path=file, header=True, sep=",", mode='overwrite')
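Note that write.csv treats the path as a directory and emits one part-file per partition; for a small result that must end up as a single CSV file, coalesce to one partition first:
# Single part-file output (only sensible for small data)
spark_df.coalesce(1).write.csv(path=file, header=True, sep=",", mode='overwrite')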
3.2. Save to Parquet
# Create a DataFrame
import numpy as np
df = pd.DataFrame(np.random.random((4, 4)), columns=['a', 'b', 'c', 'd'])
spark_df = spark.createDataFrame(df)
# Write it out as Parquet
file = r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.parquet"
spark_df.write.parquet(path=file, mode='overwrite')
3.3. Write to Hive
# Enable dynamic partitioning
spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")
# Write into a partitioned table with plain Hive SQL
spark.sql("""
insert overwrite table ai.da_aipurchase_dailysale_hive
partition (saledate)
select productid, propertyid, processcenterid, saleplatform, sku, poa, salecount, saledate
from szy_aipurchase_tmp_szy_dailysale distribute by saledate
""")
# Or rebuild the partitioned table from the DataFrame each time
# (insertInto uses the partition columns already defined on the table, so it is not combined with partitionBy)
jdbcDF.write.mode("overwrite").insertInto("ai.da_aipurchase_dailysale_hive")
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_hive", None, "append", partitionBy='saledate')
# Without partitioning: simply write into a Hive table
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_for_ema_predict", None, "overwrite", None)
3.4. Write to HDFS
# Write the data to HDFS in CSV format
jdbcDF.write.mode("overwrite").options(header="true").csv("/home/ai/da/da_aipurchase_dailysale_for_ema_predict.csv")
3.5. Write to MySQL
# Columns are matched up automatically, i.e. spark_df does not need to contain every column of the MySQL table
# overwrite: clear the table, then load the data
spark_df.write.mode("overwrite").format("jdbc").options(
url='jdbc:mysql://127.0.0.1',
user='root',
password='123456',
dbtable="test.test",
batchsize="1000",
).save()
# append 追加方式
spark_df.write.mode("append").format("jdbc").options(
url='jdbc:mysql://127.0.0.1',
user='root',
password='123456',
dbtable="test.test",
batchsize="1000",
).save()
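Two JDBC write options worth knowing, shown here as a sketch: driver pins the JDBC driver class (the class name depends on your Connector/J version, so treat it as an assumption), and truncate="true" makes overwrite empty the table with TRUNCATE instead of dropping and recreating it:
spark_df.write.mode("overwrite").format("jdbc").options(
    url='jdbc:mysql://127.0.0.1',
    user='root',
    password='123456',
    dbtable="test.test",
    driver='com.mysql.jdbc.Driver',  # assumption: Connector/J 5.x class name
    truncate="true",                 # keep the existing table definition on overwrite
    batchsize="1000",
).save()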