1.進(jìn)程和線程
隊(duì)列:
1房官、進(jìn)程之間的通信: q = multiprocessing.Queue()
2是己、進(jìn)程池之間的通信: q = multiprocessing.Manager().Queue()
3、線程之間的通信: q = queue.Queue()
1.功能
- 進(jìn)程州既,能夠完成多任務(wù),比如 在一臺(tái)電腦上能夠同時(shí)運(yùn)行多個(gè)QQ
- 線程,能夠完成多任務(wù)树碱,比如 一個(gè)QQ中的多個(gè)聊天窗口
2.定義的不同
- 進(jìn)程是系統(tǒng)進(jìn)行資源分配和調(diào)度的一個(gè)獨(dú)立單位.
- 線程是進(jìn)程的一個(gè)實(shí)體,是CPU調(diào)度和分派的基本單位,它是比進(jìn)程更小的能獨(dú)立運(yùn)行的基本單位.線程自己基本上不擁有系統(tǒng)資源,只擁有一點(diǎn)在運(yùn)行中必不可少的資源(如程序計(jì)數(shù)器,一組寄存器和棧),但是它可與同屬一個(gè)進(jìn)程的其他的線程共享進(jìn)程所擁有的全部資源.
3.區(qū)別
- 一個(gè)程序至少有一個(gè)進(jìn)程,一個(gè)進(jìn)程至少有一個(gè)線程.
- 線程的劃分尺度小于進(jìn)程(資源比進(jìn)程少),使得多線程程序的并發(fā)性高成榜。
- 進(jìn)程在執(zhí)行過程中擁有獨(dú)立的內(nèi)存單元,而多個(gè)線程共享內(nèi)存蹦玫,從而極大地提高了程序的運(yùn)行效率
- 線程不能夠獨(dú)立執(zhí)行赎婚,必須依存在進(jìn)程中
4.優(yōu)缺點(diǎn)
線程和進(jìn)程在使用上各有優(yōu)缺點(diǎn):線程執(zhí)行開銷小,但不利于資源的管理和保護(hù)樱溉;而進(jìn)程正相反挣输。
2.同步的概念
1.多線程開發(fā)可能遇到的問題
假設(shè)兩個(gè)線程t1和t2都要對(duì)num=0進(jìn)行增1運(yùn)算,t1和t2都各對(duì)num修改10次福贞,num的最終的結(jié)果應(yīng)該為20撩嚼。
但是由于是多線程訪問,有可能出現(xiàn)下面情況:
在num=0時(shí),t1取得num=0完丽。此時(shí)系統(tǒng)把t1調(diào)度為”sleeping”狀態(tài)恋技,把t2轉(zhuǎn)換為”running”狀態(tài),t2也獲得num=0逻族。然后t2對(duì)得到的值進(jìn)行加1并賦給num蜻底,使得num=1。然后系統(tǒng)又把t2調(diào)度為”sleeping”瓷耙,把t1轉(zhuǎn)為”running”朱躺。線程t1又把它之前得到的0加1后賦值給num。這樣搁痛,明明t1和t2都完成了1次加1工作长搀,但結(jié)果仍然是num=1。
from threading import Thread
import time
g_num = 0
def test1():
global g_num
for i in range(1000000):
g_num += 1
print("---test1---g_num=%d"%g_num)
def test2():
global g_num
for i in range(1000000):
g_num += 1
print("---test2---g_num=%d"%g_num)
p1 = Thread(target=test1)
p1.start()
# time.sleep(3) #取消屏蔽之后 再次運(yùn)行程序鸡典,結(jié)果的不同
p2 = Thread(target=test2)
p2.start()
print("---g_num=%d---"%g_num)
運(yùn)行結(jié)果卻不是2000000:
---g_num=129699---
---test2---g_num=1126024
---test1---g_num=1135562
取消屏蔽之后源请,再次運(yùn)行結(jié)果如下:
---test1---g_num=1000000
---g_num=1025553---
---test2---g_num=2000000
問題產(chǎn)生的原因就是沒有控制多個(gè)線程對(duì)同一資源的訪問,對(duì)數(shù)據(jù)造成破壞彻况,使得線程運(yùn)行的結(jié)果不可預(yù)期谁尸。這種現(xiàn)象稱為“線程不安全”。
2.同步
- 同步就是協(xié)同步調(diào)纽甘,按預(yù)定的先后次序進(jìn)行運(yùn)行良蛮。
- 如進(jìn)程、線程同步悍赢,可理解為進(jìn)程或線程A和B一塊配合决瞳,A執(zhí)行到一定程度時(shí)要依靠B的某個(gè)結(jié)果,于是停下來左权,示意B運(yùn)行;B依言執(zhí)行皮胡,再將結(jié)果給A;A再繼續(xù)操作。
3.解決線程不安全的方法
可以通過線程同步來解決
- 系統(tǒng)調(diào)用t1赏迟,然后獲取到num的值為0屡贺,此時(shí)上一把鎖,即不允許其他現(xiàn)在操作num
- 對(duì)num的值進(jìn)行+1
- 解鎖锌杀,此時(shí)num的值為1甩栈,其他的線程就可以使用num了,而且是num的值不是0而是1
- 同理其他線程在對(duì)num進(jìn)行修改時(shí)糕再,都要先上鎖量没,處理完后再解鎖,在上鎖的整個(gè)過程中不允許其他線程訪問亿鲜,就保證了數(shù)據(jù)的正確性
3.互斥鎖
- 當(dāng)多個(gè)線程幾乎同時(shí)修改某一個(gè)共享數(shù)據(jù)的時(shí)候允蜈,需要進(jìn)行同步控制
線程同步能夠保證多個(gè)線程安全訪問競(jìng)爭(zhēng)資源冤吨,最簡(jiǎn)單的同步機(jī)制是引入互斥鎖。 - 互斥鎖為資源引入一個(gè)狀態(tài):鎖定/非鎖定饶套。
- 某個(gè)線程要更改共享數(shù)據(jù)時(shí)漩蟆,先將其鎖定,此時(shí)資源的狀態(tài)為“鎖定”妓蛮,其他線程不能更改怠李;直到該線程釋放資源,將資源的狀態(tài)變成“非鎖定”蛤克,其他的線程才能再次鎖定該資源捺癞。互斥鎖保證了每次只有一個(gè)線程進(jìn)行寫入操作构挤,從而保證了多線程情況下數(shù)據(jù)的正確性髓介。
threading模塊中定義了Lock類,可以方便的處理鎖定:
#創(chuàng)建鎖
mutex = threading.Lock()
#鎖定
mutex.acquire([blocking])
#釋放
mutex.release()
其中筋现,鎖定方法acquire可以有一個(gè)blocking參數(shù)唐础。
- 如果設(shè)定blocking為True,則當(dāng)前線程會(huì)堵塞矾飞,直到獲取到這個(gè)鎖為止(如果沒有指定一膨,那么默認(rèn)為True)
- 如果設(shè)定blocking為False,則當(dāng)前線程不會(huì)堵塞
from threading import Thread, Lock
import time
g_num = 0
def test1():
global g_num
for i in range(1000000):
#True表示堵塞 即如果這個(gè)鎖在上鎖之前已經(jīng)被上鎖了洒沦,那么這個(gè)線程會(huì)在這里一直等待到解鎖為止
#False表示非堵塞豹绪,即不管本次調(diào)用能夠成功上鎖,都不會(huì)卡在這,而是繼續(xù)執(zhí)行下面的代碼
mutexFlag = mutex.acquire(True)
if mutexFlag:
g_num += 1
mutex.release()
print("---test1---g_num=%d"%g_num)
def test2():
global g_num
for i in range(1000000):
mutexFlag = mutex.acquire(True) #True表示堵塞
if mutexFlag:
g_num += 1
mutex.release()
print("---test2---g_num=%d"%g_num)
#創(chuàng)建一個(gè)互斥鎖
#這個(gè)鎖默認(rèn)是未上鎖的狀態(tài)
mutex = Lock()
p1 = Thread(target=test1)
p1.start()
p2 = Thread(target=test2)
p2.start()
print("---g_num=%d---"%g_num)
運(yùn)行結(jié)果:
---g_num=19446---
---test1---g_num=1699950
---test2---g_num=2000000
加入互斥鎖后申眼,運(yùn)行結(jié)果與預(yù)期相符瞒津。
我們可以模擬一下賣票的程序:
# Python主要通過標(biāo)準(zhǔn)庫中的threading包來實(shí)現(xiàn)多線程
import threading
import time
import os
def doChore(): # 作為間隔 每次調(diào)用間隔0.5s
time.sleep(0.5)
def booth(tid):
global i
global lock
while True:
lock.acquire() # 得到一個(gè)鎖,鎖定
if i != 0:
i = i - 1 # 售票 售出一張減少一張
print(tid, ':now left:', i) # 剩下的票數(shù)
doChore()
else:
print("Thread_id", tid, " No more tickets")
os._exit(0) # 票售完 退出程序
lock.release() # 釋放鎖
doChore()
#全局變量
i = 15 # 初始化票數(shù)
lock = threading.Lock() # 創(chuàng)建鎖
def main():
# 總共設(shè)置了3個(gè)線程
for k in range(3):
# 創(chuàng)建線程; Python使用threading.Thread對(duì)象來代表線程
new_thread = threading.Thread(target=booth, args=(k,))
# 調(diào)用start()方法啟動(dòng)線程
new_thread.start()
if __name__ == '__main__':
main()
運(yùn)行結(jié)果:
0 :now left: 14
1 :now left: 13
0 :now left: 12
2 :now left: 11
1 :now left: 10
0 :now left: 9
1 :now left: 8
2 :now left: 7
0 :now left: 6
1 :now left: 5
2 :now left: 4
0 :now left: 3
2 :now left: 2
0 :now left: 1
1 :now left: 0
Thread_id 2 No more tickets
- 上鎖解鎖過程
當(dāng)一個(gè)線程調(diào)用鎖的acquire()方法獲得鎖時(shí)豺型,鎖就進(jìn)入“l(fā)ocked”狀態(tài)仲智。
每次只有一個(gè)線程可以獲得鎖买乃。如果此時(shí)另一個(gè)線程試圖獲得這個(gè)鎖姻氨,該線程就會(huì)變?yōu)椤癰locked”狀態(tài),稱為“阻塞”剪验,直到擁有鎖的線程調(diào)用鎖的release()方法釋放鎖之后肴焊,鎖進(jìn)入“unlocked”狀態(tài)。
線程調(diào)度程序從處于同步阻塞狀態(tài)的線程中選擇一個(gè)來獲得鎖功戚,并使得該線程進(jìn)入運(yùn)行(running)狀態(tài)娶眷。
鎖的好處: - 確保了某段關(guān)鍵代碼只能由一個(gè)線程從頭到尾完整地執(zhí)行
鎖的壞處: - 阻止了多線程并發(fā)執(zhí)行,包含鎖的某段代碼實(shí)際上只能以單線程模式執(zhí)行啸臀,效率就大大地下降了
- 由于可以存在多個(gè)鎖届宠,不同的線程持有不同的鎖烁落,并試圖獲取對(duì)方持有的鎖時(shí),可能會(huì)造成死鎖
4.多線程-非共享數(shù)據(jù)
對(duì)于多線程中全局變量和局部變量是否共享
- 多線程局部變量
#coding=utf-8
import threading
import time
class MyThread(threading.Thread):
# 重寫 構(gòu)造方法
def __init__(self,num,sleepTime):
threading.Thread.__init__(self)
self.num = num
self.sleepTime = sleepTime
def run(self):
self.num += 1
time.sleep(self.sleepTime)
print('線程(%s),num=%d'%(self.name, self.num))
if __name__ == '__main__':
mutex = threading.Lock()
t1 = MyThread(100,5)
t1.start()
t2 = MyThread(200,1)
t2.start()
運(yùn)行結(jié)果:
線程(Thread-2),num=201
線程(Thread-1),num=101
- 多線程全局變量
import threading
from time import sleep
def test(sleepTime):
num = 1
sleep(sleepTime)
num+=1
print('---(%s)--num=%d'%(threading.current_thread(), num))
if __name__ == '__main__':
t1 = threading.Thread(target = test,args=(5,))
t2 = threading.Thread(target = test,args=(1,))
t1.start()
t2.start()
運(yùn)行結(jié)果:
---(<Thread(Thread-2, started 10876)>)--num=2
---(<Thread(Thread-1, started 7484)>)--num=2
- 在多線程開發(fā)中豌注,全局變量是多個(gè)線程都共享的數(shù)據(jù)伤塌,而局部變量等是各自線程的,是非共享的
5.同步應(yīng)用
- 多個(gè)線程有序執(zhí)行
from threading import Thread,Lock
from time import sleep
class Task1(Thread):
def run(self):
while True:
if lock1.acquire():
print("------Task 1 -----")
sleep(0.5)
lock2.release()
class Task2(Thread):
def run(self):
while True:
if lock2.acquire():
print("------Task 2 -----")
sleep(0.5)
lock3.release()
class Task3(Thread):
def run(self):
while True:
if lock3.acquire():
print("------Task 3 -----")
sleep(0.5)
lock1.release()
#使用Lock創(chuàng)建出的鎖默認(rèn)沒有“鎖上”
lock1 = Lock()
#創(chuàng)建另外一把鎖轧铁,并且“鎖上”
lock2 = Lock()
lock2.acquire()
#創(chuàng)建另外一把鎖每聪,并且“鎖上”
lock3 = Lock()
lock3.acquire()
t1 = Task1()
t2 = Task2()
t3 = Task3()
t1.start()
t2.start()
t3.start()
運(yùn)行結(jié)果:
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
...........`
- 可以使用互斥鎖完成多個(gè)任務(wù),有序的進(jìn)程工作齿风,這就是線程的同步
6.生產(chǎn)者與消費(fèi)者模式
- Python的Queue模塊中提供了同步的药薯、線程安全的隊(duì)列類,包括FIFO(先入先出)隊(duì)列Queue救斑,LIFO(后入先出)隊(duì)列LifoQueue童本,和優(yōu)先級(jí)隊(duì)列PriorityQueue。這些隊(duì)列都實(shí)現(xiàn)了鎖原語(可以理解為原子操作脸候,即要么不做巾陕,要么就做完),能夠在多線程中直接使用纪他”擅海可以使用隊(duì)列來實(shí)現(xiàn)線程間的同步。
- 用FIFO隊(duì)列實(shí)現(xiàn)上述生產(chǎn)者與消費(fèi)者問題的代碼如下:
import threading,time
from queue import Queue
class Producer(threading.Thread):
def run(self):
global queue
count = 0
while True:
if queue.qsize() < 1000:
for i in range(100):
count = count +1
msg = '生成產(chǎn)品'+str(count)
queue.put(msg)
print(msg)
time.sleep(0.5)
class Consumer(threading.Thread):
def run(self):
global queue
while True:
if queue.qsize() > 100:
for i in range(3):
msg = self.name + '消費(fèi)了 '+queue.get()
print(msg)
time.sleep(1)
if __name__ == '__main__':
queue = Queue()
for i in range(500):
queue.put('初始產(chǎn)品'+str(i))
for i in range(2):
p = Producer()
p.start()
for i in range(5):
c = Consumer()
c.start()
運(yùn)行結(jié)果:
生成產(chǎn)品1
生成產(chǎn)品2
生成產(chǎn)品1
生成產(chǎn)品3
生成產(chǎn)品2
生成產(chǎn)品4
Thread-3消費(fèi)了 初始產(chǎn)品0
生成產(chǎn)品3
生成產(chǎn)品5
Thread-3消費(fèi)了 初始產(chǎn)品1
生成產(chǎn)品4
生成產(chǎn)品6
Thread-4消費(fèi)了 初始產(chǎn)品2
Thread-3消費(fèi)了 初始產(chǎn)品3
生成產(chǎn)品5
生成產(chǎn)品7
Thread-4消費(fèi)了 初始產(chǎn)品4
生成產(chǎn)品6
生成產(chǎn)品8
Thread-5消費(fèi)了 初始產(chǎn)品5
Thread-4消費(fèi)了 初始產(chǎn)品6
............
此時(shí)就出現(xiàn)生產(chǎn)者與消費(fèi)者的問題
1.Queue的說明
1.對(duì)于Queue茶袒,在多線程通信之間扮演重要的角色
2.添加數(shù)據(jù)到隊(duì)列中梯刚,使用put()方法
3.從隊(duì)列中取數(shù)據(jù),使用get()方法
4.判斷隊(duì)列中是否還有數(shù)據(jù)薪寓,使用qsize()方法
2.生產(chǎn)者消費(fèi)者模式的說明
-
使用生產(chǎn)者和消費(fèi)者模式的原因
在線程世界里亡资,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程向叉。在多線程開發(fā)當(dāng)中锥腻,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢母谎,那么生產(chǎn)者就必須等待消費(fèi)者處理完瘦黑,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理奇唤,如果消費(fèi)者的處理能力大于生產(chǎn)者幸斥,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式咬扇。 -
生產(chǎn)者消費(fèi)者模式
生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題甲葬。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊懈贺,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理经窖,直接扔給阻塞隊(duì)列坡垫,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取画侣,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū)葛虐,平衡了生產(chǎn)者和消費(fèi)者的處理能力。這個(gè)阻塞隊(duì)列就是用來給生產(chǎn)者和消費(fèi)者解耦的棉钧。
3.ThreadLocal
在多線程環(huán)境下屿脐,每個(gè)線程都有自己的數(shù)據(jù)。一個(gè)線程使用自己的局部變量比使用全局變量好宪卿,因?yàn)榫植孔兞恐挥芯€程自己能看見的诵,不會(huì)影響其他線程,而全局變量的修改必須加鎖佑钾。
1.使用函數(shù)傳參的方法
def process_student(name):
std = Student(name)
# std是局部變量西疤,但是每個(gè)函數(shù)都要用它,因此必須傳進(jìn)去:
do_task_1(std)
do_task_2(std)
def do_task_1(std):
do_subtask_1(std)
do_subtask_2(std)
def do_task_2(std):
do_subtask_2(std)
do_subtask_2(std)
說明:用局部變量也有問題休溶,因?yàn)槊總€(gè)線程處理不同的Student對(duì)象代赁,不能共享。
2.使用全局字典的方法
import threading
# 創(chuàng)建字典對(duì)象:
myDict={}
def process_student():
# 獲取當(dāng)前線程關(guān)聯(lián)的student:
std = myDict[threading.current_thread()]
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 綁定ThreadLocal的student:
myDict[threading.current_thread()] = name
process_student()
t1 = threading.Thread(target=process_thread, args=('yongGe',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('老王',), name='Thread-B')
t1.start()
t2.start()
運(yùn)行結(jié)果;
Hello, yongGe (in Thread-A)
Hello, 老王 (in Thread-B)
這種方式理論上是可行的兽掰,它最大的優(yōu)點(diǎn)是消除了std對(duì)象在每層函數(shù)中的傳遞問題芭碍,但是,每個(gè)函數(shù)獲取std的代碼有點(diǎn)low孽尽。
3.使用ThreadLocal的方法
import threading
# 創(chuàng)建全局ThreadLocal對(duì)象:
local_school = threading.local()
def process_student():
# 獲取當(dāng)前線程關(guān)聯(lián)的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 綁定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target=process_thread, args=('erererbai',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('老王',), name='Thread-B')
t1.start()
t2.start()
運(yùn)行結(jié)果:
Hello, erererbai (in Thread-A)
Hello, 老王 (in Thread-B)
說明:
全局變量local_school就是一個(gè)ThreadLocal對(duì)象窖壕,每個(gè)Thread對(duì)它都可以讀寫student屬性,但互不影響杉女。你可以把local_school看成全局變量瞻讽,但每個(gè)屬性如local_school.student都是線程的局部變量,可以任意讀寫而互不干擾熏挎,也不用管理鎖的問題速勇,ThreadLocal內(nèi)部會(huì)處理。
可以理解為全局變量local_school是一個(gè)dict坎拐,不但可以用local_school.student烦磁,還可以綁定其他變量,如local_school.teacher等等廉白。
ThreadLocal最常用的地方就是為每個(gè)線程綁定一個(gè)數(shù)據(jù)庫連接个初,HTTP請(qǐng)求乖寒,用戶身份信息等猴蹂,這樣一個(gè)線程的所有調(diào)用到的處理函數(shù)都可以非常方便地訪問這些資源。
- 一個(gè)ThreadLocal變量雖然是全局變量楣嘁,但每個(gè)線程都只能讀寫自己線程的獨(dú)立副本磅轻,互不干擾珍逸。ThreadLocal解決了參數(shù)在一個(gè)線程中各個(gè)函數(shù)之間互相傳遞的問題