多線程場景pymysql線程池使用的一點總結(jié)

最近在做的項目中有用到pymysql線程池的使用眉撵,并且是在多線程場景下使用复濒,使用過程中實際跑多線程遇到了一些錯誤惭聂,所以借此機會簡單分析了一下针姿。

先看一下下面的代碼多線程場景下跑是否會有問題袱吆?

dao/dbcon.py

# -*-coding:utf-8-*-
import pymysql
from DBUtils.PooledDB import PooledDB

from conf.config import ConfParser
from logger.logger import *

conf = ConfParser()


class DB(object):
    """docstring for DbConnection"""
    __pool = None

    def __init__(self):
        self.pool = DB.__get_conn()
        self.conn = None
        self.cursor = None

    @staticmethod
    def __get_conn():
        if DB.__pool is None:
            try:
                DB.__pool = PooledDB(creator=pymysql, host=conf.get("DB", "host"), port=int(conf.get("DB", "port")), user=conf.get("DB", "user"), passwd=conf.get("DB", "passwd"), db=conf.get("DB", "db"), charset=conf.get("DB", "charset"))
            except Exception as e:
                logging.error("%s : %s" % (Exception, e))
        return DB.__pool

    def connect(self, cursor=pymysql.cursors.DictCursor):
        self.conn = self.pool.connection()
        self.cursor = self.conn.cursor(cursor=cursor)
        return self.cursor

    def close(self):
        self.cursor.close()
        self.conn.close()

    def query_sql(self, sql, params=None):
        self.connect()
        self.cursor.execute(sql, params)
        result = self.cursor.fetchall()
        self.close()
        return result

    def execute_sql(self, sql, params=None):
        self.connect()
        try:
            self.cursor.execute(sql, params)
            result = self.cursor.lastrowid
            self.conn.commit()
            self.close()
        except Exception as e:
            self.conn.rollback()
            self.close()
            logging.error(str(e))
            raise Exception("database commit error")
        return result

    def update_sql(self, sql, params=None):
        self.connect()
        try:
            result = self.cursor.execute(sql, params)
            self.conn.commit()
            self.close()
        except Exception as e:
            self.conn.rollback()
            self.close()
            logging.error(str(e))
            raise Exception("database commit error")
        return result

dao/querydb.py

# -*-coding:utf-8-*-

from dao.dbcon import DB
from conf.config import ConfParser
db = DB()
conf = ConfParser()
local_ip = conf.get("EMU", "ServerLocalIP")


class QueryDB(object):

    def __init__(self):
        self.local_ip = local_ip

    def getBaseNodeDiskImgFile(self, node_id):
        sql = "SELECT diskimg_file From `tb_node` WHERE node_id = %s;"
        res = db.query_sql(sql, (node_id,))
        return res[0]['diskimg_file']

    def GetNodeBName(self, nodeid):
        sql = "SELECT node_b_name from ``tb_node``  where node_id=%s"
        res = db.query_sql(sql, (nodeid,))
        return res

    def get_iso_path(self, iso_id):
        sql = "SELECT * FROM `tb_vm_iso` WHERE id = %s;"
        result = db.query_sql(sql, (iso_id,))[0]
        return result["file_path"]

    def get_case(self, case_id):
        sql = "SELECT * FROM `tb_case` WHERE case_id = %s;"
        result = db.query_sql(sql, (int(case_id),))
        return result

    def get_template(self, template_id):
        sql = "SELECT * FROM `tb_vm_template` WHERE id = %s;"
        result = db.query_sql(sql, (template_id,))
        return result

storage/storage.py

# -*- coding: utf-8 -*-

from dao.querydb import QueryDB

db = QueryDB()


class CentralizedStorage(object):

    def __init__(self):
        pass

    def test1(self, thread_name):
        res = db.getBaseNodeDiskImgFile(121212)
        print(thread_name, "getBaseNodeDiskImgFile", res)

    def test2(self, thread_name):
        db2 = QueryDB()
        res = db2.GetNodeBName(121212)
        print(thread_name, "GetNodeBName", res)

    def test3(self, thread_name):
        db3 = QueryDB()
        res = db3.get_iso_path(114)
        print(thread_name, "get_iso_path", res)

    def test4(self, thread_name):
        db4 = QueryDB()
        res = db4.get_case(20160802145038043410)
        print(thread_name, "get_case", res)

main.py

# -*- coding: utf-8 -*-

import threading
from storage.storage import CentralizedStorage
from dao.querydb import QueryDB


def db_test(thread_name):
    storage = CentralizedStorage()
    storage.test1(thread_name)
    storage.test2(thread_name)
    storage.test3(thread_name)
    storage.test4(thread_name)
    db_object = QueryDB()
    res = db_object.get_template(1615)
    print(thread_name, "get_template:", res)


if __name__ == "__main__":
    for i in range(10):
        thread = threading.Thread(target=db_test, args=(("Thread" + str(i) + ":"),))
        thread.setDaemon(False)
        thread.start()

上面代碼單線程執(zhí)行時一點問題也沒有,但是多線程執(zhí)行就會報如下錯誤:

我們可以分析一下上面的代碼距淫,注意看dao/querydb.py這個文件第5行db = DB()绞绒,這個文件中所有的查詢語句都是共用這一個DB實例的,再看dao/dbcon.py這個文件榕暇,DB這個類創(chuàng)建了一個連接池蓬衡,供所有sql執(zhí)行來用喻杈,還封裝了幾個底層真正執(zhí)行sql語句的函數(shù),重點在于這幾個sql執(zhí)行語句狰晚。

我們來看一下筒饰,它們基本流程都是從線程池取出一個connect賦給該實例成員變量self.conn,然后獲得游標(biāo)賦給該實例成員變量self.cursor家肯,然后通過游標(biāo)執(zhí)行sql語句龄砰,最后close掉。問題就在于這里讨衣,前面我們看到過换棚,DB只進行了一次實例化,出來的實例大家共用反镇,如果多線程場景下固蚤,線程1獲得了conn和cursor并賦值給DB實例的成員變量后還沒完成后續(xù)操作,線程2就進入了歹茶,重新獲取了conn和cursor并重新給DB實例的成員變量賦值夕玩,因為共用一個DB實例,這樣一定會覆蓋掉線程1時候賦的值惊豺,線程1再繼續(xù)執(zhí)行就可能會出錯燎孟。

分析出了出錯原因,那么如何修改呢尸昧?通過分析得出本質(zhì)上就是讓conn和cursor不共用揩页,那就可以對應(yīng)兩種思路,一是DB實例不共用烹俗,或者是DB實例共用但conn和cursor不共用爆侣。

DB實例不共用

這種方式其他文件不用動,就是在dao/querydb.py文件中每一個函數(shù)下都進行一次DB的實例化幢妄,公共的DB實例可以刪掉兔仰,修改后的代碼如下:

# -*-coding:utf-8-*-

from dao.dbcon_bac import DB
from conf.config import ConfParser
conf = ConfParser()
local_ip = conf.get("EMU", "ServerLocalIP")


class QueryDB(object):

    def __init__(self):
        self.local_ip = local_ip

    def getBaseNodeDiskImgFile(self, node_id):
        db = DB()
        sql = "SELECT diskimg_file From `tb_node` WHERE node_id = %s;"
        res = db.query_sql(sql, (node_id,))
        return res[0]['diskimg_file']

    def GetNodeBName(self, nodeid):
        db = DB()
        sql = "SELECT node_b_name from `tb_node ` where node_id=%s"
        res = db.query_sql(sql, (nodeid,))
        return res

    def get_iso_path(self, iso_id):
        db = DB()
        sql = "SELECT * FROM `tb_vm_iso` WHERE id = %s;"
        result = db.query_sql(sql, (iso_id,))[0]
        return result["file_path"]

    def get_case(self, case_id):
        db = DB()
        sql = "SELECT * FROM `tb_case` WHERE case_id = %s;"
        result = db.query_sql(sql, (int(case_id),))
        return result

    def get_template(self, template_id):
        db = DB()
        sql = "SELECT * FROM `tb_vm_template` WHERE id = %s;"
        result = db.query_sql(sql, (template_id,))
        return result

因為連接池我們需要共用的,也就是只能有一個連接池蕉鸳,那這種方式呢我們看一下連接池會不會也變得不共用乎赴,每次實例都會創(chuàng)建一個新的連接池呢?
我們注意看dbcon.py里的實現(xiàn)潮尝,__pool是一個類的成員變量而非實例成員變量无虚,在第一次實例化時就會生成一個連接池賦值給這個類成員變量,以后的實例化會先判斷這個類成員變量DB.__pool是否為空衍锚,如果不為空就不再重新生成連接池,這樣就保證了每一個實例實際上都是共用了類的連接池嗤堰,而且不會重復(fù)生成戴质。如果不先進行判空度宦,則會有問題,如果判了空告匠,就是ok的戈抄。

DB實例共用但conn和cursor不共用

那這一種方式呢,其他文件不動后专,修改dbcon.py函數(shù)划鸽,每一個sql實際執(zhí)行函數(shù)里都重新從連接池里取得一個conn并獲取cursor,這兩個變量不再賦值給實例成員變量戚哎,而是作為局部變量使用裸诽,這樣不同線程里調(diào)用sql執(zhí)行函數(shù)都會用自己的conn而不會共用,這樣就跟DB被實例化幾次沒什么關(guān)系了型凳,代碼如下:

# -*-coding:utf-8-*-
import pymysql
from DBUtils.PooledDB import PooledDB

from conf.config import ConfParser
from logger.logger import *

conf = ConfParser()


class DB(object):
    """docstring for DbConnection"""
    __pool = None

    def __init__(self):
        self.pool = DB.__get_conn_pool()

    @staticmethod
    def __get_conn_pool():
        if DB.__pool is None:
            try:
                DB.__pool = PooledDB(creator=pymysql, host=conf.get("DB", "host"), port=int(conf.get("DB", "port")),
                                     user=conf.get("DB", "user"), passwd=conf.get("DB", "passwd"),
                                     db=conf.get("DB", "db"), charset=conf.get("DB", "charset"))
            except Exception as e:
                logging.error("%s : %s" % (Exception, e))
        return DB.__pool

    def _get_connection(self):
        conn = self.pool.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        return conn, cursor

    def _close_connection(self, conn, cursor):
        if cursor:
            cursor.close()
        if conn:
            conn.close()

    def query_sql(self, sql, params=None):
        conn, cursor = self._get_connection()
        try:
            cursor.execute(sql, params)
            result = cursor.fetchall()
            self._close_connection(conn, cursor)
        except Exception as e:
            self._close_connection(conn, cursor)
            logging.error(str(e))
            raise Exception("database execute error")
        return result

    def execute_sql(self, sql, params=None):
        conn, cursor = self._get_connection()
        try:
            cursor.execute(sql, params)
            result = cursor.lastrowid
            conn.commit()
            self._close_connection(conn, cursor)
        except Exception as e:
            conn.rollback()
            self._close_connection(conn, cursor)
            logging.error(str(e))
            raise Exception("database commit error")
        return result

    def update_sql(self, sql, params=None):
        conn, cursor = self._get_connection()
        try:
            result = cursor.execute(sql, params)
            conn.commit()
            self._close_connection(conn, cursor)
        except Exception as e:
            conn.rollback()
            self._close_connection(conn, cursor)
            logging.error(str(e))
            raise Exception("database commit error")
        return result

原文鏈接:https://blog.csdn.net/mrbuffoon/article/details/102720877

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末丈冬,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子甘畅,更是在濱河造成了極大的恐慌埂蕊,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件疏唾,死亡現(xiàn)場離奇詭異蓄氧,居然都是意外死亡,警方通過查閱死者的電腦和手機槐脏,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門喉童,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人准给,你說我怎么就攤上這事泄朴。” “怎么了露氮?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵祖灰,是天一觀的道長。 經(jīng)常有香客問我畔规,道長局扶,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任叁扫,我火速辦了婚禮三妈,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘莫绣。我一直安慰自己畴蒲,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布对室。 她就那樣靜靜地躺著模燥,像睡著了一般咖祭。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蔫骂,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天么翰,我揣著相機與錄音,去河邊找鬼辽旋。 笑死浩嫌,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的补胚。 我是一名探鬼主播码耐,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼糖儡!你這毒婦竟也來了伐坏?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤握联,失蹤者是張志新(化名)和其女友劉穎桦沉,沒想到半個月后措左,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體廉白,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年辰斋,在試婚紗的時候發(fā)現(xiàn)自己被綠了代芜。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片埠褪。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖挤庇,靈堂內(nèi)的尸體忽然破棺而出钞速,到底是詐尸還是另有隱情,我是刑警寧澤嫡秕,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布渴语,位于F島的核電站,受9級特大地震影響昆咽,放射性物質(zhì)發(fā)生泄漏驾凶。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一掷酗、第九天 我趴在偏房一處隱蔽的房頂上張望调违。 院中可真熱鬧,春花似錦泻轰、人聲如沸技肩。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽亩鬼。三九已至殖告,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間雳锋,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工羡洁, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留玷过,地道東北人。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓筑煮,卻偏偏與公主長得像辛蚊,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子真仲,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容