這是一篇學(xué)習(xí)Python 線程相關(guān)的內(nèi)容竭翠,記錄一下以備復(fù)習(xí)和開發(fā)使用,技術(shù)有限薇搁,如有問題歡迎指出斋扰,多謝。
一.GIL 全局解釋器鎖(cpython)
1.為什么會(huì)有這個(gè)鎖:為了線程安全啃洋,減少python使用者的上手難度
GIL 使得同一個(gè)時(shí)刻只有一個(gè)線程在一個(gè)cpu上執(zhí)行字節(jié)碼传货,無(wú)法隱射到多個(gè)cpu,多核上執(zhí)行宏娄。
2.特殊情況下會(huì)釋放GIL:達(dá)到特定字節(jié)碼行數(shù)问裕、到底特定數(shù)目時(shí)間片、IO操作(主動(dòng))
二:并發(fā)和并行的區(qū)別
- 并發(fā):描述程序的組織結(jié)構(gòu)绝编,指程序要被設(shè)計(jì)成多個(gè)可獨(dú)立執(zhí)行的子任務(wù)
- 并行:描述程序的執(zhí)行狀態(tài)僻澎,指多任務(wù)需要同時(shí)執(zhí)行
三:守護(hù)線程&線程阻塞
- 守護(hù)線程:
thread.setDaemon(true)
,當(dāng)主程序退出的時(shí)候讓子程序也一并退出 - 子線程阻塞:
thread.join()
十饥,當(dāng)子程序都結(jié)束后主程序再退出
四:多線程的寫法
- 實(shí)例化Threading窟勃,調(diào)用Threading的方法去進(jìn)行多線程編程
- 寫子類繼承Theading,重寫相應(yīng)的方法
說(shuō)明:當(dāng)程序簡(jiǎn)單時(shí)可使用實(shí)例化方法逗堵,當(dāng)程序較復(fù)雜的時(shí)候秉氧,實(shí)現(xiàn)邏輯較多,第二種方法蜒秤。
五:線程間通信
- 1.共享變量:
方法簡(jiǎn)單汁咏,也可以寫入到單獨(dú)的py文件中。問題:線程不安全作媚,易出問題攘滩。 - 2.queue 隊(duì)列:
使用queue 的 Queue,這個(gè)是線程安全的纸泡,多線程取數(shù)據(jù)不會(huì)出錯(cuò)漂问。
內(nèi)部使用的是deque Python 的雙端隊(duì)列,在字節(jié)碼的層面上就已經(jīng)到達(dá)了線程安全女揭。
q = Queue()
# 方法:
q.put() # 放入數(shù)據(jù)
q.get() # 取出數(shù)據(jù)
q.put_nowait() # 放入數(shù)據(jù)蚤假,不用等待它完成再返回,異步的方法
q.get_nowait() # 取出數(shù)據(jù)吧兔,不用等待它完成再返回磷仰,異步的方法
get() put()
,可以設(shè)置是否阻塞的境蔼,默認(rèn)是阻塞的
q.join()
方法:
只有q.task_done()調(diào)用了join()才會(huì)讓主線程退出灶平,成對(duì)使用伺通。
六:線程同步
- Lock 鎖
lock= Theading.Lock()
# 獲取鎖:
lock.acquire()
lock.release()
# 另一種方法:
with lock:
# do something
加鎖的代碼段同時(shí)只有這一個(gè)代碼段在執(zhí)行,方式數(shù)據(jù)出問題民逼。
缺點(diǎn):1.用鎖會(huì)影響性能 2. 可能引起死鎖
死鎖情況:1.有acquire 沒有release 2. 相互等待
- RLock 可重入鎖
當(dāng)在一個(gè)線程中多個(gè)地方需要加鎖的時(shí)候用Lock 是不行的泵殴,需要用到RLock ,但是要注意的是獲取和釋放鎖的數(shù)量要一致拼苍,成對(duì)出現(xiàn)笑诅。
- Condition 條件變量
用于復(fù)雜的線程間同步,是一個(gè)同步鎖疮鲫。例如:先后順序的多線程通信吆你。
重點(diǎn)函數(shù):wait() notify()
con = theading.Condition()
with con:
# do something
cond.notify() #通知其他線程
cond.wait() # 等待其他線程通知
# do something
注意:
1.先con.acquire()
或者with con
,獲取condition鎖俊犯,不然wait() notify()
不能用
2.Condition 有兩把鎖:一把底層鎖會(huì)在線程調(diào)用了wait() 的時(shí)候釋放妇多,上面的鎖會(huì)在每次調(diào)用wait的時(shí)候分配一把并放入condition的等待隊(duì)列中,等到notify()的喚醒
- Semaphore 信號(hào)量
用于控制某段代碼進(jìn)入線程的數(shù)量燕侠,比如控制爬蟲的并發(fā)量者祖。
import threading
import time
class HtmlSppier(threading.Thread):
def __init__(self, url, sem):
super().__init__()
self.sem = sem
self.url = url
def run(self):
time.sleep(2)
print('download html success')
self.sem.release()
class UrlProducer(threading.Thread):
def __init__(self,sem):
super().__init__()
self.sem = sem
def run(self):
for i in range(20):
self.sem.acquire()
html_thread = HtmlSppier(f'http://www.qq.com/pn={i}',self.sem)
html_thread.start()
if __name__ == '__main__':
sem = threading.Semaphore(3)
url_produce = UrlProducer(sem)
url_produce.start()
七:線程池
為什么要使用線程池?
主線程中可以獲取某一個(gè)線程的狀態(tài)或者某一個(gè)的任務(wù)的狀態(tài)以及返回值
當(dāng)一個(gè)線程完成的時(shí)候主線程能立即知道
import requests
def download_html(i):
url = f'https://www.baidu.com/s?ie=UTF-8&wd={i}'
response = requests.get(url).text
print(response)
ids = list(range(100))
# 線程池方式一:
import threadpool
def thread_main(item):
pool = threadpool.ThreadPool(30)
tasks = threadpool.makeRequests(download_html, ids)
[pool.putRequest(req) for req in tasks]
pool.wait()
# 線程池方式二:
from multiprocessing.dummy import Pool as thpool
def thread_pool(item):
pool = thpool(20)
pool.map(download_html, ids)
pool.close()
pool.join()
# 線程池方式三(推薦):
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=8) as exe:
exe.map(download_html,ids)
推薦使用 concurrent.futures 模塊七问,線程池和進(jìn)程池的接口很相似械巡,方便使用讥耗。
ThreadPoolExecutor 其他方法使用:
# 其他接口使用:
from concurrent.futures import ThreadPoolExecutor, as_completed,wait
executor = ThreadPoolExecutor(max_workers=8)
# 通過 submit 提交執(zhí)行的函數(shù)到線程中
task1 = executor.submit(download_html, (1))
task2 = executor.submit(download_html, (3))
# done() 判斷 task 是否完成
print(task1.done())
time.sleep(4)
print(task1.done())
# result() 獲取 task 的執(zhí)行結(jié)果 阻塞
print(task1.result())
# cancel() 取消任務(wù)喊崖,如果任務(wù)在執(zhí)行中或者執(zhí)行完了是不能取消的
# 現(xiàn)在線程池是8 兩個(gè)任務(wù)都會(huì)被提交任務(wù)去執(zhí)行贷祈,如果 max_workers = 1,執(zhí)行task2.cancel()就會(huì)成功取消
print(task2.cancel())
# as_completed() 獲取已經(jīng)成功的task的返回?cái)?shù)據(jù)粟耻,阻塞
# as_completed實(shí)際上是一個(gè)生成器挤忙,里面有 yield 會(huì)把已經(jīng)完成的 future (task) 返回結(jié)果
ids = list(range(10))
all_task = [executor.submit(download_html,(i)) for i in ids]
time.sleep(8)
# 這是異步的戈泼,誰(shuí)完成就處理誰(shuí)
for future in as_completed(all_task):
data = future.result()
print(f'html response {data}')
# 通過 executor 獲取已經(jīng)完成的task
for data in executor.map(download_html,ids):
print(f'html response {data}')
# wait() 等待task完成
ids = list(range(10))
all_task = [executor.submit(download_html,(i)) for i in ids]
# wait 的 return_when 可選項(xiàng)
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
_AS_COMPLETED = '_AS_COMPLETED'
wait(all_task, return_when=ALL_COMPLETED)
八:總結(jié)
Python 多線程首選concurrent.futures 中的 ThreadPoolExecutor淀零,使用簡(jiǎn)單方便驾中,而且切換多進(jìn)程也是很快速的唠亚,后面繼續(xù)記錄多進(jìn)程方面的知識(shí)點(diǎn)。
代碼位置:github.com/rieuse/learnPython