Bulk write ("Bulk Write") operations in MongoDB offer a very large efficiency gain and are well suited to write-heavy workloads.
This was my first attempt at using bulk operations for data cleaning, so I used PyMongo to simulate a modest workload first: 500k documents for insert and update tests.
Test environment:
PyMongo 3.6.1
MongoDB 3.4.7
Python 3.6.4 :: Anaconda, Inc.
Test data:
```python
items = [
    {'i': 0},
    {'i': 1},
    {'i': 2},
    {'i': 3},
    {'i': 4},
    ...
    {'i': 500000},
]
```
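The document list above can be built with a simple list comprehension (a minimal sketch; the original post does not show the generator for this list):

```python
# Build 500,001 test documents: {'i': 0} through {'i': 500000}.
items = [{'i': i} for i in range(500001)]
```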
Inserting or updating one document per call:
Method | Total | Docs per call | Time | Statement |
---|---|---|---|---|
save | 500k | 1 | 00:02:54 | db['test'].save(item) |
insert | 500k | 1 | 00:02:50 | db['test'].insert(item) |
(Note: `save` and `insert` are deprecated in PyMongo 3.x in favor of `insert_one`/`insert_many`; they are used here only for comparison.)
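The timings above can be reproduced with a small helper like the following (a hypothetical sketch; `time_it` is not part of the original code, and the commented call assumes a live `db['test']` collection):

```python
import time

def time_it(fn):
    """Run fn() once and return (result, elapsed_seconds)."""
    start = time.time()
    result = fn()
    return result, time.time() - start

# Hypothetical usage against a running MongoDB instance:
# _, elapsed = time_it(lambda: [db['test'].insert_one(item) for item in items])
```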
Batch insertion with insert:
Method | Total | Docs per call | Time | Statement |
---|---|---|---|---|
insert | 500k | 1k | 00:00:07 | db['test'].insert(items) |
insert | 500k | 10k | 00:00:08 | db['test'].insert(items) |
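Splitting the 500k documents into fixed-size batches can be done with a generic chunking helper (a sketch; `chunks` is a hypothetical name, and the commented call assumes a live collection):

```python
def chunks(seq, size):
    """Split seq into consecutive slices of at most `size` items."""
    return [seq[i:i + size] for i in range(0, len(seq), size)]

# Hypothetical usage (insert_many is the non-deprecated batch insert):
# for batch in chunks(items, 1000):
#     db['test'].insert_many(batch)
```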
Bulk write operations:
Method | Total | Ops per call | Time | Statement |
---|---|---|---|---|
bulk_write + InsertOne | 500k | 1k | 00:00:09 | db['test'].bulk_write(list(map(InsertOne, items))) |
bulk_write + InsertOne | 500k | 10k | 00:00:07 | db['test'].bulk_write(list(map(InsertOne, items))) |
bulk_write + InsertOne | 500k | 500k | 00:00:09 | db['test'].bulk_write(list(map(InsertOne, items))) |
bulk_write + ReplaceOne | 500k | 1k | 00:00:20 | db['test'].bulk_write(list(map(lambda item: ReplaceOne({'_id': item['_id']}, item, upsert=True), items))) |
bulk_write + ReplaceOne | 500k | 10k | 00:00:21 | db['test'].bulk_write(list(map(lambda item: ReplaceOne({'_id': item['_id']}, item, upsert=True), items))) |
bulk_write + ReplaceOne | 500k | 500k | 00:00:22 | db['test'].bulk_write(list(map(lambda item: ReplaceOne({'_id': item['_id']}, item, upsert=True), items))) |
bulk_write + UpdateOne | 500k | 1k | 00:00:20 | db['test'].bulk_write(list(map(lambda item: UpdateOne({'_id': item['_id']}, {'$set': {'i': item['i']}}, upsert=True), items))) |
bulk_write + UpdateOne | 500k | 10k | 00:00:21 | db['test'].bulk_write(list(map(lambda item: UpdateOne({'_id': item['_id']}, {'$set': {'i': item['i']}}, upsert=True), items))) |
bulk_write + UpdateOne | 500k | 500k | 00:00:22 | db['test'].bulk_write(list(map(lambda item: UpdateOne({'_id': item['_id']}, {'$set': {'i': item['i']}}, upsert=True), items))) |
bulk_write + UpdateOne + InsertOne | 1M | 10k | 00:00:38 | db['test'].bulk_write(list(map(InsertOne, items1)) + list(map(lambda item: UpdateOne({'_id': item['_id']}, {'$set': {'i': 0}}, upsert=True), items2))) |
The simulation code:
```python
import time

import pymongo
from pymongo import InsertOne, ReplaceOne, UpdateOne
from pymongo.errors import BulkWriteError

settings = {
    'MONGO_HOST': "***",  # database host
    'MONGO_PORT': ***,    # database port
    'MONGO_DB': "***",    # database name
    'MONGO_USER': "***",  # user name
    'MONGO_PSW': "***",   # password
}

client = pymongo.MongoClient(host=settings['MONGO_HOST'], port=settings['MONGO_PORT'])
client.admin.authenticate(settings['MONGO_USER'], settings['MONGO_PSW'], mechanism='SCRAM-SHA-1')
db = client[settings['MONGO_DB']]

# l1: new documents to insert; l2: existing documents to update.
l1 = [{'i': i} for i in range(500000, 1000001)]
l2 = list(db['test'].find({}))

start_time = time.time()
page = 0
count = 10000
while True:
    skip = page * count
    page += 1
    items1 = l1[skip:skip + count]
    items2 = l2[skip:skip + count]
    try:
        # Mix inserts and upsert-style updates in a single bulk_write call.
        db['test'].bulk_write(
            list(map(InsertOne, items1)) +
            list(map(lambda item: UpdateOne({'_id': item['_id']}, {'$set': {'i': 0}}, upsert=True), items2)))
    except BulkWriteError as bwe:
        print(bwe.details)
    else:
        print(page)
    if page == 50:
        break

elapsed = time.time() - start_time
print(time.strftime('%H:%M:%S', time.gmtime(elapsed)))
```
Note: the list passed to bulk_write(list) must not be empty, otherwise an error is raised.
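A simple guard avoids that error when the operation list may turn out empty (a sketch; `safe_bulk_write` is a hypothetical helper name):

```python
def safe_bulk_write(collection, ops):
    """Call bulk_write only when there is at least one operation;
    PyMongo raises an error when given an empty operation list."""
    if not ops:
        return None
    return collection.bulk_write(ops)
```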
The tests show a very large gap in write throughput between batched and per-document operations, while batched insert and Bulk Write perform roughly the same.
However, bulk_write() can combine inserts, updates, and deletes in a single call, which makes it more flexible.
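That flexibility is what the mixed 1M-op test above relies on: deciding per document whether it needs an insert or an upsert-style update. The decision logic can be isolated like this (a hypothetical sketch; mapping the pairs to PyMongo's InsertOne/UpdateOne is left to the caller):

```python
def plan_ops(docs, existing_ids):
    """Classify each document as ('insert', doc) or ('update', doc)
    depending on whether its _id is already present in the collection."""
    ops = []
    for doc in docs:
        kind = 'update' if doc.get('_id') in existing_ids else 'insert'
        ops.append((kind, doc))
    return ops
```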
Side note: I carelessly looped 100 million documents into a list, and the machine froze outright; PyCharm, SecureCRT, and Studio 3T all crashed, and even the Sogou input method went down!