1. python 單進(jìn)程
用下載兩個(gè)文件模擬單進(jìn)程的問(wèn)題直晨。
from random import randint
from time import time, sleep
def download_task(filename):
print('開(kāi)始下載%s...' % filename)
time_to_download = randint(5, 10)
sleep(time_to_download)
print('%s下載完成! 耗費(fèi)了%d秒' % (filename, time_to_download))
def main():
start = time()
download_task('Python從入門到住院.pdf')
download_task('Peking Hot.avi')
end = time()
print('總共耗費(fèi)了%.2f秒.' % (end - start))
if __name__ == '__main__':
main()
運(yùn)行結(jié)果
>>> 開(kāi)始下載Python從入門到住院.pdf...
Python從入門到住院.pdf下載完成! 耗費(fèi)了8秒
開(kāi)始下載Peking Hot.avi...
Peking Hot.avi下載完成! 耗費(fèi)了7秒
總共耗費(fèi)了15.00秒.
2. python 多進(jìn)程
多進(jìn)程可以有效的解決GIL的問(wèn)題鳄逾,實(shí)現(xiàn)多進(jìn)程主要的類是Process,其他輔助的類跟threading模塊中的類似,進(jìn)程間共享數(shù)據(jù)可以使用管道万皿、套接字等,在multiprocessing模塊中有一個(gè)Queue類核行,它基于管道和鎖機(jī)制提供了多個(gè)進(jìn)程共享的隊(duì)列牢硅。
2.1 基本使用
重點(diǎn)在于多了Process()
類
from multiprocessing import Process
from os import getpid
from random import randint
from time import time, sleep
def download_task(filename):
print('啟動(dòng)下載進(jìn)程,進(jìn)程號(hào)[%d].' % getpid())
print('開(kāi)始下載%s...' % filename)
time_to_download = randint(5, 10)
sleep(time_to_download)
print('%s下載完成! 耗費(fèi)了%d秒' % (filename, time_to_download))
def main():
start = time()
p1 = Process(target=download_task, args=('Python從入門到住院.pdf', ))
p1.start()
p2 = Process(target=download_task, args=('Peking Hot.avi', ))
p2.start()
p1.join()
p2.join()
end = time()
print('總共耗費(fèi)了%.2f秒.' % (end - start))
if __name__ == '__main__':
main()
運(yùn)行結(jié)果
啟動(dòng)下載進(jìn)程芝雪,進(jìn)程號(hào)[6392].
開(kāi)始下載Python從入門到住院.pdf...
啟動(dòng)下載進(jìn)程减余,進(jìn)程號(hào)[12640].
開(kāi)始下載Peking Hot.avi...
Peking Hot.avi下載完成! 耗費(fèi)了8秒
Python從入門到住院.pdf下載完成! 耗費(fèi)了10秒
總共耗費(fèi)了10.21秒.
2.2 進(jìn)程間通信
啟動(dòng)兩個(gè)進(jìn)程,一個(gè)輸出Ping惩系,一個(gè)輸出Pong位岔,兩個(gè)進(jìn)程輸出的Ping和Pong加起來(lái)一共10個(gè)。
'''
from multiprocessing import Process
from time import sleep
counter = 0
def sub_task(string):
global counter
while counter < 10:
print(string, end='', flush=True)
counter += 1
sleep(0.01)
def main():
Process(target=sub_task, args=('Ping', )).start()
Process(target=sub_task, args=('Pong', )).start()
if __name__ == '__main__':
main()
'''
若是按照注釋中寫的話堡牡,“結(jié)果是Ping和Pong各輸出了10個(gè)”抒抬,因?yàn)?strong>在程序中創(chuàng)建進(jìn)程的時(shí)候,子進(jìn)程復(fù)制了父進(jìn)程及其所有的數(shù)據(jù)結(jié)構(gòu)晤柄,每個(gè)子進(jìn)程有自己獨(dú)立的內(nèi)存空間擦剑,這也就意味著兩個(gè)子進(jìn)程中各有一個(gè)
counter
變量。
2.2.1 multiprocessing中Queue的方式
在Queue
中芥颈,創(chuàng)建兩個(gè)子進(jìn)程惠勒,一個(gè)往Queue
里寫數(shù)據(jù),一個(gè)從Queue
里讀數(shù)據(jù):
multiprcessing.Queue.put()
為入隊(duì)操作
multiprcessing.Queue.get()
為出隊(duì)操作
隊(duì)列 線程 和 進(jìn)程 安全
put(obj[, block[, timeout]])
put
將對(duì)象放入隊(duì)列爬坑。
- 如果可選參數(shù)
block
為True
(默認(rèn)值)-
timeout
為None
(默認(rèn)值)纠屋,則必要時(shí)阻止,直到空閑插槽可用盾计。 -
timeout
為正數(shù)售担,它將阻止最多超時(shí)秒數(shù)肉康,如果在該時(shí)間內(nèi)沒(méi)有空閑插槽可用,則會(huì)引發(fā)Queue.Full異常灼舍。
-
- 如果可選參數(shù)
block
為False
吼和,如果空閑插槽立即可用,則將一個(gè)項(xiàng)目放在隊(duì)列中骑素,否則會(huì)引發(fā)Queue.Full
異常炫乓。
get([block[, timeout]])
從隊(duì)列中刪除并返回一個(gè)項(xiàng)目。
- 如果可選的
block
為True
(默認(rèn)值)- 超時(shí)為None(默認(rèn)值)献丑,則在必要時(shí)阻止末捣,直到項(xiàng)目可用。
- 如果超時(shí)為正數(shù)创橄,則它將阻塞至多超時(shí)秒數(shù)箩做,并在該時(shí)間內(nèi)沒(méi)有可用項(xiàng)目時(shí)引發(fā)Queue.Empty異常。
- 如果可選的
block
為False
妥畏,如果一個(gè)對(duì)象立即可用邦邦,返回一個(gè)對(duì)象;否則會(huì)引發(fā)Queue.Empty異常醉蚁。
from multiprocessing import Process
from time import sleep
from multiprocessing import Queue
counter = 0
def sub_task(string, q):
while True:
if q.empty():
break
print(string+' NO:' + str(q.get()), flush=True)
sleep(0.01)
def main():
counter = 10
q = Queue(counter)
for _ in range(counter):
q.put(_)
Process(target=sub_task, args=('Ping', q)).start()
Process(target=sub_task, args=('Pong', q)).start()
if __name__ == '__main__':
main()
2.2.2 Pipe管道方式
Pipe()
函數(shù)返回一對(duì)由管道連接的連接對(duì)象燃辖,默認(rèn)情況下是雙工(雙向)。
Pipe()
返回的兩個(gè)連接對(duì)象代表管道的兩端网棍。 每個(gè)連接對(duì)象都有send()
和recv()
方法(等等)黔龟。 請(qǐng)注意,如果兩個(gè)進(jìn)程(或線程)嘗試同時(shí)讀取或?qū)懭牍艿赖耐欢死溺瑁艿乐械臄?shù)據(jù)可能會(huì)損壞氏身。 當(dāng)然,同時(shí)使用管道不同端的過(guò)程也不會(huì)有風(fēng)險(xiǎn)惑畴。
如果duplex=True
(默認(rèn))蛋欣,則管道是雙向的。
如果duplex=False
桨菜,那么管道是單向的:conn1只能用于接收消息豁状,conn2只能用于發(fā)送消息。
from multiprocessing import Process
from time import sleep
from multiprocessing import Queue, Pipe
counter = 10
def sub_task(string, conn):
while True:
flag = conn.recv()
if flag >= counter:
# 要告訴另外一個(gè)進(jìn)程倒得,數(shù)量已經(jīng)夠了
conn.send(flag)
break
print(string+' NO:' + str(flag), flush=True)
sleep(0.01)
conn.send(flag+1)
def main():
conn1, conn2 = Pipe()
conn1.send(0)
Process(target=sub_task, args=('Ping', conn1)).start()
Process(target=sub_task, args=('Pong', conn2)).start()
if __name__ == '__main__':
main()
3. 多線程
Python中提供了Thread類并輔以Lock泻红、Condition、Event霞掺、Semaphore和Barrier谊路。Python中有GIL來(lái)防止多個(gè)線程同時(shí)執(zhí)行本地字節(jié)碼,這個(gè)鎖對(duì)于CPython是必須的菩彬,因?yàn)镃Python的內(nèi)存管理并不是線程安全的缠劝,因?yàn)镚IL的存在多線程并不能發(fā)揮CPU的多核特性潮梯。
3.1 基本使用
目前的多線程開(kāi)發(fā)我們推薦使用threading
模塊,該模塊對(duì)多線程編程提供了更好的面向?qū)ο蟮姆庋b惨恭。
- 函數(shù)方法
- threading.currentThread(): 返回當(dāng)前的線程變量秉馏。
- threading.enumerate(): 返回一個(gè)包含正在運(yùn)行的線程的list。正在運(yùn)行指線程啟動(dòng)后脱羡、結(jié)束前萝究,不包括啟動(dòng)前和終止后的線程。
- threading.activeCount(): 返回正在運(yùn)行的線程數(shù)量锉罐,與len(threading.enumerate())有相同的結(jié)果帆竹。
- Thread類方法
- run(): 用以表示線程活動(dòng)的方法。
- start():啟動(dòng)線程活動(dòng)脓规。
- join([time]): 等待至線程中止栽连。這阻塞調(diào)用線程直至線程的join() 方法被調(diào)用中止-正常退出或者拋出未處理的異常-或者是可選的超時(shí)發(fā)生。
- isAlive(): 返回線程是否活動(dòng)的侨舆。
- getName(): 返回線程名秒紧。
- setName(): 設(shè)置線程名。
from random import randint
from threading import Thread
from time import time, sleep
def download(filename):
print('開(kāi)始下載%s...' % filename)
time_to_download = randint(1, 2)
sleep(time_to_download)
print('%s下載完成! 耗費(fèi)了%d秒' % (filename, time_to_download))
def main():
start = time()
threads = []
# 創(chuàng)建10個(gè)下載的線程
for _ in range(10):
t = Thread(target=download, args=('Python從入門到住院'+str(_)+'.pdf',))
threads.append(t)
t.start()
# 等所有線程都執(zhí)行完畢
for t in threads:
t.join()
end = time()
print('總共耗費(fèi)了%.3f秒' % (end - start))
if __name__ == '__main__':
main()
運(yùn)行結(jié)果
開(kāi)始下載Python從入門到住院0.pdf...
開(kāi)始下載Python從入門到住院1.pdf...
開(kāi)始下載Python從入門到住院2.pdf...
開(kāi)始下載Python從入門到住院3.pdf...
開(kāi)始下載Python從入門到住院4.pdf...
開(kāi)始下載Python從入門到住院5.pdf...
開(kāi)始下載Python從入門到住院6.pdf...
開(kāi)始下載Python從入門到住院7.pdf...
開(kāi)始下載Python從入門到住院8.pdf...
開(kāi)始下載Python從入門到住院9.pdf...
Python從入門到住院1.pdf下載完成! 耗費(fèi)了1秒
Python從入門到住院3.pdf下載完成! 耗費(fèi)了1秒
Python從入門到住院5.pdf下載完成! 耗費(fèi)了1秒
Python從入門到住院7.pdf下載完成! 耗費(fèi)了1秒
Python從入門到住院8.pdf下載完成! 耗費(fèi)了1秒
Python從入門到住院9.pdf下載完成! 耗費(fèi)了1秒
Python從入門到住院2.pdf下載完成! 耗費(fèi)了2秒
Python從入門到住院0.pdf下載完成! 耗費(fèi)了2秒
Python從入門到住院4.pdf下載完成! 耗費(fèi)了2秒
Python從入門到住院6.pdf下載完成! 耗費(fèi)了2秒
總共耗費(fèi)了2.004秒
Process finished with exit code 0
3.2 鎖
因?yàn)槎鄠€(gè)線程可以共享進(jìn)程的內(nèi)存空間态罪,因此要實(shí)現(xiàn)多個(gè)線程間的通信相對(duì)簡(jiǎn)單噩茄,大家能想到的最直接的辦法就是設(shè)置一個(gè)全局變量,多個(gè)線程共享這個(gè)全局變量即可复颈。但是當(dāng)多個(gè)線程共享同一個(gè)變量(我們通常稱之為“資源”)的時(shí)候,很有可能產(chǎn)生不可控的結(jié)果從而導(dǎo)致程序失效甚至崩潰沥割。如果一個(gè)資源被多個(gè)線程競(jìng)爭(zhēng)使用耗啦,那么我們通常稱之為“臨界資源”,對(duì)“臨界資源”的訪問(wèn)需要加上保護(hù)机杜,否則資源會(huì)處于“混亂”的狀態(tài)帜讲。下面的例子演示了100個(gè)線程向同一個(gè)銀行賬戶轉(zhuǎn)賬(轉(zhuǎn)入1元錢)的場(chǎng)景,在這個(gè)例子中椒拗,銀行賬戶就是一個(gè)臨界資源似将,在沒(méi)有保護(hù)的情況下我們很有可能會(huì)得到錯(cuò)誤的結(jié)果。
在這種情況下蚀苛,“鎖”就可以派上用場(chǎng)了在验。我們可以通過(guò)“鎖”來(lái)保護(hù)“臨界資源”,只有獲得“鎖”的線程才能訪問(wèn)“臨界資源”堵未,而其他沒(méi)有得到“鎖”的線程只能被阻塞起來(lái)腋舌,直到獲得“鎖”的線程釋放了“鎖”,其他線程才有機(jī)會(huì)獲得“鎖”渗蟹,進(jìn)而訪問(wèn)被保護(hù)的“臨界資源”块饺。
from time import sleep
from threading import Thread, Lock
from concurrent.futures import ThreadPoolExecutor
class UnLockAccount(object):
'''
未加鎖時(shí)
'''
def __init__(self):
self._balance = 0
def deposit(self, money):
# 計(jì)算存款后的余額
new_balance = self._balance + money
# 模擬受理存款業(yè)務(wù)需要0.01秒的時(shí)間
sleep(0.01)
# 修改賬戶余額
self._balance = new_balance
@property
def balance(self):
return self._balance
class LockAccount(object):
'''
加鎖后
'''
def __init__(self):
self._balance = 0
self._lock = Lock()
def deposit(self, money):
# 先獲取鎖才能執(zhí)行后續(xù)的代碼
self._lock.acquire()
try:
new_balance = self._balance + money
sleep(0.01)
self._balance = new_balance
finally:
# 在finally中執(zhí)行釋放鎖的操作保證正常異常鎖都能釋放
self._lock.release()
@property
def balance(self):
return self._balance
class myThread(Thread):
def __init__(self, thread_name, account, money):
super().__init__()
self.thread_name = thread_name
self.account = account
self.money = money
def run(self):
print(self.thread_name+' is saving money')
self.account.deposit(self.money)
def main1():
"""主函數(shù)"""
account = LockAccount()
# 創(chuàng)建線程池
pool = ThreadPoolExecutor(max_workers=10)
futures = []
for _ in range(100):
#================== 創(chuàng)建線程的第3種方式
# 調(diào)用線程池中的線程來(lái)執(zhí)行特定的任務(wù)
future = pool.submit(account.deposit, 1)
futures.append(future)
# 關(guān)閉線程池
pool.shutdown()
for future in futures:
future.result()
print('賬戶余額為: ¥%d元' % account.balance)
def main2():
account = LockAccount()
threads = []
# 創(chuàng)建100個(gè)存款的線程向同一個(gè)賬戶中存錢
for _ in range(100):
t = myThread('thread-'+str(_), account, 1)
threads.append(t)
t.start()
# 等所有存款的線程都執(zhí)行完畢
for t in threads:
t.join()
print('賬戶余額為: ¥%d元' % account.balance)
def main3():
account = LockAccount()
threads = []
# 創(chuàng)建100個(gè)存款的線程向同一個(gè)賬戶中存錢
for _ in range(100):
t = Thread(
target=account.deposit, args=(1, )
)
t.start()
threads.append(t)
for _ in threads:
_.join()
print('賬戶余額為: ¥%d元' % account.balance)
if __name__ == '__main__':
main3()
4.多任務(wù)
把任務(wù)分為計(jì)算密集型和I/O密集型赞辩。
- 計(jì)算密集型任務(wù)的特點(diǎn)是要進(jìn)行大量的計(jì)算,消耗CPU資源授艰,比如對(duì)視頻進(jìn)行編碼解碼或者格式轉(zhuǎn)換等等辨嗽,這種任務(wù)全靠CPU的運(yùn)算能力,雖然也可以用多任務(wù)完成淮腾,但是任務(wù)越多召庞,花在任務(wù)切換的時(shí)間就越多,CPU執(zhí)行任務(wù)的效率就越低来破。計(jì)算密集型任務(wù)由于主要消耗CPU資源篮灼,這類任務(wù)用Python這樣的腳本語(yǔ)言去執(zhí)行效率通常很低,最能勝任這類任務(wù)的是C語(yǔ)言徘禁。
- I/O密集型任務(wù)主要為存儲(chǔ)介質(zhì)I/O的任務(wù)诅诱,這類任務(wù)的特點(diǎn)是CPU消耗很少,任務(wù)的大部分時(shí)間都在等待I/O操作完成(因?yàn)镮/O的速度遠(yuǎn)遠(yuǎn)低于CPU和內(nèi)存的速度)送朱。對(duì)于I/O密集型任務(wù)娘荡,如果啟動(dòng)多任務(wù),就可以減少I/O等待時(shí)間從而讓CPU高效率的運(yùn)轉(zhuǎn)驶沼。
4.1 分布式進(jìn)程
master.py
# !/usr/bin/env python3
# -*- coding: utf-8 -*-
import random, time, queue
from multiprocessing.managers import BaseManager
# 發(fā)送任務(wù)的隊(duì)列:
task_queue = queue.Queue()
# 接收結(jié)果的隊(duì)列:
result_queue = queue.Queue()
def return_task_queue():
global task_queue
return task_queue
def return_result_queue():
global result_queue
return result_queue
# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
pass
if __name__ == '__main__':
# 把兩個(gè)Queue都注冊(cè)到網(wǎng)絡(luò)上, callable參數(shù)關(guān)聯(lián)了Queue對(duì)象:
QueueManager.register('get_task_queue', callable=return_task_queue)
QueueManager.register('get_result_queue', callable=return_result_queue)
# 綁定端口5000, 設(shè)置驗(yàn)證碼'abc':
manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
# 啟動(dòng)Queue:
manager.start()
# 獲得通過(guò)網(wǎng)絡(luò)訪問(wèn)的Queue對(duì)象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放幾個(gè)任務(wù)進(jìn)去:
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)
# 從result隊(duì)列讀取結(jié)果:
print('Try get results...')
for i in range(10):
r = result.get(timeout=10)
print('Result: %s' % r)
# 關(guān)閉:
manager.shutdown()
print('master exit.')
task_worker.py
# task_worker.py
import time, sys, queue
from multiprocessing.managers import BaseManager
# 創(chuàng)建類似的QueueManager:
class QueueManager(BaseManager):
pass
# 由于這個(gè)QueueManager只從網(wǎng)絡(luò)上獲取Queue炮沐,所以注冊(cè)時(shí)只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 連接到服務(wù)器,也就是運(yùn)行task_master.py的機(jī)器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗(yàn)證碼注意保持與task_master.py設(shè)置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 從網(wǎng)絡(luò)連接:
m.connect()
# 獲取Queue的對(duì)象:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task隊(duì)列取任務(wù),并把結(jié)果寫入result隊(duì)列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except:
print('task queue is empty.')
# 處理結(jié)束:
print('worker1 exit.')
說(shuō)明:多線程和多進(jìn)程的比較回怜。
- 以下情況需要使用多線程:
- 程序需要維護(hù)許多共享的狀態(tài)(尤其是可變狀態(tài))大年,Python中的列表、字典玉雾、集合都是線程安全的翔试,所以使用線程而不是進(jìn)程維護(hù)共享狀態(tài)的代價(jià)相對(duì)較小。
- 程序會(huì)花費(fèi)大量時(shí)間在I/O操作上复旬,沒(méi)有太多并行計(jì)算的需求且不需占用太多的內(nèi)存垦缅。
- 以下情況需要使用多進(jìn)程:
- 程序執(zhí)行計(jì)算密集型任務(wù)(如:字節(jié)碼操作、數(shù)據(jù)處理驹碍、科學(xué)計(jì)算)壁涎。
- 程序的輸入可以并行的分成塊,并且可以將運(yùn)算結(jié)果合并志秃。
- 程序在內(nèi)存使用方面沒(méi)有任何限制且不強(qiáng)依賴于I/O操作(如:讀寫文件怔球、套接字等)。
4.2 單線程+異步I/O
為了解決
- CPU高速執(zhí)行能力和IO設(shè)備的龜速嚴(yán)重不匹配的問(wèn)題洽损;
- 線程數(shù)量過(guò)多庞溜,導(dǎo)致線程切換時(shí)間過(guò)長(zhǎng)的問(wèn)題。
在Python語(yǔ)言中,單線程+異步I/O的編程模型稱為協(xié)程流码,有了協(xié)程的支持又官,就可以基于事件驅(qū)動(dòng)編寫高效的多任務(wù)程序。
- 協(xié)程最大的優(yōu)勢(shì)就是極高的執(zhí)行效率漫试,因?yàn)樽映绦蚯袚Q不是線程切換六敬,而是由程序自身控制,因此驾荣,沒(méi)有線程切換的開(kāi)銷外构。
- 協(xié)程的第二個(gè)優(yōu)勢(shì)就是不需要多線程的鎖機(jī)制,因?yàn)橹挥幸粋€(gè)線程播掷,也不存在同時(shí)寫變量沖突审编,在協(xié)程中控制共享資源不用加鎖,只需要判斷狀態(tài)就好了歧匈,所以執(zhí)行效率比多線程高很多垒酬。如果想要充分利用CPU的多核特性,最簡(jiǎn)單的方法是多進(jìn)程+協(xié)程件炉,既充分利用多核勘究,又充分發(fā)揮協(xié)程的高效率,可獲得極高的性能斟冕。