5-線(xiàn)程(補(bǔ)充)

Python多線(xiàn)程原理與實(shí)戰(zhàn)

目的:

(1)了解python線(xiàn)程執(zhí)行原理

(2)掌握多線(xiàn)程編程與線(xiàn)程同步

(3)了解線(xiàn)程池的使用

1 線(xiàn)程基本概念

1.1 線(xiàn)程是什么?

線(xiàn)程是指進(jìn)程內(nèi)的一個(gè)執(zhí)行單元,也是進(jìn)程內(nèi)的可調(diào)度實(shí)體.

與進(jìn)程的區(qū)別:
(1) 地址空間:進(jìn)程內(nèi)的一個(gè)執(zhí)行單元;進(jìn)程至少有一個(gè)線(xiàn)程;它們共享進(jìn)程的地址空間;而進(jìn)程有自己獨(dú)立的地址空間;
(2) 資源擁有:進(jìn)程是資源分配和擁有的單位,同一個(gè)進(jìn)程內(nèi)的線(xiàn)程共享進(jìn)程的資源
(3) 線(xiàn)程是處理器調(diào)度的基本單位,但進(jìn)程不是.
(4) 二者均可并發(fā)執(zhí)行.

簡(jiǎn)而言之,一個(gè)程序至少有一個(gè)進(jìn)程,一個(gè)進(jìn)程至少有一個(gè)線(xiàn)程.

線(xiàn)程的劃分尺度小于進(jìn)程铐望,使得多線(xiàn)程程序的并發(fā)性高赂苗。
另外玩徊,進(jìn)程在執(zhí)行過(guò)程中擁有獨(dú)立的內(nèi)存單元谍失,而多個(gè)線(xiàn)程共享內(nèi)存,從而極大地提高了程序的運(yùn)行效率嫡丙。

1.2 線(xiàn)程和進(jìn)程關(guān)系拴袭?

? 進(jìn)程就是一個(gè)應(yīng)用程序在處理機(jī)上的一次執(zhí)行過(guò)程,它是一個(gè)動(dòng)態(tài)的概念曙博,而線(xiàn)程是進(jìn)程中的一部分拥刻,進(jìn)程包含多個(gè)線(xiàn)程在運(yùn)行。

? 多線(xiàn)程可以共享全局變量父泳,多進(jìn)程不能般哼。多線(xiàn)程中,所有子線(xiàn)程的進(jìn)程號(hào)相同惠窄;多進(jìn)程中蒸眠,不同的子進(jìn)程進(jìn)程號(hào)不同。

? 進(jìn)程是具有一定獨(dú)立功能的程序關(guān)于某個(gè)數(shù)據(jù)集合上的一次運(yùn)行活動(dòng),進(jìn)程是系統(tǒng)進(jìn)行資源分配和調(diào)度的一個(gè)獨(dú)立單位.
? 線(xiàn)程是進(jìn)程的一個(gè)實(shí)體,是CPU調(diào)度和分派的基本單位,它是比進(jìn)程更小的能獨(dú)立運(yùn)行的基本單位.線(xiàn)程自己基本上不擁有系統(tǒng)資源,只擁有一點(diǎn)在運(yùn)行中必不可少的資源(如程序計(jì)數(shù)器,一組寄存器和棧),但是它可與同屬一個(gè)進(jìn)程的其他的線(xiàn)程共享進(jìn)程所擁有的全部資源.
? 一個(gè)線(xiàn)程可以創(chuàng)建和撤銷(xiāo)另一個(gè)線(xiàn)程;同一個(gè)進(jìn)程中的多個(gè)線(xiàn)程之間可以并發(fā)執(zhí)行.

?

2 Python線(xiàn)程模塊

? python主要是通過(guò)thread和threading這兩個(gè)模塊來(lái)實(shí)現(xiàn)多線(xiàn)程支持杆融。python的thread模塊是比較底層的模塊楞卡,python的threading模塊是對(duì)thread做了一些封裝,可以更加方便的被使用。但是python(cpython)由于GIL的存在無(wú)法使用threading充分利用CPU資源臀晃,如果想充分發(fā)揮多核CPU的計(jì)算能力需要使用multiprocessing模塊(Windows下使用會(huì)有諸多問(wèn)題)觉渴。

2.1 如何創(chuàng)建線(xiàn)程

? python3.x中已經(jīng)摒棄了Python2.x中采用函數(shù)式thread模塊中的start_new_thread()函數(shù)來(lái)產(chǎn)生新線(xiàn)程方式。

? python3.x中通過(guò)threading模塊創(chuàng)建新的線(xiàn)程有兩種方法:一種是通過(guò)threading.Thread(Target=executable Method)-即傳遞給Thread對(duì)象一個(gè)可執(zhí)行方法(或?qū)ο螅?第二種是繼承threading.Thread定義子類(lèi)并重寫(xiě)run()方法徽惋。第二種方法中案淋,唯一必須重寫(xiě)的方法是run()

(1)通過(guò)threading.Thread進(jìn)行創(chuàng)建多線(xiàn)程

import threading
import time
def target():
    print("the current threading %s is runing"
       %(threading.current_thread().name))
    time.sleep(1)
    print("the current threading %s is ended"%(threading.current_thread().name))
    
print("the current threading %s is runing"%(threading.current_thread().name))
## 屬于線(xiàn)程t的部分
t = threading.Thread(target=target)
t.start()
## 屬于線(xiàn)程t的部分
t.join() # join是阻塞當(dāng)前線(xiàn)程(此處的當(dāng)前線(xiàn)程時(shí)主線(xiàn)程) 主線(xiàn)程直到Thread-1結(jié)束之后才結(jié)束
print("the current threading %s is ended"%(threading.current_thread().name))

(2)通過(guò)繼承threading.Thread定義子類(lèi)創(chuàng)建多線(xiàn)程

? 使用Threading模塊創(chuàng)建線(xiàn)程,直接從threading.Thread繼承险绘,然后重寫(xiě)init方法和run方法:

import threading
import time

class myThread(threading.Thread):  # 繼承父類(lèi)threading.Thread
   def __init__(self, threadID, name, counter):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.counter = counter

   def run(self):  # 把要執(zhí)行的代碼寫(xiě)到run函數(shù)里面 線(xiàn)程在創(chuàng)建后會(huì)直接運(yùn)行run函數(shù)
      print("Starting " + self.name)
      print_time(self.name, 5, self.counter)
      print("Exiting " + self.name)


def print_time(threadName, delay, counter):
   while counter:
      time.sleep(delay)
      print("%s process at: %s" % (threadName, time.ctime(time.time())))
      counter -= 1


# 創(chuàng)建新線(xiàn)程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# 開(kāi)啟線(xiàn)程
thread1.start()
thread2.start()

# 等待線(xiàn)程結(jié)束
thread1.join()
thread2.join()

print("Exiting Main Thread")

通過(guò)以上案例可以知道踢京,thread1和thread2執(zhí)行順序是亂序的。要使之有序宦棺,需要進(jìn)行線(xiàn)程同步

3 線(xiàn)程間同步

? 如果多個(gè)線(xiàn)程共同對(duì)某個(gè)數(shù)據(jù)修改瓣距,則可能出現(xiàn)不可預(yù)料的結(jié)果,為了保證數(shù)據(jù)的正確性代咸,需要對(duì)多個(gè)線(xiàn)程進(jìn)行同步蹈丸。

? 使用Thread對(duì)象的Lock和Rlock可以實(shí)現(xiàn)簡(jiǎn)單的線(xiàn)程同步,這兩個(gè)對(duì)象都有acquire方法和release方法呐芥,對(duì)于那些需要每次只允許一個(gè)線(xiàn)程操作的數(shù)據(jù)逻杖,可以將其操作放到acquire和release方法之間。

? 需要注意的是思瘟,Python有一個(gè)GIL(Global Interpreter Lock)機(jī)制荸百,任何線(xiàn)程在運(yùn)行之前必須獲取這個(gè)全局鎖才能執(zhí)行,每當(dāng)執(zhí)行完100條字節(jié)碼滨攻,全局鎖才會(huì)釋放够话,切換到其他線(xiàn)程執(zhí)行。

3.1 線(xiàn)程同步問(wèn)題

多線(xiàn)程實(shí)現(xiàn)同步有四種方式:

鎖機(jī)制光绕,信號(hào)量女嘲,條件判斷和同步隊(duì)列。

下面我主要關(guān)注兩種同步機(jī)制:鎖機(jī)制和同步隊(duì)列诞帐。

(1)鎖機(jī)制

threading的Lock類(lèi)澡为,用該類(lèi)的acquire函數(shù)進(jìn)行加鎖,用realease函數(shù)進(jìn)行解鎖

import threading
import time
class myThread(threading.Thread):
   def __init__(self, threadID, name, counter):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.counter = counter
   def run(self):
      print("Starting " + self.name)
      # 獲得鎖景埃,成功獲得鎖定后返回True
      # 可選的timeout參數(shù)不填時(shí)將一直阻塞直到獲得鎖定
      # 否則超時(shí)后將返回False
      threadLock.acquire()
      print_time(self.name, self.counter, 5)
      # 釋放鎖
      threadLock.release()
def print_time(threadName, delay, counter):
   while counter:
      time.sleep(delay)
      print("%s: %s" % (threadName, time.ctime(time.time())))
      counter -= 1
    
threadLock = threading.Lock()
threads = []
# 創(chuàng)建新線(xiàn)程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 開(kāi)啟新線(xiàn)程
thread1.start()
thread2.start()
# 添加線(xiàn)程到線(xiàn)程列表
threads.append(thread1)
threads.append(thread2)
# 等待所有線(xiàn)程完成
for t in threads:
   t.join()

print("Exiting Main Thread")

?

(2) 線(xiàn)程同步隊(duì)列queue

python2.x中提供的Queue媒至, Python3.x中提供的是queue

見(jiàn)import queue.

Python的queue模塊中提供了同步的、線(xiàn)程安全的隊(duì)列類(lèi)谷徙,包括FIFO(先入先出)隊(duì)列Queue拒啰,LIFO(后入先出)隊(duì)列LifoQueue,和優(yōu)先級(jí)隊(duì)列PriorityQueue完慧。這些隊(duì)列都實(shí)現(xiàn)了鎖原語(yǔ)谋旦,能夠在多線(xiàn)程中直接使用。可以使用隊(duì)列來(lái)實(shí)現(xiàn)線(xiàn)程間的同步册着。

queue模塊中的常用方法:

  • queue.qsize() 返回隊(duì)列的大小
  • queue.empty() 如果隊(duì)列為空拴孤,返回True,反之False
  • queue.full() 如果隊(duì)列滿(mǎn)了,返回True,反之False
  • queue.full 與 maxsize 大小對(duì)應(yīng)
  • queue.get([block[, timeout]])獲取隊(duì)列甲捏,timeout等待時(shí)間
  • queue.get_nowait() 相當(dāng)Queue.get(False)
  • queue.put(item) 寫(xiě)入隊(duì)列演熟,timeout等待時(shí)間
  • queue.put_nowait(item) 相當(dāng)Queue.put(item, False)
  • queue.task_done() 在完成一項(xiàng)工作之后,Queue.task_done()函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào)
  • queue.join() 實(shí)際上意味著等到隊(duì)列為空司顿,再執(zhí)行別的操作

案例1:

import queue
import threading
import time

exitFlag = 0

class myThread(threading.Thread):
   def __init__(self, threadID, name, q):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.q = q

   def run(self):
      print("Starting " + self.name)
      process_data(self.name, self.q)
      print("Exiting " + self.name)

def process_data(threadName, q):
   while not exitFlag:
      queueLock.acquire()
      if not workQueue.empty():
         data = q.get()
         queueLock.release()
         print("%s processing %s" % (threadName, data))
      else:
         queueLock.release()
      time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = queue.Queue(10)
threads = []
threadID = 1

# 創(chuàng)建新線(xiàn)程
for tName in threadList:
   thread = myThread(threadID, tName, workQueue)
   thread.start()
   threads.append(thread)
   threadID += 1

# 填充隊(duì)列
queueLock.acquire()
for word in nameList:
   workQueue.put(word)
queueLock.release()

# 等待隊(duì)列清空
while not workQueue.empty():
   pass

# 通知線(xiàn)程是時(shí)候退出
exitFlag = 1

# 等待所有線(xiàn)程完成
for t in threads:
   t.join()
print("Exiting Main Thread")
from queue import Queue
from threading import Thread, Lock
from redis import Redis

import time

isExitFlag = False  # 是否中斷子線(xiàn)程


class MyThread(Thread):
    def __init__(self, name, task_q):
        super(MyThread, self).__init__()
        self.name = name
        self.task_q = task_q
        self.start()

    def run(self):
        print(self.name, '---start---')
        while not isExitFlag:
            # task = self.task_q.get()  # 獲取一項(xiàng)任務(wù)
            task = self.task_q.blpop('taskQueue', timeout=1)
            print(self.name, '--正在執(zhí)行的任務(wù)-->', task)
            time.sleep(1)
        print(self.name, '---end---')


if __name__ == '__main__':
    print('--所有任務(wù)開(kāi)始--')
    # work_queue = Queue(maxsize=10)  # 最大數(shù)據(jù)量
    work_queue = Redis('127.0.0.1', db=4)
    threads = [MyThread('Thread-%d' % i, work_queue) for i in range(3)]

    # 發(fā)布任務(wù)
    tasks = ['www.baidu.com', 'www.hao123.com', 'www.jd.com', 'www.qf.ed', 'www.qq.com', 'www.google.com.cn']
    for task in tasks:
        # work_queue.put(task)
        work_queue.lpush('taskQueue', task)

    # 假如芒粹,隊(duì)列為空后,等待 指定時(shí)間(10s)后大溜,再?zèng)]有任務(wù)(消息)化漆,則退出
    while work_queue.llen('taskQueue'):
        pass

    isExitFlag = True  # 嘗試停止所有線(xiàn)程

    for thread_ in threads:
        thread_.join()

    print('--所有任務(wù)完成--')

案例2:

import time
import threading
import queue

class Worker(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.start()    #執(zhí)行run()

    def run(self):
        #循環(huán),保證接著跑下一個(gè)任務(wù)
        while True:
            # 隊(duì)列為空則退出線(xiàn)程
            if self.queue.empty():
                break
            # 獲取一個(gè)隊(duì)列數(shù)據(jù)
            foo = self.queue.get()
            # 延時(shí)1S模擬你要做的事情
            time.sleep(1)
            # 打印
            print(self.getName() + " process " + str(foo))
            # 任務(wù)完成
            self.queue.task_done()


# 隊(duì)列
queue = queue.Queue()
# 加入100個(gè)任務(wù)隊(duì)列
for i in range(100):
    queue.put(i)
# 開(kāi)10個(gè)線(xiàn)程
for i in range(10):
    threadName = 'Thread' + str(i)
    Worker(threadName, queue)
# 所有線(xiàn)程執(zhí)行完畢后關(guān)閉
queue.join()

4 線(xiàn)程池

傳統(tǒng)多線(xiàn)程問(wèn)題钦奋?

? 傳統(tǒng)多線(xiàn)程方案會(huì)使用“即時(shí)創(chuàng)建座云, 即時(shí)銷(xiāo)毀”的策略。盡管與創(chuàng)建進(jìn)程相比付材,創(chuàng)建線(xiàn)程的時(shí)間已經(jīng)大大的縮短朦拖,但是如果提交給線(xiàn)程的任務(wù)是執(zhí)行時(shí)間較短,而且執(zhí)行次數(shù)極其頻繁伞租,那么服務(wù)器將處于不停的創(chuàng)建線(xiàn)程,銷(xiāo)毀線(xiàn)程的狀態(tài)限佩。

? 一個(gè)線(xiàn)程的運(yùn)行時(shí)間可以分為3部分:線(xiàn)程的啟動(dòng)時(shí)間葵诈、線(xiàn)程體的運(yùn)行時(shí)間和線(xiàn)程的銷(xiāo)毀時(shí)間。在多線(xiàn)程處理的情景中祟同,如果線(xiàn)程不能被重用作喘,就意味著每次創(chuàng)建都需要經(jīng)過(guò)啟動(dòng)、銷(xiāo)毀和運(yùn)行3個(gè)過(guò)程晕城。這必然會(huì)增加系統(tǒng)相應(yīng)的時(shí)間泞坦,降低了效率。

有沒(méi)有一種高效的解決方案呢砖顷? —— 線(xiàn)程池

線(xiàn)程池基本原理:

? 我們把任務(wù)放進(jìn)隊(duì)列中去贰锁,然后開(kāi)N個(gè)線(xiàn)程,每個(gè)線(xiàn)程都去隊(duì)列中取一個(gè)任務(wù)滤蝠,執(zhí)行完了之后告訴系統(tǒng)說(shuō)我執(zhí)行完了豌熄,然后接著去隊(duì)列中取下一個(gè)任務(wù),直至隊(duì)列中所有任務(wù)取空物咳,退出線(xiàn)程锣险。

使用線(xiàn)程池:
? 由于線(xiàn)程預(yù)先被創(chuàng)建并放入線(xiàn)程池中,同時(shí)處理完當(dāng)前任務(wù)之后并不銷(xiāo)毀而是被安排處理下一個(gè)任務(wù),因此能夠避免多次創(chuàng)建線(xiàn)程芯肤,從而節(jié)省線(xiàn)程創(chuàng)建和銷(xiāo)毀的開(kāi)銷(xiāo)巷折,能帶來(lái)更好的性能和系統(tǒng)穩(wěn)定性。

multhreading-struct.png

內(nèi)置線(xiàn)程池

from multiprocessing.pool import ThreadPool
import time
w_start = time.time()

def worker(msg):
    time.sleep(1)
    print('hi', msg)

if __name__ == '__main__':
    pool = ThreadPool(4)
    for i in range(12):
        msg = 'threading %d' % i
        pool.apply_async(worker,(msg,))
    pool.close()
    pool.join()
    print(time.time() - w_start)
    print('sub-threading all done')

線(xiàn)程池要設(shè)置為多少崖咨?

服務(wù)器CPU核數(shù)有限锻拘,能夠同時(shí)并發(fā)的線(xiàn)程數(shù)有限,并不是開(kāi)得越多越好掩幢,以及線(xiàn)程切換是有開(kāi)銷(xiāo)的逊拍,如果線(xiàn)程切換過(guò)于頻繁,反而會(huì)使性能降低

線(xiàn)程執(zhí)行過(guò)程中际邻,計(jì)算時(shí)間分為兩部分:

  • CPU計(jì)算芯丧,占用CPU
  • 不需要CPU計(jì)算,不占用CPU世曾,等待IO返回缨恒,比如recv(), accept(), sleep()等操作,具體操作就是比如
    訪(fǎng)問(wèn)cache轮听、RPC調(diào)用下游service骗露、訪(fǎng)問(wèn)DB,等需要網(wǎng)絡(luò)調(diào)用的操作

那么如果計(jì)算時(shí)間占50%血巍, 等待時(shí)間50%萧锉,那么為了利用率達(dá)到最高,可以開(kāi)2個(gè)線(xiàn)程:
假如工作時(shí)間是2秒述寡, CPU計(jì)算完1秒后柿隙,線(xiàn)程等待IO的時(shí)候需要1秒,此時(shí)CPU空閑了鲫凶,這時(shí)就可以切換到另外一個(gè)線(xiàn)程禀崖,讓CPU工作1秒后,線(xiàn)程等待IO需要1秒螟炫,此時(shí)CPU又可以切回去波附,第一個(gè)線(xiàn)程這時(shí)剛好完成了1秒的IO等待,可以讓CPU繼續(xù)工作昼钻,就這樣循環(huán)的在兩個(gè)線(xiàn)程之前切換操作掸屡。

那么如果計(jì)算時(shí)間占20%, 等待時(shí)間80%然评,那么為了利用率達(dá)到最高折晦,可以開(kāi)5個(gè)線(xiàn)程:
可以想象成完成任務(wù)需要5秒,CPU占用1秒沾瓦,等待時(shí)間4秒满着,CPU在線(xiàn)程等待時(shí)谦炒,可以同時(shí)再激活4個(gè)線(xiàn)程,這樣就把CPU和IO等待時(shí)間风喇,最大化的重疊起來(lái)

抽象一下宁改,計(jì)算線(xiàn)程數(shù)設(shè)置的公式就是:
N核服務(wù)器,通過(guò)執(zhí)行業(yè)務(wù)的單線(xiàn)程分析出本地計(jì)算時(shí)間為x魂莫,等待時(shí)間為y还蹲,則工作線(xiàn)程數(shù)(線(xiàn)程池線(xiàn)程數(shù))設(shè)置為 N*(x+y)/x,能讓CPU的利用率最大化耙考。
由于有GIL的影響谜喊,python只能使用到1個(gè)核,所以這里設(shè)置N=1

import queue
import threading
import time


class WorkManager(object):
   def __init__(self, work_num=1000, thread_num=2):
      self.work_queue = queue.Queue()
      self.threads = []
      self.__init_work_queue(work_num)
      self.__init_thread_pool(thread_num)

   """
      初始化線(xiàn)程
   """

   def __init_thread_pool(self, thread_num):
      for i in range(thread_num):
         self.threads.append(Work(self.work_queue))


   """
      初始化工作隊(duì)列
   """

   def __init_work_queue(self, jobs_num):
      for i in range(jobs_num):
         self.add_job(do_job, i)

   """
      添加一項(xiàng)工作入隊(duì)
   """

   def add_job(self, func, *args):
      self.work_queue.put((func, list(args)))  # 任務(wù)入隊(duì)倦始,Queue內(nèi)部實(shí)現(xiàn)了同步機(jī)制

   """
      等待所有線(xiàn)程運(yùn)行完畢
   """

   def wait_allcomplete(self):
      for item in self.threads:
         if item.isAlive(): item.join()


class Work(threading.Thread):
   def __init__(self, work_queue):
      threading.Thread.__init__(self)
      self.work_queue = work_queue
      self.start()

   def run(self):
      # 死循環(huán)斗遏,從而讓創(chuàng)建的線(xiàn)程在一定條件下關(guān)閉退出
      while True:
         try:
            do, args = self.work_queue.get(block=False)  # 任務(wù)異步出隊(duì),Queue內(nèi)部實(shí)現(xiàn)了同步機(jī)制
            do(args)
            self.work_queue.task_done()  # 通知系統(tǒng)任務(wù)完成
         except:
            break

# 具體要做的任務(wù)
def do_job(args):
   time.sleep(0.1)  # 模擬處理時(shí)間
   print(threading.current_thread())
   print(list(args))


if __name__ == '__main__':
   start = time.time()
   work_manager = WorkManager(100, 10)  # 或者work_manager =  WorkManager(10000, 20)
   work_manager.wait_allcomplete()
   end = time.time()
   print("cost all time: %s" % (end - start))
# 自定義線(xiàn)程池

from threading import Thread, current_thread
from queue import Queue

import time
import random


class Task(Thread):
    """
    工作線(xiàn)程-從任務(wù)隊(duì)列中讀取任務(wù)(函數(shù)對(duì)象和函數(shù)參數(shù))
    """

    def __init__(self, task_queue):
        super(Task, self).__init__()
        self.task_queue: Queue = task_queue
        self.start()

    def run(self):
        while True:
            try:
                do_task, args = self.task_queue.get(False)
                do_task(*args)
                time.sleep(0.2)
            except:
                break


def doTask(*args):
    print(current_thread().name, '正在執(zhí)行任務(wù):', args)
    time.sleep(random.uniform(0, 1))


class PoolManager:
    """
    自定義線(xiàn)程池管理器
    """

    def __init__(self, maxsize=10):
        self.threads = []
        self.taskQueue = Queue()
        self.maxsize = maxsize  # 最大線(xiàn)程并發(fā)數(shù)

    def apply_async(self, *args):
        self.taskQueue.put((doTask, args))

    def close(self):  # 停止發(fā)布任務(wù)鞋邑,并開(kāi)啟線(xiàn)程
        self.init_threads()

    def init_threads(self):  # 創(chuàng)建線(xiàn)程并啟動(dòng)
        # 發(fā)布了2個(gè)任務(wù)诵次,用多少線(xiàn)程執(zhí)行合適
        init_size = 1
        task_len = self.taskQueue.qsize()  # 獲取任務(wù)隊(duì)列的大小
        init_size = task_len // 10 + (1 if task_len % 10 > 0 else 0)

        # 判斷需要的線(xiàn)程數(shù)是否超過(guò)最大線(xiàn)程數(shù)
        init_size = init_size if init_size < self.maxsize else self.maxsize
        for _ in range(init_size):
            self.threads.append(Task(self.taskQueue))

    def wait_all_complete(self):
        for t in self.threads:
            if t.is_alive():
                t.join()


if __name__ == '__main__':
    pool = PoolManager()
    # 發(fā)布任務(wù)
    for i in range(100):
        pool.apply_async('www.baidu.com', 'index%d.html' % i)

    # 定制發(fā)布任務(wù)并啟動(dòng)任務(wù)線(xiàn)程來(lái)完成任務(wù)
    pool.close()

    pool.wait_all_complete()
    print('--所有任務(wù)完成--')

5 協(xié)程

? 在python GIL之下,同一時(shí)刻只能有一個(gè)線(xiàn)程在運(yùn)行枚碗,那么對(duì)于CPU計(jì)算密集的程序來(lái)說(shuō)逾一,線(xiàn)程之間的切換開(kāi)銷(xiāo)就成了拖累,而以I/O為瓶頸的程序正是協(xié)程所擅長(zhǎng)的:

Python中的協(xié)程經(jīng)歷了很長(zhǎng)的一段發(fā)展歷程肮雨。其大概經(jīng)歷了如下三個(gè)階段:

  1. 最初的生成器變形yield/send
  2. 引入@asyncio.coroutine和yield from
  3. 在最近的Python3.5版本中引入async/await關(guān)鍵字

(1)從yield說(shuō)起

先看一段普通的計(jì)算斐波那契續(xù)列的代碼

def fibs(n):
   res = [0] * n
   index = 0
   a = 0
   b = 1
   while index < n:
      res[index] = b
      a, b = b, a + b
      index += 1
   return res


for fib_res in fibs(20):
   print(fib_res)

? 如果我們僅僅是需要拿到斐波那契序列的第n位遵堵,或者僅僅是希望依此產(chǎn)生斐波那契序列,那么上面這種傳統(tǒng)方式就會(huì)比較耗費(fèi)內(nèi)存怨规。

這時(shí)陌宿,yield就派上用場(chǎng)了。

def fib(n):
   index = 0
   a = 0
   b = 1
   while index < n:
      yield b
      a, b = b, a + b
      index += 1

for fib_res in fib(20):
   print(fib_res)

? 當(dāng)一個(gè)函數(shù)中包含yield語(yǔ)句時(shí)椅亚,python會(huì)自動(dòng)將其識(shí)別為一個(gè)生成器限番。這時(shí)fib(20)并不會(huì)真正調(diào)用函數(shù)體舱污,而是以函數(shù)體生成了一個(gè)生成器對(duì)象實(shí)例呀舔。

? yield在這里可以保留fib函數(shù)的計(jì)算現(xiàn)場(chǎng),暫停fib的計(jì)算并將b返回扩灯。而將fib放入for…in循環(huán)中時(shí)媚赖,每次循環(huán)都會(huì)調(diào)用next(fib(20)),喚醒生成器珠插,執(zhí)行到下一個(gè)yield語(yǔ)句處惧磺,直到拋出StopIteration異常。此異常會(huì)被for循環(huán)捕獲捻撑,導(dǎo)致跳出循環(huán)磨隘。

(2) Send來(lái)了

? 從上面的程序中可以看到缤底,目前只有數(shù)據(jù)從fib(20)中通過(guò)yield流向外面的for循環(huán);如果可以向fib(20)發(fā)送數(shù)據(jù)番捂,那不是就可以在Python中實(shí)現(xiàn)協(xié)程了嘛个唧。

? 于是,Python中的生成器有了send函數(shù)设预,yield表達(dá)式也擁有了返回值徙歼。

? 我們用這個(gè)特性,模擬一個(gè)慢速斐波那契數(shù)列的計(jì)算:

import time
import random

def stupid_fib(n):
   index = 0
   a = 0
   b = 1
   while index < n:
      sleep_cnt = yield b
      print('let me think {0} secs'.format(sleep_cnt))
      time.sleep(sleep_cnt)
      a, b = b, a + b
      index += 1


print('-' * 10 + 'test yield send' + '-' * 10)
N = 20
sfib = stupid_fib(N)
fib_res = next(sfib) # 第一次必須要執(zhí)行next()函數(shù)鳖枕,讓程序控制到y(tǒng)ield b位置
while True:
   print(fib_res)
   try:
      fib_res = sfib.send(random.uniform(0, 0.5))
   except StopIteration:
      break

python 進(jìn)行并發(fā)編程

? 在Python 2的時(shí)代魄梯,高性能的網(wǎng)絡(luò)編程主要是使用Twisted、Tornado和Gevent這三個(gè)庫(kù)宾符,但是它們的異步代碼相互之間既不兼容也不能移植酿秸。

? asyncio是Python 3.4版本引入的標(biāo)準(zhǔn)庫(kù),直接內(nèi)置了對(duì)異步IO的支持吸奴。

? asyncio的編程模型就是一個(gè)消息循環(huán)允扇。我們從asyncio模塊中直接獲取一個(gè)EventLoop的引用,然后把需要執(zhí)行的協(xié)程扔到EventLoop中執(zhí)行则奥,就實(shí)現(xiàn)了異步IO考润。

? Python的在3.4中引入了協(xié)程的概念,可是這個(gè)還是以生成器對(duì)象為基礎(chǔ)读处。

? Python 3.5添加了async和await這兩個(gè)關(guān)鍵字糊治,分別用來(lái)替換asyncio.coroutineyield from

? python3.5則確定了協(xié)程的語(yǔ)法罚舱。下面將簡(jiǎn)單介紹asyncio的使用井辜。實(shí)現(xiàn)協(xié)程的不僅僅是asyncio,tornado和gevent都實(shí)現(xiàn)了類(lèi)似的功能管闷。

(1)協(xié)程定義

asyncio實(shí)現(xiàn)Hello world代碼如下:

import asyncio

@asyncio.coroutine
def hello():
    print("Hello world!")
    # 異步調(diào)用asyncio.sleep(1) --> 協(xié)程函數(shù):
    r = yield from asyncio.sleep(1)
    print("Hello again!")

# 獲取EventLoop(事件循環(huán)器):
loop = asyncio.get_event_loop()
# 執(zhí)行coroutine
loop.run_until_complete(hello())
loop.close()

? @asyncio.coroutine把一個(gè)generator標(biāo)記為coroutine類(lèi)型粥脚,然后,我們就把這個(gè)coroutine扔到EventLoop中執(zhí)行包个。 hello()會(huì)首先打印出Hello world!刷允,然后,yield from語(yǔ)法可以讓我們方便地調(diào)用另一個(gè)generator碧囊。由于asyncio.sleep()也是一個(gè)coroutine树灶,所以線(xiàn)程不會(huì)等待asyncio.sleep(),而是直接中斷并執(zhí)行下一個(gè)消息循環(huán)糯而。當(dāng)asyncio.sleep()返回時(shí)天通,線(xiàn)程就可以從yield from拿到返回值(此處是None),然后接著執(zhí)行下一行語(yǔ)句熄驼。

? 把asyncio.sleep(1)看成是一個(gè)耗時(shí)1秒的IO操作像寒,在此期間烘豹,主線(xiàn)程并未等待,而是去執(zhí)行EventLoop中其他可以執(zhí)行的coroutine了诺祸,因此可以實(shí)現(xiàn)并發(fā)執(zhí)行吴叶。

我們用Task封裝兩個(gè)coroutine試試:

import threading
import asyncio

@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    yield from asyncio.sleep(1)
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

觀察執(zhí)行過(guò)程:

Hello world! (<_MainThread(MainThread, started 140735195337472)>)
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
(暫停約1秒)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)

由打印的當(dāng)前線(xiàn)程名稱(chēng)可以看出,兩個(gè)coroutine是由同一個(gè)線(xiàn)程并發(fā)執(zhí)行的序臂。

如果把asyncio.sleep()換成真正的IO操作蚌卤,則多個(gè)coroutine就可以由一個(gè)線(xiàn)程并發(fā)執(zhí)行。

asyncio案例實(shí)戰(zhàn)

我們用asyncio的異步網(wǎng)絡(luò)連接來(lái)獲取sina奥秆、sohu和163的網(wǎng)站首頁(yè):

async_wget.py

import asyncio

@asyncio.coroutine
def wget(host):  
    print('wget %s...' % host)  # 等待打開(kāi)host:80的連接
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect # 開(kāi)始連接逊彭,如果連接成功,則返回讀Reader和寫(xiě)Writer的對(duì)象
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain() # 刷新底層傳輸?shù)膶?xiě)緩沖區(qū)
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

結(jié)果信息如下:

wget www.sohu.com...
wget www.sina.com.cn...
wget www.163.com...
(等待一段時(shí)間)
(打印出sohu的header)
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html
...
(打印出sina的header)
www.sina.com.cn header > HTTP/1.1 200 OK
www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT
...
(打印出163的header)
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
...

可見(jiàn)3個(gè)連接由一個(gè)線(xiàn)程通過(guò)coroutine并發(fā)完成构订。

新版本改寫(xiě)

@asyncio.coroutine -> async

yield from -> await

小結(jié)

asyncio提供了完善的異步IO支持侮叮;

異步操作需要在coroutine中通過(guò)yield from完成;

多個(gè)coroutine可以封裝成一組Task然后并發(fā)執(zhí)行悼瘾。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末囊榜,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子亥宿,更是在濱河造成了極大的恐慌卸勺,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,214評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件烫扼,死亡現(xiàn)場(chǎng)離奇詭異曙求,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)映企,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)悟狱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人堰氓,你說(shuō)我怎么就攤上這事挤渐。” “怎么了双絮?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,543評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵浴麻,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我掷邦,道長(zhǎng)白胀,這世上最難降的妖魔是什么椭赋? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,221評(píng)論 1 279
  • 正文 為了忘掉前任抚岗,我火速辦了婚禮,結(jié)果婚禮上哪怔,老公的妹妹穿的比我還像新娘宣蔚。我一直安慰自己向抢,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布胚委。 她就那樣靜靜地躺著挟鸠,像睡著了一般。 火紅的嫁衣襯著肌膚如雪亩冬。 梳的紋絲不亂的頭發(fā)上艘希,一...
    開(kāi)封第一講書(shū)人閱讀 49,007評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音硅急,去河邊找鬼覆享。 笑死,一個(gè)胖子當(dāng)著我的面吹牛营袜,可吹牛的內(nèi)容都是我干的撒顿。 我是一名探鬼主播,決...
    沈念sama閱讀 38,313評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼荚板,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼凤壁!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起跪另,我...
    開(kāi)封第一講書(shū)人閱讀 36,956評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤拧抖,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后免绿,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體徙鱼,經(jīng)...
    沈念sama閱讀 43,441評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評(píng)論 2 323
  • 正文 我和宋清朗相戀三年针姿,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了袱吆。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,018評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡距淫,死狀恐怖丙猬,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情孔轴,我是刑警寧澤杀糯,帶...
    沈念sama閱讀 33,685評(píng)論 4 322
  • 正文 年R本政府宣布,位于F島的核電站彤枢,受9級(jí)特大地震影響狰晚,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜缴啡,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評(píng)論 3 307
  • 文/蒙蒙 一壁晒、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧业栅,春花似錦秒咐、人聲如沸谬晕。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,240評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)攒钳。三九已至,卻和暖如春雷滋,著一層夾襖步出監(jiān)牢的瞬間不撑,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,464評(píng)論 1 261
  • 我被黑心中介騙來(lái)泰國(guó)打工晤斩, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留燎孟,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,467評(píng)論 2 352
  • 正文 我出身青樓尸昧,卻偏偏與公主長(zhǎng)得像揩页,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子烹俗,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容