For a correct understanding of foreachRDD, see the article "Understanding DStream.foreachRDD".
The official Spark Streaming documentation also covers foreachRDD; see Design Patterns for using foreachRDD.
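Per-record connections
In practice, foreachRDD is often used to write data to an external store, which raises the question of how to manage connections to that store. The most common mistake is to open a connection for every record. (In all the examples here, connection.send(record) is pseudo-code, as in the official documentation, standing for whatever write operation you actually perform.)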
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    // executed at the worker, once for every single record
    val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/tutorials", "root", "root")
    connection.send(record)
    connection.close()
  }
}
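Drawback: a connection is opened and closed for every record, which is very inefficient. Opening the connection once on the driver and then using it inside rdd.foreach is not an option either: the connection object would have to be serialized and shipped to the workers, and connection objects generally cannot be serialized.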
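To make the cost concrete, here is a minimal plain-Scala simulation (no Spark required). It assumes a Seq of Seqs stands in for an RDD's partitions, and FakeConnection is a hypothetical stand-in for a JDBC connection that only counts how often one is opened.

```scala
object PerRecordDemo {
  // Counts how many (hypothetical) connections have been opened.
  var opened = 0

  // Hypothetical stand-in for a real connection: opening one bumps the counter.
  final class FakeConnection {
    opened += 1
    def send(record: String): Unit = () // pretend to write the record
    def close(): Unit = ()
  }

  // Simulates rdd.foreach with a new connection per record.
  def writePerRecord(partitions: Seq[Seq[String]]): Unit =
    partitions.foreach { partition =>
      partition.foreach { record =>
        val connection = new FakeConnection
        connection.send(record)
        connection.close()
      }
    }

  def main(args: Array[String]): Unit = {
    val batch = Seq(Seq("a", "b", "c"), Seq("d", "e"), Seq.empty)
    writePerRecord(batch)
    println(s"records = ${batch.map(_.size).sum}, connections opened = $opened")
  }
}
```

Running main prints records = 5, connections opened = 5: the number of connections grows with the data volume, and that repeats on every batch.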
Per-partition connections
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // one connection per partition, shared by all records in it
    val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/tutorials", "root", "root")
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
Drawback: this relieves some of the pressure on the external store, but with many partitions it still opens many connections, and they are opened again for every batch.
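A minimal simulation of the per-partition pattern, again in plain Scala with a hypothetical FakeConnection counter and a Seq of Seqs standing in for partitions. It shows that the connection count tracks the number of partitions, including empty ones.

```scala
object PerPartitionDemo {
  var opened = 0

  // Hypothetical stand-in for a real connection.
  final class FakeConnection {
    opened += 1
    def send(record: String): Unit = ()
    def close(): Unit = ()
  }

  // Simulates rdd.foreachPartition: one connection per partition, even an empty one.
  def writePerPartition(partitions: Seq[Seq[String]]): Unit =
    partitions.foreach { partitionOfRecords =>
      val connection = new FakeConnection
      partitionOfRecords.foreach(record => connection.send(record))
      connection.close()
    }

  def main(args: Array[String]): Unit = {
    writePerPartition(Seq(Seq("a", "b", "c"), Seq("d", "e"), Seq.empty))
    println(s"3 partitions -> $opened connections")
    opened = 0
    writePerPartition(Seq.fill(200)(Seq("x")))
    println(s"200 partitions -> $opened connections")
  }
}
```

This prints 3 and then 200 connections: far fewer than one per record, but still proportional to the partition count.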
Static connection
Building on the previous example, a static singleton can be created with a Scala object, so that each JVM (i.e. each executor) holds only one connection object:
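import java.sql.{Connection, DriverManager}

object Client {
  // one connection per JVM, opened as soon as Client is first accessed
  val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/tutorials", "root", "root")
  def apply(): Connection = conn
}

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = Client()
    partitionOfRecords.foreach(record => connection.send(record))
    // do not close the shared connection here: later partitions and batches reuse it
  }
}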
Drawback: the connection is created whether or not there is any data to write.
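The sketch below mirrors the eager Client object in plain Scala, with a hypothetical FakeConnection that counts openings. It assumes a partition is represented by an Iterator, like the partitionOfRecords argument. It illustrates both halves of the behavior: the singleton is opened even for an empty partition, and it is opened only once per JVM.

```scala
object EagerClientDemo {
  var opened = 0

  // Hypothetical stand-in for a real connection.
  final class FakeConnection {
    opened += 1
    def send(record: String): Unit = ()
  }

  // Like `object Client` above: a per-JVM singleton whose `val` is evaluated
  // as soon as the object is first accessed.
  object Client {
    val conn = new FakeConnection
    def apply(): FakeConnection = conn
  }

  def writePartition(partitionOfRecords: Iterator[String]): Unit = {
    val connection = Client() // forces Client's initialization, data or not
    partitionOfRecords.foreach(record => connection.send(record))
  }

  def main(args: Array[String]): Unit = {
    writePartition(Iterator.empty)
    println(s"after empty partition: $opened")
    writePartition(Iterator("a", "b"))
    println(s"after second partition: $opened")
  }
}
```

Both lines print 1: the empty partition already triggered the connection, and the second partition reused it.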
Lazy static connection
With a small change to the code above, the connection is created only when it is actually used:
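import java.sql.{Connection, DriverManager}

object Client {
  // opened on first use of conn, at most once per JVM
  lazy val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/tutorials", "root", "root")
  def apply(): Connection = conn
}

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // lazy: Client() (and therefore the connection) is only touched when the first record is sent
    lazy val connection = Client()
    partitionOfRecords.foreach(record => connection.send(record))
  }
}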
Drawback: all tasks on an executor depend on the same connection object, which can become a performance bottleneck. That calls for the final solution below.
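A minimal plain-Scala sketch of the lazy variant, using the same hypothetical FakeConnection counter and Iterator-as-partition assumption. Note that the laziness only pays off because the partition-local reference is also a lazy val. Calling Client() eagerly would force Client.conn through apply() even for an empty partition.

```scala
object LazyClientDemo {
  var opened = 0

  // Hypothetical stand-in for a real connection.
  final class FakeConnection {
    opened += 1
    def send(record: String): Unit = ()
  }

  object Client {
    lazy val conn = new FakeConnection
    def apply(): FakeConnection = conn
  }

  def writePartition(partitionOfRecords: Iterator[String]): Unit = {
    // Client() is not called until the first record is sent.
    lazy val connection = Client()
    partitionOfRecords.foreach(record => connection.send(record))
  }

  def main(args: Array[String]): Unit = {
    writePartition(Iterator.empty)
    println(s"after empty partition: $opened")
    writePartition(Iterator("a", "b"))
    println(s"after second partition: $opened")
  }
}
```

The first line prints 0 and the second prints 1. The single shared connection remains, which is exactly the bottleneck described above.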
Lazy static connection pool
The official example also specifies that ConnectionPool is a static, lazily initialized pool of connections:
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  }
}
ConnectionPool can be implemented with the org.apache.commons.pool2 framework; see the article "Implementing a MySQL connection pool with commons.pool2".
Here is one simple implementation:
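import java.sql.{Connection, DriverManager}
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}

// a Scala object is initialized lazily, once per JVM, on first access
object ConnectionPool {
  private val pool = new GenericObjectPool[Connection](new MysqlConnectionFactory("jdbc:mysql://103.235.245.156:3306/tutorials", "root", "root", "com.mysql.jdbc.Driver"))
  // validateObject below is only called when testing is enabled, so check connections on borrow
  pool.setTestOnBorrow(true)

  def getConnection(): Connection = pool.borrowObject()

  def returnConnection(conn: Connection): Unit = pool.returnObject(conn)
}

class MysqlConnectionFactory(url: String, userName: String, password: String, className: String)
    extends BasePooledObjectFactory[Connection] {
  // called by the pool whenever it needs a new physical connection
  override def create(): Connection = {
    Class.forName(className)
    DriverManager.getConnection(url, userName, password)
  }
  override def wrap(conn: Connection): PooledObject[Connection] = new DefaultPooledObject[Connection](conn)
  // a closed connection fails validation and is destroyed instead of being handed out
  override def validateObject(pObj: PooledObject[Connection]): Boolean = !pObj.getObject.isClosed
  override def destroyObject(pObj: PooledObject[Connection]): Unit = pObj.getObject.close()
}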
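To show what borrowObject/returnObject buy you without pulling in commons-pool2, here is a deliberately tiny hand-rolled pool. ToyPool, its borrow/giveBack methods and FakeConnection are hypothetical illustrations, not the commons-pool2 API. There is no size limit, validation or idle timeout, so this is a sketch of the reuse idea only, not a replacement for GenericObjectPool.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Toy stand-in for GenericObjectPool (hypothetical; no max size, no validation).
final class ToyPool[T](create: () => T) {
  private val idle = new ConcurrentLinkedQueue[T]()
  val created = new AtomicInteger(0)

  // Reuse an idle object if there is one, otherwise create a new one.
  def borrow(): T =
    Option(idle.poll()).getOrElse { created.incrementAndGet(); create() }

  // Hand the object back so a later borrow() can reuse it.
  def giveBack(obj: T): Unit = idle.offer(obj)
}

object ToyPoolDemo {
  final class FakeConnection { def send(record: String): Unit = () }

  def main(args: Array[String]): Unit = {
    val pool = new ToyPool(() => new FakeConnection)
    // Three tasks running at the same time each hold their own connection.
    val held = Seq.fill(3)(pool.borrow())
    held.foreach(pool.giveBack)
    // Later partitions and batches reuse idle connections instead of opening new ones.
    for (_ <- 1 to 5) { val c = pool.borrow(); c.send("x"); pool.giveBack(c) }
    println(s"connections created: ${pool.created.get}")
  }
}
```

This prints connections created: 3. Concurrent tasks get separate connections (avoiding the single-connection bottleneck), while later work reuses them instead of reconnecting.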
With this pool, the official example can be rewritten as:
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // A `lazy val` alone would not help: returnConnection(connection) forces it,
    // so borrow only when the partition actually has records
    if (partitionOfRecords.hasNext) {
      val connection = ConnectionPool.getConnection()
      try {
        partitionOfRecords.foreach(record => connection.send(record))
      } finally {
        ConnectionPool.returnConnection(connection) // return to the pool for future reuse, even on failure
      }
    }
  }
}
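The guarded borrow-and-return shape can be checked in plain Scala. This sketch assumes a hypothetical counting ConnectionPool and a FakeConnection that fails on the record "bad". It shows that an empty partition borrows nothing, and that a connection is still returned when a write throws.

```scala
object GuardedPartitionDemo {
  var borrowed = 0
  var returned = 0

  // Hypothetical connection whose write fails for the record "bad".
  final class FakeConnection {
    def send(record: String): Unit =
      if (record == "bad") throw new RuntimeException(s"write failed: $record")
  }

  // Hypothetical minimal pool that only counts borrow/return calls.
  object ConnectionPool {
    def getConnection(): FakeConnection = { borrowed += 1; new FakeConnection }
    def returnConnection(c: FakeConnection): Unit = returned += 1
  }

  // Same shape as the foreachPartition body: borrow only if there is data,
  // and always give the connection back, even when a write throws.
  def writePartition(partitionOfRecords: Iterator[String]): Unit =
    if (partitionOfRecords.hasNext) {
      val connection = ConnectionPool.getConnection()
      try partitionOfRecords.foreach(record => connection.send(record))
      finally ConnectionPool.returnConnection(connection)
    }

  def main(args: Array[String]): Unit = {
    writePartition(Iterator.empty)
    writePartition(Iterator("a", "b"))
    try writePartition(Iterator("c", "bad"))
    catch { case e: RuntimeException => println(e.getMessage) }
    println(s"borrowed = $borrowed, returned = $returned")
  }
}
```

Running main prints write failed: bad followed by borrowed = 2, returned = 2. Without the finally, the failing partition would leak a connection out of the pool.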