問題
我們希望在windows或者linux上恤批,可以使用ssh連接遠(yuǎn)程服務(wù)器,并且能夠執(zhí)行一般的linux命令,同時還要能夠有一定交互能力。比如需要切換root用戶溺森,輸入管理員用戶密碼等。
解決方案
Python的paramiko庫窑眯,可以支持屏积。但實現(xiàn)也有挺多問題需要考慮。主要有以下幾點內(nèi)容:
- 命令執(zhí)行磅甩,能夠獲取命令結(jié)果
- 命令執(zhí)行炊林,能夠支持指定的預(yù)期結(jié)果
- 命令執(zhí)行,要有超時能力卷要,不能掛死渣聚。
用法1:
ssh = Ssh2Client('127.0.0.1', 22)
ssh.connect('root', 'xxxx')
result = ssh.exec('pwd')
print(result)
用法2:
ssh = Ssh2Client('127.0.0.1', 22)
ssh.connect('user-name', 'user-pwd')
ssh.exec('sudo su -', 'Password:')
ssh.exec('root-pwd')
ssh.exec('ls -l /var/root')
代碼實現(xiàn)如下所示:
import re
import socket
import time
import paramiko
class Ssh2Client:
def __init__(self, host: str, port: int):
self.__host = host
self.__port = port
self.__ssh = None
self.__channel = None
def __del__(self):
self.__close()
def connect(self, user: str, pwd: str) -> bool:
self.__close()
self.__ssh = paramiko.SSHClient()
self.__ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.__ssh.connect(self.__host, username=user, password=pwd, port=self.__port)
return True
def exec(self, cmd: str, end_str=('# ', '$ ', '? ', '% '), timeout=30) -> str:
if not self.__channel:
self.__channel = self.__ssh.invoke_shell()
time.sleep(0.020)
self.__channel.recv(4096).decode()
if cmd.endswith('\n'):
self.__channel.send(cmd)
else:
self.__channel.send(cmd + '\n')
return self.__recv(self.__channel, end_str, timeout)
def __recv(self, channel, end_str, timeout) -> str:
result = ''
out_str = ''
max_wait_time = timeout * 1000
channel.settimeout(0.05)
while max_wait_time > 0:
try:
out = channel.recv(1024 * 1024).decode()
if not out or out == '':
continue
out_str = out_str + out
match, result = self.__match(out_str, end_str)
if match is True:
return result.strip()
else:
max_wait_time -= 50
except socket.timeout:
max_wait_time -= 50
raise Exception('recv data timeout')
def __match(self, out_str: str, end_str: list) -> (bool, str):
result = out_str
for it in end_str:
if result.endswith(it):
return True, result
return False, result
def __close(self):
if not self.__ssh:
return
self.__ssh.close()
self.__ssh = None
討論
我們使用用法1,輸出類似如下格式(用戶名做了處理):
pwd
/Users/user1
[xxx:~] xxx%
這里有兩個問題要處理僧叉,命令和命令提示符都一并輸出了奕枝。我們需要做特殊處理。處理方法也很簡單瓶堕,第一行和最后一行直接去掉即可隘道,同時考慮命令無結(jié)果輸出的處理即可。修改exec方法如下:
def exec(self, cmd: str, end_str=('# ', '$ ', '? ', '% '), timeout=30) -> str:
# ...
# 以下是新增的代碼
result = self.__recv(self.__channel, end_str, timeout)
begin_pos = result.find('\r\n')
end_pos = result.rfind('\r\n')
if begin_pos == end_pos:
return ''
return result[begin_pos + 2:end_pos]
現(xiàn)狀輸出結(jié)果就正確了,這個就是我們想要的結(jié)果谭梗。
/Users/user1
偶然的機會忘晤,測試輸入的命令比較長,取得結(jié)果又不正確了激捏。比如執(zhí)行
ssh.exec('echo 0111111111111111111111111111111111112222222222222222222222222222233333333333333333333333444444444444444')
輸出結(jié)果设塔,有的服務(wù)器,會返回下面這個奇怪的結(jié)果:
2222222222233333333333333333333333444444444444444
ls: 0111111111111111111111111111111111112222222222222222222222222222233333333333333333333333444444444444444: No such file or directory
這個問題的原因远舅,主要是因為ssh2輸出時使用了窗口的概念壹置,默認(rèn)是80*24,輸入命令如果超過長度表谊,會自動換行,導(dǎo)致處理命令結(jié)果時出錯盖喷,主要修改invoke_shell函數(shù)調(diào)用方式爆办,代碼如下:
def exec(self, cmd: str, end_str=('# ', '$ ', '? ', '% '), timeout=30) -> str:
if not self.__channel:
# width和height,可以指定輸出窗口的大小课梳。
self.__channel = self.__ssh.invoke_shell(term='xterm', width=4096)
time.sleep(0.020)
self.__channel.recv(4096).decode()
# ....
命令窗口的寬度設(shè)置為4096距辆,輸出結(jié)果就對了。不過如果命令超過4096暮刃,輸出還會出問題跨算,根據(jù)實際情況,設(shè)置width的值椭懊,可以設(shè)置更大一點诸蚕。
ls: 0111111111111111111111111111111111112222222222222222222222222222233333333333333333333333444444444444444: No such file or directory
到目前為止,已經(jīng)基本夠用了氧猬。但是還有一個問題背犯,試用ls命令返回的結(jié)果,有一些奇怪的轉(zhuǎn)義字符盅抚,比如:
?[1;34mCacheVolume?[0m ?[1;34mbin?[0m ?[1;34mboot?[0m ?[1;34mdev?[0m ?[1;34metc?[0m ?[1;34mhome?[0m ?[1;34mlib?[0m ?[1;36mlinuxrc?[0m ?[1;34mlost+found?[0m ?[1;34mmnt?[0m ?[1;36mnfs?[0m ?[1;34mopt?[0m ?[1;
這個問題的處理比較麻煩漠魏,處理了很久也不行。開始使用字符串分析處理妄均,忽略這些轉(zhuǎn)義符柱锹,但總是有點麻煩,處理不夠徹底丰包。后來終于在網(wǎng)上搜索到禁熏,這個轉(zhuǎn)義是叫ansi轉(zhuǎn)義碼,可以在term上顯示彩色邑彪。網(wǎng)上給出了正則處理方法:
# 7-bit C1 ANSI sequences
self.__ansi_escape = re.compile(r'''
\x1B # ESC
(?: # 7-bit C1 Fe (except CSI)
[@-Z\\-_]
| # or [ for CSI, followed by a control sequence
\[
[0-?]* # Parameter bytes
[ -/]* # Intermediate bytes
[@-~] # Final byte
)
''', re.VERBOSE)
def __match(self, out_str: str, end_str: list) -> (bool, str):
result = self.__ansi_escape.sub('', out_str)
for it in end_str:
if result.endswith(it):
return True, result
return False, result
正則表達(dá)式比較復(fù)雜匹层,有興趣的同學(xué)自己分析這個re。
到目前為止,Ssh2Client已經(jīng)基本實現(xiàn)升筏,而且比較實用撑柔。可以處理絕大多數(shù)問題您访,實現(xiàn)也不復(fù)雜铅忿,比網(wǎng)上很多帖子都講得全一些,代碼可以直接拿來用灵汪。
但也不并是全部問題都能解決檀训。比如有的linux系統(tǒng),命令輸出會出現(xiàn)換行享言,中文處理峻凫,都容易會導(dǎo)致輸出結(jié)果獲取不正確。不過览露,這些基本就是字符串分析和解碼問題了荧琼。
完整的代碼如下:
import re
import socket
import time
import paramiko
class Ssh2Client:
"""
ssh2客戶端封裝
"""
def __init__(self, host: str, port: int):
"""
功能描述:構(gòu)造函數(shù)
:param host: 主機地址
:param port: 端口信息
"""
self.__host = host
self.__port = port
self.__ssh = None
self.__channel = None
# 7-bit C1 ANSI sequences
self.__ansi_escape = re.compile(r'''
\x1B # ESC
(?: # 7-bit C1 Fe (except CSI)
[@-Z\\-_]
| # or [ for CSI, followed by a control sequence
\[
[0-?]* # Parameter bytes
[ -/]* # Intermediate bytes
[@-~] # Final byte
)
''', re.VERBOSE)
def __del__(self):
self.__close()
def connect(self, user: str, pwd: str) -> bool:
"""
功能描述:連接遠(yuǎn)程主機
:param user: 用戶名
:param pwd: 用戶密碼
:return: 連接成功還是失敗
"""
self.__close()
self.__ssh = paramiko.SSHClient()
self.__ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.__ssh.connect(self.__host, username=user, password=pwd, port=self.__port)
return True
def exec(self, cmd: str, end_str=('# ', '$ ', '? ', '% ', '#', '$', '?', '%'), timeout=5) -> str:
"""
功能描述:執(zhí)行命令
:param cmd: shell命令
:param end_str: 提示符
:param timeout: 超時間時間
:return: 命令執(zhí)行結(jié)果
"""
if not self.__channel:
self.__channel = self.__ssh.invoke_shell(term='xterm', width=4096, height=48)
time.sleep(0.1)
self.__channel.recv(4096).decode()
if cmd.endswith('\n'):
self.__channel.send(cmd)
else:
self.__channel.send(cmd + '\n')
if end_str is None:
return self.__recv_without_end(cmd, timeout)
result = self.__recv(end_str, timeout)
begin_pos = result.find('\r\n')
end_pos = result.rfind('\r\n')
if begin_pos == end_pos:
return ''
return result[begin_pos + 2:end_pos]
def __recv_without_end(self, cmd, timeout):
"""
功能描述:接收命令執(zhí)行結(jié)果,不進(jìn)行任何比對差牛。
:param cmd: 命令
:param timeout:超時時間命锄,最長等待3秒
:return: 命令執(zhí)行結(jié)果
"""
out_str = ''
if timeout > 3:
timeout = 3
max_wait_time = timeout * 1000
self.__channel.settimeout(0.1)
while max_wait_time > 0.0:
try:
start = time.perf_counter()
out = self.__channel.recv(1024 * 1024).decode()
out_str = out_str + out
max_wait_time = max_wait_time - (time.perf_counter() - start) * 1000
except socket.timeout:
max_wait_time -= 100
return out_str
def __recv(self, end_str, timeout) -> str:
"""
功能描述:根據(jù)提示符,接收命令執(zhí)行結(jié)果
:param end_str: 預(yù)期結(jié)果結(jié)尾
:param timeout: 超時間
:return: 命令執(zhí)行結(jié)果偏化,去除命令輸入提示符
"""
out_str = ''
max_wait_time = timeout * 1000
self.__channel.settimeout(0.05)
while max_wait_time > 0.0:
start = time.perf_counter()
try:
out = self.__channel.recv(1024 * 1024).decode()
if not out or out == '':
continue
out_str = out_str + out
match, result = self.__match(out_str, end_str)
if match is True:
return result.strip()
else:
max_wait_time = max_wait_time - (time.perf_counter() - start) * 1000
except socket.timeout:
max_wait_time = max_wait_time - (time.perf_counter() - start) * 1000
raise Exception('recv data timeout')
def __match(self, out_str: str, end_str: list) -> (bool, str):
result = self.__ansi_escape.sub('', out_str)
for it in end_str:
if result.endswith(it):
return True, result
return False, result
def __close(self):
if not self.__ssh:
return
self.__ssh.close()
self.__ssh = None