本文實(shí)現(xiàn)的效果:輸入服務(wù)名黄选、方法名和參數(shù),輸出格式化后的請(qǐng)求結(jié)果
對(duì)dubbo和telnet有所了解的可以直接移步github
Dubbo和telnet
簡(jiǎn)介
Dubbo是阿里巴巴開(kāi)源的一款RPC(Remote Procedure Call,遠(yuǎn)程過(guò)程調(diào)用)框架速侈,用于實(shí)現(xiàn)分布式服務(wù)的跨服務(wù)調(diào)用抱既,具有遠(yuǎn)程通訊、動(dòng)態(tài)配置捧挺、地址路由等功能虑绵。
Dubbo基于dubbo協(xié)議,dubbo協(xié)議是TCP協(xié)議之上的協(xié)議闽烙,采用單一長(zhǎng)連接和 NIO 異步通訊翅睛,適合于小數(shù)據(jù)量大并發(fā)的服務(wù)調(diào)用,以及服務(wù)消費(fèi)者機(jī)器數(shù)遠(yuǎn)大于服務(wù)提供者機(jī)器數(shù)的情況。
Telnet協(xié)議是TCP/IP協(xié)議族中的一員捕发,是Internet遠(yuǎn)程登錄服務(wù)的標(biāo)準(zhǔn)協(xié)議和主要方式疏旨。Telnet是常用的遠(yuǎn)程控制Web服務(wù)器的方法,具體介紹可以查看百度百科:Telnet_百度百科
Dubbo 2.0.5版本以上開(kāi)始支持telnet命令調(diào)用爬骤,本文正是基于telnet命令來(lái)實(shí)現(xiàn)對(duì)Dubbo接口的調(diào)用充石。
使用telnet命令調(diào)用dubbo服務(wù)
編寫(xiě)代碼之前,先簡(jiǎn)單了解一下如何使用telnet命令調(diào)用dubbo服務(wù)霞玄。
首先打開(kāi)cmd骤铃,使用telnet命令連接服務(wù)器(如果提示指令不存在的話,在設(shè)置-啟用或關(guān)閉Windows功能里勾選“Telnet客戶端“并保存)
telnet 172.16.51.4 20885
其中172.16.51.4
是dubbo服務(wù)所在的服務(wù)器地址坷剧,20885
是dubbo服務(wù)指定的端口惰爬。
連接后按回車(chē),出現(xiàn)dubbo>輸入提示符表示已經(jīng)連上了dubbo服務(wù)
telnet命令
- 輸入
ls
并回車(chē)惫企,列出所有可用的服務(wù) - 輸入
ls 服務(wù)名
撕瞧,列出服務(wù)下所有的方法 - 輸入
ls -l 服務(wù)名
,列出服務(wù)下的所有方法和參數(shù) - 輸入
invoke 服務(wù)名.方法名(調(diào)用參數(shù))
狞尔,調(diào)用對(duì)應(yīng)服務(wù)的dubbo接口 - 其它Telnet命令參考
telnet調(diào)用示例
調(diào)用UserInfoRpcService服務(wù)的getUserState方法丛版,參數(shù)是一個(gè)Long類(lèi)型數(shù)值
如下圖,接口成功響應(yīng)并返回了一個(gè)json字符串(默認(rèn)不是utf-8編碼偏序,所以中文會(huì)亂碼)
綜上所述页畦,使用telnet命令的方式調(diào)用dubbo接口最少需要三步:
1、查找服務(wù)的ip和端口(通常是在dubbo-admin或zookeeper查找)
2研儒、telnet連接服務(wù)器
3豫缨、invoke命令調(diào)用接口
如果一次兩次還好,如果用的多了端朵,就會(huì)感覺(jué)很不方便:
1好芭、每次都要查找對(duì)應(yīng)服務(wù)的ip端口并手動(dòng)連接
2、連接一段時(shí)間不操作冲呢,就會(huì)自動(dòng)斷開(kāi)連接
3舍败、invoke命令純手輸,輸錯(cuò)了不能用光標(biāo)移動(dòng)到錯(cuò)誤處修改敬拓,只能刪除重新輸入
4邻薯、響應(yīng)不支持中文,且沒(méi)有格式化恩尾,需要復(fù)制出來(lái)手動(dòng)格式化才具有易讀性
使用Python實(shí)現(xiàn)
既然手動(dòng)輸入telnet命令很不方便,那么使用代碼實(shí)現(xiàn)可以做到哪些改進(jìn)呢挽懦?
重復(fù)幾次命令操作之后就會(huì)發(fā)現(xiàn)翰意,除了調(diào)用時(shí)需要填寫(xiě)的服務(wù)名、方法名和參數(shù)之外,其余如查找ip端口冀偶、連接服務(wù)器醒第、格式化響應(yīng)都是重復(fù)性很高的操作,完全可以簡(jiǎn)化进鸠。
所以我們用Python實(shí)現(xiàn)的目標(biāo)就是:只輸入服務(wù)名
稠曼、方法名
、參數(shù)
并執(zhí)行客年,就得到一個(gè)易讀的返回結(jié)果霞幅。
通過(guò)服務(wù)名自動(dòng)查找ip端口
首先人工查找服務(wù)的ip端口的步驟是可以省略的。
系統(tǒng)通過(guò)dubbo提供服務(wù)量瓜,需要將服務(wù)注冊(cè)到指定的注冊(cè)中心(通常是使用Zookeeper作為Dubbo的注冊(cè)中心)司恳,并暴露其服務(wù)器ip和端口,既然如此绍傲,就可以通過(guò)注冊(cè)中心查找對(duì)應(yīng)服務(wù)的注冊(cè)信息扔傅,來(lái)自動(dòng)獲取其ip和端口了。
在Python中烫饼,可以通過(guò)kazoo庫(kù)來(lái)連接Zookeeper猎塞,代碼如下:
from urllib.parse import unquote
from kazoo.client import KazooClient
# zookeeper的ip和端口
zk = {
'host': '172.16.253.21',
'port': 2181
}
class Zookeeper:
client = None
service_dict = {}
class ServiceNotAvailableError(ValueError):
pass
def __init__(self, timeout=100):
# 連接zookeeper
self.client = KazooClient('%s:%s' % (zk['host'], zk['port']), timeout=timeout)
self.client.start()
# 查找所有注冊(cè)的dubbo服務(wù)
service_list = self.client.get_children('dubbo')
for service in service_list:
name = str(service).split('.')[-1] # 去掉包名,剩下的服務(wù)名作為key
self.service_dict[name] = service # 此處如果有重名的服務(wù)杠纵,會(huì)覆蓋
def get_service_address(self, service):
"""獲取指定服務(wù)的注冊(cè)地址信息"""
if '.' not in service:
# 如果傳入的服務(wù)名不帶包名荠耽,就從service_dict找到完整服務(wù)名
service = self.service_dict[service]
uri = 'dubbo/%s' % service
if not self.client.exists(uri):
raise ServiceNotAvailableError('服務(wù)"%s"不存在' % service)
elif not self.client.exists('%s/providers' % uri):
raise ServiceNotAvailableError('服務(wù)"%s"沒(méi)有提供者' % service)
else:
providers = self.client.get_children('%s/providers' % uri)
addrs = []
for provider in providers:
addr = str(unquote(provider)).split('/')[2]
addrs.append((str(addr).split(':')[0], str(addr).split(':')[1], str(addr)))
return addrs
def close(self):
self.client.stop()
self.client.close()
通過(guò)實(shí)例化Zookeeper對(duì)象,并調(diào)用get_service_address方法淡诗,就可以獲得指定服務(wù)的ip地址和端口了骇塘。
實(shí)現(xiàn)telnet命令調(diào)用并格式化返回值
完成服務(wù)的ip端口查找后,開(kāi)始進(jìn)行接口調(diào)用韩容,并將接口響應(yīng)格式化成易讀的形式款违。
在Python中,使用telnetlib庫(kù)來(lái)完成telnet命令操作群凶,代碼如下:
import json
import telnetlib
... # 此處省略zk的代碼
class DubboTester(telnetlib.Telnet):
class Args:
def __init__(self, service, method, params, host=None, port=0, index=0):
self.service = service
self.method = method
self.params = params
self.host = host
self.port = port
self.index = index
prompt = 'dubbo>'
coding = 'utf-8'
zk = Zookeeper()
args = None
def __init__(self, args: Args or dict):
"""
實(shí)例化DubboTester插爹,這一步會(huì)連接到指定服務(wù)的服務(wù)器
:param args: 可以傳Args對(duì)象實(shí)例,也可以傳字典數(shù)據(jù)请梢,字典最少要包含service赠尾、method、params,
params必須是list類(lèi)型毅弧,list中的元素就是方法所需的參數(shù)
"""
# dict解析成Args對(duì)象
if isinstance(args, dict):
args = self.__init_args_from_dict(args) if isinstance(args, dict) else args
address_list = self.zk.get_service_address(args.service)
if len(address_list) > 1:
# 對(duì)于多節(jié)點(diǎn)服務(wù)气嫁,默認(rèn)連接第一個(gè)節(jié)點(diǎn),可用index指定
print('——' * 43)
print('|%s服務(wù)有多個(gè)地址够坐,使用index參數(shù)指定請(qǐng)求地址寸宵,默認(rèn)index=0:|' % str(args.service).center(30, ' '))
print('-' * 86)
for i, address in enumerate(address_list):
print('| %d ==> %s:%s |' % (i, address[0], str(address[1]).ljust(80 - len(address[2]), ' ')))
print('——' * 43)
args.host = address_list[args.index][0]
args.port = address_list[args.index][1]
print('當(dāng)前連接地址: %s:%s' % (args.host, args.port))
self.args = args
super(DubboTester, self).__init__(host=args.host, port=args.port)
self.write(b'\n')
@staticmethod
def __init_args_from_dict(d):
service = d.get('service')
method = d.get('method')
params = d.get('params', [])
host = d.get('host')
port = d.get('port')
index = d.get('index', 0)
if port is not None and not isinstance(port, int):
raise TypeError('port必須是數(shù)值類(lèi)型')
elif params is not None and not isinstance(params, list):
raise TypeError('params必須是list類(lèi)型')
return DubboTester.Args(service, method, params, port, index)
@staticmethod
def __parse_args(args):
"""將參數(shù)解析成tenlet命令行調(diào)用的字符串格式"""
if isinstance(args, str) or isinstance(args, dict):
args = json.dumps(args)
elif isinstance(args, list):
tmp = ''
for arg in args:
tmp += json.dumps(arg) + ','
args = tmp[:-1]
return args
def command(self, flag, str_=""):
data = self.read_until(flag.encode())
self.write(str_.encode() + b'\n')
return data
def invoke(self):
arg = self.__parse_args(self.args.params)
command_str = "invoke {0}.{1}({2})".format(self.args.service, self.args.method, arg)
print(self.prompt, command_str)
self.command(self.prompt, command_str)
data = self.command(self.prompt, "")
# [:-6] 截取掉返回結(jié)果末尾的'dubbo>'
data = data.decode(self.coding, errors='ignore')[:-6].strip()
# 截取掉elapsed及之后的部分
if 'elapsed' in data:
data = data[:data.index('elapsed')].strip()
# 雙換行符替換為單換行符
data = data.replace('\r\n', '\n')
return data
def close(self):
if self.zk:
self.zk.close()
def run(case: dict):
try:
tester = DubboTester(case)
result = tester.invoke()
try:
# 解析結(jié)果崖面,結(jié)果縮進(jìn),支持中文
result = json.dumps(json.loads(result), ensure_ascii=False, sort_keys=True, indent=4)
except json.JSONDecodeError as e:
print(e)
print('請(qǐng)求結(jié)果:\n%s ' % result)
tester.close()
except TimeoutError:
print('連接超時(shí)梯影!')
在run函數(shù)中巫员,首先實(shí)例化一個(gè)DubboTester對(duì)象并調(diào)用invoke方法,執(zhí)行telnet命令甲棍,得到返回的結(jié)果后简识,再嘗試json解析并格式化,最后得到一個(gè)易讀的返回結(jié)果感猛。
執(zhí)行
通過(guò)上述代碼的實(shí)現(xiàn)七扰,現(xiàn)在只需要傳入一個(gè)包含service、method唱遭、params字段的字典類(lèi)型數(shù)據(jù)戳寸,就可以調(diào)用對(duì)應(yīng)的dubbo接口了:
if __name__ == '__main__':
# 輸入服務(wù)名、方法名和參數(shù)
case = {
'service': 'UserInfoRpcService',
'method': 'getUserState',
'params': [600001]
}
# 執(zhí)行
run(case)
執(zhí)行結(jié)果如下:
完整代碼:
#!/usr/bin/python3
import json
import telnetlib
from urllib.parse import unquote
from kazoo.client import KazooClient
test_env = {
'zk': '{ip}:{端口}'
}
env = test_env
class Zookeeper:
address = None
timeout = 100
client = None
service_dict = {}
def __init__(self, address=None, timeout=100):
self.address = address if address else env['zk']
self.timeout = timeout
self.client = KazooClient(hosts=self.address, timeout=timeout)
self.client.start()
service_list = self.client.get_children('dubbo')
for service in service_list:
name = str(service).split('.')[-1]
self.service_dict[name] = service
def get_service_address(self, service):
if '.' not in service:
service = self.service_dict[service]
uri = 'dubbo/%s' % service
if not self.client.exists(uri):
raise ValueError('服務(wù)%s不存在' % service)
elif not self.client.exists(uri + '/providers'):
raise ValueError('服務(wù)%s沒(méi)有提供者' % service)
else:
providers = self.client.get_children(uri + '/providers')
addrs = []
for provider in providers:
addr = str(unquote(provider)).split('/')[2]
addrs.append((str(addr).split(':')[0], str(addr).split(':')[1], str(addr)))
return addrs
def close(self):
self.client.stop()
self.client.close()
class DubboTester(telnetlib.Telnet):
class Args:
host: str
port: int
service: str
index: int
method: str
param: list
def __init__(self, host=None, port=0, service=None, index=None, method=None, param=None):
self.host = host
self.port = port
self.service = service
self.index = index
self.method = method
self.param = param
prompt = 'dubbo>'
coding = 'utf-8'
zk = None
args: Args
def __init__(self, args: Args or dict or str):
# json字符串解析成dict
args = json.loads(args) if isinstance(args, str) else args
# dict解析成Args對(duì)象
args = self.__init_args_from_dict(args) if isinstance(args, dict) else args
if args.host and args.port:
print('當(dāng)前模式:直連')
elif args.service is not None and args.service != '':
print('當(dāng)前模式:zookeeper')
self.zk = Zookeeper()
address_list = self.zk.get_service_address(args.service)
if len(address_list) > 1 and args.index is None:
index = 0
print('——' * 43)
print('|%s服務(wù)有多個(gè)地址拷泽,使用index參數(shù)指定請(qǐng)求地址疫鹊,默認(rèn)index=0:|' % str(args.service).center(30, ' '))
print('-' * 86)
for i, address in enumerate(address_list):
print('| %d ==> %s:%s |' % (i, address[0], str(address[1]).ljust(80 - len(address[2]), ' ')))
print('——' * 43)
else:
index = 0 if args.index is None else args.index
print('當(dāng)前使用地址:%s:%s' % (address_list[index][0], address_list[index][1]))
args.host = address_list[index][0]
args.port = address_list[index][1]
else:
raise KeyError('參數(shù)錯(cuò)誤')
self.args = args
super(DubboTester, self).__init__(host=args.host, port=args.port)
self.write(b'\r\n')
def command(self, str_=""):
data = self.read_until(self.prompt.encode())
self.write(str_.encode() + b"\n")
return data
@staticmethod
def __init_args_from_dict(d: dict):
host = d.get('host')
port = d.get('port')
service = d.get('service')
index = d.get('index')
method = d.get('method')
param = d.get('param')
if port is not None and not isinstance(port, int):
raise TypeError('port參數(shù)類(lèi)型錯(cuò)誤,需要int類(lèi)型司致,傳入了%s類(lèi)型' % type(port))
if param is not None and not isinstance(param, list):
raise TypeError('param參數(shù)類(lèi)型錯(cuò)誤拆吆,需要list類(lèi)型,傳入了%s類(lèi)型' % type(param))
port = port if port else 0
return DubboTester.Args(host, port, service, index, method, param)
@staticmethod
def __parse_args(args):
if isinstance(args, str) or isinstance(args, dict):
args = json.dumps(args)
elif isinstance(args, list):
tmp = ''
for param in args:
tmp += json.dumps(param) + ','
args = tmp[:-1]
return args
def invoke(self, parse=True):
"""
調(diào)用dubbo接口
:param parse: 是否解析參數(shù)脂矫,如果為false枣耀,則直接使用self.args.param來(lái)請(qǐng)求接口,如果為T(mén)rue庭再,則先將參數(shù)解析為json形式再請(qǐng)求
:return: 返回請(qǐng)求結(jié)果字符串
"""
if self.args.method is None:
raise KeyError('缺少method參數(shù)')
elif self.args.param is None:
raise KeyError('缺少param參數(shù)')
elif not isinstance(self.args.param, list):
raise TypeError('param參數(shù)必須是list類(lèi)型')
arg = self.__parse_args(self.args.param) if parse else self.args.param
command_str = "invoke {0}.{1}({2})".format(self.args.service, self.args.method, arg)
# print('\033[33m' + self.prompt, command_str)
print(self.prompt, command_str)
self.command(command_str)
data = self.command('')
# [:-6]去掉返回結(jié)果末尾的'dubbo>'
data = data.decode(self.coding, errors='ignore')[:-6].strip()
# 去掉elapsed(如果返回結(jié)果中包含elapsed捞奕,會(huì)返回不完整的結(jié)果)
if 'elapsed' in data:
data = data[:data.index('elapsed')].strip()
# 雙換行符替換為單換行符
data = data.replace('\r\n', '\n')
return data
def do(self, arg):
command_str = arg
self.command(command_str)
data = self.command('')
return data.decode(self.coding, errors='ignore').split('\r\n')[:-1]
def close(self):
if self.get_socket():
super(DubboTester, self).close()
if self.zk:
self.zk.close()
def get_service_address(self):
return self.host, self.port
def get_method_list(self, service_name, detail=False):
if detail:
method_list = self.do('ls -l %s' % service_name)
else:
method_list = self.do('ls %s' % service_name)
return [method for method in method_list]
def get_method_info(self, method='default'):
method = self.args.method if method == 'default' else method
if method is None or method not in self.get_method_list(self.args.service):
raise KeyError('%s不是%s的方法' % (method, self.args.service))
for item in self.get_method_list(self.args.service, True):
if method == str(item).split(' ')[1].split('(')[0]:
return item
def print_method_list(tester, package=None):
if package:
title = '* 服務(wù)%s的方法列表:' % package.split('.')[-1]
result = tester.get_method_list(package, True)
else:
title = '* %s:%s 的服務(wù)列表:' % tester.get_service_address()
result = [service for service in tester.do('ls')]
print('=' * 80)
print(title)
print('-' * 80)
for item in result:
print('*', item)
print('=' * 80)
def run(case: dict, parse=True, invoke=True, print_service=False, print_method=False, print_req_json=False):
try:
# 不調(diào)用接口時(shí),默認(rèn)打印服務(wù)或接口信息
if not invoke:
print_service = print_service if case['service'] else False
print_method = not print_service
tester = DubboTester(case)
print_method_list(tester) if print_service else None
print_method_list(tester, case['service']) if print_method else None
if print_req_json:
print(json.dumps('%s.%s::%s' % (case['service'], case['method'], json.dumps(case['param']))))
if invoke:
result = tester.invoke(parse)
try:
# 解析結(jié)果拄轻,結(jié)果縮進(jìn)颅围、支持中文
result = json.dumps(json.loads(result), ensure_ascii=False, sort_keys=True, indent=4)
# print('\033[32m')
except json.JSONDecodeError:
# print('\033[31m')
pass
if str(result).startswith('No such method') and case['method'] in tester.get_method_list(case['service']):
result = '方法參數(shù)錯(cuò)誤:%s\n你的傳參:%s' % (tester.get_method_info(), [type(param) for param in case['param']])
print('請(qǐng)求結(jié)果:\n%s' % result)
tester.close()
except TimeoutError:
print('連接超時(shí),請(qǐng)檢查ip和端口!')
except (KeyError, ValueError, TypeError) as e:
print(e)