Num01-->多線(xiàn)程threading
Python中建議使用threading模塊臭猜,而不要使用thread模塊呐伞。原因如下:
1淘捡,Python中threading模塊對(duì)thread進(jìn)行了一些包裝蚤吹,可以更加方便的使用娃兽。
2,Python中threading模塊能確保重要的子線(xiàn)程在進(jìn)程退出前結(jié)束禁筏。
3持钉,Python中thread模塊,當(dāng)主線(xiàn)程技術(shù)篱昔,同一主線(xiàn)程下的其他所有子線(xiàn)程都被強(qiáng)制退出每强。
4,Python中thread模塊州刽,不支持守護(hù)(daemon)線(xiàn)程空执。
Test01-->多線(xiàn)程執(zhí)行
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2017/4/28 14:05
# @Author : xiaoke
import threading
import time
def say():
print("我是子線(xiàn)程")
time.sleep(1)
if __name__ == "__main__":
print("我是主線(xiàn)程")
# 創(chuàng)建三個(gè)子線(xiàn)程
for i in range(3):
t = threading.Thread(target=say)
# 啟動(dòng)線(xiàn)程
t.start()
# 結(jié)果如下:
# 我是主線(xiàn)程
# 我是子線(xiàn)程
# 我是子線(xiàn)程
# 我是子線(xiàn)程
Test02-->主線(xiàn)程會(huì)等待所有子線(xiàn)程結(jié)束后,再結(jié)束
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2017/4/28 14:05
# @Author : xiaoke
import threading
from time import sleep, ctime
def sing():
for i in range(3):
print("正在唱歌...%d" % i)
sleep(1)
print("唱歌子線(xiàn)程結(jié)束")
def dance():
for i in range(3):
print("正在跳舞...%d" % i)
sleep(1)
print("跳舞子線(xiàn)程結(jié)束")
if __name__ == '__main__':
print('主線(xiàn)程---開(kāi)始時(shí)間---:%s' % ctime())
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)
t1.start()
t2.start()
# 屏蔽以下代碼穗椅,試試看辨绊,程序是否會(huì)立馬結(jié)束?
# t1.join()
# t2.join()
print('主線(xiàn)程---結(jié)束時(shí)間---:%s' % ctime())
# 沒(méi)有屏蔽join()函數(shù)的結(jié)果如下:
# 主線(xiàn)程---開(kāi)始時(shí)間---:Fri Apr 28 14:19:29 2017
# 正在唱歌...0
# 正在跳舞...0
# 正在唱歌...1
# 正在跳舞...1
# 正在跳舞...2
# 正在唱歌...2
# 跳舞子線(xiàn)程結(jié)束
# 唱歌子線(xiàn)程結(jié)束
# 主線(xiàn)程---結(jié)束時(shí)間---:Fri Apr 28 14:19:32 2017
# 屏蔽join()函數(shù),的結(jié)果如下:
# 主線(xiàn)程---開(kāi)始時(shí)間---:Fri Apr 28 14:14:00 2017
# 正在唱歌...0
# 正在跳舞...0
# 主線(xiàn)程---結(jié)束時(shí)間---:Fri Apr 28 14:14:00 2017
# 正在跳舞...1
# 正在唱歌...1
# 正在跳舞...2
# 正在唱歌...2
# 唱歌子線(xiàn)程結(jié)束
# 跳舞子線(xiàn)程結(jié)束
以上代碼說(shuō)明匹表,子線(xiàn)程在啟動(dòng)后门坷,調(diào)用join()函數(shù),是等待所有的子線(xiàn)程結(jié)束后桑孩,主線(xiàn)程才結(jié)束拜鹤。否則主線(xiàn)程,很快就結(jié)束了流椒。
Test03-->查看線(xiàn)程數(shù)量
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
from time import sleep, ctime
import gc
def sing():
for i in range(3):
print("正在唱歌...%d" % i)
sleep(1)
print("唱歌子線(xiàn)程結(jié)束")
def dance():
for i in range(3):
print("正在跳舞...%d" % i)
sleep(1)
print("跳舞子線(xiàn)程結(jié)束")
if __name__ == '__main__':
print('---開(kāi)始時(shí)間---:%s' % ctime())
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)
t1.start()
t2.start()
while True:
# enumerate()函數(shù)的意思:返回一個(gè)列表敏簿,存放當(dāng)前活著的線(xiàn)程
length = len(threading.enumerate())
print('當(dāng)前運(yùn)行的線(xiàn)程數(shù)為:%d' % length)
if length <= 1:
break
sleep(1)
# t1.join()
# t2.join()
print('---結(jié)束時(shí)間---:%s' % ctime())
# 手動(dòng)垃圾回收
#gc.collect()
print('當(dāng)前還活著的線(xiàn)程是:%s' % threading.current_thread().name)
# 結(jié)果如下:
# ---開(kāi)始時(shí)間---:Fri Apr 28 14:44:50 2017
# 正在唱歌...0
# 正在跳舞...0
# 當(dāng)前運(yùn)行的線(xiàn)程數(shù)為:3
# 正在唱歌...1
# 正在跳舞...1
# 當(dāng)前運(yùn)行的線(xiàn)程數(shù)為:3
# 正在唱歌...2
# 當(dāng)前運(yùn)行的線(xiàn)程數(shù)為:3
# 正在跳舞...2
# 唱歌子線(xiàn)程結(jié)束
# 當(dāng)前運(yùn)行的線(xiàn)程數(shù)為:2
# 跳舞子線(xiàn)程結(jié)束
# 當(dāng)前運(yùn)行的線(xiàn)程數(shù)為:1
# ---結(jié)束時(shí)間---:Fri Apr 28 14:44:54 2017
# 當(dāng)前還活著的線(xiàn)程是:MainThread
以上代碼加以說(shuō)明:在程序的最后,還剩下主線(xiàn)程還在存活,是因?yàn)橐粋€(gè)程序至少要有一個(gè)主線(xiàn)程惯裕,一直存活著温数。當(dāng)然Python垃圾回收器也會(huì)不定時(shí)的回收垃圾(引用計(jì)數(shù)和分代清除兩種機(jī)制),也可以手動(dòng)回收垃圾蜻势。
Num02-->第二種方式創(chuàng)建多線(xiàn)程
定義一個(gè)新的子類(lèi)撑刺,繼承threading.Thread就可以,然后重寫(xiě)run()方法握玛。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
import time
class MyThread(threading.Thread):
def run(self):
for i in range(3):
time.sleep(1)
msg = "I'm " + self.name + ' @ ' + str(i) # name屬性中保存的是當(dāng)前線(xiàn)程的名字
print(msg)
def main():
for i in range(3):
t = MyThread()
t.start()
if __name__ == '__main__':
main()
# 結(jié)果如下:
# I'm Thread-2 @ 0
# I'm Thread-1 @ 0
# I'm Thread-3 @ 0
# I'm Thread-1 @ 1
# I'm Thread-3 @ 1
# I'm Thread-2 @ 1
# I'm Thread-2 @ 2
# I'm Thread-1 @ 2
# I'm Thread-3 @ 2
以上代碼加以說(shuō)明:
1痹栖,Python中threading.Thread類(lèi)中有一個(gè)run()方法祈远,用于定義線(xiàn)程的功能函數(shù)示姿,可以在自己定義的線(xiàn)程類(lèi)中重寫(xiě)該方法蟹但。而創(chuàng)建自己的線(xiàn)程實(shí)例后,通過(guò)Thread類(lèi)的start()方法拂苹,可以啟動(dòng)該線(xiàn)程安聘,交個(gè)Python虛擬機(jī)進(jìn)行調(diào)度,當(dāng)該線(xiàn)程獲得執(zhí)行的機(jī)會(huì)時(shí)瓢棒,就會(huì)調(diào)用run()方法執(zhí)行線(xiàn)程浴韭。
2,多線(xiàn)程程序的執(zhí)行順序是不確定的脯宿。當(dāng)執(zhí)行到sleep()語(yǔ)句時(shí)念颈,線(xiàn)程將被阻塞(Blocked),到sleep()結(jié)束后,線(xiàn)程進(jìn)入就緒狀態(tài)(Runnable)连霉,等待調(diào)度舍肠。而線(xiàn)程的調(diào)度將會(huì)隨機(jī)選擇一個(gè)線(xiàn)程執(zhí)行。上面的代碼只能保證每個(gè)線(xiàn)程都運(yùn)行完窘面,整個(gè)run()函數(shù)。但是線(xiàn)程的啟動(dòng)順序叽躯,run()函數(shù)中每次循環(huán)的執(zhí)行順序不能確定财边。
Num03-->線(xiàn)程的幾種狀態(tài)
Num04-->多線(xiàn)程共享全局變量
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
from threading import Thread
import time
g_num = 100
def work1():
global g_num
for i in range(3):
g_num += 1
print("----in work1, g_num is %d---" % g_num)
def work2():
global g_num
print("----in work2, g_num is %d---" % g_num)
print("---線(xiàn)程創(chuàng)建之前g_num is %d---" % g_num)
t1 = Thread(target=work1)
t1.start()
t1.join()
t2 = Thread(target=work2)
t2.start()
t2.join()
以上代碼加以說(shuō)明:
1,優(yōu)點(diǎn):不管是可變類(lèi)型或者不可變類(lèi)型的數(shù)據(jù)点骑,在一個(gè)進(jìn)程內(nèi)的所有線(xiàn)程共享全局變量酣难,能夠在不使用其他方式的前提下完成多線(xiàn)程之間的數(shù)據(jù)共享。
2黑滴,缺點(diǎn):多線(xiàn)程對(duì)全局變量的隨意改變憨募,會(huì)造成對(duì)全局變量值的混亂。也就是多線(xiàn)程非安全的原因袁辈。
Num05-->多線(xiàn)程中的互斥鎖
當(dāng)多個(gè)線(xiàn)程幾乎同時(shí)修改某一個(gè)共享數(shù)據(jù)的時(shí)候菜谣,需要進(jìn)行同步控制
線(xiàn)程同步能夠保證多個(gè)線(xiàn)程安全訪(fǎng)問(wèn)競(jìng)爭(zhēng)資源,最簡(jiǎn)單的同步機(jī)制是引入互斥鎖。
互斥鎖為資源引入一個(gè)狀態(tài):鎖定/非鎖定尾膊。
某個(gè)線(xiàn)程要更改共享數(shù)據(jù)時(shí)媳危,先將其鎖定,此時(shí)資源的狀態(tài)為“鎖定”冈敛,其他線(xiàn)程不能更改待笑;直到該線(xiàn)程釋放資源,將資源的狀態(tài)變成“非鎖定”抓谴,其他的線(xiàn)程才能再次鎖定該資源暮蹂。互斥鎖保證了每次只有一個(gè)線(xiàn)程進(jìn)行寫(xiě)入操作癌压,從而保證了多線(xiàn)程情況下數(shù)據(jù)的正確性仰泻。
Test01-->互斥鎖的創(chuàng)建
#創(chuàng)建鎖
mutex = threading.Lock()
#鎖定
mutex.acquire([blocking])
#釋放
mutex.release()
加以說(shuō)明:
其中,鎖定方法acquire可以有一個(gè)blocking參數(shù)措拇。
如果設(shè)定blocking為T(mén)rue我纪,則當(dāng)前線(xiàn)程會(huì)堵塞,直到獲取到這個(gè)鎖為止丐吓。(默認(rèn)為T(mén)rue)
如果設(shè)定blocking為False浅悉,則當(dāng)前線(xiàn)程不會(huì)堵塞
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
from threading import Thread, Lock
import time
g_num = 0
def test1():
global g_num
for i in range(1000000):
# True表示堵塞 即如果這個(gè)鎖在上鎖之前已經(jīng)被上鎖了,那么這個(gè)線(xiàn)程會(huì)在這里一直等待到解鎖為止
# False表示非堵塞券犁,即不管本次調(diào)用能夠成功上鎖术健,都不會(huì)卡在這,而是繼續(xù)執(zhí)行下面的代碼
mutexFlag = mutex.acquire(True)
if mutexFlag:
g_num += 1
mutex.release()
print("---test1---g_num=%d" % g_num)
def test2():
global g_num
for i in range(1000000):
mutexFlag = mutex.acquire(True) # True表示堵塞
if mutexFlag:
g_num += 1
mutex.release()
print("---test2---g_num=%d" % g_num)
def main():
p1 = Thread(target=test1)
p1.start()
p2 = Thread(target=test2)
p2.start()
print("---g_num=%d---" % g_num)
p1.join()
p2.join()
if __name__ == '__main__':
# 創(chuàng)建一個(gè)互斥鎖
# 這個(gè)所默認(rèn)是未上鎖的狀態(tài)
mutex = Lock()
main()
# 運(yùn)行結(jié)果如下:
# ---g_num=44416---
# ---test1---g_num=1990567
# ---test2---g_num=2000000
Test02-->上鎖解鎖過(guò)程
當(dāng)一個(gè)線(xiàn)程調(diào)用鎖的acquire()方法獲得鎖時(shí),鎖就進(jìn)入“l(fā)ocked”鎖住狀態(tài)粘衬。
每次只有一個(gè)線(xiàn)程可以獲得鎖荞估。
如果此時(shí)另一個(gè)線(xiàn)程試圖獲得鎖,該線(xiàn)程就會(huì)變成“blocked”狀態(tài)稚新,稱(chēng)為阻塞勘伺。直到擁有鎖的線(xiàn)程調(diào)用release()函數(shù)釋放鎖之后,這時(shí)這個(gè)鎖就進(jìn)入“unlocked”解鎖的狀態(tài)褂删。
線(xiàn)程調(diào)度程序飞醉,從處于同步阻塞狀態(tài)的線(xiàn)程中選擇一個(gè)來(lái)獲得鎖,并使得該線(xiàn)程進(jìn)入(running)運(yùn)行狀態(tài)屯阀。
Test03-->互斥鎖的優(yōu)缺點(diǎn)
優(yōu)點(diǎn):確保了某段關(guān)鍵代碼只能有一個(gè)線(xiàn)程從頭到尾的執(zhí)行缅帘。
缺點(diǎn):
1,阻止了多線(xiàn)程并發(fā)執(zhí)行难衰,包含鎖的代碼實(shí)際上是以單線(xiàn)程的模式執(zhí)行钦无,效率大大降低了。
2盖袭,由于可以存在多個(gè)鎖失暂,不同的線(xiàn)程持有不同的鎖彼宠,并試圖獲取對(duì)方持有的鎖時(shí),可能會(huì)造成死鎖趣席。
Num06-->線(xiàn)程死鎖和遞歸鎖
Test01--> 死鎖的情況
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
import time
class MyThread1(threading.Thread):
def run(self):
# 對(duì)mutexA上鎖
if mutexA.acquire():
print(self.name + '----mutexA上鎖----')
time.sleep(1)
if mutexB.acquire():
print(self.name + '----mutexA中mutexB上鎖----')
mutexB.release()
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
# 對(duì)mutexB上鎖
if mutexB.acquire():
print(self.name + '----mutexB上鎖----')
time.sleep(1)
if mutexA.acquire():
print(self.name + '----mutexB中mutexA上鎖----')
mutexA.release()
mutexB.release()
mutexA = threading.Lock()
mutexB = threading.Lock()
if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
t1.join()
t2.join()
# 死鎖的結(jié)果如下:
# Thread-1----mutexA上鎖----
# Thread-2----mutexB上鎖----
# 我手寫(xiě)的:一直停留在這里不動(dòng)
Test02--> 遞歸鎖兵志,用于解決死鎖的問(wèn)題,對(duì)以上死鎖代碼加以修改
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
import time
class MyThread1(threading.Thread):
def run(self):
# 對(duì)mutexA上鎖
if mutex.acquire():
print(self.name + '----mutexA上鎖----')
time.sleep(1)
if mutex.acquire():
print(self.name + '----mutexA中mutexB上鎖----')
mutex.release()
mutex.release()
class MyThread2(threading.Thread):
def run(self):
# 對(duì)mutexB上鎖
if mutex.acquire():
print(self.name + '----mutexB上鎖----')
time.sleep(1)
if mutex.acquire():
print(self.name + '----mutexB中mutexA上鎖----')
mutex.release()
mutex.release()
mutex = threading.RLock()
if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
t1.join()
t2.join()
# RLock遞歸鎖的結(jié)果如下:
# Thread-1----mutexA上鎖----
# Thread-1----mutexA中mutexB上鎖----
# Thread-2----mutexB上鎖----
# Thread-2----mutexB中mutexA上鎖----
Num07-->利用互斥鎖實(shí)現(xiàn)同步
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
# 利用互斥鎖實(shí)現(xiàn)同步
from threading import Thread, Lock
import time
# 先創(chuàng)建A宣肚、B想罕、C三個(gè)鎖
lockA = Lock()
lockB = Lock()
lockC = Lock()
def task_a():
# 申請(qǐng)lockA,獲得鎖才開(kāi)始任務(wù)
while True:
lockA.acquire()
print("-----A--------")
time.sleep(1)
lockB.release()
def task_b():
# 申請(qǐng)lockB霉涨,獲得鎖才開(kāi)始任務(wù)
while True:
lockB.acquire()
print("-----B--------")
time.sleep(1)
lockC.release()
def task_c():
# 申請(qǐng)lockC按价,獲得鎖才開(kāi)始任務(wù)
while True:
lockC.acquire()
print("-----C--------")
time.sleep(1)
lockA.release()
def main():
# lockB和lockC在主線(xiàn)程獲得鎖
lockB.acquire()
lockC.acquire()
thread_a = Thread(target=task_a)
thread_b = Thread(target=task_b)
thread_c = Thread(target=task_c)
thread_a.start()
thread_b.start()
thread_c.start()
thread_a.join()
thread_b.join()
thread_c.join()
if __name__ == '__main__':
main()
# 結(jié)果如下:
# -----A--------
# -----B--------
# -----C--------
# -----A--------
# -----B--------
# -----C--------
# -----A--------
# -----B--------
# -----C--------
# 此處省略......一直在同步的打印A、B笙瑟、C
Num08-->生產(chǎn)者和消費(fèi)者模式--Queue
Python的Queue模塊中提供了同步的楼镐、線(xiàn)程安全的隊(duì)列類(lèi),包括FIFO(先入先出)隊(duì)列Queue往枷,LIFO(后入先出)隊(duì)列LifoQueue框产,和優(yōu)先級(jí)隊(duì)列PriorityQueue。這些隊(duì)列都實(shí)現(xiàn)了鎖原語(yǔ)(可以理解為原子操作错洁,即要么不做秉宿,要么就做完),能夠在多線(xiàn)程中直接使用屯碴∶枘溃可以使用隊(duì)列來(lái)實(shí)現(xiàn)線(xiàn)程間的同步。
為什么要使用生產(chǎn)者和消費(fèi)者模式导而???
在線(xiàn)程世界里忱叭,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線(xiàn)程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線(xiàn)程今艺。在多線(xiàn)程開(kāi)發(fā)當(dāng)中韵丑,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢虚缎,那么生產(chǎn)者就必須等待消費(fèi)者處理完埂息,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理遥巴,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者享幽。為了解決這個(gè)問(wèn)題于是引入了生產(chǎn)者和消費(fèi)者模式铲掐。
什么是生產(chǎn)者消費(fèi)者模式???
生產(chǎn)者消費(fèi)者模式是通過(guò)一個(gè)容器來(lái)解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問(wèn)題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊值桩,而通過(guò)阻塞隊(duì)列來(lái)進(jìn)行通訊摆霉,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù)携栋,而是直接從阻塞隊(duì)列里取搭盾,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力婉支。
這個(gè)阻塞隊(duì)列就是用來(lái)給生產(chǎn)者和消費(fèi)者解耦的鸯隅。
Test01-->FIFO先進(jìn)先出隊(duì)列Queue
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import queue # 隊(duì)列,解決多線(xiàn)程問(wèn)題
q = queue.Queue()
q.put("我的第一個(gè)進(jìn)來(lái)的")
q.put("我是第二個(gè)進(jìn)來(lái)的")
q.put({"name": "我是第三個(gè)進(jìn)來(lái)的"})
while True:
if not q.empty():
data = q.get(block=False)
print(data)
# 結(jié)果如下:
# 我的第一個(gè)進(jìn)來(lái)的
# 我是第二個(gè)進(jìn)來(lái)的
# {'name': '我是第三個(gè)進(jìn)來(lái)的'}
Test02-->LIFO后入先出隊(duì)列LifoQueue
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import queue
# 后進(jìn)先出
q = queue.LifoQueue()
q.put("我的第一個(gè)進(jìn)來(lái)的")
q.put("我是第二個(gè)進(jìn)來(lái)的")
q.put({"name": "我是第三個(gè)進(jìn)來(lái)的"})
while True:
if not q.empty():
data = q.get(block=False)
print(data)
# 結(jié)果如下:
# {'name': '我是第三個(gè)進(jìn)來(lái)的'}
# 我是第二個(gè)進(jìn)來(lái)的
# 我的第一個(gè)進(jìn)來(lái)的
Test03-->優(yōu)先級(jí)隊(duì)列PriorityQueue
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import queue
# 按優(yōu)先級(jí),按照2向挖,3蝌以,4,5這樣從小到大的優(yōu)先級(jí)
q = queue.PriorityQueue()
q.put([5, "我的第一個(gè)進(jìn)來(lái)的"])
q.put([2, "我是第二個(gè)進(jìn)來(lái)的"])
q.put([4, {"name": "我是第三個(gè)進(jìn)來(lái)的"}])
q.put([3, (2, 3, 4, 5)])
while True:
if not q.empty():
data = q.get(block=False)
print(data)
# 結(jié)果如下:
# [2, '我是第二個(gè)進(jìn)來(lái)的']
# [3, (2, 3, 4, 5)]
# [4, {'name': '我是第三個(gè)進(jìn)來(lái)的'}]
# [5, '我的第一個(gè)進(jìn)來(lái)的']
Test04-->Queue隊(duì)列的方法:
創(chuàng)建一個(gè)“隊(duì)列”對(duì)象
from queue import Queue
q = queue.Queue(maxsize = 100)
Queue類(lèi)即是一個(gè)隊(duì)列的同步實(shí)現(xiàn)何之。隊(duì)列長(zhǎng)度可為無(wú)限或者有限跟畅。可通過(guò)Queue的構(gòu)造函數(shù)的可選參數(shù)maxsize來(lái)設(shè)定隊(duì)列長(zhǎng)度溶推。如果maxsize小于1就表示隊(duì)列長(zhǎng)度無(wú)限徊件。
將一個(gè)值放入隊(duì)列中
q.put(100)
調(diào)用隊(duì)列對(duì)象的put()方法在隊(duì)尾插入一個(gè)項(xiàng)目。put()有兩個(gè)參數(shù)蒜危,第一個(gè)item為必需的虱痕,為插入項(xiàng)目的值;第二個(gè)block為可選參數(shù)舰褪,默認(rèn)為
1皆疹。如果隊(duì)列當(dāng)前為空且block為1,put()方法就使調(diào)用線(xiàn)程暫停,直到空出一個(gè)數(shù)據(jù)單元占拍。如果block為0略就,put方法將引發(fā)Full異常。
將一個(gè)值從隊(duì)列中取出
q.get()
調(diào)用隊(duì)列對(duì)象的get()方法從隊(duì)頭刪除并返回一個(gè)項(xiàng)目晃酒”砝危可選參數(shù)為block,默認(rèn)為T(mén)rue贝次。如果隊(duì)列為空且block為T(mén)rue崔兴,
get()就使調(diào)用線(xiàn)程暫停,直至有項(xiàng)目可用蛔翅。如果隊(duì)列為空且block為False敲茄,隊(duì)列將引發(fā)Empty異常。
此包中的常用方法(q = queue.Queue()):
q.qsize() 返回隊(duì)列的大小
q.empty() 如果隊(duì)列為空山析,返回True,反之False
q.full() 如果隊(duì)列滿(mǎn)了堰燎,返回True,反之False
q.full 與 maxsize 大小對(duì)應(yīng)
q.get([block[, timeout]]) 獲取隊(duì)列,timeout等待時(shí)間
q.get_nowait() 相當(dāng)q.get(False)
非阻塞 q.put(item) 寫(xiě)入隊(duì)列笋轨,timeout等待時(shí)間
q.put_nowait(item) 相當(dāng)q.put(item, False)
q.task_done() 在完成一項(xiàng)工作之后秆剪,q.task_done() 函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào)
q.join() 實(shí)際上意味著等到隊(duì)列為空赊淑,再執(zhí)行別的操作
Test05-->生產(chǎn)者消費(fèi)者案例如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
from threading import Thread
import time
from queue import Queue
# 3個(gè)生產(chǎn)者,當(dāng)產(chǎn)品數(shù)量小于100就生產(chǎn)8個(gè)
# 5個(gè)消費(fèi)者仅讽,當(dāng)產(chǎn)品數(shù)量大于10就消費(fèi)3
storageQueue = Queue() # 創(chuàng)建用于保存產(chǎn)品的隊(duì)列
# 保存產(chǎn)品編號(hào)
serial_num = 0
# 生產(chǎn)者
class Producer(Thread):
def __init__(self, name):
Thread.__init__(self)
self.name = name
def run(self):
global serial_num
while True:
if storageQueue.qsize() < 100:
# 向隊(duì)列中添加消息模擬生產(chǎn)
for i in range(8):
msg = self.name + ":" + str(serial_num)
storageQueue.put(msg)
print("%s 生產(chǎn)了:%s" % (self.name, msg))
serial_num += 1
time.sleep(1)
# 消費(fèi)者
class Consumer(Thread):
def __init__(self, name):
Thread.__init__(self)
self.name = name
def run(self):
while True:
if storageQueue.qsize() > 10:
# 從消息隊(duì)列中取消息陶缺,代表消費(fèi)產(chǎn)品
for i in range(3):
msg = storageQueue.get()
print("%s消費(fèi)了:%s" % (self.name, msg))
time.sleep(1)
def main():
# 3個(gè)生產(chǎn)者
for i in range(3):
t = Producer("Producter--" + str(i))
t.start()
# 5個(gè)消費(fèi)者
for i in range(5):
t = Consumer("Consumer--" + str(i))
t.start()
if __name__ == '__main__':
main()
# 結(jié)果如下:
# Producter--0 生產(chǎn)了:Producter--0:0
# Producter--0 生產(chǎn)了:Producter--0:1
# Producter--0 生產(chǎn)了:Producter--0:2
# Producter--0 生產(chǎn)了:Producter--0:3
# Producter--0 生產(chǎn)了:Producter--0:4
# Producter--0 生產(chǎn)了:Producter--0:5
# Producter--0 生產(chǎn)了:Producter--0:6
# Producter--0 生產(chǎn)了:Producter--0:7
# Producter--1 生產(chǎn)了:Producter--1:8
# Producter--1 生產(chǎn)了:Producter--1:9
# Producter--1 生產(chǎn)了:Producter--1:10
# Producter--1 生產(chǎn)了:Producter--1:11
# Producter--1 生產(chǎn)了:Producter--1:12
# Producter--1 生產(chǎn)了:Producter--1:13
# Producter--2 生產(chǎn)了:Producter--2:13
# Producter--2 生產(chǎn)了:Producter--2:14
# Producter--2 生產(chǎn)了:Producter--2:15
# Producter--2 生產(chǎn)了:Producter--2:16
# Producter--2 生產(chǎn)了:Producter--2:17
# Producter--2 生產(chǎn)了:Producter--2:18
# Producter--2 生產(chǎn)了:Producter--2:19
# Producter--2 生產(chǎn)了:Producter--2:20
# Producter--1 生產(chǎn)了:Producter--1:22
# Producter--1 生產(chǎn)了:Producter--1:23
# Consumer--0消費(fèi)了:Producter--0:0
# Consumer--0消費(fèi)了:Producter--0:1
# Consumer--0消費(fèi)了:Producter--0:2
# Consumer--1消費(fèi)了:Producter--0:3
# Consumer--1消費(fèi)了:Producter--0:4
# Consumer--1消費(fèi)了:Producter--0:5
# Consumer--2消費(fèi)了:Producter--0:6
# Consumer--2消費(fèi)了:Producter--0:7
# Consumer--2消費(fèi)了:Producter--1:8
# Consumer--3消費(fèi)了:Producter--1:9
# Consumer--3消費(fèi)了:Producter--1:10
# Consumer--3消費(fèi)了:Producter--1:11
# Consumer--4消費(fèi)了:Producter--1:12
# Consumer--4消費(fèi)了:Producter--1:13
# Consumer--4消費(fèi)了:Producter--2:13
# Producter--0 生產(chǎn)了:Producter--0:24
# Producter--0 生產(chǎn)了:Producter--0:25
# Producter--0 生產(chǎn)了:Producter--0:26
# Producter--0 生產(chǎn)了:Producter--0:27
# Producter--0 生產(chǎn)了:Producter--0:28
# Producter--0 生產(chǎn)了:Producter--0:29
# 我手動(dòng)寫(xiě)的:此處省略很多......
Num09-->守護(hù)線(xiàn)程
只要非守護(hù)線(xiàn)程結(jié)束了,不管守護(hù)線(xiàn)程結(jié)束沒(méi)結(jié)束洁灵,程序都結(jié)束.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
import time
def run(n):
print("我是守護(hù)線(xiàn)程--", n)
time.sleep(1)
start_time = time.time()
thread_list = []
for i in range(3):
t = threading.Thread(target=run, args=('thread_list-%s' % i,))
# 設(shè)置線(xiàn)程為守護(hù)狀態(tài)饱岸,非守護(hù)狀態(tài)線(xiàn)程都退出,程序就退出,不等待守護(hù)狀態(tài)線(xiàn)程
t.daemon = True
t.start() # t.daemon=True 必須在 調(diào)用start()函數(shù) 前面
thread_list.append(t)
print("當(dāng)前還活著的線(xiàn)程數(shù)量是:", threading.active_count())
print("當(dāng)前還活著的線(xiàn)程有:", thread_list)
print("當(dāng)前還剩線(xiàn)程是:", threading.current_thread().name)
print("耗時(shí):", time.time() - start_time)
# 結(jié)果是:
# 我是守護(hù)線(xiàn)程-- thread_list-0
# 我是守護(hù)線(xiàn)程-- thread_list-1
# 我是守護(hù)線(xiàn)程-- thread_list-2
# 當(dāng)前還活著的線(xiàn)程數(shù)量是: 4
# 當(dāng)前還活著的線(xiàn)程有: [<Thread(Thread-1, started daemon 12852)>, <Thread(Thread-2, started daemon 6696)>, <Thread(Thread-3, started daemon 11940)>]
# 當(dāng)前還剩線(xiàn)程是: MainThread
# 耗時(shí): 0.0
Num10-->多線(xiàn)程--非共享數(shù)據(jù)
在多線(xiàn)程開(kāi)發(fā)中处渣,全局變量是多個(gè)線(xiàn)程都共享的數(shù)據(jù)伶贰,而局部變量則是各個(gè)線(xiàn)程的,是非共享的罐栈。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
import time
class MyThread(threading.Thread):
def __init__(self, num, sleepTime):
threading.Thread.__init__(self)
self.num = num
self.sleepTime = sleepTime
# 重寫(xiě)run()方法
def run(self):
self.num += 1
time.sleep(self.sleepTime)
print('線(xiàn)程(%s),num=%d' % (self.name, self.num))
if __name__ == '__main__':
t1 = MyThread(99, 5)
t1.start()
t2 = MyThread(199, 1)
t2.start()
t1.join()
t2.join()
# 結(jié)果如下:
# 線(xiàn)程(Thread-2),num=200
# 線(xiàn)程(Thread-1),num=100
Num11-->線(xiàn)程中ThreadLocal
Test01-->使用函數(shù)傳參的方法
def process_student(name):
std = Student(name)
# std是局部變量黍衙,但是每個(gè)函數(shù)都要用它,因此必須傳進(jìn)去:
do_task_1(std)
do_task_2(std)
def do_task_1(std):
do_subtask_1(std)
do_subtask_2(std)
def do_task_2(std):
do_subtask_2(std)
do_subtask_2(std)
每個(gè)函數(shù)一層一層調(diào)用都這么傳參數(shù)那還得了荠诬?用全局變量琅翻?也不行,因?yàn)槊總€(gè)線(xiàn)程處理不同的Student對(duì)象柑贞,不能共享方椎。
Test02-->使用全局字典的方法
global_dict = {}
def std_thread(name):
std = Student(name)
# 把std放到全局變量global_dict中:
global_dict[threading.current_thread()] = std
do_task_1()
do_task_2()
def do_task_1():
# 不傳入std,而是根據(jù)當(dāng)前線(xiàn)程查找:
std = global_dict[threading.current_thread()]
...
def do_task_2():
# 任何函數(shù)都可以查找出當(dāng)前線(xiàn)程的std變量:
std = global_dict[threading.current_thread()]
...
這種方式理論上是可行的钧嘶,它最大的優(yōu)點(diǎn)是消除了std對(duì)象在每層函數(shù)中的傳遞問(wèn)題棠众,但是,每個(gè)函數(shù)獲取std的代碼有點(diǎn)low有决。
Test03-->使用ThreadLocal的方法
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
from threading import Thread
import threading
# 創(chuàng)建ThreadLocal對(duì)象
local_school = threading.local()
class Student(object):
def __init__(self, name):
self.name = name
# 線(xiàn)程中調(diào)用的函數(shù)
def do_task():
# 相當(dāng)于以當(dāng)前線(xiàn)程對(duì)象為key取出stu對(duì)象
stu = local_school.student
print("%s中學(xué)生名字為:%s" % (threading.current_thread().name, stu.name))
# 線(xiàn)程的功能函數(shù)
def task_thread(name):
# 創(chuàng)建類(lèi)的實(shí)例對(duì)象
stu = Student(name)
# 相當(dāng)于以當(dāng)前的線(xiàn)程對(duì)象為key,放入全局的字典local_school
local_school.student = stu
do_task()
def main():
t1 = Thread(target=task_thread, args=("xiaoke",))
t2 = Thread(target=task_thread, args=("lili",))
t1.start()
t2.start()
t1.join()
t2.join()
if __name__ == '__main__':
main()
# 結(jié)果如下:
# Thread-1中學(xué)生名字為:xiaoke
# Thread-2中學(xué)生名字為:lili
全局變量local_school就是一個(gè)ThreadLocal對(duì)象闸拿,每個(gè)Thread對(duì)它都可以讀寫(xiě)student屬性,但互不影響书幕。你可以把local_school看成全局變量新荤,但每個(gè)屬性如local_school.student都是線(xiàn)程的局部變量,可以任意讀寫(xiě)而互不干擾台汇,也不用管理鎖的問(wèn)題苛骨,ThreadLocal內(nèi)部會(huì)處理。
可以理解為全局變量local_school是一個(gè)dict苟呐。
ThreadLocal最常用的地方就是為每個(gè)線(xiàn)程綁定一個(gè)數(shù)據(jù)庫(kù)連接痒芝,HTTP請(qǐng)求,用戶(hù)身份信息等牵素。這樣一個(gè)線(xiàn)程所有調(diào)用到的處理函數(shù)严衬,都可以非常方便地訪(fǎng)問(wèn)這些資源。
Test04-->小總結(jié)
一個(gè)ThreadLocal變量雖然是“全局變量”两波,但每個(gè)線(xiàn)程都只能讀寫(xiě)自己線(xiàn)程的獨(dú)立副本瞳步,互不干擾。
ThreadLocal解決了參數(shù)在一個(gè)線(xiàn)程中各個(gè)函數(shù)之間互相傳遞的問(wèn)題腰奋。
Num12-->GIL全局解釋器鎖
作用:保證同一時(shí)刻单起,無(wú)論你有多少個(gè)線(xiàn)程,只有一個(gè)線(xiàn)程被CPU執(zhí)行劣坊。
因?yàn)镻ython的線(xiàn)程雖然是真正的線(xiàn)程嘀倒,但解釋器執(zhí)行代碼時(shí),有一個(gè)GIL鎖:Global Interpreter Lock局冰,任何Python線(xiàn)程執(zhí)行前测蘑,必須先獲得GIL鎖,然后康二,每執(zhí)行100條字節(jié)碼碳胳,解釋器就自動(dòng)釋放GIL鎖,讓別的線(xiàn)程有機(jī)會(huì)執(zhí)行沫勿。這個(gè)GIL全局鎖實(shí)際上把所有線(xiàn)程的執(zhí)行代碼都給上了鎖挨约,所以,多線(xiàn)程在Python中只能交替執(zhí)行产雹,即使100個(gè)線(xiàn)程跑在100核CPU上诫惭,也只能用到1個(gè)核。
所以蔓挖,在Python中夕土,可以使用多線(xiàn)程,但不要指望能有效利用多核瘟判。如果一定要通過(guò)多線(xiàn)程利用多核怨绣,那只能通過(guò)C擴(kuò)展來(lái)實(shí)現(xiàn),不過(guò)這樣就失去了Python簡(jiǎn)單易用的特點(diǎn)荒适。
不過(guò)梨熙,也不用過(guò)于擔(dān)心,Python雖然不能利用多線(xiàn)程實(shí)現(xiàn)多核任務(wù)刀诬,但可以通過(guò)多進(jìn)程實(shí)現(xiàn)多核任務(wù)咽扇。多個(gè)Python進(jìn)程有各自獨(dú)立的GIL鎖,互不影響陕壹。
Num13-->信號(hào)量Semaphore
信號(hào)量:是指同時(shí)開(kāi)幾個(gè)線(xiàn)程并發(fā)质欲。比如廁所有5個(gè)坑,那最多只允許5個(gè)人上廁所糠馆,后面的人只能等里面的5個(gè)人都出來(lái)了嘶伟,才能再進(jìn)去。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading, time
class myThread(threading.Thread):
def run(self):
# 加把鎖又碌,可以放進(jìn)去多個(gè)(相當(dāng)于5把鎖九昧,同時(shí)有5個(gè)線(xiàn)程)
if semaphore.acquire():
print(self.name)
time.sleep(3)
semaphore.release()
if __name__ == "__main__":
# 同時(shí)能有幾個(gè)線(xiàn)程進(jìn)去(設(shè)置為5就是一次5個(gè)線(xiàn)程進(jìn)去)
semaphore = threading.Semaphore(5)
thread_list = [] # 空列表
for i in range(200): # 200個(gè)線(xiàn)程
thread_list.append(myThread()) # 加線(xiàn)程對(duì)象
for t in thread_list:
t.start() # 分別啟動(dòng)
# 結(jié)果如下:每隔3秒就會(huì)同時(shí)顯示5個(gè)進(jìn)程
# Thread-1
# Thread-2
# Thread-3
# Thread-4
# Thread-5
# Thread-6
# Thread-7
# Thread-8
# Thread-9
# Thread-10
# .......
Num14-->Event線(xiàn)程間通信
Python提供了Event對(duì)象用于線(xiàn)程間通信绊袋。它是由線(xiàn)程設(shè)置的信號(hào)標(biāo)志,如果信號(hào)標(biāo)志為真铸鹰,則其他線(xiàn)程等待癌别,直到信號(hào)拿到。
Event對(duì)象實(shí)現(xiàn)了簡(jiǎn)單的線(xiàn)程通信機(jī)制蹋笼。它提供了設(shè)置信號(hào)展姐,清除信號(hào),等待信號(hào)等剖毯,用于實(shí)現(xiàn)線(xiàn)程間的通信圾笨。
Events的使用
event = threading.Event()
event.wait()
Event對(duì)象wait()方法只有在內(nèi)部信號(hào)為真的時(shí)候,才會(huì)很快的執(zhí)行并完成返回逊谋。當(dāng)Event對(duì)象的內(nèi)部信號(hào)標(biāo)志為假時(shí)擂达,則wait()方法一直等待到其為真時(shí)才返回。
event.set()
使用Event的set()方法可以設(shè)置Event對(duì)象內(nèi)部的信號(hào)標(biāo)志為真涣狗。Event對(duì)象提供了isSet()方法來(lái)判斷其內(nèi)部信號(hào)標(biāo)志的狀態(tài)谍婉。當(dāng)使用Event對(duì)象的set()方法后,isSet()方法返回真镀钓。
event.clear()
使用Event對(duì)象的clear()方法可以清除Event對(duì)象內(nèi)部的信號(hào)標(biāo)志穗熬。即將其設(shè)為假,當(dāng)使用Event的clear方法后丁溅,isSet()方法返回假唤蔗。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading, time
class Boss(threading.Thread):
def run(self):
print("大老板說(shuō):今晚大家都要加班到24:00。")
print(event.isSet())
event.set()
print(event.isSet())
print("辛苦大家了......加油干吧?呱汀<斯瘛!")
time.sleep(10)
print("大老板說(shuō):24:00到了涯穷,可以下班了棍掐。")
print(event.isSet())
event.set()
print(event.isSet())
class Worker(threading.Thread):
def run(self):
# 等待信號(hào)為真,取數(shù)據(jù)
event.wait()
print("加油工作拷况,多賺錢(qián)……作煌!")
time.sleep(5)
event.clear()
# 等待信號(hào)為真,取數(shù)據(jù)
event.wait()
print("下班回家陪老婆赚瘦,孩子粟誓,哈哈哈!")
if __name__ == "__main__":
event = threading.Event()
threads = []
# 三個(gè)員工,一個(gè)老板
for i in range(3):
threads.append(Worker())
threads.append(Boss())
# 開(kāi)啟老板和員工
for t in threads:
t.start()
# 等待老板和員工工作都結(jié)束
for t in threads:
t.join()
# 結(jié)果如下:
# 大老板說(shuō):今晚大家都要加班到24:00起意。
# False
# True
# 辛苦大家了......加油干吧Sシ!!
# 加油工作悲酷,多賺錢(qián)……套菜!
# 加油工作,多賺錢(qián)……设易!
# 加油工作笼踩,多賺錢(qián)……!
# 大老板說(shuō):24:00到了亡嫌,可以下班了。
# False
# True
# 下班回家陪老婆掘而,孩子挟冠,哈哈哈!
# 下班回家陪老婆,孩子袍睡,哈哈哈!
# 下班回家陪老婆知染,孩子,哈哈哈!