Python學(xué)習(xí):基于paramiko的交互式shell

問題

我們希望在windows或者linux上恤批,可以使用ssh連接遠(yuǎn)程服務(wù)器,并且能夠執(zhí)行一般的linux命令,同時還要能夠有一定交互能力。比如需要切換root用戶溺森,輸入管理員用戶密碼等。

解決方案

Python的paramiko庫窑眯,可以支持屏积。但實現(xiàn)也有挺多問題需要考慮。主要有以下幾點內(nèi)容:

  • 命令執(zhí)行磅甩,能夠獲取命令結(jié)果
  • 命令執(zhí)行炊林,能夠支持指定的預(yù)期結(jié)果
  • 命令執(zhí)行,要有超時能力卷要,不能掛死渣聚。

用法1:

ssh = Ssh2Client('127.0.0.1', 22)
ssh.connect('root', 'xxxx')

result = ssh.exec('pwd')
print(result)

用法2:

ssh = Ssh2Client('127.0.0.1', 22)
ssh.connect('user-name', 'user-pwd')
ssh.exec('sudo su -', 'Password:')
ssh.exec('root-pwd')
ssh.exec('ls -l /var/root')

代碼實現(xiàn)如下所示:

import re
import socket
import time

import paramiko


class Ssh2Client:
    def __init__(self, host: str, port: int):
        self.__host = host
        self.__port = port
        self.__ssh = None
        self.__channel = None

    def __del__(self):
        self.__close()

    def connect(self, user: str, pwd: str) -> bool:
        self.__close()

        self.__ssh = paramiko.SSHClient()
        self.__ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.__ssh.connect(self.__host, username=user, password=pwd, port=self.__port)
        return True

    def exec(self, cmd: str, end_str=('# ', '$ ', '? ', '% '), timeout=30) -> str:
        if not self.__channel:
            self.__channel = self.__ssh.invoke_shell()
            time.sleep(0.020)
            self.__channel.recv(4096).decode()

        if cmd.endswith('\n'):
            self.__channel.send(cmd)
        else:
            self.__channel.send(cmd + '\n')

        return self.__recv(self.__channel, end_str, timeout)

    def __recv(self, channel, end_str, timeout) -> str:
        result = ''
        out_str = ''
        max_wait_time = timeout * 1000
        channel.settimeout(0.05)
        while max_wait_time > 0:
            try:
                out = channel.recv(1024 * 1024).decode()

                if not out or out == '':
                    continue
                out_str = out_str + out

                match, result = self.__match(out_str, end_str)
                if match is True:
                    return result.strip()
                else:
                    max_wait_time -= 50
            except socket.timeout:
                max_wait_time -= 50

        raise Exception('recv data timeout')

    def __match(self, out_str: str, end_str: list) -> (bool, str):
        result = out_str
        for it in end_str:
            if result.endswith(it):
                return True, result
        return False, result

    def __close(self):
        if not self.__ssh:
            return
        self.__ssh.close()
        self.__ssh = None

討論

我們使用用法1,輸出類似如下格式(用戶名做了處理):

pwd
/Users/user1
[xxx:~] xxx%

這里有兩個問題要處理僧叉,命令和命令提示符都一并輸出了奕枝。我們需要做特殊處理。處理方法也很簡單瓶堕,第一行和最后一行直接去掉即可隘道,同時考慮命令無結(jié)果輸出的處理即可。修改exec方法如下:

    def exec(self, cmd: str, end_str=('# ', '$ ', '? ', '% '), timeout=30) -> str:
        # ...
        
       # 以下是新增的代碼
        result = self.__recv(self.__channel, end_str, timeout)
        begin_pos = result.find('\r\n')
        end_pos = result.rfind('\r\n')
        if begin_pos == end_pos:
            return ''
        return result[begin_pos + 2:end_pos]

現(xiàn)狀輸出結(jié)果就正確了,這個就是我們想要的結(jié)果谭梗。

/Users/user1

偶然的機會忘晤,測試輸入的命令比較長,取得結(jié)果又不正確了激捏。比如執(zhí)行

ssh.exec('echo 0111111111111111111111111111111111112222222222222222222222222222233333333333333333333333444444444444444')

輸出結(jié)果设塔,有的服務(wù)器,會返回下面這個奇怪的結(jié)果:

2222222222233333333333333333333333444444444444444
ls: 0111111111111111111111111111111111112222222222222222222222222222233333333333333333333333444444444444444: No such file or directory

這個問題的原因远舅,主要是因為ssh2輸出時使用了窗口的概念壹置,默認(rèn)是80*24,輸入命令如果超過長度表谊,會自動換行,導(dǎo)致處理命令結(jié)果時出錯盖喷,主要修改invoke_shell函數(shù)調(diào)用方式爆办,代碼如下:

def exec(self, cmd: str, end_str=('# ', '$ ', '? ', '% '), timeout=30) -> str:
    if not self.__channel:
        # width和height,可以指定輸出窗口的大小课梳。
        self.__channel = self.__ssh.invoke_shell(term='xterm', width=4096)
        time.sleep(0.020)
        self.__channel.recv(4096).decode()
   
   # ....

命令窗口的寬度設(shè)置為4096距辆,輸出結(jié)果就對了。不過如果命令超過4096暮刃,輸出還會出問題跨算,根據(jù)實際情況,設(shè)置width的值椭懊,可以設(shè)置更大一點诸蚕。

ls: 0111111111111111111111111111111111112222222222222222222222222222233333333333333333333333444444444444444: No such file or directory

到目前為止,已經(jīng)基本夠用了氧猬。但是還有一個問題背犯,試用ls命令返回的結(jié)果,有一些奇怪的轉(zhuǎn)義字符盅抚,比如:

?[1;34mCacheVolume?[0m  ?[1;34mbin?[0m          ?[1;34mboot?[0m         ?[1;34mdev?[0m          ?[1;34metc?[0m          ?[1;34mhome?[0m         ?[1;34mlib?[0m          ?[1;36mlinuxrc?[0m      ?[1;34mlost+found?[0m   ?[1;34mmnt?[0m          ?[1;36mnfs?[0m          ?[1;34mopt?[0m          ?[1;

這個問題的處理比較麻煩漠魏,處理了很久也不行。開始使用字符串分析處理妄均,忽略這些轉(zhuǎn)義符柱锹,但總是有點麻煩,處理不夠徹底丰包。后來終于在網(wǎng)上搜索到禁熏,這個轉(zhuǎn)義是叫ansi轉(zhuǎn)義碼,可以在term上顯示彩色邑彪。網(wǎng)上給出了正則處理方法:

    # 7-bit C1 ANSI sequences
    self.__ansi_escape = re.compile(r'''
            \x1B  # ESC
            (?:   # 7-bit C1 Fe (except CSI)
            [@-Z\\-_]
            |     # or [ for CSI, followed by a control sequence
            \[
            [0-?]*  # Parameter bytes
            [ -/]*  # Intermediate bytes
            [@-~]   # Final byte
        )
    ''', re.VERBOSE)

 def __match(self, out_str: str, end_str: list) -> (bool, str):
        result = self.__ansi_escape.sub('', out_str)

        for it in end_str:
            if result.endswith(it):
                return True, result
        return False, result

正則表達(dá)式比較復(fù)雜匹层,有興趣的同學(xué)自己分析這個re。

到目前為止,Ssh2Client已經(jīng)基本實現(xiàn)升筏,而且比較實用撑柔。可以處理絕大多數(shù)問題您访,實現(xiàn)也不復(fù)雜铅忿,比網(wǎng)上很多帖子都講得全一些,代碼可以直接拿來用灵汪。

但也不并是全部問題都能解決檀训。比如有的linux系統(tǒng),命令輸出會出現(xiàn)換行享言,中文處理峻凫,都容易會導(dǎo)致輸出結(jié)果獲取不正確。不過览露,這些基本就是字符串分析和解碼問題了荧琼。

完整的代碼如下:

import re
import socket
import time

import paramiko


class Ssh2Client:
    """
    ssh2客戶端封裝
    """

    def __init__(self, host: str, port: int):
        """
        功能描述:構(gòu)造函數(shù)

        :param host: 主機地址
        :param port: 端口信息
        """
        self.__host = host
        self.__port = port
        self.__ssh = None
        self.__channel = None

        # 7-bit C1 ANSI sequences
        self.__ansi_escape = re.compile(r'''
                \x1B  # ESC
                (?:   # 7-bit C1 Fe (except CSI)
                [@-Z\\-_]
                |     # or [ for CSI, followed by a control sequence
                \[
                [0-?]*  # Parameter bytes
                [ -/]*  # Intermediate bytes
                [@-~]   # Final byte
            )
        ''', re.VERBOSE)

    def __del__(self):
        self.__close()

    def connect(self, user: str, pwd: str) -> bool:
        """
        功能描述:連接遠(yuǎn)程主機
        :param user: 用戶名
        :param pwd:  用戶密碼
        :return: 連接成功還是失敗
        """
        self.__close()

        self.__ssh = paramiko.SSHClient()
        self.__ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.__ssh.connect(self.__host, username=user, password=pwd, port=self.__port)
        return True

    def exec(self, cmd: str, end_str=('# ', '$ ', '? ', '% ', '#', '$', '?', '%'), timeout=5) -> str:
        """
        功能描述:執(zhí)行命令
        :param cmd: shell命令
        :param end_str: 提示符
        :param timeout: 超時間時間
        :return: 命令執(zhí)行結(jié)果
        """
        if not self.__channel:
            self.__channel = self.__ssh.invoke_shell(term='xterm', width=4096, height=48)
            time.sleep(0.1)
            self.__channel.recv(4096).decode()

        if cmd.endswith('\n'):
            self.__channel.send(cmd)
        else:
            self.__channel.send(cmd + '\n')

        if end_str is None:
            return self.__recv_without_end(cmd, timeout)

        result = self.__recv(end_str, timeout)
        begin_pos = result.find('\r\n')
        end_pos = result.rfind('\r\n')
        if begin_pos == end_pos:
            return ''
        return result[begin_pos + 2:end_pos]

    def __recv_without_end(self, cmd, timeout):
        """
        功能描述:接收命令執(zhí)行結(jié)果,不進(jìn)行任何比對差牛。
        :param cmd: 命令
        :param timeout:超時時間命锄,最長等待3秒
        :return: 命令執(zhí)行結(jié)果
        """
        out_str = ''
        if timeout > 3:
            timeout = 3
        max_wait_time = timeout * 1000
        self.__channel.settimeout(0.1)
        while max_wait_time > 0.0:
            try:
                start = time.perf_counter()
                out = self.__channel.recv(1024 * 1024).decode()
                out_str = out_str + out
                max_wait_time = max_wait_time - (time.perf_counter() - start) * 1000
            except socket.timeout:
                max_wait_time -= 100
        return out_str

    def __recv(self, end_str, timeout) -> str:
        """
        功能描述:根據(jù)提示符,接收命令執(zhí)行結(jié)果
        :param end_str: 預(yù)期結(jié)果結(jié)尾
        :param timeout: 超時間
        :return: 命令執(zhí)行結(jié)果偏化,去除命令輸入提示符
        """
        out_str = ''
        max_wait_time = timeout * 1000
        self.__channel.settimeout(0.05)
        while max_wait_time > 0.0:
            start = time.perf_counter()
            try:
                out = self.__channel.recv(1024 * 1024).decode()

                if not out or out == '':
                    continue
                out_str = out_str + out

                match, result = self.__match(out_str, end_str)
                if match is True:
                    return result.strip()
                else:
                    max_wait_time = max_wait_time - (time.perf_counter() - start) * 1000
            except socket.timeout:
                max_wait_time = max_wait_time - (time.perf_counter() - start) * 1000

        raise Exception('recv data timeout')

    def __match(self, out_str: str, end_str: list) -> (bool, str):
        result = self.__ansi_escape.sub('', out_str)

        for it in end_str:
            if result.endswith(it):
                return True, result
        return False, result

    def __close(self):
        if not self.__ssh:
            return
        self.__ssh.close()
        self.__ssh = None
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末脐恩,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子侦讨,更是在濱河造成了極大的恐慌驶冒,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,817評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件韵卤,死亡現(xiàn)場離奇詭異只怎,居然都是意外死亡,警方通過查閱死者的電腦和手機怜俐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評論 3 385
  • 文/潘曉璐 我一進(jìn)店門身堡,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人拍鲤,你說我怎么就攤上這事贴谎。” “怎么了季稳?”我有些...
    開封第一講書人閱讀 157,354評論 0 348
  • 文/不壞的土叔 我叫張陵擅这,是天一觀的道長。 經(jīng)常有香客問我景鼠,道長仲翎,這世上最難降的妖魔是什么痹扇? 我笑而不...
    開封第一講書人閱讀 56,498評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮溯香,結(jié)果婚禮上鲫构,老公的妹妹穿的比我還像新娘。我一直安慰自己玫坛,他們只是感情好结笨,可當(dāng)我...
    茶點故事閱讀 65,600評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著湿镀,像睡著了一般炕吸。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上勉痴,一...
    開封第一講書人閱讀 49,829評論 1 290
  • 那天赫模,我揣著相機與錄音,去河邊找鬼蒸矛。 笑死瀑罗,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的莉钙。 我是一名探鬼主播,決...
    沈念sama閱讀 38,979評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼筛谚,長吁一口氣:“原來是場噩夢啊……” “哼磁玉!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起驾讲,我...
    開封第一講書人閱讀 37,722評論 0 266
  • 序言:老撾萬榮一對情侶失蹤蚊伞,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后吮铭,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體时迫,經(jīng)...
    沈念sama閱讀 44,189評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,519評論 2 327
  • 正文 我和宋清朗相戀三年谓晌,在試婚紗的時候發(fā)現(xiàn)自己被綠了掠拳。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,654評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡纸肉,死狀恐怖溺欧,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情柏肪,我是刑警寧澤姐刁,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站烦味,受9級特大地震影響聂使,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,940評論 3 313
  • 文/蒙蒙 一柏靶、第九天 我趴在偏房一處隱蔽的房頂上張望弃理。 院中可真熱鬧,春花似錦宿礁、人聲如沸案铺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽控汉。三九已至,卻和暖如春返吻,著一層夾襖步出監(jiān)牢的瞬間姑子,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評論 1 266
  • 我被黑心中介騙來泰國打工测僵, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留街佑,地道東北人。 一個月前我還...
    沈念sama閱讀 46,382評論 2 360
  • 正文 我出身青樓捍靠,卻偏偏與公主長得像沐旨,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子榨婆,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,543評論 2 349

推薦閱讀更多精彩內(nèi)容