1. Multiprocessing
Unix/Linux operating systems provide a fork() system call, and it is unusual. An ordinary function is called once and returns once, but fork() is called once and returns twice: the operating system copies the current process (the parent process) to create a new one (the child process), and the call then returns in both the parent and the child.
In the child, fork() always returns 0, while in the parent it returns the child's process ID. The reason is that a parent can fork many children, so it has to record each child's ID, whereas a child only needs to call getppid() to obtain its parent's ID.
import os
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
Output:
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
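Since the parent keeps the child's PID, it can use that PID to wait for the child and read its exit status. The following is a minimal sketch, assuming a Unix-like system; the helper name fork_and_wait and the exit code passed to it are illustrative and not part of the original example.

```python
import os


def fork_and_wait(exit_code):
    """Fork a child that exits with exit_code; return what the parent observes."""
    pid = os.fork()
    if pid == 0:
        # Child branch: fork() returned 0. Exit immediately with the given code.
        os._exit(exit_code)
    # Parent branch: fork() returned the child's PID, so wait on exactly that child.
    waited_pid, status = os.waitpid(pid, 0)
    exited_normally = os.WIFEXITED(status)
    return waited_pid == pid, exited_normally, os.WEXITSTATUS(status)


if __name__ == '__main__':
    print(fork_and_wait(7))
```

Waiting on the child this way also prevents it from lingering as a zombie process after it exits.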
Cross-platform support with multiprocessing
from multiprocessing import Process
import os
# Code to be executed by the child process
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')
Output:
Parent process 928.
Child process will start.
Run child process test (929)...
Child process end.
Process pool: Pool
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')
Output:
Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.
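Calling join() on a Pool object waits for all child processes to finish. close() must be called before join(), and once close() has been called no new tasks can be submitted to the pool. Because the pool size is 4, task 4 only starts after one of the first four tasks has finished.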
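The close()/join() ordering can be checked directly. The sketch below is an illustration under a few assumptions: it uses math.factorial as the task (a standard-library function that is easy to send to worker processes), a pool size of 2, and a hypothetical helper named run_pool_demo. It collects results through the AsyncResult objects returned by apply_async() and shows that submitting after close() is rejected with ValueError.

```python
import math
from multiprocessing import Pool


def run_pool_demo():
    """Submit tasks, close the pool, then try to submit one more."""
    pool = Pool(2)
    # apply_async() returns an AsyncResult for each submitted task.
    pending = [pool.apply_async(math.factorial, args=(i,)) for i in range(5)]
    pool.close()  # no more tasks may be submitted after this point
    try:
        pool.apply_async(math.factorial, args=(5,))
        rejected = False
    except ValueError:
        # The pool refuses new work once it has been closed.
        rejected = True
    pool.join()  # wait for the worker processes to finish
    results = [r.get() for r in pending]
    return results, rejected


if __name__ == '__main__':
    print(run_pool_demo())
```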
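Child process input and output
The subprocess module makes it easy to start a child process and then control its input and output.
The following example runs the command nslookup www.python.org from Python code, which has the same effect as running it directly on the command line: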
import subprocess
print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)
Output:
$ nslookup www.python.org
Server: 192.168.19.4
Address: 192.168.19.4#53
Non-authoritative answer:
www.python.org canonical name = python.map.fastly.net.
Name: python.map.fastly.net
Address: 199.27.79.223
Exit code: 0
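The value returned by subprocess.call() is the child's exit code. A small sketch of the same idea that does not depend on nslookup or network access: it starts another Python interpreter through sys.executable and uses subprocess.run(), whose result exposes the exit code as returncode. The helper name run_child is an assumption made for this illustration.

```python
import subprocess
import sys


def run_child(exit_code):
    """Run a child Python process that exits with exit_code; return what we observe."""
    child_source = 'import sys; sys.exit(%d)' % exit_code
    completed = subprocess.run([sys.executable, '-c', child_source])
    return completed.returncode


def run_child_checked(exit_code):
    """Same as run_child, but check=True turns a non-zero exit code into an exception."""
    child_source = 'import sys; sys.exit(%d)' % exit_code
    try:
        subprocess.run([sys.executable, '-c', child_source], check=True)
        return 'ok'
    except subprocess.CalledProcessError as exc:
        return 'failed with %d' % exc.returncode


if __name__ == '__main__':
    print('Exit code:', run_child(0))
    print(run_child_checked(3))
```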
If the child process also needs input, it can be supplied through the communicate() method:
import subprocess
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
The code above is equivalent to running nslookup on the command line and then typing:
set q=mx
python.org
exit
The output is as follows:
$ nslookup
Server: 192.168.19.4
Address: 192.168.19.4#53
Non-authoritative answer:
python.org mail exchanger = 50 mail.python.org.
Authoritative answers can be found from:
mail.python.org internet address = 82.94.164.166
mail.python.org has AAAA address 2001:888:2000:d::a6
Exit code: 0
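The same communicate() pattern can be tried without nslookup. In this sketch the child is another Python interpreter (started via sys.executable) that upper-cases whatever it reads from standard input; the helper name shout and the child's one-line program are assumptions chosen only to make the example self-contained.

```python
import subprocess
import sys


def shout(text):
    """Send text to a child process on stdin and return its stdout and exit code."""
    child_source = 'import sys; sys.stdout.write(sys.stdin.read().upper())'
    p = subprocess.Popen(
        [sys.executable, '-c', child_source],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() writes the bytes to stdin, closes it, and reads until the child exits.
    output, err = p.communicate(text.encode('utf-8'))
    return output.decode('utf-8'), p.returncode


if __name__ == '__main__':
    print(shout('hello'))
```

Because communicate() waits for the child to exit, p.returncode is already set when it returns.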
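Inter-process communication
Python's multiprocessing module wraps the underlying mechanisms and provides Queue, Pipe and several other ways to exchange data.
Taking Queue as an example, the parent process creates two child processes: one writes data to the Queue and the other reads data from it: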
from multiprocessing import Process, Queue
import os, time, random
# Code executed by the writer process:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())
# Code executed by the reader process:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)
if __name__=='__main__':
    # The parent process creates the Queue and passes it to each child process:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # Start the child process pw, which writes:
    pw.start()
    # Start the child process pr, which reads:
    pr.start()
    # Wait for pw to finish:
    pw.join()
    # pr runs an infinite loop, so it cannot be joined and has to be terminated:
    pr.terminate()
Output:
Process to write: 50563
Put A to queue...
Process to read: 50564
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
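Terminating the reader works, but the reader can also be allowed to stop on its own if the writer sends an end-of-stream marker. The sketch below is one possible variation, not the original design: it uses None as a sentinel value (an assumption, which only works if None is never a real payload), and for brevity the parent process does the reading instead of a second child.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # hypothetical end-of-stream marker, assumed never to be real data


def write(q, values):
    """Writer process: put every value on the queue, then signal the end."""
    for value in values:
        q.put(value)
    q.put(SENTINEL)


def read_all(q):
    """Read from the queue until the sentinel arrives."""
    received = []
    while True:
        value = q.get()
        if value is SENTINEL:
            break
        received.append(value)
    return received


def run_queue_demo():
    q = Queue()
    pw = Process(target=write, args=(q, ['A', 'B', 'C']))
    pw.start()
    received = read_all(q)  # drain the queue before joining the writer
    pw.join()
    return received


if __name__ == '__main__':
    print(run_queue_demo())
```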
On Unix/Linux, multiple processes can be created with the fork() call.
For cross-platform multiprocessing, use the multiprocessing module.
Inter-process communication is done through Queue, Pipe and similar mechanisms.
2. Multithreading
Python's standard library provides two modules: _thread and threading. _thread is the low-level module, and threading is the high-level module that wraps it. In the vast majority of cases, threading is all we need.
import time, threading
# Code executed by the new thread:
def loop():
    print('thread %s is running...' % threading.current_thread().name)
    n = 0
    while n < 5:
        n = n + 1
        print('thread %s >>> %s' % (threading.current_thread().name, n))
        time.sleep(1)
    print('thread %s ended.' % threading.current_thread().name)
print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)
The output is as follows:
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.
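Every process starts one thread by default, which we call the main thread, and the main thread can start new threads. The threading module has a current_thread() function that always returns the instance of the current thread. The main thread instance is named MainThread, and the name of a child thread is specified when it is created; here we named it LoopThread. The name is only used for display when printing and has no other meaning. If no name is given, Python automatically names the threads Thread-1, Thread-2, and so on.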
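Lock
import threading
balance = 0
lock = threading.Lock()
# change_it is not defined in the original notes; it is assumed here to
# deposit and then withdraw the same amount, so balance should stay 0:
def change_it(n):
    global balance
    balance = balance + n
    balance = balance - n
def run_thread(n):
    for i in range(100000):
        # Acquire the lock first:
        lock.acquire()
        try:
            # Now it is safe to modify:
            change_it(n)
        finally:
            # Always release the lock when done:
            lock.release()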
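The acquire/try/finally/release sequence can also be written with a with statement, since a Lock is a context manager. A short sketch, using an illustrative shared counter and a hypothetical helper run_counter_demo rather than the balance example:

```python
import threading

counter = 0
counter_lock = threading.Lock()


def add_many(times):
    """Increment the shared counter, holding the lock for each update."""
    global counter
    for _ in range(times):
        # 'with' acquires the lock and releases it even if the body raises.
        with counter_lock:
            counter += 1


def run_counter_demo(num_threads, times):
    global counter
    counter = 0
    threads = [threading.Thread(target=add_many, args=(times,)) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


if __name__ == '__main__':
    print(run_counter_demo(4, 10000))
```

With the lock held around each update, the final count equals num_threads * times.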
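Python cannot use multiple cores through multithreading. Python threads are real threads, but the interpreter (CPython) has a GIL (Global Interpreter Lock) that a thread must acquire before it can execute Python code. Older versions released the GIL after every 100 bytecode instructions; since Python 3.2 the interpreter releases it after a time interval instead (5 ms by default), giving other threads a chance to run. The GIL effectively puts a lock around the code executed by every thread, so threads in Python can only take turns: even with 100 threads on a 100-core CPU, only one core is used.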
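In CPython 3.2 and later, the interval after which a running thread is asked to give up the GIL can be inspected and adjusted through the sys module. The sketch below only reads the value, changes it temporarily, and restores it; the helper name and the 0.001 second value are arbitrary choices for illustration, and changing the interval does not make threads run on several cores.

```python
import sys


def switch_interval_demo(new_interval=0.001):
    """Read the thread switch interval, change it briefly, then restore it."""
    original = sys.getswitchinterval()   # in seconds
    sys.setswitchinterval(new_interval)
    changed = sys.getswitchinterval()
    sys.setswitchinterval(original)      # put the original setting back
    return original, changed


if __name__ == '__main__':
    print(switch_interval_demo())
```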
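ThreadLocal
In a multithreaded environment, each thread has its own data. It is better for a thread to use its own local variables than global variables, because local variables are visible only to that thread and cannot affect other threads, whereas modifications to global variables must be protected by a lock. The drawback of local variables is that they have to be passed along in every function call: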
def process_student(name):
    std = Student(name)
    # std is a local variable, but every function needs it, so it has to be passed in:
    do_task_1(std)
    do_task_2(std)
def do_task_1(std):
    do_subtask_1(std)
    do_subtask_2(std)
def do_task_2(std):
    do_subtask_2(std)
    do_subtask_2(std)
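Passing the argument down through every layer of function calls quickly becomes unmanageable. A global variable does not work either, because each thread handles a different Student object that cannot be shared.
Approach 2
What if we store all Student objects in a global dict and use the thread itself as the key to look up the Student object that belongs to the current thread?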
global_dict = {}
def std_thread(name):
    std = Student(name)
    # Put std into the global variable global_dict:
    global_dict[threading.current_thread()] = std
    do_task_1()
    do_task_2()
def do_task_1():
    # Do not pass std in; look it up by the current thread instead:
    std = global_dict[threading.current_thread()]
    ...
def do_task_2():
    # Any function can look up the std variable of the current thread:
    std = global_dict[threading.current_thread()]
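This approach works in principle. Its biggest advantage is that the std object no longer has to be passed through every layer of functions, but the code each function needs in order to fetch std is somewhat ugly.
ThreadLocal is simpler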
import threading
# Create a global ThreadLocal object:
local_school = threading.local()
def process_student():
    # Get the student associated with the current thread:
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
    # Bind the student to the ThreadLocal:
    local_school.student = name
    process_student()
t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
Output:
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
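The global variable local_school is a ThreadLocal object. Every thread can read and write its student attribute without affecting the others. You can think of local_school as a global variable whose attributes, such as local_school.student, are local to each thread: they can be read and written freely without interference, and there is no lock to manage because ThreadLocal handles that internally.
ThreadLocal is most commonly used to bind a database connection, an HTTP request, user identity information and the like to each thread, so that every handler function called within that thread can access these resources conveniently.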
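The isolation can be observed directly: an attribute set on a threading.local() object in one thread does not exist in another thread until that thread sets it itself. A minimal sketch, with illustrative names (local_data, worker, run_local_demo) that are not part of the example above:

```python
import threading

local_data = threading.local()


def worker(name, seen):
    # A new thread starts with no attributes on local_data, whatever other threads set.
    seen[name + '_before'] = hasattr(local_data, 'student')
    local_data.student = name
    seen[name] = local_data.student


def run_local_demo():
    local_data.student = 'main'  # visible only in the calling thread
    seen = {}
    threads = [threading.Thread(target=worker, args=(n, seen)) for n in ('Alice', 'Bob')]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # The calling thread's value is untouched by the workers' assignments.
    return seen, local_data.student


if __name__ == '__main__':
    print(run_local_demo())
```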
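3. Distributed processes
Between Thread and Process, Process should be preferred: it is more stable, and processes can be distributed across multiple machines, whereas threads can at most be spread across the CPUs of a single machine.
Python's multiprocessing module not only supports multiple processes; its managers submodule also supports distributing processes across multiple machines.
For example, suppose we already have a multi-process program that communicates through a Queue and runs on a single machine. Because the process that handles tasks is heavily loaded, we want to put the process that sends tasks and the process that handles tasks on two different machines. How can this be done with distributed processes?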
# task_master.py
import random, time, queue
from multiprocessing.managers import BaseManager
# Queue for sending tasks:
task_queue = queue.Queue()
# Queue for receiving results:
result_queue = queue.Queue()
# QueueManager inherits from BaseManager:
class QueueManager(BaseManager):
    pass
# Register both Queues on the network; the callable argument associates each name with a Queue object.
# Note: where processes are started with spawn (e.g. Windows), lambdas cannot be pickled;
# use module-level functions and an if __name__ == '__main__' guard there.
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# Bind to port 5000 and set the authkey to 'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# Start the manager:
manager.start()
# Get the Queue objects that are accessed through the network:
task = manager.get_task_queue()
result = manager.get_result_queue()
# Put a few tasks in:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# Read the results from the result queue:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# Shut down:
manager.shutdown()
print('master exit.')
Start the worker process on another machine (starting it on the same machine also works):
# task_worker.py
import time, sys, queue
from multiprocessing.managers import BaseManager
# Create a similar QueueManager:
class QueueManager(BaseManager):
    pass
# This QueueManager only fetches the Queues from the network, so only the names are registered:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# Connect to the server, i.e. the machine running task_master.py:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# The port and authkey must match the settings in task_master.py exactly:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# Connect over the network:
m.connect()
# Get the Queue objects:
task = m.get_task_queue()
result = m.get_result_queue()
# Take tasks from the task queue and write the results to the result queue:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except queue.Empty:
        # The exception class lives in the queue module (lowercase):
        print('task queue is empty.')
# Done:
print('worker exit.')
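The worker loop relies on get(timeout=...) raising the Empty exception from the queue module when no item arrives in time. The sketch below shows that behaviour on a plain local queue.Queue rather than the networked one; the helper name drain and the 0.1 second timeout are assumptions made for the illustration.

```python
import queue


def drain(q, timeout=0.1):
    """Collect items from q until get() times out with queue.Empty."""
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            # No item arrived within the timeout: treat the queue as exhausted.
            return items


if __name__ == '__main__':
    q = queue.Queue()
    for n in (1, 2, 3):
        q.put(n)
    print(drain(q))
```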
Now we can try out the distributed processes. First start the task_master.py server process:
$ python3 task_master.py
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...
After sending the tasks, the task_master.py process starts waiting for results on the result queue. Now start the task_worker.py process:
$ python3 task_worker.py
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.
When the task_worker.py process finishes, the task_master.py process goes on to print the results:
Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956