Design Patterns for using foreachRDD
dstream.foreachRDD is a powerful primitive that allows data to be sent out to external systems.
However, it is important to understand how to use this primitive correctly and efficiently.
That is how the Spark 2.3.0 documentation introduces dstream.foreachRDD.
The docs show a few ways to use foreachRDD, but they do not give a complete wordcount-style example, and I still hit a few pitfalls while wiring in a database connection pool, so I am sharing what I ended up with.
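For context, the usage the docs recommend is to open the connection once per partition, inside foreachPartition, rather than once per record or on the driver. A minimal sketch of that pattern (create_connection and send are stand-ins for whatever client you actually use):

# recommended pattern from the docs (sketch): one connection per partition
def send_partition(partition):
    connection = create_connection()  # stand-in: open a connection to the external system
    for record in partition:
        send(connection, record)      # stand-in: push one record
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))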
The sample code writes the wordcount results to a database through a connection pool. I am using Spark 2.3.0, the application is written in Python, and the deploy mode is YARN client.
# foreachRDD_dbutils.py
import os
os.environ.setdefault('SPARK_HOME', '/opt/appl/spark')
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import MySQLdb
from DBUtils.PooledDB import PooledDB


class Pool(object):
    # class variable holding the shared connection pool
    __pool = None

    def __init__(self):
        pass

    # get a connection from the pool; the pool itself is created lazily on first use
    @staticmethod
    def get_connection():
        if Pool.__pool is None:
            # creator=MySQLdb, mincached=5 pre-created connections
            Pool.__pool = PooledDB(MySQLdb, 5, host='******', user='root',
                                   passwd='root', database='******', charset='utf8')
        return Pool.__pool.connection()

    # close the connection pool
    @staticmethod
    def close():
        if Pool.__pool is not None:
            Pool.__pool.close()


def sendPartition(partition):
    connection = Pool.get_connection()
    cursor = connection.cursor()
    # assumes a table like: wordcount(word VARCHAR(...), wordcount INT)
    for record in partition:
        # in Python the int count has to be converted with str() before concatenation
        cursor.execute("insert into wordcount(word,wordcount) values('" + record[0] + "'," + str(record[1]) + ")")
    # commit the whole partition as one batch
    connection.commit()
    # "closing" the connection returns it to the pool rather than really closing it
    connection.close()


# write the wordcount results to MySQL
if __name__ == "__main__":
    sc = SparkContext(appName='spark_streaming_test', master='yarn')
    ssc = StreamingContext(sc, 5)

    lines = ssc.socketTextStream('172.30.1.243', 9999)
    counts = lines.flatMap(lambda line: line.split(' ')) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)

    counts.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
    counts.pprint()

    ssc.start()
    try:
        ssc.awaitTermination()
    except:
        pass
    finally:
        # only closes a pool that was actually created in this (driver) process
        Pool.close()
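One practical detail before submitting: socketTextStream only connects to an existing server, so something has to be listening on 172.30.1.243:9999 first. The usual choice is netcat (nc -lk 9999); the snippet below is a minimal Python stand-in (a sketch; the address and the test sentence are placeholders) that pushes a line of words every second:

# tiny socket source for testing (sketch); `nc -lk 9999` does the same job
import socket
import time

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('172.30.1.243', 9999))
server.listen(1)
conn, _ = server.accept()  # Spark Streaming connects as the client
while True:
    conn.sendall(b'hello spark streaming hello\n')
    time.sleep(1)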
Submit the program and run it:
ssh://appl@172.30.1.243:22/opt/appl/anaconda3/bin/python -u /opt/appl/pycharm-projects/spark_streaming_test/foreachRDD_dbutils.py
/opt/appl/spark/conf/spark-env.sh: line 72: hadoop: command not found
2019-01-17 21:28:56 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2019-01-17 21:29:00 WARN Client:66 - Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
-------------------------------------------
Time: 2019-01-17 21:29:25
-------------------------------------------
-------------------------------------------
Time: 2019-01-17 21:29:30
-------------------------------------------
Notes:
- The database connection pool needs to be static and, ideally, lazily initialized.
- If the pool above is not lazily initialized, you get a serialization exception on the pool object. The official docs make the same point (a sketch of what goes wrong without lazy initialization follows these notes).
- Lazily initializing the pool does leave a window for a race between concurrent tasks, but when I guarded it with a lock the lock itself failed to serialize, so I have not found a better solution here.
- You may run into the error Error from python worker: /bin/python: No module named pyspark. This usually means the Python interpreter launched for the executors does not have pyspark available; pointing PYSPARK_PYTHON (for example in spark-env.sh) at an interpreter that does is one way to address it.
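As a sketch of the second note above (same PooledDB/MySQLdb setup assumed; send_partition_bad is just an illustrative name): if the pool is built eagerly on the driver and referenced by the function that foreachPartition ships to the executors, Spark has to pickle the pool along with the function, and that is where the serialization error comes from. Building it lazily inside the static method, as the Pool class above does, lets each executor create its own pool the first time it needs one.

# anti-pattern (sketch): pool created eagerly on the driver
eager_pool = PooledDB(MySQLdb, 5, host='******', user='root',
                      passwd='root', database='******', charset='utf8')

def send_partition_bad(partition):
    # eager_pool is referenced here, so Spark has to serialize it together with
    # this function when shipping it to the executors -- and the pool is not picklable
    connection = eager_pool.connection()
    for record in partition:
        pass  # write the record as before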