# -.- coding:utf-8 -.-
import threading
from . import torndb
from queue import Queue
from functools import partial
class PoolException(Exception):
pass
class Pool(object):
def __init__(self, host, database, user=None, password=None,
max_idle_time=7 * 3600, connect_timeout=30,
time_zone="+8:00", charset="utf8mb4",
sql_mode="TRADITIONAL", pool_size=100, **kwargs):
"""
:param pool_size: 線程池大小, 默認是100, 這個參數(shù)應(yīng)該多大比較合適, 取決
于mysql的max_connections參數(shù)的設(shè)定以及操作系統(tǒng)的限制, 例如linux的
open files, open process等因素, 更多討論 請參考:
https://stackoverflow.com/questions/39976756/the-max-connections-in-mysql-5-7
:param kwargs: 更多參數(shù)請參考這里 https://github.com/PyMySQL/PyMySQL/blob/master/pymysql/connections.py#L116
"""
self.cond = threading.Condition(threading.RLock())
self._idles = Queue()
self._busies = []
self.pool_size = pool_size
self.db_kwargs = dict(host=host, database=database, user=user,
password=password, max_idle_time=max_idle_time,
connect_timeout=connect_timeout,
time_zone=time_zone, charset=charset,
sql_mode=sql_mode, pool=self, **kwargs)
def connect(self):
connect_factory = partial(Connection, **self.db_kwargs)
return self._connect(connect_factory)
def _connect(self, connect_factory=None, timeout=10):
with self.cond:
if self.idles():
conn = connect_factory()
else:
if self._idles.qsize():
conn = self._idles.get()
else:
self.cond.wait(timeout)
if self._idles.qsize():
conn = self._idles.get()
else:
raise PoolException("Acquire connection time out.")
self._busies.append(conn)
return conn
def release(self, conn):
with self.cond:
if conn not in self._busies:
raise PoolException("Release unknown connection.")
index = self._busies.index(conn)
self._busies.pop(index)
self._idles.put(conn)
self.cond.notify()
def idles(self):
with self.cond:
return self.pool_size - len(self._busies)
def __str__(self):
return "<{} pool_size={} idles={} busies={} >".format(
self.__class__.__name__, self.pool_size,
self.idles(), len(self._busies))
class Connection(torndb.Connection):
def __init__(self, pool, *args, **kwargs):
super(Connection, self).__init__(*args, **kwargs)
self.pool = pool
def release(self):
self.pool.release(self)
?
?
測試
# -.- coding:utf-8 -.-
from torndbpool.pool import Pool
import time
import threading
pool = Pool(host="localhost:3306", database="database",
user="user", password="password", pool_size=3)
def test_release_connect(pol):
conn = pol.connect()
time.sleep(3)
conn.release()
threading.Thread(target=test_release_connect, args=(pool, )).start()
threading.Thread(target=lambda x: x.connect(), args=(pool, )).start()
threading.Thread(target=lambda x: x.connect(), args=(pool, )).start()
threading.Thread(target=test_release_connect, args=(pool, )).start()
threading.Thread(target=lambda x: x.connect(), args=(pool, )).start()