基于SCF實(shí)現(xiàn)Elasticsearch索引的批量shrink

在冷熱分離的Elasticsearch集群架構(gòu)中贺拣,往往會(huì)使用擁有較高規(guī)格cpu和內(nèi)存鸦难、以及SSD盤的機(jī)器作為熱節(jié)點(diǎn),用于保證高并發(fā)的寫入片林。通常索引都是按天創(chuàng)建的端盆,只有當(dāng)天的索引會(huì)進(jìn)行寫入,存量的索引會(huì)定期比如說在索引創(chuàng)建15天后遷移到冷節(jié)點(diǎn)或者warm節(jié)點(diǎn)上费封,冷節(jié)點(diǎn)或者warm節(jié)點(diǎn)的cpu和內(nèi)存配置都可以低一些焕妙,并且使用SATA盤存儲(chǔ)降低成本。

在熱節(jié)點(diǎn)上的索引弓摘,為了保證寫入性能访敌,通常分片數(shù)會(huì)設(shè)置的和熱節(jié)點(diǎn)的數(shù)量一致,使得每臺(tái)機(jī)器的資源都可以利用上衣盾。但是隨著索引數(shù)量的不斷增加,集群整體的分片數(shù)量也越來越多爷抓,如果分片數(shù)量達(dá)到了數(shù)萬势决,對(duì)集群的穩(wěn)定性和性能都會(huì)有不小的影響。所以需要對(duì)集群整體的分片數(shù)量進(jìn)行控制蓝撇,避免分片數(shù)過多而導(dǎo)致集群不穩(wěn)定果复,好在ES本身有shrink 索引的功能,可以降低索引的分片數(shù)渤昌,但是shrink操作有一些前置條件和使用限制虽抄,不是直接對(duì)索引的分片數(shù)調(diào)低,而是新建一個(gè)分片數(shù)量少的索引独柑,硬鏈接到老的索引迈窟,然后對(duì)新索引執(zhí)行recovery直至新索引變green。我們可以在新索引變green后忌栅,刪除老的索引车酣,然后對(duì)新索引建立別名,別名和老索引的名稱完全一樣索绪,可以按照老索引的名稱查詢數(shù)據(jù)湖员。

本文嘗試使用SCF云函數(shù)對(duì)存量的大量的老索引,通過shrink瑞驱,降低索引的分片數(shù)量娘摔。

實(shí)施步驟

1. 創(chuàng)建SCF云函數(shù)

如圖,基于名為"ES寫入函數(shù)"的模板唤反,創(chuàng)建一個(gè)新的函數(shù):


image

點(diǎn)擊"下一步"進(jìn)入函數(shù)編輯界面凳寺,直接復(fù)制如下函數(shù)代碼粘貼到編輯框鸭津,修改ES的vip和用戶名密碼,以及索引前綴名稱等信息:

# -*- coding: utf8 -*-
from datetime import datetime
from elasticsearch import Elasticsearch
import time
import math

ESServer = Elasticsearch(["xxx:9200"],http_auth=('elastic', 'xxx'),timeout=30)

# 臨時(shí)索引读第,用于記錄當(dāng)前正在執(zhí)行哪一個(gè)老的索引
tempIndex = "temp-snapshot"
# 老的索引前綴
indexPattern = "test-*"
currentIndex = None

# 根據(jù)磁盤使用量曙博,獲取用量最小的warm節(jié)點(diǎn)id
def get_warm_node():
    rsp = ESServer.nodes.stats(metric="indices",index_metric="store")
    nodesInfo = rsp["nodes"]
    minStorageSizeNodeId = None
    minStorageSize = 0
    for node in nodesInfo:
        nodeInfo = nodesInfo[node]
        if nodeInfo["attributes"]["temperature"] != "warm":
            continue
        nodeStorageSize = nodeInfo["indices"]["store"]["size_in_bytes"]
        if minStorageSize == 0:
            minStorageSize = nodeStorageSize
            minStorageSizeNodeId = node
        if nodeStorageSize < minStorageSize:
            minStorageSize = nodeStorageSize
            minStorageSizeNodeId = node
    return minStorageSizeNodeId

# 檢查老索引的狀態(tài),判斷是否有正在遷移中的分片
def check_old_index_status(index):
    params = {}
    params["format"] = "json"
    rsp = ESServer.cat.recovery(index = index, params = params)
    for shardStats in rsp:
        if shardStats["stage"] != "done":
            return False
    return True

# 檢查新索引的狀態(tài)怜瞒,判斷是否green
def check_new_index_status(index):
    rsp = ESServer.cluster.health(index = index)
    if rsp != None and rsp["status"] == "green":
        return True
    return False

# 根據(jù)索引數(shù)據(jù)量確定要shrink到幾個(gè)分片
def calTargeIndexShardsNum(index):
    params = {}
    params["format"] = "json"
    rsp = ESServer.cat.indices(index = index, params = params)
    indexInfo = rsp[0]
    storageSize = indexInfo["pri.store.size"]
    shardNum = int(indexInfo["pri"])
    if storageSize.endswith("gb"):
        size = float(storageSize[0:storageSize.rfind("gb")])
        targetShardsNum =  int(math.ceil(size/50))
        while shardNum / targetShardsNum * targetShardsNum < shardNum:
            targetShardsNum = targetShardsNum + 1
        return targetShardsNum
    else:
        return 1

# 執(zhí)行shrink
def shrink_index(index):
    body = {}
    body["settings"]={}
    body["settings"]["index.number_of_replicas"]=0
    targetShardsNum = calTargeIndexShardsNum(index)
    print "shrink index: " + index + ", target shards num:" + str(targetShardsNum)
    body["settings"]["index.number_of_shards"] = targetShardsNum
    body["settings"]["index.routing.allocation.require._id"] = None
    rsp = ESServer.indices.shrink(index=index, target="shrink-"+index, body=body)
    if rsp is not None and rsp["acknowledged"] == True:
        return True
    else:
        return False

# 添加別名
def add_alias(index):
    shrinkIndex = "shrink-"+index
    rsp = ESServer.indices.put_alias(index=shrinkIndex, name=index)
    if rsp is not None and rsp["acknowledged"] == True:
        return True
    else:
        return False

# 刪除索引
def delete_index(index):
    rsp = ESServer.indices.delete(index=index, ignore=[400, 404])
    if rsp is not None and rsp["acknowledged"] == True:
        return True
    else:
        return False

# 選擇需要執(zhí)行shrink的老索引
def selectNeedToShrinkIndex():
    params = {}
    params["format"] = "json"
    rsp = ESServer.cat.indices(index = indexPattern, params = params)
    for indexInfo in rsp:
        indexName = indexInfo["index"]
        if indexName.startswith("shrink-"):
            continue
        if indexInfo["health"] == 'green' and indexInfo["status"] == 'open' and indexInfo["pri"] == "60":
            indexSettings = ESServer.indices.get_settings(index=indexName)
            allocationSettings = indexSettings[indexName]["settings"]["index"]["routing"]["allocation"]["require"]
            if allocationSettings["temperature"] == 'warm' and "_id" not in allocationSettings:
                return indexName
    return None

# 把老索引的分片都遷移至一個(gè)節(jié)點(diǎn)上
def reallocatingOldIndex(index):
    warmNodeId = get_warm_node()
    if warmNodeId == None:
        print "warmNodeId is null"
        return
    print "warmNodeId: " + warmNodeId
    params = {}
    params["index.blocks.write"] = "true"
    params["index.routing.allocation.require._id"] = warmNodeId
    ESServer.indices.put_settings(index= index, body=params)

# 記錄當(dāng)前正在執(zhí)行的老索引父泳,便于后續(xù)輪詢
def recordCurrentIndex(currentIndex):
    indexBody ={}
    indexBody["currentIndex"] = currentIndex
    headers= {}
    headers["Content-Type"] = "application/json"
    indexResult = ESServer.index(index=tempIndex, doc_type="_doc", body=indexBody,id="2")
    print "index current index success!"

# 記錄當(dāng)前正在執(zhí)行中的新索引,便于后續(xù)輪詢
def recordShrinkIndex(shrinkIndex):
    indexBody ={}
    indexBody["shrinkIndex"] = shrinkIndex
    headers= {}
    headers["Content-Type"] = "application/json"
    indexResult = ESServer.index(index=tempIndex, doc_type="_doc", body=indexBody,id="3")
    print "index current index success!"

# 檢查shrink操作
def checkShrink(index):
    shrinkIndex = "shrink-"+index
    isShrinkIndexReady = check_new_index_status(shrinkIndex)
    if isShrinkIndexReady == True:
        deleteSuccess = delete_index(index)
        if deleteSuccess == True:
            success = add_alias(index)
            if success == True:
                deleteDocument(tempIndex, "2")
                deleteDocument(tempIndex, "3")
                body = {}
                body["indexName"] = index
                addDocument(tempIndex, body)
                forceMerge(index)
                print "shrink index "+ index + "finished!"
            else:
                print "add alias failed"
        else:
            print "delete old index: "+ index + "failed"

# 刪除臨時(shí)索引中的記錄
def deleteDocument(index, docId):
    rsp = ESServer.delete(index=index, id=docId, doc_type="_doc")
    if rsp is not None:
        print "delete document: " + index + ", id: "+ docId + "success"
        return True
    return False

# 在臨時(shí)索引中記錄所有的已完成shrink的索引名稱
def addDocument(index, body):
    rsp = ESServer.index(index=index, doc_type="_doc", body = body)
    if rsp is not None:
        print "record document: " + index + " success"
        return True
    return False

# 對(duì)新索引執(zhí)行merge
def forceMerge(index):
    params = {}
    params["max_num_segments"] = 1
    rsp = ESServer.indices.forcemerge(index=index, params =params)
    if rsp is not None:
        print "forcemerge index: " + index + " success"
        return True
    return False

# 執(zhí)行shrink
def execShrink(index):
    isOldIndexReady = check_old_index_status(index)
    if isOldIndexReady == True:
        print "old index: " + index + " is ready"
        success = shrink_index(index)
        if success == True:
            recordShrinkIndex("shrink-"+index)
        else:
            print "shrink failed"
    else:
        print "old index: " + index + " is reallocating"

# 選擇老索引
def selectIndexToExecShrink():
    currentIndex = selectNeedToShrinkIndex()
    if currentIndex == None:
        print "No new index needs to be shrinken"
    else:
        print "current index: " + currentIndex
        recordCurrentIndex(currentIndex)
        reallocatingOldIndex(currentIndex)
    return currentIndex

def check():
    existed = ESServer.indices.exists(tempIndex)
    if existed == True:
        getTempDoc = ESServer.get(tempIndex, doc_type="_doc", id="2", ignore=[404])
        if getTempDoc["found"] == False:
            currentIndex = selectIndexToExecShrink()
        else:
            currentIndex = getTempDoc["_source"]["currentIndex"]
            print "current index: " + currentIndex
        if currentIndex == None:
            return

        tempDoc = ESServer.get(tempIndex, doc_type="_doc", id="3", ignore=[404])
        if tempDoc["found"] == True:
            checkShrink(currentIndex)
        else:
            execShrink(currentIndex)
    else:
        selectIndexToExecShrink()


def main_handler(event,context):
    check()

該函數(shù)主要的邏輯有:

  1. 通過索引名稱通配符找到老的索引吴汪,選擇一個(gè)索引
  2. 選擇一個(gè)固定的warm節(jié)點(diǎn)惠窄,把1中選出的索引的分片全部移動(dòng)到這個(gè)warm節(jié)點(diǎn)上去(索引完整的一份數(shù)據(jù)都在一個(gè)節(jié)點(diǎn)上,才能執(zhí)行shrink漾橙,因?yàn)橐M(jìn)行硬鏈接)
  3. 根據(jù)老索引的數(shù)據(jù)量確定目標(biāo)分片數(shù)量杆融,按一個(gè)分片50GB確定,向上取整霜运,并且使得目標(biāo)分片數(shù)量為老索引分片數(shù)量的因子
  4. 分片移動(dòng)完畢后脾歇,執(zhí)行shrink操作,新索引的名稱為shrink-{老索引的名稱}淘捡,新索引的分片數(shù)量只能為老索引分片數(shù)量的因子藕各,比如老索引的分片數(shù)為10, 則新索引的分片數(shù)只能為1焦除、2或者5(為了保證數(shù)據(jù)不用rehash)
  5. 檢查新索引的狀態(tài)激况,等待狀態(tài)變?yōu)間reen并且沒有在初始化中的分片
  6. 刪除老索引
  7. 對(duì)新索引添加別名,別名為老索引的名稱
  8. 繼續(xù)執(zhí)行步驟1-7膘魄, 直至所有的老索引都執(zhí)行完畢乌逐,此舉是為了避免大量的分片移動(dòng)、初始化的操作對(duì)索引的新建產(chǎn)生影響创葡,在規(guī)模較大的集群中容易出現(xiàn)該問題
image

點(diǎn)擊"完成"即可完成云函數(shù)的創(chuàng)建浙踢。

2. 配置云函數(shù)

創(chuàng)建完云函數(shù)后,需要進(jìn)行配置才能使用蹈丸,如下圖成黄,可以配置函數(shù)的私有網(wǎng)絡(luò)VPC和Subnet(選擇和ES相同的VPC和Subnet):


image

3. 測(cè)試云函數(shù)

配置完云函數(shù)后,可以對(duì)函數(shù)代碼進(jìn)行測(cè)試逻杖,保證能夠正常運(yùn)行奋岁;如果需要進(jìn)行編輯,可以直接編輯然后點(diǎn)擊"保存并測(cè)試":


image

4. 配置觸發(fā)器

配置觸發(fā)器荸百,可以自定義選擇執(zhí)行周期觸發(fā)函數(shù):


image
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末闻伶,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子够话,更是在濱河造成了極大的恐慌蓝翰,老刑警劉巖光绕,帶你破解...
    沈念sama閱讀 219,039評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異畜份,居然都是意外死亡诞帐,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門爆雹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來停蕉,“玉大人,你說我怎么就攤上這事钙态』燮穑” “怎么了?”我有些...
    開封第一講書人閱讀 165,417評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵册倒,是天一觀的道長(zhǎng)蚓挤。 經(jīng)常有香客問我,道長(zhǎng)驻子,這世上最難降的妖魔是什么灿意? 我笑而不...
    開封第一講書人閱讀 58,868評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮崇呵,結(jié)果婚禮上脾歧,老公的妹妹穿的比我還像新娘。我一直安慰自己演熟,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評(píng)論 6 392
  • 文/花漫 我一把揭開白布司顿。 她就那樣靜靜地躺著芒粹,像睡著了一般。 火紅的嫁衣襯著肌膚如雪大溜。 梳的紋絲不亂的頭發(fā)上化漆,一...
    開封第一講書人閱讀 51,692評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音钦奋,去河邊找鬼座云。 笑死,一個(gè)胖子當(dāng)著我的面吹牛付材,可吹牛的內(nèi)容都是我干的朦拖。 我是一名探鬼主播,決...
    沈念sama閱讀 40,416評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼厌衔,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼璧帝!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起富寿,我...
    開封第一講書人閱讀 39,326評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤睬隶,失蹤者是張志新(化名)和其女友劉穎锣夹,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體苏潜,經(jīng)...
    沈念sama閱讀 45,782評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡银萍,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評(píng)論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了恤左。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片贴唇。...
    茶點(diǎn)故事閱讀 40,102評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖赃梧,靈堂內(nèi)的尸體忽然破棺而出滤蝠,到底是詐尸還是另有隱情,我是刑警寧澤授嘀,帶...
    沈念sama閱讀 35,790評(píng)論 5 346
  • 正文 年R本政府宣布物咳,位于F島的核電站,受9級(jí)特大地震影響蹄皱,放射性物質(zhì)發(fā)生泄漏览闰。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評(píng)論 3 331
  • 文/蒙蒙 一巷折、第九天 我趴在偏房一處隱蔽的房頂上張望压鉴。 院中可真熱鬧,春花似錦锻拘、人聲如沸油吭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽婉宰。三九已至,卻和暖如春推穷,著一層夾襖步出監(jiān)牢的瞬間心包,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工馒铃, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蟹腾,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,332評(píng)論 3 373
  • 正文 我出身青樓区宇,卻偏偏與公主長(zhǎng)得像娃殖,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子议谷,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評(píng)論 2 355