技術(shù)交流QQ群:1027579432枢步,歡迎你的加入!
多線程介紹
- 在python3中渐尿,通過該threading模塊提供線程的功能。原來的thread模塊已經(jīng)廢棄矾瑰。但是砖茸,threading模塊中有個(gè)Thread類是模塊中最主要的線程類,一定要記着寡āA购弧;踉帷!
-
threading模塊提供了一些實(shí)用的方法或?qū)傩跃⒐唬纾?/p>
- theading模塊包含以下的類:
- Thread: 基本線程類
- Lock:互斥鎖
- RLock:可重入鎖震桶,使單一進(jìn)程再次獲得已持有的鎖(遞歸鎖)
- Condition:條件鎖,使得一個(gè)線程等待另一個(gè)線程滿足特定條件征绎,比如改變狀態(tài)或某個(gè)值蹲姐。
- Semaphore:信號鎖,為線程間共享的有限資源提供一個(gè)”計(jì)數(shù)器”人柿,如果沒有可用資源則會被阻塞柴墩。
- Event:事件鎖,任意數(shù)量的線程等待某個(gè)事件的發(fā)生凫岖,在該事件發(fā)生后所有線程被激活江咳。
- Timer:一種計(jì)時(shí)器
- Barrier:Python3.2新增的“阻礙”類,必須達(dá)到指定數(shù)量的線程后才可以繼續(xù)執(zhí)行哥放。
1.多線程
- 有兩種方法來創(chuàng)建多線程:一種是繼承Thread類歼指,并重寫它的run()方法;另一種是實(shí)例化threading.Thread對象時(shí)甥雕,將線程要執(zhí)行的任務(wù)函數(shù)作為參數(shù)傳入線程踩身。
- 第一種方法:
import threading class MyThread(threading.Thread): def __init__(self, thread_name): super(MyThread, self).__init__(name = thread_name) # 重寫run()方法 def run(self): print("%s正在運(yùn)行中......" % self.name) for i in range(10): MyThread("thread-" + str(i)).start() # 啟動線程
- 第二種方法:
import threading import time def show(arg): time.sleep(1) print("thread " + str(arg) + " running......") for i in range(10): t = threading.Thread(target=show, args=(i,)) # 注意傳入的參數(shù)一定是一個(gè)元組! t.start()
- 對于Thread類,它的定義如下:
threading.Thread(self, group=None, target=None, name=None,agrs=(),kwargs=None, *, daemon=None)
- 參數(shù)group是預(yù)留的犀农,用于將來擴(kuò)展
- 參數(shù)target是一個(gè)可調(diào)用對象惰赋,在線程啟動后執(zhí)行;
- 參數(shù)name是線程的名字呵哨。默認(rèn)值為“Thread-N“赁濒,N是一個(gè)數(shù)字
- 參數(shù)args和kwargs分別表示調(diào)用target時(shí)的參數(shù)列表和關(guān)鍵字參數(shù)
-
Thread類定義的常用方法和屬性
-
在多線程執(zhí)行過程中,有一個(gè)特點(diǎn)要注意孟害,每個(gè)線程各自執(zhí)行自己的任務(wù)拒炎,不等待其他的線程,自顧自的完成自己的任務(wù)挨务,例如下面的例子:
import time import threading def doWaiting(): print("開始等待:", time.strftime('%H:%M:%S')) time.sleep(3) print("結(jié)束等待:", time.strftime("%H:%M:%S")) t = threading.Thread(target=doWaiting) t.start() time.sleep(1) # 確保線程已經(jīng)啟動 print("開始工作") print("結(jié)束工作")
- 分析上述過程:Python默認(rèn)會等待最后一個(gè)線程執(zhí)行完畢后才退出击你。上面例子中,主線程沒有等待子線程t執(zhí)行完畢谎柄,而是啥都不管丁侄,繼續(xù)往下執(zhí)行它自己的代碼,執(zhí)行完畢后也沒有結(jié)束整個(gè)程序朝巫,而是等待子線程t執(zhí)行完畢鸿摇,整個(gè)程序才結(jié)束。
-
有時(shí)候我們希望主線程等等子線程劈猿,不要“埋頭往前跑”拙吉。那要怎么辦潮孽?使用join()方法!筷黔,如下所示:
import threading import time def doWaiting(): print("開始等待: ", time.strftime("%H:%M:%S")) time.sleep(3) print("結(jié)束等待:", time.strftime("%H:%M:%S")) t = threading.Thread(target=doWaiting) t.start() # 確保線程t已經(jīng)啟動 time.sleep(1) print("開始阻塞主線程往史,等待子線程執(zhí)行") t.join() # 主線程不要著急走,等等子線程吧!!! 將一直堵塞佛舱,直到t運(yùn)行結(jié)束 print("子線程執(zhí)行完椎例,結(jié)束阻塞,主線程繼續(xù)執(zhí)行名眉!")
- 還可以使用setDaemon(True)吧所有的子線程都變成主線程的守護(hù)進(jìn)程粟矿。當(dāng)主線程結(jié)束后,守護(hù)子進(jìn)程也會隨之結(jié)束损拢,整個(gè)程序也跟著退出陌粹。
import threading import time def run(): print(threading.current_thread().getName(), "開始工作") time.sleep(2) # 子線程停兩秒 print("子線程工作執(zhí)行完成!") for i in range(3): t = threading.Thread(target=run) t.setDaemon(True) # 把子線程設(shè)置為守護(hù)進(jìn)程福压,必須在start()之前設(shè)置!!! t.start() time.sleep(1) # 主線程停1s print("主線程結(jié)束運(yùn)行...") print(threading.active_count()) # 輸出活躍的線程數(shù)量
2.自定義線程類
- 對于threading模塊的Thread類掏秩,本質(zhì)上是執(zhí)行了它的run()方法。因此可以字定義線程類荆姆,讓它繼承Thread類蒙幻,然后重新run()方法即可。
import threading class MyThreading(threading.Thread): def __init__(self, func, arg): super(MyThreading, self).__init__() self.func = func self.arg = arg # 重寫run()方法 def run(self): self.func(self.arg) def my_func(args): ''' 此處可以把你想讓線程做的事定義在這里 ''' print("我是業(yè)務(wù)函數(shù)...") pass obj = MyThreading(my_func, 123) obj.start()
3.線程鎖
- 由于線程之間的任務(wù)執(zhí)行是CPU進(jìn)行隨機(jī)調(diào)度的胆筒,并且每個(gè)線程可能只執(zhí)行了n條指令之后就被切換到別的線程了邮破。當(dāng)多個(gè)線程同時(shí)操作一個(gè)對象,如果沒有很好地保護(hù)該對象仆救,會造成程序結(jié)果的不可預(yù)期抒和,這被稱為“線程不安全”。為了保證數(shù)據(jù)安全彤蔽,我們設(shè)計(jì)了線程鎖摧莽,即同一時(shí)刻只允許一個(gè)線程操作該數(shù)據(jù)。線程鎖用于鎖定資源顿痪,可以同時(shí)使用多個(gè)鎖镊辕,當(dāng)你需要獨(dú)占某一資源時(shí),任何一個(gè)鎖都可以鎖這個(gè)資源蚁袭,就好比你用不同的鎖都可以把相同的一個(gè)箱子鎖住是一個(gè)道理征懈。
- 沒有鎖的情況下,臟數(shù)據(jù)是如何產(chǎn)生的揩悄!
import threading import time number = 0 def plus(): global number # global聲明此處的number是外面的全局變量number for _ in range(1000000): # 進(jìn)行一個(gè)大數(shù)級別的循環(huán)加一運(yùn)算 number += 1 print("子線程%s運(yùn)算結(jié)束后受裹,number = %s" % (threading.current_thread().getName(), number)) for i in range(2): # 用2個(gè)子線程,就可以觀察到臟數(shù)據(jù) t = threading.Thread(target=plus) t.start() time.sleep(3) # 等待3秒,確保2個(gè)子線程都已經(jīng)結(jié)束運(yùn)算 print("主線程執(zhí)行完成后棉饶,number = ", number)
- 分析過程:結(jié)果并不等于2,000,000,可以很明顯地看出臟數(shù)據(jù)的情況镇匀。這是因?yàn)閮蓚€(gè)線程在運(yùn)行過程中照藻,CPU隨機(jī)調(diào)度,你算一會我算一會汗侵,在沒有對number進(jìn)行保護(hù)的情況下幸缕,就發(fā)生了數(shù)據(jù)錯(cuò)誤。如果想獲得正確結(jié)果晰韵,可以使用join()方法发乔,讓多線程變成順序執(zhí)行,如下修改代碼片段:
import threading import time number = 0 def plus(): global number # global聲明此處的number是外面的全局變量number for _ in range(1000000): # 進(jìn)行一個(gè)大數(shù)級別的循環(huán)加一運(yùn)算 number += 1 print("子線程%s運(yùn)算結(jié)束后雪猪,number = %s" % (threading.current_thread().getName(), number)) for i in range(2): # 用2個(gè)子線程栏尚,就可以觀察到臟數(shù)據(jù) t = threading.Thread(target=plus) t.start() t.join() # 添加這一行就讓兩個(gè)子線程變成了順序執(zhí)行!!!!! time.sleep(3) # 等待3秒,確保2個(gè)子線程都已經(jīng)結(jié)束運(yùn)算 print("主線程執(zhí)行完成后只恨,number = ", number)
-
上面為了防止臟數(shù)據(jù)而使用join()的方法译仗,其實(shí)是讓多線程變成了單線程,屬于因噎廢食的做法官觅,正確的做法是使用線程鎖纵菌。Python在threading模塊中定義了幾種線程鎖類,分別是:
- Lock 互斥鎖
- RLock 可重入鎖
- Semaphore 信號
- Event 事件
- Condition 條件
- Barrier “阻礙”
3.1 互斥鎖
-
互斥鎖是一種獨(dú)占鎖休涤,同一時(shí)刻只有一個(gè)線程可以訪問共享的數(shù)據(jù)咱圆。使用很簡單,初始化鎖對象功氨,然后將鎖當(dāng)做參數(shù)傳遞給任務(wù)函數(shù)序苏,在任務(wù)中加鎖,使用后釋放鎖疑故。
import threading import time number = 0 lock = threading.Lock() # 鎖對象杠览! def plus(lk): global number # global聲明此處的number是外面的全局變量number lk.acquire() # 開始加鎖!!! for _ in range(1000000): # 進(jìn)行一個(gè)大數(shù)級別的循環(huán)加一運(yùn)算 number += 1 print("子線程%s運(yùn)算結(jié)束后,number = %s" % (threading.current_thread().getName(), number)) lk.release() # 釋放鎖纵势,讓別的線程也可以訪問number!!! for i in range(2): # 用2個(gè)子線程踱阿,就可以觀察到臟數(shù)據(jù) t = threading.Thread(target=plus, args=(lock,)) t.start() time.sleep(3) # 等待3秒,確保2個(gè)子線程都已經(jīng)結(jié)束運(yùn)算 print("主線程執(zhí)行完成后钦铁,number = ", number)
- RLock的使用方法和Lock一模一樣软舌,只不過它支持重入鎖。該鎖對象內(nèi)部維護(hù)著一個(gè)Lock和一個(gè)counter對象牛曹。counter對象記錄了acquire的次數(shù)佛点,使得資源可以被多次require。最后,當(dāng)所有RLock被release后超营,其他線程才能獲取資源鸳玩。在同一個(gè)線程中,RLock.acquire()可以被多次調(diào)用演闭,利用該特性不跟,可以解決部分死鎖問題。
3.2 信號Semaphore
- 類名:BoundedSemaphore米碰。這種鎖允許一定數(shù)量的線程同時(shí)更改數(shù)據(jù)窝革,它不是互斥鎖。比如地鐵安檢吕座,排隊(duì)人很多虐译,工作人員只允許一定數(shù)量的人進(jìn)入安檢區(qū),其它的人繼續(xù)排隊(duì)吴趴。
import time import threading def run(n, se): se.acquire() print("run the thread: %s" % n) time.sleep(1) se.release() # 設(shè)置5個(gè)線程允許同時(shí)運(yùn)行 semaphore = threading.BoundedSemaphore(5) for i in range(20): t = threading.Thread(target=run, args=(i, semaphore)) t.start()
3.3 事件Event
- 類名Event, 事件線程鎖的運(yùn)行機(jī)制:全局定義了一個(gè)Flag漆诽,如果Flag的值為False,那么當(dāng)程序執(zhí)行wait()方法時(shí)就會阻塞史侣,如果Flag值為True拴泌,線程不再阻塞。這種鎖惊橱,類似交通紅綠燈(默認(rèn)是紅燈)蚪腐,它屬于在紅燈的時(shí)候一次性阻擋所有線程,在綠燈的時(shí)候税朴,一次性放行所有排隊(duì)中的線程回季。事件主要提供了四個(gè)方法set()、wait()正林、clear()和is_set()
- clear()方法會將事件的Flag設(shè)置為False
- set()方法會將Flag設(shè)置為True
- wait()方法將等待“紅綠燈”信號
- is_set():判斷當(dāng)前是否"綠燈放行"狀態(tài)
- 下面是一個(gè)模擬紅綠燈泡一,然后汽車通行的例子:
import threading import time event = threading.Event() def lighter(): green_time = 5 # 綠燈時(shí)間 red_time = 5 # 紅燈時(shí)間 event.set() # 初始設(shè)為綠燈 while True: print("綠燈亮...") time.sleep(green_time) event.clear() print("紅燈亮...") time.sleep(red_time) event.set() def run(name): while True: if event.is_set(): # 判斷當(dāng)前是否"放行"狀態(tài) print("一輛[%s] 呼嘯開過..." % name) time.sleep(1) else: print("一輛[%s]開來,看到紅燈觅廓,無奈的停下了..." % name) event.wait() print("[%s] 看到綠燈亮了鼻忠,瞬間飛起....." % name) lighter = threading.Thread(target=lighter,) lighter.start() for name in ['奔馳', '寶馬', '奧迪']: car = threading.Thread(target=run, args=(name,)) car.start()
3.4 條件Condition
- 類名:Condition。Condition稱作條件鎖杈绸,依然是通過acquire()/release()加鎖解鎖帖蔓。
- wait([timeout])方法將使線程進(jìn)入Condition的等待池等待通知,并釋放鎖瞳脓。使用前線程必須已獲得鎖定塑娇,否則將拋出異常。
- notify()方法將從等待池挑選一個(gè)線程并通知劫侧,收到通知的線程將自動調(diào)用acquire()嘗試獲得鎖定(進(jìn)入鎖定池)埋酬,其他線程仍然在等待池中哨啃。調(diào)用這個(gè)方法不會釋放鎖定。使用前線程必須已獲得鎖定写妥,否則將拋出異常拳球。
- notifyAll()方法將通知等待池中所有的線程,這些線程都將進(jìn)入鎖定池嘗試獲得鎖定耳标。調(diào)用這個(gè)方法不會釋放鎖定醇坝。使用前線程必須已獲得鎖定,否則將拋出異常次坡。
import threading import time num = 0 con = threading.Condition() class Foo(threading.Thread): def __init__(self, name, action): super(Foo, self).__init__() self.name = name self.action = action def run(self): global num con.acquire() print("%s開始執(zhí)行..." % self.name) while True: if self.action == "add": num += 1 elif self.action == 'reduce': num -= 1 else: exit(1) print("num當(dāng)前為:", num) time.sleep(1) if num == 5 or num == 0: print("暫停執(zhí)行%s!" % self.name) con.notify() con.wait() print("%s開始執(zhí)行..." % self.name) con.release() if __name__ == '__main__': a = Foo("線程A", 'add') b = Foo("線程B", 'reduce') a.start() b.start()
3.5 定時(shí)器Timer
- 定時(shí)器Timer類是threading模塊中的一個(gè)小工具画畅,用于指定n秒后執(zhí)行某操作砸琅。一個(gè)簡單但很實(shí)用的東西。
from threading import Timer def hello(): print("hello world") # 表示1s后執(zhí)行hello函數(shù) t = Timer(1, hello) t.start()
3.6 通過with語句使用線程鎖
- 所有的線程鎖都有一個(gè)加鎖和釋放鎖的動作轴踱,非常類似文件的打開和關(guān)閉症脂。在加鎖后,如果線程執(zhí)行過程中出現(xiàn)異骋В或者錯(cuò)誤诱篷,沒有正常的釋放鎖,那么其他的線程會造到致命性的影響雳灵。通過with上下文管理器棕所,可以確保鎖被正常釋放。其格式如下:
with some_lock: # 執(zhí)行任務(wù)....
- 這相當(dāng)于:
ome_lock.acquire() try: # 執(zhí)行任務(wù).. finally: some_lock.release()
4. 全局解釋器鎖(GIL)
- 在大多數(shù)環(huán)境中悯辙,單核CPU情況下琳省,本質(zhì)上某一時(shí)刻只能有一個(gè)線程被執(zhí)行。多核CPU時(shí)躲撰,則可以支持多個(gè)線程同時(shí)執(zhí)行针贬。但是在Python中,無論CPU有多少核拢蛋,同時(shí)只能執(zhí)行一個(gè)線程桦他,這是由于GIL的存在導(dǎo)致的。
- GIL的全稱是Global Interpreter Lock(全局解釋器鎖)谆棱,是Python設(shè)計(jì)之初為了數(shù)據(jù)安全所做的決定快压。Python中的某個(gè)線程想要執(zhí)行,必須先拿到GIL础锐∩そ冢可以把GIL看作是執(zhí)行任務(wù)的“通行證”,并且在一個(gè)Python進(jìn)程中皆警,GIL只有一個(gè)拦宣。拿不到通行證的線程,就不允許進(jìn)入CPU執(zhí)行。GIL只在CPython解釋器中才有鸵隧,因?yàn)镃Python調(diào)用的是c語言的原生線程绸罗,不能直接操作cpu,只能利用GIL保證同一時(shí)間只能有一個(gè)線程拿到數(shù)據(jù)豆瘫。在PyPy和JPython中沒有GIL珊蟀。
- Python多線程的工作流程:
- a.拿到公共數(shù)據(jù)
- b.申請GIL
- c.Python解釋器調(diào)用操作系統(tǒng)原生線程
- d.CPU執(zhí)行運(yùn)算
- e.當(dāng)該線程執(zhí)行一段時(shí)間消耗完,無論任務(wù)是否已經(jīng)執(zhí)行完畢外驱,都會釋放GIL
- f.下一個(gè)被CPU調(diào)度的線程重復(fù)上面的過程
- Python針對不同類型的任務(wù)育灸,多線程執(zhí)行效率是不同的:
- 對于CPU密集型任務(wù)(各種循環(huán)處理、計(jì)算等等): 由于計(jì)算工作多昵宇,ticks計(jì)數(shù)很快就會達(dá)到閾值磅崭,然后觸發(fā)GIL的釋放與再競爭(多個(gè)線程來回切換是需要消耗資源的),所以Python下的多線程對CPU密集型任務(wù)并不友好
- IO密集型任務(wù)(文件處理瓦哎、網(wǎng)絡(luò)通信等涉及數(shù)據(jù)讀寫的操作): 多線程能夠有效提升效率(單線程下有IO操作會進(jìn)行IO等待砸喻,造成不必要的時(shí)間浪費(fèi),而開啟多線程能在線程A等待時(shí)蒋譬,自動切換到線程B割岛,可以不浪費(fèi)CPU的資源,從而能提升程序執(zhí)行效率)犯助。所以Python的多線程對IO密集型任務(wù)比較友好癣漆。
- 實(shí)際中使用的建議:Python中想要充分利用多核CPU,就用多進(jìn)程也切。因?yàn)槊總€(gè)進(jìn)程有各自獨(dú)立的GIL扑媚,互不干擾,這樣就可以真正意義上的并行執(zhí)行雷恃。在Python中疆股,多進(jìn)程的執(zhí)行效率優(yōu)于多線程(僅僅針對多核CPU而言)。同時(shí)建議在IO密集型任務(wù)中使用多線程倒槐,在計(jì)算密集型任務(wù)中使用多進(jìn)程旬痹。另外,深入研究Python的協(xié)程機(jī)制讨越,你會有驚喜的两残。