In a hot-warm Elasticsearch cluster, machines with higher-spec CPU and memory and SSD disks are usually used as hot nodes to sustain high-concurrency writes. Indices are typically created daily, and only the current day's index receives writes. Older indices are moved periodically, for example 15 days after creation, to cold or warm nodes. Cold and warm nodes can run with less CPU and memory and store data on SATA disks to reduce cost.
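The periodic move to warm nodes can be sketched as a selection step plus one index setting. This is a minimal sketch, not the author's tooling: it assumes a hypothetical daily naming convention `<prefix>YYYY.MM.DD` and assumes warm nodes are started with the node attribute `temperature: warm` (the attribute the shrink function later in this article checks).

```python
from datetime import date, datetime, timedelta

# Assumed (hypothetical) naming convention: one index per day, "<prefix>YYYY.MM.DD".
DATE_FORMAT = "%Y.%m.%d"

# Index-level allocation filter: only nodes started with node.attr.temperature=warm
# may hold this index's shards.
WARM_SETTINGS = {"index.routing.allocation.require.temperature": "warm"}


def indices_to_migrate(index_names, prefix, today, min_age_days=15):
    """Return daily indices created at least `min_age_days` ago, oldest first."""
    cutoff = today - timedelta(days=min_age_days)
    selected = []
    for name in index_names:
        if not name.startswith(prefix):
            continue
        try:
            created = datetime.strptime(name[len(prefix):], DATE_FORMAT).date()
        except ValueError:
            continue  # name does not follow the assumed date suffix
        if created <= cutoff:
            selected.append((created, name))
    return [name for _, name in sorted(selected)]


names = ["test-2020.01.20", "test-2020.01.01", "test-2020.01.10", "logs-2020.01.01"]
print(indices_to_migrate(names, "test-", date(2020, 1, 25)))
# ['test-2020.01.01', 'test-2020.01.10']
```

Each selected index would then be updated with `ESServer.indices.put_settings(index=name, body=WARM_SETTINGS)`. The shrink function relies on exactly this `temperature: warm` requirement when it picks candidate indices.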
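For indices on hot nodes, the shard count is usually set equal to the number of hot nodes so that every machine's resources are used and write performance is maintained. As the number of indices keeps growing, however, so does the total number of shards in the cluster, and once it reaches tens of thousands it noticeably hurts cluster stability and performance. The total shard count therefore has to be kept under control.

Fortunately, ES provides a shrink index feature that reduces an index's shard count. Shrink has some preconditions and usage restrictions, though: it does not lower the shard count of the existing index directly. Instead, it creates a new index with fewer shards, hard-links it to the old index's files, and then runs recovery on the new index until it turns green. Once the new index is green, we can delete the old index and add an alias to the new index whose name is exactly the old index's name, so data can still be queried by the old name.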
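A rough estimate shows why the shard count matters. The numbers below are hypothetical: 60 hot nodes (the function later in this article selects old indices with 60 primaries), one replica, 180 daily indices retained, and shrunk indices that keep 0 replicas, as the function's shrink request configures.

```python
def total_shards(primaries_per_index, replicas, num_indices):
    """Shard copies held by the cluster: primaries plus their replicas."""
    return primaries_per_index * (1 + replicas) * num_indices


# Hypothetical cluster: 60 hot nodes, so each daily index gets 60 primaries
# and 1 replica; 180 daily indices are retained.
before = total_shards(60, 1, 180)

# After shrinking everything older than 15 days to 3 primaries (a factor of 60)
# with 0 replicas, while the 15 newest indices stay at 60 primaries.
after = total_shards(60, 1, 15) + total_shards(3, 0, 165)

print(before, after)  # 21600 2295
```

Almost all of the reduction comes from the old, read-only indices, which no longer need one shard per hot node.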
This article uses an SCF (Serverless Cloud Function) to shrink a large number of existing old indices and so reduce their shard count.
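Implementation steps
1. Create the SCF function
As shown in the figure, create a new function based on the template named "ES寫入函數" (ES write function):
Click "Next" to open the function editor, paste the following code into the editor, and change the ES VIP, username and password, index name prefix, and other settings: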
```python
# -*- coding: utf8 -*-
from __future__ import print_function
import math
from elasticsearch import Elasticsearch

# Replace with the VIP, username and password of your ES cluster
ESServer = Elasticsearch(["xxx:9200"], http_auth=('elastic', 'xxx'), timeout=30)
# Temporary index recording which old index is currently being processed
tempIndex = "temp-snapshot"
# Name pattern of the old indices
indexPattern = "test-*"
# Only old indices with exactly this many primary shards are selected
sourceShardsNum = "60"
# Target size of one shard after shrinking, in GB
targetShardSizeGB = 50


# Return the id of the warm node with the least disk usage
def get_warm_node():
    rsp = ESServer.nodes.stats(metric="indices", index_metric="store")
    minStorageSizeNodeId = None
    minStorageSize = None
    for nodeId, nodeInfo in rsp["nodes"].items():
        if nodeInfo.get("attributes", {}).get("temperature") != "warm":
            continue
        nodeStorageSize = nodeInfo["indices"]["store"]["size_in_bytes"]
        if minStorageSize is None or nodeStorageSize < minStorageSize:
            minStorageSize = nodeStorageSize
            minStorageSizeNodeId = nodeId
    return minStorageSizeNodeId


# Check whether the old index still has shards being relocated
def check_old_index_status(index):
    rsp = ESServer.cat.recovery(index=index, params={"format": "json"})
    for shardStats in rsp:
        if shardStats["stage"] != "done":
            return False
    return True


# Check whether the new index is green
def check_new_index_status(index):
    rsp = ESServer.cluster.health(index=index)
    return rsp is not None and rsp["status"] == "green"


# Decide how many shards to shrink to, based on the index's primary data size
def calTargetIndexShardsNum(index):
    # bytes=b returns pri.store.size as a plain byte count instead of "12.3gb"/"1.2tb"
    rsp = ESServer.cat.indices(index=index, params={"format": "json", "bytes": "b"})
    indexInfo = rsp[0]
    storageSize = int(indexInfo["pri.store.size"])
    shardNum = int(indexInfo["pri"])
    # One shard per 50GB, rounded up, at least 1 and never more than the source
    targetShardsNum = int(math.ceil(storageSize / float(targetShardSizeGB * 1024 ** 3)))
    targetShardsNum = min(max(targetShardsNum, 1), shardNum)
    # The target must be a factor of the source shard count
    while shardNum % targetShardsNum != 0:
        targetShardsNum += 1
    return targetShardsNum


# Run the shrink into a new index named shrink-<old index>
def shrink_index(index):
    targetShardsNum = calTargetIndexShardsNum(index)
    print("shrink index: " + index + ", target shards num: " + str(targetShardsNum))
    body = {
        "settings": {
            "index.number_of_replicas": 0,
            "index.number_of_shards": targetShardsNum,
            # Clear the single-node allocation filter copied from the old index
            "index.routing.allocation.require._id": None,
        }
    }
    rsp = ESServer.indices.shrink(index=index, target="shrink-" + index, body=body)
    return rsp is not None and rsp.get("acknowledged") is True


# Add an alias with the old index's name to the new index
def add_alias(index):
    rsp = ESServer.indices.put_alias(index="shrink-" + index, name=index)
    return rsp is not None and rsp.get("acknowledged") is True


# Delete an index
def delete_index(index):
    rsp = ESServer.indices.delete(index=index, ignore=[400, 404])
    return rsp is not None and rsp.get("acknowledged") is True


# Pick one old index that still needs to be shrunk
def selectNeedToShrinkIndex():
    rsp = ESServer.cat.indices(index=indexPattern, params={"format": "json"})
    for indexInfo in rsp:
        indexName = indexInfo["index"]
        if indexName.startswith("shrink-"):
            continue
        if (indexInfo["health"] == "green" and indexInfo["status"] == "open"
                and indexInfo["pri"] == sourceShardsNum):
            indexSettings = ESServer.indices.get_settings(index=indexName)
            allocationSettings = (indexSettings[indexName]["settings"]["index"]
                                  .get("routing", {}).get("allocation", {}).get("require", {}))
            # Only indices already on warm nodes and not yet pinned to one node
            if allocationSettings.get("temperature") == "warm" and "_id" not in allocationSettings:
                return indexName
    return None


# Move all shards of the old index onto one warm node and block writes
def reallocatingOldIndex(index):
    warmNodeId = get_warm_node()
    if warmNodeId is None:
        print("warmNodeId is null")
        return
    print("warmNodeId: " + warmNodeId)
    settings = {
        "index.blocks.write": "true",
        "index.routing.allocation.require._id": warmNodeId,
    }
    ESServer.indices.put_settings(index=index, body=settings)


# Record the old index being processed (doc id "2") for later polls
def recordCurrentIndex(currentIndex):
    ESServer.index(index=tempIndex, doc_type="_doc", id="2", body={"currentIndex": currentIndex})
    print("record current index success!")


# Record the new index being created (doc id "3") for later polls
def recordShrinkIndex(shrinkIndex):
    ESServer.index(index=tempIndex, doc_type="_doc", id="3", body={"shrinkIndex": shrinkIndex})
    print("record shrink index success!")


# Once the new index is green: delete the old index, add the alias, clean up
def checkShrink(index):
    shrinkIndex = "shrink-" + index
    if not check_new_index_status(shrinkIndex):
        print("shrink index: " + shrinkIndex + " is not green yet")
        return
    if not delete_index(index):
        print("delete old index: " + index + " failed")
        return
    if not add_alias(index):
        print("add alias failed")
        return
    deleteDocument(tempIndex, "2")
    deleteDocument(tempIndex, "3")
    # Keep a record of every index that has been shrunk
    addDocument(tempIndex, {"indexName": index})
    # The old name is now an alias of the new index, so this merges the new index
    forceMerge(index)
    print("shrink index " + index + " finished!")


# Delete a record from the temporary index
def deleteDocument(index, docId):
    rsp = ESServer.delete(index=index, doc_type="_doc", id=docId)
    if rsp is not None:
        print("delete document: " + index + ", id: " + docId + " success")
        return True
    return False


# Record the name of a finished index in the temporary index
def addDocument(index, body):
    rsp = ESServer.index(index=index, doc_type="_doc", body=body)
    if rsp is not None:
        print("record document: " + index + " success")
        return True
    return False


# Force-merge the new index down to one segment
def forceMerge(index):
    rsp = ESServer.indices.forcemerge(index=index, params={"max_num_segments": 1})
    if rsp is not None:
        print("forcemerge index: " + index + " success")
        return True
    return False


# Shrink the old index once all of its shards have been relocated
def execShrink(index):
    if not check_old_index_status(index):
        print("old index: " + index + " is reallocating")
        return
    print("old index: " + index + " is ready")
    if shrink_index(index):
        recordShrinkIndex("shrink-" + index)
    else:
        print("shrink failed")


# Select the next old index, record it and start relocating it
def selectIndexToExecShrink():
    currentIndex = selectNeedToShrinkIndex()
    if currentIndex is None:
        print("No new index needs to be shrunk")
    else:
        print("current index: " + currentIndex)
        recordCurrentIndex(currentIndex)
        reallocatingOldIndex(currentIndex)
    return currentIndex


# Each timer-triggered run advances the current index by one step
def check():
    if not ESServer.indices.exists(index=tempIndex):
        selectIndexToExecShrink()
        return
    currentDoc = ESServer.get(index=tempIndex, doc_type="_doc", id="2", ignore=[404])
    if not currentDoc.get("found"):
        # Return after starting the relocation: shrinking right away would fail
        # because the shards are not on one node yet
        selectIndexToExecShrink()
        return
    currentIndex = currentDoc["_source"]["currentIndex"]
    print("current index: " + currentIndex)
    shrinkDoc = ESServer.get(index=tempIndex, doc_type="_doc", id="3", ignore=[404])
    if shrinkDoc.get("found"):
        checkShrink(currentIndex)
    else:
        execShrink(currentIndex)


def main_handler(event, context):
    check()
```
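The polling in `check()` can be summarized as a small state machine, one step per invocation. This is an illustrative sketch only: `next_action` and its action strings are hypothetical names, and the booleans stand in for the Elasticsearch queries the real function makes. In this sketch, selecting a new index ends the invocation, so the shrink is attempted only on a later trigger.

```python
def next_action(current_index, relocation_done, shrink_started, new_index_green):
    """Return the step a single timer-triggered run should take.

    current_index   -- old index recorded in temp doc "2", or None
    relocation_done -- all recoveries of the old index have reached stage "done"
    shrink_started  -- temp doc "3" (the shrink-* index name) exists
    new_index_green -- the shrink-* index reports green health
    """
    if current_index is None:
        # Pick the next old index, record it, pin its shards to one warm node.
        return "select_and_relocate"
    if not shrink_started:
        return "shrink" if relocation_done else "wait_for_relocation"
    # Delete the old index, add the alias, clear docs "2"/"3", force-merge.
    return "finalize" if new_index_green else "wait_for_green"


# One index moving through successive runs of the function.
runs = [
    (None, False, False, False),
    ("test-2020.01.01", False, False, False),
    ("test-2020.01.01", True, False, False),
    ("test-2020.01.01", True, True, False),
    ("test-2020.01.01", True, True, True),
]
for state in runs:
    print(next_action(*state))
```

Because only one old index is in flight at a time, the trigger period effectively throttles how many shard relocations and recoveries the cluster performs.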
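The function's main logic is as follows:
1. Find the old indices through the index name wildcard and select one of them.
2. Choose a single warm node (the code picks the one with the least disk usage) and move all shards of the index selected in step 1 onto it. Shrink can only run when a complete copy of the index is on one node, because the new index is built by hard-linking the old index's files.
3. Determine the target shard count from the old index's data size: one shard per 50GB, rounded up, then adjusted so that it is a factor of the old index's shard count.
4. Once the shards have finished moving, run the shrink. The new index is named shrink-{old index name}. Its shard count can only be a factor of the old index's shard count; for example, if the old index has 10 shards, the new index can only have 1, 2, or 5 (so the documents do not need to be rehashed).
5. Check the new index's status and wait until it turns green with no initializing shards.
6. Delete the old index.
7. Add an alias to the new index, using the old index's name.
8. Repeat steps 1-7 until all old indices have been processed. Indices are handled one at a time so that large numbers of shard relocations and initializations do not interfere with the creation of new indices, a problem that occurs easily in large clusters.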
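Click "Finish" to create the function.
2. Configure the function
After the function is created, it must be configured before use. As shown in the figure below, set the function's VPC and subnet (choose the same VPC and subnet as the ES cluster):
3. Test the function
After configuring the function, test the code to make sure it runs correctly. To make changes, edit the code directly and click "Save and Test":
4. Configure the trigger
Configure a trigger that invokes the function on a custom schedule. Each invocation advances the current index by one step, so the period determines how quickly the old indices are processed: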