Flink-SQL如何連接外部資源
最近項目中需要把FlinkSQL對標SparkSQL做一套可視化頁面,但網上針對Flink的相關博客很少,官網的例子給的也不太全。目前大多數人給的結論就是現在Flink Table/SQL的功能還不穩定,都在等待阿里的Blink和Flink合併後再使用。這篇我想用現在最新發行版1.7.2給大家一點參考demo。
目前官網支持的Connectors
**注意:目前1.7.2版本的CSV只支持批處理不支持流處理,如果在流處理的環境使用會報下面的錯誤。
Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.
Reason: No context matches.
**
編程范式
tableEnvironment
.connect(...) // 需要傳入繼承ConnectorDescriptor的實現類,例如 Kafka、FileSystem
.withFormat(...) // 需要傳入繼承FormatDescriptor的實現類,例如 Json、Avro、Csv
.withSchema(...) // 需要傳入new Schema(),這裡的Schema是Flink註冊表的字段和類型
.inAppendMode()
/** 支持三種模式
inAppendMode(只支持動態表的insert過來的數據)
inRetractMode(支持動態刷新數據表,包括update和delete,但性能會受影響)
inUpsertMode(因為操作的是單條數據,所以性能高於inRetractMode)
**/
.registerTableSource("MyTable") // 注冊的表名稱
Example1 Kafka連接器 JSON -> CSV
package mqz.connector;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;
/**
 * Demo job that registers a Kafka topic carrying JSON records as the table
 * {@code sourceTable}, prints every row of {@code select * from sourceTable}
 * and also writes the same rows to a CSV file through a {@link CsvTableSink}.
 *
 * @author maqingze
 * @version v1.0
 * @date 2019/3/7 11:24
 */
public class KafkaConnectorFormatJSON2CSV {

    private static final String SOURCE_TOPIC = "source";
    // SINK_TOPIC and GROUP_ID are declared but not referenced anywhere in this class.
    private static final String SINK_TOPIC = "sink";
    private static final String ZOOKEEPER_CONNECT = "hadoop003:2181,hadoop004:2181";
    private static final String GROUP_ID = "group1";
    private static final String METADATA_BROKER_LIST = "hadoop003:9092,hadoop004:9092";

    /**
     * Builds the pipeline Kafka (JSON) -> {@code sourceTable} -> stdout + CSV file
     * and submits it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Connector: Kafka 0.10 topic, consumed from the earliest offset.
        Kafka kafkaSource = new Kafka()
                .version("0.10")
                .topic(SOURCE_TOPIC)
                .startFromEarliest()
                .property("zookeeper.connect", ZOOKEEPER_CONNECT)
                .property("bootstrap.servers", METADATA_BROKER_LIST);

        // Table schema: the columns exposed by the registered table.
        Schema tableSchema = new Schema()
                .field("id", Types.LONG)
                .field("product", Types.STRING)
                .field("amount", Types.INT);

        tEnv.connect(kafkaSource)
                .withFormat(jsonFormat())
                .withSchema(tableSchema)
                .inAppendMode()
                .registerTableSource("sourceTable");

        Table result = tEnv.sqlQuery("select * from sourceTable ");

        // Print every row in addition to writing it to the CSV sink below.
        DataStream<Row> rows = tEnv.toAppendStream(result, Row.class);
        rows.print();

        CsvTableSink csvSink = new CsvTableSink(
                "D:\\Aupload\\flink\\sink.csv",     // output path
                "|",                                // field delimiter
                1,                                  // number of files to write
                FileSystem.WriteMode.OVERWRITE);    // overwrite an existing file (NO_OVERWRITE is the alternative)

        String[] sinkFieldNames = new String[]{"f0", "f1", "f2"};
        TypeInformation[] sinkFieldTypes = new TypeInformation[]{Types.LONG, Types.STRING, Types.INT};
        tEnv.registerTableSink("csvOutputTable", sinkFieldNames, sinkFieldTypes, csvSink);

        result.insertInto("csvOutputTable");

        env.execute(" tesst kafka connector demo");
    }

    /**
     * Describes the JSON layout of the incoming records: {@code id} (long),
     * {@code product} (string) and {@code amount} (int). {@code failOnMissingField}
     * is set to {@code true}.
     */
    private static Json jsonFormat() {
        return new Json()
                .schema(
                        org.apache.flink.table.api.Types.ROW(
                                new String[]{"id", "product", "amount"},
                                new TypeInformation[]{
                                        org.apache.flink.table.api.Types.LONG(),
                                        org.apache.flink.table.api.Types.STRING(),
                                        org.apache.flink.table.api.Types.INT()
                                }))
                .failOnMissingField(true); // optional setting; the original comment states it is false by default
    }
}
Example2 Kafka連接器 JSON -> JSON
package mqz.connector;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;
/**
 * Demo job that registers a Kafka topic carrying JSON records as the table
 * {@code sourceTable}, registers a second Kafka topic as the JSON table sink
 * {@code sinkTable}, and inserts {@code id}, {@code product} and the constant
 * {@code 33} from the source into the sink.
 *
 * @author maqingze
 * @version v1.0
 * @date 2019/3/7 11:24
 */
public class KafkaConnectorFormatJSON2JSON {

    private static final String SOURCE_TOPIC = "source";
    private static final String SINK_TOPIC = "sink";
    private static final String ZOOKEEPER_CONNECT = "hadoop003:2181,hadoop004:2181";
    private static final String GROUP_ID = "group1";
    private static final String METADATA_BROKER_LIST = "hadoop003:9092,hadoop004:9092";

    /**
     * Builds the pipeline Kafka (JSON) -> {@code sourceTable} -> {@code sinkTable}
     * -> Kafka (JSON) and submits it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // ---- source: Kafka 0.10 topic, consumed from the earliest offset ----
        Kafka kafkaSource = new Kafka()
                .version("0.10")
                .topic(SOURCE_TOPIC)
                .startFromEarliest()
                .property("zookeeper.connect", ZOOKEEPER_CONNECT)
                .property("bootstrap.servers", METADATA_BROKER_LIST);

        tEnv.connect(kafkaSource)
                .withFormat(jsonFormat("id", "product", "amount"))
                .withSchema(tableSchema())
                .inAppendMode()
                .registerTableSource("sourceTable");

        // ---- sink: Kafka 0.10 topic the table writes to ----
        // NOTE(review): group.id and startFromEarliest() look like consumer-side
        // settings; presumably they have no effect on a sink -- confirm.
        Kafka kafkaSink = new Kafka()
                .version("0.10")            // per the original comment, valid versions are "0.8", "0.9", "0.10", "0.11", and "universal"
                .topic(SINK_TOPIC)          // topic backing the registered sink table
                .property("zookeeper.connect", ZOOKEEPER_CONNECT)
                .property("bootstrap.servers", METADATA_BROKER_LIST)
                .property("group.id", GROUP_ID)
                .startFromEarliest()
                .sinkPartitionerFixed();    // per the original comment: each Flink partition ends up in at most one Kafka partition

        // NOTE(review): the JSON format names its fields yid/yproduct/yamount while
        // the table schema uses id/product/amount -- confirm the difference is intentional.
        tEnv.connect(kafkaSink)
                .withFormat(jsonFormat("yid", "yproduct", "yamount"))
                .withSchema(tableSchema())
                .inAppendMode()
                .registerTableSink("sinkTable");

        tEnv.sqlUpdate("insert into sinkTable(id,product,amount) select id,product,33 from sourceTable ");

        env.execute(" tesst kafka connector demo");
    }

    /**
     * Builds a JSON format descriptor for a three-field row typed
     * (long, string, int) with {@code failOnMissingField} set to {@code true}.
     *
     * @param idField      JSON field name of the long column
     * @param productField JSON field name of the string column
     * @param amountField  JSON field name of the int column
     * @return the configured JSON format descriptor
     */
    private static Json jsonFormat(String idField, String productField, String amountField) {
        return new Json()
                .schema(
                        org.apache.flink.table.api.Types.ROW(
                                new String[]{idField, productField, amountField},
                                new TypeInformation[]{
                                        org.apache.flink.table.api.Types.LONG(),
                                        org.apache.flink.table.api.Types.STRING(),
                                        org.apache.flink.table.api.Types.INT()
                                }))
                .failOnMissingField(true); // optional setting; the original comment states it is false by default
    }

    /**
     * Builds the table schema shared by the source and the sink:
     * {@code id} (long), {@code product} (string), {@code amount} (int).
     */
    private static Schema tableSchema() {
        return new Schema()
                .field("id", Types.LONG)
                .field("product", Types.STRING)
                .field("amount", Types.INT);
    }
}