更多內容請到個人博客:https://www.cxy96.top/
1. CPU密集型级乍、I/O密集型?
CPU密集型
- CPU密集型也叫計算密集型玫荣,是指I/O在很短的時間就可以完成捅厂,CPU需要大量的計算和處理焙贷,特點是CPU占用率相當高
- 例如:壓縮解壓縮辙芍、加密解密、正則表達式搜索
I/O密集型
- I/O密集型指的是系統(tǒng)運作大部分的狀況是CPU在I/O(硬盤/內存)的讀/寫操作注簿,CPU占用率仍然較低跳仿。
- 文件處理程序菲语、網(wǎng)絡爬蟲程序、讀寫數(shù)據(jù)庫程序
2. 多種并發(fā)方式對比
多進程
- 優(yōu)點:可以利用多核CPU并行運算
- 缺點:占用資源最多眼俊,可啟動數(shù)目比線程少
- 適用于:CPU密集型計算
多線程
優(yōu)點:相比進程,更輕量級挚币,占用資源少
-
缺點:
相比進程:多線程只能并發(fā)執(zhí)行,不能利用多CPU(GIL)
相比協(xié)程:啟動數(shù)目有限制池凄,占用內存資源致盟,有線程切換開銷
適用于:I/O密集型計算
多協(xié)程
- 優(yōu)點:內存開銷最少馏锡、啟動協(xié)程數(shù)量最多
- 缺點:支持的庫有限制(aiohttp vs requests)杯道、代碼實現(xiàn)復雜
- 適用于:I/O密集型計算 但有現(xiàn)成庫支持的場景
3. python運行慢的原因
-
全局解釋器鎖(Global Interpreter Lock)
- 每一個線程在開始執(zhí)行時蕉饼,都會鎖住 GIL昧港,以阻止別的線程執(zhí)行创肥;同樣的巩搏,每一個線程執(zhí)行完一段后,會釋放 GIL趾代,以允許別的線程開始利用資源贯底。
- 導致了python的多線程是偽多線程,不能同時調用多個線程
-
Python是解釋型語言而不是編譯型語言
- 程序不需要編譯撒强,程序在運行時才翻譯成機器語言禽捆,每執(zhí) 行一次都要翻譯一次。因此效率比較低
-
Python是一門動態(tài)類型的語言
- 運行時可以改變其結構的語言:例如新的函數(shù)飘哨、對象胚想、甚至代碼可以被引進,已有的函數(shù)可以被刪除或是其他結構上的變化芽隆。
4. 代碼實現(xiàn)
多進程
-
創(chuàng)建進程方式
import multiprocessing # 創(chuàng)建一個進程 process1 = multiprocessing.Process(target=函數(shù)名,args=(參數(shù)1,)) #(參數(shù)1)是字符串 (參數(shù)1,)是元組 # 執(zhí)行進程 process1.start() # 等待子進程結束浊服,才繼續(xù)執(zhí)行主進程下面的代碼 process1.join()
-
多進程示例
import multiprocessing # 進程列表放置進程 processes = [] # 創(chuàng)建十個進程 for i in range(10): processes.append( multiprocessing.Process(target=函數(shù)名,args=(參數(shù)1,)) #(參數(shù)1)是字符串 (參數(shù)1,)是元組 ) # 執(zhí)行線程 for process in processes: process.start() for process in processes: process.join()
-
進程池
from concurrent.futures import ProcessPoolExecutor # 第一種map方法按順序返回 max_workers 指定最大進程數(shù) # 只能傳遞一個參數(shù) with ProcessPoolExecutor(max_workers=5) as pool: results=pool.map(函數(shù)名,[線程1參數(shù),線程2參數(shù)]) #每個參數(shù)對應一個線程 map一下子執(zhí)行 返回順序按參數(shù)順序 for result in results: print(result) # 第二種as_completed 方法 先執(zhí)行完先返回 from concurrent.futures import ProcessPoolExecutor,as_completed with ProcessPoolExecutor() as pool: futures=[pool.submit(函數(shù)名,參數(shù)) for 參數(shù) in 參數(shù)列表] # 按順序返回 for future in futures: print(future.result()) # 先執(zhí)行完先返回 for future in as_completed(futures): print(result)
多線程
-
創(chuàng)建線程方式
import threading # 創(chuàng)建一個線程 thread1 = threading.Thread(target=函數(shù)名,args=(參數(shù)1,)) #(參數(shù)1)是字符串 (參數(shù)1,)是元組 # 執(zhí)行線程 thread1.start() # 等待子線程結束,才繼續(xù)執(zhí)行下面的代碼 thread1.join()
-
多線程示例
import threading # 線程列表放置線程 threads = [] # 創(chuàng)建一千個線程 for i in range(1000): threads.append( threading.Thread(target=函數(shù)名,args=(參數(shù)1,)) #(參數(shù)1)是字符串 (參數(shù)1,)是元組 ) # 執(zhí)行線程 for thread in threads: thread.start() for thread in threads: thread.join()
-
線程池管理
優(yōu)點:
- 提升性能:因為減去了大量新建、終止線程的開銷,重用了線程資源乓搬;
- 適用場景:適合處理突發(fā)性大量請求或需要大量線程完成任務棉磨、但實際任務處理時間較短
from concurrent.futures import ThreadPoolExecutor # 第一種map方法按順序返回 max_workers 指定最大線程數(shù) # 只能傳遞一個參數(shù) with ThreadPoolExecutor(max_workers=5) as pool: results=pool.map(函數(shù)名,[線程1參數(shù),線程2參數(shù)]) #每個參數(shù)對應一個線程 map一下子執(zhí)行 返回順序按參數(shù)順序 for result in results: print(result) # 第二種as_completed 方法 先執(zhí)行完先返回 from concurrent.futures import ThreadPoolExecutor,as_completed with ThreadPoolExecutor() as pool: futures=[pool.submit(函數(shù)名,參數(shù)) for 參數(shù) in 參數(shù)列表] # 按順序返回 for future in futures: print(future.result()) # 先執(zhí)行完先返回 for future in as_completed(futures): print(result)
多協(xié)程
實現(xiàn):
import asyncio
# 獲取事件循環(huán)
loop = asyncio.get_event_loop()
# 定義協(xié)程
async def func(參數(shù)):
await func2(參數(shù))
# 創(chuàng)建task列表
tasks = [loop.create_task(func(參數(shù))) for 參數(shù) in 參數(shù)列表]
# 執(zhí)行事件列表
loop.run_until_complete(asyncio.wait(tasks))
5. 高階應用
5.1 多組件Pipeline架構
使復雜的程序抬吟,分為多個中間步驟完成
實現(xiàn):
import queue
# 1.創(chuàng)建Queue對象 入?yún)?maxsize 是一個整數(shù)钙畔,如果 maxsize 設置為小于或等于零,則隊列的長度沒有限制。
q = queue.Queue(maxsize=0)
# 2.添加元素(空間不足時會阻塞)
q.put(item)
# 3.獲取元素(沒有數(shù)據(jù)時會阻塞)
item = q.get()
# 4.狀態(tài)查詢
# 查看元素數(shù)量
q.qsize()
# 判斷是否為空
q.empty()
# 判斷是否已滿
q.full()
示例:
import queue
q = queue.Queue() # 創(chuàng)建 Queue 隊列
for i in range(3):
q.put(i) # 在隊列中依次插入0步责、1、2元素
for i in range(3):
print(q.get()) # 依次從隊列中取出插入的元素慧邮,數(shù)據(jù)元素輸出順序為0、1、2
5.2 線程安全
線程安全是指某個函數(shù)哨免、函數(shù)庫在多線程環(huán)境中被調用時慧耍,能正確處理多個線程之間的共享變量煌珊,使程序功能正常完成
不安全示例:
import time
import threading
class Account:
def __init__(self,balance):
self.balance=balance
def draw(account,amount):
if account.balance>=amount:
time.sleep(0.1)
account.balance-=amount
print("取錢成功蔬浙,當前賬戶余額:",account.balance)
else:
print("取錢失敗,當前賬戶余額不足")
if __name__=="__main__":
account=Account(1000)
t1=threading.Thread(target=draw,args=(account,600))
t2=threading.Thread(target=draw,args=(account,600))
t1.start()
t2.start()
輸出結果:
取錢成功俱病,當前賬戶余額: 400
取錢成功溢吻,當前賬戶余額: -200
解決方式:線程鎖
import threading
# 方式一:with模式
lock=threading.Lock()
with lock:
#do something
# 方式二:try-finally
lock=threading.Lock()
lock.acquire()
try:
#do something
finally:
#執(zhí)行完釋放鎖
lock.release()
加鎖后程序:
import threading
import time
lock=threading.Lock()
class Account:
def __init__(self,balance):
self.balance=balance
def draw(account,amount):
#即使切換線程但是鎖沒釋放依然不能運行
with lock:
if account.balance>=amount:
time.sleep(0.1)
account.balance-=amount
print("取錢成功,當前賬戶余額:",account.balance)
else:
print("取錢失敗,當前賬戶余額不足")
if __name__=="__main__":
account=Account(1000)
t1=threading.Thread(target=draw,args=(account,600))
t2=threading.Thread(target=draw,args=(account,600))
t1.start()
t2.start()
輸出結果:
取錢成功诈胜,當前賬戶余額: 400
取錢失敗,當前賬戶余額不足