pyspark系列--讀寫dataframe

來源:https://blog.csdn.net/suzyu12345/article/details/79673473

1. 連接spark

2. 創(chuàng)建dataframe

2.1. 從變量創(chuàng)建

2.2. 從變量創(chuàng)建

2.3. 讀取json

2.4. 讀取csv

2.5. 讀取MySQL

2.6. 從pandas.dataframe創(chuàng)建

2.7. 從列式存儲的parquet讀取

2.8. 從hive讀取

3. 保存數(shù)據(jù)

3.1. 寫到csv

3.2. 保存到parquet

3.3. 寫到hive

3.4. 寫到hdfs

3.5. 寫到mysql

1. 連接sparkfrom pyspark.sql import SparkSession

spark=SparkSession \
        .builder \
        .appName('my_first_app_name') \
        .getOrCreate()

2. 創(chuàng)建dataframe

2.1. 從變量創(chuàng)建

# 生成以逗號分隔的數(shù)據(jù)
stringCSVRDD = spark.sparkContext.parallelize([
    (123, "Katie", 19, "brown"),
    (234, "Michael", 22, "green"),
    (345, "Simone", 23, "blue")
])
# 指定模式, StructField(name,dataType,nullable)
# 其中:
#   name: 該字段的名字旭从,
#   dataType:該字段的數(shù)據(jù)類型惠啄,
#   nullable: 指示該字段的值是否為空
from pyspark.sql.types import StructType, StructField, LongType, StringType  # 導(dǎo)入類型

schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])

# 對RDD應(yīng)用該模式并且創(chuàng)建DataFrame
swimmers = spark.createDataFrame(stringCSVRDD,schema)

# 利用DataFrame創(chuàng)建一個臨時視圖
swimmers.registerTempTable("swimmers")

# 查看DataFrame的行數(shù)
swimmers.count()

2.2. 從變量創(chuàng)建

# 使用自動類型推斷的方式創(chuàng)建dataframe

data = [(123, "Katie", 19, "brown"),
        (234, "Michael", 22, "green"),
        (345, "Simone", 23, "blue")]
df = spark.createDataFrame(data, schema=['id', 'name', 'age', 'eyccolor'])
df.show()
df.count()

2.3. 讀取json

# 讀取spark下面的示例數(shù)據(jù)

file = r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\people.json"
df = spark.read.json(file)
df.show()

2.4. 讀取csv

# 先創(chuàng)建csv文件
import pandas as pd
import numpy as np
df=pd.DataFrame(np.random.rand(5,5),columns=['a','b','c','d','e']).\
    applymap(lambda x: int(x*10))
file=r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\random.csv"
df.to_csv(file,index=False)

# 再讀取csv文件
monthlySales = spark.read.csv(file, header=True, inferSchema=True)
monthlySales.show()

2.5. 讀取MySQL

# 此時需要將mysql-jar驅(qū)動放到spark-2.2.0-bin-hadoop2.7\jars下面
# 單機環(huán)境可行,集群環(huán)境不行
# 重新執(zhí)行
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    dbtable='mysql.db',
    user='root',
    password='123456' 
    ).load()
df.show()

# 也可以傳入SQL語句

sql="(select * from mysql.db where db='wp230') t"
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    dbtable=sql,
    user='root',
    password='123456' 
    ).load()
df.show()

2.6. 從pandas.dataframe創(chuàng)建

# 如果不指定schema則用pandas的列名
df = pd.DataFrame(np.random.random((4,4)))
spark_df = spark.createDataFrame (df,schema=['a','b','c','d']) 

2.7. 從列式存儲的parquet讀取

# 讀取example下面的parquet文件
file=r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\users.parquet"
df=spark.read.parquet(file)
df.show()

2.8. 從hive讀取

# 如果已經(jīng)配置spark連接hive的參數(shù)萌丈,可以直接讀取hive數(shù)據(jù)
spark = SparkSession \
        .builder \
        .enableHiveSupport() \      
        .master("172.31.100.170:7077") \
        .appName("my_first_app_name") \
        .getOrCreate()

df=spark.sql("select * from hive_tb_name")
df.show()

3. 保存數(shù)據(jù)

3.1. 寫到csv

# 創(chuàng)建dataframe
import numpy as np
df = pd.DataFrame(np.random.random((4, 4)),columns=['a', 'b', 'c', 'd'])
spark_df = spark.createDataFrame(df)

# 寫到csv
file=r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.csv"
spark_df.write.csv(path=file, header=True, sep=",", mode='overwrite')

3.2. 保存到parquet

# 創(chuàng)建dataframe
import numpy as np
df = pd.DataFrame(np.random.random((4, 4)),columns=['a', 'b', 'c', 'd'])
spark_df = spark.createDataFrame(df)

# 寫到parquet
file=r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.parquet"
spark_df.write.parquet(path=file,mode='overwrite')

3.3. 寫到hive

# 打開動態(tài)分區(qū)
spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")

# 使用普通的hive-sql寫入分區(qū)表
spark.sql("""
    insert overwrite table ai.da_aipurchase_dailysale_hive 
    partition (saledate) 
    select productid, propertyid, processcenterid, saleplatform, sku, poa, salecount, saledate 
    from szy_aipurchase_tmp_szy_dailysale distribute by saledate
    """)

# 或者使用每次重建分區(qū)表的方式
jdbcDF.write.mode("overwrite").partitionBy("saledate").insertInto("ai.da_aipurchase_dailysale_hive")
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_hive", None, "append", partitionBy='saledate')

# 不寫分區(qū)表,只是簡單的導(dǎo)入到hive表
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_for_ema_predict", None, "overwrite", None)

3.4. 寫到hdfs

# 數(shù)據(jù)寫到hdfs雷则,而且以csv格式保存
jdbcDF.write.mode("overwrite").options(header="true").csv("/home/ai/da/da_aipurchase_dailysale_for_ema_predict.csv")

3.5. 寫到mysql

# 會自動對齊字段辆雾,也就是說,spark_df 的列不一定要全部包含MySQL的表的全部列才行

# overwrite 清空表再導(dǎo)入
spark_df.write.mode("overwrite").format("jdbc").options(
    url='jdbc:mysql://127.0.0.1',
    user='root',
    password='123456',
    dbtable="test.test",
    batchsize="1000",
).save()

# append 追加方式
spark_df.write.mode("append").format("jdbc").options(
    url='jdbc:mysql://127.0.0.1',
    user='root',
    password='123456',
    dbtable="test.test",
    batchsize="1000",
).save()
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末月劈,一起剝皮案震驚了整個濱河市度迂,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌猜揪,老刑警劉巖惭墓,帶你破解...
    沈念sama閱讀 218,546評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異而姐,居然都是意外死亡腊凶,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評論 3 395
  • 文/潘曉璐 我一進店門拴念,熙熙樓的掌柜王于貴愁眉苦臉地迎上來钧萍,“玉大人,你說我怎么就攤上這事政鼠》缡荩” “怎么了?”我有些...
    開封第一講書人閱讀 164,911評論 0 354
  • 文/不壞的土叔 我叫張陵公般,是天一觀的道長万搔。 經(jīng)常有香客問我器躏,道長,這世上最難降的妖魔是什么蟹略? 我笑而不...
    開封第一講書人閱讀 58,737評論 1 294
  • 正文 為了忘掉前任登失,我火速辦了婚禮,結(jié)果婚禮上挖炬,老公的妹妹穿的比我還像新娘揽浙。我一直安慰自己,他們只是感情好意敛,可當我...
    茶點故事閱讀 67,753評論 6 392
  • 文/花漫 我一把揭開白布馅巷。 她就那樣靜靜地躺著,像睡著了一般草姻。 火紅的嫁衣襯著肌膚如雪钓猬。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,598評論 1 305
  • 那天撩独,我揣著相機與錄音敞曹,去河邊找鬼。 笑死综膀,一個胖子當著我的面吹牛澳迫,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播剧劝,決...
    沈念sama閱讀 40,338評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼橄登,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了讥此?” 一聲冷哼從身側(cè)響起拢锹,我...
    開封第一講書人閱讀 39,249評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎萄喳,沒想到半個月后卒稳,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,696評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡取胎,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,888評論 3 336
  • 正文 我和宋清朗相戀三年展哭,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片闻蛀。...
    茶點故事閱讀 40,013評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡匪傍,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出觉痛,到底是詐尸還是另有隱情役衡,我是刑警寧澤,帶...
    沈念sama閱讀 35,731評論 5 346
  • 正文 年R本政府宣布薪棒,位于F島的核電站手蝎,受9級特大地震影響榕莺,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜棵介,卻給世界環(huán)境...
    茶點故事閱讀 41,348評論 3 330
  • 文/蒙蒙 一钉鸯、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧邮辽,春花似錦唠雕、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至揣云,卻和暖如春捕儒,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背邓夕。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評論 1 270
  • 我被黑心中介騙來泰國打工刘莹, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人翎迁。 一個月前我還...
    沈念sama閱讀 48,203評論 3 370
  • 正文 我出身青樓栋猖,卻偏偏與公主長得像,于是被迫代替她去往敵國和親汪榔。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,960評論 2 355