Waterdrop supports multiple input sources and multiple output sinks; you just write the configuration following a fixed template.
mkdir -p /data/software
cd /data/software
wget https://github.com/InterestingLab/waterdrop/releases/download/v1.1.2/waterdrop-1.1.2.zip -O waterdrop-1.1.2.zip
unzip waterdrop-1.1.2.zip
ln -s waterdrop-1.1.2 waterdrop
# Modify the SPARK_HOME path
cd waterdrop
vim config/waterdrop-env.sh
# SPARK_HOME=${SPARK_HOME:-/opt/spark}
SPARK_HOME=/opt/cloudera/parcels/SPARK2/lib/spark2
# Test: Hive to ClickHouse
cp config/batch.conf.template config/batch.conf
vim config/batch.conf
# Contents of the configuration file
spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

input {
  hive {
    pre_sql = "select * from access.nginx_msg_detail"
    table_name = "access_log"
  }
}

filter {
  remove {
    source_field = ["minute", "hour"]
  }
}

output {
  clickhouse {
    host = "your.clickhouse.host:8123"
    database = "waterdrop"
    table = "access_log"
    fields = ["date", "datetime", "hostname", "uri", "http_code", "request_time", "data_size", "domain"]
    username = "username"
    password = "password"
  }
}
[This configuration is quoted from another article.]
Waterdrop execution flow:
# Step 1:
start-waterdrop.sh->exec ${SPARK_HOME}/bin/spark-submit --class io.github.interestinglab.waterdrop.Waterdrop
# Step 2:
Inside Waterdrop:
entrypoint(configFilePath) is called, which:
1. Loads the configuration file into a config object
2. Creates the corresponding input, output, and filter objects, depending on whether the engine is Batch or Streaming
3. Validates the relevant configuration
4. If this is Batch processing, calls batchProcessing(sparkSession, configBuilder, staticInputs, filters, outputs)
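The four entrypoint steps above can be sketched with a toy, dependency-free model. All names below (Engine, Config, the returned strings) are illustrative stand-ins for Waterdrop's real classes, not its actual API:

```scala
// Simplified model of Waterdrop's entrypoint dispatch (illustrative only).
sealed trait Engine
case object Batch extends Engine
case object Streaming extends Engine

case class Config(engine: Engine, inputs: List[String], filters: List[String], outputs: List[String])

def entrypoint(config: Config): String = {
  // Steps 1-3: load config, build plugin objects, validate the configuration.
  require(config.outputs.nonEmpty, "output must be configured")
  // Step 4: dispatch on the engine type.
  config.engine match {
    case Batch     => s"batchProcessing(${config.inputs.size} inputs)"
    case Streaming => s"streamingProcessing(${config.inputs.size} inputs)"
  }
}

println(entrypoint(Config(Batch, List("hive"), List("remove"), List("clickhouse"))))
// prints: batchProcessing(1 inputs)
```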
In batchProcessing:
1. Take the first dataset (ds);
2. Call showWaterdropAsciiLogo(), marking the point where Waterdrop has really started;
3. If the input list is non-empty, run the filters and then write the result to the outputs #(I actually have a question about this part)
Concretely:
private def batchProcessing(sparkSession: SparkSession,
                            configBuilder: ConfigBuilder,
                            staticInputs: List[BaseStaticInput],
                            filters: List[BaseFilter],
                            outputs: List[BaseOutput]): Unit = {
  val headDs = registerInputTempViewWithHead(staticInputs, sparkSession)
  if (staticInputs.nonEmpty) {
    var ds = headDs
    for (f <- filters) {
      ds = filterProcess(sparkSession, f, ds)
      registerFilterTempView(f, ds)
    }
    outputs.foreach(p => {
      outputProcess(sparkSession, p, ds) // Why is only the first ds filtered — what about the others?
    })
  } else {
    throw new ConfigRuntimeException("Input must be configured at least once.")
  }
}
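On the question in the comment: in Waterdrop 1.x, registerInputTempViewWithHead registers every input as a Spark temp view under its table_name and returns only the head dataset, which is the one pushed through the filter chain; the other inputs remain reachable from filters (e.g. a SQL-style filter) via their temp views. A minimal, dependency-free sketch of that pattern follows — Dataset, tempViews, and the function bodies are illustrative stand-ins, not Waterdrop's real implementation:

```scala
import scala.collection.mutable

// Toy "dataset": a name plus some rows (illustrative only).
case class Dataset(name: String, rows: List[String])

// Stand-in for Spark's temp-view catalog.
val tempViews = mutable.Map[String, Dataset]()

// Mirrors registerInputTempViewWithHead: register every input as a
// temp view, then return the first one to drive the pipeline.
def registerInputTempViewWithHead(inputs: List[Dataset]): Dataset = {
  inputs.foreach(ds => tempViews(ds.name) = ds)
  inputs.head
}

// A filter transforms the dataset flowing down the chain; a SQL-style
// filter could instead read any of the registered temp views.
def filterProcess(ds: Dataset): Dataset =
  ds.copy(rows = ds.rows.filterNot(_.isEmpty))

val inputs = List(Dataset("access_log", List("row1", "", "row2")),
                  Dataset("other_table", List("x")))
var ds = registerInputTempViewWithHead(inputs)
ds = filterProcess(ds)

println(ds.rows)               // List(row1, row2)
println(tempViews.keySet)      // both inputs stay visible to SQL filters
```

So only the head dataset is filtered directly, but the remaining inputs are not lost: they can be joined or queried by name inside a filter's SQL.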