背景
最近在看Python基礎(chǔ)望迎,剛好看到生產(chǎn)者與消費(fèi)者這快內(nèi)容,視頻使用了queue
模塊來(lái)處理凌外,這里記錄一下學(xué)習(xí)的內(nèi)容
概念
生產(chǎn)者與消費(fèi)者是一個(gè)比較容易理解的概念辩尊,比如游泳池中一頭進(jìn)水一頭出水,就是很典型的例子康辑。
視頻中的內(nèi)容
視頻中的代碼主要是下面這塊:
# ecoding=utf-8
# Author: 翁彥彬 | Sven_Weng
# Email : sven_weng@wengyb.com
# Web : http://wybblog.applinzi.com
from threading import current_thread, Thread
import time
import random
import queue
q = queue.Queue(5)
class Productor(Thread):
def run(self):
name = current_thread().getName()
nums = range(100)
while 1:
nowput = random.choice(nums)
if q.full(): # 消息隊(duì)列滿則停止生產(chǎn)
print "隊(duì)列已經(jīng)達(dá)到上限{0}".format(q.qsize())
time.sleep(10)
q.put(nowput)
print "生產(chǎn)者{0}生產(chǎn)了{(lán)1}".format(name, nowput)
sl = random.choice([1, 2, 3])
time.sleep(sl)
print "生產(chǎn)者休息了{(lán)0}秒".format(sl)
class Consumer(Thread):
def run(self):
name = current_thread().getName()
while 1:
if q.empty(): # 消息隊(duì)列空的時(shí)候則暫停消費(fèi)
print "隊(duì)列空了摄欲,暫停消費(fèi)"
time.sleep(5)
num = q.get()
q.task_done()
print "消費(fèi)者{0}消費(fèi)了{(lán)1}".format(name, num)
sl = random.choice([1, 2, 3])
time.sleep(sl)
print "消費(fèi)者休息了{(lán)0}秒".format(sl)
if __name__ == '__main__':
p1 = Productor()
p1.start()
p2 = Productor()
p2.start()
c1 = Consumer()
c1.start()
c2 = Consumer()
c2.start()
c3 = Consumer()
c3.start()
用的是threading
模塊來(lái)一邊生產(chǎn)內(nèi)容一邊消費(fèi)內(nèi)容,整個(gè)代碼是比較簡(jiǎn)單疮薇,但是不是特別容易理解胸墙,尤其是新手理解的時(shí)候不夠直觀。
個(gè)人理解
消息隊(duì)列這個(gè)東西可以理解為一個(gè)復(fù)雜的list
按咒,比如要實(shí)現(xiàn)先進(jìn)先出迟隅,那么每次返回list[0]
就行了,同理励七,如果要實(shí)現(xiàn)后進(jìn)先出玻淑,那么每次返回list[len(list)]
就行了,這么理解起來(lái)比較容易呀伙。
當(dāng)然,消息隊(duì)列相比起list
來(lái)是復(fù)雜了一些添坊,但是原理基本上就是這樣剿另,比如queue
使用的是threading
模塊來(lái)實(shí)現(xiàn)的,再?gòu)?fù)雜的消息隊(duì)列贬蛙,比如Python
中用的比較多的celery
雨女,可選的消息隊(duì)列就有RabbitMQ
或者Redis
一個(gè)直觀的例子
鑒于視頻中的例子過(guò)于復(fù)雜,我自己寫了一個(gè)簡(jiǎn)單直觀的例子阳准,用Flask
寫了一個(gè)簡(jiǎn)單的Web
化的消息隊(duì)列服務(wù)氛堕。
from flask import Flask, jsonify, request
from queue import Queue
app = Flask(__name__)
@app.before_first_request
def init_queue():
app.q = Queue(5)
@app.route('/')
def hello_world():
data = {
"code": "0000",
"queue_count": app.q.qsize()
}
return jsonify(data)
@app.route('/put')
def put_to_queue():
num = request.args['num']
print num
if app.q.full():
data = {
"code": "0001",
"msg": "The Queue is full"
}
else:
app.q.put(num)
data = {
"code": "0000",
"msg": "Success put {1} to the Queue, current count is {0}".format(app.q.qsize(), num)
}
return jsonify(data)
@app.route('/get')
def get_from_queue():
if app.q.empty():
data = {
"code": "0002",
"msg": "The Queue is empty"
}
else:
data = {
"code": "0000",
"msg": "Success get from the Queue, current count is {0}".format(app.q.qsize()),
"num": app.q.get()
}
app.q.task_done()
return jsonify(data)
if __name__ == '__main__':
app.run(debug=True)
首先在啟動(dòng)的時(shí)候調(diào)用before_first_request
來(lái)初始化隊(duì)列,放到app.q
這個(gè)全局變量中野蝇,用不同的請(qǐng)求來(lái)執(zhí)行不同的隊(duì)列操作讼稚,在頁(yè)面上就可以看到不同的結(jié)果了。演示例子中隊(duì)列數(shù)量5個(gè)就滿了绕沈,再執(zhí)行put
就會(huì)返回錯(cuò)誤信息锐想,同理,如果空了乍狐,也會(huì)返回錯(cuò)誤信息赠摇。