One Direction for Real-Time Computing
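Real-time computing is becoming a trend, and essentially every offline computing job could be done with real-time computing instead. Beyond hard requirements such as performance, latency, and throughput, I think ease of use should be one of the directions real-time computing develops in. Today's real-time engines, such as Storm, Flink, and Spark Streaming, are all driven through APIs, which is not very convenient. A bigger focus going forward should be SQL on streaming. I don't know Storm well, although some companies have already built SQL layers on top of it. Below I only want to look at how far two popular open-source stream processing engines have gone in wrapping SQL.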
Flink
SQL on Streaming Tables
code examples
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql("SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
Limitations
Version 1.2 only supports SELECT, FROM, WHERE, and UNION on streams. Aggregations and joins are not supported, so it still feels some way from real-world use.
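Since the SQL support in 1.2 stops short of aggregation, something like a per-product running total would for now have to fall back to the DataStream API. The following is a minimal sketch of that fallback, not a recommended pattern. It assumes a few made-up sample records in place of the real source, and `RunningTotals`, `matchesRubber`, and the job name are hypothetical names chosen for illustration.

```scala
import org.apache.flink.streaming.api.scala._

object RunningTotals {
  // Hypothetical helper standing in for the SQL predicate: product LIKE '%Rubber%'
  def matchesRubber(product: String): Boolean = product.contains("Rubber")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Made-up sample records shaped like the "Orders" stream: (user, product, amount)
    val orders: DataStream[(Long, String, Int)] = env.fromElements(
      (1L, "Rubber duck", 2),
      (2L, "Steel pipe", 5),
      (1L, "Rubber duck", 3)
    )

    // Filter, key by product name, and keep a running sum of the
    // amount field (tuple position 2) for each key
    val totals = orders
      .filter(o => matchesRubber(o._2))
      .keyBy(_._2)
      .sum(2)

    totals.print()
    env.execute("running-totals-sketch")
  }
}
```

Compared with the one-line SQL query, the keying and aggregation have to be spelled out by hand, which is the usability gap this post is about.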
Spark 2.0, Structured Streaming
code examples
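import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
// in spark-shell `spark` already exists; in a standalone app create it first (the app name is a placeholder)
val spark = SparkSession.builder().appName("StructuredWordCount").getOrCreate()
// brings in the encoders needed by .as[String] and flatMap
import spark.implicits._
// treat each line of the text files in this directory as a streaming row
val input = spark.readStream.text("file:///home/hadoop/data/")
val words = input.as[String].flatMap(_.split(" "))
// running count per word
val wordCounts = words.groupBy("value").count()
// the query aggregates, so the full result table is printed on every trigger
val query = wordCounts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()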
Limitations
Only two output modes are implemented, and both come with restrictions:
Append mode (default)
Only the new rows added to the result table since the last trigger are written to the sink. This applies only to queries that do not have any aggregations (e.g. queries with only select, where, map, flatMap, filter, join, etc.).
Complete mode
The whole result table is written to the sink. This applies only to queries that have aggregations.
Update mode is not supported.
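The word-count example above aggregates, so it has to use complete mode. For contrast, here is a minimal sketch of a query that qualifies for append mode because it only filters rows. It reuses the same source directory as the example above. The object name, the `isError` predicate, the app name, and the `local[2]` master are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession

object AppendModeSketch {
  // Hypothetical row-level predicate; any filter without aggregation would do
  def isError(line: String): Boolean = line.contains("ERROR")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("append-mode-sketch")
      .master("local[2]") // assumption: run locally for experimentation
      .getOrCreate()
    import spark.implicits._

    // Each line of the text files in the directory becomes one streaming row
    val lines = spark.readStream.text("file:///home/hadoop/data/").as[String]

    // No aggregation here, so the query is eligible for append mode
    val errors = lines.filter(line => isError(line))

    // Only rows that arrived since the last trigger are written to the console
    val query = errors.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

Switching this query to `outputMode("complete")` should be rejected at start-up, since complete mode is reserved for queries with aggregations.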