Apache Cassandra Connector(連接器)

這個Connectors提供sink即寫數(shù)據到一個Cassandra數(shù)據庫中
為了使用這個Connectors棺聊,將下面的依賴添加到你的工程中:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-cassandra_2.10</artifactId>
  <version>1.3.0</version>
</dependency>

注意:streaming connectors目前還不是二進制發(fā)布包的一部分,請參考此處來了解如何在分布式執(zhí)行中關聯(lián)到這些connectors栗柒。

安裝Cassandra

請參考這篇文檔

Cassandra Sink

Flink的Cassandra sink通過這個靜態(tài)的方法CassandraSink.addSink(DataStream input)來創(chuàng)建,這個方法會返回一個CassandraSinkBuilder,它提供了方法來更深入地配置sink誊抛。
下面的配置方法可以被使用:
  1孵班、setQuery(String query)
  2、setHost(String host[, int port])
  3竖独、setClusterBuilder(ClusterBuilder builder)
  4、enableWriteAheadLog([CheckpointCommitter committer])
  5挤牛、build()
setQuery()方法設置了為sink接收到的每個值執(zhí)行的query語句莹痢,setHost()設置要去連接的Cassandra的host/port,此方法用于簡單的用例墓赴,setclusterbuilder()設置了用來配置連接到Cassandra的cluster builder竞膳,setHost()的功能可以被這個方法替代。enableWriteAheadLog()是個可選的方法诫硕,為非確定性算法提供精確處理(exactly-once)坦辟。

例如:
Java 代碼:

CassandraSink.addSink(input)
  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
  .setClusterBuilder(new ClusterBuilder() {
    @Override
    public Cluster buildCluster(Cluster.Builder builder) {
      return builder.addContactPoint("127.0.0.1").build();
    }
  })
  .build();

Scala代碼

CassandraSink.addSink(input)
  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
  .setClusterBuilder(new ClusterBuilder() {
    override def buildCluster(builder: Cluster.Builder): Cluster = {
      builder.addContactPoint("127.0.0.1").build()
    }
  })
  .build()

Cassandra sink支持使用DataStax注釋的Tuple和POJO,F(xiàn)link會自動去探測輸入數(shù)據的類型:
一個使用DataStax注釋的Pojo例子:

@Table(keyspace= "test", name = "mappersink")
public class Pojo implements Serializable {

  private static final long serialVersionUID = 1038054554690916991L;

  @Column(name = "id")
  private long id;
  @Column(name = "value")
  private String value;

  public Pojo(long id, String value){
    this.id = id;
    this.value = value;
  }

  public long getId() {
    return id;
  }

  public void setId(long id) {
    this.id = id;
  }

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末章办,一起剝皮案震驚了整個濱河市锉走,隨后出現(xiàn)的幾起案子滨彻,更是在濱河造成了極大的恐慌,老刑警劉巖挪蹭,帶你破解...
    沈念sama閱讀 216,544評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件亭饵,死亡現(xiàn)場離奇詭異,居然都是意外死亡梁厉,警方通過查閱死者的電腦和手機辜羊,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來词顾,“玉大人八秃,你說我怎么就攤上這事∪忭铮” “怎么了昔驱?”我有些...
    開封第一講書人閱讀 162,764評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長上忍。 經常有香客問我舍悯,道長,這世上最難降的妖魔是什么睡雇? 我笑而不...
    開封第一講書人閱讀 58,193評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮饮醇,結果婚禮上它抱,老公的妹妹穿的比我還像新娘。我一直安慰自己朴艰,他們只是感情好观蓄,可當我...
    茶點故事閱讀 67,216評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著祠墅,像睡著了一般侮穿。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上毁嗦,一...
    開封第一講書人閱讀 51,182評論 1 299
  • 那天亲茅,我揣著相機與錄音,去河邊找鬼狗准。 笑死克锣,一個胖子當著我的面吹牛,可吹牛的內容都是我干的腔长。 我是一名探鬼主播袭祟,決...
    沈念sama閱讀 40,063評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼捞附!你這毒婦竟也來了巾乳?” 一聲冷哼從身側響起您没,我...
    開封第一講書人閱讀 38,917評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎胆绊,沒想到半個月后氨鹏,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 45,329評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡辑舷,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,543評論 2 332
  • 正文 我和宋清朗相戀三年喻犁,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片何缓。...
    茶點故事閱讀 39,722評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡肢础,死狀恐怖,靈堂內的尸體忽然破棺而出碌廓,到底是詐尸還是另有隱情传轰,我是刑警寧澤,帶...
    沈念sama閱讀 35,425評論 5 343
  • 正文 年R本政府宣布谷婆,位于F島的核電站慨蛙,受9級特大地震影響,放射性物質發(fā)生泄漏纪挎。R本人自食惡果不足惜期贫,卻給世界環(huán)境...
    茶點故事閱讀 41,019評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望异袄。 院中可真熱鬧通砍,春花似錦、人聲如沸烤蜕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,671評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽讽营。三九已至虎忌,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間橱鹏,已是汗流浹背膜蠢。 一陣腳步聲響...
    開封第一講書人閱讀 32,825評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留莉兰,地道東北人狡蝶。 一個月前我還...
    沈念sama閱讀 47,729評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像贮勃,于是被迫代替她去往敵國和親贪惹。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,614評論 2 353

推薦閱讀更多精彩內容