Preface
A few days ago, batch-processing configuration was made more elegant: StreamingPro supports multiple-input, multiple-output configuration, and now streaming computation supports the same configuration style as well.
In addition, once another project stabilizes it will be released to work with StreamingPro. It makes reading and writing HBase convenient: you can add a mapping to an HBase table (similar to the ES approach), or skip the mapping and let the system create columns automatically (using family:column as the column name), or have all columns merged into a single field for you to process.
Configuration
First, configure the sources:
{
  "name": "stream.sources.kafka",
  "params": [
    {
      "path": "file:///tmp/sample.csv",
      "format": "com.databricks.spark.csv",
      "outputTable": "test",
      "header": "true"
    },
    {
      "topics": "test",
      "zk": "127.0.0.1",
      "groupId": "kk3",
      "outputTable": "abc"
    }
  ]
}
Here we configure one Kafka stream and one ordinary CSV file. At the moment StreamingPro allows only a single Kafka stream, but that stream can cover multiple topics, separated by commas. You can also configure any number of other non-streaming sources, for example reading from MySQL, Parquet, and CSV at the same time and mapping each of them to a table.
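For instance, a multi-topic variant of the Kafka entry above would only change the topics value; this is a sketch, and the topic names logs, clicks, and orders below are placeholders, not names from the original example:

{
  "topics": "logs,clicks,orders",
  "zk": "127.0.0.1",
  "groupId": "kk3",
  "outputTable": "abc"
}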
After that, you can write SQL to process the data.
{
  "name": "stream.sql",
  "params": [
    {
      "sql": "select abc.content,'abc' as dd from abc left join test on test.content = abc.content",
      "outputTableName": "finalOutputTable"
    }
  ]
}
Here I do a simple join.
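Since each SQL step names its result via outputTableName, steps should be able to chain, with a later query reading the table produced by an earlier one. A sketch, assuming stream.sql accepts multiple entries in params the way stream.sources.kafka does (the intermediate table name filteredTable is made up for illustration):

{
  "name": "stream.sql",
  "params": [
    {
      "sql": "select * from abc where content is not null",
      "outputTableName": "filteredTable"
    },
    {
      "sql": "select filteredTable.content,'abc' as dd from filteredTable left join test on test.content = filteredTable.content",
      "outputTableName": "finalOutputTable"
    }
  ]
}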
{
  "name": "stream.outputs",
  "params": [
    {
      "format": "jdbc",
      "path": "-",
      "driver": "com.mysql.jdbc.Driver",
      "url": "jdbc:mysql://127.0.0.1/~?characterEncoding=utf8",
      "inputTableName": "finalOutputTable",
      "user": "~",
      "password": "~",
      "dbtable": "aaa",
      "mode": "Append"
    }
  ]
}
The data is then appended to MySQL. You can in fact configure multiple outputs as well.
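A second output would simply be another entry in the params array. A sketch, assuming a file-based sink honors the same format/path/inputTableName/mode keys as the jdbc one (the parquet output path below is a placeholder I invented):

{
  "name": "stream.outputs",
  "params": [
    {
      "format": "jdbc",
      "path": "-",
      "driver": "com.mysql.jdbc.Driver",
      "url": "jdbc:mysql://127.0.0.1/~?characterEncoding=utf8",
      "inputTableName": "finalOutputTable",
      "user": "~",
      "password": "~",
      "dbtable": "aaa",
      "mode": "Append"
    },
    {
      "format": "parquet",
      "path": "file:///tmp/finalOutputTable",
      "inputTableName": "finalOutputTable",
      "mode": "Append"
    }
  ]
}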
Complete Configuration
{
  "example": {
    "desc": "test",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "stream.sources.kafka",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "topics": "test",
            "zk": "127.0.0.1",
            "groupId": "kk3",
            "outputTable": "abc"
          }
        ]
      },
      {
        "name": "stream.sql",
        "params": [
          {
            "sql": "select abc.content,'abc' as dd from abc left join test on test.content = abc.content",
            "outputTableName": "finalOutputTable"
          }
        ]
      },
      {
        "name": "stream.outputs",
        "params": [
          {
            "format": "jdbc",
            "path": "-",
            "driver": "com.mysql.jdbc.Driver",
            "url": "jdbc:mysql://127.0.0.1/~?characterEncoding=utf8",
            "inputTableName": "finalOutputTable",
            "user": "~",
            "password": "~",
            "dbtable": "aaa",
            "mode": "Append"
          }
        ]
      }
    ],
    "configParams": {}
  }
}
You can download the package from StreamingPro-0.4.11 and then start it with the following command:
SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-0.4.11-SNAPSHOT-online-1.6.1-jar-with-dependencies.jar \
-streaming.name test \
-streaming.platform spark \
-streaming.job.file.path file://$SHome/batch.json