queue — Thread-Safe FIFO Implementation
隊(duì)列 -- 線程安全的FIFO實(shí)現(xiàn)
Purpose: Provides a thread-safe FIFO implementation
The queue module provides a first-in, first-out (FIFO) data structure suitable for multi-threaded programming. It can be used to pass messages or other data between producer and consumer threads safely. Locking is handled for the caller, so many threads can work with the same Queue instance safely and easily. The size of a Queue (the number of elements it contains) may be restricted to throttle memory usage or processing.
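The maxsize argument to the Queue constructor sets that limit. As a minimal sketch of the behavior (not part of the original examples), put() blocks when the queue is full until a consumer makes room, or raises queue.Full if a timeout is given.

import queue

# A bounded queue that holds at most two items
q = queue.Queue(maxsize=2)

q.put('a')
q.put('b')

try:
    # The queue is already full, so without a consumer this
    # put() cannot complete; the timeout makes it raise
    # queue.Full instead of blocking forever.
    q.put('c', timeout=0.1)
except queue.Full:
    print('queue is full')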
Note:
This discussion assumes you already understand the general nature of a queue. If you do not, you may want to read some of the references before continuing.
Basic FIFO Queue
The Queue class implements a basic first-in, first-out container. Elements are added to one “end” of the sequence using put(), and removed from the other end using get().
queue_fifo.py
import queue

q = queue.Queue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()
This example uses a single thread to illustrate that elements are removed from the queue in the same order in which they are inserted.
$ python3 queue_fifo.py
0 1 2 3 4
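The empty() check is reliable here only because a single thread owns the queue. In multi-threaded code the result can be stale by the time it is acted on, so a common alternative (a sketch, not part of the original example) is to call get_nowait() and handle queue.Empty, combining the check and the retrieval in one step.

import queue

q = queue.Queue()
q.put('only item')

# Drain the queue without a separate empty() check.
while True:
    try:
        item = q.get_nowait()
    except queue.Empty:
        break
    print(item)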
LIFO Queue
In contrast to the standard FIFO implementation of Queue, the LifoQueue uses last-in, first-out ordering (normally associated with a stack data structure).
queue_lifo.py
import queue

q = queue.LifoQueue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()
The item most recently put into the queue is removed by get.
$ python3 queue_lifo.py
4 3 2 1 0
Priority Queue
Sometimes the processing order of the items in a queue needs to be based on characteristics of those items, rather than just the order they are created or added to the queue. For example, print jobs from the payroll department may take precedence over a code listing that a developer wants to print. PriorityQueue uses the sort order of the contents of the queue to decide which item to retrieve.
queue_priority.py
import functools
import queue
import threading


@functools.total_ordering
class Job:

    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print('New job:', description)
        return

    def __eq__(self, other):
        try:
            return self.priority == other.priority
        except AttributeError:
            return NotImplemented

    def __lt__(self, other):
        try:
            return self.priority < other.priority
        except AttributeError:
            return NotImplemented


q = queue.PriorityQueue()

q.put(Job(3, 'Mid-level job'))
q.put(Job(10, 'Low-level job'))
q.put(Job(1, 'Important job'))


def process_job(q):
    while True:
        next_job = q.get()
        print('Processing job:', next_job.description)
        q.task_done()


workers = [
    threading.Thread(target=process_job, args=(q,)),
    threading.Thread(target=process_job, args=(q,)),
]
for w in workers:
    w.daemon = True
    w.start()

q.join()
This example has multiple threads consuming the jobs, which are processed based on the priority of items in the queue at the time get() was called. The order of processing for items added to the queue while the consumer threads are running depends on thread context switching.
$ python3 queue_priority.py
New job: Mid-level job
New job: Low-level job
New job: Important job
Processing job: Important job
Processing job: Mid-level job
Processing job: Low-level job
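Defining a class with rich comparison methods is not the only way to set priorities. A lighter-weight pattern (shown here as a sketch, not taken from the original example) is to enqueue (priority, payload) tuples, which PriorityQueue compares element by element. If two entries can have equal priorities, the payloads must also be comparable, so a monotonically increasing counter is often inserted as a tie-breaker.

import itertools
import queue

q = queue.PriorityQueue()
counter = itertools.count()  # tie-breaker for equal priorities

# Lower numbers are retrieved first.
q.put((3, next(counter), 'Mid-level job'))
q.put((10, next(counter), 'Low-level job'))
q.put((1, next(counter), 'Important job'))

while not q.empty():
    priority, _, description = q.get()
    print(priority, description)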
Building a Threaded Podcast Client
The source code for the podcasting client in this section demonstrates how to use the Queue class with multiple threads. The program reads one or more RSS feeds, queues up the enclosures for the five most recent episodes from each feed to be downloaded, and processes several downloads in parallel using threads. It does not have enough error handling for production use, but the skeleton implementation illustrates the use of the queue module.
First, some operating parameters are established. Usually, these would come from user inputs (e.g., preferences or a database). The example uses hard-coded values for the number of threads and list of URLs to fetch.
fetch_podcasts.py
from queue import Queue
import threading
import urllib.request
from urllib.parse import urlparse

import feedparser

# Set up some global variables
num_fetch_threads = 2
enclosure_queue = Queue()

# A real app wouldn't use hard-coded data...
feed_urls = [
    'http://talkpython.fm/episodes/rss',
]


def message(s):
    print('{}: {}'.format(threading.current_thread().name, s))
The function download_enclosures() runs in the worker thread and processes the downloads using urllib.
def download_enclosures(q):
    """This is the worker thread function.

    It processes items in the queue one after
    another. These daemon threads go into an
    infinite loop, and exit only when
    the main thread ends.
    """
    while True:
        message('looking for the next enclosure')
        url = q.get()
        filename = url.rpartition('/')[-1]
        message('downloading {}'.format(filename))
        response = urllib.request.urlopen(url)
        data = response.read()

        # Save the downloaded file to the current directory
        message('writing to {}'.format(filename))
        with open(filename, 'wb') as outfile:
            outfile.write(data)
        q.task_done()
Once the target function for the threads is defined, the worker threads can be started. When download_enclosures() processes the statement url = q.get(), it blocks and waits until the queue has something to return. That means it is safe to start the threads before there is anything in the queue.
# Set up some threads to fetch the enclosures
for i in range(num_fetch_threads):
    worker = threading.Thread(
        target=download_enclosures,
        args=(enclosure_queue,),
        name='worker-{}'.format(i),
    )
    worker.daemon = True
    worker.start()
The next step is to retrieve the feed contents using the feedparser module and enqueue the URLs of the enclosures. As soon as the first URL is added to the queue, one of the worker threads picks it up and starts downloading it. The loop continues to add items until the feed is exhausted, and the worker threads take turns dequeuing URLs to download them.
# Download the feed(s) and put the enclosure URLs into
# the queue.
for url in feed_urls:
    response = feedparser.parse(url, agent='fetch_podcasts.py')
    for entry in response['entries'][:5]:
        for enclosure in entry.get('enclosures', []):
            parsed_url = urlparse(enclosure['url'])
            message('queuing {}'.format(
                parsed_url.path.rpartition('/')[-1]))
            enclosure_queue.put(enclosure['url'])
The only thing left to do is wait for the queue to empty out again, using join().
# Now wait for the queue to be empty, indicating that we have
# processed all of the downloads.
message('*** main thread waiting')
enclosure_queue.join()
message('*** done')
Running the sample script produces output similar to the following.
$ python3 fetch_podcasts.py
worker-0: looking for the next enclosure
worker-1: looking for the next enclosure
MainThread: queuing turbogears-and-the-future-of-python-web-frameworks.mp3
MainThread: queuing continuum-scientific-python-and-the-business-of-open-source.mp3
MainThread: queuing openstack-cloud-computing-built-on-python.mp3
MainThread: queuing pypy.js-pypy-python-in-your-browser.mp3
MainThread: queuing machine-learning-with-python-and-scikit-learn.mp3
MainThread: *** main thread waiting
worker-0: downloading turbogears-and-the-future-of-python-web-frameworks.mp3
worker-1: downloading continuum-scientific-python-and-the-business-of-open-source.mp3
worker-0: looking for the next enclosure
worker-0: downloading openstack-cloud-computing-built-on-python.mp3
worker-1: looking for the next enclosure
worker-1: downloading pypy.js-pypy-python-in-your-browser.mp3
worker-0: looking for the next enclosure
worker-0: downloading machine-learning-with-python-and-scikit-learn.mp3
worker-1: looking for the next enclosure
worker-0: looking for the next enclosure
MainThread: *** done
The actual output will depend on the contents of the RSS feed used.
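Because the workers are daemon threads stuck in an infinite loop, they are simply abandoned when the main thread exits. When a clean shutdown matters, one common pattern (an addition here, not part of the original program) is to enqueue a sentinel value for each worker and have the loop exit when it sees one.

import queue
import threading

q = queue.Queue()
STOP = object()  # sentinel marking the end of the work


def worker(q):
    while True:
        item = q.get()
        if item is STOP:
            q.task_done()
            break  # exit the loop instead of waiting forever
        print('processing', item)
        q.task_done()


t = threading.Thread(target=worker, args=(q,))
t.start()

for item in ['a', 'b', 'c']:
    q.put(item)
q.put(STOP)  # one sentinel per worker thread

q.join()  # all items, including the sentinel, processed
t.join()  # the worker thread has exited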