聲明:并非標題黨浮定,確實是一件實際的案例,這里只是詳細捋一下自己的分析過程
背景
某日层亿,進行git checkout xxx_branch
時桦卒,總是報出Unlink of file 'logs/Crawler_2019-11-02.log' failed. Should I try again? (y/n)
的錯誤,這是一份多線程爬蟲的日志報告匿又,查看stackoverflow方灾,發(fā)現原因是:有程序在使用這個文件導致不能正確地進行遷移,常規(guī)解決辦法:查看任務管理器碌更,找到占用這個文件的進程裕偿,把它kill,但這又治標不治本痛单,想到了每次出現這個的原因是:爬蟲任務每次都不能正常結束嘿棘,強制結束導致的資源沒有被正確釋放,且該任務為多線程任務桦他,導致出現很多僵尸線程蔫巩。-
問題:
對占用.log文件的程序正確退出并釋放資源,該程序分為兩個階段:- 多線程收集各個具體頁面的URL
- 多線程收集各個具體頁面的contents
表現出來的行為是每次程序都在第二個階段卡住快压,強制結束
-
解決思路:
首先懷疑是
queue.Queue()
的問題圆仔,不放心它的task_done()
與join()
的配合,打印出的信息發(fā)現確實是task_done()
使qsize()
的數目-1蔫劣,第一階段執(zhí)行結束坪郭,qsize()
到0,queue這邊沒問題-
其次懷疑是多線程這邊實現的問題脉幢,使用的
concurrrent.futures
歪沃,相比于threading
這種自己動手豐衣足食的做法,有點不放心嫌松,查看stackoverflow1 stackoverflow2沪曙,這個東西確實也是使用了threading
;甚至懷疑過使用這種多線程的做法是不是把默認的主線程阻塞了萎羔,但試了一下這種想法也太傻了
def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for item in source():
q.put(item)
# block until all tasks are done
q.join()
# stop workers
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
看到以上代碼段的時候來源液走,特別是#stop workers這一段,確信:join()
的作用只是起到barrier的作用,用于同步缘眶,并不能結束子線程的執(zhí)行嘱根,在考慮這種多線程問題的時候,還要設置子線程退出機制巷懈,對于子線程的退出機制:
- 對于
threading
该抒,可以設置daemon=True
,將子線程設置為守護線程顶燕,主線程退出時凑保,子線程自動退出; - 對于
concurrent.futures
, 在stackoverflow1中涌攻,講到關于守護進程的設置愉适,并不能真正的daemon
對于子線程退出的問題,我采用的做法癣漆,是通過判斷queue為空,子線程退出剂买,這適用于“快生產惠爽、慢消費”場景(不同生產消費場景思考)
if self.news_detail_url_queue.empty():
break
-
關鍵
這些并不能解決阻塞問題,后來檢查一部分多線程并發(fā)實現的代碼時發(fā)現:def parallel_do(func, args_list, max_workers=None, mode='thread'): max_workers = thread_max_workers if not max_workers else max_workers exe = cf.ThreadPoolExecutor(max_workers=max_workers) if mode == 'thread' else cf.ProcessPoolExecutor(max_workers=max_workers) with exe as executor: if args_list is None: for t in range(max_workers): executor.submit(func) else: executor.map(func, args_list)
使用with statement
雖然保證了子線程的退出和資源的正確釋放瞬哼,這也是導致整個執(zhí)行過程被阻塞的根源婚肆。 也就是不能使parallel_do
之間實現并發(fā),要想并發(fā)坐慰,可以考慮去除with
statement较性。
-
總結
- 使用多線程時要注意子線程的退出條件,否則出現 zombie thread结胀;
-
concurrent.futures
內部使用的仍然是threading
實現的多線程赞咙; - 使用
queue.join()
要注意對task_done()
的正確調用,否則會阻塞糟港;
更多關于git的內容可參考本人博客:老香椿(https://laoxiangchun.cn/2020/05/30/Git/how-to-use-git/)