前言
我是偏后臺開發(fā)的coder缰雇,學(xué)到python的這里時尤其的關(guān)注。操作系統(tǒng)的相關(guān)接口在python是不是比linux C中要簡潔的多。OS的概念不說了结胀,這次筆記集中關(guān)注python中多進程辰如、多線程普监、高并發(fā)、加鎖同步琉兜、進程間通信等實現(xiàn)凯正。
Definition
進程(process),在我的理解中豌蟋,就是一個任務(wù)廊散,是一段運行的程序。后臺的童鞋應(yīng)該知道其本質(zhì)就是一個task_struct結(jié)構(gòu)體梧疲,里面記載著程序運行需要的所有資源和他自身的信息允睹。當(dāng)他獲得運行所需的內(nèi)存、CPU資源等往声,也就是成為了一個running狀態(tài)的進程擂找。
可以把進程理解為一個任務(wù),那線程就是完成這個任務(wù)的執(zhí)行流浩销。線程是CPU調(diào)度的最小粒度贯涎。通常來說,現(xiàn)在的項目中慢洋,至少我接觸的轮纫,一個進程中都包括著不止一個的線程。畢竟現(xiàn)在的OS都是SMP的株汉,充分利用多核心提高程序效率應(yīng)該是每個coder敲鍵盤時需要優(yōu)先考慮的面哼。
多進程
linux的內(nèi)核向外提供了 fork()
這個系統(tǒng)調(diào)用來創(chuàng)建一個本進程的拷貝,當(dāng)然往往fork()后都跟著 exec()
族系統(tǒng)調(diào)用太防,我們創(chuàng)建一個進程一般都是為了執(zhí)行其他的代碼程序妻顶。
python的 os 模塊封裝了很多常用的系統(tǒng)調(diào)用,可以說是python中最常用的一個庫了蜒车。舉個栗子:
import os
print('Process (%s) start...' % os.getpid())
pid = os.fork()
if pid == 0:
print('Child process (%s).' % os.getpid())
else:
print('Parent process (%s).' % pid)
fork()
會返回兩個結(jié)果讳嘱,父進程返回一個大于0的無符號數(shù),子進程返回0酿愧。
我們都知道socket()是有好幾個步驟的沥潭,而對于web服務(wù)器,每天每時每分都有著成千上萬的訪問請求嬉挡。如果是一個進程向外提供服務(wù)钝鸽,那就是這個進程為第一個用戶從創(chuàng)建socket到關(guān)閉汇恤,再為下一個用戶提供服務(wù)。用戶時排著隊接受服務(wù)的拔恰,顯然不符合邏輯因谎。
拿Apache舉個栗子,它是多進程架構(gòu)服務(wù)器的代表颜懊。
- 運行主程序蓝角,只負責(zé)server端socket的
listen()
和accept()
,當(dāng)然主進程是一個守護進程 - 每當(dāng)一個用戶請求服務(wù)饭冬,就會調(diào)用
fork()
使鹅,在子程序中接受數(shù)據(jù),read()
或者write()
昌抠,然后提供服務(wù)直至關(guān)閉 - 主進程還是要負責(zé)回收結(jié)束的子進程資源的
偽代碼如下:
import os
server_fd = socket()
bind(server_fd,ip,port)
listen(server_fd,MAX_PROCESS)
While Online:
connfd = accpet(server_fd)
for each connfd:
os.fork()
// TODO
close(server_fd)
上面這段程序只適用linux平臺患朱,windows平臺創(chuàng)建進程的方式并不是 fork()
調(diào)用。python中提供了multiprocesssing
模塊來兼容windows炊苫,比起fork()
裁厅,代碼的語義更好理解一些
from multiprocessing import Process
import os
def run_proc(name):
print('Child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
#創(chuàng)建Process實例
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
這里的join
語義和linux平臺的多線程中的join語義很像,但效果其實是linux平臺的wait
有時候需要進程池侨艾,multiprocessing
也直接提供了pool
用于創(chuàng)建执虹。
pool.apply(func,params) 是單進程阻塞模式
pool.apply_async(func,params,callback) 是多進程異步模式
pool.map(func,iter) 用于可迭代結(jié)構(gòu),阻塞式調(diào)用
pool.map_async(func,iter,callback)
一般情況下唠梨,還是把進程數(shù)控制成和CPU核數(shù)相同袋励。pool結(jié)束調(diào)用pool.join()
回收進程資源時,需要先pool.close()
上面提到過当叭,創(chuàng)建一個新進程的原因往往是為了加載新的代碼茬故,去執(zhí)行新的任務(wù)。所以python封裝了fork()
和之后的exec族蚁鳖,提供subprocess
模塊磺芭,直接操作新的子進程。這個包醉箕,一般是用來執(zhí)行外部的命令或者程序如shell命令钾腺,和os.system()
類似。
import subprocess
r = subprocess.call(['ls','-l']) #阻塞
r = subprocess.call('ls -l',shell = True)
r = subprocess.check_call(['ls','-l']) #returncode不為0則raise CalledProcessError異常
r = subprocess.check_output('ls -l',shell=True)
r = subprocess.Popen(['ls','-l']) #非阻塞讥裤,需主動wait
r = subprocess.Popen(['ls','-l'],stdin=child1.stdout,stdout=subprocess.PIPE, stderr=subprocess.PIPE) #設(shè)置標(biāo)準(zhǔn)輸入輸出出錯的句柄
out,err = r.communicate() #繼續(xù)輸入放棒,或者用來獲得返回的元組(stdoutdata,stderrdata)
手動繼續(xù)輸入的例子:
import subprocess
print('$ python')
p = subprocess.Popen(['python'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b"print('Hello,world')")
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
進程間通信
用multiprocessing
的Queue
或者Pipe
來幫助實現(xiàn),類似linux中的Pipe
坞琴,打開一條管道哨查,一個進程往里面扔數(shù)據(jù)逗抑,一個從另一頭撿數(shù)據(jù)剧辐。python中的Pipe是全雙工管道寒亥,既可以讀也可以寫∮兀可以通過Pipe(duplex=False)
創(chuàng)建半雙工管道溉奕。
from multiprocessing import Pipe,Queue
#實例
q = Queue()
p = Pipe()
#寫入數(shù)據(jù)
q.put(value)
p[0].send(value)
#讀數(shù)據(jù)
q.get()
p[1].recv()
分別舉個例子,用Queue
from multiprocessing import Process, Queue
import os, time, random
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A','B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
time.sleep(random.random())
print('Get %s from queue.' % value)
if __name__=='__main__':
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pr.start()
pw.join()
pr.terminate()
用Pipe:
from multiprocessing import Process, Pipe
import os, time, random
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A','B', 'C']:
print('Put %s to pipe...' % value)
q.send(value)
time.sleep(random.random())
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.recv()
time.sleep(random.random())
print('Get %s from pipe.' % value)
if __name__=='__main__':
p = Pipe()
pw = Process(target=write, args=(p[0],))
pr = Process(target=read, args=(p[1],))
pw.start()
pr.start()
pw.join()
time.sleep(2)
pr.terminate()
多線程
有人會有疑問忍啤,問什么要在進程中開多個線程加勤,多創(chuàng)建幾個進程一起干活不就行了。其實這樣是可以的同波,只不過進程這個單位有點大鳄梅,比較占用資源,創(chuàng)建的時候開銷比較大(尤其在windows系統(tǒng)下)未檩,進程多了CPU調(diào)度起來戴尸,在進程間切換也是非常耗時的。還有多任務(wù)協(xié)同合作時冤狡,需要數(shù)據(jù)交換孙蒙,進程間通信也是開銷,而一個進程中的線程是共享進程的內(nèi)存空間的悲雳,可以直接交互挎峦。所以現(xiàn)在多線程的程序更加常見。
不過多線程也是有弊端的合瓢,協(xié)同合作的多線程坦胶,有一個掛了,會影響到所有的其他線程晴楔,也就代表這個任務(wù)是做不下去了迁央。進程因為有著獨立的地址空間,所以一個進程死了對其他進程的影響可以說很小滥崩。
python中提供了threading模塊為多線程服務(wù),threading.current_thread()
返回當(dāng)前線程岖圈,主線程名為MainThread
import threading
thread = threading.Thread(target=func,args=())
thread.start()
thread.join()
多線程編程,最重要的就是同步和互斥钙皮,也就是各種鎖的用法蜂科。為什么要用鎖,后臺的童鞋應(yīng)該都懂短条,現(xiàn)在的SMP操作系統(tǒng)都是搶占式內(nèi)核导匣,也就是即使你不同的核共同工作時,很幸運的沒有改亂一個共享變量茸时,當(dāng)然這就不可能了贡定。當(dāng)你的CPU時間片到時間了,或者需要內(nèi)存或者IO資源可都,你被踢出了CPU的工作隊列缓待,你必須得在走的時候給你的資源把鎖加上蚓耽,下次再來接著做。線程同步的重點的是對共享資源的判斷旋炒,和選擇合適的鎖步悠。也就是對什么資源加鎖和用什么鎖。
不過在python中很遺憾瘫镇,多線程存在著天生的缺陷鼎兽,因為有著GIL的存在,這是python解釋器的設(shè)計缺陷铣除。導(dǎo)致python程序在被解釋時谚咬,只能有一個線程。不過尚粘,對于IO密集型的程序序宦,多線程的設(shè)計還是很有幫助的。比如爬蟲
- 最常用的鎖背苦,類似 mutex
- 條件變量互捌,
threading.Condition()
會包含一個Lock
對象,因為這兩者一般都是配合使用的行剂。 - 信號量秕噪,
threading.Semaphore()
import threading
lock = threading.Lock()
lock.acquire()
lock.realease() #配合try...finally保證最后釋放掉鎖,防止死鎖
cond = threading.Condition()
cond.wait()
cond.notify() cond.notify_all()
sem = threading.Semaphore(NUM)
sem.acquire()
sem.realease()
event = threading.Event() #相當(dāng)于沒有l(wèi)ock的cond
event.set(True)
event.clear()
假設(shè)以下的情況
thread_func(params):
web_res = params
def func1(web_res):
http = web_res.http
TODO
def func2(web_res):
data = web_res.data
TODO
def func3(web_res):
user = web_res.user
TODO
在一個線程中厚宰,又存在多個子線程或者函數(shù)時腌巾,需要把一個參數(shù)都傳給它們時〔酰可以通過唯一的id來區(qū)分出從全局變量自己的局部變量時澈蝙。可以用ThreadLocal
實現(xiàn)
import threading
student = threading.local()
def func(name):
person = student.name #需要之前關(guān)聯(lián)過
p1 = threading.Thread(target=func,argc='A')
p1 = threading.Thread(target=func,argc='B')
通過ThredLocal免去了我們親自去字典中存取撵幽。通常用于web開發(fā)中的為每個線程綁定一個數(shù)據(jù)庫連接灯荧,HTTP請求,用戶身份信息等盐杂。
分布式進程
分布式是為了在橫向上提升整個系統(tǒng)的負載能力逗载。python中multiprocessing模塊中的manage子模塊支持把多進程分布到不同的機器上。當(dāng)然肯定存在一個master進程來負責(zé)任務(wù)的調(diào)度链烈。依賴manage子模塊厉斟,可以很輕松的寫出分布式程序。
比如爬蟲强衡,想要爬下豆瓣或者知乎這樣網(wǎng)站的全部數(shù)據(jù)擦秽,用單機估計得花費好幾年。可以把需要爬的網(wǎng)站的所有URL放在一個Queue中感挥,master進程負責(zé)Queue的管理缩搅,可以將很多設(shè)備與master進程所在的設(shè)備建立聯(lián)系,爬蟲開始獲取URL時链快,都從主機器獲取。這樣就能保證協(xié)同不沖突的合作眉尸。