Python 期物用在異步編程,所謂期物指的是排定的執行事件。Python 3.4 起
-
總結
- 1、期物處理并發只涉及到三個對象,一個是期物(`concurrent.futures.Future`),一個是執行器(`concurrent.futures.Executor`),還有一個是 `_WorkItem` 類
  - 1)期物對象:本身不涉及多線程、多進程或者 `yield` 等語法,其只是一個具有運行狀態和運行結果以及可添加回調函數的類
  - 2)`_WorkItem` 對象:真正被添加到任務隊列的對象。將一個需執行的函數和期物實例化成一個 `_WorkItem` 對象。通過 `run` 方法執行,`run` 方法負責標記期物的狀態,執行函數,將執行結果賦值給期物。
  - 3)執行器:有兩個方法 `map`、`submit`
    - submit 接收一個函數,新建一個期物,生成 `_WorkItem` 對象,并將該對象添加到任務隊列中。每調用 submit 方法都會調整處理隊列的線程個數,如果當前運行線程數小于執行器設置的最大執行線程數,則新建一個線程去處理任務隊列。返回值為期物對象
    - map 方法,使用 `submit` 迭代執行器要執行函數的參數列表,得到一個期物列表。遍歷期物列表,使用 `yield` 去接收每個期物對象的 `result()` 返回值。
- 2、任務隊列的運行:
  - 1)每個線程均執行 `_worker()` 方法
  - 2)任務隊列 `work_queue` 使用 `queue.Queue()` 存儲

循環執行 work_queue.get 得到的 _WorkItem 對象,直到獲取的對象為 None
-
Future 源碼
class Future(object):
# 表征了異步計(jì)算的結(jié)果
def __init__(self):
# 初始化 future 實(shí)例底循,不應(yīng)該通過(guò)用戶端調(diào)用
self._condition = threading.Condition() # condition是條件鎖
self._state = PENDING
self._result = None
self._exception = None
self._waiters = []
self._done_callbacks = []
# 回調(diào)
def _invoke_callbacks(self):
for callback in self._done_callbacks:
try:
callback(self)
except Exception:
LOGGER.exception('exception calling callback for %r', self)
# 格式化輸出對(duì)象
def __repr__(self):
with self._condition:
if self._state == FINISHED:
if self._exception:
return '<%s at %#x state=%s raised %s>' % (
self.__class__.__name__,
id(self),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._exception.__class__.__name__)
else:
return '<%s at %#x state=%s returned %s>' % (
self.__class__.__name__,
id(self),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._result.__class__.__name__)
return '<%s at %#x state=%s>' % (
self.__class__.__name__,
id(self),
_STATE_TO_DESCRIPTION_MAP[self._state])
def cancel(self):
# 取消期物的調(diào)用巢株,取消成功返回 Ture,其余返回 False熙涤。
# 如果期物已經(jīng)運(yùn)行或者已經(jīng)結(jié)束阁苞,則該期物不可以被取消,返回 True祠挫。
with self._condition:
if self._state in [RUNNING, FINISHED]:
return False
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
return True
self._state = CANCELLED
# 喚醒所有使用 _condition 條件阻塞的線程
self._condition.notify_all()
# 執(zhí)行任務(wù)結(jié)束或cancel的回調(diào)
self._invoke_callbacks()
return True
def cancelled(self):
# 如果 future 已被 cancel那槽,返回 True
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
def running(self):
# 如果 future 正在運(yùn)行,返回 True
with self._condition:
return self._state == RUNNING
def done(self):
# 如果 future 已被 cancel 或者 執(zhí)行結(jié)束等舔,返回 True
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
# 返回期物運(yùn)行結(jié)果
def __get_result(self):
if self._exception:
raise self._exception
else:
return self._result
def add_done_callback(self, fn):
# 期物運(yùn)行結(jié)束調(diào)用的對(duì)象
# fn: 期物運(yùn)行結(jié)束或 cancel 后被調(diào)用骚灸,總會(huì)在所添加的進(jìn)程內(nèi)調(diào)用。如果期物已經(jīng)結(jié)束或 cancel 則會(huì)立即調(diào)用慌植;根據(jù)添加順序進(jìn)行調(diào)用
with self._condition:
if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
self._done_callbacks.append(fn)
return
fn(self)
def result(self, timeout=None):
"""
Returns:
期物的運(yùn)行結(jié)果
Raises:
CanceledError: 期物被 cancell
TimeoutError: 期物在給定的時(shí)間沒(méi)有執(zhí)行完畢
Exception: 其他 Error
"""
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
# 此處會(huì)阻塞甚牲,等待 notify
self._condition.wait(timeout)
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
else:
raise TimeoutError()
def exception(self, timeout=None):
"""
Returns:
期物運(yùn)行的異常
Raises:
CancelledError: 如果期物被 cancel
TimeoutError: 如果期物在給定的時(shí)間沒(méi)有執(zhí)行完畢
"""
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self._exception
self._condition.wait(timeout)
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self._exception
else:
raise TimeoutError()
# The following methods should only be used by Executors and in tests.
def set_running_or_notify_cancel(self):
"""
標(biāo)記期物為 RUNNING 或者 CANCELLED_AND_NOTIFIED,
1蝶柿、如果期物已經(jīng) cancelled 則期物任何等待執(zhí)行的線程都會(huì)被 notify 并且 return False丈钙。
2、如果期物沒(méi)有被 cancelled交汤,則狀態(tài)變更為 RUNNING雏赦,返回 True
3、此方法應(yīng)該在期物所關(guān)聯(lián)的work執(zhí)行前被調(diào)用芙扎,如果此方法返回 False星岗,那么 work 不應(yīng)該被執(zhí)行。
Returns:
如果期物已經(jīng)被 cancelled纵顾,返回 False伍茄;其他情況返回 True
Raises:
RuntimeError:如果此方法已經(jīng)被調(diào)用或者 set_result() 或者 set_exception()被調(diào)用。
"""
with self._condition:
if self._state == CANCELLED:
self._state = CANCELLED_AND_NOTIFIED
for waiter in self._waiters:
waiter.add_cancelled(self)
# self._condition.notify_all() is not necessary because
# self.cancel() triggers a notification.
return False
elif self._state == PENDING:
self._state = RUNNING
return True
else:
LOGGER.critical('Future %s in unexpected state: %s',
id(self),
self._state)
raise RuntimeError('Future in unexpected state')
def set_result(self, result):
"""Sets the return value of work associated with the future.
Should only be used by Executor implementations and unit tests.
"""
"""
將期物關(guān)聯(lián) work 的返回值賦值給期物對(duì)象施逾,并發(fā)送通知 notify
"""
with self._condition:
self._result = result
self._state = FINISHED
for waiter in self._waiters:
waiter.add_result(self)
self._condition.notify_all()
self._invoke_callbacks()
def set_exception(self, exception):
"""
使用給定的異常設(shè)置期物的 _exception 值
"""
with self._condition:
self._exception = exception
self._state = FINISHED
for waiter in self._waiters:
waiter.add_exception(self)
self._condition.notify_all()
self._invoke_callbacks()
-
單從 Future 類并無法獲知期物何時運行,下面引入 ThreadPoolExecutor
class ThreadPoolExecutor(_base.Executor):
    """Executor that runs submitted callables on a pool of worker threads."""

    def __init__(self, max_workers=None):
        """Create an empty pool.

        Args:
            max_workers: Maximum number of worker threads.  When None, five
                times the CPU count reported by ``os.cpu_count()`` is used
                (treating an unknown count as 1).

        Raises:
            ValueError: If ``max_workers`` is zero or negative.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            cpu_total = os.cpu_count() or 1
            max_workers = cpu_total * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        # Queue of _WorkItem instances (None is used as a wake-up marker).
        self._work_queue = queue.Queue()
        # Worker threads started so far.
        self._threads = set()
        # Once True, submit() refuses new work.
        self._shutdown = False
        self._shutdown_lock = threading.Lock()

    def submit(self, fn, *args, **kwargs):
        # No docstring here: it is copied from the base class right below.
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            future = _base.Future()
            # The work item pairs the future with the call; a worker thread
            # later invokes its run() method.
            self._work_queue.put(_WorkItem(future, fn, args, kwargs))
            # Start one more worker thread unless the pool is already full.
            self._adjust_thread_count()
            return future
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more worker thread unless the pool already holds max_workers."""
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        if len(self._threads) >= self._max_workers:
            return

        # When the executor gets lost, the weakref callback will wake up
        # the worker threads by queueing a None marker.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        executor_ref = weakref.ref(self, weakref_cb)
        worker_thread = threading.Thread(target=_worker,
                                         args=(executor_ref, self._work_queue))
        worker_thread.daemon = True
        worker_thread.start()
        self._threads.add(worker_thread)
        _threads_queues[worker_thread] = self._work_queue

    def shutdown(self, wait=True):
        # No docstring here: it is copied from the base class right below.
        with self._shutdown_lock:
            self._shutdown = True
            # Wake a worker so it notices the shutdown flag.
            self._work_queue.put(None)
        if not wait:
            return
        for worker_thread in self._threads:
            worker_thread.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
-
Executor
class Executor(object):
    """Abstract base class for objects that execute calls asynchronously."""

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)``; must be implemented by subclasses.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Return an iterator equivalent to ``map(fn, *iterables)``.

        Every call is submitted up front through ``self.submit``; the calls
        may be evaluated out of order, but results are yielded in the order
        of the input arguments.

        Args:
            fn: Callable that takes as many arguments as there are iterables.
            timeout: Maximum number of seconds to wait, measured from the
                moment ``map`` is called; None means no limit.
            chunksize: Accepted for interface compatibility; this base
                implementation does not use it.

        Returns:
            A generator yielding each call's result.

        Raises:
            TimeoutError: Raised by ``future.result`` when a result is not
                available before the deadline.
            Exception: Whatever ``fn`` raised for the corresponding call.
        """
        if timeout is not None:
            # Monotonic clock: the deadline must not move when the wall clock
            # is adjusted.
            end_time = timeout + time.monotonic()

        # submit() binds each call to a future and queues it for execution.
        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # The yield is hidden in a closure so that all futures are submitted
        # before the first value is requested.
        def result_iterator():
            try:
                # Reverse so that pop() hands the futures out in submit order.
                fs.reverse()
                while fs:
                    # Pop before waiting so finished futures (and their
                    # results) are not kept alive by the list.
                    if timeout is None:
                        yield fs.pop().result()
                    else:
                        yield fs.pop().result(end_time - time.monotonic())
            finally:
                # On early exit or error, cancel whatever was not consumed.
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean up the resources associated with the executor (no-op here)."""
        pass

    def __enter__(self):
        """Return the executor itself so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Shut down, waiting for pending work; never suppresses exceptions."""
        self.shutdown(wait=True)
        return False
-
_WorkItem
# 簡單的工作類
class _WorkItem(object):
    """Pairs a future with the call whose outcome it will receive."""

    def __init__(self, future, fn, args, kwargs):
        """Store the future, the callable and its positional/keyword arguments."""
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Execute the call and record its outcome on the future.

        Does nothing if ``set_running_or_notify_cancel()`` returns a false
        value.  Otherwise calls ``fn`` and hands either its return value or
        the exception it raised to the future.
        """
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break the reference cycle
            # exc -> traceback -> this frame -> self -> future -> exc.
            self = None
        else:
            self.future.set_result(result)
-
_worker()
# _worker方法
def _worker(executor_reference, work_queue):
    """Thread body: keep taking items from ``work_queue`` and running them.

    Args:
        executor_reference: Callable returning the owning executor, or None
            once it is gone (the caller passes a ``weakref.ref``).
        work_queue: Queue yielding work items; a None entry is a wake-up
            marker that makes the worker check whether it should exit.
    """
    try:
        while True:
            item = work_queue.get(block=True)
            if item is None:
                owner = executor_reference()
                # Exit if:
                #   - The interpreter is shutting down OR
                #   - The executor that owns the worker has been collected OR
                #   - The executor that owns the worker has been shutdown.
                if _shutdown or owner is None or owner._shutdown:
                    # Pass the marker on so the other workers wake up too.
                    work_queue.put(None)
                    return
                del owner
            else:
                item.run()
                # Delete references to object. See issue16284
                del item
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
備注
期物的使用標準流程
# Standard usage: the executor is shut down when the with-block exits.
with futures.ThreadPoolExecutor(10) as executor:
    res = executor.map(func, para_data_list) # func is the callable to run; para_data_list is the list of arguments
- 分析
  - `ThreadPoolExecutor` 和 `ProcessPoolExecutor` 均繼承自 `concurrent.futures.Executor`,因其實現了 `__enter__`、`__exit__` 方法,故可以使用 `with` 語法
  - 使用 `.map()` 方法會調用 `.submit()` 方法;
  - `.submit()` 方法:
    - 將函數 `func` + `future` 綁定生成 `_WorkItem` 實例對象 `w`,
    - 將 `w` 添加到隊列 `_work_queue`
    - 并且創建線程執行 `_worker` 方法(此處的創建線程是指會最多創建如上例 `10` 個線程去執行);
  - `_worker()` 方法:
    - 從 `_work_queue` 隊列中獲取 `_WorkItem` 對象 `w`,執行其 `w.run()` 方法
  - 返回值 `res` 是生成器,使用迭代獲取函數返回的值;
  - `future.result()` 會阻塞調用。