1.flink異步io的定義參考
2.應(yīng)用場景之流表join維表。
流表是kafka等流式數(shù)據(jù)。
維表可以是一個(gè)mysql或者cassandra,redis等存儲(chǔ)微酬,甚至是自己定義的一些api蔫缸。
根據(jù)流表join維表的字段去異步查詢維表渔扎。
3.舉個(gè)例子
流表:kafka id1,id2,id3三列
維表:mysql id,age,name
sql:select id1,id2,id3,age,name from kafka join mysql on id1=id;
join的結(jié)果就是: id1,id2,id3,age,name 流表的字段加上mysql維表的字段盹舞。
流表這邊提供id1像樊,給到維表眷唉,維表那邊執(zhí)行的sql是select * from mysql where id=id1
4.實(shí)戰(zhàn)
參考袋鼠云開源的flinkStreamSQL:
https://github.com/DTStack/flinkStreamSQL
基于開源的flink予颤,對(duì)其實(shí)時(shí)sql進(jìn)行擴(kuò)展;主要實(shí)現(xiàn)了流與維表的join冬阳,支持原生flink SQL所有的語法
源表:kafka 0.9蛤虐,1.x版本
維表:mysql,SQlServer,oracle,hbase肝陪,mongo驳庭,redis,cassandra
結(jié)果表:mysql,SQlServer,oracle,hbase氯窍,elasticsearch5.x饲常,mongo,redis,cassandra
核心是
AsyncReqRow asyncDbReq = loadAsyncReq(sideType, sqlRootDir, rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
//TODO How much should be set for the degree of parallelism? Timeout? capacity settings?
if (ORDERED.equals(sideTableInfo.getCacheMode())){
return AsyncDataStream.orderedWait(inputStream, asyncDbReq, sideTableInfo.getAsyncTimeout(), TimeUnit.MILLISECONDS, sideTableInfo.getAsyncCapacity())
.setParallelism(sideTableInfo.getParallelism());
}else {
return AsyncDataStream.unorderedWait(inputStream, asyncDbReq, sideTableInfo.getAsyncTimeout(), TimeUnit.MILLISECONDS, sideTableInfo.getAsyncCapacity())
.setParallelism(sideTableInfo.getParallelism());
}
inputStream 就是我們的流表
loadAsyncReq 就是返回一個(gè)RichAsyncFunction狼讨,定義是
public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row>
就是從維表中查詢數(shù)據(jù)贝淤,目前袋鼠云支持的幾種維表有
5.擴(kuò)展
流表來源只有kafka,太少熊楼,我們可以擴(kuò)展一下讀取mysql作為流霹娄。參考這里http://www.reibang.com/p/5faa7f822d89