多任務(wù)原理
現(xiàn)代操作系統(tǒng)(Windows、Mac OS X、Linux丽声、UNIX等)都支持“多任務(wù)”
什么叫多任務(wù)?觉义?雁社?
操作系統(tǒng)同時可以運行多個任務(wù)
單核CPU實現(xiàn)多任務(wù)原理:操作系統(tǒng)輪流讓各個任務(wù)交替執(zhí)行,QQ執(zhí)行2us晒骇,切換到微信霉撵,在執(zhí)行2us,再切換到陌陌洪囤,執(zhí)行2us……喊巍。表面是看,每個任務(wù)反復執(zhí)行下去箍鼓,但是CPU調(diào)度執(zhí)行速度太快了崭参,導致我們感覺就行所有任務(wù)都在同時執(zhí)行一樣
多核CPU實現(xiàn)多任務(wù)原理:真正的秉性執(zhí)行多任務(wù)只能在多核CPU上實現(xiàn),但是由于任務(wù)數(shù)量遠遠多于CPU的核心數(shù)量款咖,所以何暮,操作系統(tǒng)也會自動把很多任務(wù)輪流調(diào)度到每個核心上執(zhí)行
并發(fā):看上去一起執(zhí)行,任務(wù)數(shù)多于CPU核心數(shù)
并行:真正一起執(zhí)行铐殃,任務(wù)數(shù)小于等于CPU核心數(shù)
實現(xiàn)多任務(wù)的方式:
1海洼、多進程模式
2、多線程模式
3富腊、協(xié)程模式
4坏逢、多進程+多線程模式
進程概念
對于操作系統(tǒng)而言,一個任務(wù)就是一個進程
進程是系統(tǒng)中程序執(zhí)行和資源分配的基本單位赘被。每個進程都有自己的數(shù)據(jù)段是整、代碼段、和堆棧段
多進程: 多進程不能共享資源,多個進程相互獨立,多個進程受控于操作系統(tǒng).
多線程: 多線程可以共享資源,線程受控于進程.
多個線程在同一個應(yīng)用程序中.
什么是進程民假?
進程就是一個程序在一個數(shù)據(jù)集上的一次動態(tài)執(zhí)行過程浮入。進程一般由程序、數(shù)據(jù)集羊异、進程控制塊三部分組成事秀。我們編寫的程序用來描述進程要完成哪些功能以及如何完成彤断;數(shù)據(jù)集則是程序在執(zhí)行過程中所需要使用的資源;進程控制塊用來記錄進程的外部特征易迹,描述進程的執(zhí)行變化過程宰衙,系統(tǒng)可以利用它來控制和管理進程,它是系統(tǒng)感知進程存在的唯一標志睹欲。
進程的生命周期
單任務(wù)現(xiàn)象
from time import sleep
def run():
while True:
print("sunck is a nice man")
sleep(1.2)
if __name__ == "__main__":
while True:
print("he is a good man")
sleep(1)
# 不會執(zhí)行到run方法菩浙,只有上面的while循環(huán)結(jié)束才可以執(zhí)行
run()
父進程和子進程
Linux 操作系統(tǒng)提供了一個 fork() 函數(shù)用來創(chuàng)建子進程,這個函數(shù)很特殊句伶,調(diào)用一次,返回兩次陆淀,因為操作系統(tǒng)是將當前的進程(父進程)復制了一份(子進程)考余,然后分別在父進程和子進程內(nèi)返回。子進程永遠返回0轧苫,而父進程返回子進程的 PID楚堤。我們可以通過判斷返回值是不是 0 來判斷當前是在父進程還是子進程中執(zhí)行。
? 在 Python 中同樣提供了 fork() 函數(shù)含懊,此函數(shù)位于 os 模塊下身冬。
import os
import time
print("在創(chuàng)建子進程前: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
pid = os.fork()
if pid == 0:
print("子進程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
time.sleep(5)
else:
print("父進程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
# pid表示回收的子進程的pid
#pid, result = os.wait() # 回收子進程資源 阻塞
time.sleep(5)
#print("父進程:回收的子進程pid=%d" % pid)
#print("父進程:子進程退出時 result=%d" % result)
# 下面的內(nèi)容會被打印兩次,一次是在父進程中岔乔,一次是在子進程中酥筝。
# 父進程中拿到的返回值是創(chuàng)建的子進程的pid,大于0
print("fork創(chuàng)建完后: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
父子進程如何區(qū)分?
子進程是父進程通過fork()產(chǎn)生出來的雏门,pid = os.fork()
通過返回值pid是否為0嘿歌,判斷是否為子進程,如果是0茁影,則表示是子進程
由于 fork() 是 Linux 上的概念宙帝,所以如果要跨平臺,最好還是使用 subprocess 模塊來創(chuàng)建子進程募闲。
子進程如何回收步脓?
python中采用os.wait()方法用來回收子進程占用的資源
pid, result = os.wait() # 回收子進程資源 阻塞,等待子進程執(zhí)行完成回收
在UNIX 系統(tǒng)中浩螺,一個進程結(jié)束了靴患,但是他的父進程沒有等待(調(diào)用wait / waitpid)他, 那么他將變成一個僵尸進程要出。 但是如果該進程的父進程已經(jīng)先結(jié)束了蚁廓,那么該進程就不會變成僵尸進程, 因為每個進程結(jié)束的時候厨幻,系統(tǒng)都會掃描當前系統(tǒng)中所運行的所有進程相嵌, 看有沒有哪個進程是剛剛結(jié)束的這個進程的子進程腿时,如果是的話,就由Init 來接管他饭宾,成為他的父進程……
fork()
缺點:
? 1.兼容性差批糟,只能在類linux系統(tǒng)下使用,windows系統(tǒng)不可使用看铆;
? 2.擴展性差徽鼎,當需要多條進程的時候,進程管理變得很復雜弹惦;
? 3.會產(chǎn)生“孤兒”進程和“僵尸”進程否淤,需要手動回收資源。
優(yōu)點:
? 是系統(tǒng)自帶的接近低層的創(chuàng)建方式棠隐,運行效率高石抡。
進程實現(xiàn)多任務(wù)
multiprocessing模塊提供Process類實現(xiàn)新建進程
特點:
? 1.注意:Process對象可以創(chuàng)建進程,但Process對象不是進程助泽,其刪除與否與系統(tǒng)資源是否被回收沒有直接的關(guān)系啰扛。
2.主進程執(zhí)行完畢后會默認等待子進程結(jié)束后回收資源,不需要手動回收資源嗡贺;join()函數(shù)用來控制子進程
? 結(jié)束的順序,其內(nèi)部也有一個清除僵尸進程的函數(shù)隐解,可以回收資源;
3.Process進程創(chuàng)建時诫睬,子進程會將主進程的Process對象完全復制一份煞茫,這樣在主進程和子進程各有一個 Process對象,但是p.start()啟動的是子進程摄凡,主進程中的Process對象作為一個靜態(tài)對象存在溜嗜,不執(zhí)行。
4.當子進程執(zhí)行完畢后架谎,會產(chǎn)生一個僵尸進程炸宵,其會被join函數(shù)回收,或者再有一條進程開啟谷扣,start函數(shù)也會回收僵尸進程土全,所以不一定需要寫join函數(shù)。
5.windows系統(tǒng)在子進程結(jié)束后會立即自動清除子進程的Process對象会涎,而linux系統(tǒng)子進程的Process對象如果沒有join函數(shù)和start函數(shù)的話會在主進程結(jié)束后統(tǒng)一清除裹匙。
另外還可以通過繼承Process對象來重寫run方法創(chuàng)建進程
'''
multiprocessing 庫
跨平臺版本的多進程模塊,提供了一個Process類來代表一個進程對象
'''
from multiprocessing import Process
from time import sleep
import os
#子進程需要執(zhí)行的買嗎
def run(str):
while True:
# os.getpid()獲取當前進程id號
# os.getppid()獲取當前進程的父進程id號
print("he is a %s man--%s--%s"%(str, os.getpid(),os.getppid()))
sleep(1.2)
if __name__ == "__main__":
print("主(父)進程啟動-%s"%(os.getpid()))
#創(chuàng)建子進程
#target說明進程執(zhí)行的任務(wù)
p = Process(target=run, args=("nice",))
#啟動進程
p.start()
while True:
print("he is a good man")
sleep(1)
父進程的先后順序
from multiprocessing import Process
from time import sleep
import os
def run(str):
print("子進程啟動")
sleep(3)
print("子進程結(jié)束")
if __name__ == "__main__":
print("父進程啟動")
p = Process(target=run, args=("nice",))
p.start()
# 在不加join的時候末秃,父進程結(jié)束后子進程稱為孤兒進程概页,繼續(xù)運行,父進程的結(jié)束不能影響子進程
#利用join练慕,進程阻塞惰匙,讓父進程等待子進程結(jié)束再執(zhí)行父進程
p.join()
print("父進程結(jié)束")
全局變量在多個進程中不能共享
父進程和子進程使用不同的堆棧段
from multiprocessing import Process
from time import sleep
num = 100
def run():
print("子進程開始")
global num # num = 100
num += 1
print(num) # 101
print("子進程結(jié)束")
if __name__ == "__main__":
print("父進程開始")
p = Process(target=run)
p.start()
p.join()
# 在子進程中修改全局變量對父進程中的全局變量沒有影響
# 在創(chuàng)建子進程時對全局變量做了一個備份技掏,父進程中的與子進程中的num是完全不同的兩個變量
print("父進程結(jié)束--%d"%num) # 100
進程池
from multiprocessing import Pool
import os, time, random
def run(name):
print("子進程%d啟動--%s" % (name, os.getpid()))
start = time.time()
time.sleep(random.choice([1,2,3]))
end = time.time()
print("子進程%d結(jié)束--%s--耗時%.2f" % (name, os.getpid(), end-start))
if __name__ == "__main__":
print("父進程啟動")
#創(chuàng)建多個進程
#進程池 Pool
#表示可以同時執(zhí)行的進程數(shù)量
#Pool默認大小是CPU核心數(shù)
pp = Pool(4)
for i in range(6):
# 先同時啟動4個子進程,因為是4核项鬼,有進程結(jié)束后才能執(zhí)行第5,6個
#創(chuàng)建進程哑梳,放入進程池統(tǒng)一管理
pp.apply_async(run,args=(i,))
#在調(diào)用join之前必須先調(diào)用close,調(diào)用close之后就不能再繼續(xù)添加新的進程了
pp.close()
#進程池對象調(diào)用join,會等待進程池中所有的子進程結(jié)束完畢再去執(zhí)行父進程
pp.join()
print("父進程結(jié)束")
述代碼中的pool.apply_async()
是apply()
函數(shù)的變體绘盟,apply_async()
是apply()
的并行版本鸠真,apply()
是apply_async()
的阻塞版本,使用apply()
主進程會被阻塞直到函數(shù)執(zhí)行結(jié)束龄毡,所以說是阻塞版本吠卷。apply()
既是Pool
的方法岖瑰,也是Python內(nèi)置的函數(shù)吠勘,兩者等價腮介】樟恚可以看到輸出結(jié)果并不是按照代碼for循環(huán)中的順序輸出的。
多個子進程并返回值
apply_async()
本身就可以返回被進程調(diào)用的函數(shù)的返回值赤套。上一個創(chuàng)建多個子進程的代碼中,如果在函數(shù)func
中返回一個值,那么pool.apply_async(func, (msg, ))
的結(jié)果就是返回pool中所有進程的值的對象(注意是對象寻拂,不是值本身)。
import multiprocessing
import time
def func(msg):
return multiprocessing.current_process().name + '-' + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4) # 創(chuàng)建4個進程
results = []
for i in range(20):
msg = "process %d" %(i)
results.append(pool.apply_async(func, (msg, )))
pool.close() # 關(guān)閉進程池丈牢,表示不能再往進程池中添加進程祭钉,需要在join之前調(diào)用
pool.join() # 等待進程池中的所有進程執(zhí)行完畢
print ("Sub-process(es) done.")
for res in results:
print (res.get())
與之前的輸出不同,這次的輸出是有序的己沛。
? 如果電腦是八核慌核,建立8個進程,在Ubuntu下輸入top命令再按下大鍵盤的1申尼,可以看到每個CPU的使用率是比較平均的
拷貝文件
import os, time
from multiprocessing import Pool
#實現(xiàn)文件的拷貝
def copyFile(rPath, wPath):
fr = open(rPath, "rb")
fw = open(wPath, "wb")
context = fr.read()
fw.write(context)
fr.close()
fw.close()
path = r"C:\Users\xlg\Desktop\Python-1704\day20\2垮卓、進程\file"
toPath = r"C:\Users\xlg\Desktop\Python-1704\day20\2、進程\toFile"
#讀取path下的都有的文件
filesList = os.listdir(path)
#啟動for循環(huán)處理每一個文件
start = time.time()
for fileName in filesList:
copyFile(os.path.join(path,fileName), os.path.join(toPath,fileName))
end = time.time()
print("總耗時:%0.2f" % (end-start))
多進程實現(xiàn)文件的拷貝
import os, time
from multiprocessing import Pool
#實現(xiàn)文件的拷貝
def copyFile(rPath, wPath):
fr = open(rPath, "rb")
fw = open(wPath, "wb")
context = fr.read()
fw.write(context)
fr.close()
fw.close()
path = r"C:\Users\xlg\Desktop\Python-1704\day20\2师幕、進程\file"
toPath = r"C:\Users\xlg\Desktop\Python-1704\day20\2粟按、進程\toFile"
if __name__ == "__main__":
# 讀取path下的都有的文件
filesList = os.listdir(path)
start = time.time()
pp = Pool(4)
for fileName in filesList:
pp.apply_async(copyFile, args=(os.path.join(path,fileName), os.path.join(toPath,fileName)))
pp.close()
pp.join()
end = time.time()
print("總耗時:%0.2f" % (end-start))
封裝進程對象
# myProcess.py
from multiprocessing import Process
import os, time
class MyProcess(Process):
def __init__(self,name):
Process.__init__(self)
self.name = name
def run(self):
print("子進程(%s-%s)啟動" % (self.name, os.getpid()))
#子進程的功能
time.sleep(3)
print("子進程(%s-%s)結(jié)束" % (self.name, os.getpid()))
from myProcess import MyProcess
if __name__ == "__main__":
print("父進程啟動")
#創(chuàng)建子進程
p = myProcess("test")
# 自動調(diào)用p進程對象的run方法
p.start()
p.join()
print("父進程結(jié)束")
進程間通信
- 管道pipe(全雙工,半雙工):管道是一種半雙工的通信方式霹粥,數(shù)據(jù)只能單向流動灭将,而且只能在具有親緣關(guān)系的進程間使用。進程的親緣關(guān)系通常是指父子進程關(guān)系后控。
- 命名管道FIFO:有名管道也是半雙工的通信方式庙曙,但是它允許無親緣關(guān)系進程間的通信。
- 消息隊列MessageQueue:消息隊列是由消息的鏈表浩淘,存放在內(nèi)核中并由消息隊列標識符標識捌朴。消息隊列克服了信號傳遞信息少吴攒、管道只能承載無格式字節(jié)流以及緩沖區(qū)大小受限等缺點。
- 共享存儲SharedMemory:共享內(nèi)存就是映射一段能被其他進程所訪問的內(nèi)存男旗,這段共享內(nèi)存由一個進程創(chuàng)建舶斧,但多個進程都可以訪問。共享內(nèi)存是最快的 IPC 方式察皇,它是針對其他進程間通信方式運行效率低而專門設(shè)計的茴厉。它往往與其他通信機制,如信號兩什荣,配合使用矾缓,來實現(xiàn)進程間的同步和通信。
以上幾種進程間通信方式中稻爬,消息隊列是使用的比較頻繁的方式嗜闻。
- 單工數(shù)據(jù)傳輸只支持數(shù)據(jù)在一個方向上傳輸;
- 半雙工數(shù)據(jù)傳輸允許數(shù)據(jù)在兩個方向上傳輸桅锄,但是琉雳,在某一時刻,只允許數(shù)據(jù)在一個方向上傳輸友瘤,它實際上是一種切換方向的單工通信翠肘;
- 全雙工數(shù)據(jù)通信允許數(shù)據(jù)同時在兩個方向上傳輸,因此辫秧,全雙工通信是兩個單工通信方式的結(jié)合束倍,它要求發(fā)送設(shè)備和接收設(shè)備都有獨立的接收和發(fā)送能力。
(1)管道pipe
import multiprocessing
def foo(sk):
sk.send('hello father')
print(sk.recv())
if __name__ == '__main__':
conn1,conn2=multiprocessing.Pipe(duplex=True) #開辟兩個口盟戏,都是能進能出绪妹,括號中如果False即單向通信, 那么由主進程發(fā)送消息,子進程接受消息
p=multiprocessing.Process(target=foo,args=(conn1,)) #子進程使用sock口柿究,調(diào)用foo函數(shù)
p.start()
print(conn2.recv()) #主進程使用conn口接收邮旷,從管道pipe中讀取消息
conn2.send('hi son') #主進程使用conn口發(fā)送
Pipe對象返回的元組分別代表管道的兩端,管道默認是全雙工蝇摸,兩端都支持send
和recv
方法廊移,兩個進程分別操作管道兩端時不會有沖突,兩個進程對管道一端同時讀寫時可能會有沖突探入。
如果聲明了coon1,coon2 = Pipe(duplex=False)
的單向管道狡孔,則coon1
只負責接受消息,conn2
只負責發(fā)送消息蜂嗽。
(2)消息隊列Queue
Queue是多進程的安全隊列苗膝,可以使用Queue實現(xiàn)多進程之間的數(shù)據(jù)傳遞。
Queue的一些常用方法:
- Queue.qsize():返回當前隊列包含的消息數(shù)量植旧;
- Queue.empty():如果隊列為空辱揭,返回True离唐,反之False ;
- Queue.full():如果隊列滿了问窃,返回True,反之False亥鬓;
- Queue.get():獲取隊列中的一條消息,然后將其從列隊中移除域庇,可傳參超時時長嵌戈。
- Queue.get_nowait():相當Queue.get(False),取不到值時觸發(fā)異常:Empty;
- Queue.put():將一個值添加進數(shù)列听皿,可傳參超時時長熟呛。
- Queue.put_nowait():相當于Queue.get(False),當隊列滿了時報錯:Full。
from multiprocessing import Process, Queue
import os, time
def write(q):
print("啟動寫子進程%s" % (os.getpid()))
for chr in ["A", "B", "C", "D"]:
q.put(chr)
time.sleep(1)
print("結(jié)束寫子進程%s" % (os.getpid()))
def read(q):
print("啟動讀子進程%s" % (os.getpid()))
while True:
value = q.get(True)
print("value = " + value)
print("結(jié)束讀子進程%s" % (os.getpid()))
if __name__ == "__main__":
#父進程創(chuàng)建隊列尉姨,并傳遞給子進程
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pr.start()
pw.join()
#pr進程里是個死循環(huán)庵朝,無法等待其結(jié)束,只能強行結(jié)束
pr.terminate()
print("父進程結(jié)束")
(3)隊列通信實現(xiàn)多進程爬蟲
import time
from multiprocessing import Process, Queue
class Downloader(Process):
def __init__(self, task_queue, result_queue):
super().__init__()
self.name = '下載器'
self.task_queue: Queue = task_queue
self.result_queue: Queue = result_queue
def download(self, url):
time.sleep(2)
print('下載完成', url)
return {'time': time.time()}
def run(self): # 啟動當前進程之后執(zhí)行的核心程序
while True:
try:
# 設(shè)置10秒的超時時間又厉,如果是10秒沒有任務(wù)九府,則會拋出異常
task = self.task_queue.get(timeout=10)
if isinstance(task, str):
resp = self.download(task)
# 數(shù)據(jù)寫入到任務(wù)結(jié)果中
self.result_queue.put(resp)
else:
self.result_queue.put('bye')
break
except:
break
class ItemPipeline(Process):
def __init__(self, result_queue):
super().__init__()
self.name = '數(shù)據(jù)處理Item'
self.result_queue = result_queue
def run(self):
while True:
data = self.result_queue.get()
if isinstance(data, dict):
print('下載完成的數(shù)據(jù):', data)
else:
break
print('ItemPipeline 進程退出')
def start(downloads, itempipes):
for d in downloads:
d.start()
for i in itempipes:
i.start()
def wait(downloads, itempipes):
for d in downloads:
d.join()
for i in itempipes:
i.join()
if __name__ == '__main__':
taskQueue = Queue()
resultQueue = Queue()
downloads = [Downloader(taskQueue, resultQueue) for _ in range(4)]
itempipes = [ItemPipeline(resultQueue) for _ in range(2)]
start(downloads, itempipes)
# 發(fā)布任務(wù)
for i in range(100):
taskQueue.put('http://www.baidu.com/s?key=%d' % i)
time.sleep(0.1)
# 結(jié)束
taskQueue.put(0)
wait(downloads, itempipes)