【python】多線程:鎖 凶硅、全局鎖、Queue隊列以及線程池


關(guān)于如何使用鎖

【介紹】關(guān)于如何加鎖扫皱,獲取鑰匙足绅,釋放鎖。

  • lock = threading.Lock():生成鎖對象韩脑,全局唯一氢妈;
  • lock.acquire():獲取鎖。未獲取到會阻塞程序段多,直到獲取到鎖才會往下執(zhí)行允懂;
  • lock.release():釋放鎖,歸回后衩匣,其他人也可以調(diào)用;

【注意事項】:lock.acquire() 和 lock.release()必須成對出現(xiàn)粥航,否則就有可能造成死鎖琅捏。

為了規(guī)避這個問題,可以使用使用上下文管理器來加鎖递雀。如下所示:

import threading
lock = threading.Lock()
with lock:
    # 這里寫想要實現(xiàn)的代碼
    pass

【解釋】with 語句會在這個代碼塊執(zhí)行前自動獲取鎖柄延,在執(zhí)行結(jié)束后自動釋放鎖


為何要“上”鎖 ?

import threading
import time

g_num = 0

def test1(num):
    global g_num
    for i in range(num):
        mutex.acquire()  # 上鎖
        g_num += 1
        mutex.release()  # 解鎖

    print("---test1---g_num=%d"%g_num)

def test2(num):
    global g_num
    for i in range(num):
        mutex.acquire()  # 上鎖
        g_num += 1
        mutex.release()  # 解鎖

    print("---test2---g_num=%d"%g_num)

# 創(chuàng)建一個互斥鎖
# 默認是未上鎖的狀態(tài)
mutex = threading.Lock()

# 創(chuàng)建2個線程缀程,讓他們各自對g_num加1000000次
p1 = threading.Thread(target=test1, args=(1000000,))
p1.start()

p2 = threading.Thread(target=test2, args=(1000000,))
p2.start()

# 等待計算完成
while len(threading.enumerate()) != 1:
    time.sleep(1)

print("2個線程對同一個全局變量操作之后的最終結(jié)果是:%s" % g_num)

輸出:

---test1---g_num=1909909
---test2---g_num=2000000
2個線程對同一個全局變量操作之后的最終結(jié)果是:2000000

【總結(jié)】入互斥鎖后搜吧,其結(jié)果與預(yù)期相符。


關(guān)于死鎖

【解釋】在線程間共享多個資源的時候杨凑,如果兩個線程分別占有一部分資源并且同時等待對方的資源滤奈,就會造成死鎖。

來看個實例:

import threading
import time

class MyThread1(threading.Thread):
    def run(self):
        # 對mutexA上鎖
        mutexA.acquire()

        # mutexA上鎖后撩满,延時1秒蜒程,等待另外那個線程 把mutexB上鎖
        print(self.name+'----do1---up----')
        time.sleep(1)

        # 此時會堵塞,因為這個mutexB已經(jīng)被另外的線程搶先上鎖了
        mutexB.acquire()
        print(self.name+'----do1---down----')
        mutexB.release()

        # 對mutexA解鎖
        mutexA.release()

class MyThread2(threading.Thread):
    def run(self):
        # 對mutexB上鎖
        mutexB.acquire()

        # mutexB上鎖后伺帘,延時1秒昭躺,等待另外那個線程 把mutexA上鎖
        print(self.name+'----do2---up----')
        time.sleep(1)

        # 此時會堵塞,因為這個mutexA已經(jīng)被另外的線程搶先上鎖了
        mutexA.acquire()
        print(self.name+'----do2---down----')
        mutexA.release()

        # 對mutexB解鎖
        mutexB.release()

mutexA = threading.Lock()
mutexB = threading.Lock()

if __name__ == '__main__':
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()

【重點】標準的鎖對象(threading.Lock)并不關(guān)心當前是哪個線程占有了該鎖伪嫁;如果該鎖已經(jīng)被占有了领炫,那么任何其它嘗試獲取該鎖的線程都會被阻塞,包括已經(jīng)占有該鎖的線程也會被阻塞张咳。

【 獲取鎖和釋放鎖的語句也可以用Python的with來實現(xiàn)】

【知識提升】如有某個線程在兩個函數(shù)調(diào)用之間修改了共享資源帝洪,那么我們最終會得到不一致的數(shù)據(jù)似舵。【最直接的解決辦法】是在這個函數(shù)中也使用lock碟狞。然而啄枕,這是不可行的。里面的兩個訪問函數(shù)將會阻塞族沃,因為外層語句已經(jīng)占有了該鎖频祝。


飽受爭議的GIL(全局鎖)

什么是GIL呢?

【解釋】任何Python線程執(zhí)行前脆淹,必須先獲得GIL鎖常空,然后,每執(zhí)行100條字節(jié)碼盖溺,解釋器就自動釋放GIL鎖漓糙,讓別的線程有機會執(zhí)行。這個GIL全局鎖實際上把所有線程的執(zhí)行代碼都給上了鎖烘嘱,所以昆禽,多線程在Python中只能交替執(zhí)行,即使100個線程跑在100核CPU上蝇庭,也只能用到1個核醉鳖。

GIL執(zhí)行過程
  • 1). 設(shè)置一個GIL;
  • 2). 切換線程去準備執(zhí)行任務(wù)(Runnale就緒狀態(tài))哮内;
  • 3). 運行盗棵;
  • 4). 可能出現(xiàn)的狀態(tài):
    • 線程任務(wù)執(zhí)行結(jié)束;
    • time.sleep()
    • 需要獲取其他的信息才能繼續(xù)執(zhí)行(eg: 讀取文件, 需要從網(wǎng)絡(luò)下載html網(wǎng)頁)

5). 將線程設(shè)置為睡眠狀態(tài);
6). 解GIL的鎖;

【重點】python解釋器中任意時刻都只有一個線程在執(zhí)行;

I/O密集型(input, output):
計算密集型(cpu一直占用):

那么如何避免受到GIL的影響北发?
  • 使用多進程代替多線程纹因。
  • 更換Python解釋器,不使用CPython

Queue隊列

談及多線程琳拨,就不得不說Queue隊列瞭恰,這是從一個線程向另一個線程發(fā)送數(shù)據(jù)最安全的方式。創(chuàng)建一個被多個線程共享的 Queue 對象狱庇,這些線程通過使用put() 和 get() 操作來向隊列中添加或者刪除元素寄疏。

關(guān)于Queue隊列的重要的函數(shù)

from queue import Queue
# maxsize默認為0,不受限
# 一旦>0僵井,而消息數(shù)又達到限制陕截,q.put()也將阻塞
q = Queue(maxsize=0)

# 阻塞程序,等待隊列消息批什。
q.get()

# 獲取消息农曲,設(shè)置超時時間
q.get(timeout=5.0)

# 發(fā)送消息
q.put()

# 等待所有的消息都被消費完
q.join()

# 以下三個方法,知道就好,代碼中不要使用

# 查詢當前隊列的消息個數(shù)
q.qsize()

# 隊列消息是否都被消費完乳规,True/False
q.empty()

# 檢測隊列里消息是否已滿
q.full()

生產(chǎn)者-消費者模型(繼承實現(xiàn))

什么是生產(chǎn)者-消費者模型?

某個模塊專門負責生產(chǎn)+數(shù)據(jù)形葬, 可以認為是生產(chǎn)者;
另外一個模塊負責對生產(chǎn)的數(shù)據(jù)進行處理的, 可以認為是消費者.
在生產(chǎn)者和消費者之間加個緩沖區(qū)(隊列queue實現(xiàn)), 可以認為是商店暮的。

【生產(chǎn)者】 ===》【緩沖區(qū)】 ===》【 消費者】

生產(chǎn)者與消費者概念圖
生產(chǎn)者-消費者模型的優(yōu)點
  • 1). 生產(chǎn)者和消費者的依賴關(guān)系減少笙以,邏輯聯(lián)系少了,簡化代碼;
  • 2). 生產(chǎn)者和消費者是兩個獨立的個體冻辩, 可并發(fā)執(zhí)行;

關(guān)于線程池

在Python3中猖腕,創(chuàng)建線程池是通過concurrent.futures函數(shù)庫中的ThreadPoolExecutor類來實現(xiàn)的。

future對象:在未來的某一時刻完成操作的對象. submit方法可以返回一個future對象.

先看實例:簡單線程池實現(xiàn)

#線程執(zhí)行的函數(shù)
def add(n1,n2):
    v = n1 + n2
    print('add :', v , ', tid:',threading.currentThread().ident)
    time.sleep(n1)
    return v
#通過submit把需要執(zhí)行的函數(shù)扔進線程池中.
#submit 直接返回一個future對象
ex = ThreadPoolExecutor(max_workers=3)      #制定最多運行N個線程
f1 = ex.submit(add,2,3)
f2 = ex.submit(add,2,2)
print('main thread running')
print(f1.done())                            #done 看看任務(wù)結(jié)束了沒
print(f1.result())                          #獲取結(jié)果 ,阻塞方法

簡單線程池實現(xiàn)

import Queue
import threading
import time

'''
這個簡單的例子的想法是通過:
1恨闪、利用Queue特性倘感,在Queue里創(chuàng)建多個線程對象
2、那我執(zhí)行代碼的時候咙咽,去queue里去拿線程老玛!
如果線程池里有可用的,直接拿钧敞。
如果線程池里沒有可用蜡豹,那就等。
3溉苛、線程執(zhí)行完畢余素,歸還給線程池
'''

class ThreadPool(object): #創(chuàng)建線程池類
    def __init__(self,max_thread=20):#構(gòu)造方法,設(shè)置最大的線程數(shù)為20
        self.queue = Queue.Queue(max_thread) #創(chuàng)建一個隊列
        for i in xrange(max_thread):#循環(huán)把線程對象加入到隊列中
            self.queue.put(threading.Thread)
            #把線程的類名放進去炊昆,執(zhí)行完這個Queue

    def get_thread(self):#定義方法從隊列里獲取線程
        return self.queue.get()

    def add_thread(self):#定義方法在隊列里添加線程
        self.queue.put(threading.Thread)

pool = ThreadPool(10)

def func(arg,p):
    print arg
    time.sleep(2)
    p.add_thread() #當前線程執(zhí)行完了,我在隊列里加一個線程威根!

for i in xrange(300):
    thread = pool.get_thread() #線程池10個線程命辖,每一次循環(huán)拿走一個沟饥!默認queue.get(),如果隊列里沒有數(shù)據(jù)就會等待。
    t = thread(target=func,args=(i,pool))
    t.start()


'''
self.queue.put(threading.Thread) 添加的是類不是對象绷柒,在內(nèi)存中如果相同的類只占一份內(nèi)存空間
并且如果這里存儲的是對象的話每次都的新增都得在內(nèi)存中開辟一段內(nèi)存空間

還有如果是對象的話:下面的這個語句就不能這么調(diào)用了!
for i in xrange(300):
    thread = pool.get_thread()
    t = thread(target=func,args=(i,pool))
    t.start()
    通過查看源碼可以知道贸呢,在thread的構(gòu)造函數(shù)中:self.__args = args  self.__target = target  都是私有字段那么調(diào)用就應(yīng)該這么寫

for i in xrange(300):
    ret = pool.get_thread()
    ret._Thread__target = func
    ret._Thread__args = (i,pool)
    ret.start()

【map 方法】
返回值和提交的序列是一致的. 即是有序的奏司。

#下面是map 方法的簡單使用.  
#注意:map 返回是一個生成器 ,并且是有序的
URLS = ['http://www.baidu.com', 'http://www.qq.com', 'http://www.sina.com.cn']
def get_html(url):
    print('thread id:',threading.currentThread().ident,' 訪問了:',url)
    #這里使用了requests 模塊
    return requests.get(url)            
ex = ThreadPoolExecutor(max_workers=3)
#內(nèi)部迭代中, 每個url 開啟一個線程
res_iter = ex.map(get_html,URLS)        
for res in res_iter:
    #此時將阻塞 , 直到線程完成或異常                    
    print('url:%s ,len: %d'%(res.url,len(res.text)))

【 as_completed】
用于解決submit什么時候完成,避免一次次調(diào)用future.done 或者是使用 future.result 谎砾。

concurrent.futures.as_completed(fs, timeout=None):返回一個生成器,在迭代過程中會阻塞逢倍。

【關(guān)聯(lián)】map方法返回是有序的, as_completed 是那個線程先完成/失敗 就返回。
【舉個栗子】

#as_completed 返回一個生成器景图,用于迭代较雕, 一旦一個線程完成(或失敗) 就返回
URLS = ['http://www.baidu.com', 'http://www.qq.com', 'http://www.sina.com.cn']
def get_html(url):
    time.sleep(1)
    print('thread id:',threading.currentThread().ident,' 訪問了:',url)
    return requests.get(url)            #這里使用了requests 模塊
ex = ThreadPoolExecutor(max_workers=3)   #最多3個線程
future_tasks = [ex.submit(get_html,url) for url in URLS]    #創(chuàng)建3個future對象
for future in as_completed(future_tasks):       #迭代生成器
    try:
        resp = future.result()
    except Exception as e:
        print('%s'%e)
    else:
        print('%s has %d bytes!'%(resp.url, len(resp.text)))

輸出:

"""
thread id: 5160  訪問了: http://www.baidu.com
thread id: 7752  訪問了: http://www.sina.com.cn
thread id: 5928  訪問了: http://www.qq.com
http://www.qq.com/ has 240668 bytes!
http://www.baidu.com/ has 2381 bytes!
https://www.sina.com.cn/ has 577244 bytes!
"""

【強調(diào)】關(guān)于回調(diào)函數(shù)add_done_callback(fn)

回調(diào)函數(shù)是在調(diào)用線程完成后再調(diào)用的,在同一個線程中.

import os,sys,time,requests,threading
from concurrent import futures


URLS = [
        'http://baidu.com',
        'http://www.qq.com',
        'http://www.sina.com.cn'
        ]

def load_url(url):
    print('tid:',threading.currentThread().ident,',url:',url)
    with requests.get(url) as resp:
        return resp.content
def call_back(obj):
    print('->>>>>>>>>call_back , tid:',threading.currentThread().ident, ',obj:',obj)

with futures.ThreadPoolExecutor(max_workers=3) as ex:
    # mp = {ex.submit(load_url,url) : url for url in URLS}
    mp = dict()
    for url in URLS:
        f = ex.submit(load_url,url)
        mp[f] = url
        f.add_done_callback(call_back)
    for f in futures.as_completed(mp):
        url = mp[f]
        try:
            data = f.result()
        except Exception as exc:
            print(exc, ',url:',url)
        else:
            print('url:', url, ',len:',len(data),',data[:20]:',data[:20])
"""
tid: 7128 ,url: http://baidu.com
tid: 7892 ,url: http://www.qq.com
tid: 3712 ,url: http://www.sina.com.cn
->>>>>>>>>call_back , tid: 7892 ,obj: <Future at 0x2dd64b0 state=finished returned bytes>
url: http://www.qq.com ,len: 251215 ,data[:20]: b'<!DOCTYPE html>\n<htm'
->>>>>>>>>call_back , tid: 3712 ,obj: <Future at 0x2de07b0 state=finished returned bytes>
url: http://www.sina.com.cn ,len: 577333 ,data[:20]: b'<!DOCTYPE html>\n<!--'
->>>>>>>>>call_back , tid: 7128 ,obj: <Future at 0x2d533d0 state=finished returned bytes>
url: http://baidu.com ,len: 81 ,data[:20]: b'<html>\n<meta http-eq'
"""

最后的最后,來兩個栗子總結(jié)一下關(guān)于多線程的應(yīng)用。

  • 【多線程實現(xiàn)文件復(fù)制】
import concurrent.futures as fu
import os

ex_pools = fu.ThreadPoolExecutor(max_workers = 3)

def copy(org_file,dest_file):
    """
    復(fù)制文件
    """
    print("開始從%s復(fù)制文件到%s" % (org_file,dest_file))
    with open(org_file,'rb+') as f:
        content = f.read()

    with open(dest_file,'wb+') as f:
        f.write(content)
    print("從%s復(fù)制文件到%s亮蒋,完成扣典!" % (org_file, dest_file))


def copy_dir(base,dest):
    """
    復(fù)制目錄
    """
    if not os.path.exists(dest):
        print("創(chuàng)建文件夾:%s" %dest)
        os.mkdir(dest)

    org_dir_files = os.listdir(base)
    for file_name in org_dir_files:
        file = os.path.join(base,file_name)
        dest_file = os.path.join(dest,file_name)

        if os.path.isfile(file):
            ex_pools.submit(copy,file,dest_file)

        if os.path.isdir(file):
            ex_pools.submit(copy_dir, file, dest_file)

# 要復(fù)制的目標文件路徑
base = r"C:\Users\42072\Desktop\python"
# 復(fù)制到該文件路徑
dest = r"C:\Users\42072\Desktop\python123"
copy_dir(base,dest)
  • 【實現(xiàn)網(wǎng)絡(luò)上圖片的下載】
    (初衷想下音樂的,不過好像都收費了慎玖,還是尊重版權(quán)吧)

import requests
import os
import random
import concurrent.futures as futures

def download_img(url):
    resp = requests.get(url)
    filename = os.path.split(url)[1] # 獲取文件名
    with open(filename,'wb+') as f:
        f.write(resp.content)
    num = random.randint(2,5)
    print(filename + "generate:",num)
    time.sleep(num)
    return filename

urls = ["http://pic27.nipic.com/20130320/8952533_092547846000_2.jpg",
        "http://pic19.nipic.com/20120212/9337475_104548381000_2.jpg",]

ex = futures.ThreadPoolExecutor(max_workers = 3)
res_iter = ex.map(download_img,urls)
type(res_iter)
for res in res_iter:
    print(res)

def cf(rs):
    print(rs.result())

# [ex.submit(download_img,url).add_done_callback(cf) for url in urls]
# for future in futures.as_completed(fu_tasks):
for url in urls:
    f = ex.submit(download_img,url)
    f.add_done_callback(cf)

【輸出】:

9337475_104548381000_2.jpggenerate: 3
8952533_092547846000_2.jpggenerate: 3
8952533_092547846000_2.jpg
9337475_104548381000_2.jpg
9337475_104548381000_2.jpggenerate: 3
8952533_092547846000_2.jpggenerate: 4
9337475_104548381000_2.jpg
8952533_092547846000_2.jpg
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末贮尖,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子趁怔,更是在濱河造成了極大的恐慌湿硝,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,248評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件痕钢,死亡現(xiàn)場離奇詭異图柏,居然都是意外死亡,警方通過查閱死者的電腦和手機任连,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評論 2 381
  • 文/潘曉璐 我一進店門蚤吹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人随抠,你說我怎么就攤上這事裁着。” “怎么了拱她?”我有些...
    開封第一講書人閱讀 153,443評論 0 344
  • 文/不壞的土叔 我叫張陵二驰,是天一觀的道長。 經(jīng)常有香客問我秉沼,道長桶雀,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,475評論 1 279
  • 正文 為了忘掉前任唬复,我火速辦了婚禮矗积,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘敞咧。我一直安慰自己棘捣,他們只是感情好,可當我...
    茶點故事閱讀 64,458評論 5 374
  • 文/花漫 我一把揭開白布休建。 她就那樣靜靜地躺著乍恐,像睡著了一般。 火紅的嫁衣襯著肌膚如雪测砂。 梳的紋絲不亂的頭發(fā)上茵烈,一...
    開封第一講書人閱讀 49,185評論 1 284
  • 那天,我揣著相機與錄音砌些,去河邊找鬼瞧毙。 笑死,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的宙彪。 我是一名探鬼主播矩动,決...
    沈念sama閱讀 38,451評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼释漆!你這毒婦竟也來了悲没?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,112評論 0 261
  • 序言:老撾萬榮一對情侶失蹤男图,失蹤者是張志新(化名)和其女友劉穎示姿,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體逊笆,經(jīng)...
    沈念sama閱讀 43,609評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡栈戳,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,083評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了难裆。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片子檀。...
    茶點故事閱讀 38,163評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖乃戈,靈堂內(nèi)的尸體忽然破棺而出褂痰,到底是詐尸還是另有隱情,我是刑警寧澤症虑,帶...
    沈念sama閱讀 33,803評論 4 323
  • 正文 年R本政府宣布缩歪,位于F島的核電站,受9級特大地震影響谍憔,放射性物質(zhì)發(fā)生泄漏匪蝙。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,357評論 3 307
  • 文/蒙蒙 一习贫、第九天 我趴在偏房一處隱蔽的房頂上張望逛球。 院中可真熱鬧,春花似錦沈条、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至涕烧,卻和暖如春月而,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背议纯。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評論 1 261
  • 我被黑心中介騙來泰國打工父款, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個月前我還...
    沈念sama閱讀 45,636評論 2 355
  • 正文 我出身青樓憨攒,卻偏偏與公主長得像世杀,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子肝集,可洞房花燭夜當晚...
    茶點故事閱讀 42,925評論 2 344