Python 期物之 concurrent.futures.Future

Python 期物用在異步編程,所謂期物指的是排定的執(zhí)行事件。Python 3.4起

  • 總結(jié)
    • 1、期物處理并發(fā)只涉及到三個(gè)對(duì)象些膨,一個(gè)是期物(concurrent.futures.Future),一個(gè)是執(zhí)行器(concurrent.futures.Executor)钦铺,還有一個(gè)是 _WorkItem類(lèi)
      • 1)期物對(duì)象:本身不涉及多線程订雾,多進(jìn)程或者yield等語(yǔ)法,其只是一個(gè)具有運(yùn)行狀態(tài)運(yùn)行結(jié)果以及可添加回調(diào)函數(shù)的類(lèi)
      • 2)_WorkItem 對(duì)象:真正被添加到任務(wù)隊(duì)列的對(duì)象矛洞。將一個(gè)需執(zhí)行的函數(shù)洼哎,期物實(shí)例化成一個(gè) _WorkItem 對(duì)象。通過(guò) run 方法執(zhí)行沼本,run 方法負(fù)責(zé)標(biāo)記期物的狀態(tài)噩峦,執(zhí)行函數(shù),將執(zhí)行結(jié)果賦值給期物抽兆。
      • 3)執(zhí)行器:有兩個(gè)方法 map, submit
        • submit 接收一個(gè)函數(shù)识补,期物,生成 _WorkItem 對(duì)象辫红,并將該對(duì)象添加到任務(wù)隊(duì)列中凭涂。每調(diào)用 submit 方法都會(huì)調(diào)整處理隊(duì)列的線程個(gè)數(shù),如果當(dāng)前運(yùn)行線程數(shù)小于執(zhí)行器設(shè)置的最大執(zhí)行線程數(shù)贴妻,則新建一個(gè)線程去處理任務(wù)隊(duì)列切油。返回值為期物對(duì)象
        • map 方法,使用 submit 迭代執(zhí)行器要執(zhí)行函數(shù)的參數(shù)列表名惩,返回一個(gè)期物列表白翻。遍歷期物列表,使用 yield 去接收每個(gè)期物對(duì)象的result屬性。
    • 2滤馍、任務(wù)隊(duì)列的運(yùn)行:
      • 1)每個(gè)線程均執(zhí)行 _worker()方法
      • 2)任務(wù)隊(duì)列 work_queue 使用 queue.Queue() 存儲(chǔ)

循環(huán)執(zhí)行 work_queue.get 得到的 _WorkItem 對(duì)象,直到獲取的對(duì)象為 None

  • Future 源碼
class Future(object):
    # 表征了異步計(jì)算的結(jié)果

    def __init__(self):
        # 初始化 future 實(shí)例底循,不應(yīng)該通過(guò)用戶端調(diào)用
        self._condition = threading.Condition()  # condition是條件鎖
        self._state = PENDING
        self._result = None
        self._exception = None
        self._waiters = []
        self._done_callbacks = []

    # 回調(diào)
    def _invoke_callbacks(self):
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    # 格式化輸出對(duì)象
    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                if self._exception:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                    self.__class__.__name__,
                    id(self),
                   _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        # 取消期物的調(diào)用巢株,取消成功返回 Ture,其余返回 False熙涤。
        # 如果期物已經(jīng)運(yùn)行或者已經(jīng)結(jié)束阁苞,則該期物不可以被取消,返回 True祠挫。
        
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            # 喚醒所有使用 _condition 條件阻塞的線程
            self._condition.notify_all()
        
        # 執(zhí)行任務(wù)結(jié)束或cancel的回調(diào)
        self._invoke_callbacks()
        return True

    def cancelled(self):
        # 如果 future 已被 cancel那槽,返回 True
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        # 如果 future 正在運(yùn)行,返回 True
        with self._condition:
            return self._state == RUNNING

    def done(self):
        # 如果 future 已被 cancel 或者 執(zhí)行結(jié)束等舔,返回 True
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    # 返回期物運(yùn)行結(jié)果
    def __get_result(self):
        if self._exception:
            raise self._exception
        else:
            return self._result

    def add_done_callback(self, fn):
        # 期物運(yùn)行結(jié)束調(diào)用的對(duì)象
        # fn: 期物運(yùn)行結(jié)束或 cancel 后被調(diào)用骚灸,總會(huì)在所添加的進(jìn)程內(nèi)調(diào)用。如果期物已經(jīng)結(jié)束或 cancel 則會(huì)立即調(diào)用慌植;根據(jù)添加順序進(jìn)行調(diào)用
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        fn(self)

    def result(self, timeout=None):
        """
        Returns:
            期物的運(yùn)行結(jié)果
        Raises:
            CanceledError: 期物被 cancell
            TimeoutError: 期物在給定的時(shí)間沒(méi)有執(zhí)行完畢
            Exception: 其他 Error
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            # 此處會(huì)阻塞甚牲,等待 notify 
            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
                raise TimeoutError()

    def exception(self, timeout=None):
        """
        Returns:
            期物運(yùn)行的異常
        Raises:
            CancelledError: 如果期物被 cancel
            TimeoutError: 如果期物在給定的時(shí)間沒(méi)有執(zhí)行完畢
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """
        標(biāo)記期物為 RUNNING 或者 CANCELLED_AND_NOTIFIED,
            1蝶柿、如果期物已經(jīng) cancelled 則期物任何等待執(zhí)行的線程都會(huì)被 notify 并且 return False丈钙。
            2、如果期物沒(méi)有被 cancelled交汤,則狀態(tài)變更為 RUNNING雏赦,返回 True
            3、此方法應(yīng)該在期物所關(guān)聯(lián)的work執(zhí)行前被調(diào)用芙扎,如果此方法返回 False星岗,那么 work 不應(yīng)該被執(zhí)行。
        Returns:
            如果期物已經(jīng)被 cancelled纵顾,返回 False伍茄;其他情況返回 True
        Raises:
            RuntimeError:如果此方法已經(jīng)被調(diào)用或者 set_result() 或者 set_exception()被調(diào)用。
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.
        """
        """
        將期物關(guān)聯(lián) work 的返回值賦值給期物對(duì)象施逾,并發(fā)送通知 notify
        """
        with self._condition:
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """
        使用給定的異常設(shè)置期物的 _exception 值
        """
        with self._condition:
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()
  • 單從 Future 類(lèi)并無(wú)法獲知期物何時(shí)運(yùn)行敷矫,下面引入 ThreadPoolExecutor
class ThreadPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None):
        """
        初始化一個(gè) ThreadPoolExecutor 實(shí)例
        Args: max_workers 使用最大線程數(shù)
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()  # _WorkItem 實(shí)例隊(duì)列
        self._threads = set()  # 實(shí)例的線程數(shù)
        self._shutdown = False  # 設(shè)置為 True 不再接受事件提交
        self._shutdown_lock = threading.Lock()  # 鎖

    # 事件提交
    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)  # 用以在線程中調(diào)用其 run 方法

            self._work_queue.put(w)
            self._adjust_thread_count()  # 用以開(kāi)啟最多 _max_workers 數(shù)量的線程,并且在每個(gè)線程中 while 循環(huán)執(zhí)行 _work_queue 隊(duì)列中的實(shí)例
            
            return f  # 返回期物
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # 用以喚醒 worker 線程
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        if len(self._threads) < self._max_workers:
            t = threading.Thread(target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
  • Executor
class Executor(object):
    # 異步調(diào)用的抽象基類(lèi)

    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """
        Returns:
            迭代器汉额,等同于 map(fn, *iteravles) 但是不是按照順序執(zhí)行
        Args:
            fn: 可調(diào)用對(duì)象曹仗,參數(shù)在 iterable 對(duì)象中
            timeout: 最大等待時(shí)間
        Raises:
            TimeoutError: 所有的迭代器不能在給定的時(shí)間生成
            Exception: 任何其他異常錯(cuò)誤
        """
        if timeout is not None:
            end_time = timeout + time.time()

        # submit 的作用是將 函數(shù)+期物 綁定生成_WorkItem 實(shí)例對(duì)象,并且創(chuàng)建線程去循環(huán)執(zhí)行 _WorkItem 對(duì)象實(shí)例
        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        def result_iterator():
            try:
                for future in fs:
                    if timeout is None:
                        yield future.result()
                    else:
                        yield future.result(end_time - time.time())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        # 清理所有關(guān)聯(lián) executor 對(duì)象的資源
        pass

    def __enter__(self):
        # return 的 self 是給 as 使用的
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False
  • _WorkItem
# 簡(jiǎn)單的工作類(lèi) 
class _WorkItem(object):

    # 初始化蠕搜,參數(shù)為 期物+函數(shù)+參數(shù)
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    # 標(biāo)記期物為notify怎茫,非 True 直接返回。調(diào)用期物關(guān)聯(lián)的fn方法轨蛤。
    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as e:
            self.future.set_exception(e)
        else:
            self.future.set_result(result)
  • _worker()
# _worker方法
def _worker(executor_reference, work_queue):
    """
    此方法在被調(diào)用的線程內(nèi) while True 執(zhí)行
    """
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            """
            1蜜宪、編譯器是否關(guān)閉
            2、executor 是否被回收
            3祥山、executor._shutdown 被設(shè)置
            """
            if _shutdown or executor is None or executor._shutdown:
                # 通知其他線程的 worker
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)

備注

期物的使用標(biāo)準(zhǔn)流程

with futures.ThreadPoolExecutor(10) as executor:
    res = executor.map(func, para_data_list)  # func 是要調(diào)用的函數(shù)圃验,para_data_list 是參數(shù) list
  • 分析
  • ThreadPoolExecutorProcessPoolExecutor 均繼承自 concurrent.futures.Executor,因其實(shí)現(xiàn)了 __enter__, __exit__方法,故可以使用 with語(yǔ)法
  • 使用 .map() 方法會(huì)調(diào)用 .submit() 方法缝呕;
  • .submit() 方法:
    • 將函數(shù)func + future綁定生成 _WorkItem 實(shí)例對(duì)象 w澳窑,
    • w添加到隊(duì)列 _work_queue
    • 并且創(chuàng)建線程執(zhí)行 _worker 方法(此處的創(chuàng)建線程是指會(huì)最多創(chuàng)建如上例10個(gè)線程去執(zhí)行);
  • _worker() 方法:
    • _work_queue隊(duì)列中獲取 _WorkItem對(duì)象 w供常,執(zhí)行其 w.run()方法
  • 返回值 res 是生成器摊聋,使用迭代獲取函數(shù)返回的值;
  • future.result() 會(huì)阻塞調(diào)用栈暇。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末麻裁,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子瞻鹏,更是在濱河造成了極大的恐慌悲立,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,695評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件新博,死亡現(xiàn)場(chǎng)離奇詭異薪夕,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)赫悄,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén)原献,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人埂淮,你說(shuō)我怎么就攤上這事姑隅。” “怎么了倔撞?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,130評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵荐操,是天一觀的道長(zhǎng)缰揪。 經(jīng)常有香客問(wèn)我深夯,道長(zhǎng)喻旷,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,648評(píng)論 1 297
  • 正文 為了忘掉前任躏啰,我火速辦了婚禮趁矾,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘给僵。我一直安慰自己毫捣,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,655評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著蔓同,像睡著了一般饶辙。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上牌柄,一...
    開(kāi)封第一講書(shū)人閱讀 52,268評(píng)論 1 309
  • 那天畸悬,我揣著相機(jī)與錄音,去河邊找鬼珊佣。 笑死,一個(gè)胖子當(dāng)著我的面吹牛披粟,可吹牛的內(nèi)容都是我干的咒锻。 我是一名探鬼主播,決...
    沈念sama閱讀 40,835評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼守屉,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼惑艇!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起拇泛,我...
    開(kāi)封第一講書(shū)人閱讀 39,740評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤滨巴,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后俺叭,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體恭取,經(jīng)...
    沈念sama閱讀 46,286評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,375評(píng)論 3 340
  • 正文 我和宋清朗相戀三年熄守,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蜈垮。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,505評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡裕照,死狀恐怖攒发,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情晋南,我是刑警寧澤惠猿,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站负间,受9級(jí)特大地震影響偶妖,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜唉擂,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,873評(píng)論 3 333
  • 文/蒙蒙 一餐屎、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧玩祟,春花似錦腹缩、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,357評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)润讥。三九已至,卻和暖如春盘寡,著一層夾襖步出監(jiān)牢的瞬間楚殿,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,466評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工竿痰, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留脆粥,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,921評(píng)論 3 376
  • 正文 我出身青樓影涉,卻偏偏與公主長(zhǎng)得像变隔,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蟹倾,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,515評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理匣缘,服務(wù)發(fā)現(xiàn),斷路器鲜棠,智...
    卡卡羅2017閱讀 134,701評(píng)論 18 139
  • 序言第1章 并行和分布式計(jì)算介紹第2章 異步編程第3章 Python的并行計(jì)算第4章 Celery分布式應(yīng)用第5章...
    SeanCheney閱讀 12,525評(píng)論 3 35
  • 親愛(ài)的各位家長(zhǎng)肌厨!你們好! 今天中午豁陆,我們幼兒園收到了柑爸,關(guān)于“傳家訓(xùn)、樹(shù)家風(fēng)献联,嚴(yán)家教”的家庭教育講座通知書(shū)竖配。由...
    一只溫順的老虎閱讀 412評(píng)論 0 0
  • 當(dāng)你在婚姻中遭受委屈不幸時(shí)原押,努力自我強(qiáng)大的時(shí)候胁镐,你應(yīng)該經(jīng)常聽(tīng)到一句話,“女本為弱诸衔,為母則強(qiáng)”盯漂,這句話,是不是讓你如...
    陳曉柒閱讀 437評(píng)論 0 0
  • 我喜歡年齡比自己稍大的男生笨农,3~5歲吧就缆,覺(jué)得他們會(huì)比較成熟,會(huì)比較包容自己谒亦,這樣不會(huì)因?yàn)樾∈鲁臣芏纷旖咴祝刻於紩?huì)過(guò)...
    葡萄不酸但很甜閱讀 238評(píng)論 1 1