在python中Process比Thread更穩(wěn)定掂恕,且Process能分布到多臺機器火欧,而Thread只能分布到同一臺機器的多個CPU衷畦。
Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多臺機器上摇锋。
task_master.py
# coding:utf-8
import random,time,queue
from multiprocessing.managers import BaseManager
#發(fā)送任務(wù)的隊列
task_queue = queue.Queue()
#接收任務(wù)的隊列
result_queue = queue.Queue()
#把兩個任務(wù)隊列在網(wǎng)絡(luò)上注冊
BaseManager.register('get_task_queue',callable=lambda: task_queue)
BaseManager.register('get_result_queue',callable=lambda: result_queue)
#綁定端口5000丹拯,設(shè)置驗證碼:8e8b55261098a425273f31a
manager = BaseManager(address=('',5000),authkey=b'8e8b55261098a425273f31a')
#啟動隊列
manager.start()
# 獲取通過網(wǎng)絡(luò)訪問的queue對象:
task = manager.get_task_queue()
result = manager.get_result_queue()
begintime = time.time()
for i in range(50):
r = random.randint(10001,99999)
print("Put task %d ..." % r)
task.put(r)
for i in range(50):
r = result.get(timeout=10)
print("Result is %s" % r)
manager.shutdown()
print("master exit.")
endtime = time.time()
print('用時:%0.5f' %(endtime-begintime))
task_worker.py
#task_worker.py
#coding:utf-8
import time,sys,queue
from multiprocessing.managers import BaseManager
#獲取網(wǎng)絡(luò)中的Queue,并注冊
BaseManager.register('get_task_queue')
BaseManager.register('get_result_queue')
#連接到manager.py所在機器 server_addr 為遠程master服務(wù)器的ip地址
server_addr = '127.0.0.1'
print("Connecting to server %s" % server_addr)
m = BaseManager(address=(server_addr,5000),authkey=b'8e8b55261098a425273f31a')
m.connect()
#獲取Queue對象
task = m.get_task_queue()
result = m.get_result_queue()
#從task中獲取任務(wù),并把結(jié)果寫入result隊列
for i in range(50):
try:
n = task.get(timeout=2)
print('run task %d * %d' %(n,n))
r = '%d * %d = %d ' % (n, n, n*n)
time.sleep(1)
result.put(r)
except queue.Empty:
print('Task queue is empty')
#處理結(jié)束
print('Worker exit .')