Python多線(xiàn)程原理與實(shí)戰(zhàn)
目的:
(1)了解python線(xiàn)程執(zhí)行原理
(2)掌握多線(xiàn)程編程與線(xiàn)程同步
(3)了解線(xiàn)程池的使用
1 線(xiàn)程基本概念
1.1 線(xiàn)程是什么?
線(xiàn)程是指進(jìn)程內(nèi)的一個(gè)執(zhí)行單元,也是進(jìn)程內(nèi)的可調(diào)度實(shí)體.
與進(jìn)程的區(qū)別:
(1) 地址空間:進(jìn)程內(nèi)的一個(gè)執(zhí)行單元;進(jìn)程至少有一個(gè)線(xiàn)程;它們共享進(jìn)程的地址空間;而進(jìn)程有自己獨(dú)立的地址空間;
(2) 資源擁有:進(jìn)程是資源分配和擁有的單位,同一個(gè)進(jìn)程內(nèi)的線(xiàn)程共享進(jìn)程的資源
(3) 線(xiàn)程是處理器調(diào)度的基本單位,但進(jìn)程不是.
(4) 二者均可并發(fā)執(zhí)行.
簡(jiǎn)而言之,一個(gè)程序至少有一個(gè)進(jìn)程,一個(gè)進(jìn)程至少有一個(gè)線(xiàn)程.
線(xiàn)程的劃分尺度小于進(jìn)程铐望,使得多線(xiàn)程程序的并發(fā)性高赂苗。
另外玩徊,進(jìn)程在執(zhí)行過(guò)程中擁有獨(dú)立的內(nèi)存單元谍失,而多個(gè)線(xiàn)程共享內(nèi)存,從而極大地提高了程序的運(yùn)行效率嫡丙。
1.2 線(xiàn)程和進(jìn)程關(guān)系拴袭?
? 進(jìn)程就是一個(gè)應(yīng)用程序在處理機(jī)上的一次執(zhí)行過(guò)程,它是一個(gè)動(dòng)態(tài)的概念曙博,而線(xiàn)程是進(jìn)程中的一部分拥刻,進(jìn)程包含多個(gè)線(xiàn)程在運(yùn)行。
? 多線(xiàn)程可以共享全局變量父泳,多進(jìn)程不能般哼。多線(xiàn)程中,所有子線(xiàn)程的進(jìn)程號(hào)相同惠窄;多進(jìn)程中蒸眠,不同的子進(jìn)程進(jìn)程號(hào)不同。
? 進(jìn)程是具有一定獨(dú)立功能的程序關(guān)于某個(gè)數(shù)據(jù)集合上的一次運(yùn)行活動(dòng),進(jìn)程是系統(tǒng)進(jìn)行資源分配和調(diào)度的一個(gè)獨(dú)立單位.
? 線(xiàn)程是進(jìn)程的一個(gè)實(shí)體,是CPU調(diào)度和分派的基本單位,它是比進(jìn)程更小的能獨(dú)立運(yùn)行的基本單位.線(xiàn)程自己基本上不擁有系統(tǒng)資源,只擁有一點(diǎn)在運(yùn)行中必不可少的資源(如程序計(jì)數(shù)器,一組寄存器和棧),但是它可與同屬一個(gè)進(jìn)程的其他的線(xiàn)程共享進(jìn)程所擁有的全部資源.
? 一個(gè)線(xiàn)程可以創(chuàng)建和撤銷(xiāo)另一個(gè)線(xiàn)程;同一個(gè)進(jìn)程中的多個(gè)線(xiàn)程之間可以并發(fā)執(zhí)行.
?
2 Python線(xiàn)程模塊
? python主要是通過(guò)thread和threading這兩個(gè)模塊來(lái)實(shí)現(xiàn)多線(xiàn)程支持杆融。python的thread模塊是比較底層的模塊楞卡,python的threading模塊是對(duì)thread做了一些封裝,可以更加方便的被使用。但是python(cpython)由于GIL的存在無(wú)法使用threading充分利用CPU資源臀晃,如果想充分發(fā)揮多核CPU的計(jì)算能力需要使用multiprocessing模塊(Windows下使用會(huì)有諸多問(wèn)題)觉渴。
2.1 如何創(chuàng)建線(xiàn)程
? python3.x中已經(jīng)摒棄了Python2.x中采用函數(shù)式thread模塊中的start_new_thread()函數(shù)來(lái)產(chǎn)生新線(xiàn)程方式。
? python3.x中通過(guò)threading模塊創(chuàng)建新的線(xiàn)程有兩種方法:一種是通過(guò)threading.Thread(Target=executable Method)-即傳遞給Thread對(duì)象一個(gè)可執(zhí)行方法(或?qū)ο螅?第二種是繼承threading.Thread定義子類(lèi)并重寫(xiě)run()方法徽惋。第二種方法中案淋,唯一必須重寫(xiě)的方法是run()
(1)通過(guò)threading.Thread進(jìn)行創(chuàng)建多線(xiàn)程
import threading
import time
def target():
print("the current threading %s is runing"
%(threading.current_thread().name))
time.sleep(1)
print("the current threading %s is ended"%(threading.current_thread().name))
print("the current threading %s is runing"%(threading.current_thread().name))
## 屬于線(xiàn)程t的部分
t = threading.Thread(target=target)
t.start()
## 屬于線(xiàn)程t的部分
t.join() # join是阻塞當(dāng)前線(xiàn)程(此處的當(dāng)前線(xiàn)程時(shí)主線(xiàn)程) 主線(xiàn)程直到Thread-1結(jié)束之后才結(jié)束
print("the current threading %s is ended"%(threading.current_thread().name))
(2)通過(guò)繼承threading.Thread定義子類(lèi)創(chuàng)建多線(xiàn)程
? 使用Threading模塊創(chuàng)建線(xiàn)程,直接從threading.Thread繼承险绘,然后重寫(xiě)init方法和run方法:
import threading
import time
class myThread(threading.Thread): # 繼承父類(lèi)threading.Thread
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self): # 把要執(zhí)行的代碼寫(xiě)到run函數(shù)里面 線(xiàn)程在創(chuàng)建后會(huì)直接運(yùn)行run函數(shù)
print("Starting " + self.name)
print_time(self.name, 5, self.counter)
print("Exiting " + self.name)
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print("%s process at: %s" % (threadName, time.ctime(time.time())))
counter -= 1
# 創(chuàng)建新線(xiàn)程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 開(kāi)啟線(xiàn)程
thread1.start()
thread2.start()
# 等待線(xiàn)程結(jié)束
thread1.join()
thread2.join()
print("Exiting Main Thread")
通過(guò)以上案例可以知道踢京,thread1和thread2執(zhí)行順序是亂序的。要使之有序宦棺,需要進(jìn)行線(xiàn)程同步
3 線(xiàn)程間同步
? 如果多個(gè)線(xiàn)程共同對(duì)某個(gè)數(shù)據(jù)修改瓣距,則可能出現(xiàn)不可預(yù)料的結(jié)果,為了保證數(shù)據(jù)的正確性代咸,需要對(duì)多個(gè)線(xiàn)程進(jìn)行同步蹈丸。
? 使用Thread對(duì)象的Lock和Rlock可以實(shí)現(xiàn)簡(jiǎn)單的線(xiàn)程同步,這兩個(gè)對(duì)象都有acquire方法和release方法呐芥,對(duì)于那些需要每次只允許一個(gè)線(xiàn)程操作的數(shù)據(jù)逻杖,可以將其操作放到acquire和release方法之間。
? 需要注意的是思瘟,Python有一個(gè)GIL(Global Interpreter Lock)機(jī)制荸百,任何線(xiàn)程在運(yùn)行之前必須獲取這個(gè)全局鎖才能執(zhí)行,每當(dāng)執(zhí)行完100條字節(jié)碼滨攻,全局鎖才會(huì)釋放够话,切換到其他線(xiàn)程執(zhí)行。
3.1 線(xiàn)程同步問(wèn)題
多線(xiàn)程實(shí)現(xiàn)同步有四種方式:
鎖機(jī)制光绕,信號(hào)量女嘲,條件判斷和同步隊(duì)列。
下面我主要關(guān)注兩種同步機(jī)制:鎖機(jī)制和同步隊(duì)列诞帐。
(1)鎖機(jī)制
threading的Lock類(lèi)澡为,用該類(lèi)的acquire函數(shù)進(jìn)行加鎖,用realease函數(shù)進(jìn)行解鎖
import threading
import time
class myThread(threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print("Starting " + self.name)
# 獲得鎖景埃,成功獲得鎖定后返回True
# 可選的timeout參數(shù)不填時(shí)將一直阻塞直到獲得鎖定
# 否則超時(shí)后將返回False
threadLock.acquire()
print_time(self.name, self.counter, 5)
# 釋放鎖
threadLock.release()
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
threadLock = threading.Lock()
threads = []
# 創(chuàng)建新線(xiàn)程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 開(kāi)啟新線(xiàn)程
thread1.start()
thread2.start()
# 添加線(xiàn)程到線(xiàn)程列表
threads.append(thread1)
threads.append(thread2)
# 等待所有線(xiàn)程完成
for t in threads:
t.join()
print("Exiting Main Thread")
?
(2) 線(xiàn)程同步隊(duì)列queue
python2.x中提供的Queue媒至, Python3.x中提供的是queue
見(jiàn)import queue.
Python的queue模塊中提供了同步的、線(xiàn)程安全的隊(duì)列類(lèi)谷徙,包括FIFO(先入先出)隊(duì)列Queue拒啰,LIFO(后入先出)隊(duì)列LifoQueue,和優(yōu)先級(jí)隊(duì)列PriorityQueue完慧。這些隊(duì)列都實(shí)現(xiàn)了鎖原語(yǔ)谋旦,能夠在多線(xiàn)程中直接使用。可以使用隊(duì)列來(lái)實(shí)現(xiàn)線(xiàn)程間的同步册着。
queue模塊中的常用方法:
- queue.qsize() 返回隊(duì)列的大小
- queue.empty() 如果隊(duì)列為空拴孤,返回True,反之False
- queue.full() 如果隊(duì)列滿(mǎn)了,返回True,反之False
- queue.full 與 maxsize 大小對(duì)應(yīng)
- queue.get([block[, timeout]])獲取隊(duì)列甲捏,timeout等待時(shí)間
- queue.get_nowait() 相當(dāng)Queue.get(False)
- queue.put(item) 寫(xiě)入隊(duì)列演熟,timeout等待時(shí)間
- queue.put_nowait(item) 相當(dāng)Queue.put(item, False)
- queue.task_done() 在完成一項(xiàng)工作之后,Queue.task_done()函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào)
- queue.join() 實(shí)際上意味著等到隊(duì)列為空司顿,再執(zhí)行別的操作
案例1:
import queue
import threading
import time
exitFlag = 0
class myThread(threading.Thread):
def __init__(self, threadID, name, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.q = q
def run(self):
print("Starting " + self.name)
process_data(self.name, self.q)
print("Exiting " + self.name)
def process_data(threadName, q):
while not exitFlag:
queueLock.acquire()
if not workQueue.empty():
data = q.get()
queueLock.release()
print("%s processing %s" % (threadName, data))
else:
queueLock.release()
time.sleep(1)
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = queue.Queue(10)
threads = []
threadID = 1
# 創(chuàng)建新線(xiàn)程
for tName in threadList:
thread = myThread(threadID, tName, workQueue)
thread.start()
threads.append(thread)
threadID += 1
# 填充隊(duì)列
queueLock.acquire()
for word in nameList:
workQueue.put(word)
queueLock.release()
# 等待隊(duì)列清空
while not workQueue.empty():
pass
# 通知線(xiàn)程是時(shí)候退出
exitFlag = 1
# 等待所有線(xiàn)程完成
for t in threads:
t.join()
print("Exiting Main Thread")
from queue import Queue
from threading import Thread, Lock
from redis import Redis
import time
isExitFlag = False # 是否中斷子線(xiàn)程
class MyThread(Thread):
def __init__(self, name, task_q):
super(MyThread, self).__init__()
self.name = name
self.task_q = task_q
self.start()
def run(self):
print(self.name, '---start---')
while not isExitFlag:
# task = self.task_q.get() # 獲取一項(xiàng)任務(wù)
task = self.task_q.blpop('taskQueue', timeout=1)
print(self.name, '--正在執(zhí)行的任務(wù)-->', task)
time.sleep(1)
print(self.name, '---end---')
if __name__ == '__main__':
print('--所有任務(wù)開(kāi)始--')
# work_queue = Queue(maxsize=10) # 最大數(shù)據(jù)量
work_queue = Redis('127.0.0.1', db=4)
threads = [MyThread('Thread-%d' % i, work_queue) for i in range(3)]
# 發(fā)布任務(wù)
tasks = ['www.baidu.com', 'www.hao123.com', 'www.jd.com', 'www.qf.ed', 'www.qq.com', 'www.google.com.cn']
for task in tasks:
# work_queue.put(task)
work_queue.lpush('taskQueue', task)
# 假如芒粹,隊(duì)列為空后,等待 指定時(shí)間(10s)后大溜,再?zèng)]有任務(wù)(消息)化漆,則退出
while work_queue.llen('taskQueue'):
pass
isExitFlag = True # 嘗試停止所有線(xiàn)程
for thread_ in threads:
thread_.join()
print('--所有任務(wù)完成--')
案例2:
import time
import threading
import queue
class Worker(threading.Thread):
def __init__(self, name, queue):
threading.Thread.__init__(self)
self.queue = queue
self.start() #執(zhí)行run()
def run(self):
#循環(huán),保證接著跑下一個(gè)任務(wù)
while True:
# 隊(duì)列為空則退出線(xiàn)程
if self.queue.empty():
break
# 獲取一個(gè)隊(duì)列數(shù)據(jù)
foo = self.queue.get()
# 延時(shí)1S模擬你要做的事情
time.sleep(1)
# 打印
print(self.getName() + " process " + str(foo))
# 任務(wù)完成
self.queue.task_done()
# 隊(duì)列
queue = queue.Queue()
# 加入100個(gè)任務(wù)隊(duì)列
for i in range(100):
queue.put(i)
# 開(kāi)10個(gè)線(xiàn)程
for i in range(10):
threadName = 'Thread' + str(i)
Worker(threadName, queue)
# 所有線(xiàn)程執(zhí)行完畢后關(guān)閉
queue.join()
4 線(xiàn)程池
傳統(tǒng)多線(xiàn)程問(wèn)題钦奋?
? 傳統(tǒng)多線(xiàn)程方案會(huì)使用“即時(shí)創(chuàng)建座云, 即時(shí)銷(xiāo)毀”的策略。盡管與創(chuàng)建進(jìn)程相比付材,創(chuàng)建線(xiàn)程的時(shí)間已經(jīng)大大的縮短朦拖,但是如果提交給線(xiàn)程的任務(wù)是執(zhí)行時(shí)間較短,而且執(zhí)行次數(shù)極其頻繁伞租,那么服務(wù)器將處于不停的創(chuàng)建線(xiàn)程,銷(xiāo)毀線(xiàn)程的狀態(tài)限佩。
? 一個(gè)線(xiàn)程的運(yùn)行時(shí)間可以分為3部分:線(xiàn)程的啟動(dòng)時(shí)間葵诈、線(xiàn)程體的運(yùn)行時(shí)間和線(xiàn)程的銷(xiāo)毀時(shí)間。在多線(xiàn)程處理的情景中祟同,如果線(xiàn)程不能被重用作喘,就意味著每次創(chuàng)建都需要經(jīng)過(guò)啟動(dòng)、銷(xiāo)毀和運(yùn)行3個(gè)過(guò)程晕城。這必然會(huì)增加系統(tǒng)相應(yīng)的時(shí)間泞坦,降低了效率。
有沒(méi)有一種高效的解決方案呢砖顷? —— 線(xiàn)程池
線(xiàn)程池基本原理:
? 我們把任務(wù)放進(jìn)隊(duì)列中去贰锁,然后開(kāi)N個(gè)線(xiàn)程,每個(gè)線(xiàn)程都去隊(duì)列中取一個(gè)任務(wù)滤蝠,執(zhí)行完了之后告訴系統(tǒng)說(shuō)我執(zhí)行完了豌熄,然后接著去隊(duì)列中取下一個(gè)任務(wù),直至隊(duì)列中所有任務(wù)取空物咳,退出線(xiàn)程锣险。
使用線(xiàn)程池:
? 由于線(xiàn)程預(yù)先被創(chuàng)建并放入線(xiàn)程池中,同時(shí)處理完當(dāng)前任務(wù)之后并不銷(xiāo)毀而是被安排處理下一個(gè)任務(wù),因此能夠避免多次創(chuàng)建線(xiàn)程芯肤,從而節(jié)省線(xiàn)程創(chuàng)建和銷(xiāo)毀的開(kāi)銷(xiāo)巷折,能帶來(lái)更好的性能和系統(tǒng)穩(wěn)定性。
內(nèi)置線(xiàn)程池
from multiprocessing.pool import ThreadPool
import time
w_start = time.time()
def worker(msg):
time.sleep(1)
print('hi', msg)
if __name__ == '__main__':
pool = ThreadPool(4)
for i in range(12):
msg = 'threading %d' % i
pool.apply_async(worker,(msg,))
pool.close()
pool.join()
print(time.time() - w_start)
print('sub-threading all done')
線(xiàn)程池要設(shè)置為多少崖咨?
服務(wù)器CPU核數(shù)有限锻拘,能夠同時(shí)并發(fā)的線(xiàn)程數(shù)有限,并不是開(kāi)得越多越好掩幢,以及線(xiàn)程切換是有開(kāi)銷(xiāo)的逊拍,如果線(xiàn)程切換過(guò)于頻繁,反而會(huì)使性能降低
線(xiàn)程執(zhí)行過(guò)程中际邻,計(jì)算時(shí)間分為兩部分:
- CPU計(jì)算芯丧,占用CPU
- 不需要CPU計(jì)算,不占用CPU世曾,等待IO返回缨恒,比如recv(), accept(), sleep()等操作,具體操作就是比如
訪(fǎng)問(wèn)cache轮听、RPC調(diào)用下游service骗露、訪(fǎng)問(wèn)DB,等需要網(wǎng)絡(luò)調(diào)用的操作
那么如果計(jì)算時(shí)間占50%血巍, 等待時(shí)間50%萧锉,那么為了利用率達(dá)到最高,可以開(kāi)2個(gè)線(xiàn)程:
假如工作時(shí)間是2秒述寡, CPU計(jì)算完1秒后柿隙,線(xiàn)程等待IO的時(shí)候需要1秒,此時(shí)CPU空閑了鲫凶,這時(shí)就可以切換到另外一個(gè)線(xiàn)程禀崖,讓CPU工作1秒后,線(xiàn)程等待IO需要1秒螟炫,此時(shí)CPU又可以切回去波附,第一個(gè)線(xiàn)程這時(shí)剛好完成了1秒的IO等待,可以讓CPU繼續(xù)工作昼钻,就這樣循環(huán)的在兩個(gè)線(xiàn)程之前切換操作掸屡。
那么如果計(jì)算時(shí)間占20%, 等待時(shí)間80%然评,那么為了利用率達(dá)到最高折晦,可以開(kāi)5個(gè)線(xiàn)程:
可以想象成完成任務(wù)需要5秒,CPU占用1秒沾瓦,等待時(shí)間4秒满着,CPU在線(xiàn)程等待時(shí)谦炒,可以同時(shí)再激活4個(gè)線(xiàn)程,這樣就把CPU和IO等待時(shí)間风喇,最大化的重疊起來(lái)
抽象一下宁改,計(jì)算線(xiàn)程數(shù)設(shè)置的公式就是:
N核服務(wù)器,通過(guò)執(zhí)行業(yè)務(wù)的單線(xiàn)程分析出本地計(jì)算時(shí)間為x魂莫,等待時(shí)間為y还蹲,則工作線(xiàn)程數(shù)(線(xiàn)程池線(xiàn)程數(shù))設(shè)置為 N*(x+y)/x,能讓CPU的利用率最大化耙考。
由于有GIL的影響谜喊,python只能使用到1個(gè)核,所以這里設(shè)置N=1
import queue
import threading
import time
class WorkManager(object):
def __init__(self, work_num=1000, thread_num=2):
self.work_queue = queue.Queue()
self.threads = []
self.__init_work_queue(work_num)
self.__init_thread_pool(thread_num)
"""
初始化線(xiàn)程
"""
def __init_thread_pool(self, thread_num):
for i in range(thread_num):
self.threads.append(Work(self.work_queue))
"""
初始化工作隊(duì)列
"""
def __init_work_queue(self, jobs_num):
for i in range(jobs_num):
self.add_job(do_job, i)
"""
添加一項(xiàng)工作入隊(duì)
"""
def add_job(self, func, *args):
self.work_queue.put((func, list(args))) # 任務(wù)入隊(duì)倦始,Queue內(nèi)部實(shí)現(xiàn)了同步機(jī)制
"""
等待所有線(xiàn)程運(yùn)行完畢
"""
def wait_allcomplete(self):
for item in self.threads:
if item.isAlive(): item.join()
class Work(threading.Thread):
def __init__(self, work_queue):
threading.Thread.__init__(self)
self.work_queue = work_queue
self.start()
def run(self):
# 死循環(huán)斗遏,從而讓創(chuàng)建的線(xiàn)程在一定條件下關(guān)閉退出
while True:
try:
do, args = self.work_queue.get(block=False) # 任務(wù)異步出隊(duì),Queue內(nèi)部實(shí)現(xiàn)了同步機(jī)制
do(args)
self.work_queue.task_done() # 通知系統(tǒng)任務(wù)完成
except:
break
# 具體要做的任務(wù)
def do_job(args):
time.sleep(0.1) # 模擬處理時(shí)間
print(threading.current_thread())
print(list(args))
if __name__ == '__main__':
start = time.time()
work_manager = WorkManager(100, 10) # 或者work_manager = WorkManager(10000, 20)
work_manager.wait_allcomplete()
end = time.time()
print("cost all time: %s" % (end - start))
# 自定義線(xiàn)程池
from threading import Thread, current_thread
from queue import Queue
import time
import random
class Task(Thread):
"""
工作線(xiàn)程-從任務(wù)隊(duì)列中讀取任務(wù)(函數(shù)對(duì)象和函數(shù)參數(shù))
"""
def __init__(self, task_queue):
super(Task, self).__init__()
self.task_queue: Queue = task_queue
self.start()
def run(self):
while True:
try:
do_task, args = self.task_queue.get(False)
do_task(*args)
time.sleep(0.2)
except:
break
def doTask(*args):
print(current_thread().name, '正在執(zhí)行任務(wù):', args)
time.sleep(random.uniform(0, 1))
class PoolManager:
"""
自定義線(xiàn)程池管理器
"""
def __init__(self, maxsize=10):
self.threads = []
self.taskQueue = Queue()
self.maxsize = maxsize # 最大線(xiàn)程并發(fā)數(shù)
def apply_async(self, *args):
self.taskQueue.put((doTask, args))
def close(self): # 停止發(fā)布任務(wù)鞋邑,并開(kāi)啟線(xiàn)程
self.init_threads()
def init_threads(self): # 創(chuàng)建線(xiàn)程并啟動(dòng)
# 發(fā)布了2個(gè)任務(wù)诵次,用多少線(xiàn)程執(zhí)行合適
init_size = 1
task_len = self.taskQueue.qsize() # 獲取任務(wù)隊(duì)列的大小
init_size = task_len // 10 + (1 if task_len % 10 > 0 else 0)
# 判斷需要的線(xiàn)程數(shù)是否超過(guò)最大線(xiàn)程數(shù)
init_size = init_size if init_size < self.maxsize else self.maxsize
for _ in range(init_size):
self.threads.append(Task(self.taskQueue))
def wait_all_complete(self):
for t in self.threads:
if t.is_alive():
t.join()
if __name__ == '__main__':
pool = PoolManager()
# 發(fā)布任務(wù)
for i in range(100):
pool.apply_async('www.baidu.com', 'index%d.html' % i)
# 定制發(fā)布任務(wù)并啟動(dòng)任務(wù)線(xiàn)程來(lái)完成任務(wù)
pool.close()
pool.wait_all_complete()
print('--所有任務(wù)完成--')
5 協(xié)程
? 在python GIL之下,同一時(shí)刻只能有一個(gè)線(xiàn)程在運(yùn)行枚碗,那么對(duì)于CPU計(jì)算密集的程序來(lái)說(shuō)逾一,線(xiàn)程之間的切換開(kāi)銷(xiāo)就成了拖累,而以I/O為瓶頸的程序正是協(xié)程所擅長(zhǎng)的:
Python中的協(xié)程經(jīng)歷了很長(zhǎng)的一段發(fā)展歷程肮雨。其大概經(jīng)歷了如下三個(gè)階段:
- 最初的生成器變形yield/send
- 引入@asyncio.coroutine和yield from
- 在最近的Python3.5版本中引入async/await關(guān)鍵字
(1)從yield說(shuō)起
先看一段普通的計(jì)算斐波那契續(xù)列的代碼
def fibs(n):
res = [0] * n
index = 0
a = 0
b = 1
while index < n:
res[index] = b
a, b = b, a + b
index += 1
return res
for fib_res in fibs(20):
print(fib_res)
? 如果我們僅僅是需要拿到斐波那契序列的第n位遵堵,或者僅僅是希望依此產(chǎn)生斐波那契序列,那么上面這種傳統(tǒng)方式就會(huì)比較耗費(fèi)內(nèi)存怨规。
這時(shí)陌宿,yield就派上用場(chǎng)了。
def fib(n):
index = 0
a = 0
b = 1
while index < n:
yield b
a, b = b, a + b
index += 1
for fib_res in fib(20):
print(fib_res)
? 當(dāng)一個(gè)函數(shù)中包含yield語(yǔ)句時(shí)椅亚,python會(huì)自動(dòng)將其識(shí)別為一個(gè)生成器限番。這時(shí)fib(20)并不會(huì)真正調(diào)用函數(shù)體舱污,而是以函數(shù)體生成了一個(gè)生成器對(duì)象實(shí)例呀舔。
? yield在這里可以保留fib函數(shù)的計(jì)算現(xiàn)場(chǎng),暫停fib的計(jì)算并將b返回扩灯。而將fib放入for…in循環(huán)中時(shí)媚赖,每次循環(huán)都會(huì)調(diào)用next(fib(20)),喚醒生成器珠插,執(zhí)行到下一個(gè)yield語(yǔ)句處惧磺,直到拋出StopIteration異常。此異常會(huì)被for循環(huán)捕獲捻撑,導(dǎo)致跳出循環(huán)磨隘。
(2) Send來(lái)了
? 從上面的程序中可以看到缤底,目前只有數(shù)據(jù)從fib(20)中通過(guò)yield流向外面的for循環(huán);如果可以向fib(20)發(fā)送數(shù)據(jù)番捂,那不是就可以在Python中實(shí)現(xiàn)協(xié)程了嘛个唧。
? 于是,Python中的生成器有了send函數(shù)设预,yield表達(dá)式也擁有了返回值徙歼。
? 我們用這個(gè)特性,模擬一個(gè)慢速斐波那契數(shù)列的計(jì)算:
import time
import random
def stupid_fib(n):
index = 0
a = 0
b = 1
while index < n:
sleep_cnt = yield b
print('let me think {0} secs'.format(sleep_cnt))
time.sleep(sleep_cnt)
a, b = b, a + b
index += 1
print('-' * 10 + 'test yield send' + '-' * 10)
N = 20
sfib = stupid_fib(N)
fib_res = next(sfib) # 第一次必須要執(zhí)行next()函數(shù)鳖枕,讓程序控制到y(tǒng)ield b位置
while True:
print(fib_res)
try:
fib_res = sfib.send(random.uniform(0, 0.5))
except StopIteration:
break
python 進(jìn)行并發(fā)編程
? 在Python 2的時(shí)代魄梯,高性能的網(wǎng)絡(luò)編程主要是使用Twisted、Tornado和Gevent這三個(gè)庫(kù)宾符,但是它們的異步代碼相互之間既不兼容也不能移植酿秸。
? asyncio是Python 3.4版本引入的標(biāo)準(zhǔn)庫(kù),直接內(nèi)置了對(duì)異步IO的支持吸奴。
? asyncio
的編程模型就是一個(gè)消息循環(huán)允扇。我們從asyncio
模塊中直接獲取一個(gè)EventLoop
的引用,然后把需要執(zhí)行的協(xié)程扔到EventLoop
中執(zhí)行则奥,就實(shí)現(xiàn)了異步IO考润。
? Python的在3.4中引入了協(xié)程的概念,可是這個(gè)還是以生成器對(duì)象為基礎(chǔ)读处。
? Python 3.5添加了async和await這兩個(gè)關(guān)鍵字糊治,分別用來(lái)替換asyncio.coroutine
和yield from
。
? python3.5則確定了協(xié)程的語(yǔ)法罚舱。下面將簡(jiǎn)單介紹asyncio的使用井辜。實(shí)現(xiàn)協(xié)程的不僅僅是asyncio,tornado和gevent都實(shí)現(xiàn)了類(lèi)似的功能管闷。
(1)協(xié)程定義
用asyncio
實(shí)現(xiàn)Hello world
代碼如下:
import asyncio
@asyncio.coroutine
def hello():
print("Hello world!")
# 異步調(diào)用asyncio.sleep(1) --> 協(xié)程函數(shù):
r = yield from asyncio.sleep(1)
print("Hello again!")
# 獲取EventLoop(事件循環(huán)器):
loop = asyncio.get_event_loop()
# 執(zhí)行coroutine
loop.run_until_complete(hello())
loop.close()
? @asyncio.coroutine把一個(gè)generator標(biāo)記為coroutine類(lèi)型粥脚,然后,我們就把這個(gè)
coroutine扔到
EventLoop中執(zhí)行包个。 hello()
會(huì)首先打印出Hello world!
刷允,然后,yield from
語(yǔ)法可以讓我們方便地調(diào)用另一個(gè)generator
碧囊。由于asyncio.sleep()
也是一個(gè)coroutine
树灶,所以線(xiàn)程不會(huì)等待asyncio.sleep()
,而是直接中斷并執(zhí)行下一個(gè)消息循環(huán)糯而。當(dāng)asyncio.sleep()
返回時(shí)天通,線(xiàn)程就可以從yield from
拿到返回值(此處是None
),然后接著執(zhí)行下一行語(yǔ)句熄驼。
? 把asyncio.sleep(1)
看成是一個(gè)耗時(shí)1秒的IO操作像寒,在此期間烘豹,主線(xiàn)程并未等待,而是去執(zhí)行EventLoop
中其他可以執(zhí)行的coroutine
了诺祸,因此可以實(shí)現(xiàn)并發(fā)執(zhí)行吴叶。
我們用Task封裝兩個(gè)coroutine
試試:
import threading
import asyncio
@asyncio.coroutine
def hello():
print('Hello world! (%s)' % threading.currentThread())
yield from asyncio.sleep(1)
print('Hello again! (%s)' % threading.currentThread())
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
觀察執(zhí)行過(guò)程:
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
(暫停約1秒)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
由打印的當(dāng)前線(xiàn)程名稱(chēng)可以看出,兩個(gè)coroutine
是由同一個(gè)線(xiàn)程并發(fā)執(zhí)行的序臂。
如果把asyncio.sleep()
換成真正的IO操作蚌卤,則多個(gè)coroutine
就可以由一個(gè)線(xiàn)程并發(fā)執(zhí)行。
asyncio案例實(shí)戰(zhàn)
我們用asyncio
的異步網(wǎng)絡(luò)連接來(lái)獲取sina奥秆、sohu和163的網(wǎng)站首頁(yè):
async_wget.py
import asyncio
@asyncio.coroutine
def wget(host):
print('wget %s...' % host) # 等待打開(kāi)host:80的連接
connect = asyncio.open_connection(host, 80)
reader, writer = yield from connect # 開(kāi)始連接逊彭,如果連接成功,則返回讀Reader和寫(xiě)Writer的對(duì)象
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
writer.write(header.encode('utf-8'))
yield from writer.drain() # 刷新底層傳輸?shù)膶?xiě)緩沖區(qū)
while True:
line = yield from reader.readline()
if line == b'\r\n':
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# Ignore the body, close the socket
writer.close()
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
結(jié)果信息如下:
wget www.sohu.com...
wget www.sina.com.cn...
wget www.163.com...
(等待一段時(shí)間)
(打印出sohu的header)
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html
...
(打印出sina的header)
www.sina.com.cn header > HTTP/1.1 200 OK
www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT
...
(打印出163的header)
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
...
可見(jiàn)3個(gè)連接由一個(gè)線(xiàn)程通過(guò)coroutine
并發(fā)完成构订。
新版本改寫(xiě)
@asyncio.coroutine -> async
yield from -> await
小結(jié)
asyncio
提供了完善的異步IO支持侮叮;
異步操作需要在coroutine
中通過(guò)yield from
完成;
多個(gè)coroutine
可以封裝成一組Task然后并發(fā)執(zhí)行悼瘾。