ElasticSearch-River-Kafka實(shí)現(xiàn)動(dòng)態(tài)Index、Type數(shù)據(jù)同步

背景

我們在做大數(shù)據(jù)的項(xiàng)目時(shí)經(jīng)常遇到數(shù)據(jù)同步的問題偷拔,kafka到es是一個(gè)比較常見的數(shù)據(jù)同步管道。有這樣的一個(gè)需求亏钩,kafka入es的過程中莲绰,我們需要達(dá)到如下效果:kafka中同一個(gè)topic數(shù)據(jù)流可以動(dòng)態(tài)的插入到es不同的type里面,當(dāng)然铸屉,它們是同一個(gè)index钉蒲。什么叫動(dòng)態(tài)插入切端?舉個(gè)例子:譬如我們按照時(shí)間來切分type彻坛,每個(gè)小時(shí)為單位,那么踏枣,在三點(diǎn)到四點(diǎn)之間昌屉,數(shù)據(jù)就會同步到type為2018011003里面,在四點(diǎn)到五點(diǎn)之間茵瀑,數(shù)據(jù)就會同步到type為2018011004里面间驮。將這個(gè)需求往深處想的話,同樣马昨,是不是可以實(shí)現(xiàn)動(dòng)態(tài)index插入呢竞帽?答案是可以的扛施。

實(shí)現(xiàn)原理

river這個(gè)es插件大家不知道聽過沒,我們可以利用river實(shí)現(xiàn)該功能屹篓。網(wǎng)上有很多現(xiàn)成的開源插件elasticsearch-river-kafka疙渣,它們都有個(gè)共同點(diǎn)就是只能靜態(tài)的同步數(shù)據(jù)。這里的靜態(tài)指的是不能動(dòng)態(tài)生成index和type堆巧。所以妄荔,我們基于開源庫elasticsearch-river-kafka,可以自己動(dòng)手修改代碼谍肤,來實(shí)現(xiàn)我們想要的功能啦租。而且,這個(gè)插件本身就是支持自定義開發(fā)的荒揣。

步驟

kafka:v0.10
elasticsearch:v1.7.0

一:代碼庫地址

github:elasticsearch-river-kafka
我是fork別人的開源項(xiàng)目篷角,在此基礎(chǔ)上自己修改的代碼,原項(xiàng)目地址

二:安裝該插件

  • 方式一
    去你安裝elasticsearch的機(jī)器上乳附,找到es下的bin目錄執(zhí)行安裝插件命令
cd $ELASTICSEARCH_HOME
./bin/plugin --install <plugin-name> --url https://github.com/zhuyinglinfeng/elasticsearch-river-kafka/archive/master.zip
  • 方式二
    如果你本地已經(jīng)clone了該項(xiàng)目内地,直接本地編譯打包,然后把zip包發(fā)送到安裝elasticsearch的機(jī)器上面赋除,再執(zhí)行安裝插件命令
cd $ELASTICSEARCH_HOME
./bin/plugin --install <plugin-name> --url file:<zip_path>

三:配置river(這個(gè)是重點(diǎn)哦)

一條river代表著一條同步規(guī)則阱缓,創(chuàng)建river很簡單,執(zhí)行如下的命令即可

curl -XPUT 'localhost:9200/_river/<river-name>/_meta' -d '
{
     "type" : "kafka",
     "kafka" : {
        "zookeeper.connect" : <zookeeper.connect>, 
        "zookeeper.connection.timeout.ms" : <zookeeper.connection.timeout.ms>,
        "topic" : <topic.name>,
        "message.type" : <message.type>
    },
    "index" : {
        "index" : <index.name>,
        "frequency.index" : <frequency.index>,
        "type" : <mapping.type.name>,
        "frequency.type" : <frequency.type>,
        "bulk.size" : <bulk.size>,
        "concurrent.requests" : <concurrent.requests>,
        "action.type" : <action.type>,
        "flush.interval" : <flush.interval>
    },
    "statsd" : {
        "host" : <statsd.host>,
        "prefix" : <statsd.prefix>,
        "port" : <statsd.port>,
        "log.interval" : <statsd.log.interval>
    }
 }'
參數(shù)名字 是否必填 默認(rèn)值 描述
river-name 名字
zookeeper.connect localhost zoo的地址
zookeeper.connection.timeout.ms 10000 zoo連接超時(shí)時(shí)間
topic elasticsearch-river-kafka topic名字
message.type json kafka消息類型举农,json/string
index kafka-index ES索引
frequency.index 動(dòng)態(tài)索引切分頻率荆针,1mon/1day/1hour/10min
type status ES類型
frequency.type 動(dòng)態(tài)類型切分頻率,1mon/1day/1hour/10min
bulk.size 100 單次處理的消息數(shù)量
concurrent.requests 1 并發(fā)請求數(shù)
action.type index 同步行為颁糟,index(插入)/delete(刪除)/raw.execute(執(zhí)行語句)
host localhost statsd服務(wù)地址
port 8125 statsd端口
prefix kafka-river statsd鍵值前綴
log.interval 10 statsd上報(bào)metrics時(shí)間間隔

Note:
如果填寫了frequency.index參數(shù)航背,表示index根據(jù)時(shí)間動(dòng)態(tài)創(chuàng)建,index參數(shù)可以不用填寫棱貌,即使填寫也不會生效
如果填寫了frequency.type參數(shù)玖媚,表示type根據(jù)時(shí)間動(dòng)態(tài)創(chuàng)建,type參數(shù)可以不用填寫婚脱,即使填寫也不會生效

Example:
將kafka中topic名字為topic-test流中的數(shù)據(jù)同步到index為index-test里面今魔,并且每隔10min動(dòng)態(tài)切換type類型

curl -XPUT 'localhost:9200/_river/river-test/_meta' -d '
{
    "type": "kafka",
    "kafka": {
        "zookeeper.connect": "localhost",
        "zookeeper.connection.timeout.ms": 10000,
        "topic": "topic-test",
        "message.type": "json"
    },
    "index": {
        "index": "index-test",
        "frequency.type”: "10min",
        "bulk.size": 100,
        "concurrent.requests": 1,
        "action.type": "index",
        "flush.interval": "5s"
    }
}'

四:重啟elasticsearch

查詢進(jìn)程號:ps -ef | grep elastic
關(guān)閉進(jìn)程:kill -9 pid
啟動(dòng):./bin/elasticsearch -d

五:驗(yàn)證

到此為止,理論上已經(jīng)全部完成障贸,現(xiàn)在可以往kafka中寫數(shù)據(jù)了错森,不出意外,如果你的<flush.interval>參數(shù)設(shè)置很短的話篮洁,幾秒后es中就可以查看數(shù)據(jù)了

升級維護(hù)

  • 插件刪除
./bin/plugin --remove <plugin-name>
  • 刪除river
    和刪除elasticsearch數(shù)據(jù)方式一樣涩维,利用es提供的接口命令刪除即可
curl -XDELETE 'localhost:9200/_river/river-test/'
  • 代碼升級
    如果你想要手動(dòng)改寫代碼,也很簡單袁波,fork一下代碼庫瓦阐,自己本地升級代碼蜗侈,然后按照上面的步驟就可以

自定義elasticsearch-river-kafka開發(fā)

......待完善

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市睡蟋,隨后出現(xiàn)的幾起案子宛篇,更是在濱河造成了極大的恐慌,老刑警劉巖薄湿,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件叫倍,死亡現(xiàn)場離奇詭異,居然都是意外死亡豺瘤,警方通過查閱死者的電腦和手機(jī)吆倦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來坐求,“玉大人蚕泽,你說我怎么就攤上這事∏培停” “怎么了须妻?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長泛领。 經(jīng)常有香客問我荒吏,道長,這世上最難降的妖魔是什么渊鞋? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任绰更,我火速辦了婚禮,結(jié)果婚禮上锡宋,老公的妹妹穿的比我還像新娘儡湾。我一直安慰自己,他們只是感情好执俩,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布徐钠。 她就那樣靜靜地躺著,像睡著了一般役首。 火紅的嫁衣襯著肌膚如雪尝丐。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天宋税,我揣著相機(jī)與錄音摊崭,去河邊找鬼讼油。 笑死杰赛,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的矮台。 我是一名探鬼主播乏屯,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼根时,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了辰晕?” 一聲冷哼從身側(cè)響起蛤迎,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎含友,沒想到半個(gè)月后替裆,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡窘问,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年辆童,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片惠赫。...
    茶點(diǎn)故事閱讀 37,997評論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡把鉴,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出儿咱,到底是詐尸還是另有隱情庭砍,我是刑警寧澤,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布混埠,位于F島的核電站怠缸,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏钳宪。R本人自食惡果不足惜凯旭,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望使套。 院中可真熱鬧罐呼,春花似錦、人聲如沸侦高。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽奉呛。三九已至计螺,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間瞧壮,已是汗流浹背登馒。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留咆槽,地道東北人陈轿。 一個(gè)月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親麦射。 傳聞我的和親對象是個(gè)殘疾皇子蛾娶,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)潜秋,斷路器蛔琅,智...
    卡卡羅2017閱讀 134,599評論 18 139
  • 概述 監(jiān)控預(yù)警平臺, eagle + eye (鷹眼)的合體詞, 寓意可以快速發(fā)現(xiàn)問題, 并及時(shí)作出響應(yīng),Eagl...
    Kungfu貓熊閱讀 7,367評論 0 52
  • 創(chuàng)城工作結(jié)束了額 又迎來了十九大工作 加不完的班 整不完的材料 心情超級郁悶 見人就想罵 一肚子火 即便這樣工作也...
    一朵太陽花shl閱讀 143評論 0 0
  • “寶貝蛋子,下午咱練歌吧峻呛,好久沒唱了罗售,都沒氣唱了都”」呈觯“好莽囤,我陪你練”!“你先唱我pk你可以嗎”切距⌒喽校“好啊”!“那我...
    寶寶的寶貝蛋子閱讀 294評論 2 1
  • 2017年4月19日23時(shí)55分谜悟,當(dāng)我第一次從劉潤老師的《五分鐘商學(xué)院》中看到這個(gè)概念话肖,就被其深深震撼,耐心讀了三...
    飄舞的星空閱讀 1,485評論 0 2