Flink DataSink

同DataSource一樣唆缴,flink流處理和批處理也都內(nèi)置了很多DataSink锤躁,可以滿足部分應(yīng)用場景。但平時使用的應(yīng)用場景比較多狼讨,光是靠內(nèi)置的DataSink完全不滿足日常使用贝淤。flink也考慮到了這個問題,允許我們實現(xiàn)自定義的DataSink政供。

1 批處理

最簡單的DataSink就是print()播聪,平時我們在編寫flink程序時進(jìn)行簡單測試的時候通常都會使用print()在控制臺上打印處理后的結(jié)果數(shù)據(jù)。
真正業(yè)務(wù)應(yīng)用比較多的還是writeAsCsv(),writeAsText()布隔。還可以通過繼承write()中FileOutputFormat類來實現(xiàn)將結(jié)果數(shù)據(jù)輸出到其他格式文件中离陶。

    targetDataSet.print()
    targetDataSet.writeAsCsv("file:///xxx/Documents/batch_csv")
    targetDataSet.writeAsText("file:///xxx/Documents/batch_txt")
    targetDataSet.write(
      outputFormat: FileOutputFormat[T],
      filePath: String,
      writeMode: FileSystem.WriteMode = null)
01_batch_sink.png

2 流處理

2.1 kafka connector

kafka connector是flink提供給我們的自定義連接器,可以直接實例化FlinkKafkaProducer對象來將記錄存放到kafka中衅檀。

object FlinkStreamDataSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val original: DataStream[String] = ...

    // kafka sink properties參數(shù)
    val producerConf = new Properties()
    producerConf.setProperty("bootstrap.servers", "172.16.57.101:9092,172.16.57.102:9092,172.16.57.103:9092")
    producerConf.setProperty("zookeeper.connect", "172.16.57.101:2181,172.16.57.102:2181,172.16.57.103:2181")
    producerConf.setProperty("group.id", "leslie")

    // sink結(jié)果數(shù)據(jù)到kafka中
    original.addSink(new FlinkKafkaProducer010[String]("test_1", new SimpleStringSchema(), producerConf))

    env.execute("flink_data_sink")
  }
}
01_stream_sink_kafka.png

2.2 自定義DataSink

自定義DataSink和自定義DataSource一樣簡單枕磁,只需要繼承SinkFunction接口并重寫其中的invoke()方法。

object FlinkStreamCustomerDataSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // kafka source properties參數(shù)
    val props = new Properties()
    props.setProperty("bootstrap.servers", "172.16.57.101:9092,172.16.57.102:9092,172.16.57.103:9092")
    props.setProperty("zookeeper.connect", "172.16.57.101:2181,172.16.57.102:2181,172.16.57.103:2181")
    props.setProperty("group.id", "leslie")
    props.setProperty("auto.offset.reset", "latest")

    val originalStream: DataStream[String] = env
      .addSource(new FlinkKafkaConsumer010[String](
        "test", // 被消費(fèi)的kafka topic
        new SimpleStringSchema(), // 序列化
        props)) // kafka source properties參數(shù)

    val targetStream: DataStream[(String, String, Int)] = originalStream
      .flatMap(_.split(","))
      .map { name =>
        val sex = if (name.contains("e")) "男" else "女"
        val age = if (name.contains("e")) 23 else 18
        (name, sex, age) // 根據(jù)名字來構(gòu)造人物的基本信息
      }

    // 自定義DataSink
    targetStream.addSink(new MysqlDataSink())

    env.execute("fink_consumer_data_sink")
  }
}

下文代碼MysqlDataSink類繼承RichSinkFunction類术吝,實現(xiàn)將記錄存放到mysql的目的计济。為了避免重復(fù)創(chuàng)建和銷毀mysql連接,我們和自定義DataSouce一樣繼承"富函數(shù)"排苍,在open(),close()方法中實現(xiàn)連接的創(chuàng)建和銷毀(前一篇博客Flink DataSouce中有提到open()方法僅在函數(shù)類實例化的時候調(diào)用一次沦寂,close()則是在實例對象銷毀前調(diào)用一次)。

class MysqlDataSink extends RichSinkFunction[(String, String, Int)] {
  private var pStmt: PreparedStatement = _
  private var conn: Connection = _

  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    val url = "jdbc:mysql://localhost:3306/test_for_mysql?useSSL=false"
    val username = "root"
    val password = "123456"
    conn = DriverManager.getConnection(url, username, password);
    val sql =
      """
        |insert into user (name, sex, age) values (?, ?, ?);
        |""".stripMargin
    pStmt = conn.prepareStatement(sql)
  }

  override def close(): Unit = {
    if (conn != null) conn.close()
    if (pStmt != null) pStmt.close()
  }

  // 主體方法淘衙,插入數(shù)據(jù)到mysql中
  override def invoke(value: (String, String, Int)): Unit = {
    pStmt.setString(1, value._1)
    pStmt.setNString(2, value._2)
    pStmt.setInt(3, value._3)
    pStmt.execute()
  }
}
02_stream_sink_customer.png

最后:
上邊就是Flink DataSink的介紹部分传藏。當(dāng)然DataSink相關(guān)的知識并不只有這么一點(diǎn)點(diǎn),此文只是些基礎(chǔ)知識彤守,各位老哥們以后遇到具體的場景具體處理毯侦。
下一篇博文將會把這兩次提到的富函數(shù)給大家說說,也會通過富函數(shù)的講解把flink的有狀態(tài)編程也說說具垫。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末侈离,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子筝蚕,更是在濱河造成了極大的恐慌卦碾,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,248評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件起宽,死亡現(xiàn)場離奇詭異洲胖,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)坯沪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評論 2 381
  • 文/潘曉璐 我一進(jìn)店門绿映,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人腐晾,你說我怎么就攤上這事叉弦。” “怎么了赴魁?”我有些...
    開封第一講書人閱讀 153,443評論 0 344
  • 文/不壞的土叔 我叫張陵卸奉,是天一觀的道長。 經(jīng)常有香客問我颖御,道長榄棵,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,475評論 1 279
  • 正文 為了忘掉前任潘拱,我火速辦了婚禮疹鳄,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘芦岂。我一直安慰自己瘪弓,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,458評論 5 374
  • 文/花漫 我一把揭開白布禽最。 她就那樣靜靜地躺著腺怯,像睡著了一般袱饭。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上呛占,一...
    開封第一講書人閱讀 49,185評論 1 284
  • 那天虑乖,我揣著相機(jī)與錄音,去河邊找鬼晾虑。 笑死疹味,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的帜篇。 我是一名探鬼主播糙捺,決...
    沈念sama閱讀 38,451評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼笙隙!你這毒婦竟也來了洪灯?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,112評論 0 261
  • 序言:老撾萬榮一對情侶失蹤逃沿,失蹤者是張志新(化名)和其女友劉穎婴渡,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體凯亮,經(jīng)...
    沈念sama閱讀 43,609評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡边臼,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,083評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了假消。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片柠并。...
    茶點(diǎn)故事閱讀 38,163評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖富拗,靈堂內(nèi)的尸體忽然破棺而出臼予,到底是詐尸還是另有隱情,我是刑警寧澤啃沪,帶...
    沈念sama閱讀 33,803評論 4 323
  • 正文 年R本政府宣布粘拾,位于F島的核電站,受9級特大地震影響创千,放射性物質(zhì)發(fā)生泄漏缰雇。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,357評論 3 307
  • 文/蒙蒙 一追驴、第九天 我趴在偏房一處隱蔽的房頂上張望械哟。 院中可真熱鬧,春花似錦殿雪、人聲如沸暇咆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽爸业。三九已至其骄,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間沃呢,已是汗流浹背年栓。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留薄霜,地道東北人。 一個月前我還...
    沈念sama閱讀 45,636評論 2 355
  • 正文 我出身青樓纸兔,卻偏偏與公主長得像惰瓜,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子汉矿,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,925評論 2 344

推薦閱讀更多精彩內(nèi)容

  • CocoaPods 開源庫的制作過程: 添加私有Pod倉庫崎坊,用來存儲私有Pod庫的podspec文件,類似Coco...
    心至靜行至遠(yuǎn)閱讀 843評論 0 1
  • 大觀園里長袖舞洲拇,怡紅院中盡纏綿奈揍。 緣起緣滅緣已盡,花開花落花歸塵赋续。 君言天下無我處處皆君男翰, 君言老驥伏櫪烈士暮年,...
    昨夜星雨閱讀 672評論 3 7
  • 血淋淋的日子總在提醒 ——銘記“九一·八” 文/黃影 這個日子纽乱,不敢忘記 又不愿想起 不敢忘記蛾绎,是...
    黃影詩風(fēng)閱讀 328評論 1 7