背景交代
由于我們應(yīng)用系統(tǒng)使用的是mongo,所以每次操作結(jié)果都要輸出到MongoDB方便使用 清焕。
思路
1荷鼠、遇到這樣的情景我第一時間打開Spark官網(wǎng) Mongo數(shù)據(jù)源
2匹中、根據(jù)教程添加好依賴以后,開始配置鏈接信息点待。
3爱榕、根據(jù)教程創(chuàng)建Mongo的配置文件發(fā)現(xiàn)不少坑。
采坑
坑一
image.png
官方提供的demo都是不需要登錄憑證的花履。
坑二
image.png
1僧界、找到配置用戶密碼的缺發(fā)現(xiàn)在0.12.* 版本中 MongodbCredentials 類已經(jīng)移動包名了,直接拷貝文檔會報錯臭挽。
2捂襟、MongodbConfigBuilder 0.12.* 。MongodbConfigBuilder(map:Map,list:List)更本沒有這個構(gòu)造函數(shù)欢峰。
3葬荷、源碼配置文件沒有注釋,對于英語不好的我有點D疼纽帖。
最終在查看和調(diào)試源碼下找到關(guān)鍵字段
image.png
1宠漩、上面代碼大致意思以 MongodbConfig.Credentials 為key 讀取config中的屬性并轉(zhuǎn)換成[List[MongodbCredentials]] 對象,如果為空則獲取默認的配置懊直。最后map 成MongoCredential 對象扒吁。 從這里可以看出我們只需要配置MongodbConfig.Credentials 中配置用戶密碼就好了。
配置代碼
package cn.harsons.mbd.util
import cn.harsons.mbd.util.Config._
import com.stratio.datasource.mongodb._
import com.stratio.datasource.mongodb.config.MongodbConfig._
import com.stratio.datasource.mongodb.config.{MongodbConfigBuilder, MongodbCredentials}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
* mongoDB操作工具
*
* @author liyabin
* @date 2020/3/12 0012
*/
object MongoUtils {
/**
* 使用 mongoDB 集成方式 寫入mongo
*
* @param collectionName 集合名稱
* @param data 數(shù)據(jù)集
* @param sparkSession spark 對象
*/
def saveToMongo(collectionName: String, data: DataFrame, sparkSession: SparkSession): Unit = {
// 已經(jīng)引入配置對象室囊,可以直接使用配置對象的屬性
// Credentials 認證 需要用戶名和密碼
val saveConfig = MongodbConfigBuilder(
Map(Host -> List(mongo_host),
Database -> mongo_database,
Collection -> collectionName,
SamplingRatio -> 1.0,
WriteConcern -> "normal",
SplitSize -> 8,
SplitKey -> "_id",
Credentials -> List(MongodbCredentials(mongo_user, mongo_authentication, mongo_password.toCharArray)))
)
data.saveToMongodb(saveConfig.build())
}
/**
* 使用 Spark 原生的方式 寫入mongo
*
* @param collectionName 集合名稱
* @param data 數(shù)據(jù)集
* @param mode 數(shù)據(jù)寫入模式 (覆蓋雕崩、追加等)
* @param sparkSession sparkSession 對象
*/
def writeToMongo(collectionName: String, data: DataFrame, mode: SaveMode, sparkSession: SparkSession): Unit = {
// Credentials 官網(wǎng)上要求 如果是String 類型 使用這種方式 配置用戶憑證 user,authDataBase,password
val options = Map("host" -> mongo_host,
Credentials -> (mongo_user + "," + mongo_authentication + "," + mongo_password),
"database" -> mongo_database,
"collection" -> collectionName)
data.write.format("com.stratio.datasource.mongodb").mode(mode)
.options(options)
.save()
}
}