Background
剛接觸ElasticSearch不久妖爷,公司讓我?guī)兔θ?dǎo)一下數(shù)據(jù),剛開始數(shù)量并不是很大理朋,我都是用Elasticsearch python的api接口絮识,一條一條數(shù)據(jù)去往新的index里插. 但是馬上又有了千萬(wàn)級(jí)別的數(shù)據(jù)需要操作,如果還是老辦法嗽上,會(huì)特別慢次舌,而且不夠穩(wěn)定。于是去查了一下資料兽愤。關(guān)于Elasticsearch的python api文檔很少彼念,中文的就更少了,官方穩(wěn)定又不是很詳細(xì)烹看,只是大概得到一個(gè)信息国拇,可以用bulk() api去操作大量的數(shù)據(jù)。
Solution:
我需要解決的問題有兩個(gè):
- 查詢多個(gè)Index中的內(nèi)容惯殊,然后將滿足條件的數(shù)據(jù)寫到新的一個(gè)Index中:
這個(gè)問題比較簡(jiǎn)單酱吝,elasticsearch 的helpers module提供了一個(gè)完美的方法來(lái)做這件事:reindex()
elasticsearch.helpers.reindex(client, source_index, target_index, query=None,target_client=None, chunk_size=500, scroll=u'5m', scan_kwargs={}, bulk_kwargs={})
這個(gè)方法的參數(shù),提供了source_index(可以是List), target_index, query以及 scroll_size 和 scroll的保存時(shí)間,所以直接跑就可以了土思。
- 批量更新現(xiàn)有Index中的所有數(shù)據(jù)务热,給每個(gè)document增加一個(gè)field并賦值:
官方文檔中的api 簽名是這樣的:
elasticsearch.helpers.bulk(client, actions, stats_only=False, **kwargs)
我一直沒搞明白actions是什么類型的參數(shù)忆嗜,以為是個(gè)函數(shù)類行的參數(shù),后來(lái)看了一下源碼崎岂,發(fā)現(xiàn)其實(shí)是一個(gè)List, 而且是要被操作的document的集合,官方文檔上顯示是要滿足這個(gè)樣子捆毫,跟search()返回的結(jié)果格式一樣:
{ '_index': 'index-name', '_type': 'document', '_id': 42, '_parent': 5, '_ttl': '1d', '_source': { "title": "Hello World!", "body": "..." }}
但是又說:The bulk()
api accepts index, create, delete, and update actions. Use the _op_type field to specify an action (_op_type defaults to index):
{ '_op_type': 'delete', '_index': 'index-name', '_type': 'document', '_id': 42,}{ '_op_type': 'update', '_index': 'index-name', '_type': 'document', '_id': 42, 'doc': {'question': 'The life, universe and everything.'}}
我在自己的數(shù)據(jù)上加了"_op_type":"update", 然后運(yùn)行一直出錯(cuò):
TransportError(400, u'action_request_validation_exception',u'Validation Failed: 1: script or doc is missing
直到我嘗試著刪掉"_op_type"這個(gè)字段,終于運(yùn)行成功了冲甘。以下是我的代碼:
def queryInES( esinstance):
search_body={"query":{"match_all":{}}}
page = esinstance.search(index='my_index', body=search_body, search_type='scan', doc_type='Tweet', scroll='5s', size=1000)
sid=page['_scroll_id']
scroll_size = page['hits']['hits']
while(scroll_size>0):
pg = es.scroll(scroll_id=sid, scroll='30s')
scroll_size = len(pg['hits']['hits'])
print "scroll size: " + str(scroll_size)
sid = pg['_scroll_id']
data=pg['hits']['hits']
... ...
for i in range(0, scroll_size):
data[i]['_source']['attributes']['senti']={"label":label, "score": score, "confidence": confidence}
helpers.bulk(client=esinstance, actions=data)