Simulating a real-time data stream with Parquet data
import ohmysummer.conf.{KafkaConfiguration, UriConfiguration}
import ohmysummer.pipeline.schema.{EnterpriseSchema, NationSchema}
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions.{col, struct, to_json}
/**
 * Read the Parquet files, convert them to JSON, and write the result to HDFS;
 * then use a shell one-liner to feed the JSON lines to Kafka one at a time,
 * simulating a real-time stream.
 */
object WriteEnterprise2Kafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("local[2]")
      .appName("Write Enterprise Parquet to Kafka")
      .getOrCreate()

    val parquetSchema = (new EnterpriseSchema).schema
    val parquetUri = (new UriConfiguration).xs6enterprise
    val topics = (new KafkaConfiguration).topics
    val bootstrap_servers = (new KafkaConfiguration).bootstrap_servers

    import spark.implicits._

    // Read the Parquet files as an unbounded stream, keeping only rows whose
    // timestamp is non-null and within a sane range (roughly year 2000 to 2030).
    val ds: DataFrame = spark.readStream
      .schema(parquetSchema)
      .parquet(parquetUri)
      .filter($"timestamp".isNotNull && $"timestamp" > 956678797000L && $"timestamp" < 1924876800000L)

    // Use vin as the Kafka message key and serialize all columns into one JSON string.
    val df: DataFrame = ds
      .select($"vin" as "key", to_json(struct(ds.columns.map(col(_)): _*)) as "value")
      .filter($"key".isNotNull)

    // Write the key/value records to HDFS as JSON files.
    val query = df.writeStream
      .format("json")
      .option("path", "/tmp/json/nation")
      .option("checkpointLocation", "/tmp/write-json2hdfs")
      .start()

    query.awaitTermination()
  }
}
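The EnterpriseSchema, UriConfiguration, and KafkaConfiguration helpers imported at the top are not shown in this post. As a rough sketch of what they might expose (only the member names schema, xs6enterprise, topics, and bootstrap_servers come from the code above; all field values here are placeholder assumptions):

import org.apache.spark.sql.types._

// Hypothetical sketch only; the real classes live in ohmysummer.conf and
// ohmysummer.pipeline.schema and are not part of this post.
class EnterpriseSchema {
  // Assumed minimal schema; the real one lists the full set of enterprise columns.
  val schema: StructType = StructType(Seq(
    StructField("vin", StringType),
    StructField("timestamp", LongType)
  ))
}

class UriConfiguration {
  val xs6enterprise: String = "hdfs://nameservice1/path/to/enterprise" // placeholder
}

class KafkaConfiguration {
  val topics: String = "xs6-nation-test"                          // placeholder
  val bootstrap_servers: String = "dn03:9092,nn01:9092,nn02:9092" // placeholder
}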
Then replay the JSON data to Kafka line by line; the sed call injects a random partition field (1-5) into each record before handing it to kt produce:
hdfs dfs -cat hdfs://xxxxxx/json/test.json | while read -r LINE; do echo "$LINE" | sed "s/\"}$/\",\"partition\":$(( (RANDOM % 5) + 1 ))}/"; sleep 1; done | kt produce -topic xs6-nation-test -brokers "dn03,nn01,nn02"
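As an aside, Structured Streaming also ships a Kafka sink, so df (which already has string key and value columns) could be written to Kafka directly, skipping the HDFS-and-shell detour. A minimal sketch reusing topics and bootstrap_servers from the code above; it assumes the spark-sql-kafka-0-10 artifact is on the classpath, and the checkpoint path is a placeholder (this would also put the Trigger import to use):

// Sketch: write the key/value DataFrame straight to Kafka, one micro-batch per second.
val kafkaQuery = df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrap_servers)
  .option("topic", topics)
  .option("checkpointLocation", "/tmp/write-json2kafka") // placeholder path
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()

kafkaQuery.awaitTermination()

The shell pipeline above remains useful when you want to inspect or perturb each record (for example, the random partition field) before it reaches Kafka.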