需求
分析的中間數(shù)據(jù), 很多需要持久化到關(guān)系型數(shù)據(jù)庫, 以便后續(xù)的二次分析, 在官方給出insert指定字段的接口之前我先實現(xiàn)自己的方法吧畦贸。
背景
之前有一篇文章?spark SQL操作之關(guān)系型數(shù)據(jù)庫
簡單講解了spark 寫mysql的接口稽寒。
逐行指定字段寫入數(shù)據(jù)庫, 我們必須能拿到每一行數(shù)據(jù)召嘶。在spark SQL 操作完的對象是一個 RDD,?spark SQL scala api說明?上有一個api:
def foreachPartition(f: (Iterator[T]) ? Unit):Unit
// Applies a function f to each partition of this RDD.
因此我們只需要實現(xiàn)一個參數(shù)是?Iterator
類型的函數(shù), 就能取出每一行的數(shù)據(jù)宴猾。
代碼實現(xiàn)實例
閉包
val keyWords = sqlContext.sql("your sql ")
// scala 閉包, 傳參數(shù)
def keyWordsr2mysql(iter: Iterator[org.apache.spark.sql.Row]): Unit = {
val keyTags = Array("count_start_time", "kind")
val tags = Array("word", "count")
write2mysql(iter, "keyWordCount", keyTags, tags, sStartTime, kind)
}
抽象寫數(shù)據(jù)庫
將數(shù)據(jù)庫表名, 指定字段等以參數(shù)形式傳入, 接口更抽象通用
def write2mysql(iter :Iterator[org.apache.spark.sql.Row], sTable: String, keytags :Array[String], tags :Array[String], args:Any*): Unit = {
// your code
}