Two ways to paginate in Elasticsearch
A general-purpose query function using the conventional from/size pagination
from elasticsearch import Elasticsearch

es = Elasticsearch('127.0.0.1', port=9200)


def search(index, doc_type, q_dict={}, search_str='', search_fields=[], fields=[],
           page_index=0, page_size=10, all_data=False, body={}, size=1000, sort=''):
    '''
    Basic Elasticsearch query helper (from/size pagination).
    :param index: index name, required
    :param doc_type: document type, required
    :param q_dict: exact-match conditions, e.g. {'name': 'tom', 'age': [12, 13, 14]}
    :param search_str: fuzzy query string, e.g. 'om'
    :param search_fields: fields used for the fuzzy query, e.g. ['name', 'age']
                          (note: numeric and other non-string types cannot be used here)
    :param fields: fields to return, e.g. ['name']
    :param page_index: offset the query starts from, i.e. which record to begin at (int)
    :param page_size: how many records to fetch from that offset, i.e. the page size (int)
    :param all_data: whether to fetch all data, False/True
    :param body: custom query body, overrides the generated one
    :param size: batch size per request
    :param sort: sort order
    '''
    result = {}
    must = []
    if q_dict:
        for key in q_dict:
            if isinstance(q_dict[key], list):
                must.append({
                    'terms': {key: q_dict[key]}
                })
            else:
                must.append({
                    'term': {key: q_dict[key]}
                })
    if search_str:
        if not search_fields:
            search_fields = fields
        should = []
        for field in search_fields:
            should.append({
                "wildcard": {
                    field: "*{}*".format(search_str)
                }
            })
        must.append({"bool": {"should": should}})
    dsl = {
        'query': {
            'bool': {
                'must': must,
                'must_not': [],
                'should': []
            }
        },
        "from": page_index,
        "size": page_size
    }
    es_res = es.search(index=index, doc_type=doc_type, body=dsl, sort=sort)
    es_hits = es_res["hits"]["hits"]
    result["count"] = es_res["hits"]["total"]
    result["data"] = es_hits
    return result
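For reference, a hypothetical call against this helper (the backup index and backup_log type come from the original example, but the field names status, name and create_time are placeholders):

# Fetch records 20-29 (the third page of 10) of documents whose name contains 'om',
# restricted to status 'ok' and sorted by create_time descending.
res = search(index='backup', doc_type='backup_log',
             q_dict={'status': 'ok'},
             search_str='om', search_fields=['name'],
             page_index=20, page_size=10,
             sort='create_time:desc')
print(res['count'])           # total number of matching documents
for hit in res['data']:
    print(hit['_source'])     # the stored document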
Raising the 10,000-record query limit
By default, the maximum number of results a query can return (and page through with from/size) is capped at 10,000; if the value you pass in, or the data a query has to page past, exceeds this cap, an error is raised. The cap can be changed with a settings request.
First, close the index.
Then, from the compound-query panel of your ES client UI, send the request
test/_settings?preserve_existing=true
# test is the index name
with the request JSON
{"max_result_window":"200000"}
The request method is PUT; when the request returns true, the setting has been applied.
Then reopen the index and check its settings.
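The same change can also be made from Python with the elasticsearch client; a minimal sketch following the steps above (the index name test is only an example):

from elasticsearch import Elasticsearch

es = Elasticsearch('127.0.0.1', port=9200)

es.indices.close(index='test')                        # close the index first
es.indices.put_settings(index='test',                 # PUT test/_settings?preserve_existing=true
                        body={'max_result_window': '200000'},
                        preserve_existing=True)
es.indices.open(index='test')                         # reopen the index
print(es.indices.get_settings(index='test'))          # verify the new setting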
However, if this window is set too large, queries further back still get slower and slower.
In that case you can paginate with scroll instead. The principle is similar to a cursor in a relational database: once the scroll parameter is added, each request moves the cursor forward by size records.
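A bare-bones sketch of that loop, before the full helper below (es is the client created at the top; the index, type and query are placeholders):

resp = es.search(index='backup', doc_type='backup_log',
                 body={'query': {'match_all': {}}},
                 scroll='1m', size=1000)              # open a cursor kept alive for one minute
scroll_id = resp['_scroll_id']
hits = resp['hits']['hits']                           # first batch of up to size documents
while hits:
    # ... process this batch ...
    resp = es.scroll(scroll_id=scroll_id, scroll='1m')    # advance the cursor by size records
    hits = resp['hits']['hits']
es.clear_scroll(scroll_id=scroll_id)                  # release the scroll context when done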
def search(self, index, doc_type, q_dict={}, search_str='', search_fields=[], fields=[], page_index=1,
           page_size=10, all_data=False, body={}, size=1000, sort=''):
    # self.db is an Elasticsearch client instance
    result = {}
    must = []
    if q_dict:
        for key in q_dict:
            if isinstance(q_dict[key], list):
                must.append({
                    'terms': {key: q_dict[key]}
                })
            else:
                must.append({
                    'term': {key: q_dict[key]}
                })
    if search_str:
        if not search_fields:
            search_fields = fields
        should = []
        for field in search_fields:
            should.append({
                "wildcard": {
                    field: "*{}*".format(search_str)
                }
            })
        must.append({"bool": {"should": should}})
    dsl = {
        'query': {
            'bool': {
                'must': must,
                'must_not': [],
                'should': []
            }
        },
    }
    if body:
        dsl = body
    if fields:
        dsl['fields'] = fields
    exact_res = self.db.search(index=index, doc_type=doc_type, scroll='1m', body=dsl,
                               sort=sort.lower(), size=size)
    # scroll='1m' is how long the cursor is kept alive; once it expires it can no longer be used
    num = exact_res.get('hits').get('total')
    print('Total: {}'.format(num))
    scroll_id = exact_res.get('_scroll_id')           # get the scroll_id
    result['count'] = num
    result['data'] = exact_res['hits']['hits']        # first batch from the initial search
    while True:
        # Each scroll call moves the cursor forward by size records and returns that batch;
        # an empty batch means the cursor has reached the end. For example, with size=1000
        # and 10,000 documents in total, each loop returns 1,000 documents and 10 iterations
        # fetch everything.
        p_data = self.db.scroll(scroll_id=scroll_id, scroll='1m')
        data = p_data['hits']['hits']
        if not data:
            break
        result['data'].extend(data)
    # Close the scroll once it is no longer needed; otherwise repeated queries keep many
    # scroll contexts alive on the server and hurt performance.
    self.db.clear_scroll(scroll_id=scroll_id)
    return result
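A hypothetical call, assuming the method lives on a class whose self.db is an Elasticsearch client, and using the same placeholder index and field names as above:

res = self.search(index='backup', doc_type='backup_log',
                  q_dict={'status': 'ok'},
                  size=1000, sort='create_time:desc')
print(res['count'])        # total hits
print(len(res['data']))    # every matching document, fetched 1,000 at a time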
With scroll, each individual request does perform well, but the annoying part is that you can only turn pages one after another; you cannot jump straight to a given page.
If you do need to jump to a specific page, there is a moderately efficient method that should be used with care: iterate the scroll cursor up to the batch that contains the target page, then slice the records you need out of it.
def search(self, index, doc_type, q_dict={}, search_str='', search_fields=[], fields=[], page_index=1,
           page_size=10, all_data=False, body={}, size=1000, sort=''):
    # self.db is an Elasticsearch client instance
    # page_befor_l: index of the scroll batch that contains the first wanted record
    # page_over_l:  index of the scroll batch that contains the last wanted record
    page_befor_l = int(int(page_index) / size)
    page_over = int(page_index) + int(page_size)
    page_over_l = int(page_over / size)
    if (page_over_l - page_befor_l) > 1:
        raise Exception('page error: the requested window spans more than two scroll batches; '
                        'increase size or reduce page_size')
    result = {}
    must = []
    if q_dict:
        for key in q_dict:
            if isinstance(q_dict[key], list):
                must.append({
                    'terms': {key: q_dict[key]}
                })
            else:
                must.append({
                    'term': {key: q_dict[key]}
                })
    if search_str:
        if not search_fields:
            search_fields = fields
        should = []
        for field in search_fields:
            should.append({
                "wildcard": {
                    field: "*{}*".format(search_str)
                }
            })
        must.append({"bool": {"should": should}})
    dsl = {
        'query': {
            'bool': {
                'must': must,
                'must_not': [],
                'should': []
            }
        },
    }
    if body:
        dsl = body
    if fields:
        dsl['fields'] = fields
    exact_res = self.db.search(index=index, doc_type=doc_type, scroll='1m', body=dsl,
                               sort=sort.lower(), size=size)
    num = exact_res.get('hits').get('total')
    print('Total: {}'.format(num))
    result['count'] = num
    res, res_list, page, ch = None, [], 1, 0
    p_o = page_befor_l
    if page_befor_l < page_over_l:
        # The wanted window straddles two adjacent batches; ch is how many of its
        # records fall into the second batch.
        ch = page_over - (page_befor_l + 1) * size
    scroll_id = exact_res.get('_scroll_id')
    while True:
        p_data = self.db.scroll(scroll_id=scroll_id, scroll='1m')
        if p_o > page:
            # Not at the target batch yet: just advance the cursor.
            page += 1
            continue
        if p_o == 0:
            es_data = exact_res['hits']['hits']   # target batch is the initial search result
        else:
            es_data = p_data['hits']['hits']      # target batch comes from the scroll cursor
        if all_data:
            # Fetch everything: keep extending until the cursor is exhausted.
            if page == 1:
                res_list.extend(exact_res['hits']['hits'])
            batch = p_data['hits']['hits']
            if not batch:
                break
            res_list.extend(batch)
            page += 1
            continue
        elif ch:
            if p_o == 0:
                # Window starts in the first batch and spills into the second.
                qian = int(page_index)
                res_list.extend(es_data[qian:])
                res_list.extend(p_data['hits']['hits'][:ch])
                break
            elif page_over_l == page:
                # Second half of the window.
                res_list.extend(es_data[:ch])
                break
            else:
                # First half of the window.
                qian = int(page_index) - page_befor_l * size
                res_list.extend(es_data[qian:])
                page += 1
                continue
        else:
            # Window lies entirely inside one batch: slice it out.
            qian = int(page_index) - int(int(int(page_index) / size) * size)
            hou = qian + page_size
            res = es_data[qian:hou]
            break
    if res:
        result['data'] = res
    else:
        result['data'] = res_list
    self.db.clear_scroll(scroll_id=scroll_id)
    return result
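For example, a hypothetical jump straight to records 25,000-25,009 (same placeholder index and sort field as above):

# "Page 2501" with 10 records per page: with size=1000 the cursor walks past the
# first 25 batches and slices records 0-9 out of the 26th.
res = self.search(index='backup', doc_type='backup_log',
                  page_index=25000, page_size=10,
                  size=1000, sort='create_time:desc')
print(len(res['data']))    # 10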
This way the data for an arbitrary page can be returned, although the further back the page, the lower the efficiency.
Tune size according to the data volume and to how deep and how often you need to jump; the difference in efficiency is clearly noticeable. With a small data set it still completes within milliseconds, and at a certain data volume it is also faster than from/size. When the data volume is very large and jump-to-page in ES is a must, this is a workable solution.