Apache Spark is a big data processing framework built around speed, ease of use, and sophisticated analytics. It offers a comprehensive, unified framework for handling big data processing needs across datasets of different natures (text data, graph data, etc.) and data sources (batch data or real-time streaming data). This post briefly shows how to use Apache Spark to store data in a Postgres database.
First, launch the PySpark shell with the Postgres JDBC driver on the classpath:
pyspark --driver-class-path /opt/spark/jars/postgresql-42.2.12.jar --jars /opt/spark/jars/postgresql-42.2.12.jar
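If you would rather run this as a standalone script than in the interactive shell, an equivalent session can be built in code (a minimal sketch, assuming the same jar path as above; the app name is arbitrary):

from pyspark.sql import SparkSession

# Point both spark.jars and the driver classpath at the Postgres JDBC driver
spark = SparkSession.builder \
    .appName("spark-to-postgres") \
    .config("spark.jars", "/opt/spark/jars/postgresql-42.2.12.jar") \
    .config("spark.driver.extraClassPath", "/opt/spark/jars/postgresql-42.2.12.jar") \
    .getOrCreate()
sc = spark.sparkContext  # the shell provides sc automatically; a script must fetch it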
Next, drop the table created by any previous run:
import psycopg2 as p2

# Connect to Postgres directly and remove the old table so the write below starts clean
conn = p2.connect("host=localhost dbname=test user=detian password=p31415926")
cur = conn.cursor()
cur.execute("DROP TABLE IF EXISTS test.germanydata")
conn.commit()
cur.close()
conn.close()
Then fetch the data from the web API and store it in a DataFrame:
import requests
from collections import OrderedDict
from pyspark.sql import Row

# Request one month of confirmed-case data for Germany from the covid19api.com REST API
URL = "https://api.covid19api.com/country/germany/status/confirmed/live?from=2020-04-01T00:00:00Z&to=2020-05-01T00:00:00Z"
r = requests.get(url=URL)
data = r.json()

# Parse one JSON record (a dict) into a Row; sorting the keys gives every Row
# the same field order
def convert_to_row(d: dict) -> Row:
    return Row(**OrderedDict(sorted(d.items())))

# Distribute the records and convert them to a DataFrame
# (sc is the SparkContext the pyspark shell creates automatically)
df = sc.parallelize(data).map(convert_to_row).toDF()

# Keep only the columns we want to persist
jdbcDF = df.select("Cases", "Country", "Date", "Status")
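Before writing anything out, it is worth sanity-checking the DataFrame; the exact rows depend on what the API returned at query time:

# Inspect the inferred schema and a few sample rows
jdbcDF.printSchema()
jdbcDF.show(5)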
Then connect to Postgres through the JDBC driver and write the DataFrame's contents into the database:
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/test") \
    .option("dbtable", "test.germanydata") \
    .option("user", "detian") \
    .option("password", "p31415926") \
    .save()
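A note on save modes: the write above uses Spark's default errorifexists mode, which is why the old table had to be dropped first. A variation (not in the original) is to let Spark drop and recreate the table itself, making the manual psycopg2 step unnecessary:

# Same write, but Spark replaces the table if it already exists
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/test") \
    .option("dbtable", "test.germanydata") \
    .option("user", "detian") \
    .option("password", "p31415926") \
    .mode("overwrite") \
    .save()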
Finally, check the germanydata table in the database.
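One way to check is to read the table back through the same JDBC connection (a quick sketch; the row count depends on the date range requested from the API):

# Read the freshly written table back and inspect it
checkDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/test") \
    .option("dbtable", "test.germanydata") \
    .option("user", "detian") \
    .option("password", "p31415926") \
    .load()
checkDF.show(5)
print(checkDF.count())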