備注:
Spark 2.4.0
一. 需求
最近做數(shù)據(jù)倉庫項目的時候瞧壮,覺得sqoop有點慢酬诀,然后想嘗試使用Spark來做ETL稿存。
二. 解決方案
ODS層:
Spark可以從MySQL等數(shù)據(jù)源讀取數(shù)據(jù)糟描,然后寫入到Hive中斩个,所以用Spark來做ETL也是沒太大問題的。
數(shù)倉其它層:
Spark可以通過Spark SQL直接運行hive的sql語句,所以用Spark來做ETL也是沒太大問題的膜宋。
我們這邊來模擬幾個例子:
將mysql下的emp表同步到hive中
hive需提前創(chuàng)建好emp表的表結(jié)構(gòu)窿侈。
2.1 全量同步
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
# 創(chuàng)建一個連接
spark = SparkSession. \
Builder(). \
appName('local'). \
master('local'). \
getOrCreate()
df1=spark.read.format("jdbc").options(url="jdbc:mysql://10.31.1.123:3306/test", \
driver="com.mysql.jdbc.Driver", \
dbtable="(SELECT * FROM EMP) tmp", \
user="root", \
password="abc123").load()
df1.registerTempTable('emp_mysql')
spark.sql("use test")
df2 = spark.sql("select e1.empno,e1.ename,e1.job,e1.mgr,e1.hiredate,e1.sal,e1.comm,e1.deptno from emp_mysql e1 left join emp e2 on e1.empno = e2.empno where e2.empno is null")
df2.registerTempTable('emp_incre')
spark.sql("insert into emp select empno,ename,job,mgr,hiredate,sal,comm,deptno from emp_incre")
# 關(guān)閉spark會話
spark.stop()
數(shù)據(jù)驗證:
image.png
2.2 增量同步
我們演練的增量同步以一個時間字段為例
例如上例中的hiredate
準(zhǔn)備增量數(shù)據(jù):
insert into EMP(empno,ename,job,mgr,hiredate,sal,comm,deptno)
values (9000000, 'test1', 'CLERK', 7782, '2022-06-05', 1300, null, 10);
image.png
Spark代碼:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
# 創(chuàng)建一個連接
spark = SparkSession. \
Builder(). \
appName('local'). \
master('local'). \
getOrCreate()
df1=spark.read.format("jdbc").options(url="jdbc:mysql://10.31.1.123:3306/test",
driver="com.mysql.jdbc.Driver",
dbtable="(select * from EMP where hiredate >= '2022-06-05') tmp",
user="root",
password="abc123").load()
df1.registerTempTable('emp_mysql')
spark.sql("use test")
spark.sql("insert into emp select empno,ename,job,mgr,hiredate,sal,comm,deptno from emp_mysql")
# 關(guān)閉spark會話
spark.stop()
數(shù)據(jù)驗證:
image.png