Python有很多庫可以支持并行計算。
>>> import threading
>>> def thread_hello():
other = threading.Thread(target=thread_say_hello, args=())
other.start()
thread_say_hello()
>>> def thread_say_hello():
print('hello from', threading.current_thread().name)
>>> thread_hello()
hello from Thread-1
hello from MainThread
>>> import multiprocessing
>>> def process_hello():
other = multiprocessing.Process(target=process_say_hello, args=())
other.start()
process_say_hello()
>>> def process_say_hello():
print('hello from', multiprocessing.current_process().name)
>>> process_hello()
hello from MainProcess
hello from Process-1
threading
和multiprocessing
庫有著類似的API呼巴,但是前者只是建立單個線程锥忿,后者對多進(jìn)程封裝得更完善,對多核CPU的支持更好羔挡。更多可閱讀Python標(biāo)準(zhǔn)庫08 多線程與同步 (threading包), Python標(biāo)準(zhǔn)庫10 多進(jìn)程初步 (multiprocessing包), Python多進(jìn)程并發(fā)(multiprocessing)
threading
模塊使用線程,multiprocessing
使用進(jìn)程间唉。其區(qū)別不同在于绞灼,線程使用同一內(nèi)存空間,而進(jìn)程分配有不同的內(nèi)存空間呈野。因此進(jìn)程間難以共享對象低矮。但兩個線程則有可能同時改寫同一內(nèi)存空間。為防止出現(xiàn)沖突被冒,可以使用GIL保證不會同時執(zhí)行可能沖突的線程军掂。
更多對比
下面是一個線程沖突的實例
import threading
from time import sleep
counter = [0]
def increment():
count = counter[0]
sleep(0) # try to force a switch to the other thread
counter[0] = count + 1
other = threading.Thread(target=increment, args=())
other.start()
increment()
print('count is now: ', counter[0])
下面是執(zhí)行過程:
Thread 0 Thread 1
read counter[0]: 0
read counter[0]: 0
calculate 0 + 1: 1
write 1 -> counter[0]
calculate 0 + 1: 1
write 1 -> counter[0]
問題在于:盡管執(zhí)行了兩次加法,但結(jié)果仍然是:1姆打。
在Python中,最簡單的保證數(shù)據(jù)同步的方法是使用queue
模塊的Queue
類肠虽。
from queue import Queue
queue = Queue()
def synchronized_consume():
while True:
print('got an item:', queue.get()) # 得到對象
queue.task_done() # 隊列任務(wù)結(jié)束
def synchronized_produce():
consumer = threading.Thread(target=synchronized_consume, args=())
consumer.daemon = True
consumer.start()
for i in range(10):
queue.put(i) # 加入新對象
queue.join() # 確保所有隊列任務(wù)結(jié)束后幔戏,退出
synchronized_produce()
如果上面這個辦法因為某些原因做不到,那我們可以使用threading
模塊中的Lock
類税课。
seen = set()
seen_lock = threading.Lock()
def already_seen(item):
seen_lock.acquire() # 在Lock類的
result = True # acquire方法
if item not in seen: # 和release方法
seen.add(item) # 之間的代碼
result = False # 僅能同時被
seen_lock.release() # 一個線程訪問
return result
def already_seen(item):
with seen_lock:
if item not in seen:
seen.add(item)
return False
return True
還有一個辦法是threading
模塊中的Barrier
類闲延。
counters = [0, 0]
barrier = threading.Barrier(2)
def count(thread_num, steps):
for i in range(steps):
other = counters[1 - thread_num]
barrier.wait() # wait for reads to complete
counters[thread_num] = other + 1
barrier.wait() # wait for writes to complete
def threaded_count(steps):
other = threading.Thread(target=count, args=(1, steps))
other.start()
count(0, steps)
print('counters:', counters)
threaded_count(10)
更多參考Python的多線程編程模塊 threading 參考痊剖,17.1. threading — Thread-based parallelism。
防止共享數(shù)據(jù)錯誤讀寫的終極機(jī)制是完全避免并發(fā)地接觸同一數(shù)據(jù)垒玲。進(jìn)程的內(nèi)存空間的獨立性完全符合這一要求陆馁。為了解決進(jìn)程之間的交流問題,multiprocessing
模塊特別提供了Pipe
類合愈。Pipe
默認(rèn)為兩條通道叮贩,如果傳入?yún)?shù)False
則為一條通道。
def process_consume(in_pipe):
while True:
item = in_pipe.recv() # 只有接收成功后才會繼續(xù)執(zhí)行
if item is None:
return
print('got an item:', item)
def process_produce():
pipe = multiprocessing.Pipe(False)
consumer = multiprocessing.Process(target=process_consume, args=(pipe[0],))
consumer.start()
for i in range(10):
pipe[1].send(i) # 通過通道發(fā)送對象
pipe[1].send(None) # done signal
process_produce()
在執(zhí)行并發(fā)計算時佛析,程序員往往會犯下錯誤:
- 同步不足(Under-synchronization):一些線程沒有被同步
- 過度同步(Over-synchronization):某些本可以并發(fā)執(zhí)行的線程益老,被串行化
- 死鎖(Deadlock):被同步的進(jìn)程相互等候?qū)Ψ酵瓿赡承┎襟E才進(jìn)行下一步,導(dǎo)致程序鎖死寸莫。一個栗子:
def deadlock(in_pipe, out_pipe):
item = in_pipe.recv()
print('got an item:', item)
out_pipe.send(item + 1)
def create_deadlock():
pipe = multiprocessing.Pipe()
other = multiprocessing.Process(target=deadlock, args=(pipe[0], pipe[1]))
other.start()
deadlock(pipe[1], pipe[0])
create_deadlock()