軟件下載略過孽查,直接進入安裝
部署
- 到config目錄下配置
application.yml 選擇元數(shù)據(jù)存儲類型 往枣,我選pgsql
編輯application-pgsql.yml,配置pg連接信息
2.執(zhí)行pgsql腳本到數(shù)據(jù)庫
3.復制flink/lib下jar到 dinky/extends里
測試
1.新建flinksql任務(wù)
CREATE TABLE Orders (
order_number BIGINT,
price DECIMAL(32,2),
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'number-of-rows' = '50'
);
select * from Orders;
local模式預覽(因為select,只能預覽)
上面這個未明確具體原因喧兄, 把元數(shù)據(jù)庫換到mysql 后就可以正常運行了。
CDC配置
將dinky lib目錄下的 dinky-client-base-1.0.0-rc4.jar啊楚、dinky-common-1.0.0-rc4.jar以及 dinky/extends/flink版本/dinky/dinky-client-1.15-1.0.0-rc4.jar 放到了flink的lib下吠冤,
mysql的cdc包 和jdbc包在dinky的extend和 flink/lib下都要有。
重啟flink特幔, 重啟dinky
《注意》:提交standalone模式下咨演, 任務(wù)在flink里, print也在對應(yīng)的任務(wù)節(jié)點的stdout里顯示蚯斯。
《注意》:dinky的jar和flink lib下的jar 不能用軟鏈接薄风,浪費了一上午時間
kafka配置
dinky寫到kafka
報錯:Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
看這篇文章里https://blog.csdn.net/m0_37759590/article/details/127791947 ,
需要在kafka的 server.property里設(shè)置一個屬性值transaction.max.timeout.ms=7200000拍嵌,
在flink sql中設(shè)置對應(yīng)屬性的值