Python-Socket網(wǎng)絡編程
1. thread模塊
- python是支持多線程的, 主要是通過thread和threading這兩個模塊來實現(xiàn)的筷笨。
- python的thread模塊是比較底層的模塊(或者說輕量級)疙筹,python的threading模塊是對thread做了一些包裝的吉殃,可以更加方便的被使用泉粉。
簡要的看一下thread模塊中含函數(shù)和常量
import thread
thread.LockType #鎖對象的一種, 用于線程的同步
thread.error #線程的異常
thread.start_new_thread(function, args[, kwargs]) #創(chuàng)建一個新的線程
function : 線程執(zhí)行函數(shù)
args : 線程執(zhí)行函數(shù)的參數(shù), 類似為tuple,
kwargs : 是一個字典
返回值: 返回線程的標識符
thread.exit() #線程退出函數(shù)
thread.allocate_lock() #生成一個未鎖狀態(tài)的鎖對象
返回值: 返回一個鎖對象
鎖對象
的方法
lock.acquire([waitflag]) #獲取鎖
無參數(shù)時, 無條件獲取鎖, 無法獲取時, 會被阻塞, 知道可以鎖被釋放
有參數(shù)時, waitflag = 0 時,表示只有在不需要等待的情況下才獲取鎖, 非零情況與上面相同
返回值 : 獲得鎖成功返回True, 獲得鎖失敗返回False
lock.release() #釋放鎖
lock.locked() #獲取當前鎖的狀態(tài)
返回值 : 如果鎖已經(jīng)被某個線程獲取,返回True, 否則為False
1.1. thread多線程
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import thread
import time
def print_time(thread_name, delay) :
count = 0
while count < 5 :
time.sleep(delay)
count += 1
print "%s : %s" % (thread_name, time.ctime(time.time()))
try :
thread.start_new_thread(print_time, ("Thread-1", 2, ))
thread.start_new_thread(print_time, ("Thread-2", 4, ))
except :
print "Error: unable to start the thread"
while True :
pass
2. threading模塊
python的threading模塊是對thread做了一些包裝的护盈,可以更加方便的被使用灵莲。經(jīng)常和Queue結(jié)合使用,Queue模塊中提供了同步的路克、線程安全的隊列類潮秘,包括
FIFO(先入先出)隊列Queue
琼开,LIFO(后入先出)隊列LifoQueue
,和優(yōu)先級隊列PriorityQueue
枕荞。這些隊列都實現(xiàn)了鎖原語
柜候,能夠在多線程中直接使用□锞可以使用隊列來實現(xiàn)線程間的同步
2.1. 常用函數(shù)和對象
#函數(shù)
threading.active_count() #返回當前線程對象Thread的個數(shù)
threading.enumerate() #返回當前運行的線程對象Thread(包括后臺的)的list
threading.Condition() #返回條件變量對象的工廠函數(shù), 主要用戶線程的并發(fā)
threading.current_thread() #返回當前的線程對象Thread, 文檔后面解釋沒看懂
threading.Lock() #返回一個新的鎖對象, 是在thread模塊的基礎上實現(xiàn)的 與acquire()和release()結(jié)合使用
#類
threading.Thread #一個表示線程控制的類, 這個類常被繼承
thraeding.Timer #定時器,線程在一定時間后執(zhí)行
threading.ThreadError #引發(fā)中各種線程相關異常
2.1.1. Thread對象
一般來說渣刷,使用線程有兩種模式, 一種是創(chuàng)建線程要執(zhí)行的函數(shù), 把這個函數(shù)傳遞進Thread對象里,讓它來執(zhí)行. 另一種是直接從Thread繼承矗烛,創(chuàng)建一個新的class辅柴,把線程執(zhí)行的代碼放到這個新的class里。
常用兩種方式運行線程(線程中包含name屬性) :
- 在構(gòu)造函數(shù)中傳入用于線程運行的函數(shù)(這種方式更加靈活)
- 在子類中重寫threading.Thread基類中run()方法(
只重寫__init__()和run()方法
)
創(chuàng)建線程對象后, 通過調(diào)用start()函數(shù)運行線程, 然后會自動調(diào)用run()
方法.
通過設置`daemon`屬性, 可以將線程設置為守護線程
threading.Thread(group = None, target = None, name = None, args = () kwars = {})
group : 應該為None
target : 可以傳入一個函數(shù)用于run()方法調(diào)用,
name : 線程名 默認使用"Thread-N"
args : 元組, 表示傳入target函數(shù)的參數(shù)
kwargs : 字典, 傳入target函數(shù)中關鍵字參數(shù)
屬性:
name #線程表示, 沒有任何語義
doemon #布爾值, 如果是守護線程為True, 不是為False, 主線程不是守護線程, 默認threading.Thread.damon = False
類方法:
run() #用以表示線程活動的方法瞭吃。
start() #啟動線程活動碌嘀。
join([time]) #等待至線程中止。這阻塞調(diào)用線程直至線程的join() 方法被調(diào)用中止-正常退出或者拋出未處理的異常-或者是可選的超時發(fā)生歪架。
isAlive(): 返回線程是否活動的股冗。
getName(): 返回線程名。
setName(): 設置線程名牡拇。
范例:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
def test_thread(count) :
while count > 0 :
print "count = %d" % count
count = count - 1
time.sleep(1)
def main() :
my_thread = threading.Thread(target = test_thread, args = (10, ))
my_thread.start()
my_thread.join()
if __name__ == '__main__':
main()
2.2. 常用多線程寫法
- 固定線程運行的函數(shù)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, thread
import time
class MyThread(threading.Thread):
"""docstring for MyThread"""
def __init__(self, thread_id, name, counter) :
super(MyThread, self).__init__() #調(diào)用父類的構(gòu)造函數(shù)
self.thread_id = thread_id
self.name = name
self.counter = counter
def run(self) :
print "Starting " + self.name
print_time(self.name, self.counter, 5)
print "Exiting " + self.name
def print_time(thread_name, delay, counter) :
while counter :
time.sleep(delay)
print "%s %s" % (thread_name, time.ctime(time.time()))
counter -= 1
def main():
#創(chuàng)建新的線程
thread1 = MyThread(1, "Thread-1", 1)
thread2 = MyThread(2, "Thread-2", 2)
#開啟線程
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print "Exiting Main Thread"
if __name__ == '__main__':
main()
- 外部傳入線程運行的函數(shù)
#/usr/bin/env python
# -*- coding: utf-8 -*-
import threading
import time
class MyThread(threading.Thread):
"""
屬性:
target: 傳入外部函數(shù), 用戶線程調(diào)用
args: 函數(shù)參數(shù)
"""
def __init__(self, target, args):
super(MyThread, self).__init__() #調(diào)用父類的構(gòu)造函數(shù)
self.target = target
self.args = args
def run(self) :
self.target(self.args)
def print_time(counter) :
while counter :
print "counter = %d" % counter
counter -= 1
time.sleep(1)
def main() :
my_thread = MyThread(print_time, 10)
my_thread.start()
my_thread.join()
if __name__ == '__main__':
main()
2.3. 生產(chǎn)者消費者問題
試著用python寫了一個生產(chǎn)者消費者問題(偽生產(chǎn)者消費者), 只是使用簡單的鎖, 感覺有點不太對, 下面另一個程序會寫出正確的生產(chǎn)者消費者問題
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import thread, threading
import urllib2
import time, random
import Queue
share_queue = Queue.Queue() #共享隊列
my_lock = thread.allocate_lock()
class Producer(threading.Thread) :
def run(self) :
products = range(5)
global share_queue
while True :
num = random.choice(products)
my_lock.acquire()
share_queue.put(num)
print "Produce : ", num
my_lock.release()
time.sleep(random.random())
class Consumer(threading.Thread) :
def run(self) :
global share_queue
while True:
my_lock.acquire()
if share_queue.empty() : #這里沒有使用信號量機制進行阻塞等待,
print "Queue is Empty..."
my_lock.release()
time.sleep(random.random())
continue
num = share_queue.get()
print "Consumer : ", num
my_lock.release()
time.sleep(random.random())
def main() :
producer = Producer()
consumer = Consumer()
producer.start()
consumer.start()
if __name__ == '__main__':
main()
殺死多線程程序方法: 使用control + z
掛起程序(程序依然在后臺, 可以使用ps aux
查看), 獲得程序的進程號, 然后使用kill -9 進程號
殺死進程
參考一篇帖子解決了上述問題,重寫了生產(chǎn)者消費者問題程序, 參考鏈接慣例放在最后.
使用了wait()和notify()解決
當然最簡答的方法是直接使用
Queue
,Queue封裝了Condition的行為, 如wait(), notify(), acquire(), 沒看文檔就這樣, 使用了Queue竟然不知道封裝了這些函數(shù), 繼續(xù)滾去看文檔了
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import random, time, Queue
MAX_SIZE = 5
SHARE_Q = [] #模擬共享隊列
CONDITION = threading.Condition()
class Producer(threading.Thread) :
def run(self) :
products = range(5)
global SHARE_Q
while True :
CONDITION.acquire()
if len(SHARE_Q) == 5 :
print "Queue is full.."
CONDITION.wait()
print "Consumer have comsumed something"
product = random.choice(products)
SHARE_Q.append(product)
print "Producer : ", product
CONDITION.notify()
CONDITION.release()
time.sleep(random.random())
class Consumer(threading.Thread) :
def run(self) :
global SHARE_Q
while True:
CONDITION.acquire()
if not SHARE_Q :
print "Queue is Empty..."
CONDITION.wait()
print "Producer have producted something"
product = SHARE_Q.pop(0)
print "Consumer :", product
CONDITION.notify()
CONDITION.release()
time.sleep(random.random())
def main() :
producer = Producer()
consumer = Consumer()
producer.start()
consumer.start()
if __name__ == '__main__':
main()
2.4.簡單鎖
如果只是簡單的加鎖解鎖可以直接使用threading.Lock()生成鎖對象, 然后使用acquire()和release()方法
例如:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
class MyThread(threading.Thread) :
def __init__(self, thread_id, name, counter) :
threading.Thread.__init__(self)
self.thread_id = thread_id
self.name = name
self.counter = counter
def run(self) :
#重寫run方法, 添加線程執(zhí)行邏輯, start函數(shù)運行會自動執(zhí)行
print "Starting " + self.name
threadLock.acquire() #獲取所
print_time(self.name, self.counter, 3)
threadLock.release() #釋放鎖
def print_time(thread_name, delay, counter) :
while counter :
time.sleep(delay)
print "%s %s" % (thread_name, time.ctime(time.time()))
counter -= 1
threadLock = threading.Lock()
threads = [] #存放線程對象
thread1 = MyThread(1, "Thread-1", 1)
thread2 = MyThread(2, "Thread-2", 2)
#開啟線程
thread1.start()
thread2.start()
for t in threads :
t.join() #等待線程直到終止
print "Exiting Main Thread"
2.5. Condition
如果是向生產(chǎn)者消費者類似的情形, 使用Condition類 或者直接使用
Queue
模塊
Condition
條件變量中有acquire()和release方法用來調(diào)用鎖的方法
, 有wait(), notify(), notifyAll()方法
, 后面是三個方法必須在獲取鎖的情況下調(diào)用, 否則產(chǎn)生RuntimeError
錯誤.
- 當一個線程獲得鎖后, 發(fā)現(xiàn)沒有期望的資源或者狀態(tài), 就會調(diào)用wait()阻塞, 并釋放已經(jīng)獲得鎖, 知道期望的資源或者狀態(tài)發(fā)生改變
- 當一個線程獲得鎖, 改變了資源或者狀態(tài), 就會調(diào)用notify()和notifyAll()去通知其他線程,
#官方文檔中提供的生產(chǎn)者消費者模型
# Consume one item
cv.acquire()
while not an_item_is_available():
cv.wait()
get_an_available_item()
cv.release()
# Produce one item
cv.acquire()
make_an_item_available()
cv.notify()
cv.release()
#threading.Condition類
thread.Condition([lock])
可選參數(shù)lock: 必須是Lock或者RLock對象, 并被作為underlying鎖(悲觀鎖?), 否則, 會創(chuàng)建一個新的RLock對象作為underlying鎖
類方法:
acquire() #獲得鎖
release() #釋放鎖
wait([timeout]) #持續(xù)等待直到被notify()或者notifyAll()通知或者超時(必須先獲得鎖),
#wait()所做操作, 先釋放獲得的鎖, 然后阻塞, 知道被notify或者notifyAll喚醒或者超時, 一旦被喚醒或者超時, 會重新獲取鎖(應該說搶鎖), 然后返回
notify() #喚醒一個wait()阻塞的線程.
notify_all()或者notifyAll() #喚醒所有阻塞的線程
參考程序可以查看上面的生產(chǎn)者消費者程序