使用dstream.foreachRDD發(fā)送數(shù)據(jù)到外部系統(tǒng)
經(jīng)過Spark Streaming處理后的數(shù)據(jù)經(jīng)常需要推到外部系統(tǒng)尔店,比如緩存孕锄、數(shù)據(jù)庫已慢、消息系統(tǒng)座掘、文件系統(tǒng)递惋、實時數(shù)據(jù)大屏等
放一張官網(wǎng)上的圖:
圖片來自https://spark.apache.org/docs/latest/streaming-programming-guide.html#overview
其中,最常用的就是使用方法dstream.foreachRDD
看下最佳用法:
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
// ConnectionPool is a static, lazily initialized pool of connections
Connection connection = ConnectionPool.getConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
ConnectionPool.returnConnection(connection); // return to the pool for future reuse
});
});
- 循環(huán)每個分區(qū)
- 在每個分區(qū)中溢陪,從連接池獲取連接(數(shù)據(jù)庫/緩存等)
- 循環(huán)操作每條記錄萍虽,存儲或者發(fā)送數(shù)據(jù)
- 釋放連接
幾個常見的錯誤/低效用法
- dirver端創(chuàng)建連接, worker端使用連接(序列化/初始化錯誤等)
dstream.foreachRDD(rdd -> {
Connection connection = createNewConnection(); // executed at the driver
rdd.foreach(record -> {
connection.send(record); // executed at the worker
});
});
- 每條記錄創(chuàng)建一個連接(開銷太高)
dstream.foreachRDD(rdd -> {
rdd.foreach(record -> {
Connection connection = createNewConnection();
connection.send(record);
connection.close();
});
});
上面的代碼會在worker端創(chuàng)建連接并使用,但是每條記錄都會創(chuàng)建新的連接
當然可以使用連接池進行優(yōu)化形真,但是還有更好的方法
- 每個分區(qū)創(chuàng)建一個連接(可進一步使用連接池優(yōu)化)
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
Connection connection = createNewConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
connection.close();
});
});
上面的沒啥大問題了杉编,使用連接池后就是最上面的最佳用法了