Python:線程

線程概念的引入背景

有了進(jìn)程為什么還要線程

進(jìn)程有很多的有點(diǎn)猴蹂,它提供了多到編程埠褪,讓我們感覺我們每個人都擁有自己的CPU和其他資源岖妄,可以提高計(jì)算機(jī)的利用率。很多人就不理解了抬探,既然進(jìn)程這么優(yōu)秀子巾,為什么還要現(xiàn)線程?其實(shí)小压,仔細(xì)觀察就會發(fā)現(xiàn)進(jìn)程還是有很多缺陷的,主要體現(xiàn)在兩點(diǎn)上:
①椰于、進(jìn)程只能在一個時間內(nèi)干一件事怠益,如果想同時干兩件事或者多件事,進(jìn)程就無能為力了瘾婿。
②蜻牢、進(jìn)程在執(zhí)行的過程中如果阻塞烤咧,例如等待輸入,整個進(jìn)程就睡掛起抢呆,即使進(jìn)程中有些工作不依賴于輸入的數(shù)據(jù)煮嫌,也將無法執(zhí)行。

線程的出現(xiàn)

60年代抱虐,在os中能擁有資源獨(dú)立運(yùn)行的基本單位是進(jìn)程昌阿,然而隨著計(jì)算機(jī)技術(shù)的發(fā)展,進(jìn)程出現(xiàn)了很多的弊端恳邀,一是由于進(jìn)程是資源的擁有這懦冰,創(chuàng)建、撤銷谣沸、切換存在很大的時空開銷刷钢,因此余姚引入輕型進(jìn)程;二是由于對稱多處理機(jī)(SMP)的出現(xiàn)乳附,可以滿足多個運(yùn)行單位内地,而整個進(jìn)程并行開銷過大。
因此在80年代赋除,出現(xiàn)了能獨(dú)立運(yùn)行的基本單位——線程(Threads)瓤鼻。
(進(jìn)程是最小的資源分配單位;線程是被CPU執(zhí)行的最小單位)

進(jìn)程和線程的關(guān)系

1.png

線程與進(jìn)程的區(qū)別:
1贤重、地址空間和其他資源:進(jìn)程間相互獨(dú)立茬祷,同一進(jìn)程的各線程間共享。某線程內(nèi)的想愛你城咋其他進(jìn)程不可見并蝗。
2祭犯、通信:進(jìn)程間通信IPC,線程間可以直接讀寫進(jìn)程數(shù)據(jù)段來進(jìn)行通信——需要進(jìn)程同步和互斥手段的輔助滚停,以保證數(shù)據(jù)的一致性沃粗。
3、調(diào)度和切換:線程上下文切換比進(jìn)程上下文切換要快得多键畴。
4最盅、在多線程操作系統(tǒng)中,進(jìn)程不是一個可執(zhí)行的實(shí)體起惕。

線程的特點(diǎn)

在多線程的操作系統(tǒng)中涡贱,通常是在一個進(jìn)程中包括多個線程,每個線程都是作為利用CPU的基本單位惹想,是花費(fèi)最小開銷的實(shí)體问词。線程具有以下屬性。

1)嘀粱、輕型實(shí)體
線程中的實(shí)體基本上不擁有系統(tǒng)資源激挪,只是有一點(diǎn)必不可少的辰狡、能保證獨(dú)立運(yùn)行的資源。
線程的實(shí)體包括程序垄分、數(shù)據(jù)和TCB宛篇。線程是動態(tài)概念,它的動態(tài)特性由線程控制塊TCB(Thread Control Block)描述
TCB包括以下信息:

(1)線程狀態(tài)
(2)當(dāng)線程不運(yùn)行時薄湿,被保存的現(xiàn)場資源叫倍。
(3)一組執(zhí)行堆棧。
(4)存放每個線程的局部變量主存區(qū)嘿般。
(5)訪問同一個進(jìn)程中的主存和其他資源段标。
用于指示被執(zhí)行指令序列的程序計(jì)數(shù)器、保留局部變量炉奴、少數(shù)狀態(tài)參數(shù)和返回地址等的一組寄存器和堆棧逼庞。

2)獨(dú)立調(diào)度和分派的基本單位。
在多線程OS中瞻赶,線程是能獨(dú)立運(yùn)行的基本單位赛糟,因而也是獨(dú)立調(diào)度和分派的基本單位。由于線程很輕砸逊,故線程的切換非常迅速且開銷小璧南。

3)共享進(jìn)程資源。
線程在同一進(jìn)程中的各個線程师逸,都可以共享該進(jìn)程所擁有過的每一個內(nèi)存資源司倚;此外,還可以訪問進(jìn)程所擁有的一打開文件篓像、定時器动知、信號量機(jī)構(gòu)等。由于同一個進(jìn)程內(nèi)的線程共享內(nèi)存文件员辩,所以線程之間互相通信不必調(diào)用內(nèi)核盒粮。

4)可并發(fā)執(zhí)行
在一個進(jìn)程中的多個線程之間,可以兵法執(zhí)行奠滑,甚至允許在一個進(jìn)程中所有的線程都能兵法執(zhí)行丹皱;同樣,不同進(jìn)程中的線程也能并發(fā)執(zhí)行宋税,充分利用和發(fā)揮了處理機(jī)和外部設(shè)備并行工作的能力摊崭。

使用線程的實(shí)際場景

2.png

開啟一個字處理軟件進(jìn)程,該進(jìn)程肯定需要辦不止一件事情弃甥,比如監(jiān)聽鍵盤輸入爽室,處理文字,定時自動將文字保存到硬盤淆攻,這三個任務(wù)操作的都是同一塊數(shù)據(jù)阔墩,因而不能用多進(jìn)程。只能在一個進(jìn)程里并發(fā)地開啟三個線程,如果是單線程瓶珊,那就只能是啸箫,鍵盤輸入時,不能處理文字和自動保存伞芹,自動保存時又不能輸入和處理文字忘苛。

內(nèi)存中的線程

3.png

多個線程共享同一個進(jìn)程的地址空間中的資源,是對一臺計(jì)算機(jī)上多個進(jìn)程的模擬唱较,有時也稱線程為輕量級的進(jìn)程扎唾。

而對一臺計(jì)算機(jī)上多個進(jìn)程,則共享物理內(nèi)存南缓、磁盤胸遇、打印機(jī)等其他物理資源。多線程的運(yùn)行也多進(jìn)程的運(yùn)行類似汉形,是cpu在多個線程之間的快速切換纸镊。

不同的進(jìn)程之間是充滿敵意的,彼此是搶占概疆、競爭cpu的關(guān)系逗威,如果迅雷會和QQ搶資源。而同一個進(jìn)程是由一個程序員的程序創(chuàng)建岔冀,所以同一進(jìn)程內(nèi)的線程是合作關(guān)系凯旭,一個線程可以訪問另外一個線程的內(nèi)存地址,大家都是共享的使套,一個線程干死了另外一個線程的內(nèi)存罐呼,那純屬程序員腦子有問題。

類似于進(jìn)程童漩,每個線程也有自己的堆棧弄贿,不同于進(jìn)程,線程庫無法利用時鐘中斷強(qiáng)制線程讓出CPU矫膨,可以調(diào)用thread_yield運(yùn)行線程自動放棄cpu差凹,讓另外一個線程運(yùn)行。

線程通常是有益的侧馅,但是帶來了不小程序設(shè)計(jì)難度危尿,線程的問題是:

1. 父進(jìn)程有多個線程,那么開啟的子線程是否需要同樣多的線程

2. 在同一個進(jìn)程中馁痴,如果一個線程關(guān)閉了文件谊娇,而另外一個線程正準(zhǔn)備往該文件內(nèi)寫內(nèi)容呢?

因此罗晕,在多線程的代碼中济欢,需要更多的心思來設(shè)計(jì)程序的邏輯赠堵、保護(hù)程序的數(shù)據(jù)。

用戶級線程和內(nèi)核級線程

線程的實(shí)現(xiàn)可以分為兩類:用戶級線程(User-Level Thread)和內(nèi)核線線程(Kernel-Level Thread)法褥,后者又稱為內(nèi)核支持的線程或輕量級進(jìn)程茫叭。在多線程操作系統(tǒng)中,各個系統(tǒng)的實(shí)現(xiàn)方式并不相同半等,在有的系統(tǒng)中實(shí)現(xiàn)了用戶級線程揍愁,有的系統(tǒng)中實(shí)現(xiàn)了內(nèi)核級線程。

用戶級線程

內(nèi)核的切換由用戶態(tài)程序自己控制內(nèi)核切換,不需要內(nèi)核干涉杀饵,少了進(jìn)出內(nèi)核態(tài)的消耗莽囤,但不能很好的利用多核Cpu。


4.png

在用戶空間模擬操作系統(tǒng)對進(jìn)程的調(diào)度切距,來調(diào)用一個進(jìn)程中的線程朽缎,每個進(jìn)程中都會有一個運(yùn)行時系統(tǒng),用來調(diào)度線程蔚舀。此時當(dāng)該進(jìn)程獲取cpu時饵沧,進(jìn)程內(nèi)再調(diào)度出一個線程去執(zhí)行,同一時刻只有一個線程執(zhí)行赌躺。

內(nèi)核級線程

內(nèi)核級線程:切換由內(nèi)核控制狼牺,當(dāng)線程進(jìn)行切換的時候,由用戶態(tài)轉(zhuǎn)化為內(nèi)核態(tài)礼患。切換完畢要從內(nèi)核態(tài)返回用戶態(tài)是钥;可以很好的利用smp,即利用多核cpu缅叠。windows線程就是這樣的悄泥。


5.png

Python中的線程

全局解釋器鎖GIL

Python代碼的執(zhí)行由Python虛擬機(jī)(解釋器主循環(huán))
來控制住。Python在設(shè)計(jì)之初就考慮到要在主循環(huán)中肤粱,同時只有一個線程咋執(zhí)行弹囚。雖然Python解釋器中可以“運(yùn)行”多個線程,但在任意時刻只有一個線程在解釋器中運(yùn)行领曼。
對Python虛擬機(jī)的訪問有全局解釋器鎖GIL來控制鸥鹉,正式這個鎖能保證同一時刻只有一個線程在解釋器中運(yùn)行。
在多線程環(huán)境中庶骄,Python虛擬機(jī)按一下方式執(zhí)行:
a毁渗、設(shè)置 GIL;

b单刁、切換到一個線程去運(yùn)行灸异;

c、運(yùn)行指定數(shù)量的字節(jié)碼指令或者線程主動讓出控制(可以調(diào)用 time.sleep(0));

d肺樟、把線程設(shè)置為睡眠狀態(tài)檐春;

e、解鎖 GIL儡嘶;

d喇聊、再次重復(fù)以上所有步驟恍风。
  在調(diào)用外部代碼(如 C/C++擴(kuò)展函數(shù))的時候蹦狂,GIL將會被鎖定,直到這個函數(shù)結(jié)束為止(由于在這期間沒有Python的字節(jié)碼被運(yùn)行朋贬,所以不會做線程切換)編寫擴(kuò)展的程序員可以主動解鎖GIL凯楔。

python線程模塊的選擇

Python提供了幾個用于多線程編程的模塊,包括thread锦募、threading和Queue等摆屯。thread和threading模塊允許程序員創(chuàng)建和管理線程。thread模塊提供了基本的線程和鎖的支持糠亩,threading提供了更高級別虐骑、功能更強(qiáng)的線程管理的功能。Queue模塊允許用戶創(chuàng)建一個可以用于多個線程之間共享數(shù)據(jù)的隊(duì)列數(shù)據(jù)結(jié)構(gòu)赎线。
  避免使用thread模塊廷没,因?yàn)楦呒墑e的threading模塊更為先進(jìn),對線程的支持更為完善垂寥,而且使用thread模塊里的屬性有可能會與threading出現(xiàn)沖突颠黎;其次低級別的thread模塊的同步原語很少(實(shí)際上只有一個),而threading模塊則有很多滞项;再者狭归,thread模塊中當(dāng)主線程結(jié)束時,所有的線程都會被強(qiáng)制結(jié)束掉文判,沒有警告也不會有正常的清除工作过椎,至少threading模塊能確保重要的子線程退出后進(jìn)程才退出。

thread模塊不支持守護(hù)線程戏仓,當(dāng)主線程退出時疚宇,所有的子線程不論它們是否還在工作,都會被強(qiáng)行退出柜去。而threading模塊支持守護(hù)線程灰嫉,守護(hù)線程一般是一個等待客戶請求的服務(wù)器,如果沒有客戶提出請求它就在那等著嗓奢,如果設(shè)定一個線程為守護(hù)線程讼撒,就表示這個線程是不重要的,在進(jìn)程退出的時候,不用等待這個線程退出根盒。

threading模塊

multiprocess模塊的完全模仿了threading模塊的接口钳幅,二者在使用層面敢艰,有很大的相似性册赛,因而不再詳細(xì)介紹

線程的創(chuàng)建Threading.Thread類

線程的創(chuàng)建

創(chuàng)建方式1:

from threading import Thread
import time
def sayhi(name):
     time.sleep(2)
     print('%s say hello' % name)

if __name__ == '__main__':
    t = Thread(target=sayhi,args=('Jay',))
    t.start()
    print('主線程')

創(chuàng)建方式2:

from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        aquer().__init__()
        self.name = name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)

if __name__ == '__main__':
    t = Sayhi('Jay')
    t.start()
    print('主線程')
多線程與多進(jìn)程

pid的比較:

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    #part1:在主進(jìn)程下開啟多個線程,每個線程都跟主進(jìn)程的pid一樣
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主線程/主進(jìn)程pid',os.getpid())

    #part2:開多個進(jìn)程,每個進(jìn)程都有不同的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主線程/主進(jìn)程pid',os.getpid())

開啟效率的比較:

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
    #在主進(jìn)程下開啟線程
    t=Thread(target=work)
    t.start()
    print('主線程/主進(jìn)程')
    '''
    打印結(jié)果:
    hello
    主線程/主進(jìn)程
    '''

    #在主進(jìn)程下開啟子進(jìn)程
    t=Process(target=work)
    t.start()
    print('主線程/主進(jìn)程')
    '''
    打印結(jié)果:
    主線程/主進(jìn)程
    hello
    '''

內(nèi)存數(shù)據(jù)的共享問題:

from  threading import Thread
from multiprocessing import Process
import os
def work():
    global n
    n=0

if __name__ == '__main__':
    # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('主',n) #毫無疑問子進(jìn)程p已經(jīng)將自己的全局的n改成了0,但改的僅僅是它自己的,查看父進(jìn)程的n仍然為100


    n=1
    t=Thread(target=work)
    t.start()
    t.join()
    print('主',n) #查看結(jié)果為0,因?yàn)橥贿M(jìn)程內(nèi)的線程之間共享進(jìn)程內(nèi)的數(shù)據(jù)
#同一進(jìn)程內(nèi)的線程共享該進(jìn)程的數(shù)據(jù)森瘪?
多線程socket

Server端:

import multiprocessing
import threading

import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)

def action(conn):
    while True:
        data=conn.recv(1024)
        print(data)
        conn.send(data.upper())

if __name__ == '__main__':

    while True:
        conn,addr=s.accept()


        p=threading.Thread(target=action,args=(conn,))
        p.start()

Client端:

import socket

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue

    s.send(msg.encode('utf-8'))
    data=s.recv(1024)
    print(data)
Thread類的其他方法
Thread實(shí)例對象的方法
  # isAlive(): 返回線程是否活動的扼睬。
  # getName(): 返回線程名。
  # setName(): 設(shè)置線程名窗宇。

threading模塊提供的一些方法:
  # threading.currentThread(): 返回當(dāng)前的線程變量。
  # threading.enumerate(): 返回一個包含正在運(yùn)行的線程的list军俊。正在運(yùn)行指線程啟動后侥加、結(jié)束前,不包括啟動前和終止后的線程蝇完。
  # threading.activeCount(): 返回正在運(yùn)行的線程數(shù)量官硝,與len(threading.enumerate())有相同的結(jié)果。

代碼示例:

from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主進(jìn)程下開啟線程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) #主線程
    print(threading.enumerate()) #連同主線程在內(nèi)有兩個運(yùn)行的線程
    print(threading.active_count())
    print('主線程/主進(jìn)程')

    '''
    打印結(jié)果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    主線程/主進(jìn)程
    Thread-1
    '''

join方法:

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    t.join()
    print('主線程')
    print(t.is_alive())
    '''
    egon say hello
    主線程
    False
    '''
守護(hù)線程

無論是進(jìn)程還是線程短蜕,都遵循:守護(hù)xx會等待主xx運(yùn)行完畢后被銷毀氢架。需要強(qiáng)調(diào)的是:運(yùn)行完畢并非終止運(yùn)行

1.對主進(jìn)程來說,運(yùn)行完畢指的是主進(jìn)程代碼運(yùn)行完畢
2.對主線程來說朋魔,運(yùn)行完畢指的是主線程所在的進(jìn)程內(nèi)所有非守護(hù)線程統(tǒng)統(tǒng)運(yùn)行完畢岖研,主線程才算運(yùn)行完畢

守護(hù)線程例子1:

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #必須在t.start()之前設(shè)置
    t.start()

    print('主線程')
    print(t.is_alive())
    '''
    主線程
    True
    '''

守護(hù)線程例子2:

from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


t1=Thread(target=foo)
t2=Thread(target=bar)

t1.daemon=True
t1.start()
t2.start()
print("main-------")

同步鎖

多個線程搶占資源的情況:

from threading import Thread
import os,time
def work():
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1
if __name__ == '__main__':
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結(jié)果可能為99
import threading
R=threading.Lock()
R.acquire()
'''
對公共數(shù)據(jù)的操作
'''
R.release()

同步鎖的引用:

from threading import Thread,Lock
import os,time
def work():
    global n
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結(jié)果肯定為0,由原來的并發(fā)執(zhí)行變成串行警检,犧牲了執(zhí)行效率保證了數(shù)據(jù)安全

互斥鎖與join的區(qū)別:

#不加鎖:并發(fā)執(zhí)行,速度快,數(shù)據(jù)不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''


#不加鎖:未加鎖部分并發(fā)執(zhí)行,加鎖部分串行執(zhí)行,速度慢,數(shù)據(jù)安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加鎖的代碼并發(fā)運(yùn)行
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    #加鎖的代碼串行運(yùn)行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''

#有的同學(xué)可能有疑問:既然加鎖會讓運(yùn)行變成串行,那么我在start之后立即使用join,就不用加鎖了啊,也是串行的效果啊
#沒錯:在start之后立刻使用jion,肯定會將100個任務(wù)的執(zhí)行變成串行,毫無疑問,最終n的結(jié)果也肯定是0,是安全的,但問題是
#start后立即join:任務(wù)內(nèi)的所有代碼都是串行執(zhí)行的,而加鎖,只是加鎖的部分即修改共享數(shù)據(jù)的部分是串行的
#單從保證數(shù)據(jù)安全方面,二者都可以實(shí)現(xiàn),但很明顯是加鎖的效率更高.
from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗時是多么的恐怖
'''
死鎖與遞歸鎖

死鎖:所謂死鎖就是指兩個或兩個以上的進(jìn)程或者線程在執(zhí)行過程中孙援,因搶奪資源而造成的一種互相等待的現(xiàn)象,若無外力作用扇雕,他們都將無法推進(jìn)下去拓售。此事稱系統(tǒng)處于死鎖狀態(tài)或系統(tǒng)產(chǎn)生了死鎖,這些永遠(yuǎn)在互相等待的進(jìn)程被稱為死鎖進(jìn)程镶奉。以下就是死鎖的例子:

from threading import Lock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()

解決方法础淤,遞歸鎖崭放,在Python中為了支持在同一線程中多次請求統(tǒng)一資源,python提供了可重入鎖RLock鸽凶。
這個RLock內(nèi)部維護(hù)著一個Lock和一個counter變量币砂,counter記錄了acquire的次數(shù),從而使得資源可以被多次require玻侥。直到一個線程所有的acquire都被release,其他的線程才能獲得資源凑兰。
例子:

import time
from threading import Thread,RLock
fork_lock = noodle_lock = RLock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了面條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃面'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了面條' % name)
    print('%s 吃面' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒','egon','yuan']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()

信號量

Semaphore管理一個內(nèi)置的計(jì)數(shù)器拘鞋,
每當(dāng)調(diào)用acquire()時內(nèi)置計(jì)數(shù)器-1;
調(diào)用release() 時內(nèi)置計(jì)數(shù)器+1祟剔;
計(jì)數(shù)器不能小于0物延;當(dāng)計(jì)數(shù)器為0時,acquire()將阻塞線程直到其他線程調(diào)用release()笙纤。

實(shí)例:(同時只有5個線程可以獲得semaphore,即可以限制最大連接數(shù)為5):

from threading import Thread,Semaphore
import threading
import time
# def func():
#     if sm.acquire():
#         print (threading.currentThread().getName() + ' get semaphore')
#         time.sleep(2)
#         sm.release()
def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()
if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()

池與信號量:

與進(jìn)程池是完全不同的概念抖拴,進(jìn)程池Pool(4)阿宅,最大只能產(chǎn)生4個進(jìn)程洒放,而且從頭到尾都只是這四個進(jìn)程榨为,不會產(chǎn)生新的随闺,而信號量是產(chǎn)生一堆線程/進(jìn)程

事件

線程的一個關(guān)鍵特性是每個線程都是獨(dú)立運(yùn)行且狀態(tài)不可預(yù)測矩乐。如果程序中的其 他線程需要通過判斷某個線程的狀態(tài)來確定自己下一步的操作,這時線程同步問題就會變得非常棘手散罕。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設(shè)置的信號標(biāo)志,它允許線程等待某些事件的發(fā)生葬燎。在 初始情況下,Event對象中的信號標(biāo)志被設(shè)置為假窑邦。如果有線程等待一個Event對象, 而這個Event對象的標(biāo)志為假,那么這個線程將會被一直阻塞直至該標(biāo)志為真冈钦。一個線程如果將一個Event對象的信號標(biāo)志設(shè)置為真,它將喚醒所有等待這個Event對象的線程瞧筛。如果一個線程等待一個已經(jīng)被設(shè)置為真的Event對象,那么它將忽略這個事件, 繼續(xù)執(zhí)行

event.isSet():返回event的狀態(tài)值较幌;
event.wait():如果 event.isSet()==False將阻塞線程;
event.set(): 設(shè)置event的狀態(tài)值為True恩急,所有阻塞池的線程激活進(jìn)入就緒狀態(tài)衷恭, 等待操作系統(tǒng)調(diào)度随珠;
event.clear():恢復(fù)event的狀態(tài)值為False。

6.png

例如,有多個工作線程嘗試鏈接MySQL显沈,我們想要在鏈接前確保MySQL服務(wù)正常才讓那些工作線程去連接MySQL服務(wù)器拉讯,如果連接不成功魔慷,都會去嘗試重新連接院尔。那么我們就可以采用threading.Event機(jī)制來協(xié)調(diào)各個工作線程的連接操作召边。
實(shí)例:

import threading
import time,random
from threading import Thread,Event

def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('鏈接超時')
        print('<%s>第%s次嘗試鏈接' % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print('<%s>鏈接成功' %threading.current_thread().getName())


def check_mysql():
    print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == '__main__':
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()

條件

使得線程等待,只有滿足某條件時贞盯,才釋放n個線程
Python提供的Condition對象提供了對復(fù)雜線程同步問題的支持躏敢。Condition被稱為條件變量件余,除了提供與Lock類似的acquire和release方法外啼器,還提供了wait和notify方法端壳。線程首先acquire一個條件變量损谦,然后判斷一些條件照捡。如果條件不滿足則wait闯参;如果條件滿足术羔,進(jìn)行一些處理改變條件后释移,通過notify方法通知其他線程玩讳,其他處于wait狀態(tài)的線程接到通知后會重新判斷條件熏纯。不斷的重復(fù)這一過程樟澜,從而解決復(fù)雜的同步問題秩贰。

實(shí)例:

import threading

def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" % n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))
        con.release()
        print('****')

定時器

定時器,指定n秒后執(zhí)行某個操作

from threading import Timer
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

線程隊(duì)列

queue隊(duì)列:使用import queue愈魏,用法與進(jìn)程的Queue一樣

class queue.Queue(maxsize=0)#先進(jìn)先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結(jié)果(先進(jìn)先出):
first
second
third
'''

class queue.LifoQueue(maxsize=0) #后進(jìn)先出

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結(jié)果(后進(jìn)先出):
third
second
first
'''

class queue.PriorityQueue(maxsize=0) #存儲數(shù)據(jù)時可設(shè)置優(yōu)先級的隊(duì)列

import queue

q=queue.PriorityQueue()
#put進(jìn)入一個元組,元組的第一個元素是優(yōu)先級(通常是數(shù)字,也可以是非數(shù)字之間的比較),數(shù)字越小優(yōu)先級越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結(jié)果(數(shù)字越小優(yōu)先級越高,優(yōu)先級高的優(yōu)先出隊(duì)):
(10, 'b')
(20, 'a')
(30, 'c')
'''

Python標(biāo)準(zhǔn)模塊——concurrent.futures

1 介紹
concurrent.futures模塊提供了高度封裝的異步調(diào)用接口
ThreadPoolExecutor:線程池溪厘,提供異步調(diào)用
ProcessPoolExecutor: 進(jìn)程池桩匪,提供異步調(diào)用
Both implement the same interface, which is defined by the abstract Executor class.

2 基本方法
submit(fn, *args, **kwargs)
異步提交任務(wù)

map(func, *iterables, timeout=None, chunksize=1)
取代for循環(huán)submit的操作

shutdown(wait=True)
相當(dāng)于進(jìn)程池的pool.close()+pool.join()操作
wait=True闺骚,等待池內(nèi)所有任務(wù)執(zhí)行完畢回收完資源后才繼續(xù)
wait=False僻爽,立即返回胸梆,并不會等待池內(nèi)的任務(wù)執(zhí)行完畢
但不管wait參數(shù)為何值碰镜,整個程序都會等到所有任務(wù)執(zhí)行完畢
submit和map必須在shutdown之前

result(timeout=None)
取得結(jié)果

add_done_callback(fn)

ProcessPoolExecutor

#介紹
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.


#用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())

ThreadPoolExecutor

#介紹
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

#用法
與ProcessPoolExecutor相同

map的用法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12)) #map取代了for+submit

回調(diào)函數(shù)

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<進(jìn)程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<進(jìn)程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj柠横,需要用obj.result()拿到結(jié)果
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市搬俊,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌扩淀,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件胜臊,死亡現(xiàn)場離奇詭異象对,居然都是意外死亡勒魔,警方通過查閱死者的電腦和手機(jī)冠绢,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進(jìn)店門楷力,熙熙樓的掌柜王于貴愁眉苦臉地迎上來萧朝,“玉大人检柬,你說我怎么就攤上這事⊥分欤” “怎么了项钮?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵,是天一觀的道長亚隙。 經(jīng)常有香客問我,道長违崇,這世上最難降的妖魔是什么阿弃? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮羞延,結(jié)果婚禮上渣淳,老公的妹妹穿的比我還像新娘。我一直安慰自己伴箩,他們只是感情好入愧,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般棺蛛。 火紅的嫁衣襯著肌膚如雪彤恶。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天,我揣著相機(jī)與錄音斜友,去河邊找鬼洛史。 笑死,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的响谓。 我是一名探鬼主播虽画,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼渤弛,長吁一口氣:“原來是場噩夢啊……” “哼扣草!你這毒婦竟也來了密浑?” 一聲冷哼從身側(cè)響起呆瞻,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤前域,失蹤者是張志新(化名)和其女友劉穎漏峰,沒想到半個月后靖苇,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年谋梭,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片吼鱼。...
    茶點(diǎn)故事閱讀 38,605評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖织阳,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情历极,我是刑警寧澤,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布规惰,位于F島的核電站寒锚,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜过咬,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦枪孩、人聲如沸阴挣。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至揖膜,卻和暖如春誓沸,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背壹粟。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工拜隧, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人趁仙。 一個月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓洪添,卻偏偏與公主長得像,于是被迫代替她去往敵國和親幸撕。 傳聞我的和親對象是個殘疾皇子薇组,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評論 2 348

推薦閱讀更多精彩內(nèi)容

  • 線程 操作系統(tǒng)線程理論 線程概念的引入背景 進(jìn)程 之前我們已經(jīng)了解了操作系統(tǒng)中進(jìn)程的概念律胀,程序并不能單獨(dú)運(yùn)行宋光,只有...
    go以恒閱讀 1,635評論 0 6
  • 線程狀態(tài)新建,就緒炭菌,運(yùn)行罪佳,阻塞,死亡黑低。 線程同步多線程可以同時運(yùn)行多個任務(wù)赘艳,線程需要共享數(shù)據(jù)的時候,可能出現(xiàn)數(shù)據(jù)不...
    KevinCool閱讀 795評論 0 0
  • 一. 操作系統(tǒng)概念 操作系統(tǒng)位于底層硬件與應(yīng)用軟件之間的一層.工作方式: 向下管理硬件,向上提供接口.操作系統(tǒng)進(jìn)行...
    月亮是我踢彎得閱讀 5,959評論 3 28
  • 線程的定義 線程是操作系統(tǒng)能夠進(jìn)行運(yùn)算調(diào)度的最小單位克握。它被包含在進(jìn)程中蕾管。是進(jìn)程中的實(shí)際運(yùn)作單位。一條線程指的是進(jìn)程...
    So_ProbuING閱讀 331評論 0 0
  • 但之后菩暗,情況開始有所轉(zhuǎn)變掰曾。她們升職之后,部門的架構(gòu)也做了調(diào)整停团,倆人被指定分別去帶兩個團(tuán)隊(duì)旷坦,而這兩個團(tuán)隊(duì)間的目標(biāo)是相...
    萍子111閱讀 536評論 1 1