Business Background
The task is to sync several Hive tables into ClickHouse to serve real-time queries; each table holds roughly 200 million rows. The sync tool has two requirements: first, the extraction must not take too long; second, we need to control which node instances of the ClickHouse cluster the data is written to. As a Java developer I preferred not to depend too heavily on the Hadoop toolchain, so after some searching I settled on SeaTunnel, which implements the extraction through simple configuration.
Overview
Apache SeaTunnel (Incubating) is a distributed, high-performance, easily extensible data integration platform for synchronizing and transforming massive volumes of data, both offline and in real time.
Official documentation: https://interestinglab.github.io/seatunnel-docs/#/
Installation
Installation is straightforward; simply follow the official documentation.
Configuration
config.conf — the configuration below extracts data from Hive and inserts it into ClickHouse. The data source is a single Hive table; a custom SeaTunnel (Waterdrop) filter plugin slices the rows by the id field so that each slice is inserted into a different shard of the ClickHouse cluster (the routing rule is sketched right after the config).
spark {
  spark.sql.catalogImplementation = "hive"
  spark.app.name = "hive2clickhouse"
  spark.executor.instances = 30
  spark.executor.cores = 1
  spark.executor.memory = "2g"
  spark.ui.port = 13000
}

input {
  hive {
    pre_sql = "select id,name,create_time from table"
    table_name = "table_tmp"
  }
}

filter {
  convert {
    source_field = "data_source"
    new_type = "UInt8"
  }
  org.interestinglab.waterdrop.filter.Slice {
    source_table_name = "table_tmp"
    source_field = "id"
    slice_num = 2
    slice_code = 0
    result_table_name = "table_8123"
  }
  org.interestinglab.waterdrop.filter.Slice {
    source_table_name = "table_tmp"
    source_field = "id"
    slice_num = 2
    slice_code = 1
    result_table_name = "table_8124"
  }
}

output {
  clickhouse {
    source_table_name = "table_8123"
    host = "ip1:8123"
    database = "db_name"
    username = "username"
    password = "pwd"
    table = "table1"
    fields = ["id","name","create_time"]
    clickhouse.socket_timeout = 50000
    retry_codes = [209, 210]
    retry = 3
    bulk_size = 500000
  }
  clickhouse {
    source_table_name = "table_8124"
    host = "ip2:8124"
    database = "db_name"
    username = "username"
    password = "pwd"
    table = "table1"
    fields = ["id","name","create_time"]
    clickhouse.socket_timeout = 50000
    retry_codes = [209, 210]
    retry = 3
    bulk_size = 500000
  }
}
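To make the slicing explicit: the two Slice blocks above split table_tmp by abs(hash(id) % slice_num), so rows that evaluate to 0 land in table_8123 and are written to ip1:8123, while rows that evaluate to 1 land in table_8124 and are written to ip2:8124. Slice itself is a custom plugin (developed in the next section); expressed directly in Spark, the routing rule is roughly the following sketch, where spark is assumed to be an existing SparkSession and table_tmp is the temp table registered by the hive input above:

import org.apache.spark.sql.functions.{abs, col, hash, lit}

// Sketch of the routing rule only, not the plugin itself.
val sliceNum = 2
val source = spark.table("table_tmp")

// abs(hash(id) % slice_num) picks the branch: 0 -> table_8123 (ip1:8123), 1 -> table_8124 (ip2:8124).
val table8123 = source.filter(abs(hash(col("id")) % lit(sliceNum)) === lit(0))
val table8124 = source.filter(abs(hash(col("id")) % lit(sliceNum)) === lit(1))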
Plugin Development
The Slice filter referenced in the filter block is a custom Waterdrop filter plugin: it hashes source_field and keeps only the rows where abs(hash % slice_num) equals slice_code, so each result table receives a disjoint share of the source rows.
package org.interestinglab.waterdrop.filter

import io.github.interestinglab.waterdrop.apis.BaseFilter
import io.github.interestinglab.waterdrop.config.{Config, ConfigFactory}
import org.apache.spark.sql.functions.{col, hash, lit, udf}
import org.apache.spark.sql.{Dataset, Row, SparkSession}

/**
 * Splits the input by hashing `source_field`: a row is kept when
 * abs(hash(source_field) % slice_num) == slice_code.
 */
class Slice extends BaseFilter {

  var conf: Config = ConfigFactory.empty()

  /**
   * Set Config.
   */
  override def setConfig(config: Config): Unit = {
    this.conf = config
  }

  /**
   * Get Config.
   */
  override def getConfig(): Config = {
    this.conf
  }

  override def checkConfig(): (Boolean, String) = {
    if (!conf.hasPath("source_field")) {
      (false, "please specify [source_field] as a non-empty string")
    } else if (!conf.hasPath("slice_code")) {
      (false, "please specify [slice_code] as an integer in [0, slice_num)")
    } else if (!conf.hasPath("slice_num")) {
      (false, "please specify [slice_num] as a positive integer")
    } else {
      (true, "")
    }
  }

  override def process(spark: SparkSession, df: Dataset[Row]): Dataset[Row] = {
    val srcField = conf.getString("source_field")
    val sliceCode = conf.getInt("slice_code")
    val sliceNum = conf.getInt("slice_num")
    // Keep only the rows that belong to this slice.
    df.filter(func(hash(col(srcField)), lit(sliceNum), lit(sliceCode)))
  }

  // hash() yields an Int; the row belongs to slice `target` when abs(hashValue % num) == target.
  val func = udf((hashValue: Int, num: Int, target: Int) => {
    (hashValue % num).abs == target
  })
}
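Before running against the full 200-million-row tables, the slicing logic can be sanity-checked locally. The following is a minimal sketch (not part of the plugin; the object name SliceCheck is purely illustrative, and it only needs spark-sql on the classpath): it applies the same abs(hash % slice_num) rule to a small generated DataFrame and asserts that the two slices together cover every row.

// Local sanity check for the slicing rule (sketch only).
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{abs, col, hash, lit}

object SliceCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("slice-check").getOrCreate()
    import spark.implicits._

    // A small stand-in for the Hive table.
    val df = (1L to 1000L).map(i => (i, s"name_$i")).toDF("id", "name")

    val sliceNum = 2
    val branch = abs(hash(col("id")) % lit(sliceNum))
    val slice0 = df.filter(branch === 0) // would go to table_8123 / ip1:8123
    val slice1 = df.filter(branch === 1) // would go to table_8124 / ip2:8124

    // Every row matches exactly one branch, so the two counts must add up to the input count.
    assert(slice0.count() + slice1.count() == df.count())
    spark.stop()
  }
}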
Launch
../bin/start-waterdrop.sh --master local[4] --deploy-mode client --config config.conf
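Once the job finishes, a quick row-count comparison against both ClickHouse nodes confirms that each shard received its slice and nothing was dropped. A minimal JDBC sketch (the object name CountCheck is illustrative, a ClickHouse JDBC driver must be on the classpath, and the URLs, database, table, and credentials are the placeholders from config.conf above):

// Row-count check on both ClickHouse nodes (sketch only).
import java.sql.DriverManager

object CountCheck {
  def main(args: Array[String]): Unit = {
    val nodes = Seq("jdbc:clickhouse://ip1:8123/db_name", "jdbc:clickhouse://ip2:8124/db_name")
    val total = nodes.map { url =>
      val conn = DriverManager.getConnection(url, "username", "pwd")
      try {
        val rs = conn.createStatement().executeQuery("SELECT count() FROM table1")
        rs.next()
        val rows = rs.getLong(1)
        println(s"$url -> $rows rows")
        rows
      } finally conn.close()
    }.sum
    println(s"total rows across both shards: $total")
  }
}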