Python爬蟲(四)--多線程


Python-Socket網(wǎng)絡編程

1. thread模塊


  • python是支持多線程的, 主要是通過thread和threading這兩個模塊來實現(xiàn)的筷笨。
  • python的thread模塊是比較底層的模塊(或者說輕量級)疙筹,python的threading模塊是對thread做了一些包裝的吉殃,可以更加方便的被使用泉粉。

簡要的看一下thread模塊中含函數(shù)和常量

import thread

thread.LockType  #鎖對象的一種, 用于線程的同步
thread.error  #線程的異常

thread.start_new_thread(function, args[, kwargs])  #創(chuàng)建一個新的線程
function : 線程執(zhí)行函數(shù)
args : 線程執(zhí)行函數(shù)的參數(shù), 類似為tuple,
kwargs : 是一個字典
返回值: 返回線程的標識符

thread.exit()  #線程退出函數(shù)
thread.allocate_lock()  #生成一個未鎖狀態(tài)的鎖對象
返回值: 返回一個鎖對象

鎖對象的方法

lock.acquire([waitflag]) #獲取鎖
無參數(shù)時, 無條件獲取鎖, 無法獲取時, 會被阻塞, 知道可以鎖被釋放
有參數(shù)時, waitflag = 0 時,表示只有在不需要等待的情況下才獲取鎖, 非零情況與上面相同
返回值 : 獲得鎖成功返回True, 獲得鎖失敗返回False

lock.release() #釋放鎖

lock.locked() #獲取當前鎖的狀態(tài)
返回值 : 如果鎖已經(jīng)被某個線程獲取,返回True, 否則為False

1.1. thread多線程

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import thread
import time

def print_time(thread_name, delay) :
    count = 0
    while count < 5 :
        time.sleep(delay)
        count += 1
        print "%s : %s" % (thread_name, time.ctime(time.time()))

try :
    thread.start_new_thread(print_time, ("Thread-1", 2, ))
    thread.start_new_thread(print_time, ("Thread-2", 4, ))
except : 
    print "Error: unable to start the thread"

while True :
    pass

2. threading模塊


python的threading模塊是對thread做了一些包裝的护盈,可以更加方便的被使用灵莲。經(jīng)常和Queue結(jié)合使用,Queue模塊中提供了同步的路克、線程安全的隊列類潮秘,包括FIFO(先入先出)隊列Queue琼开,LIFO(后入先出)隊列LifoQueue,和優(yōu)先級隊列PriorityQueue枕荞。這些隊列都實現(xiàn)了鎖原語柜候,能夠在多線程中直接使用□锞可以使用隊列來實現(xiàn)線程間的同步

2.1. 常用函數(shù)和對象

#函數(shù)
threading.active_count()  #返回當前線程對象Thread的個數(shù)
threading.enumerate()  #返回當前運行的線程對象Thread(包括后臺的)的list
threading.Condition()  #返回條件變量對象的工廠函數(shù), 主要用戶線程的并發(fā)
threading.current_thread()  #返回當前的線程對象Thread, 文檔后面解釋沒看懂
threading.Lock()  #返回一個新的鎖對象, 是在thread模塊的基礎上實現(xiàn)的 與acquire()和release()結(jié)合使用

#類
threading.Thread  #一個表示線程控制的類, 這個類常被繼承
thraeding.Timer  #定時器,線程在一定時間后執(zhí)行
threading.ThreadError  #引發(fā)中各種線程相關異常

2.1.1. Thread對象

一般來說渣刷,使用線程有兩種模式, 一種是創(chuàng)建線程要執(zhí)行的函數(shù), 把這個函數(shù)傳遞進Thread對象里,讓它來執(zhí)行. 另一種是直接從Thread繼承矗烛,創(chuàng)建一個新的class辅柴,把線程執(zhí)行的代碼放到這個新的class里。

常用兩種方式運行線程(線程中包含name屬性) :

  • 在構(gòu)造函數(shù)中傳入用于線程運行的函數(shù)(這種方式更加靈活)
  • 在子類中重寫threading.Thread基類中run()方法(只重寫__init__()和run()方法)

創(chuàng)建線程對象后, 通過調(diào)用start()函數(shù)運行線程, 然后會自動調(diào)用run()方法.

通過設置`daemon`屬性, 可以將線程設置為守護線程

threading.Thread(group = None, target = None, name = None, args = () kwars = {})
group : 應該為None
target : 可以傳入一個函數(shù)用于run()方法調(diào)用,
name : 線程名 默認使用"Thread-N"
args : 元組, 表示傳入target函數(shù)的參數(shù)
kwargs : 字典, 傳入target函數(shù)中關鍵字參數(shù)

屬性:
name  #線程表示, 沒有任何語義
doemon  #布爾值, 如果是守護線程為True, 不是為False, 主線程不是守護線程, 默認threading.Thread.damon = False

類方法: 
run()  #用以表示線程活動的方法瞭吃。
start()  #啟動線程活動碌嘀。
join([time])  #等待至線程中止。這阻塞調(diào)用線程直至線程的join() 方法被調(diào)用中止-正常退出或者拋出未處理的異常-或者是可選的超時發(fā)生歪架。
isAlive(): 返回線程是否活動的股冗。
getName(): 返回線程名。
setName(): 設置線程名牡拇。

范例:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading
import time

def test_thread(count) :
    while count > 0 :
        print "count = %d" % count
        count = count - 1
        time.sleep(1)

def main() :
    my_thread = threading.Thread(target = test_thread, args = (10, ))
    my_thread.start()
    my_thread.join()

if __name__ == '__main__':
    main()

2.2. 常用多線程寫法

  • 固定線程運行的函數(shù)
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading, thread
import time


class MyThread(threading.Thread):
    """docstring for MyThread"""

    def __init__(self, thread_id, name, counter) :
        super(MyThread, self).__init__()  #調(diào)用父類的構(gòu)造函數(shù) 
        self.thread_id = thread_id
        self.name = name
        self.counter = counter

    def run(self) :
        print "Starting " + self.name
        print_time(self.name, self.counter, 5)
        print "Exiting " + self.name

def print_time(thread_name, delay, counter) :
    while counter :
        time.sleep(delay)
        print "%s %s" % (thread_name, time.ctime(time.time()))
        counter -= 1

def main():
    #創(chuàng)建新的線程
    thread1 = MyThread(1, "Thread-1", 1)
    thread2 = MyThread(2, "Thread-2", 2)

    #開啟線程
    thread1.start()
    thread2.start()


    thread1.join()
    thread2.join()
    print "Exiting Main Thread"

if __name__ == '__main__':
    main()
  • 外部傳入線程運行的函數(shù)
#/usr/bin/env python
# -*- coding: utf-8 -*-
import threading
import time

class MyThread(threading.Thread):
    """
    屬性:
    target: 傳入外部函數(shù), 用戶線程調(diào)用
    args: 函數(shù)參數(shù)
    """
    def __init__(self, target, args):
        super(MyThread, self).__init__()  #調(diào)用父類的構(gòu)造函數(shù) 
        self.target = target
        self.args = args

    def run(self) :
        self.target(self.args)

def print_time(counter) :
    while counter :
        print "counter = %d" % counter
        counter -= 1
        time.sleep(1)

def main() :
    my_thread = MyThread(print_time, 10)
    my_thread.start()
    my_thread.join()

if __name__ == '__main__':
    main()

2.3. 生產(chǎn)者消費者問題

試著用python寫了一個生產(chǎn)者消費者問題(偽生產(chǎn)者消費者), 只是使用簡單的鎖, 感覺有點不太對, 下面另一個程序會寫出正確的生產(chǎn)者消費者問題

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import thread, threading
import urllib2
import time, random
import Queue

share_queue = Queue.Queue()  #共享隊列
my_lock = thread.allocate_lock()
class Producer(threading.Thread) :

    def run(self) :
        products = range(5)
        global share_queue
        while True :
            num = random.choice(products)
            my_lock.acquire()
            share_queue.put(num)
            print  "Produce : ", num
            my_lock.release()
            time.sleep(random.random())

class Consumer(threading.Thread) :

    def run(self) :
        global share_queue
        while True:
            my_lock.acquire()
            if share_queue.empty() : #這里沒有使用信號量機制進行阻塞等待, 
                print "Queue is Empty..."  
                my_lock.release()
                time.sleep(random.random())
                continue
            num = share_queue.get()
            print "Consumer : ", num
            my_lock.release()
            time.sleep(random.random())

def main() :
    producer = Producer()
    consumer = Consumer()
    producer.start()
    consumer.start()

if __name__ == '__main__':
    main()

殺死多線程程序方法: 使用control + z掛起程序(程序依然在后臺, 可以使用ps aux查看), 獲得程序的進程號, 然后使用kill -9 進程號殺死進程

參考一篇帖子解決了上述問題,重寫了生產(chǎn)者消費者問題程序, 參考鏈接慣例放在最后.

使用了wait()和notify()解決

當然最簡答的方法是直接使用Queue,Queue封裝了Condition的行為, 如wait(), notify(), acquire(), 沒看文檔就這樣, 使用了Queue竟然不知道封裝了這些函數(shù), 繼續(xù)滾去看文檔了

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading
import random, time, Queue

MAX_SIZE = 5
SHARE_Q = []  #模擬共享隊列
CONDITION = threading.Condition()

class Producer(threading.Thread) :

    def run(self) :
        products = range(5)
        global SHARE_Q
        while True :
            CONDITION.acquire()
            if len(SHARE_Q) == 5 :
                print "Queue is full.."
                CONDITION.wait()
                print "Consumer have comsumed something"
            product = random.choice(products)
            SHARE_Q.append(product)
            print "Producer : ", product
            CONDITION.notify()
            CONDITION.release()
            time.sleep(random.random())

class Consumer(threading.Thread) :

    def run(self) :
        global SHARE_Q
        while True:
            CONDITION.acquire()
            if not SHARE_Q :
                print "Queue is Empty..."
                CONDITION.wait()
                print "Producer have producted something"
            product = SHARE_Q.pop(0)
            print "Consumer :", product
            CONDITION.notify()
            CONDITION.release()
            time.sleep(random.random())

def main() :
    producer = Producer()
    consumer = Consumer()
    producer.start()
    consumer.start()

if __name__ == '__main__':
    main()

2.4.簡單鎖

如果只是簡單的加鎖解鎖可以直接使用threading.Lock()生成鎖對象, 然后使用acquire()和release()方法

例如:

#!/usr/bin/env python
# -*- coding:utf-8 -*- 

import threading
import time

class MyThread(threading.Thread) :
    
    def __init__(self, thread_id, name, counter) :
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.name = name
        self.counter = counter

    def run(self) :
        #重寫run方法, 添加線程執(zhí)行邏輯, start函數(shù)運行會自動執(zhí)行
        print  "Starting " + self.name
        threadLock.acquire() #獲取所
        print_time(self.name, self.counter, 3)
        threadLock.release() #釋放鎖

def print_time(thread_name, delay, counter) :
    while counter :
        time.sleep(delay)
        print "%s %s" % (thread_name, time.ctime(time.time()))
        counter -= 1

threadLock = threading.Lock()
threads = [] #存放線程對象

thread1 = MyThread(1, "Thread-1", 1)
thread2 = MyThread(2, "Thread-2", 2)

#開啟線程
thread1.start()
thread2.start()

for t in threads :
    t.join()  #等待線程直到終止
print "Exiting Main Thread"

2.5. Condition

如果是向生產(chǎn)者消費者類似的情形, 使用Condition類 或者直接使用Queue模塊

Condition

條件變量中有acquire()和release方法用來調(diào)用鎖的方法, 有wait(), notify(), notifyAll()方法, 后面是三個方法必須在獲取鎖的情況下調(diào)用, 否則產(chǎn)生RuntimeError錯誤.

  • 當一個線程獲得鎖后, 發(fā)現(xiàn)沒有期望的資源或者狀態(tài), 就會調(diào)用wait()阻塞, 并釋放已經(jīng)獲得鎖, 知道期望的資源或者狀態(tài)發(fā)生改變
  • 當一個線程獲得鎖, 改變了資源或者狀態(tài), 就會調(diào)用notify()和notifyAll()去通知其他線程,
#官方文檔中提供的生產(chǎn)者消費者模型
# Consume one item
cv.acquire()
while not an_item_is_available():
    cv.wait()
get_an_available_item()
cv.release()

# Produce one item
cv.acquire()
make_an_item_available()
cv.notify()
cv.release()
#threading.Condition類
thread.Condition([lock])
可選參數(shù)lock: 必須是Lock或者RLock對象, 并被作為underlying鎖(悲觀鎖?), 否則, 會創(chuàng)建一個新的RLock對象作為underlying鎖

類方法:
acquire()  #獲得鎖
release()  #釋放鎖
wait([timeout])  #持續(xù)等待直到被notify()或者notifyAll()通知或者超時(必須先獲得鎖),
#wait()所做操作, 先釋放獲得的鎖, 然后阻塞, 知道被notify或者notifyAll喚醒或者超時, 一旦被喚醒或者超時, 會重新獲取鎖(應該說搶鎖), 然后返回
notify()  #喚醒一個wait()阻塞的線程.
notify_all()或者notifyAll()  #喚醒所有阻塞的線程

參考程序可以查看上面的生產(chǎn)者消費者程序

3. 參考鏈接


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末魁瞪,一起剝皮案震驚了整個濱河市穆律,隨后出現(xiàn)的幾起案子惠呼,更是在濱河造成了極大的恐慌导俘,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件剔蹋,死亡現(xiàn)場離奇詭異旅薄,居然都是意外死亡,警方通過查閱死者的電腦和手機泣崩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進店門少梁,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人矫付,你說我怎么就攤上這事凯沪。” “怎么了买优?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵妨马,是天一觀的道長。 經(jīng)常有香客問我杀赢,道長烘跺,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任脂崔,我火速辦了婚禮滤淳,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘砌左。我一直安慰自己脖咐,他們只是感情好,可當我...
    茶點故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布汇歹。 她就那樣靜靜地躺著文搂,像睡著了一般。 火紅的嫁衣襯著肌膚如雪秤朗。 梳的紋絲不亂的頭發(fā)上煤蹭,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天,我揣著相機與錄音取视,去河邊找鬼硝皂。 笑死,一個胖子當著我的面吹牛作谭,可吹牛的內(nèi)容都是我干的稽物。 我是一名探鬼主播,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼折欠,長吁一口氣:“原來是場噩夢啊……” “哼贝或!你這毒婦竟也來了吼过?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤咪奖,失蹤者是張志新(化名)和其女友劉穎盗忱,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體羊赵,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡趟佃,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了昧捷。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片闲昭。...
    茶點故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖靡挥,靈堂內(nèi)的尸體忽然破棺而出序矩,到底是詐尸還是另有隱情,我是刑警寧澤跋破,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布簸淀,位于F島的核電站,受9級特大地震影響幔烛,放射性物質(zhì)發(fā)生泄漏啃擦。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一饿悬、第九天 我趴在偏房一處隱蔽的房頂上張望令蛉。 院中可真熱鬧,春花似錦狡恬、人聲如沸珠叔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽祷安。三九已至,卻和暖如春兔乞,著一層夾襖步出監(jiān)牢的瞬間汇鞭,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工庸追, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留霍骄,地道東北人。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓淡溯,卻偏偏與公主長得像读整,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子咱娶,可洞房花燭夜當晚...
    茶點故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容

  • 線程 引言&動機 考慮一下這個場景米间,我們有10000條數(shù)據(jù)需要處理强品,處理每條數(shù)據(jù)需要花費1秒,但讀取數(shù)據(jù)只需要0....
    不浪漫的浪漫_ea03閱讀 358評論 0 0
  • 引言&動機 考慮一下這個場景屈糊,我們有10000條數(shù)據(jù)需要處理的榛,處理每條數(shù)據(jù)需要花費1秒,但讀取數(shù)據(jù)只需要0.1秒另玖,...
    chen_000閱讀 501評論 0 0
  • 1.進程和線程 隊列:1困曙、進程之間的通信: q = multiprocessing.Queue()2表伦、...
    一只寫程序的猿閱讀 1,098評論 0 17
  • 線程 1.同步概念 1.多線程開發(fā)可能遇到的問題 同步不是一起的意思谦去,是協(xié)同步調(diào) 假設兩個線程t1和t2都要對nu...
    TENG書閱讀 602評論 0 1
  • 我的自我簡介: 一名充滿愛和溫暖的醫(yī)者,懷著一顆慈悲和感恩的心走在傳播健康的路上蹦哼,希望將健康和愛傳播出...
    杏子心語閱讀 273評論 0 0