先來個最簡單的例子:
把1-10000每個數(shù)求平方
服務(wù)器server:
用兩個隊(duì)列存儲任務(wù)铺然、結(jié)果
定義兩個函數(shù)
要實(shí)現(xiàn)分布式得繼承multiprocessing.managers.BaseManager
在主函數(shù)里multiprocessing.freeze_support()開啟分布式支持
注冊兩個函數(shù)給客戶端調(diào)用
創(chuàng)建管理器灰瞻,設(shè)置ip地址和開啟端口做粤、鏈接密碼砖顷。
用兩個隊(duì)列加任務(wù)喷众、收結(jié)果浓恶。用剛剛注冊的函數(shù)
把1-10000壓入隊(duì)列,
把結(jié)果壓入隊(duì)列
最后完成關(guān)閉服務(wù)器
客戶端client:
也需要繼承multiprocessing.managers.BaseManager
定義一個協(xié)程處理一個數(shù)據(jù)岭参,同時把結(jié)果壓入結(jié)果隊(duì)列
定義一個線程處理10個數(shù)據(jù)反惕,開啟10個協(xié)程
定義一個進(jìn)程,進(jìn)程驅(qū)動10個線程
主函數(shù):同客戶端注冊兩個函數(shù)
同客戶端創(chuàng)建管理器演侯,設(shè)置ip地址和開啟端口姿染、鏈接密碼。
鏈接服務(wù)器
同客戶端調(diào)用注冊的函數(shù)秒际,兩個隊(duì)列
套四層循環(huán):10個進(jìn)程悬赏、100個線程、1000個協(xié)程
循環(huán)進(jìn)程函數(shù)
上代碼:
服務(wù)器server:
#coding:utf-8
import multiprocessing #分布式進(jìn)程
import multiprocessing.managers #分布式進(jìn)程管理器
import random,time #隨機(jī)數(shù)娄徊,時間
import Queue #隊(duì)列
task_queue=Queue.Queue() #任務(wù)
result_queue=Queue.Queue() #結(jié)果
def return_task(): #返回任務(wù)隊(duì)列
return task_queue
def return_result(): #返回結(jié)果隊(duì)列
return result_queue
class QueueManger(multiprocessing.managers.BaseManager):#繼承闽颇,進(jìn)程管理共享數(shù)據(jù)
pass
if __name__=="__main__":
multiprocessing.freeze_support()#開啟分布式支持
QueueManger.register("get_task",callable=return_task)#注冊函數(shù)給客戶端調(diào)用
QueueManger.register("get_result", callable=return_result)
manger=QueueManger(address=("192.168.112.11",8848),authkey="123456") #創(chuàng)建一個管理器,設(shè)置地址與密碼
manger.start() #開啟
task,result=manger.get_task(),manger.get_result() #任務(wù)寄锐,結(jié)果
for i in range(10000):
print "task add data",i
task.put(i)
print "waitting for------"
for i in range(10000):
res=result.get(timeout=100)
print "get data",res
manger.shutdown()#關(guān)閉服務(wù)器
客戶端client:
#coding:utf-8
import multiprocessing #分布式進(jìn)程
import multiprocessing.managers # 分布式進(jìn)程管理器
import random,time #隨機(jī)數(shù)兵多,時間
import Queue #隊(duì)列
import threading
import gevent
import gevent.monkey
class QueueManger(multiprocessing.managers.BaseManager):# 繼承,進(jìn)程管理共享數(shù)據(jù)
pass
def gevetygo(num ,result): #協(xié)程處理一個數(shù)據(jù)
print num*num
result.put(num*num)
def threadgo(datalist,result): # 線程處理10個數(shù)據(jù)橄仆,開啟10個協(xié)程
tasklist=[]
for data in datalist:
tasklist.append(gevent.spawn(gevetygo, data,result))
gevent.joinall(tasklist)
def processgo(ddatalist,result): # [[1,2,3],[4,5,6]] 進(jìn)程驅(qū)動了10個線程
threadlist=[]
for datalist in ddatalist:
mythread=threading.Thread(target=threadgo,args=(datalist,result))
mythread.start()
threadlist.append(mythread)
for mythread in threadlist:
mythread.join()
if __name__=="__main__":
QueueManger.register("get_task") # 注冊函數(shù)調(diào)用服務(wù)器
QueueManger.register("get_result")
manger=QueueManger(address=("192.168.112.11",8848),authkey="123456")
manger.connect() # 鏈接服務(wù)器
task= manger.get_task()
result =manger.get_result() # 任務(wù)剩膘,結(jié)果
# 1000
# 10個進(jìn)程
# 100個線程
# 1000個協(xié)程
for i in range(10):
cubelist = [] # [[[1],[2]]]
for j in range(10):
arealist = []
for k in range(10):
linelist = []
for l in range(10):
data = task.get()
linelist.append(data)
arealist.append(linelist)
cubelist.append(arealist)
processlist = []
for myarealist in cubelist:
process = multiprocessing.Process(target=processgo, args=(myarealist, result))
process.start()
processlist.append(process)
for process in processlist:
process.join()
遇到的坑:一個月之前弄分布式的時候?qū)慽p地址怎么都開啟不了,后來換了臺電腦就支持了= =盆顾。
如果只是在自己電腦上弄的話怠褐,寫127.0.0.1也可以運(yùn)行,如果你也遇到ip地址怎么都開啟不了的情況
整理不易您宪,如果覺得有所幫助奈懒,希望可以留下您的精彩言論再走。趕快為你們最喜歡的框架打Call吧宪巨。
大家如果需要Python的學(xué)習(xí)資料可以加我的Qun:834179111磷杏,小編整理了,從Python入門零基礎(chǔ)到項(xiàng)目實(shí)戰(zhàn)的資料揖铜。歡迎還沒有找到方向的小伙伴來學(xué)習(xí)茴丰。
本文轉(zhuǎn)自網(wǎng)絡(luò) 如有侵權(quán) 請聯(lián)系小編刪除