1昔头、什么是線程(thread)饼问?
線程是操作系統(tǒng)能夠進(jìn)行運(yùn)算調(diào)度的最小單位,它被包含在進(jìn)程之中揭斧,是進(jìn)程中的實(shí)際運(yùn)作單位莱革,一條線程指的是進(jìn)程中一個(gè)單一順序的控制流,一個(gè)進(jìn)程中可以并發(fā)多個(gè)線程讹开,每條線程中并發(fā)執(zhí)行不同的任務(wù)盅视。
官方解釋:
A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.
Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.
If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.
Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.
On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.
Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.
Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).
2、什么是進(jìn)程(process)旦万?
一個(gè)進(jìn)程就是一個(gè)應(yīng)用程序闹击,是系統(tǒng)進(jìn)行資源分配和調(diào)度的基本單位,是操作系統(tǒng)結(jié)構(gòu)的基礎(chǔ)成艘,在早期面向進(jìn)程設(shè)計(jì)的計(jì)算機(jī)結(jié)構(gòu)中赏半, 進(jìn)程是程序的基本執(zhí)行實(shí)體;在當(dāng)代面向線程設(shè)計(jì)的計(jì)算機(jī)結(jié)構(gòu)中淆两,進(jìn)程是線程的容器断箫,進(jìn)程是線程的容器,程序是指令秋冰、數(shù)據(jù)以及其組織形式的描述瑰枫,一個(gè)進(jìn)程包含多個(gè)線程。
官方解釋:
An executing instance of a program is called a process.
Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.
3、進(jìn)程和線程的區(qū)別
- 線程共享創(chuàng)建它的進(jìn)程的地址空間(每個(gè)進(jìn)程有自己的地址空間)光坝。
- 線程可以直接訪問其進(jìn)程的數(shù)據(jù)(每個(gè)進(jìn)程擁有自己父進(jìn)程的數(shù)據(jù)副本尸诽。)
- 線程可以直接與其進(jìn)程的其他線程通信(每個(gè)進(jìn)程必須使用第三方手段來與兄弟進(jìn)程進(jìn)行通信,比如消息隊(duì)列)
- 新線程很容易創(chuàng)建(創(chuàng)建新的進(jìn)程需要父進(jìn)程來創(chuàng)建盯另,子進(jìn)程是父進(jìn)程的拷貝)
- 線程可以對(duì)同一進(jìn)程的線程進(jìn)行相當(dāng)大的控制(進(jìn)程只能控制子進(jìn)程)
- 對(duì)主線程的更改(取消性含、優(yōu)先級(jí)更改等),可能會(huì)影響進(jìn)程的其他線程的行為(對(duì)父進(jìn)程的更改不會(huì)影響子進(jìn)程)
4鸳惯、GIL全局解釋器鎖
在python中商蕴,一次只能有一個(gè)線程在執(zhí)行,如果想要利用多核多處理器資源芝发,盡量使用多進(jìn)程去處理绪商。
官方解釋:
Cpython實(shí)現(xiàn)細(xì)節(jié):在Cpython中,由于Global Interpreter Lock辅鲸,只有一個(gè)線程可以同時(shí)執(zhí)行python代碼(某些面向性能的庫可能會(huì)避免這個(gè)限制)如果你希望應(yīng)用程序更好地利用多核機(jī)器的計(jì)算資源格郁,建議你使用多處理,但是独悴,如果要同時(shí)運(yùn)行多個(gè)I/O綁定任務(wù)例书,則線程仍然是一個(gè)合適的模型。
總結(jié):所以如果底層python解釋器的話刻炒,我們python的多線程只適合IO密集型任務(wù)决采,不適合CPU計(jì)算密集型任務(wù)。
5坟奥、多線程(threading模塊)
-
模塊以及相關(guān)方法
threading模塊:
threading.Thread(target=?,args=(arg1,arg2...)):
創(chuàng)建threading線程對(duì)象树瞭,target參數(shù)將要運(yùn)行的函數(shù)的變量名填寫到這里,args參數(shù)是要運(yùn)行函數(shù)需要的參數(shù)爱谁。
start():讓線程開始執(zhí)行任務(wù)
join():默認(rèn)線程開起來之后就和主線程沒有關(guān)系了移迫,主線程結(jié)束和join沒關(guān)系,調(diào)用join方法管行,會(huì)讓主線程再調(diào)用join方法的地方等待厨埋,知道子線程完成操作,主線程才繼續(xù)往下執(zhí)行代碼捐顷。
setDaemon(True):默認(rèn)為False荡陷,默認(rèn)主線程與其他線程沒有關(guān)系,但是有時(shí)候迅涮,我們想要主線程關(guān)閉的時(shí)候废赞,把其他線程也關(guān)閉了(不關(guān)心其他線程是不是執(zhí)行完任務(wù)了),我們將其他線程設(shè)置為主線程的守護(hù)進(jìn)程叮姑。注意:該方法需要在線程開始之前執(zhí)行threading.currentThread(): 返回當(dāng)前的線程變量唉地。
threading.enumerate(): 返回一個(gè)包含正在運(yùn)行的線程的list据悔。正在運(yùn)行指線程啟動(dòng)后、結(jié)束前耘沼,不包括啟動(dòng)前和終止后的線程极颓。
threading.activeCount(): 返回正在運(yùn)行的線程數(shù)量,與len(threading.enumerate())有相同的結(jié)果群嗤。除了使用方法外菠隆,線程模塊同樣提供了Thread類來處理線程,Thread類提供了以下方法:
run(): 用以表示線程活動(dòng)的方法狂秘。
start():啟動(dòng)線程活動(dòng)骇径。
join([time]): 等待至線程中止。這阻塞調(diào)用線程直至線程的join() 方法被調(diào)用中止-正常退出或者拋出未處理的異常-或者是可選的超時(shí)發(fā)生者春。
isAlive(): 返回線程是否活動(dòng)的破衔。
getName(): 返回線程名。
setName(): 設(shè)置線程名钱烟。 -
如何使用threading模塊創(chuàng)建多線程
-
直接調(diào)用
# author:Dman # date:2019/3/26 import threading import time def foo(n): print('foo___start___%s' % n) time.sleep(1) print('end foo____') def bar(): print('bar___start') time.sleep(2) #time.sleep的時(shí)候并不會(huì)占用cpu print('end bar____') t1 = threading.Thread(target=foo,args=(2,)) t2 = threading.Thread(target=bar) t1.start() t2.start() print('_________main______')
-
使用繼承的方式調(diào)用
# author:Dman # date:2019/3/27 """ 使用自定義類的方式去創(chuàng)建thread 步驟: 1晰筛、繼承threading.Thread類 2、必須重寫父方法忠售,run()方法传惠。 3迄沫、可以重寫父類方法 """ import threading import time class Mythred(threading.Thread): def __init__(self,num): # super(Mythred,self).__init__() threading.Thread.__init__(self) self.num = num def run(self): print('threading %s is running at %s ' % (threading.current_thread(), time.ctime())) time.sleep(3) if __name__ == '__main__': t1 = Mythred(1) t2 = Mythred(2) t1.start() t2.start()
-
-
join方法的疑問
# author:Dman # date:2019/3/27 """ threading 中方法稻扬,join方法的理解與使用。 join方法 """ import threading from time import ctime,sleep import time def music(func): for i in range(2): print ("Begin listening to %s. %s" %(func,ctime())) sleep(4) print("end listening %s"%ctime()) def move(func): for i in range(2): print ("Begin watching at the %s! %s" %(func,ctime())) sleep(5) print('end watching %s'%ctime()) if __name__ == '__main__': threads = [] t1 = threading.Thread(target=music, args=('七里香',)) threads.append(t1) t2 = threading.Thread(target=move, args=('阿甘正傳',)) threads.append(t2) for t in threads: # t.setDaemon(True) t.start() # t.join() #位置1 # t1.join() #位置2 # t2.join()######## #位置3 print ("all over %s" %ctime())
總結(jié):
1羊瘩、位置1:會(huì)阻塞所有的子線程泰佳,父進(jìn)程會(huì)在所有程序執(zhí)行完成之后就執(zhí)行
2、位置2:只會(huì)阻塞線程 t1尘吗,在 t1子線程執(zhí)行完畢之后逝她,主線程就會(huì)繼續(xù)執(zhí)行print函數(shù)。
3睬捶、位置3:只會(huì)阻塞線程 t2黔宛,在 t2子線程執(zhí)行完畢之后,主線程就會(huì)繼續(xù)執(zhí)行print函數(shù)擒贸。
-
多線程數(shù)據(jù)共享和同步
如果多個(gè)線程共同對(duì)某個(gè)數(shù)據(jù)進(jìn)行修改臀晃,則可能出現(xiàn)不可預(yù)料的結(jié)果,為了保證數(shù)據(jù)的正確性介劫,需要對(duì)多個(gè)線程進(jìn)行同步徽惋,使用thread類的Lock和RLock可以是實(shí)現(xiàn)簡單的線程同步。
-
同步鎖:又稱互斥鎖 ------threading.Lock
作用:解決多個(gè)線程訪問共享變量的時(shí)候出現(xiàn)的數(shù)據(jù)的問題座韵。
# author:Dman # date:2019/3/27 """ 沒有同步鎖案例 """ """ 下面程序執(zhí)行順序: 1险绘、我們打開了100個(gè)線程去執(zhí)行addnum函數(shù),其中addnum是對(duì)一個(gè)全局變量num進(jìn)行-1的操作,我們的理想的狀態(tài)時(shí)num左后等于0 實(shí)際運(yùn)行結(jié)果是: 100個(gè)線程開始搶GIL宦棺,搶到的將被CPU執(zhí)行 1瓣距、執(zhí)行g(shù)lobal num 2、temp = num 賦值操作 3渺氧、執(zhí)行time.sleep(0.1) ---(解釋:這個(gè)語句相當(dāng)于發(fā)生IO阻塞旨涝,掛起,GIL釋放侣背,下一步num=temp-1還未被執(zhí)行白华,因此全局變量num的值仍為 100) 4、其他99個(gè)線程開始搶GIL鎖贩耐,重復(fù)上面的步驟 5弧腥、其他98個(gè)線程開始搶GIL鎖,重復(fù)上面的步驟 ... (備注:如果阻塞的事件夠長潮太,由于cpu的執(zhí)行速度很快管搪,也就是切換的快,在發(fā)生阻塞的0.1秒鐘铡买,如果100個(gè) 線程都切換一遍那么更鲁,每個(gè)線程就都拿到num=100這個(gè)變量,后面再執(zhí)行-1操作奇钞,那么當(dāng)所有線程結(jié)束澡为,得到的結(jié)果都是99.) """ import time import threading def addNum(): global num #在每個(gè)線程中都獲取這個(gè)全局變量 # num-=1 #這個(gè)操作速度很快 temp=num # print('--get num:',num) # 每個(gè)線程執(zhí)行到這一步就只能拿到變量num = 100悴能, time.sleep(0.1) num =temp-1 #對(duì)此公共變量進(jìn)行-1操作 if __name__ == '__main__': num = 100 #設(shè)定一個(gè)共享變量 thread_list = [] for i in range(5): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有線程執(zhí)行完畢 t.join() print('final num:', num ) #------運(yùn)行結(jié)果------- final num: 99
-
```python
# author:Dman
# date:2019/3/28
"""
線程鎖使用實(shí)例
1鲜结、使用threading.Lock()方法得到鎖對(duì)象
2、在方法中調(diào)用acquire()和release()方法去包圍我們的代碼季二,
那么同一時(shí)刻就只有一個(gè)線程可以訪問acquire和release包圍的代碼塊谷徙。
"""
import time
import threading
def addNum():
global num # 在每個(gè)線程中都獲取這個(gè)全局變量
# num-=1 #這個(gè)操作速度很快
lock.acquire() # 獲取鎖
temp = num
# print('--get num:',num) # 每個(gè)線程執(zhí)行到這一步就只能拿到變量num = 100拒啰,
time.sleep(0.1)
num = temp - 1 # 對(duì)此公共變量進(jìn)行-1操作
lock.release() # 釋放鎖
if __name__ == '__main__':
lock = threading.Lock()
num = 100 # 設(shè)定一個(gè)共享變量
thread_list = []
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list: # 等待所有線程執(zhí)行完畢
t.join()
print('final num:', num)
```
思考:同步鎖和GIL的區(qū)別
> 總結(jié):
>
> 1、線程鎖又稱互斥鎖完慧、同步鎖谋旦,為了防止多個(gè)代碼同時(shí)訪問共享變量的時(shí)候,出現(xiàn)問題屈尼。
>
> 2册着、在threading模塊中,通過Lock類來創(chuàng)建鎖對(duì)象鸿染,通過aquire方法和release方法去包圍需要保護(hù)的代碼
-
線程死鎖和遞歸鎖
-
死鎖
死鎖現(xiàn)象指蚜,見代碼如下:
# author:Dman # date:2019/3/30 """ 線程死鎖: 在線程間共享多個(gè)資源的時(shí)候,如果兩個(gè)線程分別占有一部分資源并且同時(shí)等待對(duì)方的資源涨椒,就會(huì)造成死鎖摊鸡,因?yàn)橄到y(tǒng)判斷這部分資源都正在使用绽媒,所有這兩個(gè)線程在無外力作用下將一直等待下去。 """ import threading import threading,time class myThread(threading.Thread): def doA(self): lockA.acquire() print(self.name,"gotlockA",time.ctime()) time.sleep(3) lockB.acquire() # 要求獲取LockB print(self.name,"gotlockB",time.ctime()) lockB.release() lockA.release() def doB(self): lockB.acquire() print(self.name,"gotlockB",time.ctime()) time.sleep(2) lockA.acquire() # 要求獲取LockA print(self.name,"gotlockA",time.ctime()) lockA.release() lockB.release() def run(self): self.doA() self.doB() if __name__=="__main__": lockA=threading.Lock() lockB=threading.Lock() threads=[] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join()#等待線程結(jié)束免猾,后面再講是辕。
-
遞歸鎖-----threading.RLock
作用:為了解決鎖嵌套的問題,解決死鎖問題猎提。
# author:Dman # date:2019/3/30 """ 遞歸鎖(RLock)也叫可重入鎖:解決死鎖問題获三,看 線程鎖_test3.py 特點(diǎn):可以多次acquire。 內(nèi)部使用計(jì)數(shù)器來維護(hù)锨苏。acquire的時(shí)候計(jì)數(shù)器加1疙教,release的時(shí)候計(jì)數(shù)器減1 結(jié)果:鎖的是內(nèi)部代碼塊,同一時(shí)刻保證只有一個(gè)線程執(zhí)行該代碼塊伞租。 使用場景:當(dāng)我們修改多個(gè)變量是有關(guān)聯(lián)的贞谓,我們只能對(duì)自己的方法去鎖定,但是不能保證別人的方法是鎖定的葵诈,所以當(dāng)我們內(nèi)部鎖定了之后裸弦,其他函數(shù)也可能鎖定,這樣就出現(xiàn)了多把鎖的情況作喘。 """ import threading import threading,time class myThread(threading.Thread): def doA(self): # lockA.acquire() lock.acquire() print(self.name,"gotlockA",time.ctime()) time.sleep(3) # lockB.acquire() # 要求獲取LockB lock.acquire() print(self.name,"gotlockB",time.ctime()) # lockB.release() # lockA.release() lock.release() lock.release() def doB(self): lock.acquire() print(self.name,"gotlockB",time.ctime()) time.sleep(2) lock.acquire() # 要求獲取LockA print(self.name,"gotlockA",time.ctime()) lock.release() lock.release() def run(self): self.doA() self.doB() if __name__=="__main__": # lockA=threading.Lock() # lockB=threading.Lock() lock = threading.RLock() threads=[] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join()#等待線程結(jié)束理疙,后面再講。
-
案例使用案例----銀行取錢:
# author:Dman # date:2019/3/30 """ 遞歸鎖場景---案例 """ import threading class Account: def __init__(self,name,money): self.name = name self.balance = money self.lock = threading.RLock() def withdraw(self,amount): with self.lock: self.balance -= amount def deposit(self,amount): with self.lock: # with上下文管理泞坦,幫我們acquire 和release self.balance += amount def transfer(from_user, to_user,amount): # 鎖不可以加在這里窖贤,因?yàn)槠渌木€程執(zhí)行其他方法在不加鎖的情況下數(shù)據(jù)同樣是不安全的 from_user.withdraw(amount) to_user.deposit(amount) if __name__ == '__main__': alex = Account('alex',100) dman = Account('xiaohu',20000) t1 = threading.Thread(target=transfer, args=(alex, dman, 100)) t1.start() t2 = threading.Thread(target=transfer, args=(dman, dman, 200)) t2.start() t1.join() t2.join() print('>>>', alex.balance) print('>>>', dman.balance)
總結(jié):
1、創(chuàng)建遞歸鎖的方法:使用threading.RLock類去創(chuàng)建遞歸鎖對(duì)象暇矫。同互斥鎖一樣主之,使用aquire和release方法去包圍代碼塊
2择吊、遞歸鎖是為了解決鎖嵌套的時(shí)候的問題李根。
-
-
條件變量同步---threading.Condition
-
作用:為了實(shí)現(xiàn)多個(gè)線程之間的交互,它本身也提供了RLock或Lock的方法几睛,還提供了wait()房轿、notify()、notifyAll()方法
wait():條件不滿足時(shí)調(diào)用所森,線程會(huì)釋放鎖并進(jìn)入等待阻塞囱持;
notify():條件創(chuàng)造后調(diào)用,通知等待池激活一個(gè)線程焕济;
notifyAll():條件創(chuàng)造后調(diào)用纷妆,通知等待池激活所有線程。# author:Dman # date:2019/3/30 """ 條件變量------實(shí)現(xiàn)線程的限制 應(yīng)用場景:有一類線程需要滿足條件之后才能繼續(xù)執(zhí)行晴弃。,為了在滿足一定條件后掩幢,喚醒某個(gè)線程逊拍,防止該線程一直不被執(zhí)行 """ import threading,time from random import randint class Producer(threading.Thread): def run(self): global L while True: val=randint(0,100) print('生產(chǎn)者',self.name,":Append"+str(val),L) if lock_con.acquire(): L.append(val) lock_con.notify() # lock_con.release() time.sleep(3) class Consumer(threading.Thread): def run(self): global L while True: lock_con.acquire() # print('ok1') if len(L)==0: lock_con.wait() print('消費(fèi)者',self.name,":Delete"+str(L[0]),L) del L[0] lock_con.release() time.sleep(0.25) if __name__=="__main__": L=[] lock_con=threading.Condition()#獲取一個(gè)Condition對(duì)象 threads=[] for i in range(5): threads.append(Producer()) threads.append(Consumer()) for t in threads: t.start() for t in threads: t.join()
總結(jié):
1、使用threading.Condition()獲取一個(gè)Condition對(duì)象际邻,里面默認(rèn)使用RLock芯丧,也可以自己手動(dòng)傳參數(shù)。
-
-
同步條件---threading.Event
-
作用:Event和Condition差不多世曾,只是少了鎖的功能缨恒,因此Event用于不訪問共享變量的條件環(huán)境
event.isSet():返回event的狀態(tài)值;
event.wait():如果 event.isSet()==False將阻塞線程轮听;
event.set(): 設(shè)置event的狀態(tài)值為True骗露,所有阻塞池的線程激活進(jìn)入就緒狀態(tài), 等待操作系統(tǒng)調(diào)度血巍;
event.clear():恢復(fù)event的狀態(tài)值為False椒袍。
# author:Dman # date:2019/3/30 """ event沒有鎖功能,但是實(shí)現(xiàn)了線程之間的交互藻茂。內(nèi)部有標(biāo)志位 實(shí)現(xiàn)了函數(shù): isSet():返回event 的狀態(tài)值 wait():如果event的狀態(tài)值位False將阻塞線程 set(): 設(shè)置event的狀態(tài)值位True clear():設(shè)置event的狀態(tài)值為False 交叉執(zhí)行驹暑。 """ import threading,time class Boss(threading.Thread): def run(self): print("BOSS:今晚大家都要加班到22:00。") event.isSet() or event.set() time.sleep(5) print("BOSS:<22:00>可以下班了辨赐。") event.isSet() or event.set() class Worker(threading.Thread): def run(self): event.wait() print("Worker:哎……命苦坝欧!") time.sleep(0.25) event.clear() event.wait() print("Worker:OhYeah!") if __name__=="__main__": event=threading.Event() #獲取event對(duì)象 threads=[] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join() #---------運(yùn)行結(jié)果--------------- BOSS:今晚大家都要加班到22:00掀序。 Worker:哎……命苦胺馈! Worker:哎……命苦安还А叶雹!Worker:哎……命苦啊换吧! Worker:哎……命苦罢刍蕖! Worker:哎……命苦罢赐摺满着! BOSS:<22:00>可以下班了。 Worker:OhYeah! Worker:OhYeah! Worker:OhYeah!Worker:OhYeah! Worker:OhYeah!
-
-
信號(hào)量
-
作用:用來控制線程并發(fā)數(shù)的贯莺,使用BoundedSemaphore或Semaphore類來管理一個(gè)內(nèi)置的計(jì)數(shù)器风喇,每當(dāng)調(diào)用acquire方法時(shí)-1,調(diào)用release方法時(shí)+1.
計(jì)數(shù)器不能小于0缕探,當(dāng)計(jì)數(shù)器為0時(shí)魂莫,acquire方法將阻塞線程至同步鎖定狀態(tài),知道其他線程調(diào)用release方法爹耗。(類似停車場的概念)
BoundedSemaphore與Semaphore的唯一區(qū)別在于前者將調(diào)用release時(shí)檢查計(jì)數(shù)器是否超過了計(jì)數(shù)器的初始值耙考,如果超過了將拋出一個(gè)異常秽誊。
# author:Dman # date:2019/3/30 """ 1、信號(hào)量 2琳骡、信號(hào)量和遞歸鎖的區(qū)別: 3锅论、應(yīng)用場景: 4、信號(hào)量的創(chuàng)建: """ import threading,time class MyThread(threading.Thread): def run(self): if semaphore.acquire(): print(self.name) time.sleep(5) semaphore.release() if __name__ =='__main__': semaphore = threading.BoundedSemaphore(5) thrs = [] for i in range(13): thrs.append(MyThread()) for i in thrs: i.start() # print('___main function close _____')
-
-
多線程數(shù)據(jù)共享利器--queue隊(duì)列模塊
-
作用:多個(gè)線程間進(jìn)行安全的信息交互的時(shí)候
queue隊(duì)列類的方法
創(chuàng)建一個(gè)“隊(duì)列”對(duì)象
import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue類即是一個(gè)隊(duì)列的同步實(shí)現(xiàn)楣号。隊(duì)列長度可為無限或者有限最易。可通過Queue的構(gòu)造函數(shù)的可選參數(shù)maxsize來設(shè)定隊(duì)列長度炫狱。如果maxsize小于1就表示隊(duì)列長度無限藻懒。將一個(gè)值放入隊(duì)列中
q.put(10)
調(diào)用隊(duì)列對(duì)象的put()方法在隊(duì)尾插入一個(gè)項(xiàng)目。put()有兩個(gè)參數(shù)视译,第一個(gè)item為必需的嬉荆,為插入項(xiàng)目的值;第二個(gè)block為可選參數(shù)酷含,默認(rèn)為
1鄙早。如果隊(duì)列當(dāng)前為空且block為1,put()方法就使調(diào)用線程暫停,直到空出一個(gè)數(shù)據(jù)單元椅亚。如果block為0限番,put方法將引發(fā)Full異常。將一個(gè)值從隊(duì)列中取出
q.get()
調(diào)用隊(duì)列對(duì)象的get()方法從隊(duì)頭刪除并返回一個(gè)項(xiàng)目呀舔∶峙埃可選參數(shù)為block,默認(rèn)為True媚赖。如果隊(duì)列為空且block為True霜瘪,get()就使調(diào)用線程暫停,直至有項(xiàng)目可用惧磺。如果隊(duì)列為空且block為False颖对,隊(duì)列將引發(fā)Empty異常。Python Queue模塊有三種隊(duì)列及構(gòu)造函數(shù):
1豺妓、Python Queue模塊的FIFO隊(duì)列先進(jìn)先出惜互。 class queue.Queue(maxsize)
2布讹、LIFO類似于堆琳拭,即先進(jìn)后出。 class queue.LifoQueue(maxsize)
3描验、還有一種是優(yōu)先級(jí)隊(duì)列級(jí)別越低越先出來白嘁。 class queue.PriorityQueue(maxsize)此包中的常用方法(q = Queue.Queue()):
q.qsize() 返回隊(duì)列的大小
q.empty() 如果隊(duì)列為空,返回True,反之False
q.full() 如果隊(duì)列滿了膘流,返回True,反之False
q.full 與 maxsize 大小對(duì)應(yīng)
q.get([block[, timeout]]) 獲取隊(duì)列絮缅,timeout等待時(shí)間
q.get_nowait() 相當(dāng)q.get(False)
非阻塞 q.put(item) 寫入隊(duì)列鲁沥,timeout等待時(shí)間
q.put_nowait(item) 相當(dāng)q.put(item, False)
q.task_done() 在完成一項(xiàng)工作之后,q.task_done() 函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào)
q.join() 實(shí)際上意味著等到隊(duì)列為空耕魄,再執(zhí)行別的操作案例一
# author:Dman # date:2019/3/30 import queue """ 隊(duì)列 queue:是線程安全的 相比較列表:為什么隊(duì)列是線程安全的 """ import threading,queue,time,random class Production(threading.Thread): def run(self): while True: r = random.randint(0,100) q.put(r) print('生產(chǎn)出來%s號(hào)包子' % r) time.sleep(1) class Proces(threading.Thread): def run(self): while True: re = q.get() print('吃掉%s號(hào)包子'% re) if __name__ == '__main__': q = queue.Queue(10) threads = [Production(),Production(),Proces()] for t in threads: t.start()
案例二:
# author:Dman # date:2019/4/3 #實(shí)現(xiàn)一個(gè)線程不斷生成一個(gè)隨機(jī)數(shù)到一個(gè)隊(duì)列中(考慮使用Queue這個(gè)模塊) # 實(shí)現(xiàn)一個(gè)線程從上面的隊(duì)列里面不斷的取出奇數(shù) # 實(shí)現(xiàn)另外一個(gè)線程從上面的隊(duì)列里面不斷取出偶數(shù) import random,threading,time from queue import Queue #Producer thread class Producer(threading.Thread): def __init__(self, t_name, queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): for i in range(10): #隨機(jī)產(chǎn)生10個(gè)數(shù)字 画恰,可以修改為任意大小 randomnum=random.randint(1,99) print ("%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum)) self.data.put(randomnum) #將數(shù)據(jù)依次存入隊(duì)列 time.sleep(1) print ("%s: %s finished!" %(time.ctime(), self.getName())) #Consumer thread class Consumer_even(threading.Thread): def __init__(self,t_name,queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): while 1: try: val_even = self.data.get(1,5) #get(self, block=True, timeout=None) ,1就是阻塞等待,5是超時(shí)5秒 if val_even%2==0: print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even)) time.sleep(2) else: self.data.put(val_even) time.sleep(2) except: #等待輸入,超過5秒 就報(bào)異常 print ("%s: %s finished!" %(time.ctime(),self.getName())) break class Consumer_odd(threading.Thread): def __init__(self,t_name,queue): threading.Thread.__init__(self, name=t_name) self.data=queue def run(self): while 1: try: val_odd = self.data.get(1,5) if val_odd%2!=0: print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd)) time.sleep(2) else: self.data.put(val_odd) time.sleep(2) except: print ("%s: %s finished!" % (time.ctime(), self.getName())) break #Main thread def main(): queue = Queue() producer = Producer('Pro.', queue) consumer_even = Consumer_even('Con_even.', queue) consumer_odd = Consumer_odd('Con_odd.',queue) producer.start() consumer_even.start() consumer_odd.start() producer.join() consumer_even.join() consumer_odd.join() print ('All threads terminate!') if __name__ == '__main__': main()
案例3:相比較吸奴,list不是線程安全的
import threading,time li=[1,2,3,4,5] def pri(): while li: a=li[-1] print(a) time.sleep(1) try: li.remove(a) except: print('----',a) t1=threading.Thread(target=pri,args=()) t1.start() t2=threading.Thread(target=pri,args=()) t2.start()
-
6允扇、多進(jìn)程
-
多進(jìn)程概念
由于GIL的存在,默認(rèn)的Cpython解釋器中的多線程其實(shí)并不是真正的多線程则奥,如果想要充分地使用多核CPU資源考润,在python中大部分需要使用多進(jìn)程,Python提供了multiprocessing模塊读处,這個(gè)模塊支持子進(jìn)程糊治、通信和共享數(shù)據(jù)、執(zhí)行不同形式的同步罚舱,提供了Process井辜、Queue、Pipe管闷、Lock等組件抑胎。
該模塊的使用和threading模塊類似,api大致相同渐北,但是需要注意幾點(diǎn):
1阿逃、在unix平臺(tái)上,某個(gè)進(jìn)程終結(jié)之后赃蛛,該進(jìn)程需要被父進(jìn)程調(diào)用wait恃锉,否則進(jìn)程成為僵尸進(jìn)程,所以有必要對(duì)每個(gè)process對(duì)象調(diào)用join方法(實(shí)際上等于wait)呕臂,對(duì)于多線程來說 破托,由于只有一個(gè)進(jìn)程,所以不存在此必要性歧蒋。
2土砂、multiprocessing模塊提供了Pipe和Queue,效率上更高谜洽,贏優(yōu)先考慮Pipe和Queue萝映,避免使用Lock、Event等同步方式(因?yàn)樗麄冋紦?jù)的不是進(jìn)程的資源阐虚。)
3序臂、多進(jìn)程應(yīng)該避免共享資源,在多線程中实束,我們可以比較容易的共享資源奥秆,比如使用全局變量或者傳遞參數(shù)逊彭,在多進(jìn)程的情況下,由于每個(gè)進(jìn)程有自己獨(dú)立的內(nèi)存空間构订,以上方法不合適侮叮。此時(shí)我們可以通過共享內(nèi)存和Manager的方法來共享資源,但這樣做提高了程序的復(fù)雜度悼瘾。
4签赃、另外、在windows系統(tǒng)下分尸,需要注意的是想要啟動(dòng)一個(gè)子進(jìn)程锦聊,必須加上
if __name__ == '__main__':
-
創(chuàng)建多進(jìn)程------multiprocessing.Process
-
直接調(diào)用
from multiprocessing import Process import time def f(name): time.sleep(1) print('hello', name,time.ctime()) if __name__ == '__main__': p_list=[] for i in range(3): p = Process(target=f, args=('alvin',)) p_list.append(p) p.start() for i in p_list: p.join() print('end')
-
-
類的方式調(diào)用
from multiprocessing import Process import time class MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() #self.name = name def run(self): time.sleep(1) print ('hello', self.name,time.ctime()) if __name__ == '__main__': p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end')
-
多進(jìn)程之間的通信,有三種方式
-
multiprocessing.Queue
from multiprocessing import Process, Queue def f(q,n): q.put([42, n, 'hello']) if __name__ == '__main__': q = Queue() p_list=[] for i in range(3): p = Process(target=f, args=(q,i)) p_list.append(p) p.start() print(q.get()) print(q.get()) print(q.get()) for i in p_list: i.join()
-
-
multiprocessing.Pipe
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()
1、Pipe()函數(shù)返回一個(gè)由管道連接的連接對(duì)象箩绍,默認(rèn)情況下是雙工(雙向)孔庭。
2、Pipe()返回的兩個(gè)連接對(duì)象代表管道的兩端材蛛。 每個(gè)連接對(duì)象都有send() 和recv()方法(以及其他方法)圆到。 請(qǐng)注意,如果兩個(gè)進(jìn)程(或線程)同時(shí)嘗試讀取或?qū)懭牍艿赖耐欢吮翱裕瑒t管道中的數(shù)據(jù)可能會(huì)損壞芽淡。 當(dāng)然,同時(shí)使用管道的不同端的進(jìn)程不存在損壞的風(fēng)險(xiǎn)豆赏。
-
multiprocessing.Manager
from multiprocessing import Process, Manager def f(d, l,n): d[n] = '1' d['2'] = 2 d[0.25] = None l.append(n) print(l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d, l,i)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
1挣菲、Manager()返回的管理器對(duì)象控制一個(gè)服務(wù)器進(jìn)程,該進(jìn)程保存Python對(duì)象并允許其他進(jìn)程使用代理操作它們掷邦。
2白胀、Manager()返回的管理器將支持類型列表,dict抚岗,Namespace或杠,Lock,RLock宣蔚,Semaphore向抢,BoundedSemaphore,Condition胚委,Event挟鸠,Barrier,Queue篷扩,Value和Array兄猩。
-
進(jìn)程間同步----multiprocessing.Lock
from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
進(jìn)程間同步,只使用父進(jìn)程的鎖鉴未,(另外盡量避免這種情況)
-
進(jìn)程池----multiprocessing.Pool
from multiprocessing import Process,Pool import time def Foo(i): time.sleep(2) return i+100 def Bar(arg): print('-->exec done:',arg) pool = Pool(5) for i in range(10): pool.apply_async(func=Foo, args=(i,),callback=Bar) #pool.apply(func=Foo, args=(i,)) print('end') pool.close() pool.join()
進(jìn)程池內(nèi)部維護(hù)一個(gè)進(jìn)程序列枢冤,當(dāng)使用時(shí),就去進(jìn)程池中獲取一個(gè)進(jìn)程铜秆,如果進(jìn)程池中沒有可供使用的進(jìn)程淹真,那么程序就會(huì)等待,直到進(jìn)程池中有可用的進(jìn)程為止连茧。
進(jìn)程池中的兩個(gè)方法:
1核蘸、apply
2、map
3啸驯、apply_async 是異步的客扎,也就是在啟動(dòng)進(jìn)程之后會(huì)繼續(xù)后續(xù)的代碼,不用等待進(jìn)程函數(shù)返回
4罚斗、map_async 是異步的徙鱼,
5、join語句要放在close語句后面
7针姿、協(xié)程
-
協(xié)程是什么袱吆?
-
協(xié)程,又稱微線程距淫,英文名為Coroutine绞绒,協(xié)程是用戶態(tài)的輕量級(jí)的線程。協(xié)程擁有自己的寄存器上下文和棧榕暇。協(xié)程調(diào)用切換時(shí)蓬衡,將寄存器上下文和棧保存到其他地方,在切換回來的時(shí)候彤枢,可以恢復(fù)先前保存的寄存器上下文和棧撤蟆,因此:
協(xié)程能保留上一次調(diào)用的狀態(tài),每次過程重入的時(shí)候堂污,就相當(dāng)于進(jìn)入上一次調(diào)用的狀態(tài)家肯,換種說話,進(jìn)入上一次離開時(shí)所處的邏輯流的位置
總結(jié):
1盟猖、協(xié)程必須在只有一個(gè)單線程里實(shí)現(xiàn)并發(fā)
2讨衣、修改共享數(shù)據(jù)不需要加鎖
3、用戶程序里自己保存多個(gè)控制流的上下文和棧
4式镐、一個(gè)協(xié)程遇到IO操作<u>自動(dòng)</u>切換到其他線程
-
-
協(xié)程的好處反镇?
1、無需線程上下文切換的開銷
2娘汞、無需院子操作鎖定以及同步的開銷(原子操作是不需要同步歹茶,所謂原子操作是指不會(huì)被線程調(diào)度機(jī)制打斷的操作,也就是說該操作必須執(zhí)行完畢,才能進(jìn)行線程切換惊豺;原子操作可以是一個(gè)步驟燎孟,也可以是多個(gè)操作步驟)
3、方便切換控制流尸昧,簡化編程模型
4揩页、高并發(fā)+高擴(kuò)展+低成本:一個(gè)CPU支持上萬的協(xié)程都不是問題,所以很適合高并發(fā)的問題烹俗。
-
協(xié)程的缺點(diǎn)
1爆侣、無法利用多核資源:協(xié)程的本質(zhì)是一個(gè)單線程,它不能同時(shí)將單個(gè)CPU的多個(gè)核用上幢妄,協(xié)程需要和進(jìn)程配合才能利用多核CPU兔仰;我們?nèi)粘K帉懙拇蟛糠謶?yīng)用沒有這個(gè)必要,除非是CPU密集性應(yīng)用
2蕉鸳、進(jìn)行阻塞操作入IO會(huì)阻塞掉整個(gè)程序
-
yield實(shí)現(xiàn)協(xié)程案例
# author:Dman # date:2019/4/1 import time import queue def consumer(name): print('---開始生產(chǎn)包子') while True: new_baozi = yield print("[%s] is eating baozi %s" % (name,new_baozi)) def producer(): next(con1) next(con2) n = 0 while n<5: n += 1 con1.send(n) con2.send(n) print("\033[32;1m[producer]\033[0m is making baozi %s" % n) if __name__ == '__main__': con1 = consumer('c1') con2 = consumer('c2') p = producer() #------------------運(yùn)行結(jié)果--------------- ---開始生產(chǎn)包子 ---開始生產(chǎn)包子 [c1] is eating baozi 1 [c2] is eating baozi 1 [producer] is making baozi 1 [c1] is eating baozi 2 [c2] is eating baozi 2 [producer] is making baozi 2 [c1] is eating baozi 3 [c2] is eating baozi 3 [producer] is making baozi 3 [c1] is eating baozi 4 [c2] is eating baozi 4 [producer] is making baozi 4 [c1] is eating baozi 5 [c2] is eating baozi 5 [producer] is making baozi 5
-
greenlet模塊支持的協(xié)程
相比較yield乎赴,可以在任意函數(shù)之間隨意切換,而不需要把這個(gè)函數(shù)先聲明成為generaor置吓。(但是它無法自動(dòng)遇到IO阻塞去切換无虚,必須手動(dòng)去切換)
from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() #調(diào)用switch去切換執(zhí)行函數(shù) #----------執(zhí)行結(jié)果---------------- 12 56 34 78
-
gevent模塊支持的協(xié)程
-
理解
使用gevent,可以獲得極高的并發(fā)性能衍锚,但gevent只能在Unix/Linux下運(yùn)行友题,在Windows下不保證正常安裝和運(yùn)行。(它可以在遇到IO阻塞的時(shí)候自動(dòng)切換)
由于gevent是基于IO切換的協(xié)程戴质,所以最神奇的是度宦,我們編寫的Web App代碼,不需要引入gevent的包告匠,也不需要改任何代碼戈抄,僅僅在部署的時(shí)候,用一個(gè)支持gevent的WSGI服務(wù)器后专,立刻就獲得了數(shù)倍的性能提升划鸽。具體部署方式可以參考后續(xù)“實(shí)戰(zhàn)”-“部署Web App”一節(jié)。
-
簡單案例
# author:Dman # date:2019/4/1 """ gevent 封裝了greenlet戚哎,這個(gè)不需要自己去切換裸诽,遇到io阻塞,模塊會(huì)自己去切換任務(wù)型凳。 我們只需要把gevent對(duì)象加到里面 """ import gevent def func1(): print('\033[31;1m李闖在跟海濤搞...\033[0m') gevent.sleep(2) #模擬IO阻塞丈冬,自動(dòng)開始切換 print('\033[31;1m李闖又回去跟繼續(xù)跟海濤搞...\033[0m') def func2(): print('\033[32;1m李闖切換到了跟海龍搞...\033[0m') gevent.sleep(1) print('\033[32;1m李闖搞完了海濤,回來繼續(xù)跟海龍搞...\033[0m') gevent.joinall([ gevent.spawn(func1), #將函數(shù)加到里面甘畅。 gevent.spawn(func2), # gevent.spawn(func3), ]) #-----------執(zhí)行結(jié)果------------- 李闖在跟海濤搞... 李闖切換到了跟海龍搞... 李闖搞完了海濤埂蕊,回來繼續(xù)跟海龍搞... 李闖又回去跟繼續(xù)跟海濤搞...
-
同步IO和異步IO的區(qū)別
# author:Dman # date:2019/4/1 import gevent def task(pid): """ Some non-deterministic task """ gevent.sleep(0.5) print('Task %s done' % pid) def synchronous(): for i in range(1, 10): task(i) def asynchronous(): #異步io函數(shù) threads = [gevent.spawn(task, i) for i in range(10)] gevent.joinall(threads) print('Synchronous:'.center(20,'-')) synchronous() print('Asynchronous:'.center(20,'-')) asynchronous()
-
-
簡單的異步爬蟲往弓,遇到IO阻塞會(huì)自動(dòng)切換任務(wù)
from gevent import monkey import time monkey.patch_all() # 在最開頭的地方gevent.monkey.patch_all();把標(biāo)準(zhǔn)庫中的thread/socket等給替換掉, # 這樣我們在后面使用socket的時(shí)候可以跟平常一樣使用,無需修改任何代碼,但是它變成非阻塞的了. # import gevent from urllib.request import urlopen def f(url): print('GET: %s' % url) resp = urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url)) list = ['https://www.python.org/','https://www.yahoo.com/','https://github.com/'] start = time.time() # for url in l: # f(url) gevent.joinall([ gevent.spawn(f, list[0]), gevent.spawn(f, list[1]), gevent.spawn(f, list[2]), ]) print(time.time()-start) #-----------輸出結(jié)果--------------- GET: https://www.python.org/ GET: https://www.yahoo.com/ GET: https://github.com/ 48560 bytes received from https://www.python.org/. 82655 bytes received from https://github.com/. 536556 bytes received from https://www.yahoo.com/. 3.361192226409912
8蓄氧、事件驅(qū)動(dòng)和異步IO
以后繼續(xù)更新