python多任務(wù):進(jìn)程線程協(xié)程

Mac OS X戴甩,UNIX符喝,Linux,Windows等甜孤,都是多任務(wù)操作系統(tǒng)即操作系統(tǒng)可以同時(shí)運(yùn)行多個(gè)任務(wù)协饲。對(duì)于操作系統(tǒng)來說,一個(gè)任務(wù)就是一個(gè)進(jìn)程(Process)缴川,一個(gè)任務(wù)可分為多個(gè)子任務(wù)茉稠。在一個(gè)進(jìn)程內(nèi)部,要同時(shí)干多件事把夸,就需要同時(shí)運(yùn)行多個(gè)“子任務(wù)”而线,這些“子任務(wù)”稱為子進(jìn)程。一個(gè)進(jìn)程至少有一個(gè)線程恋日。進(jìn)程是系統(tǒng)資源分配的基本單位膀篮,線程操作系統(tǒng)調(diào)度的基本單元。

操作系統(tǒng)的設(shè)計(jì)岂膳,可以歸結(jié)為三點(diǎn):

  • 以多進(jìn)程形式誓竿,允許多個(gè)任務(wù)同時(shí)運(yùn)行;
  • 以多線程形式谈截,允許將單個(gè)任務(wù)分成多個(gè)子任務(wù)運(yùn)行筷屡;
  • 提供協(xié)調(diào)機(jī)制,一方面防止進(jìn)程之間和線程之間產(chǎn)生沖突簸喂,另一方面允許進(jìn)程之間和線程之間共享資源毙死。

多任務(wù)

  • 并發(fā):指的是任務(wù)數(shù)多余cpu核數(shù),通過操作系統(tǒng)的各種任務(wù)調(diào)度算法娘赴,實(shí)現(xiàn)用多個(gè)任務(wù)“一起”執(zhí)行(實(shí)際上總有一些任務(wù)不在執(zhí)行规哲,因?yàn)榍袚Q任務(wù)的速度相當(dāng)快,看上去一起執(zhí)行而已)

  • 并行:指的是任務(wù)數(shù)小于等于cpu核數(shù)诽表,即任務(wù)真的是一起執(zhí)行的

  • 單核cpu實(shí)現(xiàn)多任務(wù)唉锌,時(shí)間片輪轉(zhuǎn)方式。并發(fā)竿奏,假的多進(jìn)程袄简。

  • 多核cpu實(shí)現(xiàn)多任務(wù):并行,真的多進(jìn)程泛啸。

1. 多線程

python可使用多線程實(shí)現(xiàn)多任務(wù)绿语。python提供了thread模塊和threading模塊,thread是低級(jí)模塊候址,后者對(duì)其進(jìn)行了封裝吕粹。一般我們使用threading模塊實(shí)現(xiàn)多線程。一個(gè)進(jìn)程至少啟動(dòng)一個(gè)線程岗仑,該線程稱為主線程(MainThread)匹耕。主線程可以啟動(dòng)新的線程即子線程。

實(shí)現(xiàn)多線程一般有兩種方式:

1.1 threading.Thread類創(chuàng)建線程

Thread類語法結(jié)構(gòu)如下:Thread([group [, target [, name [, args [, kwargs]]]]])

  • target:如果傳遞了函數(shù)的引用荠雕,子進(jìn)程就執(zhí)行函數(shù)內(nèi)的代碼
  • name:給線程設(shè)定一個(gè)名字稳其,可以不設(shè)定
  • args:給target指定的函數(shù)傳遞的參數(shù),以元組的方式傳遞
  • kwargs:給target指定的函數(shù)傳遞命名參數(shù)
  • group:指定線程組炸卑,大多數(shù)情況下用不到

Thread類創(chuàng)建的實(shí)例對(duì)象的常用方法:

  • start():創(chuàng)建并啟動(dòng)子線程
  • join([timeout]):是否等待子線程執(zhí)行結(jié)束或等待多少秒
  • is_alive():判斷進(jìn)程子進(jìn)程是否還在活著
from threading import Thread
import time

def info(num):
    for x in range(3):
        print("--------線程%d執(zhí)行-------" % num)
        time.sleep(1)
    
if __name__ == "__main__":
    t1 = Thread(target=info, args=(1,))
    t2 = Thread(target=info, args=(2,))
    t1.start() 
    t2.start()

# 執(zhí)行結(jié)果
--------線程1執(zhí)行-------
--------線程2執(zhí)行-------
--------線程1執(zhí)行-------
--------線程2執(zhí)行-------
--------線程1執(zhí)行-------
--------線程2執(zhí)行-------
from threading import Thread
import time

def info(num):
    for x in range(3):
        print("--------線程%d執(zhí)行-------" % num)
        time.sleep(1)
    
if __name__ == "__main__":
    t1 = Thread(target=info, args=(1,))
    t2 = Thread(target=info, args=(2,))
    t1.start() 
    t1.join()  # join()方法阻塞其他線程既鞠,直到t1線程執(zhí)行完才會(huì)執(zhí)行其他進(jìn)程  
    # time.sleep(5)  (也可使用時(shí)間延時(shí)控制進(jìn)程執(zhí)行順序)
    t2.start()

# 執(zhí)行結(jié)果
--------線程1執(zhí)行-------
--------線程1執(zhí)行-------
--------線程1執(zhí)行-------
--------線程2執(zhí)行-------
--------線程2執(zhí)行-------
--------線程2執(zhí)行-------
1.2 自定義線程類

自定義線程類應(yīng)繼承于Threading類。線程實(shí)例調(diào)用start()方法創(chuàng)建并啟動(dòng)線程時(shí)盖文,會(huì)自動(dòng)調(diào)用run()方法嘱蛋。因此編寫線程類時(shí),需要將執(zhí)行的功能代碼封裝到run()內(nèi)部五续。

# 自定義線程類
import time
from threading import Thread

class Mythread(Thread):
    
    def __init__(self, num):
        self.num = num
        return super().__init__()
    
    def run(self):
        for x in range(3):
            print("-----線程%d執(zhí)行-----" % self.num)
            time.sleep(0.5)
            
t1 = Mythread(1)
t2 = Mythread(2)
t1.start()  # 自動(dòng)調(diào)用run()方法執(zhí)行功能代碼
t2.start()

# 執(zhí)行結(jié)果
-----線程1執(zhí)行-----
-----線程2執(zhí)行-----
-----線程2執(zhí)行-----
-----線程1執(zhí)行-----
-----線程2執(zhí)行-----
-----線程1執(zhí)行-----
1.3 線程同步(互斥鎖)

同一個(gè)進(jìn)程內(nèi)的線程是共享全局變量的洒敏,所以變量可以被任何一個(gè)線程修改。

# 線程間共享全局變量
from threading import Thread

num = 100
num_list = []

def update_data():
    global num
    for x in range(5):
        num += 1
        num_list.append(x)
        
def get_data():
    global num
    print("num:%d" % num)
    print("num_list:%s" % num_list)
    
t1 = Thread(target=update_data)
t2 = Thread(target=get_data)

t1.start()
t1.join() # 等待線程t1完成全局變量修改任務(wù)
t2.start()  # t2線程執(zhí)行打印全局變量值看是否被修改

# 執(zhí)行結(jié)果
num:105
num_list:[0, 1, 2, 3, 4]

如果多個(gè)線程同時(shí)對(duì)同一個(gè)全局變量操作返帕,會(huì)出現(xiàn)資源競(jìng)爭(zhēng)問題桐玻,從而數(shù)據(jù)結(jié)果會(huì)不正確。

# 多個(gè)線程對(duì)一個(gè)全局變量修改
import time
from threading import Thread

num = 0

def addition():
    global num
    for x in range(1000000):
        num += 1
    print("")
        
def subtraction():
    global num
    for x in range(1000000):
        num -= 1

t1 = Thread(target=addition)
t2 = Thread(target=subtraction)

t1.start()
t2.start() 
time.sleep(1)

print("num:%d" % num)

# 執(zhí)行結(jié)果:正常情況下應(yīng)該為0 
num:-119796 

當(dāng)多個(gè)線程幾乎同時(shí)修改某一個(gè)共享數(shù)據(jù)的時(shí)候荆萤,需要進(jìn)行同步控制镊靴。線程同步能夠保證多個(gè)線程安全訪問競(jìng)爭(zhēng)資源,最簡(jiǎn)單的同步機(jī)制是引入互斥鎖链韭。

互斥鎖為資源引入一個(gè)狀態(tài):鎖定/非鎖定偏竟。某個(gè)線程要更改共享數(shù)據(jù)時(shí),先將其鎖定敞峭,此時(shí)資源的狀態(tài)為“鎖定”踊谋,其他線程不能更改址晕;直到該線程釋放資源班缰,將資源的狀態(tài)變成“非鎖定”,其他的線程才能再次鎖定該資源∫缮唬互斥鎖保證了每次只有一個(gè)線程進(jìn)行寫入操作抽高,從而保證了多線程情況下數(shù)據(jù)的正確性道盏。

Threading模塊提供Lock類來定義一個(gè)鎖對(duì)象因苹。鎖對(duì)象有兩個(gè)方法,acquire方法表示獲得鎖蛤育,release方法表示釋放鎖宛官。

上鎖解鎖過程:

  • 當(dāng)一個(gè)線程調(diào)用鎖的acquire()方法獲得鎖時(shí),鎖就進(jìn)入“l(fā)ocked”狀態(tài)瓦糕。
  • 每次只有一個(gè)線程可以獲得鎖底洗。如果此時(shí)另一個(gè)線程試圖獲得這個(gè)鎖,該線程就會(huì)變?yōu)椤癰locked”狀態(tài)咕娄,稱為“阻塞”亥揖,直到擁有鎖的線程調(diào)用鎖的release()方法釋放鎖之后,鎖進(jìn)入“unlocked”狀態(tài)谭胚。
  • 線程調(diào)度程序從處于同步阻塞狀態(tài)的線程中選擇一個(gè)來獲得鎖徐块,并使得該線程進(jìn)入運(yùn)行(running)狀態(tài)。
# 線程同步
import time
from threading import Thread, Lock

num = 0
lock = Lock()

def test1():
    global num
    for x in range(1000000):
        lock.acquire()
        num += 1
        lock.release()
        
def test2():
    global num
    for x in range(1000000):
        lock.acquire()
        num -= 1
        lock.release()

t1 = Thread(target=test1)
t2 = Thread(target=test2)

t1.start()
t2.start() 
time.sleep(1)

print("num:%d" % num)

# 執(zhí)行結(jié)果 
num:0 

鎖的好處就是確保了某段關(guān)鍵代碼只能由一個(gè)線程從頭到尾完整地執(zhí)行灾而,壞處是阻止了多線程并發(fā)執(zhí)行即包含鎖的某段代碼實(shí)際上只能以單線程模式執(zhí)行胡控,效率下降,盡量保證加鎖的代碼盡可能少旁趟。而且由于可以存在多個(gè)鎖昼激,不同的線程持有不同的鎖,并試圖獲取對(duì)方持有的鎖時(shí)锡搜,可能會(huì)造成死鎖橙困。

1.4 死鎖

多個(gè)線程共享多個(gè)資源時(shí),若兩個(gè)線程分別占有一部分資源并且同時(shí)等待對(duì)方釋放資源耕餐,會(huì)造成死鎖凡傅。

# 死鎖
import threading
import time

class MyThread1(threading.Thread):
    def run(self):
        # 對(duì)mutexA上鎖
        mutexA.acquire()

        # mutexA上鎖后,延時(shí)1秒肠缔,等待另外那個(gè)線程 把mutexB上鎖
        print(self.name+'----do1---up----')
        time.sleep(1)

        # 此時(shí)會(huì)堵塞夏跷,因?yàn)檫@個(gè)mutexB已經(jīng)被另外的線程搶先上鎖了
        mutexB.acquire()
        print(self.name+'----do1---down----')
        mutexB.release()

        # 對(duì)mutexA解鎖
        mutexA.release()

class MyThread2(threading.Thread):
    def run(self):
        # 對(duì)mutexB上鎖
        mutexB.acquire()

        # mutexB上鎖后,延時(shí)1秒明未,等待另外那個(gè)線程 把mutexA上鎖
        print(self.name+'----do2---up----')
        time.sleep(1)

        # 此時(shí)會(huì)堵塞槽华,因?yàn)檫@個(gè)mutexA已經(jīng)被另外的線程搶先上鎖了
        mutexA.acquire()
        print(self.name+'----do2---down----')
        mutexA.release()

        # 對(duì)mutexB解鎖
        mutexB.release()

mutexA = threading.Lock()
mutexB = threading.Lock()

if __name__ == '__main__':
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()

2. 進(jìn)程

一個(gè)程序運(yùn)行起來后,代碼及其利用到的資源稱為進(jìn)程趟妥,進(jìn)程是系統(tǒng)資源分配的基本單位猫态。子進(jìn)程會(huì)復(fù)制父進(jìn)程的代碼、數(shù)據(jù)等資源,所以進(jìn)程之間不共享全局變量(缺點(diǎn)就是多進(jìn)程占用系統(tǒng)資源較多)亲雪。常用在CPU密集型任務(wù)上勇凭,可充分利用Cpu的多個(gè)核心。使用子進(jìn)程替代線程可以有效避免全局解釋器鎖帶來的性能影響匆光。

2.1 fork()函數(shù)

在Linux和Unix系統(tǒng)中套像,fork()函數(shù)用來創(chuàng)建進(jìn)程酿联。普通函數(shù)终息,調(diào)用一次返回一次。fork()函數(shù)調(diào)用一次贞让,返回兩次周崭。fork()函數(shù)創(chuàng)建新的進(jìn)程(子進(jìn)程)。子進(jìn)程是當(dāng)前進(jìn)程(父進(jìn)程)的拷貝喳张,會(huì)復(fù)制父進(jìn)程的代碼段续镇、堆棧段和數(shù)據(jù)段。

對(duì)于父進(jìn)程销部,返回子進(jìn)程的進(jìn)程號(hào)PID摸航,對(duì)于子進(jìn)程返回0【俗可根據(jù)返回值判斷當(dāng)前進(jìn)程是子進(jìn)程或父進(jìn)程酱虎。python的os模塊提供了普遍的操作系統(tǒng)功能。

  • os.getpid()用來獲取當(dāng)前進(jìn)程ID擂涛,os.getppid()獲取當(dāng)前進(jìn)程的父進(jìn)程ID读串。
# fork函數(shù)
import os

pid = os.fork()

if pid < 0:
    print("process create failed.")
elif pid == 0:
    print("當(dāng)前進(jìn)程為子進(jìn)程")
    print("父進(jìn)程:%s   子進(jìn)程:%s" % (os.getppid(), os.getpid()))
else:
    print("當(dāng)前進(jìn)程為父進(jìn)程")
    print("父進(jìn)程:%s   子進(jìn)程:%s" % (os.getpid(), pid))

# 執(zhí)行結(jié)果
當(dāng)前進(jìn)程為父進(jìn)程
父進(jìn)程:891   子進(jìn)程:1990
當(dāng)前進(jìn)程為子進(jìn)程
父進(jìn)程:891   子進(jìn)程:1990
2.2 多進(jìn)程實(shí)現(xiàn)

python提供了multiprocessing模塊實(shí)現(xiàn)多進(jìn)程。multiprocessing模塊提供了一個(gè)Process類來創(chuàng)建一個(gè)進(jìn)程對(duì)象撒妈。

Process語法結(jié)構(gòu)如下:Process([group [, target [, name [, args [, kwargs]]]]])

  • target:如果傳遞了函數(shù)的引用恢暖,可以任務(wù)這個(gè)子進(jìn)程就執(zhí)行這里的代碼
  • args:給target指定的函數(shù)傳遞的參數(shù),以元組的方式傳遞
  • kwargs:給target指定的函數(shù)傳遞命名參數(shù)
  • name:給進(jìn)程設(shè)定一個(gè)名字狰右,可以不設(shè)定
  • group:指定進(jìn)程組杰捂,大多數(shù)情況下用不到

Process創(chuàng)建的實(shí)例對(duì)象的常用方法:

  • start():?jiǎn)?dòng)子進(jìn)程實(shí)例(創(chuàng)建子進(jìn)程)
  • is_alive():判斷進(jìn)程子進(jìn)程是否還在活著
  • join([timeout]):是否等待子進(jìn)程執(zhí)行結(jié)束,或等待多少秒
  • terminate():不管任務(wù)是否完成棋蚌,立即終止子進(jìn)程

Process創(chuàng)建的實(shí)例對(duì)象的常用屬性:

  • name:當(dāng)前進(jìn)程的別名嫁佳,默認(rèn)為Process-N,N為從1開始遞增的整數(shù)
  • pid:當(dāng)前進(jìn)程的pid(進(jìn)程號(hào))
# 多進(jìn)程
from multiprocessing import Process

def test1():
    for x in range(3):
        print("-----------test1-----------")
        
def test2():
    for x in range(3):
        print("***********test2***********")
        
if __name__ == "__main__":
    p1 = Process(target=test1)
    p2 = Process(target=test2)
    p1.start()
    p2.start()
    
# 執(zhí)行結(jié)果
-----------test1-----------
-----------test1-----------
***********test2***********
-----------test1-----------
***********test2***********
***********test2***********

start()方法用于創(chuàng)建并啟動(dòng)進(jìn)程附鸽,join()方法用于阻塞進(jìn)程脱拼。該方法會(huì)阻塞調(diào)用該方法的進(jìn)程之外的其他進(jìn)程,直至該進(jìn)程執(zhí)行完畢坷备。其他進(jìn)程才會(huì)繼續(xù)執(zhí)行熄浓。.

進(jìn)程之間不共享全局變量

# 進(jìn)程間不共享全局變量
from multiprocessing import Process

g_num = []

def update_data_01():
    for x in range(5):
        g_num.append(x)
    print("update_data_01函數(shù)修改完g_num:%s" % g_num)
        
def update_data_02():
    for x in range(10):
        g_num.append(x)
    print("update_data_02函數(shù)修改完g_num:%s" % g_num)
        
p1 = Process(target=update_data_01)
p2 = Process(target=update_data_02)
p1.start()
p2.start()

# 執(zhí)行結(jié)果
update_data_01函數(shù)修改完g_num:[0, 1, 2, 3, 4]
update_data_02函數(shù)修改完g_num:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
2.3 進(jìn)程間通信

之前說過進(jìn)程之間不共享全局變量,因此多進(jìn)程實(shí)現(xiàn)多任務(wù)時(shí)進(jìn)程之間需要進(jìn)行通信即數(shù)據(jù)傳遞。進(jìn)程間通信可通過管道(Pipe)赌蔑、隊(duì)列(Queue)等多種方式進(jìn)行俯在。Pipe常用在兩個(gè)進(jìn)程間通信,Queue用來在多個(gè)進(jìn)程間實(shí)現(xiàn)通信娃惯。

Queue:即隊(duì)列(FIFO跷乐,先進(jìn)先出)是一種特殊的線性表,特殊之處在于它只允許在表的前端(front)進(jìn)行刪除操作趾浅,而在表的后端(rear)進(jìn)行插入操作愕提。進(jìn)行插入操作的端稱為隊(duì)尾,進(jìn)行刪除操作的端稱為隊(duì)頭皿哨。

初始化Queue()對(duì)象時(shí)(例如:q=Queue())浅侨,若括號(hào)中沒有指定最大可接收的消息數(shù)量,或數(shù)量為負(fù)值证膨,那么就代表可接受的消息數(shù)量沒有上限(直到內(nèi)存的盡頭)

  • Queue.qsize():返回當(dāng)前隊(duì)列包含的消息數(shù)量如输;

  • Queue.empty():如果隊(duì)列為空,返回True央勒,反之False 不见;

  • Queue.full():如果隊(duì)列滿了,返回True崔步,反之False稳吮;

  • Queue.get([block[, timeout]]):獲取隊(duì)列中的一條消息,然后將其從列隊(duì)中移除刷晋,block默認(rèn)值為True

    • 如果block使用默認(rèn)值盖高,且沒有設(shè)置timeout(單位秒),消息列隊(duì)如果為空眼虱,此時(shí)程序?qū)⒈蛔枞ㄍT谧x取狀態(tài))喻奥,直到從消息列隊(duì)讀到消息為止,如果設(shè)置了timeout捏悬,則會(huì)等待timeout秒撞蚕,若還沒讀取到任何消息,則拋出"Queue.Empty"異常过牙;
    • 如果block值為False甥厦,消息列隊(duì)如果為空,則會(huì)立刻拋出"Queue.Empty"異常寇钉;
  • Queue.get_nowait():相當(dāng)Queue.get(False)刀疙;

  • Queue.put(item,[block[, timeout]]):將item消息寫入隊(duì)列,block默認(rèn)值為True扫倡;

    • 如果block使用默認(rèn)值谦秧,且沒有設(shè)置timeout(單位秒),消息列隊(duì)如果已經(jīng)沒有空間可寫入,此時(shí)程序?qū)⒈蛔枞ㄍT趯懭霠顟B(tài))疚鲤,直到從消息列隊(duì)騰出空間為止锥累,如果設(shè)置了timeout,則會(huì)等待timeout秒集歇,若還沒空間桶略,則拋出"Queue.Full"異常;
    • 如果block值為False诲宇,消息列隊(duì)如果沒有空間可寫入际歼,則會(huì)立刻拋出"Queue.Full"異常;
  • Queue.put_nowait(item):相當(dāng)Queue.put(item, False)焕窝;

下面示例:兩個(gè)進(jìn)程向隊(duì)列中寫入數(shù)據(jù)蹬挺,一個(gè)進(jìn)程從隊(duì)列中取出數(shù)據(jù)。

# 進(jìn)程間通信:隊(duì)列
from multiprocessing import Process, Queue
import time

def put_data(sequence):
    infos = ["fengdi", "yuxi", "python", "java", "hello world"]
    for info in infos:
        print("隊(duì)列中添加數(shù)據(jù):%s" % info)
        sequence.put(info)
        time.sleep(0.5)
        if queue.full():
            break

def get_data(sequence):
    while True:
        info = sequence.get()
        print("隊(duì)列中取出數(shù)據(jù):%s" % info)
        time.sleep(0.5)
        if queue.empty():
            break

if __name__ == "__main__":
    queue = Queue(5)  # 定義一個(gè)長(zhǎng)度為5的隊(duì)列

    p1 = Process(target=put_data, args=(queue,))
    p2 = Process(target=get_data, args=(queue,))

    p1.start()
    p2.start()
    
# 執(zhí)行結(jié)果
隊(duì)列中添加數(shù)據(jù):fengdi
隊(duì)列中取出數(shù)據(jù):fengdi
隊(duì)列中添加數(shù)據(jù):yuxi
隊(duì)列中取出數(shù)據(jù):yuxi
隊(duì)列中添加數(shù)據(jù):python
隊(duì)列中取出數(shù)據(jù):python
隊(duì)列中添加數(shù)據(jù):java
隊(duì)列中取出數(shù)據(jù):java
隊(duì)列中添加數(shù)據(jù):hello world
隊(duì)列中取出數(shù)據(jù):hello world

Pipe:常用來在兩個(gè)進(jìn)程間通信它掂,兩個(gè)進(jìn)程分別位于管道兩端。Pipe()方法返回元組(conn1, conn2)溯泣,代表一個(gè)管道的兩端虐秋。該方法有duplex參數(shù),默認(rèn)為True垃沦,即全雙工模式<即兩端均可收發(fā)客给。若為False,則conn1接受消息肢簿,conn2發(fā)送消息靶剑。send和recv方法分別發(fā)送和接受消息。

# 進(jìn)程間通信:管道
import os, time, random
from multiprocessing import Process, Pipe

def proc_send(pipe, urls):
    for url in urls:
        print('Process(%s) send: %s' % (os.getpid(), url))
        pipe.send(url)
        time.sleep(random.random())

def proc_recv(pipe):
    while True:
        print('Process(%s) recv:%s' % (os.getpid(), pipe.recv()))
        time.sleep(random.random())

if __name__ == "__main__":
    pipe = Pipe()
    p1 = Process(target=proc_send, args=(pipe[0], ['url_' + str(i) for i in range(5)]))
    p2 = Process(target=proc_recv, args=(pipe[1], ))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    
# 執(zhí)行結(jié)果
Process(20908) send: url_0
Process(20909) recv:url_0
Process(20908) send: url_1
Process(20909) recv:url_1
Process(20908) send: url_2
Process(20909) recv:url_2
Process(20908) send: url_3
Process(20909) recv:url_3
Process(20908) send: url_4
Process(20909) recv:url_4
2.4 進(jìn)程池

python提供進(jìn)程池方式池充,可批量創(chuàng)建多個(gè)進(jìn)程桩引,進(jìn)程池適用于多個(gè)任務(wù)的情況。初始化Pool時(shí)收夸,可以指定一個(gè)最大進(jìn)程數(shù)坑匠,當(dāng)有新的請(qǐng)求提交到Pool中時(shí),如果池還沒有滿卧惜,那么就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來執(zhí)行該請(qǐng)求厘灼;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到指定的最大值,那么該請(qǐng)求就會(huì)等待咽瓷,直到池中有進(jìn)程結(jié)束设凹,才會(huì)用之前的進(jìn)程來執(zhí)行新的任務(wù)。

multiprocessing.Pool常用函數(shù)解析:

  • apply_async(func[, args[, kwds]]) :使用非阻塞方式調(diào)用func(并行執(zhí)行茅姜,堵塞方式必須等待上一個(gè)進(jìn)程退出才能執(zhí)行下一個(gè)進(jìn)程)闪朱,args為傳遞給func的參數(shù)列表,kwds為傳遞給func的關(guān)鍵字參數(shù)列表;
  • close():關(guān)閉Pool监透,使其不再接受新的任務(wù)桶错;
  • terminate():不管任務(wù)是否完成,立即終止胀蛮;
  • join():主進(jìn)程阻塞院刁,等待子進(jìn)程的退出, 必須在close或terminate之后使用粪狼;

在下面示例中退腥,創(chuàng)建了一個(gè)有5個(gè)進(jìn)程的進(jìn)程池,設(shè)置了10個(gè)要執(zhí)行的任務(wù)再榄。故只有一個(gè)任務(wù)執(zhí)行結(jié)束狡刘,進(jìn)程池空出位置后,另一個(gè)任務(wù)才能執(zhí)行困鸥。

# 進(jìn)程池
from multiprocessing import Pool

def test(num):
    print("------任務(wù)%d正在執(zhí)行-------" % num)
    
# 創(chuàng)建進(jìn)程池

pool = Pool(5)

for x in range(1, 11):
    pool.apply_async(test, args=(x, ))
    
print("***********主進(jìn)程%s開始執(zhí)行***************" % os.getpid())

pool.close()
pool.join()

print("***********主進(jìn)程%s結(jié)束執(zhí)行***************" % os.getpid())

# 執(zhí)行結(jié)果
***********主進(jìn)程20982開始執(zhí)行***************
------任務(wù)1正在執(zhí)行-------
------任務(wù)2正在執(zhí)行-------
------任務(wù)3正在執(zhí)行-------
------任務(wù)6正在執(zhí)行-------
------任務(wù)7正在執(zhí)行-------
------任務(wù)4正在執(zhí)行-------
------任務(wù)8正在執(zhí)行-------
------任務(wù)9正在執(zhí)行-------
------任務(wù)5正在執(zhí)行-------
------任務(wù)10正在執(zhí)行-------
***********主進(jìn)程20982結(jié)束執(zhí)行***************
2.5 進(jìn)程池間通信

進(jìn)程間通信使用隊(duì)列即processing.Queue類創(chuàng)建隊(duì)列對(duì)象嗅蔬,進(jìn)程池進(jìn)程間隊(duì)列通信使用processing.Manager()中的Queue類類創(chuàng)建隊(duì)列對(duì)象。

import time
from multiprocessing import Manager, Pool

def put_data(queue):
    info_list = ["python", "java", "php"]
    for info in info_list:
        print("向隊(duì)列中插入數(shù)據(jù):%s" % info)
        queue.put(info)
        time.sleep(0.5)

def get_data(queue):
    while True:
        if queue.empty():
            break
        info = queue.get()
        print("從隊(duì)列中取出數(shù)據(jù):%s" % info)
        time.sleep(0.5)

pool = Pool()
queue = Manager().Queue()
pool.apply_async(put_data, args=(queue,))
pool.apply_async(get_data, args=(queue,))

pool.close()
pool.join()

# 執(zhí)行結(jié)果
向隊(duì)列中插入數(shù)據(jù):python
從隊(duì)列中取出數(shù)據(jù):python
向隊(duì)列中插入數(shù)據(jù):java
從隊(duì)列中取出數(shù)據(jù):java
向隊(duì)列中插入數(shù)據(jù):php
從隊(duì)列中取出數(shù)據(jù):php
2.6 進(jìn)程線程區(qū)別
  • 定義

    • 進(jìn)程是系統(tǒng)進(jìn)行資源分配和調(diào)度的一個(gè)獨(dú)立單位.
    • 線程是進(jìn)程的一個(gè)實(shí)體,是CPU調(diào)度和分派的基本單位,它是比進(jìn)程更小的能獨(dú)立運(yùn)行的基本單位.線程自己基本上不擁有系統(tǒng)資源,只擁有一點(diǎn)在運(yùn)行中必不可少的資源(如程序計(jì)數(shù)器,一組寄存器和棧),但是它可與同屬一個(gè)進(jìn)程的其他的線程共享進(jìn)程所擁有的全部資源.
  • 區(qū)別

    • 一個(gè)程序至少有一個(gè)進(jìn)程,一個(gè)進(jìn)程至少有一個(gè)線程.
    • 線程的劃分尺度小于進(jìn)程(資源比進(jìn)程少)疾就,使得多線程程序的并發(fā)性高澜术。
    • 進(jìn)程在執(zhí)行過程中擁有獨(dú)立的內(nèi)存單元,而多個(gè)線程共享內(nèi)存猬腰,從而極大地提高了程序的運(yùn)行效率

3. 協(xié)程

協(xié)程是python個(gè)中另外一種實(shí)現(xiàn)多任務(wù)的方式鸟废,只不過比線程更小占用更小執(zhí)行單元(理解為需要的資源)。

通俗的理解:在一個(gè)線程中的某個(gè)函數(shù)姑荷,可以在任何地方保存當(dāng)前函數(shù)的一些臨時(shí)變量等信息盒延,然后切換到另外一個(gè)函數(shù)中執(zhí)行,注意不是通過調(diào)用函數(shù)的方式做到的鼠冕,并且切換的次數(shù)以及什么時(shí)候再切換到原來的函數(shù)都由開發(fā)者自己確定添寺。

協(xié)程和線程差異:在實(shí)現(xiàn)多任務(wù)時(shí), 線程切換從系統(tǒng)層面遠(yuǎn)不止保存和恢復(fù) CPU上下文這么簡(jiǎn)單。 操作系統(tǒng)為了程序運(yùn)行的高效性每個(gè)線程都有自己緩存Cache等等數(shù)據(jù)供鸠,操作系統(tǒng)還會(huì)幫你做這些數(shù)據(jù)的恢復(fù)操作畦贸, 所以線程的切換非常耗性能。但是協(xié)程的切換只是單純的操作CPU的上下文楞捂,所以一秒鐘切換個(gè)上百萬次系統(tǒng)都抗的住薄坏。協(xié)程一般是在一個(gè)線程內(nèi)通過切換調(diào)用函數(shù)實(shí)現(xiàn)。

3.1 greenlet實(shí)現(xiàn)協(xié)程
# 協(xié)程:greenlet
import time
from greenlet import greenlet

def test1():
    for x in range(5):
        print("---------test1---------")
        t2.switch()
        
def test2():
    for x in range(5):
        print("*********test2**********")
        t1.switch()
        
t1 = greenlet(test1)
t2 = greenlet(test2)

t1.switch()

# 執(zhí)行結(jié)果
---------test1---------
*********test2**********
---------test1---------
*********test2**********
---------test1---------
*********test2**********
---------test1---------
*********test2**********
---------test1---------
*********test2**********
3.2 gevent實(shí)現(xiàn)協(xié)程
# 協(xié)程:gevent
import gevent

def f(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        gevent.sleep(0.5)


g1 = gevent.spawn(f, 3)
g2 = gevent.spawn(f, 3)
g3 = gevent.spawn(f, 3)

g1.join()
g2.join()
g3.join()

# 執(zhí)行結(jié)果
<Greenlet at 0x10f5129d8: f(3)> 0
<Greenlet at 0x10f512488: f(3)> 0
<Greenlet at 0x10f5126a8: f(3)> 0
<Greenlet at 0x10f5129d8: f(3)> 1
<Greenlet at 0x10f512488: f(3)> 1
<Greenlet at 0x10f5126a8: f(3)> 1
<Greenlet at 0x10f5129d8: f(3)> 2
<Greenlet at 0x10f512488: f(3)> 2
<Greenlet at 0x10f5126a8: f(3)> 2

4. 進(jìn)程線程協(xié)程總結(jié)

  • 進(jìn)程是資源分配的單位
  • 線程是操作系統(tǒng)調(diào)度的單位
  • 進(jìn)程切換需要的資源很最大寨闹,效率很低
  • 線程切換需要的資源一般胶坠,效率一般(當(dāng)然了在不考慮GIL的情況下)
  • 協(xié)程切換任務(wù)資源很小,效率高
  • 多進(jìn)程繁堡、多線程根據(jù)cpu核數(shù)不一樣可能是并行的沈善,但是協(xié)程是在一個(gè)線程中所以是并發(fā)

5. GIL(全局解釋器鎖)

GIL(全局解釋器鎖)是python的歷史遺留問題乡数,僅在cpython解釋器里面存在。所以在執(zhí)行多線程操作時(shí)闻牡,多個(gè)線程只能交替執(zhí)行即同一時(shí)刻在只有有個(gè)線程在執(zhí)行净赴,不能調(diào)用多個(gè)CPU內(nèi)核,只能利用一個(gè)內(nèi)核罩润。因此執(zhí)行CPU密集型任務(wù)時(shí)玖翅,多使用多進(jìn)程實(shí)現(xiàn),可利用多個(gè)CPU內(nèi)核割以;執(zhí)行IO密集型任務(wù)時(shí)金度,多使用多線程實(shí)現(xiàn),可明顯提高效率(多線程遇到IO阻塞會(huì)自動(dòng)釋放GIL鎖)严沥。下面我們編寫一個(gè)示例:線程數(shù)和CPU核心數(shù)(2核)相同猜极。驗(yàn)證多線程執(zhí)行時(shí),利用幾個(gè)CPU內(nèi)核消玄?

# GIL
from threading import Thread

def test():
    while True:
        pass
    
t1 = Thread(test)
t1.start()

while True:
    pass

在上面代碼中兩個(gè)個(gè)線程執(zhí)行的函數(shù)均為死循環(huán)跟伏,兩個(gè)進(jìn)程執(zhí)行時(shí),最終的CPU占用率在50%左右莱找。也就是說雖然同時(shí)運(yùn)行了2個(gè)進(jìn)程酬姆,但實(shí)際上只使用了1個(gè)CPU內(nèi)核。這正是GIL的原因奥溺。

要想解決多線程執(zhí)行中的GIL問題可以:

  • 更換解釋器,GIL僅存在于cpython解釋器中
  • 使用其他語言編寫線程任務(wù)代碼
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末骨宠,一起剝皮案震驚了整個(gè)濱河市浮定,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌层亿,老刑警劉巖桦卒,帶你破解...
    沈念sama閱讀 211,194評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異匿又,居然都是意外死亡方灾,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門碌更,熙熙樓的掌柜王于貴愁眉苦臉地迎上來裕偿,“玉大人,你說我怎么就攤上這事痛单『偌” “怎么了?”我有些...
    開封第一講書人閱讀 156,780評(píng)論 0 346
  • 文/不壞的土叔 我叫張陵旭绒,是天一觀的道長(zhǎng)鸟妙。 經(jīng)常有香客問我焦人,道長(zhǎng),這世上最難降的妖魔是什么重父? 我笑而不...
    開封第一講書人閱讀 56,388評(píng)論 1 283
  • 正文 為了忘掉前任花椭,我火速辦了婚禮,結(jié)果婚禮上房午,老公的妹妹穿的比我還像新娘矿辽。我一直安慰自己,他們只是感情好歪沃,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評(píng)論 5 384
  • 文/花漫 我一把揭開白布嗦锐。 她就那樣靜靜地躺著,像睡著了一般沪曙。 火紅的嫁衣襯著肌膚如雪奕污。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,764評(píng)論 1 290
  • 那天液走,我揣著相機(jī)與錄音碳默,去河邊找鬼。 笑死缘眶,一個(gè)胖子當(dāng)著我的面吹牛嘱根,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播巷懈,決...
    沈念sama閱讀 38,907評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼该抒,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了顶燕?” 一聲冷哼從身側(cè)響起凑保,我...
    開封第一講書人閱讀 37,679評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎涌攻,沒想到半個(gè)月后欧引,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,122評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡恳谎,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評(píng)論 2 325
  • 正文 我和宋清朗相戀三年芝此,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片因痛。...
    茶點(diǎn)故事閱讀 38,605評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡婚苹,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出婚肆,到底是詐尸還是另有隱情租副,我是刑警寧澤,帶...
    沈念sama閱讀 34,270評(píng)論 4 329
  • 正文 年R本政府宣布较性,位于F島的核電站用僧,受9級(jí)特大地震影響结胀,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜责循,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評(píng)論 3 312
  • 文/蒙蒙 一糟港、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧院仿,春花似錦秸抚、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至排惨,卻和暖如春吭敢,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背暮芭。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評(píng)論 1 265
  • 我被黑心中介騙來泰國(guó)打工鹿驼, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人辕宏。 一個(gè)月前我還...
    沈念sama閱讀 46,297評(píng)論 2 360
  • 正文 我出身青樓畜晰,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親瑞筐。 傳聞我的和親對(duì)象是個(gè)殘疾皇子凄鼻,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 一. 操作系統(tǒng)概念 操作系統(tǒng)位于底層硬件與應(yīng)用軟件之間的一層.工作方式: 向下管理硬件,向上提供接口.操作系統(tǒng)進(jìn)行...
    月亮是我踢彎得閱讀 5,959評(píng)論 3 28
  • 又來到了一個(gè)老生常談的問題,應(yīng)用層軟件開發(fā)的程序員要不要了解和深入學(xué)習(xí)操作系統(tǒng)呢聚假? 今天就這個(gè)問題開始野宜,來談?wù)劜?..
    tangsl閱讀 4,098評(píng)論 0 23
  • python之進(jìn)程、線程與協(xié)程 有這么個(gè)例子說他們的區(qū)別河胎,幫助理解很有用闯袒。 有一個(gè)老板想開一個(gè)工廠生產(chǎn)手機(jī)。 他需...
    道無虛閱讀 3,173評(píng)論 0 3
  • 文/tangsl(簡(jiǎn)書作者) 原文鏈接:http://www.reibang.com/p/2b993a4b913e...
    西葫蘆炒胖子閱讀 3,747評(píng)論 0 5
  • 老人痔瘡去醫(yī)院游岳,查出直腸有硬塊政敢,良性惡性不知道,因?yàn)獒t(yī)保門診費(fèi)用已完胚迫,醫(yī)生建議住院全面檢查喷户。老人年紀(jì)不小,可打小就...
    吳江飛翔印染閱讀 209評(píng)論 0 0