rpc中文即遠(yuǎn)程過(guò)程調(diào)用的意思
微服務(wù)架構(gòu)大行其道的形勢(shì)下冗栗, 作為開(kāi)發(fā)人員或多或少都會(huì)接觸到相關(guān)技術(shù)
大廠google的gRPC秋茫、Facebook的thrift迎瞧、阿里的dubble算是rpc框架中比較優(yōu)秀的代表啦
其中不少框架都是支持跨語(yǔ)言調(diào)用肉迫,有python調(diào)用接口妓忍,但是因?yàn)橛闷渌Z(yǔ)言編寫,使用起來(lái)略顯復(fù)雜尝苇,且不太pythonic 手動(dòng)滑稽
這里不妨借助rabbitMQ 實(shí)現(xiàn)一個(gè)自己的簡(jiǎn)單rpc框架
小伙伴們首先得熟悉一下rabbitMQ的api, 這里貼出一個(gè)非常友好的教程文檔铛只,文檔不大僅有幾篇文章,快一點(diǎn)大概半天即可了解消息隊(duì)列的機(jī)制茎匠,rabbitmq作為生產(chǎn)中經(jīng)常用到的工具格仲,需要好生了解哦~
python版教程最后一篇押袍,就是一個(gè)簡(jiǎn)單的rpc demo
該實(shí)現(xiàn)僅能作單個(gè)函數(shù)的遠(yuǎn)程調(diào)用诵冒,我在此之上重新封裝實(shí)現(xiàn)了一個(gè)支持多種函數(shù)調(diào)用的rpc實(shí)現(xiàn)
服務(wù)架設(shè)配置:
系統(tǒng) | 版本 |
---|---|
ubuntu | 16.04 |
RabbitMQ | 3.5.7 |
python | 3.6.2 |
pika | 1.0.0 |
客戶端/調(diào)用端部分:
在初始化對(duì)象方法中加入了兩個(gè)數(shù)據(jù)結(jié)構(gòu):
# 請(qǐng)求id結(jié)果映射表
self._req_id_result_map = {}
# 請(qǐng)求id列表
self.req_id_list = []
每一次請(qǐng)求調(diào)用都需要使用一個(gè)唯一標(biāo)識(shí)id
這兩個(gè)數(shù)據(jù)結(jié)構(gòu),一個(gè)用來(lái)保存調(diào)用結(jié)果谊惭,一個(gè)用來(lái)保存調(diào)用者id列表
rst = self._req_id_result_map.get(req_id, None)
del self._req_id_result_map[req_id]
從保存的結(jié)果中拿出數(shù)據(jù)汽馋,記得要清空數(shù)據(jù),不然請(qǐng)求數(shù)據(jù)量大的話會(huì)出現(xiàn)數(shù)據(jù)積壓到內(nèi)存從而導(dǎo)致內(nèi)存溢出的情況
def call(self, req_id, fn, param):
"""
:param req_id:str 遠(yuǎn)程調(diào)用請(qǐng)求id 要求唯一
:param fn:str 遠(yuǎn)程調(diào)用函數(shù)名 要求遠(yuǎn)端必須有同名函數(shù)
:param param:list 遠(yuǎn)程調(diào)用參數(shù) 限制列表長(zhǎng)度為2 第一個(gè)元素代表位置參數(shù)的列表
無(wú)參數(shù)傳空列表 第二個(gè)元素為具名參數(shù)的字典
沒(méi)有則傳空字典 遠(yuǎn)端調(diào)用類似 fn(*param[0], **param[1])
:return:
"""
self.req_id_list.append(req_id)
self.channel.basic_publish(exchange='', routing_key='rpc_queue',
properties=pika.BasicProperties(reply_to=self.callback_queue,
correlation_id=req_id),
body=json.dumps([fn, param]))
while not self._req_id_result_map.get(req_id, None):
self.connection.process_data_events()
return self.get_request_result(req_id)
這里是調(diào)用端的核心邏輯:
call方法參數(shù)為 請(qǐng)求id圈盔, 調(diào)用函數(shù)名豹芯, 調(diào)用函數(shù)參數(shù)
其中參數(shù)有特別要求,必須是長(zhǎng)度為二的可遍歷容器(列表驱敲、元祖铁蹈、set...)
容器的第一個(gè)元素代表位置參數(shù),第二個(gè)元素是命名參數(shù)的字典
call方法首先向默認(rèn)交換機(jī)發(fā)送數(shù)據(jù)众眨,注意這里通過(guò)路由鍵把消息指定到名為rpc_queue
的隊(duì)列中, 然后阻塞一直等到遠(yuǎn)端響應(yīng)結(jié)果
服務(wù)端/被調(diào)端部分:
服務(wù)端要準(zhǔn)備好遠(yuǎn)程調(diào)用的函數(shù)握牧,建議模塊化到單獨(dú)的地方然后引入到服務(wù)端主邏輯模塊
聲明connection、rpc_queue
隊(duì)列娩梨、綁定回調(diào)函數(shù)并開(kāi)啟消費(fèi)(consume)
回調(diào)函數(shù)的主要邏輯:
def on_request(ch, method, props, body):
fn, param = json.loads(body.decode())
print(" [.] fib(%s)" % (param,))
if not (isinstance(param, list) and len(param) == 2 and
isinstance(param[0], list) and isinstance(param[1], dict)):
response = "error params please use like this fn(*param[0], **param[1])"
else:
try:
fn = eval(fn)
response = fn(*param[0], **param[1])
except:
response = "remote has not function like {}".format(fn)
ch.basic_publish(exchange='', routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
這里要判斷調(diào)用端的參數(shù)格式正確與否沿腰,以及調(diào)用的函數(shù)是否存在
取出調(diào)用端的reply_to
作為路由鍵(也是調(diào)用端聲明的隊(duì)列,注意有兩個(gè)隊(duì)列狈定,一個(gè)存放遠(yuǎn)程調(diào)用信息颂龙,一個(gè)存放遠(yuǎn)程調(diào)用結(jié)果, 分別在服務(wù)端/客戶端聲明)
同時(shí)取出correlation_id
這個(gè)即是調(diào)用端的請(qǐng)求id 把這個(gè)id作為屬性發(fā)送到調(diào)用端纽什,調(diào)用端通過(guò)這個(gè)id(可以看成一個(gè)token)取出調(diào)用結(jié)果
數(shù)據(jù)傳輸默認(rèn)使用二進(jìn)制措嵌, 參數(shù)部分先json序列化,然后轉(zhuǎn)化為二進(jìn)制
最后把調(diào)用端封裝了一下方法:
call_single芦缰、call_many
使用這個(gè)兩個(gè)函數(shù)即可單次企巢、多次進(jìn)行rpc
調(diào)用端(客戶端)和遠(yuǎn)程端(服務(wù)端/被調(diào)端)分別部署在不同的機(jī)器上,注意配置統(tǒng)一的rabbitMQ主機(jī)地址饺藤,至于消息隊(duì)列可以放在服務(wù)端也可以放在客戶端甚至可以部署在一臺(tái)獨(dú)立的服務(wù)器上
最后總結(jié):
源代碼地址在此哦 >~<
這里用最簡(jiǎn)潔直接的方式實(shí)現(xiàn)了一個(gè)rpc框架包斑,目前僅支持函數(shù)的遠(yuǎn)程調(diào)用流礁,后續(xù)可以加入方法的遠(yuǎn)程調(diào)用,原理也和函數(shù)調(diào)用類似罗丰,感興趣的小伙伴可以自己動(dòng)手實(shí)現(xiàn)一下