隊(duì)列 -- 線程安全的FIFO實(shí)現(xiàn)

queue — Thread-Safe FIFO Implementation

隊(duì)列 -- 線程安全的FIFO實(shí)現(xiàn)

Purpose: Provides a thread-safe FIFO implementation

目的:提供一個(gè)線程安全的FIFO實(shí)現(xiàn)

The queue module provides a first-in, first-out (FIFO) data structure suitable for multi-threaded programming. It can be used to pass messages or other data between producer and consumer threads safely. Locking is handled for the caller, so many threads can work with the same Queue instance safely and easily. The size of a Queue (the number of elements it contains) may be restricted to throttle memory usage or processing.
queue模塊提供了一個(gè)適用于多線程編程的先進(jìn)先出(FIFO)的數(shù)據(jù)結(jié)構(gòu)西剥。該模塊可以用于在生產(chǎn)者和消費(fèi)者之間線程安全的傳遞消息或者其他數(shù)據(jù)。調(diào)用者會(huì)自動(dòng)創(chuàng)建鎖, 多個(gè)線程能夠與同一個(gè)queue實(shí)例安全并且容易的共同協(xié)作摇锋。Queue的大幸镀浴(所包含的元素個(gè)數(shù))可能收到所使用或者處理的內(nèi)存限制秀仲。

Note:
注意:
This discussion assumes you already understand the general nature of a queue. If you do not, you may want to read some of the references before continuing.
這個(gè)討論假定讀者已經(jīng)理解queue的一般本質(zhì)建钥。如果還沒(méi)有理解,就在繼續(xù)本文后續(xù)內(nèi)容之前需要閱讀一些相關(guān)的參考文檔凛膏。

Basic FIFO Queue

基本的 FIFO 隊(duì)列

The Queue class implements a basic first-in, first-out container. Elements are added to one “end” of the sequence using put(), and removed from the other end using get().
Queue類實(shí)現(xiàn)一個(gè)基本的先進(jìn)先出容器杨名。通過(guò)put()函數(shù)將元素加入序列的一端,然后通過(guò)get()函數(shù)從序列的另一端移出猖毫。

queue_fifo.py

import queue

q = queue.Queue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()

This example uses a single thread to illustrate that elements are removed from the queue in the same order in which they are inserted.
本示例使用了一個(gè)單線程來(lái)展示元素從queue中移除台谍,以和它們插入相同的順序。

$ python3 queue_fifo.py

0 1 2 3 4
LIFO Queue

In contrast to the standard FIFO implementation of Queue, the LifoQueue uses last-in, first-out ordering (normally associated with a stack data structure).
與Queue的標(biāo)準(zhǔn)FIFO實(shí)現(xiàn)相反吁断,LifoQueue使用后進(jìn)顯出的規(guī)則(通常與棧這個(gè)數(shù)據(jù)結(jié)構(gòu)關(guān)聯(lián))趁蕊。

queue_lifo.py

import queue

q = queue.LifoQueue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()

The item most recently put into the queue is removed by get.
放入queue中的元素將通過(guò)get函數(shù)移除。

$ python3 queue_lifo.py

4 3 2 1 0

Priority Queue

優(yōu)先隊(duì)列

Sometimes the processing order of the items in a queue needs to be based on characteristics of those items, rather than just the order they are created or added to the queue. For example, print jobs from the payroll department may take precedence over a code listing that a developer wants to print. PriorityQueue uses the sort order of the contents of the queue to decide which item to retrieve.
有時(shí)候仔役,隊(duì)列中元素的處理順序需要基于元素的特性掷伙,而不是元素創(chuàng)建或者加入隊(duì)列的順序。例如又兵,人事部門打印工資單的任務(wù)可能按照優(yōu)先級(jí)任柜,而不是程序員想要打印的順序。優(yōu)先隊(duì)列將對(duì)于隊(duì)列的內(nèi)容進(jìn)行排序沛厨,然后再?zèng)Q定檢索那個(gè)元素宙地。

queue_priority.py

import functools
import queue
import threading

@functools.total_ordering
class Job:

    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print('New job:', description)
        return

    def __eq__(self, other):
        try:
            return self.priority == other.priority
        except AttributeError:
            return NotImplemented

    def __lt__(self, other):
        try:
            return self.priority < other.priority
        except AttributeError:
            return NotImplemented


q = queue.PriorityQueue()

q.put(Job(3, 'Mid-level job'))
q.put(Job(10, 'Low-level job'))
q.put(Job(1, 'Important job'))


def process_job(q):
    while True:
        next_job = q.get()
        print('Processing job:', next_job.description)
        q.task_done()

workers = [
    threading.Thread(target=process_job, args=(q,)),
    threading.Thread(target=process_job, args=(q,)),
]
for w in workers:
    w.setDaemon(True)
    w.start()

q.join()

This example has multiple threads consuming the jobs, which are processed based on the priority of items in the queue at the time get() was called. The order of processing for items added to the queue while the consumer threads are running depends on thread context switching.
本示例使用了多線程來(lái)執(zhí)行任務(wù),在調(diào)用get()函數(shù)時(shí)俄烁,會(huì)基于隊(duì)列中元素的優(yōu)先級(jí)處理相應(yīng)的元素绸栅。隊(duì)列中元素的處理順序,消費(fèi)線程的運(yùn)行基于線程上下文切換页屠。

$ python3 queue_priority.py

New job: Mid-level job
New job: Low-level job
New job: Important job
Processing job: Important job
Processing job: Mid-level job
Processing job: Low-level job
Building a Threaded Podcast Client

The source code for the podcasting client in this section demonstrates how to use the Queue class with multiple threads. The program reads one or more RSS feeds, queues up the enclosures for the five most recent episodes from each feed to be downloaded, and processes several downloads in parallel using threads. It does not have enough error handling for production use, but the skeleton implementation illustrates the use of the queue module.
本節(jié)中podcasting客戶端的源代碼展示了在多線程環(huán)境下使用隊(duì)列類粹胯。程序讀取了一個(gè)或者多個(gè)RSS feed蓖柔,將每個(gè)feed中需要下載的五個(gè)最常見(jiàn)的劇集放入隊(duì)列,然后使用線程并行處理需要下載的多個(gè)隊(duì)列风纠。在生產(chǎn)環(huán)境下使用時(shí)不會(huì)有太多的錯(cuò)誤處理况鸣,但是所執(zhí)行的架構(gòu)很清晰的表現(xiàn)了queue模塊的使用。

First, some operating parameters are established. Usually, these would come from user inputs (e.g., preferences or a database). The example uses hard-coded values for the number of threads and list of URLs to fetch.
首先竹观,需要確認(rèn)一些操作參數(shù)镐捧。通常情況下,這些參數(shù)都是來(lái)源于用戶輸入(例如:優(yōu)先級(jí)或者數(shù)據(jù)庫(kù))臭增。以下示例對(duì)于線程的數(shù)量和需要抓取的URL的列表使用了硬編碼值懂酱。

fetch_podcasts.py
from queue import Queue
import threading
import time
import urllib
from urllib.parse import urlparse

import feedparser

# Set up some global variables
num_fetch_threads = 2
enclosure_queue = Queue()

# A real app wouldn't use hard-coded data...
feed_urls = [
    'http://talkpython.fm/episodes/rss',
]


def message(s):
    print('{}: {}'.format(threading.current_thread().name, s))
The function download_enclosures() runs in the worker thread and processes the downloads using urllib.

def download_enclosures(q):
    """This is the worker thread function.
    It processes items in the queue one after
    another.  These daemon threads go into an
    infinite loop, and exit only when
    the main thread ends.
    """
    while True:
        message('looking for the next enclosure')
        url = q.get()
        filename = url.rpartition('/')[-1]
        message('downloading {}'.format(filename))
        response = urllib.request.urlopen(url)
        data = response.read()
        # Save the downloaded file to the current directory
        message('writing to {}'.format(filename))
        with open(filename, 'wb') as outfile:
            outfile.write(data)
        q.task_done()

Once the target function for the threads is defined, the worker threads can be started. When download_enclosures() processes the statement url = q.get(), it blocks and waits until the queue has something to return. That means it is safe to start the threads before there is anything in the queue.
一旦線程的目標(biāo)函數(shù)被定義,worker線程就可以開始工作誊抛。當(dāng)download_enclosures()處理語(yǔ)句url = q.get()時(shí)列牺,他就阻塞并且一直等待,直到queue需要返回拗窃,這就意味著在隊(duì)列中有東西時(shí)瞎领,啟動(dòng)線程總是安全的。

# Set up some threads to fetch the enclosures
for i in range(num_fetch_threads):
    worker = threading.Thread(
        target=download_enclosures,
        args=(enclosure_queue,),
        name='worker-{}'.format(i),
    )
    worker.setDaemon(True)
    worker.start()

The next step is to retrieve the feed contents using the feedparser module and enqueue the URLs of the enclosures. As soon as the first URL is added to the queue, one of the worker threads picks it up and starts downloading it. The loop continues to add items until the feed is exhausted, and the worker threads take turns dequeuing URLs to download them.
下一步是使用feedparser模塊随夸,以及檢索feed的內(nèi)容九默,以及入隊(duì)URL的集合。一旦第一個(gè)URL添加到隊(duì)列宾毒,就有一個(gè)worker線程選中它驼修,然后開始下載它。循環(huán)將繼續(xù)添加元素伍俘,直到耗盡全部的feed邪锌,worker線程將輪流的將URL出隊(duì),然后下載它們癌瘾。

# Download the feed(s) and put the enclosure URLs into
# the queue.
for url in feed_urls:
    response = feedparser.parse(url, agent='fetch_podcasts.py')
    for entry in response['entries'][:5]:
        for enclosure in entry.get('enclosures', []):
            parsed_url = urlparse(enclosure['url'])
            message('queuing {}'.format(
                parsed_url.path.rpartition('/')[-1]))
            enclosure_queue.put(enclosure['url'])

The only thing left to do is wait for the queue to empty out again, using join().
現(xiàn)在唯一要做的事情就是等待隊(duì)列再次變?yōu)榭彰俜幔褂胘oin()。

# Now wait for the queue to be empty, indicating that we have
# processed all of the downloads.
message('*** main thread waiting')
enclosure_queue.join()
message('*** done')
Running the sample script produces output similar to the following.

$ python3 fetch_podcasts.py

worker-0: looking for the next enclosure
worker-1: looking for the next enclosure
MainThread: queuing turbogears-and-the-future-of-python-web-frameworks.mp3
MainThread: queuing continuum-scientific-python-and-the-business-of-open-source.mp3
MainThread: queuing openstack-cloud-computing-built-on-python.mp3
MainThread: queuing pypy.js-pypy-python-in-your-browser.mp3
MainThread: queuing machine-learning-with-python-and-scikit-learn.mp3
MainThread: *** main thread waiting
worker-0: downloading turbogears-and-the-future-of-python-web-frameworks.mp3
worker-1: downloading continuum-scientific-python-and-the-business-of-open-source.mp3
worker-0: looking for the next enclosure
worker-0: downloading openstack-cloud-computing-built-on-python.mp3
worker-1: looking for the next enclosure
worker-1: downloading pypy.js-pypy-python-in-your-browser.mp3
worker-0: looking for the next enclosure
worker-0: downloading machine-learning-with-python-and-scikit-learn.mp3
worker-1: looking for the next enclosure
worker-0: looking for the next enclosure
MainThread: *** done
The actual output will depend on the contents of the RSS feed used

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末妨退,一起剝皮案震驚了整個(gè)濱河市妇萄,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌咬荷,老刑警劉巖冠句,帶你破解...
    沈念sama閱讀 221,576評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異幸乒,居然都是意外死亡懦底,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門罕扎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)聚唐,“玉大人丐重,你說(shuō)我怎么就攤上這事「瞬椋” “怎么了扮惦?”我有些...
    開封第一講書人閱讀 168,017評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)亲桦。 經(jīng)常有香客問(wèn)我崖蜜,道長(zhǎng),這世上最難降的妖魔是什么客峭? 我笑而不...
    開封第一講書人閱讀 59,626評(píng)論 1 296
  • 正文 為了忘掉前任豫领,我火速辦了婚禮,結(jié)果婚禮上桃笙,老公的妹妹穿的比我還像新娘氏堤。我一直安慰自己沙绝,他們只是感情好搏明,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,625評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著闪檬,像睡著了一般星著。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上粗悯,一...
    開封第一講書人閱讀 52,255評(píng)論 1 308
  • 那天虚循,我揣著相機(jī)與錄音,去河邊找鬼样傍。 笑死横缔,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的衫哥。 我是一名探鬼主播茎刚,決...
    沈念sama閱讀 40,825評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼撤逢!你這毒婦竟也來(lái)了膛锭?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,729評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤蚊荣,失蹤者是張志新(化名)和其女友劉穎初狰,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體互例,經(jīng)...
    沈念sama閱讀 46,271評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡奢入,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,363評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了媳叨。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片腥光。...
    茶點(diǎn)故事閱讀 40,498評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡丁存,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出柴我,到底是詐尸還是另有隱情解寝,我是刑警寧澤,帶...
    沈念sama閱讀 36,183評(píng)論 5 350
  • 正文 年R本政府宣布艘儒,位于F島的核電站聋伦,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏界睁。R本人自食惡果不足惜觉增,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,867評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望翻斟。 院中可真熱鬧逾礁,春花似錦、人聲如沸访惜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,338評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)债热。三九已至砾嫉,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間窒篱,已是汗流浹背焕刮。 一陣腳步聲響...
    開封第一講書人閱讀 33,458評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留墙杯,地道東北人配并。 一個(gè)月前我還...
    沈念sama閱讀 48,906評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像高镐,于是被迫代替她去往敵國(guó)和親溉旋。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,507評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容

  • 簡(jiǎn)介 在iOS中避消,我們需要將非UI且耗時(shí)的任務(wù)放在主線程當(dāng)中執(zhí)行低滩,同時(shí)確保在任務(wù)完成時(shí)進(jìn)行回調(diào)。常用的三種實(shí)現(xiàn)多線...
    adduct閱讀 384評(píng)論 0 1
  • 事情是這樣的岩喷,我偶爾從關(guān)系很好的同事那里聽到公司正在招人的事情恕沫,想起有個(gè)朋友好像想找工作,就順嘴說(shuō)了一句纱意,事實(shí)證明...
    宋小嘉閱讀 906評(píng)論 0 1
  • 01 接到朋友的來(lái)電,他說(shuō)他被一個(gè)妹子欺騙了感情迄委。 他說(shuō)褐筛,他和那個(gè)妹子相處得好好的,本來(lái)都要在一起了叙身,結(jié)果妹子卻突...
    小七要快樂(lè)閱讀 1,073評(píng)論 18 25
  • 一直想就互聯(lián)網(wǎng)時(shí)代下的個(gè)人成長(zhǎng)之路做一些簡(jiǎn)要的心得分享渔扎,礙于各種原因,總是遲遲不能為這個(gè)念頭采取實(shí)際行動(dòng)信轿。趁著最近...
    布衣夜行人閱讀 217評(píng)論 0 0
  • 在大二快過(guò)完兩個(gè)月的時(shí)候我終于開始想刻苦學(xué)習(xí)了晃痴。。财忽。(尷尬啊~)所以希望在這剩下的兩個(gè)月的時(shí)候能夠刷完《算法》第四...
    HilaryLi閱讀 1,880評(píng)論 0 3