最近在做的項目中用到了pymysql連接池,並且是在多線程場景下使用。使用過程中實際跑多線程時遇到了一些錯誤,所以借此機會簡單分析了一下。
先看一下下面的代碼在多線程場景下跑是否會有問題?
dao/dbcon.py
# -*-coding:utf-8-*-
import pymysql
from DBUtils.PooledDB import PooledDB
from conf.config import ConfParser
from logger.logger import *
# Module-level configuration parser; DB reads its connection settings from the "DB" section.
conf = ConfParser()


class DB(object):
    """Helper around one class-wide PooledDB pool with query/execute/update methods.

    NOTE(review): every helper stores the checked-out connection and cursor on
    ``self.conn`` / ``self.cursor``. When a single DB instance is used from
    several threads, one thread can overwrite these attributes while another
    thread is still using them.
    """

    # Pool shared by every DB instance; created on first use by __get_conn().
    __pool = None

    def __init__(self):
        # Every instance references the same class-level pool.
        self.pool = DB.__get_conn()
        # Connection and cursor of the most recent connect() call.
        self.conn = None
        self.cursor = None

    @staticmethod
    def __get_conn():
        """Return the class-level pool, creating it on the first call.

        If creating the pool raises, the error is logged and the current value
        of ``DB.__pool`` is returned (``None`` when no pool exists yet).
        """
        if DB.__pool is None:
            try:
                DB.__pool = PooledDB(creator=pymysql, host=conf.get("DB", "host"), port=int(conf.get("DB", "port")), user=conf.get("DB", "user"), passwd=conf.get("DB", "passwd"), db=conf.get("DB", "db"), charset=conf.get("DB", "charset"))
            except Exception as e:
                logging.error("%s : %s" % (Exception, e))
        return DB.__pool

    def connect(self, cursor=pymysql.cursors.DictCursor):
        """Take a connection from the pool, open a cursor on it and return the cursor.

        Both objects are stored on the instance as ``self.conn`` / ``self.cursor``.
        """
        self.conn = self.pool.connection()
        self.cursor = self.conn.cursor(cursor=cursor)
        return self.cursor

    def close(self):
        """Close whatever cursor and connection are currently stored on the instance."""
        self.cursor.close()
        self.conn.close()

    def query_sql(self, sql, params=None):
        """Execute ``sql`` with ``params`` and return ``cursor.fetchall()``.

        There is no try/except here: if execute() or fetchall() raises,
        close() is not called.
        """
        self.connect()
        self.cursor.execute(sql, params)
        result = self.cursor.fetchall()
        self.close()
        return result

    def execute_sql(self, sql, params=None):
        """Execute ``sql``, commit, and return ``cursor.lastrowid``.

        On any exception the transaction is rolled back, the connection is
        closed, the error is logged and ``Exception("database commit error")``
        is raised.
        """
        self.connect()
        try:
            self.cursor.execute(sql, params)
            result = self.cursor.lastrowid
            self.conn.commit()
            self.close()
        except Exception as e:
            self.conn.rollback()
            self.close()
            logging.error(str(e))
            raise Exception("database commit error")
        return result

    def update_sql(self, sql, params=None):
        """Execute ``sql``, commit, and return the value returned by ``cursor.execute``.

        The return value is presumably the affected-row count for pymysql -
        confirm against the driver documentation. Error handling matches
        execute_sql: rollback, close, log, raise
        ``Exception("database commit error")``.
        """
        self.connect()
        try:
            result = self.cursor.execute(sql, params)
            self.conn.commit()
            self.close()
        except Exception as e:
            self.conn.rollback()
            self.close()
            logging.error(str(e))
            raise Exception("database commit error")
        return result
dao/querydb.py
# -*-coding:utf-8-*-
from dao.dbcon import DB
from conf.config import ConfParser
# Single DB helper shared by every QueryDB method in this module.
db = DB()
conf = ConfParser()
# Local server IP, read from the "EMU" section of the configuration.
local_ip = conf.get("EMU", "ServerLocalIP")


class QueryDB(object):
    """Read-only lookups against the tb_node / tb_vm_iso / tb_case / tb_vm_template tables.

    All methods run their SQL through the module-level ``db`` instance.
    """

    def __init__(self):
        self.local_ip = local_ip

    def getBaseNodeDiskImgFile(self, node_id):
        """Return the ``diskimg_file`` column of the first tb_node row for ``node_id``.

        Raises IndexError when the query returns no rows.
        """
        sql = "SELECT diskimg_file From `tb_node` WHERE node_id = %s;"
        res = db.query_sql(sql, (node_id,))
        return res[0]['diskimg_file']

    def GetNodeBName(self, nodeid):
        """Return every row (``node_b_name`` column only) of tb_node matching ``nodeid``."""
        # The table name was quoted with doubled backticks (``tb_node``), which
        # is not a valid quoted identifier; use single backticks like the
        # other queries in this class.
        sql = "SELECT node_b_name from `tb_node` where node_id=%s"
        res = db.query_sql(sql, (nodeid,))
        return res

    def get_iso_path(self, iso_id):
        """Return the ``file_path`` column of the first tb_vm_iso row with id ``iso_id``.

        Raises IndexError when the query returns no rows.
        """
        sql = "SELECT * FROM `tb_vm_iso` WHERE id = %s;"
        result = db.query_sql(sql, (iso_id,))[0]
        return result["file_path"]

    def get_case(self, case_id):
        """Return all tb_case rows whose case_id equals ``int(case_id)``."""
        sql = "SELECT * FROM `tb_case` WHERE case_id = %s;"
        result = db.query_sql(sql, (int(case_id),))
        return result

    def get_template(self, template_id):
        """Return all tb_vm_template rows whose id equals ``template_id``."""
        sql = "SELECT * FROM `tb_vm_template` WHERE id = %s;"
        result = db.query_sql(sql, (template_id,))
        return result
storage/storage.py
# -*- coding: utf-8 -*-
from dao.querydb import QueryDB
# Module-wide QueryDB instance; only CentralizedStorage.test1 uses it.
db = QueryDB()


class CentralizedStorage(object):
    """Small driver that performs individual QueryDB lookups and prints each result."""

    def __init__(self):
        """Nothing to initialise; the class keeps no state."""

    def test1(self, thread_name):
        """Print the disk image file of node 121212, queried through the module-level ``db``."""
        disk_image = db.getBaseNodeDiskImgFile(121212)
        print(thread_name, "getBaseNodeDiskImgFile", disk_image)

    def test2(self, thread_name):
        """Print the node_b_name rows of node 121212 using a fresh QueryDB."""
        rows = QueryDB().GetNodeBName(121212)
        print(thread_name, "GetNodeBName", rows)

    def test3(self, thread_name):
        """Print the file path of ISO 114 using a fresh QueryDB."""
        iso_path = QueryDB().get_iso_path(114)
        print(thread_name, "get_iso_path", iso_path)

    def test4(self, thread_name):
        """Print the rows of case 20160802145038043410 using a fresh QueryDB."""
        case_rows = QueryDB().get_case(20160802145038043410)
        print(thread_name, "get_case", case_rows)
main.py
# -*- coding: utf-8 -*-
import threading
from storage.storage import CentralizedStorage
from dao.querydb import QueryDB
def db_test(thread_name):
    """Run the four CentralizedStorage checks, then a template lookup.

    ``thread_name`` is passed through to every check and used as the label
    of each printed line.
    """
    checks = CentralizedStorage()
    for run_check in (checks.test1, checks.test2, checks.test3, checks.test4):
        run_check(thread_name)

    template_rows = QueryDB().get_template(1615)
    print(thread_name, "get_template:", template_rows)
if __name__ == "__main__":
    # Start ten worker threads that all run db_test concurrently.
    for i in range(10):
        thread = threading.Thread(target=db_test, args=(("Thread" + str(i) + ":"),))
        # Keep the workers non-daemon. Thread.setDaemon() is deprecated since
        # Python 3.10, so set the ``daemon`` attribute directly instead.
        thread.daemon = False
        thread.start()
上面代碼單線程執行時一點問題也沒有,但是多線程執行就會報錯(原文此處的報錯信息未能保留)。
我們可以分析一下上面的代碼,注意看dao/querydb.py這個文件第5行db = DB(),這個文件中所有的查詢語句都是共用這一個DB實例的。再看dao/dbcon.py這個文件,DB這個類創建了一個連接池,供所有sql執行來用,還封裝了幾個底層真正執行sql語句的函數,重點在於這幾個sql執行函數。
我們來看一下,它們的基本流程都是從連接池取出一個連接賦給該實例成員變量self.conn,然後獲得游標賦給該實例成員變量self.cursor,然後通過游標執行sql語句,最後close掉。問題就在於這裡:前面我們看到過,DB只進行了一次實例化,出來的實例大家共用。如果在多線程場景下,線程1獲得了conn和cursor並賦值給DB實例的成員變量後還沒完成後續操作,線程2就進入了,重新獲取了conn和cursor並重新給DB實例的成員變量賦值,因為共用一個DB實例,這樣一定會覆蓋掉線程1賦的值,線程1再繼續執行就可能會出錯。
分析出了出錯原因,那麼如何修改呢?通過分析得出,本質上就是讓conn和cursor不共用,那就可以對應兩種思路:一是DB實例不共用,二是DB實例共用但conn和cursor不共用。
DB實例不共用
這種方式其他文件不用動,就是在dao/querydb.py文件中每一個函數下都進行一次DB的實例化,公共的DB實例可以刪掉,修改後的代碼如下:
# -*-coding:utf-8-*-
from dao.dbcon_bac import DB
from conf.config import ConfParser
conf = ConfParser()
# Local server IP, read from the "EMU" section of the configuration.
local_ip = conf.get("EMU", "ServerLocalIP")


class QueryDB(object):
    """Read-only lookups against the tb_node / tb_vm_iso / tb_case / tb_vm_template tables.

    Each method creates its own DB() helper instead of using a module-level
    instance, so no DB object is shared between calls.
    """

    def __init__(self):
        self.local_ip = local_ip

    def getBaseNodeDiskImgFile(self, node_id):
        """Return the ``diskimg_file`` column of the first tb_node row for ``node_id``.

        Raises IndexError when the query returns no rows.
        """
        db = DB()
        sql = "SELECT diskimg_file From `tb_node` WHERE node_id = %s;"
        res = db.query_sql(sql, (node_id,))
        return res[0]['diskimg_file']

    def GetNodeBName(self, nodeid):
        """Return every row (``node_b_name`` column only) of tb_node matching ``nodeid``."""
        db = DB()
        # The quoted table name used to contain a trailing space (`tb_node `),
        # which does not name the tb_node table; quote it exactly like the
        # other queries in this class.
        sql = "SELECT node_b_name from `tb_node` where node_id=%s"
        res = db.query_sql(sql, (nodeid,))
        return res

    def get_iso_path(self, iso_id):
        """Return the ``file_path`` column of the first tb_vm_iso row with id ``iso_id``.

        Raises IndexError when the query returns no rows.
        """
        db = DB()
        sql = "SELECT * FROM `tb_vm_iso` WHERE id = %s;"
        result = db.query_sql(sql, (iso_id,))[0]
        return result["file_path"]

    def get_case(self, case_id):
        """Return all tb_case rows whose case_id equals ``int(case_id)``."""
        db = DB()
        sql = "SELECT * FROM `tb_case` WHERE case_id = %s;"
        result = db.query_sql(sql, (int(case_id),))
        return result

    def get_template(self, template_id):
        """Return all tb_vm_template rows whose id equals ``template_id``."""
        db = DB()
        sql = "SELECT * FROM `tb_vm_template` WHERE id = %s;"
        result = db.query_sql(sql, (template_id,))
        return result
因為連接池是我們需要共用的,也就是只能有一個連接池,那麼這種方式下連接池會不會也變得不共用,每次實例化都創建一個新的連接池呢?
我們注意看dbcon.py裡的實現,__pool是一個類的成員變量而非實例成員變量,在第一次實例化時就會生成一個連接池賦值給這個類成員變量,以後的實例化會先判斷這個類成員變量DB.__pool是否為空,如果不為空就不再重新生成連接池,這樣就保證了每一個實例實際上都是共用了類的連接池,而且不會重複生成。如果不先進行判空,則會有問題;如果判了空,就是ok的。需要注意的是,判空和賦值本身不是原子操作:如果第一次實例化恰好同時發生在多個線程裡,仍有可能重複創建連接池,嚴格來說需要加鎖,或者在啟動線程之前先實例化一次DB。
DB實例共用但conn和cursor不共用
那這一種方式呢,其他文件不動,修改dbcon.py中的函數,每一個sql實際執行函數裡都重新從連接池裡取得一個conn並獲取cursor,這兩個變量不再賦值給實例成員變量,而是作為局部變量使用,這樣不同線程裡調用sql執行函數都會用自己的conn而不會共用,這樣就跟DB被實例化幾次沒什麼關係了,代碼如下:
# -*-coding:utf-8-*-
import pymysql
from DBUtils.PooledDB import PooledDB
from conf.config import ConfParser
from logger.logger import *
# Module-level configuration parser; DB reads its connection settings from the "DB" section.
conf = ConfParser()
class DB(object):
    """Helper around one class-wide PooledDB pool with query/execute/update methods.

    Each public method checks out its own connection and cursor as local
    variables and always closes them before returning or raising, so a DB
    instance keeps no per-call state.
    """

    # Pool shared by every DB instance; created on first use by
    # __get_conn_pool(). NOTE: the "is None" check below is not guarded by a
    # lock.
    __pool = None

    def __init__(self):
        self.pool = DB.__get_conn_pool()

    @staticmethod
    def __get_conn_pool():
        """Return the class-level pool, creating it on the first call.

        If creating the pool raises, the error is logged and the current value
        of ``DB.__pool`` is returned (``None`` when no pool exists yet).
        """
        if DB.__pool is None:
            try:
                DB.__pool = PooledDB(creator=pymysql, host=conf.get("DB", "host"), port=int(conf.get("DB", "port")),
                                     user=conf.get("DB", "user"), passwd=conf.get("DB", "passwd"),
                                     db=conf.get("DB", "db"), charset=conf.get("DB", "charset"))
            except Exception as e:
                logging.error("%s : %s" % (Exception, e))
        return DB.__pool

    def _get_connection(self):
        """Check out a connection from the pool and open a DictCursor on it.

        Returns a ``(conn, cursor)`` tuple. If the cursor cannot be created the
        connection is closed again before the error propagates.
        """
        conn = self.pool.connection()
        try:
            cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        except Exception:
            conn.close()
            raise
        return conn, cursor

    def _close_connection(self, conn, cursor):
        """Close ``cursor`` and then ``conn``; either may be falsy and is then skipped."""
        try:
            if cursor:
                cursor.close()
        finally:
            # Close the connection even when closing the cursor failed.
            if conn:
                conn.close()

    @staticmethod
    def _rollback_quietly(conn):
        """Roll back ``conn``; a failing rollback is logged instead of raised."""
        try:
            conn.rollback()
        except Exception as rollback_error:
            # Do not let a failed rollback hide the original error or skip cleanup.
            logging.error("rollback failed: %s" % rollback_error)

    def query_sql(self, sql, params=None):
        """Execute ``sql`` with ``params`` and return ``cursor.fetchall()``.

        Raises ``Exception("database execute error")`` (after logging the
        original error) when execute() or fetchall() fails. The connection is
        closed in every case.
        """
        conn, cursor = self._get_connection()
        try:
            cursor.execute(sql, params)
            result = cursor.fetchall()
        except Exception as e:
            logging.error(str(e))
            raise Exception("database execute error")
        finally:
            self._close_connection(conn, cursor)
        return result

    def execute_sql(self, sql, params=None):
        """Execute ``sql``, commit, and return ``cursor.lastrowid``.

        On failure the transaction is rolled back, the error is logged and
        ``Exception("database commit error")`` is raised. The connection is
        closed in every case.
        """
        conn, cursor = self._get_connection()
        try:
            cursor.execute(sql, params)
            result = cursor.lastrowid
            conn.commit()
        except Exception as e:
            self._rollback_quietly(conn)
            logging.error(str(e))
            raise Exception("database commit error")
        finally:
            self._close_connection(conn, cursor)
        return result

    def update_sql(self, sql, params=None):
        """Execute ``sql``, commit, and return the value returned by ``cursor.execute``.

        On failure the transaction is rolled back, the error is logged and
        ``Exception("database commit error")`` is raised. The connection is
        closed in every case.
        """
        conn, cursor = self._get_connection()
        try:
            result = cursor.execute(sql, params)
            conn.commit()
        except Exception as e:
            self._rollback_quietly(conn)
            logging.error(str(e))
            raise Exception("database commit error")
        finally:
            self._close_connection(conn, cursor)
        return result
原文鏈接:https://blog.csdn.net/mrbuffoon/article/details/102720877