Python操作MySQL
一. python操作數(shù)據(jù)庫介紹
Python 標(biāo)準(zhǔn)數(shù)據(jù)庫接口為 Python DB-API夸楣,Python DB-API為開發(fā)人員提供了數(shù)據(jù)庫應(yīng)用編程接口宾抓。
Python 數(shù)據(jù)庫接口支持非常多的數(shù)據(jù)庫,你可以選擇適合你項(xiàng)目的數(shù)據(jù)庫:
- GadFly
- mSQL
- MySQL
- PostgreSQL
- Microsoft SQL Server 2000
- Informix
- Interbase
- Oracle
- Sybase
- ...
你可以訪問Python數(shù)據(jù)庫接口及API查看詳細(xì)的支持?jǐn)?shù)據(jù)庫列表豫喧。
不同的數(shù)據(jù)庫你需要下載不同的DB API模塊石洗,例如你需要訪問Oracle數(shù)據(jù)庫和Mysql數(shù)據(jù),你需要下載Oracle和MySQL數(shù)據(jù)庫模塊紧显。
DB-API 是一個(gè)規(guī)范. 它定義了一系列必須的對象和數(shù)據(jù)庫存取方式, 以便為各種各樣的底層數(shù)據(jù)庫系統(tǒng)和多種多樣的數(shù)據(jù)庫接口程序提供一致的訪問接口 讲衫。
Python的DB-API,為大多數(shù)的數(shù)據(jù)庫實(shí)現(xiàn)了接口孵班,使用它連接各數(shù)據(jù)庫后涉兽,就可以用相同的方式操作各數(shù)據(jù)庫。
Python DB-API使用流程:
引入 API 模塊篙程。
獲取與數(shù)據(jù)庫的連接枷畏。
執(zhí)行SQL語句和存儲過程。
關(guān)閉數(shù)據(jù)庫連接虱饿。
二. python操作MySQL模塊
Python操作MySQL主要使用兩種方式:
- DB模塊(原生SQL)
- PyMySQL(支持python2.x/3.x)
- MySQLdb(目前僅支持python2.x)
- ORM框架
- SQLAchemy
2.1 PyMySQL模塊
本文主要介紹PyMySQL模塊,MySQLdb使用方式類似
2.1.1 安裝PyMySQL
PyMySQL是一個(gè)Python編寫的MySQL驅(qū)動程序拥诡,讓我們可以用Python語言操作MySQL數(shù)據(jù)庫。
pip install PyMySQL
2.2 基本使用
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "shuke"
# Date: 2018/5/13
import pymysql
# 創(chuàng)建連接
conn = pymysql.connect(host="127.0.0.1", port=3306, user='zff', passwd='zff123', db='zff', charset='utf8mb4')
# 創(chuàng)建游標(biāo)(查詢數(shù)據(jù)返回為元組格式)
# cursor = conn.cursor()
# 創(chuàng)建游標(biāo)(查詢數(shù)據(jù)返回為字典格式)
cursor = conn.cursor(pymysql.cursors.DictCursor)
# 1. 執(zhí)行SQL,返回受影響的行數(shù)
effect_row1 = cursor.execute("select * from USER")
# 2. 執(zhí)行SQL,返回受影響的行數(shù),一次插入多行數(shù)據(jù)
effect_row2 = cursor.executemany("insert into USER (NAME) values(%s)", [("jack"), ("boom"), ("lucy")]) # 3
# 查詢所有數(shù)據(jù),返回?cái)?shù)據(jù)為元組格式
result = cursor.fetchall()
# 增/刪/改均需要進(jìn)行commit提交,進(jìn)行保存
conn.commit()
# 關(guān)閉游標(biāo)
cursor.close()
# 關(guān)閉連接
conn.close()
print(result)
"""
[{'id': 6, 'name': 'boom'}, {'id': 5, 'name': 'jack'}, {'id': 7, 'name': 'lucy'}, {'id': 4, 'name': 'tome'}, {'id': 3, 'name': 'zff'}, {'id': 1, 'name': 'zhaofengfeng'}, {'id': 2, 'name': 'zhaofengfeng02'}]
"""
2.3 獲取最新創(chuàng)建的數(shù)據(jù)自增ID
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "shuke"
# Date: 2018/5/13
import pymysql
# 創(chuàng)建連接
conn = pymysql.connect(host="127.0.0.1", port=3306, user='zff', passwd='zff123', db='zff', charset='utf8mb4')
# 創(chuàng)建游標(biāo)(查詢數(shù)據(jù)返回為元組格式)
cursor = conn.cursor()
# 獲取新創(chuàng)建數(shù)據(jù)自增ID
effect_row = cursor.executemany("insert into USER (NAME)values(%s)", [("eric")])
# 增刪改均需要進(jìn)行commit提交
conn.commit()
# 關(guān)閉游標(biāo)
cursor.close()
# 關(guān)閉連接
conn.close()
new_id = cursor.lastrowid
print(new_id)
"""
8
"""
2.4 查詢操作
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "shuke"
# Date: 2018/5/13
import pymysql
# 創(chuàng)建連接
conn = pymysql.connect(host="127.0.0.1", port=3306, user='zff', passwd='zff123', db='zff', charset='utf8mb4')
# 創(chuàng)建游標(biāo)
cursor = conn.cursor()
cursor.execute("select * from USER")
# 獲取第一行數(shù)據(jù)
row_1 = cursor.fetchone()
# 獲取前n行數(shù)據(jù)
row_2 = cursor.fetchmany(3)
#
# # 獲取所有數(shù)據(jù)
row_3 = cursor.fetchall()
# 關(guān)閉游標(biāo)
cursor.close()
# 關(guān)閉連接
conn.close()
print(row_1)
print(row_2)
print(row_3)
?? 在fetch數(shù)據(jù)時(shí)按照順序進(jìn)行氮发,可以使用cursor.scroll(num,mode)來移動游標(biāo)位置渴肉,如:
- cursor.scroll(1,mode='relative') # 相對當(dāng)前位置移動
- cursor.scroll(2,mode='absolute') # 相對絕對位置移動
2.5 防止SQL注入
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "shuke"
# Date: 2018/5/13
import pymysql
# 創(chuàng)建連接
conn = pymysql.connect(host="127.0.0.1", port=3306, user='zff', passwd='zff123', db='zff', charset='utf8mb4')
# 創(chuàng)建游標(biāo)
cursor = conn.cursor()
# 存在sql注入情況(不要用格式化字符串的方式拼接SQL)
sql = "insert into USER (NAME) values('%s')" % ('zhangsan',)
effect_row = cursor.execute(sql)
# 正確方式一
# execute函數(shù)接受一個(gè)元組/列表作為SQL參數(shù),元素個(gè)數(shù)只能有1個(gè)
sql = "insert into USER (NAME) values(%s)"
effect_row1 = cursor.execute(sql, ['wang6'])
effect_row2 = cursor.execute(sql, ('wang7',))
# 正確方式二
sql = "insert into USER (NAME) values(%(name)s)"
effect_row1 = cursor.execute(sql, {'name': 'wudalang'})
# 寫入插入多行數(shù)據(jù)
effect_row2 = cursor.executemany("insert into USER (NAME) values(%s)", [('ermazi'), ('dianxiaoer')])
# 提交
conn.commit()
# 關(guān)閉游標(biāo)
cursor.close()
# 關(guān)閉連接
conn.close()
這樣,SQL操作就更安全了爽冕。如果需要更詳細(xì)的文檔參考PyMySQL文檔吧仇祭。不過好像這些SQL數(shù)據(jù)庫的實(shí)現(xiàn)還不太一樣,PyMySQL的參數(shù)占位符使用%s這樣的C格式化符颈畸,而Python自帶的sqlite3模塊的占位符好像是問號(?)前塔。因此在使用其他數(shù)據(jù)庫的時(shí)候還是仔細(xì)閱讀文檔吧。
Welcome to PyMySQL’s documentation
三. 數(shù)據(jù)庫連接池
上文中的方式存在一個(gè)問題,單線程情況下可以滿足,程序需要頻繁的創(chuàng)建釋放連接來完成對數(shù)據(jù)庫的操作,那么,我們的程序/腳本在多線程情況下會引發(fā)什么問題呢?此時(shí),我們就需要使用數(shù)據(jù)庫連接池來解決這個(gè)問題!
3.1 DBUtils模塊
DBUtils是Python的一個(gè)用于實(shí)現(xiàn)數(shù)據(jù)庫連接池的模塊承冰。
此連接池有兩種連接模式:
- 為每個(gè)線程創(chuàng)建一個(gè)連接扣墩,線程即使調(diào)用了close方法劲厌,也不會關(guān)閉笋除,只是把連接重新放到連接池袜爪,供自己線程再次使用。當(dāng)線程終止時(shí),連接才會自動關(guān)閉
- 創(chuàng)建一批連接到連接池迁霎,供所有線程共享使用(推薦使用)
3.2 模式一
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "shuke"
# Date: 2018/5/13
from DBUtils.PersistentDB import PersistentDB
import pymysql
POOL = PersistentDB(
creator=pymysql, # 使用鏈接數(shù)據(jù)庫的模塊
maxusage=None, # 一個(gè)鏈接最多被重復(fù)使用的次數(shù)吱抚,None表示無限制
setsession=[], # 開始會話前執(zhí)行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服務(wù)端考廉,檢查是否服務(wù)可用秘豹。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
closeable=False,
# 如果為False時(shí), conn.close() 實(shí)際上被忽略昌粤,供下次使用既绕,再線程關(guān)閉時(shí),才會自動關(guān)閉鏈接涮坐。如果為True時(shí)凄贩, conn.close()則關(guān)閉鏈接,那么再次調(diào)用pool.connection時(shí)就會報(bào)錯袱讹,因?yàn)橐呀?jīng)真的關(guān)閉了連接(pool.steady_connection()可以獲取一個(gè)新的鏈接)
threadlocal=None, # 本線程獨(dú)享值得對象疲扎,用于保存鏈接對象,如果鏈接對象被重置
host='127.0.0.1',
port=3306,
user='zff',
password='zff123',
database='zff',
charset='utf8',
)
def func():
conn = POOL.connection(shareable=False)
cursor = conn.cursor()
cursor.execute('select * from USER')
result = cursor.fetchall()
cursor.close()
conn.close()
return result
result = func()
print(result)
3.2 模式二
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "shuke"
# Date: 2018/5/13
import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
creator=pymysql, # 使用鏈接數(shù)據(jù)庫的模塊
maxconnections=6, # 連接池允許的最大連接數(shù)捷雕,0和None表示不限制連接數(shù)
mincached=2, # 初始化時(shí)椒丧,鏈接池中至少創(chuàng)建的空閑的鏈接,0表示不創(chuàng)建
maxcached=5, # 鏈接池中最多閑置的鏈接救巷,0和None不限制
maxshared=3,
# 鏈接池中最多共享的鏈接數(shù)量瓜挽,0和None表示全部共享。PS: 無用征绸,因?yàn)閜ymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設(shè)置為多少俄占,_maxcached永遠(yuǎn)為0管怠,所以永遠(yuǎn)是所有鏈接都共享。
blocking=True, # 連接池中如果沒有可用連接后缸榄,是否阻塞等待渤弛。True,等待甚带;False她肯,不等待然后報(bào)錯
maxusage=None, # 一個(gè)鏈接最多被重復(fù)使用的次數(shù),None表示無限制
setsession=[], # 開始會話前執(zhí)行的命令列表鹰贵。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服務(wù)端晴氨,檢查是否服務(wù)可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='zff',
password='zff123',
database='zff',
charset='utf8'
)
def func():
# 檢測當(dāng)前正在運(yùn)行連接數(shù)的是否小于最大鏈接數(shù)碉输,如果不小于則:等待或報(bào)raise TooManyConnections異常
# 否則
# 則優(yōu)先去初始化時(shí)創(chuàng)建的鏈接中獲取鏈接 SteadyDBConnection籽前。
# 然后將SteadyDBConnection對象封裝到PooledDedicatedDBConnection中并返回。
# 如果最開始創(chuàng)建的鏈接沒有鏈接,則去創(chuàng)建一個(gè)SteadyDBConnection對象枝哄,再封裝到PooledDedicatedDBConnection中并返回肄梨。
# 一旦關(guān)閉鏈接后,連接就返回到連接池讓后續(xù)線程繼續(xù)使用挠锥。
conn = POOL.connection()
# print('連接被拿走了', conn._con)
# print('池子里目前有', POOL._idle_cache, '\r\n')
cursor = conn.cursor()
cursor.execute('select * from USER')
result = cursor.fetchall()
conn.close()
return result
result = func()
print(result)
?? 由于pymysql众羡、MySQLdb等threadsafety值為1,所以該模式連接池中的線程會被所有線程共享,因此是線程安全的蓖租。
如果沒有連接池粱侣,使用pymysql來連接數(shù)據(jù)庫時(shí),單線程應(yīng)用完全沒有問題菜秦,但如果涉及到多線程應(yīng)用那么就需要加鎖甜害,一旦加鎖那么連接勢必就會排隊(duì)等待,當(dāng)請求比較多時(shí)球昨,性能就會降低了尔店。
3.3 加鎖
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "shuke"
# Date: 2018/5/13
import pymysql
import threading
from threading import RLock
LOCK = RLock()
CONN = pymysql.connect(host='127.0.0.1',
port=3306,
user='zff',
password='zff123',
database='zff',
charset='utf8')
def task(arg):
with LOCK:
cursor = CONN.cursor()
cursor.execute('select * from USER ')
result = cursor.fetchall()
cursor.close()
print(result)
for i in range(10):
t = threading.Thread(target=task, args=(i,))
t.start()
3.4 無鎖(報(bào)錯)
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "shuke"
# Date: 2018/5/13
import pymysql
import threading
CONN = pymysql.connect(host='127.0.0.1',
port=3306,
user='zff',
password='zff123',
database='zff',
charset='utf8')
def task(arg):
cursor = CONN.cursor()
cursor.execute('select * from USER ')
# cursor.execute('select sleep(10)')
result = cursor.fetchall()
cursor.close()
print(result)
for i in range(10):
t = threading.Thread(target=task, args=(i,))
t.start()
此時(shí)可以在數(shù)據(jù)庫中查看連接情況: show status like 'Threads%';
四. 數(shù)據(jù)庫連接池結(jié)合pymsql使用
# cat sql_helper.py
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
creator=pymysql, # 使用鏈接數(shù)據(jù)庫的模塊
maxconnections=20, # 連接池允許的最大連接數(shù),0和None表示不限制連接數(shù)
mincached=2, # 初始化時(shí)主慰,鏈接池中至少創(chuàng)建的空閑的鏈接嚣州,0表示不創(chuàng)建
maxcached=5, # 鏈接池中最多閑置的鏈接,0和None不限制
#maxshared=3, # 鏈接池中最多共享的鏈接數(shù)量共螺,0和None表示全部共享该肴。PS: 無用,因?yàn)閜ymysql和MySQLdb等模塊的 threadsafety都為1藐不,所有值無論設(shè)置為多少匀哄,_maxcached永遠(yuǎn)為0,所以永遠(yuǎn)是所有鏈接都共享雏蛮。
blocking=True, # 連接池中如果沒有可用連接后涎嚼,是否阻塞等待。True挑秉,等待法梯;False,不等待然后報(bào)錯
maxusage=None, # 一個(gè)鏈接最多被重復(fù)使用的次數(shù)犀概,None表示無限制
setsession=[], # 開始會話前執(zhí)行的命令列表立哑。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服務(wù)端,檢查是否服務(wù)可用姻灶。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='192.168.11.38',
port=3306,
user='root',
passwd='apNXgF6RDitFtDQx',
db='m2day03db',
charset='utf8'
)
def connect():
# 創(chuàng)建連接
# conn = pymysql.connect(host='192.168.11.38', port=3306, user='root', passwd='apNXgF6RDitFtDQx', db='m2day03db')
conn = POOL.connection()
# 創(chuàng)建游標(biāo)
cursor = conn.cursor(pymysql.cursors.DictCursor)
return conn,cursor
def close(conn,cursor):
# 關(guān)閉游標(biāo)
cursor.close()
# 關(guān)閉連接
conn.close()
def fetch_one(sql,args):
conn,cursor = connect()
# 執(zhí)行SQL铛绰,并返回收影響行數(shù)
effect_row = cursor.execute(sql,args)
result = cursor.fetchone()
close(conn,cursor)
return result
def fetch_all(sql,args):
conn, cursor = connect()
# 執(zhí)行SQL,并返回收影響行數(shù)
cursor.execute(sql,args)
result = cursor.fetchall()
close(conn, cursor)
return result
def insert(sql,args):
"""
創(chuàng)建數(shù)據(jù)
:param sql: 含有占位符的SQL
:return:
"""
conn, cursor = connect()
# 執(zhí)行SQL产喉,并返回收影響行數(shù)
effect_row = cursor.execute(sql,args)
conn.commit()
close(conn, cursor)
def delete(sql,args):
"""
創(chuàng)建數(shù)據(jù)
:param sql: 含有占位符的SQL
:return:
"""
conn, cursor = connect()
# 執(zhí)行SQL至耻,并返回收影響行數(shù)
effect_row = cursor.execute(sql,args)
conn.commit()
close(conn, cursor)
return effect_row
def update(sql,args):
conn, cursor = connect()
# 執(zhí)行SQL若皱,并返回收影響行數(shù)
effect_row = cursor.execute(sql, args)
conn.commit()
close(conn, cursor)
return effect_row
PS: 可以利用靜態(tài)方法封裝到一個(gè)類中,方便使用