The official documentation explains foreachRDD(func) as follows:
The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
This definition raises a question: how many RDDs are generated within a single batch interval?
Conclusion: there is exactly one.
So how should the "each RDD" in the definition be understood?
A DStream can be understood as time-based: each interval produces one RDD. Viewed along the time axis, a new RDD is generated every interval, so the "each RDD" in the definition should be read as the RDD of each interval, not as each of several RDDs within a single interval.
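As a minimal illustration of this (the object name, the local[2] master, and a socket source fed by e.g. nc -lk 9999 are assumptions of the sketch, not anything from the Spark docs), the function passed to foreachRDD receives the single RDD generated for each batch time:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object EachIntervalOneRDD {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("EachIntervalOneRDD")
    // one batch, and therefore one RDD, every 5 seconds
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)

    // invoked once per interval, with that interval's RDD and batch time
    lines.foreachRDD { (rdd, time) =>
      println(s"interval ending at $time produced RDD ${rdd.id}")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}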
Analysis of the Spark source code
The foreachRDD method on DStream ultimately calls the following code:
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
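For context, the two public foreachRDD overloads simply wrap the user's function and delegate to this private method; paraphrased from the Spark 2.x DStream source (details may differ slightly between versions):

def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
  val cleanedF = context.sparkContext.clean(foreachFunc, false)
  // adapt the one-argument function to the (RDD[T], Time) => Unit shape
  foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
}

def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
  foreachRDD(foreachFunc, displayInnerRDDOps = true)
}

Either way, foreachFunc receives a single RDD[T] (optionally with its batch Time), not a collection of RDDs, which already hints at one RDD per interval.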
Notice that this method contains no Iterator at all. Compare it with RDD's foreach and foreachPartition methods, which do traverse an RDD and therefore carry references of type Iterator:
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}
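This difference matters in practice: the function given to foreachRDD executes in the driver, while the closures given to foreach/foreachPartition execute on the executors. The Spark Streaming programming guide therefore combines the two when pushing data to an external system; in the sketch below, ConnectionPool is a hypothetical helper, not a Spark API:

dstream.foreachRDD { rdd =>
  // this outer closure runs in the driver, once per batch
  rdd.foreachPartition { partitionOfRecords =>
    // this inner closure runs on the executors, once per partition
    val connection = ConnectionPool.getConnection()  // hypothetical pooled connection
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)      // return for reuse
  }
}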
If each interval contained multiple RDDs, then DStream's foreachRDD would likewise need a reference of type Iterator (or some other collection of RDDs) to walk through them, but the code above has none. This is also consistent with the signature of DStream.compute(validTime: Time): Option[RDD[T]], which returns at most one RDD per batch time.
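To check the conclusion empirically, here is a minimal self-contained sketch (the object name, local[2] master, and batch/sleep durations are illustrative assumptions) that uses queueStream as a test source and counts how many times the function passed to foreachRDD fires; it prints at most one invocation per batch time:

import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OneRDDPerBatch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("OneRDDPerBatch")
    val ssc = new StreamingContext(conf, Seconds(1))

    val queue = new mutable.Queue[RDD[Int]]()
    val stream = ssc.queueStream(queue)  // dequeues one RDD per batch by default
    val calls = new AtomicInteger(0)

    stream.foreachRDD { (rdd, time) =>
      // runs in the driver: at most one invocation, hence one RDD, per batch time
      println(s"batch $time -> invocation #${calls.incrementAndGet()}, RDD ${rdd.id}")
    }

    ssc.start()
    queue += ssc.sparkContext.makeRDD(1 to 100)
    queue += ssc.sparkContext.makeRDD(1 to 100)
    Thread.sleep(5000)
    ssc.stop()
  }
}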