利用StreamSets實(shí)現(xiàn)MySQL中變化數(shù)據(jù)實(shí)時寫入Kudu

  1. 環(huán)境準(zhǔn)備
    1. 開啟MariaDB的Binlog日志

      修改/etc/my.conf文件,在配置文件[mysqld]下增加如下配置

      server-id=999
      log-bin=mysql-bin
      binlog_format=ROW
      

      注意:
      MySQL Binlog支持多種數(shù)據(jù)更新格式包括Row脏答、Statement和mix(Row和Statement的混合)卸奉,這里建議使用Row模式的Binlog格式桑逝,可以更加方便實(shí)時的反應(yīng)行級別的數(shù)據(jù)變化赴背。

      binlog 配置參考:
      https://dev.mysql.com/doc/refman/5.7/en/binary-log-setting.html

      [root@node01 mariadb]# systemctl restart mysqld
      [root@node01 mariadb]#  systemctl status mysqld
      ● mysqld.service - LSB: start and stop MariaDB
         Loaded: loaded (/etc/rc.d/init.d/mysqld; bad; vendor preset: disabled)
         Active: active (running) since Tue 2020-04-28 09:59:03 CST; 1min 11s ago
           Docs: man:systemd-sysv-generator(8)
        Process: 12771 ExecStop=/etc/rc.d/init.d/mysqld stop (code=exited, status=0/SUCCESS)
        Process: 12925 ExecStart=/etc/rc.d/init.d/mysqld start (code=exited, status=0/SUCCESS)
         CGroup: /system.slice/mysqld.service
                 ├─12970 /bin/sh /usr/local/mariadb/bin/mysqld_safe --datadir=/usr/local/mariadb/data --pid-file=/usr/local/mariadb/data/node01.pid
                 └─13079 /usr/local/mariadb/bin/mysqld --basedir=/usr/local/mariadb --datadir=/usr/local/mariadb/data --plugin-dir=/usr/local/mariadb/lib/plugin --user=mysql --log-error=/usr/l...
      
      Apr 28 09:59:02 node01 systemd[1]: Starting LSB: start and stop MariaDB...
      Apr 28 09:59:02 node01 mysqld[12925]: Starting MariaDB.200428 09:59:02 mysqld_safe Logging to '/usr/local/mariadb/data/node01.err'.
      Apr 28 09:59:02 node01 mysqld[12925]: 200428 09:59:02 mysqld_safe Starting mysqld daemon with databases from /usr/local/mariadb/data
      Apr 28 09:59:03 node01 mysqld[12925]: [  OK  ]
      Apr 28 09:59:03 node01 systemd[1]: Started LSB: start and stop MariaDB.
      
    2. 創(chuàng)建 MariaDB 同步賬號
      GRANT ALL on maxwell.* to 'maxwell'@'%' identified by '123456';
      GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
      FLUSH PRIVILEGES;
      
    3. StreamSets 安裝 MySQL 驅(qū)動

      參考:
      https://blog.csdn.net/weixin_43215250/article/details/87981707

    4. 創(chuàng)建測試表
      1. 在MariaDB數(shù)據(jù)庫中創(chuàng)建測試表
      CREATE DATABASE IF NOT EXISTS test;
      CREATE TABLE test.`binlog_test` (
        `id` int(11) NOT NULL AUTO_INCREMENT,
        `name` varchar(255) NOT NULL,
        PRIMARY KEY (`id`) USING BTREE
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
      
      1. 在 HUE 上創(chuàng)建 KUDU 表
      CREATE DATABASE IF NOT EXISTS test;
      CREATE TABLE IF NOT EXISTS test.binlog_test ( 
        id int, 
        name String,
        PRIMARY key(id)
      ) 
      PARTITION BY HASH PARTITIONS 16 
      STORED AS KUDU;
      
  2. 創(chuàng)建 StreamSets 的 Pipline
  1. 創(chuàng)建一個新的 Pipline
    image
  2. 選擇 Origins 類別毛雇,搜索 MySQL Binary Log,并拖動到畫布
    image

    配置 MySQL Binary Log 基本信息

    image

  **配置 MySQL 連接信息**
  
  **注意:** 此處配置的 Server ID 應(yīng)與 MySQL 的 my.cnf 文件中的 server-id 保持一致玄妈。
  ![image](https://upload-images.jianshu.io/upload_images/18545623-f43db30aa5089c27?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

  **配置 MySQL 賬號信息**
  ![image](https://upload-images.jianshu.io/upload_images/18545623-33d3abeb44e632e3?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

  **高級配置乾吻,根據(jù)自己的需要進(jìn)行配置,這里采用默認(rèn)**
  ![image](https://upload-images.jianshu.io/upload_images/18545623-59232a75d0526241?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
  
  **參考:** 
  https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/MySQLBinaryLog.html?hl=mysql%2Cbinary%2Clog
  1. 添加表過濾的Stream Selector 1
    image

    配置 Stream Selector 基本信息

    image

    配置分流條件
    ${record:value("/Table") == "binlog_test"}
    image

  2. 添加表過濾的 Stream Selector 1

    配置 Stream Selector 基本信息

    image

    配置分流條件
    ${record:value("/Table") == "DELETE"}

    image

    參考:
    https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/StreamSelector.html?hl=stream%2Cselector

  1. 添加處理日志 JavaScript Evaluator

    添加解析 DELETE 類型的Binary Log 日志的 JavaScript Evaluator

    image

    配置JavaScript腳本

    for(var i = 0; i < records.length; i++) {
      try { 
        var newRecord = sdcFunctions.createRecord(true);
        newRecord.value = records[i].value['Data'];
        newRecord.value.Type = records[i].value['Type'];
        newRecord.value.Database = records[i].value['Database'];
        newRecord.value.Table = records[i].value['Table'];
        log.info(records[i].value['Type'])
        output.write(newRecord);
      } catch (e) {
        // Send record to error
        error.write(records[i], e);
      }
    }
    
    image
  **添加解析 INSRET 和 UPDATE 類型日志的 JavaScript Evaluator**
  ![image](https://upload-images.jianshu.io/upload_images/18545623-30378a88b8f7f613?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
  
  **配置JavaScript腳本**
  ```
  for(var i = 0; i < records.length; i++) {
    try { 
      var newRecord = sdcFunctions.createRecord(true);
      newRecord.value = records[i].value['OldData'];
      newRecord.value.Type = records[i].value['Type'];
      newRecord.value.Database = records[i].value['Database'];
      newRecord.value.Table = records[i].value['Table'];
      log.info(records[i].value['Type'])
      output.write(newRecord);
    } catch (e) {
      // Send record to error
      error.write(records[i], e);
    }
  }
  ```
  ![image](https://upload-images.jianshu.io/upload_images/18545623-8492b6202b22945e?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
  
  **參考:** 
  https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/JavaScript.html?hl=javascript%2Cevaluator
  1. 添加 KUDU

    配置 Kudu Delete
    配置Kudu基本屬性

    image

    配置Kudu環(huán)境信息
    image

    Kudu的高級配置拟蜻,這里使用默認(rèn)配置
    image

    配置 Kudu Upsert
    配置Kudu基本屬性

    image

    配置Kudu環(huán)境信息


    image

    Kudu的高級配置绎签,這里使用默認(rèn)配置


    image
  2. 校驗(yàn) Pipelines 配置
    image
  3. 啟動 Pipelines
    image
  4. Pipeline 流程測試
    1. 新增數(shù)據(jù)

      向 MariaDB 中的 binlog_test 表中插入數(shù)據(jù)

      insert into test.binlog_test values(1, '張三');
      

      在 StreamSets 中查看的 Pipeline 狀態(tài)

      image

      image

      使用Hue查看 Kudu 表數(shù)據(jù),驗(yàn)證數(shù)據(jù)是否成功插入

      image

    2. 更新數(shù)據(jù)

      更新 MariaDB 中的 binlog_test 表中數(shù)據(jù)

      update test.binlog_test set name='李四' where id = 1;
      

      在 StreamSets 中查看的 Pipeline 狀態(tài)

      image

      image

      使用Hue查看 Kudu 表數(shù)據(jù)酝锅,驗(yàn)證數(shù)據(jù)是否成功更新
      image

    3. 刪除數(shù)據(jù)

      刪除 MariaDB 中的 binlog_test 表中數(shù)據(jù)

      delete from test.binlog_test where id = 1;
      

      在 StreamSets 中查看的 Pipeline 狀態(tài)

      image

      image

      使用Hue查看 Kudu 表數(shù)據(jù)诡必,驗(yàn)證數(shù)據(jù)是否成功刪除
      image

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市搔扁,隨后出現(xiàn)的幾起案子爸舒,更是在濱河造成了極大的恐慌,老刑警劉巖稿蹲,帶你破解...
    沈念sama閱讀 219,270評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件碳抄,死亡現(xiàn)場離奇詭異,居然都是意外死亡场绿,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,489評論 3 395
  • 文/潘曉璐 我一進(jìn)店門嫉入,熙熙樓的掌柜王于貴愁眉苦臉地迎上來焰盗,“玉大人,你說我怎么就攤上這事咒林“揪埽” “怎么了?”我有些...
    開封第一講書人閱讀 165,630評論 0 356
  • 文/不壞的土叔 我叫張陵垫竞,是天一觀的道長澎粟。 經(jīng)常有香客問我,道長欢瞪,這世上最難降的妖魔是什么活烙? 我笑而不...
    開封第一講書人閱讀 58,906評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮遣鼓,結(jié)果婚禮上啸盏,老公的妹妹穿的比我還像新娘。我一直安慰自己骑祟,他們只是感情好回懦,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,928評論 6 392
  • 文/花漫 我一把揭開白布气笙。 她就那樣靜靜地躺著,像睡著了一般怯晕。 火紅的嫁衣襯著肌膚如雪潜圃。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,718評論 1 305
  • 那天舟茶,我揣著相機(jī)與錄音谭期,去河邊找鬼。 笑死稚晚,一個胖子當(dāng)著我的面吹牛崇堵,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播客燕,決...
    沈念sama閱讀 40,442評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼鸳劳,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了也搓?” 一聲冷哼從身側(cè)響起赏廓,我...
    開封第一講書人閱讀 39,345評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎傍妒,沒想到半個月后幔摸,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,802評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡颤练,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,984評論 3 337
  • 正文 我和宋清朗相戀三年既忆,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片嗦玖。...
    茶點(diǎn)故事閱讀 40,117評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡患雇,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出宇挫,到底是詐尸還是另有隱情苛吱,我是刑警寧澤,帶...
    沈念sama閱讀 35,810評論 5 346
  • 正文 年R本政府宣布器瘪,位于F島的核電站翠储,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏橡疼。R本人自食惡果不足惜援所,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,462評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望衰齐。 院中可真熱鬧任斋,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,011評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至澈蟆,卻和暖如春墨辛,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背趴俘。 一陣腳步聲響...
    開封第一講書人閱讀 33,139評論 1 272
  • 我被黑心中介騙來泰國打工睹簇, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人寥闪。 一個月前我還...
    沈念sama閱讀 48,377評論 3 373
  • 正文 我出身青樓太惠,卻偏偏與公主長得像,于是被迫代替她去往敵國和親疲憋。 傳聞我的和親對象是個殘疾皇子凿渊,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,060評論 2 355

推薦閱讀更多精彩內(nèi)容