同DataSource一樣唆缴,flink流處理和批處理也都內(nèi)置了很多DataSink锤躁,可以滿足部分應(yīng)用場景。但平時使用的應(yīng)用場景比較多狼讨,光是靠內(nèi)置的DataSink完全不滿足日常使用贝淤。flink也考慮到了這個問題,允許我們實現(xiàn)自定義的DataSink政供。
1 批處理
最簡單的DataSink就是print()播聪,平時我們在編寫flink程序時進(jìn)行簡單測試的時候通常都會使用print()在控制臺上打印處理后的結(jié)果數(shù)據(jù)。
真正業(yè)務(wù)應(yīng)用比較多的還是writeAsCsv(),writeAsText()布隔。還可以通過繼承write()中FileOutputFormat類來實現(xiàn)將結(jié)果數(shù)據(jù)輸出到其他格式文件中离陶。
targetDataSet.print()
targetDataSet.writeAsCsv("file:///xxx/Documents/batch_csv")
targetDataSet.writeAsText("file:///xxx/Documents/batch_txt")
targetDataSet.write(
outputFormat: FileOutputFormat[T],
filePath: String,
writeMode: FileSystem.WriteMode = null)
2 流處理
2.1 kafka connector
kafka connector是flink提供給我們的自定義連接器,可以直接實例化FlinkKafkaProducer對象來將記錄存放到kafka中衅檀。
object FlinkStreamDataSink {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val original: DataStream[String] = ...
// kafka sink properties參數(shù)
val producerConf = new Properties()
producerConf.setProperty("bootstrap.servers", "172.16.57.101:9092,172.16.57.102:9092,172.16.57.103:9092")
producerConf.setProperty("zookeeper.connect", "172.16.57.101:2181,172.16.57.102:2181,172.16.57.103:2181")
producerConf.setProperty("group.id", "leslie")
// sink結(jié)果數(shù)據(jù)到kafka中
original.addSink(new FlinkKafkaProducer010[String]("test_1", new SimpleStringSchema(), producerConf))
env.execute("flink_data_sink")
}
}
2.2 自定義DataSink
自定義DataSink和自定義DataSource一樣簡單枕磁,只需要繼承SinkFunction接口并重寫其中的invoke()方法。
object FlinkStreamCustomerDataSink {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// kafka source properties參數(shù)
val props = new Properties()
props.setProperty("bootstrap.servers", "172.16.57.101:9092,172.16.57.102:9092,172.16.57.103:9092")
props.setProperty("zookeeper.connect", "172.16.57.101:2181,172.16.57.102:2181,172.16.57.103:2181")
props.setProperty("group.id", "leslie")
props.setProperty("auto.offset.reset", "latest")
val originalStream: DataStream[String] = env
.addSource(new FlinkKafkaConsumer010[String](
"test", // 被消費(fèi)的kafka topic
new SimpleStringSchema(), // 序列化
props)) // kafka source properties參數(shù)
val targetStream: DataStream[(String, String, Int)] = originalStream
.flatMap(_.split(","))
.map { name =>
val sex = if (name.contains("e")) "男" else "女"
val age = if (name.contains("e")) 23 else 18
(name, sex, age) // 根據(jù)名字來構(gòu)造人物的基本信息
}
// 自定義DataSink
targetStream.addSink(new MysqlDataSink())
env.execute("fink_consumer_data_sink")
}
}
下文代碼MysqlDataSink類繼承RichSinkFunction類术吝,實現(xiàn)將記錄存放到mysql的目的计济。為了避免重復(fù)創(chuàng)建和銷毀mysql連接,我們和自定義DataSouce一樣繼承"富函數(shù)"排苍,在open(),close()方法中實現(xiàn)連接的創(chuàng)建和銷毀(前一篇博客Flink DataSouce中有提到open()方法僅在函數(shù)類實例化的時候調(diào)用一次沦寂,close()則是在實例對象銷毀前調(diào)用一次)。
class MysqlDataSink extends RichSinkFunction[(String, String, Int)] {
private var pStmt: PreparedStatement = _
private var conn: Connection = _
override def open(parameters: Configuration): Unit = {
Class.forName("com.mysql.jdbc.Driver")
val url = "jdbc:mysql://localhost:3306/test_for_mysql?useSSL=false"
val username = "root"
val password = "123456"
conn = DriverManager.getConnection(url, username, password);
val sql =
"""
|insert into user (name, sex, age) values (?, ?, ?);
|""".stripMargin
pStmt = conn.prepareStatement(sql)
}
override def close(): Unit = {
if (conn != null) conn.close()
if (pStmt != null) pStmt.close()
}
// 主體方法淘衙,插入數(shù)據(jù)到mysql中
override def invoke(value: (String, String, Int)): Unit = {
pStmt.setString(1, value._1)
pStmt.setNString(2, value._2)
pStmt.setInt(3, value._3)
pStmt.execute()
}
}
最后:
上邊就是Flink DataSink的介紹部分传藏。當(dāng)然DataSink相關(guān)的知識并不只有這么一點(diǎn)點(diǎn),此文只是些基礎(chǔ)知識彤守,各位老哥們以后遇到具體的場景具體處理毯侦。
下一篇博文將會把這兩次提到的富函數(shù)給大家說說,也會通過富函數(shù)的講解把flink的有狀態(tài)編程也說說具垫。