原文鏈接:Python連接es筆記三之es更新操作
這一篇筆記介紹如何使用 Python 對(duì)數(shù)據(jù)進(jìn)行更新操作闰渔。
對(duì)于 es 的更新的操作,不用到 Search() 方法铐望,而是直接使用 es 的連接加上相應(yīng)的函數(shù)來(lái)操作冈涧,本篇筆記目錄如下:
- 獲取連接
- update()
- update_by_query()
- 批量更新
- UpdateByQuery()
1茂附、獲取連接
如果使用的是之前的全局創(chuàng)建連接的方式:
from elasticsearch_dsl import connections
connections.configure(
default={"hosts": "localhost:9200"},
)
我們可以根據(jù)別名獲取相應(yīng)的連接:
conn = connections.connections.get_connection("default")
或者我們直接使用 elasticsearch.Elasticsearch 模塊來(lái)重新建立一個(gè)連接:
from elasticsearch import Elasticsearch
conn = Elasticsearch(hosts="localhost:9200")
前面介紹過(guò),我們安裝 elasticsearch_dsl 依賴的時(shí)候督弓,會(huì)自動(dòng)為我們安裝上相應(yīng)的 elasticsearch 模塊营曼,我們這里直接使用即可。
然后通過(guò) conn 連接可以直接對(duì)數(shù)據(jù)進(jìn)行更新愚隧,可用的方法有 update()蒂阱,update_by_query() 以及一個(gè)批量的 bulk() 方法。
2狂塘、update()
update() 函數(shù)一般只用于指定 id 的更新操作录煤,如果我們知道一條數(shù)據(jù)的 id,我們可以直接使用 update()荞胡。
比如對(duì)于 exam 這個(gè) index 下 id=18 的數(shù)據(jù)妈踊,我們想要更新它的 name 字段和 address 字段分別為 王五和湖南省,我們可以如下操作:
conn.update(
index="exam",
id=18,
body={
"doc": {
"name": "王五2",
"address": "湖南省",
}
}
)
在上面的操作中泪漂,index 為指定的索引廊营,id 參數(shù)為我們需要更新的 id,body 內(nèi) doc 下的字段即為我們要更新的數(shù)據(jù)萝勤。
3露筒、update_by_query()
update_by_query() 函數(shù)不局限于 id 的查詢更新,我們可以更新任意符合條件的數(shù)據(jù)纵刘,以下是一個(gè)簡(jiǎn)單的示例:
conn.update_by_query(
index="exam",
body={
"query": {
"term": {"name": "張三豐"}
},
"script": {
"source": "ctx._source.address = params.address",
"params": {
"address": "新地址",
}
}
}
)
在這里邀窃,index 參數(shù)還是指向?qū)?yīng)的索引,body 內(nèi)包含了需要更新查詢的條件假哎,這里都在 query 參數(shù)內(nèi)瞬捕,需要更新的數(shù)據(jù)在 script 下,通過(guò)腳本的形式來(lái)操作更新舵抹。
這里注意下肪虎,我這里用到的是 7.6.0 版本,所以 script 下使用的 source惧蛹,更低一點(diǎn)版本用的字段可能是 inline扇救,這里使用對(duì)應(yīng)版本的參數(shù)即可。
在 script.source 中香嗓,內(nèi)容為 ctx._source.address = params.address
迅腔,意思是將符合條件數(shù)據(jù)的 address 字段內(nèi)容更新為 params 的 address 的數(shù)據(jù)。
如果想要更改其他字段內(nèi)容靠娱,注意前面 ctx._source 為固定寫(xiě)法沧烈,只需要更改后面的字段名即可。
在 script.params 中像云,我們則可以定義各種對(duì)應(yīng)的字段及其內(nèi)容锌雀。
更新多個(gè)字段
如果我們想同時(shí)更新多個(gè)字段蚂夕,比如說(shuō)符合條件的數(shù)據(jù)將 address 改為 新地址
,將 age 字段改為 28腋逆,我們則需要將多個(gè)條件在 script.source 中使用分號(hào) ;
連接起來(lái)婿牍,示例如下:
conn.update_by_query(
index="exam",
body={
"query": {
"term": {"name": "新張三豐2"}
},
"script": {
"source": "ctx._source.address = params.address; ctx._source.age = params.age",
"params": {
"address": "新地址3",
"age": "28"
}
}
}
)
雖然這里更新多個(gè)字段需要使用分號(hào)連接,但是在實(shí)際的代碼中我們不用這么寫(xiě)死惩歉,比如說(shuō)我們需要更改三個(gè)字段等脂,為 ["address", "name", "age"]
,我們?nèi)缦虏僮鳎?/p>
field_list = ["address", "name", "age"]
source_list = [f"ctx._source.{key}=params.{key}" for key in field_list]
params = {
"address": "新地址3",
"age": "28",
"name": "new name"
}
conn.update_by_query(
index="exam",
body={
"query": {
"term": {"name": "新張三豐3"}
},
"script": {
"source": ";".join(source_list),
"params": params
}
}
)
4柬泽、批量更新
如果我們想批量更新一批數(shù)據(jù)慎菲,這批數(shù)據(jù)各個(gè)字段的值都不一致嫁蛇,自定義的程度很大锨并,使用 update_by_query() 函數(shù)已經(jīng)不現(xiàn)實(shí)了,怎么辦睬棚?
好解決第煮,我們可以使用 helpers.bulk() 批量更新方法。
首先引入這個(gè)模塊:
from elasticsearch import helpers
假設(shè)我們系統(tǒng)里現(xiàn)在有 id 為 21抑党,23包警,24 的幾條數(shù)據(jù),還是在 exam 這個(gè)索引下底靠,我們來(lái)構(gòu)造幾條需要更新的數(shù)據(jù)來(lái)操作:
action_1 = {
"_op_type": "update",
"_index": "exam",
"_id": 21,
"doc": {"age": 19, "name": "令狐沖", "address": "華山派"},
}
action_2 = {
"_op_type": "update",
"_index": "exam",
"_id": 23,
"doc": {"age": 20, "name": "楊過(guò)", "address": "終南山"},
}
action_3 = {
"_op_type": "update",
"_index": "exam",
"_id": 24,
"doc": {"age": 21, "name": "張無(wú)忌", "address": "武當(dāng)"},
}
action_list = [action_1, action_2, action_3]
helpers.bulk(conn, actions=action_list)
對(duì)于每一條需要更新的數(shù)據(jù)害晦,有這幾個(gè)參數(shù):
_op_type:如果是更新操作,其值則是 update
_index:表示需要更新的數(shù)據(jù)所在的索引暑中,這里是 exam
_id:表示這條需要更新的數(shù)據(jù)的 id
doc:是一個(gè) dict 數(shù)據(jù)壹瘟,其下包含了需要更新的字段及其對(duì)應(yīng)的值
至此,一條需要更新的數(shù)據(jù)的結(jié)構(gòu)就構(gòu)造完畢了鳄逾。
然后對(duì)于 helpers.bulk() 函數(shù)稻轨,接收的第一個(gè)參數(shù)為 es 連接,actions 參數(shù)是一個(gè)列表雕凹,其內(nèi)容就是我們前面構(gòu)造的數(shù)據(jù)的集合殴俱。
然后執(zhí)行這個(gè)操作就可以發(fā)現(xiàn) es 中對(duì)應(yīng)的值已經(jīng)更改了。
5枚抵、UpdateByQuery()
UpdateByQuery() 函數(shù)來(lái)源于 elasticsearch_dsl 模塊线欲,它的使用和 Search() 方法差不多,都是通過(guò) using 和 index 參數(shù)來(lái)獲取 es 連接和索引:
from elasticsearch_dsl import connections
from elasticsearch_dsl import UpdateByQuery
from elasticsearch_dsl import Q as ES_Q
connections.configure(
default={"hosts": "localhost:9200"},
)
ubq = UpdateByQuery(using="default", index="exam")
使用這個(gè)方法更新數(shù)據(jù)的具體語(yǔ)法和 update_by_query 差不多汽摹,都是通過(guò) script 的方式來(lái)操作李丰,以下是一個(gè)簡(jiǎn)單示例:
ubq = UpdateByQuery(using="default", index="exam")
q1 = ES_Q("term", name="郭靖")
ubq = ubq.query(q1)
ubq = ubq.script(
source="ctx._source.address=params.address",
params={
"address": "襄陽(yáng)城"
}
)
ubq.execute()
與 Search() 函數(shù)一樣,都需要通過(guò) execute() 函數(shù)來(lái)向 es 提交數(shù)據(jù)竖慧。