使用用戶名灶挟、密碼
安裝PyMongo:pip install pymongo
import pymongo #導(dǎo)入pymongo
class client_Mongo(object):
def __init__(self, host, port, db, p, u, w):
self.host = host
self.port = port
self.username = u
self.password = w
self.client = pymongo.MongoClient(host=self.host,
port=self.port,
connect=False, # 防止多線程造成卡頓
)
self.client.gdata.authenticate(self.username, self.password, mechanism='SCRAM-SHA-1')
self.db = self.client[db] # 鏈接myTest數(shù)據(jù)庫
self.p = self.db[p] # persons集合
def insert(self, person):
# 插入數(shù)據(jù)
try:
self.p.insert(person) # 可以是列表,保存多個(gè)
print('存儲成功', person['_id'])
except Exception as e:
if 'E11000' in str(e):
print('數(shù)據(jù)重復(fù)', person['_id'])
else:
print(e)
def find(self, p):
# 查詢數(shù)據(jù)
try:
result = self.p.find_one(p)
return result
except Exception as e:
print(e)
def updata(self, p, up):
# 更新
if '$set' not in up: # 如需重置集合表,注釋此項(xiàng)
raise Exception('未添加[set]函數(shù)') # 如需重置集合表,注釋此項(xiàng)
try:
result = self.p.update(p, up)
return result
except Exception as e:
print(e)
def find_stored(self, lang):
# 排序查詢
try:
result = self.p.find().sort([('排序字段', -1)]).limit(1).skip(0)
return result
except Exception as e:
print(e)
def tj_find_all(self, params):
# 參數(shù)查詢
try:
result = self.p.find(params, no_cursor_timeout=True) # no_cursor_timeout 防止長時(shí)間鏈接卡頓
return result
except Exception as e:
print(e)
def del_data(self, person):
# 刪除
try:
result = self.p.delete_many(person)
print(result)
except Exception as e:
print(e)
if __name__ == '__main__':
mongo = client_Mongo(host='localhost', # IP
port=27017, # 端口
db='xxx', # 數(shù)據(jù)庫
p='xxx', # 表
u='xxx', # 用戶名
w='xxx', # 密碼
)
如需轉(zhuǎn)載請注明出處