Mac OS X戴甩,UNIX符喝,Linux,Windows等甜孤,都是多任務(wù)操作系統(tǒng)即操作系統(tǒng)可以同時(shí)運(yùn)行多個(gè)任務(wù)协饲。對(duì)于操作系統(tǒng)來說,一個(gè)任務(wù)就是一個(gè)進(jìn)程(Process)缴川,一個(gè)任務(wù)可分為多個(gè)子任務(wù)茉稠。在一個(gè)進(jìn)程內(nèi)部,要同時(shí)干多件事把夸,就需要同時(shí)運(yùn)行多個(gè)“子任務(wù)”而线,這些“子任務(wù)”稱為子進(jìn)程。一個(gè)進(jìn)程至少有一個(gè)線程恋日。進(jìn)程是系統(tǒng)資源分配的基本單位膀篮,線程操作系統(tǒng)調(diào)度的基本單元。
操作系統(tǒng)的設(shè)計(jì)岂膳,可以歸結(jié)為三點(diǎn):
- 以多進(jìn)程形式誓竿,允許多個(gè)任務(wù)同時(shí)運(yùn)行;
- 以多線程形式谈截,允許將單個(gè)任務(wù)分成多個(gè)子任務(wù)運(yùn)行筷屡;
- 提供協(xié)調(diào)機(jī)制,一方面防止進(jìn)程之間和線程之間產(chǎn)生沖突簸喂,另一方面允許進(jìn)程之間和線程之間共享資源毙死。
多任務(wù)
并發(fā):指的是任務(wù)數(shù)多余cpu核數(shù),通過操作系統(tǒng)的各種任務(wù)調(diào)度算法娘赴,實(shí)現(xiàn)用多個(gè)任務(wù)“一起”執(zhí)行(實(shí)際上總有一些任務(wù)不在執(zhí)行规哲,因?yàn)榍袚Q任務(wù)的速度相當(dāng)快,看上去一起執(zhí)行而已)
并行:指的是任務(wù)數(shù)小于等于cpu核數(shù)诽表,即任務(wù)真的是一起執(zhí)行的
單核cpu實(shí)現(xiàn)多任務(wù)唉锌,時(shí)間片輪轉(zhuǎn)方式。并發(fā)竿奏,假的多進(jìn)程袄简。
多核cpu實(shí)現(xiàn)多任務(wù):并行,真的多進(jìn)程泛啸。
1. 多線程
python可使用多線程實(shí)現(xiàn)多任務(wù)绿语。python提供了thread模塊和threading模塊,thread是低級(jí)模塊候址,后者對(duì)其進(jìn)行了封裝吕粹。一般我們使用threading模塊實(shí)現(xiàn)多線程。一個(gè)進(jìn)程至少啟動(dòng)一個(gè)線程岗仑,該線程稱為主線程(MainThread)匹耕。主線程可以啟動(dòng)新的線程即子線程。
實(shí)現(xiàn)多線程一般有兩種方式:
1.1 threading.Thread類創(chuàng)建線程
Thread類語法結(jié)構(gòu)如下:Thread([group [, target [, name [, args [, kwargs]]]]])
- target:如果傳遞了函數(shù)的引用荠雕,子進(jìn)程就執(zhí)行函數(shù)內(nèi)的代碼
- name:給線程設(shè)定一個(gè)名字稳其,可以不設(shè)定
- args:給target指定的函數(shù)傳遞的參數(shù),以元組的方式傳遞
- kwargs:給target指定的函數(shù)傳遞命名參數(shù)
- group:指定線程組炸卑,大多數(shù)情況下用不到
Thread類創(chuàng)建的實(shí)例對(duì)象的常用方法:
- start():創(chuàng)建并啟動(dòng)子線程
- join([timeout]):是否等待子線程執(zhí)行結(jié)束或等待多少秒
- is_alive():判斷進(jìn)程子進(jìn)程是否還在活著
from threading import Thread
import time
def info(num):
for x in range(3):
print("--------線程%d執(zhí)行-------" % num)
time.sleep(1)
if __name__ == "__main__":
t1 = Thread(target=info, args=(1,))
t2 = Thread(target=info, args=(2,))
t1.start()
t2.start()
# 執(zhí)行結(jié)果
--------線程1執(zhí)行-------
--------線程2執(zhí)行-------
--------線程1執(zhí)行-------
--------線程2執(zhí)行-------
--------線程1執(zhí)行-------
--------線程2執(zhí)行-------
from threading import Thread
import time
def info(num):
for x in range(3):
print("--------線程%d執(zhí)行-------" % num)
time.sleep(1)
if __name__ == "__main__":
t1 = Thread(target=info, args=(1,))
t2 = Thread(target=info, args=(2,))
t1.start()
t1.join() # join()方法阻塞其他線程既鞠,直到t1線程執(zhí)行完才會(huì)執(zhí)行其他進(jìn)程
# time.sleep(5) (也可使用時(shí)間延時(shí)控制進(jìn)程執(zhí)行順序)
t2.start()
# 執(zhí)行結(jié)果
--------線程1執(zhí)行-------
--------線程1執(zhí)行-------
--------線程1執(zhí)行-------
--------線程2執(zhí)行-------
--------線程2執(zhí)行-------
--------線程2執(zhí)行-------
1.2 自定義線程類
自定義線程類應(yīng)繼承于Threading類。線程實(shí)例調(diào)用start()方法創(chuàng)建并啟動(dòng)線程時(shí)盖文,會(huì)自動(dòng)調(diào)用run()方法嘱蛋。因此編寫線程類時(shí),需要將執(zhí)行的功能代碼封裝到run()內(nèi)部五续。
# 自定義線程類
import time
from threading import Thread
class Mythread(Thread):
def __init__(self, num):
self.num = num
return super().__init__()
def run(self):
for x in range(3):
print("-----線程%d執(zhí)行-----" % self.num)
time.sleep(0.5)
t1 = Mythread(1)
t2 = Mythread(2)
t1.start() # 自動(dòng)調(diào)用run()方法執(zhí)行功能代碼
t2.start()
# 執(zhí)行結(jié)果
-----線程1執(zhí)行-----
-----線程2執(zhí)行-----
-----線程2執(zhí)行-----
-----線程1執(zhí)行-----
-----線程2執(zhí)行-----
-----線程1執(zhí)行-----
1.3 線程同步(互斥鎖)
同一個(gè)進(jìn)程內(nèi)的線程是共享全局變量的洒敏,所以變量可以被任何一個(gè)線程修改。
# 線程間共享全局變量
from threading import Thread
num = 100
num_list = []
def update_data():
global num
for x in range(5):
num += 1
num_list.append(x)
def get_data():
global num
print("num:%d" % num)
print("num_list:%s" % num_list)
t1 = Thread(target=update_data)
t2 = Thread(target=get_data)
t1.start()
t1.join() # 等待線程t1完成全局變量修改任務(wù)
t2.start() # t2線程執(zhí)行打印全局變量值看是否被修改
# 執(zhí)行結(jié)果
num:105
num_list:[0, 1, 2, 3, 4]
如果多個(gè)線程同時(shí)對(duì)同一個(gè)全局變量操作返帕,會(huì)出現(xiàn)資源競(jìng)爭(zhēng)問題桐玻,從而數(shù)據(jù)結(jié)果會(huì)不正確。
# 多個(gè)線程對(duì)一個(gè)全局變量修改
import time
from threading import Thread
num = 0
def addition():
global num
for x in range(1000000):
num += 1
print("")
def subtraction():
global num
for x in range(1000000):
num -= 1
t1 = Thread(target=addition)
t2 = Thread(target=subtraction)
t1.start()
t2.start()
time.sleep(1)
print("num:%d" % num)
# 執(zhí)行結(jié)果:正常情況下應(yīng)該為0
num:-119796
當(dāng)多個(gè)線程幾乎同時(shí)修改某一個(gè)共享數(shù)據(jù)的時(shí)候荆萤,需要進(jìn)行同步控制镊靴。線程同步能夠保證多個(gè)線程安全訪問競(jìng)爭(zhēng)資源,最簡(jiǎn)單的同步機(jī)制是引入互斥鎖链韭。
互斥鎖為資源引入一個(gè)狀態(tài):鎖定/非鎖定偏竟。某個(gè)線程要更改共享數(shù)據(jù)時(shí),先將其鎖定敞峭,此時(shí)資源的狀態(tài)為“鎖定”踊谋,其他線程不能更改址晕;直到該線程釋放資源班缰,將資源的狀態(tài)變成“非鎖定”,其他的線程才能再次鎖定該資源∫缮唬互斥鎖保證了每次只有一個(gè)線程進(jìn)行寫入操作抽高,從而保證了多線程情況下數(shù)據(jù)的正確性道盏。
Threading模塊提供Lock類來定義一個(gè)鎖對(duì)象因苹。鎖對(duì)象有兩個(gè)方法,acquire方法表示獲得鎖蛤育,release方法表示釋放鎖宛官。
上鎖解鎖過程:
- 當(dāng)一個(gè)線程調(diào)用鎖的acquire()方法獲得鎖時(shí),鎖就進(jìn)入“l(fā)ocked”狀態(tài)瓦糕。
- 每次只有一個(gè)線程可以獲得鎖底洗。如果此時(shí)另一個(gè)線程試圖獲得這個(gè)鎖,該線程就會(huì)變?yōu)椤癰locked”狀態(tài)咕娄,稱為“阻塞”亥揖,直到擁有鎖的線程調(diào)用鎖的release()方法釋放鎖之后,鎖進(jìn)入“unlocked”狀態(tài)谭胚。
- 線程調(diào)度程序從處于同步阻塞狀態(tài)的線程中選擇一個(gè)來獲得鎖徐块,并使得該線程進(jìn)入運(yùn)行(running)狀態(tài)。
# 線程同步
import time
from threading import Thread, Lock
num = 0
lock = Lock()
def test1():
global num
for x in range(1000000):
lock.acquire()
num += 1
lock.release()
def test2():
global num
for x in range(1000000):
lock.acquire()
num -= 1
lock.release()
t1 = Thread(target=test1)
t2 = Thread(target=test2)
t1.start()
t2.start()
time.sleep(1)
print("num:%d" % num)
# 執(zhí)行結(jié)果
num:0
鎖的好處就是確保了某段關(guān)鍵代碼只能由一個(gè)線程從頭到尾完整地執(zhí)行灾而,壞處是阻止了多線程并發(fā)執(zhí)行即包含鎖的某段代碼實(shí)際上只能以單線程模式執(zhí)行胡控,效率下降,盡量保證加鎖的代碼盡可能少旁趟。而且由于可以存在多個(gè)鎖昼激,不同的線程持有不同的鎖,并試圖獲取對(duì)方持有的鎖時(shí)锡搜,可能會(huì)造成死鎖橙困。
1.4 死鎖
多個(gè)線程共享多個(gè)資源時(shí),若兩個(gè)線程分別占有一部分資源并且同時(shí)等待對(duì)方釋放資源耕餐,會(huì)造成死鎖凡傅。
# 死鎖
import threading
import time
class MyThread1(threading.Thread):
def run(self):
# 對(duì)mutexA上鎖
mutexA.acquire()
# mutexA上鎖后,延時(shí)1秒肠缔,等待另外那個(gè)線程 把mutexB上鎖
print(self.name+'----do1---up----')
time.sleep(1)
# 此時(shí)會(huì)堵塞夏跷,因?yàn)檫@個(gè)mutexB已經(jīng)被另外的線程搶先上鎖了
mutexB.acquire()
print(self.name+'----do1---down----')
mutexB.release()
# 對(duì)mutexA解鎖
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
# 對(duì)mutexB上鎖
mutexB.acquire()
# mutexB上鎖后,延時(shí)1秒明未,等待另外那個(gè)線程 把mutexA上鎖
print(self.name+'----do2---up----')
time.sleep(1)
# 此時(shí)會(huì)堵塞槽华,因?yàn)檫@個(gè)mutexA已經(jīng)被另外的線程搶先上鎖了
mutexA.acquire()
print(self.name+'----do2---down----')
mutexA.release()
# 對(duì)mutexB解鎖
mutexB.release()
mutexA = threading.Lock()
mutexB = threading.Lock()
if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
2. 進(jìn)程
一個(gè)程序運(yùn)行起來后,代碼及其利用到的資源稱為進(jìn)程趟妥,進(jìn)程是系統(tǒng)資源分配的基本單位猫态。子進(jìn)程會(huì)復(fù)制父進(jìn)程的代碼、數(shù)據(jù)等資源,所以進(jìn)程之間不共享全局變量(缺點(diǎn)就是多進(jìn)程占用系統(tǒng)資源較多)亲雪。常用在CPU密集型任務(wù)上勇凭,可充分利用Cpu的多個(gè)核心。使用子進(jìn)程替代線程可以有效避免全局解釋器鎖帶來的性能影響匆光。
2.1 fork()函數(shù)
在Linux和Unix系統(tǒng)中套像,fork()函數(shù)用來創(chuàng)建進(jìn)程酿联。普通函數(shù)终息,調(diào)用一次返回一次。fork()函數(shù)調(diào)用一次贞让,返回兩次周崭。fork()函數(shù)創(chuàng)建新的進(jìn)程(子進(jìn)程)。子進(jìn)程是當(dāng)前進(jìn)程(父進(jìn)程)的拷貝喳张,會(huì)復(fù)制父進(jìn)程的代碼段续镇、堆棧段和數(shù)據(jù)段。
對(duì)于父進(jìn)程销部,返回子進(jìn)程的進(jìn)程號(hào)PID摸航,對(duì)于子進(jìn)程返回0【俗可根據(jù)返回值判斷當(dāng)前進(jìn)程是子進(jìn)程或父進(jìn)程酱虎。python的os模塊提供了普遍的操作系統(tǒng)功能。
- os.getpid()用來獲取當(dāng)前進(jìn)程ID擂涛,os.getppid()獲取當(dāng)前進(jìn)程的父進(jìn)程ID读串。
# fork函數(shù)
import os
pid = os.fork()
if pid < 0:
print("process create failed.")
elif pid == 0:
print("當(dāng)前進(jìn)程為子進(jìn)程")
print("父進(jìn)程:%s 子進(jìn)程:%s" % (os.getppid(), os.getpid()))
else:
print("當(dāng)前進(jìn)程為父進(jìn)程")
print("父進(jìn)程:%s 子進(jìn)程:%s" % (os.getpid(), pid))
# 執(zhí)行結(jié)果
當(dāng)前進(jìn)程為父進(jìn)程
父進(jìn)程:891 子進(jìn)程:1990
當(dāng)前進(jìn)程為子進(jìn)程
父進(jìn)程:891 子進(jìn)程:1990
2.2 多進(jìn)程實(shí)現(xiàn)
python提供了multiprocessing模塊實(shí)現(xiàn)多進(jìn)程。multiprocessing模塊提供了一個(gè)Process類來創(chuàng)建一個(gè)進(jìn)程對(duì)象撒妈。
Process語法結(jié)構(gòu)如下:Process([group [, target [, name [, args [, kwargs]]]]])
- target:如果傳遞了函數(shù)的引用恢暖,可以任務(wù)這個(gè)子進(jìn)程就執(zhí)行這里的代碼
- args:給target指定的函數(shù)傳遞的參數(shù),以元組的方式傳遞
- kwargs:給target指定的函數(shù)傳遞命名參數(shù)
- name:給進(jìn)程設(shè)定一個(gè)名字狰右,可以不設(shè)定
- group:指定進(jìn)程組杰捂,大多數(shù)情況下用不到
Process創(chuàng)建的實(shí)例對(duì)象的常用方法:
- start():?jiǎn)?dòng)子進(jìn)程實(shí)例(創(chuàng)建子進(jìn)程)
- is_alive():判斷進(jìn)程子進(jìn)程是否還在活著
- join([timeout]):是否等待子進(jìn)程執(zhí)行結(jié)束,或等待多少秒
- terminate():不管任務(wù)是否完成棋蚌,立即終止子進(jìn)程
Process創(chuàng)建的實(shí)例對(duì)象的常用屬性:
- name:當(dāng)前進(jìn)程的別名嫁佳,默認(rèn)為Process-N,N為從1開始遞增的整數(shù)
- pid:當(dāng)前進(jìn)程的pid(進(jìn)程號(hào))
# 多進(jìn)程
from multiprocessing import Process
def test1():
for x in range(3):
print("-----------test1-----------")
def test2():
for x in range(3):
print("***********test2***********")
if __name__ == "__main__":
p1 = Process(target=test1)
p2 = Process(target=test2)
p1.start()
p2.start()
# 執(zhí)行結(jié)果
-----------test1-----------
-----------test1-----------
***********test2***********
-----------test1-----------
***********test2***********
***********test2***********
start()方法用于創(chuàng)建并啟動(dòng)進(jìn)程附鸽,join()方法用于阻塞進(jìn)程脱拼。該方法會(huì)阻塞調(diào)用該方法的進(jìn)程之外的其他進(jìn)程,直至該進(jìn)程執(zhí)行完畢坷备。其他進(jìn)程才會(huì)繼續(xù)執(zhí)行熄浓。.
進(jìn)程之間不共享全局變量
# 進(jìn)程間不共享全局變量
from multiprocessing import Process
g_num = []
def update_data_01():
for x in range(5):
g_num.append(x)
print("update_data_01函數(shù)修改完g_num:%s" % g_num)
def update_data_02():
for x in range(10):
g_num.append(x)
print("update_data_02函數(shù)修改完g_num:%s" % g_num)
p1 = Process(target=update_data_01)
p2 = Process(target=update_data_02)
p1.start()
p2.start()
# 執(zhí)行結(jié)果
update_data_01函數(shù)修改完g_num:[0, 1, 2, 3, 4]
update_data_02函數(shù)修改完g_num:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
2.3 進(jìn)程間通信
之前說過進(jìn)程之間不共享全局變量,因此多進(jìn)程實(shí)現(xiàn)多任務(wù)時(shí)進(jìn)程之間需要進(jìn)行通信即數(shù)據(jù)傳遞。進(jìn)程間通信可通過管道(Pipe)赌蔑、隊(duì)列(Queue)等多種方式進(jìn)行俯在。Pipe常用在兩個(gè)進(jìn)程間通信,Queue用來在多個(gè)進(jìn)程間實(shí)現(xiàn)通信娃惯。
Queue:即隊(duì)列(FIFO跷乐,先進(jìn)先出)是一種特殊的線性表,特殊之處在于它只允許在表的前端(front)進(jìn)行刪除操作趾浅,而在表的后端(rear)進(jìn)行插入操作愕提。進(jìn)行插入操作的端稱為隊(duì)尾,進(jìn)行刪除操作的端稱為隊(duì)頭皿哨。
初始化Queue()對(duì)象時(shí)(例如:q=Queue())浅侨,若括號(hào)中沒有指定最大可接收的消息數(shù)量,或數(shù)量為負(fù)值证膨,那么就代表可接受的消息數(shù)量沒有上限(直到內(nèi)存的盡頭)
Queue.qsize():返回當(dāng)前隊(duì)列包含的消息數(shù)量如输;
Queue.empty():如果隊(duì)列為空,返回True央勒,反之False 不见;
Queue.full():如果隊(duì)列滿了,返回True崔步,反之False稳吮;
-
Queue.get([block[, timeout]]):獲取隊(duì)列中的一條消息,然后將其從列隊(duì)中移除刷晋,block默認(rèn)值為True
- 如果block使用默認(rèn)值盖高,且沒有設(shè)置timeout(單位秒),消息列隊(duì)如果為空眼虱,此時(shí)程序?qū)⒈蛔枞ㄍT谧x取狀態(tài))喻奥,直到從消息列隊(duì)讀到消息為止,如果設(shè)置了timeout捏悬,則會(huì)等待timeout秒撞蚕,若還沒讀取到任何消息,則拋出"Queue.Empty"異常过牙;
- 如果block值為False甥厦,消息列隊(duì)如果為空,則會(huì)立刻拋出"Queue.Empty"異常寇钉;
Queue.get_nowait():相當(dāng)Queue.get(False)刀疙;
-
Queue.put(item,[block[, timeout]]):將item消息寫入隊(duì)列,block默認(rèn)值為True扫倡;
- 如果block使用默認(rèn)值谦秧,且沒有設(shè)置timeout(單位秒),消息列隊(duì)如果已經(jīng)沒有空間可寫入,此時(shí)程序?qū)⒈蛔枞ㄍT趯懭霠顟B(tài))疚鲤,直到從消息列隊(duì)騰出空間為止锥累,如果設(shè)置了timeout,則會(huì)等待timeout秒集歇,若還沒空間桶略,則拋出"Queue.Full"異常;
- 如果block值為False诲宇,消息列隊(duì)如果沒有空間可寫入际歼,則會(huì)立刻拋出"Queue.Full"異常;
Queue.put_nowait(item):相當(dāng)Queue.put(item, False)焕窝;
下面示例:兩個(gè)進(jìn)程向隊(duì)列中寫入數(shù)據(jù)蹬挺,一個(gè)進(jìn)程從隊(duì)列中取出數(shù)據(jù)。
# 進(jìn)程間通信:隊(duì)列
from multiprocessing import Process, Queue
import time
def put_data(sequence):
infos = ["fengdi", "yuxi", "python", "java", "hello world"]
for info in infos:
print("隊(duì)列中添加數(shù)據(jù):%s" % info)
sequence.put(info)
time.sleep(0.5)
if queue.full():
break
def get_data(sequence):
while True:
info = sequence.get()
print("隊(duì)列中取出數(shù)據(jù):%s" % info)
time.sleep(0.5)
if queue.empty():
break
if __name__ == "__main__":
queue = Queue(5) # 定義一個(gè)長(zhǎng)度為5的隊(duì)列
p1 = Process(target=put_data, args=(queue,))
p2 = Process(target=get_data, args=(queue,))
p1.start()
p2.start()
# 執(zhí)行結(jié)果
隊(duì)列中添加數(shù)據(jù):fengdi
隊(duì)列中取出數(shù)據(jù):fengdi
隊(duì)列中添加數(shù)據(jù):yuxi
隊(duì)列中取出數(shù)據(jù):yuxi
隊(duì)列中添加數(shù)據(jù):python
隊(duì)列中取出數(shù)據(jù):python
隊(duì)列中添加數(shù)據(jù):java
隊(duì)列中取出數(shù)據(jù):java
隊(duì)列中添加數(shù)據(jù):hello world
隊(duì)列中取出數(shù)據(jù):hello world
Pipe:常用來在兩個(gè)進(jìn)程間通信它掂,兩個(gè)進(jìn)程分別位于管道兩端。Pipe()方法返回元組(conn1, conn2)溯泣,代表一個(gè)管道的兩端虐秋。該方法有duplex參數(shù),默認(rèn)為True垃沦,即全雙工模式<即兩端均可收發(fā)客给。若為False,則conn1接受消息肢簿,conn2發(fā)送消息靶剑。send和recv方法分別發(fā)送和接受消息。
# 進(jìn)程間通信:管道
import os, time, random
from multiprocessing import Process, Pipe
def proc_send(pipe, urls):
for url in urls:
print('Process(%s) send: %s' % (os.getpid(), url))
pipe.send(url)
time.sleep(random.random())
def proc_recv(pipe):
while True:
print('Process(%s) recv:%s' % (os.getpid(), pipe.recv()))
time.sleep(random.random())
if __name__ == "__main__":
pipe = Pipe()
p1 = Process(target=proc_send, args=(pipe[0], ['url_' + str(i) for i in range(5)]))
p2 = Process(target=proc_recv, args=(pipe[1], ))
p1.start()
p2.start()
p1.join()
p2.join()
# 執(zhí)行結(jié)果
Process(20908) send: url_0
Process(20909) recv:url_0
Process(20908) send: url_1
Process(20909) recv:url_1
Process(20908) send: url_2
Process(20909) recv:url_2
Process(20908) send: url_3
Process(20909) recv:url_3
Process(20908) send: url_4
Process(20909) recv:url_4
2.4 進(jìn)程池
python提供進(jìn)程池方式池充,可批量創(chuàng)建多個(gè)進(jìn)程桩引,進(jìn)程池適用于多個(gè)任務(wù)的情況。初始化Pool時(shí)收夸,可以指定一個(gè)最大進(jìn)程數(shù)坑匠,當(dāng)有新的請(qǐng)求提交到Pool中時(shí),如果池還沒有滿卧惜,那么就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來執(zhí)行該請(qǐng)求厘灼;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到指定的最大值,那么該請(qǐng)求就會(huì)等待咽瓷,直到池中有進(jìn)程結(jié)束设凹,才會(huì)用之前的進(jìn)程來執(zhí)行新的任務(wù)。
multiprocessing.Pool常用函數(shù)解析:
- apply_async(func[, args[, kwds]]) :使用非阻塞方式調(diào)用func(并行執(zhí)行茅姜,堵塞方式必須等待上一個(gè)進(jìn)程退出才能執(zhí)行下一個(gè)進(jìn)程)闪朱,args為傳遞給func的參數(shù)列表,kwds為傳遞給func的關(guān)鍵字參數(shù)列表;
- close():關(guān)閉Pool监透,使其不再接受新的任務(wù)桶错;
- terminate():不管任務(wù)是否完成,立即終止胀蛮;
- join():主進(jìn)程阻塞院刁,等待子進(jìn)程的退出, 必須在close或terminate之后使用粪狼;
在下面示例中退腥,創(chuàng)建了一個(gè)有5個(gè)進(jìn)程的進(jìn)程池,設(shè)置了10個(gè)要執(zhí)行的任務(wù)再榄。故只有一個(gè)任務(wù)執(zhí)行結(jié)束狡刘,進(jìn)程池空出位置后,另一個(gè)任務(wù)才能執(zhí)行困鸥。
# 進(jìn)程池
from multiprocessing import Pool
def test(num):
print("------任務(wù)%d正在執(zhí)行-------" % num)
# 創(chuàng)建進(jìn)程池
pool = Pool(5)
for x in range(1, 11):
pool.apply_async(test, args=(x, ))
print("***********主進(jìn)程%s開始執(zhí)行***************" % os.getpid())
pool.close()
pool.join()
print("***********主進(jìn)程%s結(jié)束執(zhí)行***************" % os.getpid())
# 執(zhí)行結(jié)果
***********主進(jìn)程20982開始執(zhí)行***************
------任務(wù)1正在執(zhí)行-------
------任務(wù)2正在執(zhí)行-------
------任務(wù)3正在執(zhí)行-------
------任務(wù)6正在執(zhí)行-------
------任務(wù)7正在執(zhí)行-------
------任務(wù)4正在執(zhí)行-------
------任務(wù)8正在執(zhí)行-------
------任務(wù)9正在執(zhí)行-------
------任務(wù)5正在執(zhí)行-------
------任務(wù)10正在執(zhí)行-------
***********主進(jìn)程20982結(jié)束執(zhí)行***************
2.5 進(jìn)程池間通信
進(jìn)程間通信使用隊(duì)列即processing.Queue類創(chuàng)建隊(duì)列對(duì)象嗅蔬,進(jìn)程池進(jìn)程間隊(duì)列通信使用processing.Manager()中的Queue類類創(chuàng)建隊(duì)列對(duì)象。
import time
from multiprocessing import Manager, Pool
def put_data(queue):
info_list = ["python", "java", "php"]
for info in info_list:
print("向隊(duì)列中插入數(shù)據(jù):%s" % info)
queue.put(info)
time.sleep(0.5)
def get_data(queue):
while True:
if queue.empty():
break
info = queue.get()
print("從隊(duì)列中取出數(shù)據(jù):%s" % info)
time.sleep(0.5)
pool = Pool()
queue = Manager().Queue()
pool.apply_async(put_data, args=(queue,))
pool.apply_async(get_data, args=(queue,))
pool.close()
pool.join()
# 執(zhí)行結(jié)果
向隊(duì)列中插入數(shù)據(jù):python
從隊(duì)列中取出數(shù)據(jù):python
向隊(duì)列中插入數(shù)據(jù):java
從隊(duì)列中取出數(shù)據(jù):java
向隊(duì)列中插入數(shù)據(jù):php
從隊(duì)列中取出數(shù)據(jù):php
2.6 進(jìn)程線程區(qū)別
-
定義
- 進(jìn)程是系統(tǒng)進(jìn)行資源分配和調(diào)度的一個(gè)獨(dú)立單位.
- 線程是進(jìn)程的一個(gè)實(shí)體,是CPU調(diào)度和分派的基本單位,它是比進(jìn)程更小的能獨(dú)立運(yùn)行的基本單位.線程自己基本上不擁有系統(tǒng)資源,只擁有一點(diǎn)在運(yùn)行中必不可少的資源(如程序計(jì)數(shù)器,一組寄存器和棧),但是它可與同屬一個(gè)進(jìn)程的其他的線程共享進(jìn)程所擁有的全部資源.
-
區(qū)別
- 一個(gè)程序至少有一個(gè)進(jìn)程,一個(gè)進(jìn)程至少有一個(gè)線程.
- 線程的劃分尺度小于進(jìn)程(資源比進(jìn)程少)疾就,使得多線程程序的并發(fā)性高澜术。
- 進(jìn)程在執(zhí)行過程中擁有獨(dú)立的內(nèi)存單元,而多個(gè)線程共享內(nèi)存猬腰,從而極大地提高了程序的運(yùn)行效率
3. 協(xié)程
協(xié)程是python個(gè)中另外一種實(shí)現(xiàn)多任務(wù)的方式鸟废,只不過比線程更小占用更小執(zhí)行單元(理解為需要的資源)。
通俗的理解:在一個(gè)線程中的某個(gè)函數(shù)姑荷,可以在任何地方保存當(dāng)前函數(shù)的一些臨時(shí)變量等信息盒延,然后切換到另外一個(gè)函數(shù)中執(zhí)行,注意不是通過調(diào)用函數(shù)的方式做到的鼠冕,并且切換的次數(shù)以及什么時(shí)候再切換到原來的函數(shù)都由開發(fā)者自己確定添寺。
協(xié)程和線程差異:在實(shí)現(xiàn)多任務(wù)時(shí), 線程切換從系統(tǒng)層面遠(yuǎn)不止保存和恢復(fù) CPU上下文這么簡(jiǎn)單。 操作系統(tǒng)為了程序運(yùn)行的高效性每個(gè)線程都有自己緩存Cache等等數(shù)據(jù)供鸠,操作系統(tǒng)還會(huì)幫你做這些數(shù)據(jù)的恢復(fù)操作畦贸, 所以線程的切換非常耗性能。但是協(xié)程的切換只是單純的操作CPU的上下文楞捂,所以一秒鐘切換個(gè)上百萬次系統(tǒng)都抗的住薄坏。協(xié)程一般是在一個(gè)線程內(nèi)通過切換調(diào)用函數(shù)實(shí)現(xiàn)。
3.1 greenlet實(shí)現(xiàn)協(xié)程
# 協(xié)程:greenlet
import time
from greenlet import greenlet
def test1():
for x in range(5):
print("---------test1---------")
t2.switch()
def test2():
for x in range(5):
print("*********test2**********")
t1.switch()
t1 = greenlet(test1)
t2 = greenlet(test2)
t1.switch()
# 執(zhí)行結(jié)果
---------test1---------
*********test2**********
---------test1---------
*********test2**********
---------test1---------
*********test2**********
---------test1---------
*********test2**********
---------test1---------
*********test2**********
3.2 gevent實(shí)現(xiàn)協(xié)程
# 協(xié)程:gevent
import gevent
def f(n):
for i in range(n):
print(gevent.getcurrent(), i)
gevent.sleep(0.5)
g1 = gevent.spawn(f, 3)
g2 = gevent.spawn(f, 3)
g3 = gevent.spawn(f, 3)
g1.join()
g2.join()
g3.join()
# 執(zhí)行結(jié)果
<Greenlet at 0x10f5129d8: f(3)> 0
<Greenlet at 0x10f512488: f(3)> 0
<Greenlet at 0x10f5126a8: f(3)> 0
<Greenlet at 0x10f5129d8: f(3)> 1
<Greenlet at 0x10f512488: f(3)> 1
<Greenlet at 0x10f5126a8: f(3)> 1
<Greenlet at 0x10f5129d8: f(3)> 2
<Greenlet at 0x10f512488: f(3)> 2
<Greenlet at 0x10f5126a8: f(3)> 2
4. 進(jìn)程線程協(xié)程總結(jié)
- 進(jìn)程是資源分配的單位
- 線程是操作系統(tǒng)調(diào)度的單位
- 進(jìn)程切換需要的資源很最大寨闹,效率很低
- 線程切換需要的資源一般胶坠,效率一般(當(dāng)然了在不考慮GIL的情況下)
- 協(xié)程切換任務(wù)資源很小,效率高
- 多進(jìn)程繁堡、多線程根據(jù)cpu核數(shù)不一樣可能是并行的沈善,但是協(xié)程是在一個(gè)線程中所以是并發(fā)
5. GIL(全局解釋器鎖)
GIL(全局解釋器鎖)是python的歷史遺留問題乡数,僅在cpython解釋器里面存在。所以在執(zhí)行多線程操作時(shí)闻牡,多個(gè)線程只能交替執(zhí)行即同一時(shí)刻在只有有個(gè)線程在執(zhí)行净赴,不能調(diào)用多個(gè)CPU內(nèi)核,只能利用一個(gè)內(nèi)核罩润。因此執(zhí)行CPU密集型任務(wù)時(shí)玖翅,多使用多進(jìn)程實(shí)現(xiàn),可利用多個(gè)CPU內(nèi)核割以;執(zhí)行IO密集型任務(wù)時(shí)金度,多使用多線程實(shí)現(xiàn),可明顯提高效率(多線程遇到IO阻塞會(huì)自動(dòng)釋放GIL鎖)严沥。下面我們編寫一個(gè)示例:線程數(shù)和CPU核心數(shù)(2核)相同猜极。驗(yàn)證多線程執(zhí)行時(shí),利用幾個(gè)CPU內(nèi)核消玄?
# GIL
from threading import Thread
def test():
while True:
pass
t1 = Thread(test)
t1.start()
while True:
pass
在上面代碼中兩個(gè)個(gè)線程執(zhí)行的函數(shù)均為死循環(huán)跟伏,兩個(gè)進(jìn)程執(zhí)行時(shí),最終的CPU占用率在50%左右莱找。也就是說雖然同時(shí)運(yùn)行了2個(gè)進(jìn)程酬姆,但實(shí)際上只使用了1個(gè)CPU內(nèi)核。這正是GIL的原因奥溺。
要想解決多線程執(zhí)行中的GIL問題可以:
- 更換解釋器,GIL僅存在于cpython解釋器中
- 使用其他語言編寫線程任務(wù)代碼