一豁跑、程序結(jié)構(gòu)
既然要使用多線程,那么關(guān)于多線程的使用的模型我們也要了解一下泻云。
許多新手在寫多線程的代碼時(shí)總是喜歡把代碼一股腦全部塞在一個(gè)類中艇拍。
這樣的寫法其實(shí)是對多線程的錯(cuò)誤使用
首先就程序設(shè)計(jì)來說狐蜕,這樣不符合模塊化的設(shè)計(jì)
其次就是這樣的代碼往往會(huì)有很嚴(yán)重的競爭問題,需要很多的資源鎖來保證線程安全卸夕,這樣就拉低了程序執(zhí)行的速度层释。
實(shí)際上,多線程往往是和生產(chǎn)—消費(fèi)模型掛鉤的快集,以我們的簡書文章信息爬蟲為例贡羔,它的多線程結(jié)構(gòu)示意圖如下:
對于這個(gè)結(jié)構(gòu),第一個(gè)線程池里是生產(chǎn) uid的線程个初,這些線程把生產(chǎn)出來的線程放入 uids queue隊(duì)列中乖寒。
第二個(gè)線程池里的線程就通過 uids queue來獲取 uid,他們就是 uid producer的消費(fèi)者院溺。
同時(shí)楣嘁,他們也是 data的生產(chǎn)者。
第二個(gè)線程池與第三個(gè)線程池的交互和前面兩個(gè)線程池的交互類似珍逸,如下圖:
中間的 uids queue是一個(gè)先入先出而且線程安全的隊(duì)列逐虚。
使用生產(chǎn)—消費(fèi)者模型后,我們還需要的就是一個(gè)線程安全的 FIFO隊(duì)列谆膳,和恰當(dāng)?shù)纳a(chǎn)者與消費(fèi)者比例(以生產(chǎn)者的產(chǎn)出剛好被消費(fèi)者消費(fèi)完為最佳)
二痊班、代碼實(shí)現(xiàn)
首先我們先把原來的模塊封裝到一個(gè)單獨(dú)的文件里去,這樣更加方便調(diào)用
文件 GitHub:jianshu_models.py
文件里面我們需要用到的有:
- simplifiedCsv 類摹量,將數(shù)據(jù)寫入文件
- userUidsGenerator userUid生成器
- getArticleInfo 獲取文章信息
然后就是我們的生產(chǎn)者和消費(fèi)者類:
這里我們使用 python自帶的 queue模塊里的 Queue隊(duì)列
Queue隊(duì)列有 put和 get兩個(gè)方法,這兩個(gè)方法都接受一個(gè)整數(shù)作為最大隊(duì)列長度馒胆。
當(dāng)隊(duì)列長度達(dá)到最大隊(duì)列長度時(shí)缨称,put方法就會(huì)阻塞,直到隊(duì)列長度再次小于最大隊(duì)列長度祝迂。
按從左到右的順序睦尽,第一個(gè)類是 uid生產(chǎn)線程類:
class UidGenerateThread(threading.Thread):
def __init__(self, uid_queue):
threading.Thread.__init__(self)
self.uid_queue = uid_queue
def run(self):
start_users = [{'uid': 'a3ea268aeb60', 'follow_num': 525, 'fans_num': 2521, 'article_num': 118}]
user_generator = jianshu_models.userUidsGenerator(start_users)
while True:
user = user_generator.__next__()
self.uid_queue.put(user)
UidGenerateThread類接受一個(gè) uid_queue隊(duì)列作為初始化參數(shù)。
在 run方法中型雳,我們先獲取一個(gè) uid生成器当凡,然后無限調(diào)用生成器的 __next__()
方法,并將獲得的結(jié)果通過 uid_queue的 put方法放到 uid_queue隊(duì)列里去纠俭。
第二個(gè)類是沿量,uid消費(fèi)者類同時(shí)也是 data生產(chǎn)者類:
class DataCollectorThread(threading.Thread):
def __init__(self, uid_queue, data_queue):
threading.Thread.__init__(self)
self.uid_queue = uid_queue
self.data_queue = data_queue
def run(self):
while True:
user = self.uid_queue.get()
datas = jianshu_models.getArticleInfo(user)
self.data_queue.put(datas)
DataCollectorThread類接受兩個(gè)隊(duì)列 uid_queue和 data_queue作為初始化參數(shù)。
run方法里冤荆,我們先通過 uid_queue的 get方法獲取 userUid朴则,然后把返回結(jié)果作為參數(shù)傳遞給 getArticleInfo來獲取對應(yīng)用戶的文章信息。
獲取到文章信息后钓简,我們再把文章信息放到 data_queue里
第三個(gè)類是 data消費(fèi)者類乌妒,作用是將 data寫入 csv文件:
class DataWriterThread(threading.Thread):
def __init__(self, data_queue):
threading.Thread.__init__(self)
self.data_queue = data_queue
self.writer = jianshu_models.simplifiedCsv(f'test/{self.name}.txt')
def run(self):
while True:
data_list = self.data_queue.get()
self.writer.writerows(data_list)
與上面兩個(gè)類相似汹想,DataWriterThread類也接受一個(gè) data_queue作為初始化參數(shù)。
在 run方法中不斷從 adta_queue里取出數(shù)據(jù)寫入到文件里撤蚊。
這里為了避免使用資源鎖古掏,我們讓每個(gè)線程都有一個(gè) simplifiedCsv類,將數(shù)據(jù)寫入不同的文件中侦啸。
當(dāng)爬取完成后再將數(shù)據(jù)整合到一個(gè)文件中去槽唾。
根據(jù)不同線程的生產(chǎn)和消費(fèi)能力,在程序中我們使用 1個(gè) uid生產(chǎn)線程(而且只能使用一個(gè))匹中,10個(gè)DataCollectorThread和 10個(gè)DataWriterThread夏漱。
并且設(shè)置 uid_queue的長度為 100,data_queue的長度為 50.
threads = []
uid_queue = queue.Queue(100)
data_queue = queue.Queue(50)
t1 = UidGenerateThread(uid_queue)
threads.append(t1)
for i in range(10):
t1 = DataWriterThread(data_queue)
t2 = DataCollectorThread(uid_queue, data_queue)
threads.append(t1)
threads.append(t2)
for t in threads:
t.start()
for t in threads:
t.join()
運(yùn)行十分鐘看看:
十分鐘爬取了 72920條數(shù)據(jù)顶捷,再看看單線程的:
單線程十分鐘爬取了 7823條挂绰。
多線程是單線程的 9倍多一點(diǎn),不過如果我們增加 DataCollectorThread和
DataWriterThread的數(shù)量服赎,速度還可以更快葵蒂。
當(dāng) cpu的利用率達(dá)到 80%時(shí)可以達(dá)到 30倍的速度。
這次代碼版本為:v2.0
代碼在 GitHub上的鏈接:project_mulitiple_threads_version
大家可能覺得現(xiàn)在已經(jīng)很快了重虑,但這還不是最快的方式践付,比多線程更快更節(jié)省資源的是------>協(xié)程,也被稱作異步
下一篇就讓我們來講一講異步
覺得我寫的不錯(cuò)的話缺厉,記得關(guān)注永高、點(diǎn)贊、評論(提针。???)ノ