mongodb
## 安裝
pip install pymongo
## 使用
from pymongo import MongoClient
conn = MongoClient('192.168.0.113', 27017)
db = conn.mydb #連接mydb數(shù)據(jù)庫瓦侮,沒有則自動創(chuàng)建
my_set = db.test_set #使用test_set集合教届,沒有則自動創(chuàng)建
### 增
db.COLLECTION_NAME.insert(document)
### 更新
my_set.update(
<query>, #查詢條件
<update>, #update的對象和一些更新的操作符
{
upsert: <boolean>, #如果不存在update的記錄品山,是否插入
multi: <boolean>, #可選,mongodb 默認是false,只更新找到的第一條記錄
writeConcern: <document> #可選络拌,拋出異常的級別研侣。
}
)
//刪除
my_set.remove(
<query>, #(可選)刪除的文檔的條件
{
justOne: <boolean>, #(可選)如果設為 true 或 1矢劲,則只刪除一個文檔
writeConcern: <document> #(可選)拋出異常的級別
}
)
//查
my_set.find({"name":"zhangsan"}).sort([("age",1)])
my_set.find({"age":{"$gt":25}}) //$lte
my_set.find({"name":"zhangsan"}).skip(2).limit(6)
my_set.find({"age":{"$in":(20,30,35)}})
my_set.find({"$or":[{"age":20},{"age":35}]})
my_set.find({'li':{'$all':[1,2,3,4]}})
//數(shù)組
my_set.update({'name':"lisi"}, {'$push':{'li':4}})
for i in my_set.find({'name':"lisi"}):
print(i)
#輸出:{'li': [1, 2, 3, 4], '_id': ObjectId('58c50d784fc9d44ad8f2e803'), 'age': 18, 'name': 'lisi'}
my_set.update({'name':"lisi"}, {'$pushAll':{'li':[4,5]}})
for i in my_set.find({'name':"lisi"}):
print(i)
#輸出:{'li': [1, 2, 3, 4, 4, 5], 'name': 'lisi', 'age': 18, '_id': ObjectId('58c50d784fc9d44ad8f2e803')}
#pop
#移除最后一個元素(-1為移除第一個)
my_set.update({'name':"lisi"}, {'$pop':{'li':1}})
for i in my_set.find({'name':"lisi"}):
print(i)
#輸出:{'_id': ObjectId('58c50d784fc9d44ad8f2e803'), 'age': 18, 'name': 'lisi', 'li': [1, 2, 3, 4, 4]}
#pull (按值移除)
#移除3
my_set.update({'name':"lisi"}, {'$pop':{'li':3}})
#pullAll (移除全部符合條件的)
my_set.update({'name':"lisi"}, {'$pullAll':{'li':[1,2,3]}})
for i in my_set.find({'name':"lisi"}):
print(i)
#輸出:{'name': 'lisi', '_id': ObjectId('58c50d784fc9d44ad8f2e803'), 'li': [4, 4], 'age': 18}
my_set.find_one({"contact.1.iphone":"222"})
//多級對象
my_set.find({"contact.iphone":"11223344"})
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from pymongo import MongoClient
settings = {
"ip":'192.168.0.113', #ip
"port":27017, #端口
"db_name" : "mydb", #數(shù)據(jù)庫名字
"set_name" : "test_set" #集合名字
}
class MyMongoDB(object):
def __init__(self):
try:
self.conn = MongoClient(settings["ip"], settings["port"])
except Exception as e:
print(e)
self.db = self.conn[settings["db_name"]]
self.my_set = self.db[settings["set_name"]]
def insert(self,dic):
print("inser...")
self.my_set.insert(dic)
def update(self,dic,newdic):
print("update...")
self.my_set.update(dic,newdic)
def delete(self,dic):
print("delete...")
self.my_set.remove(dic)
def dbfind(self,dic):
print("find...")
data = self.my_set.find(dic)
for result in data:
print(result["name"],result["age"])
def main():
dic={"name":"zhangsan","age":18}
mongo = MyMongoDB()
mongo.insert(dic)
mongo.dbfind({"name":"zhangsan"})
mongo.update({"name":"zhangsan"},{"$set":{"age":"25"}})
mongo.dbfind({"name":"zhangsan"})
mongo.delete({"name":"zhangsan"})
mongo.dbfind({"name":"zhangsan"})
if __name__ == "__main__":
main()
mysql
pip3 install PyMySQL
import pymysql
db = pymysql.connect("localhost","testuser","test123","TESTDB" )
cursor = db.cursor()
### 創(chuàng)建表
cursor.execute("DROP TABLE IF EXISTS EMPLOYEE")
sql = """CREATE TABLE EMPLOYEE (
FIRST_NAME CHAR(20) NOT NULL,
LAST_NAME CHAR(20),
AGE INT,
SEX CHAR(1),
INCOME FLOAT )"""
cursor.execute(sql)
db.close()
### 插入數(shù)據(jù)
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
LAST_NAME, AGE, SEX, INCOME)
VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
try:
# 執(zhí)行sql語句
cursor.execute(sql)
# 提交到數(shù)據(jù)庫執(zhí)行
db.commit()
except:
# 如果發(fā)生錯誤則回滾
db.rollback()
# 關閉數(shù)據(jù)庫連接
db.close()
### 查詢,cursor:
fetchone(): 該方法獲取下一個查詢結果集蒲拉。結果集是一個對象
fetchall(): 接收全部的返回結果行.
rowcount: 這是一個只讀屬性,并返回執(zhí)行execute()方法后影響的行數(shù)忽冻。
redis
pip3 install redis
import redis
pool= redis.ConnectionPool(host='localhost',port=6379,decode_responses=True)
r=redis.Redis(connection_pool=pool)
r.set('apple','a')
print(r.get('apple'))
--
redis中set() ==>r.set()
redis中setnx() ==>r.set()
redis中setex() ==>r.setex()
redis中setbit() ==>r.setbit()
redis中mset() == > r.mset()
redis中hset() ==>r.hset()
redis中sadd() == >r.sadd()
r.hset('info','name','lilei')
r.hset('info','age','18')
print(r.hgetall('info'))
r.sadd('course','math','english','chinese')
-- 管道代替事務
pipe=r.pipeline()
print(r.get('a'))
try:
# pipe.watch('a')
pipe.multi()
pipe.set('here', 'there')
pipe.set('here1', 'there1')
pipe.set('here2', 'there2')
time.sleep(5)
pipe.execute()
except redis.exceptions.WatchError as e:
print("Error")