Introduction
Flink can ingest data from a wide variety of sources and build a DataStream from it for transformation and processing. The input is generally called the data source, and the operator that reads it is the source operator; the source is therefore the input end of the whole program.
The standard way to add a source in Flink code is to call the execution environment's addSource() method:
DataStream<String> stream = env.addSource(...);
The method takes a "source function", an object implementing the SourceFunction interface, and returns a DataStreamSource. DataStreamSource extends SingleOutputStreamOperator, which in turn extends DataStream, so reading data is itself an operator whose result is a data stream (DataStream).
Flink ships with many ready-made implementations, and many external connector libraries provide source functions as well; in most cases these cover real-world needs.
Sources already implemented by Flink: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/overview/
Preparation
To make the examples concrete, let us first build a practical scenario. Page visits on a website can be modeled as a triple (user name, URL visited, timestamp of the visit), so we create a class Event that wraps a user action into an object. Event contains the following fields.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Event {
private String user;
private String url;
private Long timestamp;
}
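The @Data, @AllArgsConstructor and @NoArgsConstructor annotations come from Lombok and generate the constructors, getters/setters and toString() at compile time. If Lombok is not available, a hand-written equivalent looks like the sketch below. Note that Flink's POJO serializer requires a public class with a no-argument constructor and fields that are either public or reachable via getters/setters:

```java
//plain-Java equivalent of the Lombok-annotated Event (in a real project this
//would be a public class in its own Event.java file)
class Event {
    private String user;
    private String url;
    private Long timestamp;

    //Flink's POJO serializer requires a no-argument constructor
    public Event() {}

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    public String getUser() { return user; }
    public void setUser(String user) { this.user = user; }
    public String getUrl() { return url; }
    public void setUrl(String url) { this.url = url; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "Event{user='" + user + "', url='" + url + "', timestamp=" + timestamp + "}";
    }
}
```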
Import the relevant Maven dependencies:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
</dependencies>
1. Reading Data from a Collection
public class SourceTest {
public static void main(String[] args) throws Exception {
readCollection();
}
/**
 * Read data from a collection
 * @throws Exception
 */
private static void readCollection() throws Exception {
//create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
List<Event> list = new ArrayList<>();
list.add(new Event("Mary","./home",1000L));
list.add(new Event("Bob","./cart",2000L));
//read data from the collection
DataStreamSource<Event> dataStream = env.fromCollection(list);
dataStream.print();
env.execute();
}
}
二浪听、從元素中讀取數(shù)據(jù)
public class SourceTest {
public static void main(String[] args) throws Exception {
readElement();
}
/**
 * Read data from elements
 * @throws Exception
 */
private static void readElement() throws Exception {
//create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//read data from the given elements
DataStreamSource<Event> dataStream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
dataStream.print();
env.execute();
}
}
三螟碎、從文件中讀取數(shù)據(jù)
In real applications, data is of course not hard-coded. Usually we read it from a storage medium, and reading log files is a common choice; it is also the most common way to read input in batch processing.
Notes:
- The argument can be a directory or a single file.
- The path can be relative or absolute.
- A relative path is resolved against the system property user.dir: under IDEA this is the project root; in standalone mode it is the root directory on the cluster node.
- You can also read from HDFS using an hdfs://... path; since Flink does not ship the Hadoop dependencies, they must be added to the POM (see section 4).
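To illustrate the relative-path rule, here is a small plain-Java sketch, independent of Flink, showing how a relative path resolves against the user.dir system property (the file name is just an example):

```java
import java.nio.file.Path;
import java.nio.file.Paths;

class PathDemo {
    //a relative path such as "src/main/resources/clicks.txt" is resolved
    //against the JVM's working directory (system property user.dir)
    static Path resolveAgainstUserDir(String relative) {
        return Paths.get(System.getProperty("user.dir")).resolve(relative).normalize();
    }

    public static void main(String[] args) {
        System.out.println(resolveAgainstUserDir("src/main/resources/clicks.txt"));
    }
}
```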
public class SourceTest {
public static void main(String[] args) throws Exception {
readFile();
}
/**
 * Read data from a file
 * @throws Exception
 */
private static void readFile() throws Exception {
//create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//read data from a file
DataStreamSource<String> dataStream = env.readTextFile("src/main/resources/clicks.txt");
dataStream.print();
env.execute();
}
}
四椿息、從hdfs中讀取數(shù)據(jù)
Add the dependency:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.3</version>
</dependency>
public class SourceTest {
public static void main(String[] args) throws Exception {
readHdfs();
}
/**
 * Read data from HDFS
 * @throws Exception
 */
private static void readHdfs() throws Exception {
//create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//read a file from an HDFS path
DataStreamSource<String> hdfsSource = env.readTextFile("hdfs://192.168.111.188:8020/input/README.txt");
//print the file contents
hdfsSource.print();
env.execute();
}
}
五歹袁、從socket中讀取數(shù)據(jù)
public class SourceTest {
public static void main(String[] args) throws Exception {
readSocket();
}
/**
 * Read data from a socket
 * @throws Exception
 */
private static void readSocket() throws Exception {
//create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//read a text stream by listening on a port of the Linux host; on that host, send text with: nc -lk 7777
DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.111.188",7777);
dataStreamSource.print();
env.execute();
}
}
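Conceptually, socketTextStream connects to the given host and port and emits one record per text line until the connection closes. The plain-Java sketch below (not Flink's actual implementation) illustrates the line-splitting part; the host and port in the comment are the example values used above:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

class SocketLineDemo {
    //read lines from a reader until the stream ends, collecting one record per
    //line, which is essentially how a socket text source splits its input
    static List<String> readLines(BufferedReader reader) throws IOException {
        List<String> lines = new ArrayList<>();
        String line;
        while ((line = reader.readLine()) != null) {
            lines.add(line);
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        //against a real server started with `nc -lk 7777`, the reader would be
        //wired to a socket (host/port are the example values from above):
        //try (Socket s = new Socket("192.168.111.188", 7777)) {
        //    readLines(new BufferedReader(new InputStreamReader(s.getInputStream())));
        //}
        //here we feed an in-memory stream instead:
        System.out.println(readLines(new BufferedReader(new StringReader("hello\nflink")))); // [hello, flink]
    }
}
```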
6. Reading Data from Kafka
Kafka, a distributed message queue, is a high-throughput, easily scalable messaging system, and the way a message queue transports data matches stream processing exactly. Kafka and Flink are therefore a natural pair, the twin stars of stream processing today. In modern real-time applications, Kafka collects and transports the data while Flink analyzes it, and this architecture has become the first choice of many companies.
Flink officially provides the connector flink-connector-kafka, which implements a consumer, FlinkKafkaConsumer: a SourceFunction for reading Kafka data. (Note that since Flink 1.14 this class is deprecated in favor of the newer KafkaSource API; the example below still uses FlinkKafkaConsumer, which works in 1.15.)
So to use Kafka as a data source we only need to add the Kafka connector dependency. Flink provides a universal Kafka connector that tracks the latest Kafka client version; it currently supports Kafka 0.10.0 and above, so pick the connector version matching the Kafka version you have installed. The dependencies to import are as follows.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.12</artifactId>
<version>1.15.0</version>
</dependency>
public class SourceTest {
public static void main(String[] args) throws Exception {
readKafka();
}
/**
 * Read data from Kafka
 * @throws Exception
 */
private static void readKafka() throws Exception {
//create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.111.188:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
DataStreamSource<String> kafkaDataStream = env.addSource(new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties));
kafkaDataStream.print();
env.execute();
}
}
Creating a FlinkKafkaConsumer requires three arguments:
- The first argument, topic, defines which topics to read. It can be a single topic, a list of topics, or a regular expression matching all topics to read. When reading from multiple topics, the Kafka connector handles the partitions of all of them and merges their data into a single stream.
- The second argument is a DeserializationSchema or KeyedDeserializationSchema. Kafka messages are stored as raw bytes, so they must be deserialized into Java or Scala objects. The SimpleStringSchema used above is a built-in DeserializationSchema that simply deserializes the byte array into a string. DeserializationSchema and KeyedDeserializationSchema are public interfaces, so we can also implement custom deserialization logic.
- The third argument is a Properties object that configures the Kafka client.
七跨琳、從Pulsar中讀取數(shù)據(jù)
As data volumes keep growing, processing data as event streams becomes essential. Apache Flink unifies batch and stream processing in a single compute engine with a consistent programming interface. Apache Pulsar (together with Apache BookKeeper) unifies data as a "stream": data is stored once and accessed both as a stream (via the pub-sub interface) and as segments (for batch processing). Pulsar thus solves the data-silo problem companies face when combining different storage and messaging technologies.
Flink can read from and write to Pulsar brokers as a real-time stream, and it can also batch-read Pulsar's underlying offline storage in BookKeeper. This shared batch-and-stream nature makes Pulsar and Flink natural partners: combined, the two open-source technologies can form a unified data architecture and provide an excellent foundation for real-time, data-driven businesses.
To integrate Pulsar with Flink and give users more powerful development capabilities, StreamNative developed and open-sourced the Pulsar Flink Connector. After several rounds of polishing, it has been merged into the Flink code base and ships with Flink 1.14.0 and later.
The Pulsar Flink Connector provides elastic data processing on top of Apache Pulsar and Apache Flink, allowing Flink to read and write data in Pulsar. With it, teams can focus on business logic instead of storage concerns.
Add the dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-pulsar</artifactId>
<version>1.15.0</version>
</dependency>
public class SourceTest {
public static void main(String[] args) throws Exception {
readPulsar();
}
/**
 * Read data from Pulsar
 * @throws Exception
 */
private static void readPulsar() throws Exception {
//create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//add the Pulsar source used to read the data
String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
String adminUrl = "http://192.168.23.111:8080,192.168.23.112:8080,192.168.23.113:8080";
String topic = "persistent://my-tenant/my-ns/my-partitioned-topic";
PulsarSource<String> pulsarSource = PulsarSource.builder()
.setServiceUrl(serviceUrl)
.setAdminUrl(adminUrl)
.setStartCursor(StartCursor.earliest())
.setTopics(topic)
.setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
.setSubscriptionName("my-subscription")
.setSubscriptionType(SubscriptionType.Exclusive)
.build();
DataStreamSource<String> streamSource = env.fromSource(pulsarSource, WatermarkStrategy.noWatermarks(),"Pulsar Source");
streamSource.print();
env.execute();
}
}
8. Custom Sources
In most cases the data sources above are enough, but there are always exceptions: what if the data we want to read comes from an external system for which Flink has neither a built-in implementation nor a connector?
Then we implement SourceFunction ourselves.
Flink provides several source interfaces; implementing one of them gives you a custom data source. They differ as follows:
- SourceFunction: non-parallel source (parallelism must be 1)
- ParallelSourceFunction: parallel source (parallelism can be >= 1)
- RichSourceFunction: rich, non-parallel source (parallelism must be 1)
- RichParallelSourceFunction: rich, parallel source (parallelism can be >= 1)
Next we create a custom data source implementing the SourceFunction interface. Two key methods must be overridden: run() and cancel().
- run(): uses the runtime context object (SourceContext) to emit data downstream;
- cancel(): stops the source by flipping a flag that breaks the emit loop.
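The run()/cancel() contract can be sketched without any Flink dependency. Note that the flag should be declared volatile, because cancel() is invoked from a different thread than run() and the write must be visible to the loop:

```java
class CancelFlagDemo {
    //same cancellation pattern as a SourceFunction: run() loops while the flag
    //is set; cancel() flips the flag, possibly from another thread, so the
    //flag is volatile to guarantee cross-thread visibility
    volatile boolean running = true;
    int emitted = 0;

    void run(int maxRecords) { //maxRecords keeps the demo finite
        while (running && emitted < maxRecords) {
            emitted++; //stand-in for sourceContext.collect(...)
        }
    }

    void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        CancelFlagDemo source = new CancelFlagDemo();
        source.cancel(); //cancel before run: the loop exits immediately
        source.run(1000);
        System.out.println(source.emitted); // 0
    }
}
```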
8.1 SourceFunction: a non-parallel source
Implementation
public class ClickSource implements SourceFunction<Event> {
//flag controlling the loop; volatile so that cancel(), called from another thread, is visible
private volatile boolean running = true;
@Override
public void run(SourceContext<Event> sourceContext) throws Exception {
//generate data randomly
Random random = new Random();
//data sets the fields are drawn from
String[] users = {"Mary", "Alice", "Bobo", "lucy"};
String[] urls = {"./home", "./cart", "./prod", "./order"};
//generate data in a loop
while (running){
String user = users[random.nextInt(users.length)];
String url = urls[random.nextInt(urls.length)];
long timestamp = System.currentTimeMillis();
sourceContext.collect(new Event(user,url,timestamp));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
}
Usage
public class SourceCustomTest {
public static void main(String[] args) throws Exception {
//create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> customDataStream = env.addSource(new ClickSource());
customDataStream.print();
env.execute();
}
}
Note: a data source defined via the SourceFunction interface can only run with parallelism 1; setting a parallelism greater than 1 on it throws an exception.
8.2 ParallelSourceFunction: a parallel source
Implementation
public class ParallelClickSource implements ParallelSourceFunction<Event> {
//flag controlling the loop; volatile so that cancel(), called from another thread, is visible
private volatile boolean running = true;
@Override
public void run(SourceContext<Event> sourceContext) throws Exception {
//generate data randomly
Random random = new Random();
//data sets the fields are drawn from
String[] users = {"Mary", "Alice", "Bobo", "lucy"};
String[] urls = {"./home", "./cart", "./prod", "./order"};
//generate data in a loop
while (running){
String user = users[random.nextInt(users.length)];
String url = urls[random.nextInt(urls.length)];
long timestamp = System.currentTimeMillis();
sourceContext.collect(new Event(user,url,timestamp));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
}
Usage
public class SourceCustomTest {
public static void main(String[] args) throws Exception {
//create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> parallelDataStream = env.addSource(new ParallelClickSource()).setParallelism(2);
parallelDataStream.print();
env.execute();
}
}
RichSourceFunction and RichParallelSourceFunction additionally provide open() and close() methods for managing external connections, as well as runtime access such as getRuntimeContext().
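The lifecycle can be sketched without Flink: open() is called once before run(), and close() is called on shutdown even if run() fails. Below is a minimal stand-in (a hypothetical class, not Flink's actual scheduling code) that records the call order:

```java
import java.util.ArrayList;
import java.util.List;

class RichLifecycleDemo {
    //records the order in which the framework would invoke the rich
    //function's lifecycle methods
    final List<String> calls = new ArrayList<>();

    void open()  { calls.add("open"); }   //e.g. create a DB connection
    void run()   { calls.add("run"); }    //emit records
    void close() { calls.add("close"); }  //release the connection

    void execute() {
        open();
        try {
            run();
        } finally {
            close(); //close() runs even if run() throws
        }
    }

    public static void main(String[] args) {
        RichLifecycleDemo demo = new RichLifecycleDemo();
        demo.execute();
        System.out.println(demo.calls); // [open, run, close]
    }
}
```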
8.3 RichSourceFunction: a rich, non-parallel source
Implementation
public class ClickRichSource extends RichSourceFunction<Event> {
//flag controlling the loop; volatile so that cancel(), called from another thread, is visible
private volatile boolean running = true;
@Override
public void open(Configuration parameters) throws Exception {
RuntimeContext runtimeContext = getRuntimeContext();
String taskName = runtimeContext.getTaskName();
int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
System.out.println(taskName + "-" + indexOfThisSubtask);
}
@Override
public void run(SourceContext<Event> sourceContext) throws Exception {
//generate data randomly
Random random = new Random();
//data sets the fields are drawn from
String[] users = {"Mary", "Alice", "Bobo", "lucy"};
String[] urls = {"./home", "./cart", "./prod", "./order"};
//generate data in a loop
while (running){
String user = users[random.nextInt(users.length)];
String url = urls[random.nextInt(urls.length)];
long timestamp = System.currentTimeMillis();
Event event = new Event(user, url, timestamp);
System.out.println(event.toString());
sourceContext.collect(event);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
@Override
public void close() throws Exception {
System.out.println("closing resources...");
}
}
Usage
public class SourceCustomTest {
public static void main(String[] args) throws Exception {
//create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> richDataStream = env.addSource(new ClickRichSource());
richDataStream.print();
env.execute();
}
}
8.4 RichParallelSourceFunction: a rich, parallel source
Implementation
public class RichParallelClickSource extends RichParallelSourceFunction<Event> {
//flag controlling the loop; volatile so that cancel(), called from another thread, is visible
private volatile boolean running = true;
@Override
public void open(Configuration parameters) throws Exception {
RuntimeContext runtimeContext = getRuntimeContext();
String taskName = runtimeContext.getTaskName();
int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
System.out.println(taskName + "-" + indexOfThisSubtask);
}
@Override
public void run(SourceContext<Event> sourceContext) throws Exception {
//generate data randomly
Random random = new Random();
//data sets the fields are drawn from
String[] users = {"Mary", "Alice", "Bobo", "lucy"};
String[] urls = {"./home", "./cart", "./prod", "./order"};
//generate data in a loop
while (running){
String user = users[random.nextInt(users.length)];
String url = urls[random.nextInt(urls.length)];
long timestamp = System.currentTimeMillis();
sourceContext.collect(new Event(user,url,timestamp));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
@Override
public void close() throws Exception {
System.out.println("closing resources...");
}
}
Usage
public class SourceCustomTest {
public static void main(String[] args) throws Exception {
//create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> richParallelDataStream = env.addSource(new RichParallelClickSource()).setParallelism(2);
richParallelDataStream.print();
env.execute();
}
}
8.5 Loading Data from MySQL in Real Time
Implementation
public class MySQLSource extends RichParallelSourceFunction<Student> {
private volatile boolean flag = true; //volatile so cancel() from another thread is visible
private Connection conn;
private PreparedStatement statement;
private ResultSet resultSet;
@Override
public void open(Configuration parameters) throws Exception {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "123456");
String sql = "select id, name, age from t_student";
statement = conn.prepareStatement(sql);
super.open(parameters);
}
@Override
public void run(SourceContext<Student> ctx) throws Exception {
while (flag) {
resultSet = statement.executeQuery();
while (resultSet.next()){
String id = resultSet.getString("id");
String name = resultSet.getString("name");
Integer age = resultSet.getInt("age");
ctx.collect(new Student(id, name, age));
}
Thread.sleep(3000);
}
}
@Override
public void cancel() {
flag = false;
}
@Override
public void close() throws Exception {
//release resources in reverse order of creation
if (resultSet != null) resultSet.close();
if (statement != null) statement.close();
if (conn != null) conn.close();
}
}
Usage
public class SourceCustomTest {
public static void main(String[] args) throws Exception {
//create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
DataStreamSource<Student> parallelDataStream = env.addSource(new MySQLSource()).setParallelism(4);
parallelDataStream.print();
env.execute();
}
}