多任務之進程
什么是進程
進程是CPU資源分配的基本單位,進程包含線程驱入,線程依賴于進程拧粪。
進程的狀態(tài)
- 就緒態(tài):運行的條件都已經(jīng)慢去,正在等在cpu執(zhí)行
- 執(zhí)行態(tài):cpu正在執(zhí)行其功能
- 等待態(tài):等待某些條件滿足沧侥,例如一個程序sleep了,此時就處于等待態(tài)
進程可以理解為一個團隊魄鸦,而線程就是團隊的成員宴杀。當團隊要同時完成多個項目,缺少人手時需要招人拾因,既開辟子線程旺罢,當然也可以使用另外的方式完成多任務,比如再找一個團隊绢记,既多線程完成多任務扁达。
多進程多任務
Python 中可以使用multiprocessing 開啟多進程
import time
def look():
"""看風景"""
for i in range(10):
print("這里風景真好...")
time.sleep(0.1)
def walk():
"""徒步旅行"""
for i in range(10):
print("我在徒步旅行...")
time.sleep(0.2)
def main():
"""入口函數(shù)"""
look()
walk()
if __name__ == '__main__':
main()
# 運行結果:
# 這里風景真好...
# 這里風景真好...
# 這里風景真好...
# 這里風景真好...
# 這里風景真好...
# 這里風景真好...
# 這里風景真好...
# 這里風景真好...
# 這里風景真好...
# 這里風景真好...
# 我在徒步旅行...
# 我在徒步旅行...
# 我在徒步旅行...
# 我在徒步旅行...
# 我在徒步旅行...
# 我在徒步旅行...
# 我在徒步旅行...
# 我在徒步旅行...
# 我在徒步旅行...
# 我在徒步旅行...
將上面的單進程單任務改為多進程多任務
import multiprocessing
import time
def look():
# 獲取當前進程
curr_process = multiprocessing.current_process()
print("look_process", curr_process)
# 打印當前進程的id
print("look_pid", curr_process.pid)
"""看風景"""
for i in range(10):
print("這里風景真好...")
time.sleep(0.1)
def walk():
# 獲取當前進程
curr_process = multiprocessing.current_process()
print("walk_process", curr_process)
# 打印當前進程的id
print("walk_pid", curr_process.pid)
"""徒步旅行"""
for i in range(10):
print("我在徒步旅行...")
time.sleep(0.2)
def main():
"""入口函數(shù)"""
# look()
# walk()
# 獲取當前進程
curr_process = multiprocessing.current_process()
print("current_process", curr_process)
# 打印當前進程的id
print("process_pid", curr_process.pid)
# 獲取當前進程中活動的子進程
print("multiprocessing.Process前 ", multiprocessing.active_children())
look_process = multiprocessing.Process(target=look)
walk_process = multiprocessing.Process(target=walk)
# 獲取當前進程中活動的子進程
print("multiprocessing.Process后 ", multiprocessing.active_children())
look_process.start()
walk_process.start()
# 獲取當前進程中活動的子進程
print("start后 ", multiprocessing.active_children())
if __name__ == '__main__':
main()
# 運行結果:
# current_process < _MainProcess(MainProcess, started) >
# process_pid 14734
# multiprocessing.Process前[]
# multiprocessing.Process后[]
# start后[ < Process(Process - 2, started) >, < Process(Process - 1, started) >]
# look_process < Process(Process - 1, started) >
# look_pid 14735
# 這里風景真好...
# walk_process < Process(Process - 2, started) >
# walk_pid 14736
# 我在徒步旅行...
# 這里風景真好...
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 這里風景真好...
# 我在徒步旅行...
# 我在徒步旅行...
# 我在徒步旅行...
# 我在徒步旅行...
# 我在徒步旅行...
分析結果:
- 默認有一條進程MainProcess ,叫做主進程
- multiprocessing.Process() 只是準備了一條進程蠢熄,并沒有創(chuàng)建子進程(multiprocessing.Process前[])
- start 將準備好的子線程運行起來(multiprocessing.Process前[])
- 進程之間執(zhí)行是無序的(輸出結果無序)
- 主進程默認會等待子進程結束
設置守護進程
import multiprocessing
import time
def look():
"""看風景"""
for i in range(10):
print("這里風景真好...")
time.sleep(0.1)
def walk():
"""徒步旅行"""
for i in range(10):
print("我在徒步旅行...")
time.sleep(0.2)
def main():
"""入口函數(shù)"""
# look()
# walk()
look_process = multiprocessing.Process(target=look)
walk_process = multiprocessing.Process(target=walk)
# 設置守護進程
look_process.daemon = True
walk_process.daemon = True
look_process.start()
walk_process.start()
time.sleep(1)
print("over")
if __name__ == '__main__':
main()
# 運行結果:
# 我在徒步旅行...
# 這里風景真好...
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 這里風景真好...
# over
從結果上看跪解,主進程結束,子進程被結束或者說被銷毀签孔。
小結:
- 獲取當前進程
- multiprocessing.current_process()
- 打印當前進程的id
- print("process_pid", curr_process.pid)
- 獲取活動子進程
- multiprocessing.active_children()
- 進程之間執(zhí)行是無序的
- 默認情況下叉讥,主進程會等待子進程執(zhí)行結束窘行,程序再退出,既主進程結束图仓。
- 設置守護進程罐盔,主進程結束,子進程被銷毀救崔。
- 注意點:主進程里的子進程如果有一個沒有設置守護惶看,那么主進程會等待這個子進程執(zhí)行結束,因為主進程沒有結束六孵,所以守護進程不會被銷毀纬黎。
- 注意點:設置守護進程( daemon ),要在start() 之前設置狸臣,否則會報錯(RuntimeError: cannot set daemon status of active thread)莹桅。
討論:為什么進程執(zhí)行無序?首先烛亦,線程是CPU調(diào)度的基本單位诈泼,線程執(zhí)行是無序的;其次煤禽,線程依賴于進程铐达,一個進程默認有一條線程,所以進程之間的執(zhí)行順序也是無序的檬果。
記錄:線程和進程對CPU的使用情況瓮孙,線程對CPU的使用率達不到100%,僅有50%左右选脊;進程對CPU的使用率可以達到100%杭抠。
多進程與全局變量[1]
import multiprocessing
import time
tmp_list = []
def look():
"""看風景"""
for i in range(10):
tmp_list.append("這里風景真好...")
print("這里風景真好...")
time.sleep(0.1)
print(id(tmp_list))
print(tmp_list)
def walk():
"""徒步旅行"""
for i in range(10):
tmp_list.append("我在徒步旅行...")
print("我在徒步旅行...")
time.sleep(0.2)
print(id(tmp_list))
print(tmp_list)
def main():
"""入口函數(shù)"""
# look()
# walk()
look_process = multiprocessing.Process(target=look)
walk_process = multiprocessing.Process(target=walk)
# 設置守護進程
look_process.daemon = True
walk_process.daemon = True
look_process.start()
walk_process.start()
time.sleep(3)
print("over", tmp_list)
print(id(tmp_list))
if __name__ == '__main__':
main()
# 運行結果:
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 這里風景真好...
# 139881809190792
# ['這里風景真好...', '這里風景真好...', '這里風景真好...', '這里風景真好...', '這里風景真好...', '這里風景真好...', '這里風景真好...', '這里風景真好...', '這里風景真好...', '這里風景真好...']
# 我在徒步旅行...
# 我在徒步旅行...
# 我在徒步旅行...
# 我在徒步旅行...
# 我在徒步旅行...
# 139881809190792
# ['我在徒步旅行...', '我在徒步旅行...', '我在徒步旅行...', '我在徒步旅行...', '我在徒步旅行...', '我在徒步旅行...', '我在徒步旅行...', '我在徒步旅行...', '我在徒步旅行...', '我在徒步旅行...']
# over[]
# 139881809190792
從結果可以看出,進程之間不共享全局變量
使用Queue恳啥,完成進程之間數(shù)據(jù)共享
import multiprocessing
import time
tmp_list = []
def look(qun):
"""看風景"""
for i in range(10):
tmp = qun.get()
tmp.append("這里風景真好...")
qun.put(tmp)
print("這里風景真好...")
time.sleep(0.1)
def walk(qun):
"""徒步旅行"""
for i in range(10):
tmp = qun.get()
tmp.append("我在徒步旅行...")
print("我在徒步旅行...")
qun.put(tmp)
time.sleep(0.2)
def main():
"""入口函數(shù)"""
# look()
# walk()
qun = multiprocessing.Queue(3)
qun.put(tmp_list)
look_process = multiprocessing.Process(target=look, args=(qun,))
walk_process = multiprocessing.Process(target=walk, args=(qun,))
# 設置守護進程
look_process.daemon = True
walk_process.daemon = True
look_process.start()
walk_process.start()
time.sleep(3)
print("over", qun.get())
if __name__ == '__main__':
main()
# 運行結果:
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 這里風景真好...
# 我在徒步旅行...
# 這里風景真好...
# 我在徒步旅行...
# 我在徒步旅行...
# 我在徒步旅行...
# 我在徒步旅行...
# 我在徒步旅行...
# over[
# '這里風景真好...', '我在徒步旅行...', '這里風景真好...', '這里風景真好...', '我在徒步旅行...', '這里風景真好...', '這里風景真好...', '我在徒步旅行...', '這里風景真好...', '這里風景真好...', '我在徒步旅行...', '這里風景真好...', '這里風景真好...', '我在徒步旅行...', '這里風景真好...', '我在徒步旅行...', '我在徒步旅行...', '我在徒步旅行...', '我在徒步旅行...', '我在徒步旅行...']
通過Queue 實現(xiàn)了數(shù)據(jù)共享偏灿。
進程之間的資源競爭
import multiprocessing
import time
a = 0
def look(qun):
"""看風景"""
for i in range(1000000):
qun.value += 1
def walk(qun):
"""徒步旅行"""
for i in range(1000000):
qun.value += 1
def main():
"""入口函數(shù)"""
# look()
# walk()
qun = multiprocessing.Value("d", a)
look_process = multiprocessing.Process(target=look, args=(qun,))
walk_process = multiprocessing.Process(target=walk, args=(qun,))
# 設置守護進程
look_process.daemon = True
walk_process.daemon = True
look_process.start()
walk_process.start()
time.sleep(10)
print("over", qun.value)
if __name__ == '__main__':
main()
# 運行結果:
# over 1052801.0
同步解決資源競爭問題
import multiprocessing
import time
a = 0
def look(qun):
"""看風景"""
for i in range(1000000):
qun.value += 1
def walk(qun):
"""徒步旅行"""
for i in range(1000000):
qun.value += 1
def main():
"""入口函數(shù)"""
# look()
# walk()
qun = multiprocessing.Value("d", a)
look_process = multiprocessing.Process(target=look, args=(qun,))
walk_process = multiprocessing.Process(target=walk, args=(qun,))
# 設置守護進程
look_process.daemon = True
walk_process.daemon = True
look_process.start()
# 進程同步
look_process.join()
walk_process.start()
time.sleep(10)
print("over", qun.value)
if __name__ == '__main__':
main()
# 運行結果:
# over 2000000.0
類比線程資源競爭問題,進程同樣可以使用同步與互斥鎖解決資源競爭問題钝的。
小結
- 多進程共享全局變量翁垂,但共享全局變量會產(chǎn)生資源競爭問題
- 進程同步(join())和互斥鎖都能解決資源競爭問題,但是都是將多任務變成了單任務硝桩。
- 使用鎖要避免死鎖
- 注意點:如果兩個子進程共享全局變量沿猜,只對一個子進程加鎖,可以理解為和沒加鎖一樣碗脊。
- 注意點:效率問題啼肩,同樣是累加200,0000次,直接遍歷累加200,0000次所用時間小于進程同步所用時間小于互斥鎖所用時間
進程池
import multiprocessing
import time
# 向指定隊列寫入數(shù)據(jù)
def write_data(queue):
for i in range(5):
queue.put(i)
print(i)
time.sleep(0.2)
print("數(shù)據(jù)寫入完成")
# 向指定隊列讀取數(shù)據(jù)
def read_data(queue):
while True:
# 判斷隊列是否為空
if queue.empty():
print("隊列為空")
break
else:
value = queue.get()
print(value)
if __name__ == '__main__':
# 創(chuàng)建進程池中的queue
queue = multiprocessing.Manager().Queue()
# 創(chuàng)建進程池
pool = multiprocessing.Pool(2)
# 通過進程池執(zhí)行寫入數(shù)據(jù)的任務
# pool.apply(write_data, (queue,))
# # 通過進程池執(zhí)行讀取數(shù)據(jù)的任務
# pool.apply(read_data, (queue,))
# 使用異步執(zhí)行對應的任務
# 執(zhí)行異步異步任務會返回一個應用結果對象,這個對象里面有一個wait()方法可以等待異步任務執(zhí)行完成以后代碼再繼續(xù)往下執(zhí)行
# <multiprocessing.pool.ApplyResult object at 0x7f22d1795e48>
result = pool.apply_async(write_data, (queue,))
# 等待寫入這個異步任務執(zhí)行完成以后代碼再繼續(xù)執(zhí)行
result.wait()
# time.sleep(1.1)
pool.apply_async(read_data, (queue,))
# 關閉進程池
pool.close()
# 主進程等待進程池執(zhí)行完任務以后程序再退出
pool.join()
- 進程池: 池子里面放的都是進程疟游,進程池可以根據(jù)任務自動創(chuàng)建進程呼畸,會合理利用進程池中的進程完成多任務,能夠節(jié)省資源的開銷
- 默認情況下颁虐,主進程不會等待進程池結束
- 使用組合拳蛮原,pool.close() 和 pool.join() 讓主進程等待進程池任務執(zhí)行結束
- 進程池分為同步執(zhí)行pool.apply 和異步執(zhí)行pool.apply_async
到此結 ? DragonFangQy ?2018.4.29