什么是 Paramiko?
Paramiko 是一個(gè) Python 實(shí)現(xiàn)的 SSH 協(xié)議庫苏章,提供了 SSH 客戶端和 SSH 服務(wù)器的 API。它允許你通過 SSH 協(xié)議遠(yuǎn)程控制服務(wù)器驴党,進(jìn)行數(shù)據(jù)傳輸或在 Shell 中執(zhí)行命令等操作跪楞。
如何安裝 Paramiko?
Paramiko 可以使用 pip 安裝,命令如下:
pip install paramiko
如何使用 Paramiko 連接 SSH 服務(wù)器?
使用 Paramiko 連接 SSH 服務(wù)器可以通過如下代碼實(shí)現(xiàn):
import paramiko
# SSH credentials
ssh_host = 'your_ssh_host'
ssh_user = 'your_ssh_username'
ssh_password = 'your_ssh_password'
# Establish SSH connection
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ssh_host, username=ssh_user, password=ssh_password)
# Execute command
command = 'your_ssh_command'
stdin, stdout, stderr = ssh.exec_command(command)
# Print output
print(stdout.read().decode())
# Close SSH connection
ssh.close()
如何使用 Paramiko 上傳和下載文件?
可以使用 Paramiko 的 SFTP API 上傳和下載文件:
import paramiko
# SFTP credentials
sftp_host = 'your_sftp_host'
sftp_user = 'your_sftp_username'
sftp_password = 'your_sftp_password'
# Establish SFTP connection
transport = paramiko.Transport((sftp_host, 22))
transport.connect(username=sftp_user, password=sftp_password)
sftp = transport.open_sftp()
# Download a remote file
remote_file_path = '/path/to/remote/file'
local_file_path = '/path/to/local/file'
sftp.get(remote_file_path, local_file_path)
# Upload a local file
local_file_path = '/path/to/local/file'
remote_file_path = '/path/to/remote/file'
sftp.put(local_file_path, remote_file_path)
# Close SFTP connection
sftp.close()
transport.close()
如何使用 Paramiko 執(zhí)行 sudo 命令?
可以使用 Paramiko 的 invoke_shell()
方法來模擬一個(gè)終端會(huì)話堡赔,然后執(zhí)行 sudo 命令:
import paramiko
# SSH credentials
ssh_host = 'your_ssh_host'
ssh_user = 'your_ssh_username'
ssh_password = 'your_ssh_password'
# Establish SSH connection
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ssh_host, username=ssh_user, password=ssh_password)
# Start a shell session
shell = ssh.invoke_shell()
shell.send('sudo your_command\n')
# Wait for the password prompt
while not shell.recv_ready():
pass
shell.send('your_password\n')
# Execute command
output = shell.recv(1024)
# Print output
print(output.decode())
# Close SSH connection
ssh.close()
** 增加異常處理 **
import paramiko
import json
# SSH連接信息
hostname = "1.1.1.1"
username = "root"
password = "123456"
# 命令
command = "kubectl get cm -n test xxx-configmap -o json"
# 創(chuàng)建SSH客戶端
client = paramiko.SSHClient()
# 自動(dòng)添加主機(jī)密鑰
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連接SSH服務(wù)器
try:
client.connect(hostname, username=username, password=password)
except paramiko.AuthenticationException:
print("Authentication failed, please verify your password.")
except paramiko.SSHException as sshException:
print("Unable to establish SSH connection: %s" % sshException)
except paramiko.SSHException as e:
print(e)
# 執(zhí)行命令
try:
stdin, stdout, stderr = client.exec_command(command, timeout=10)
output = stdout.read().decode()
print(output)
result = json.loads(output)
print(result["kind"])
except paramiko.SSHException as sshException:
print("Unable to execute command: %s" % sshException)
# 關(guān)閉連接
client.close()
Paramiko 的 SSH 隧道功能
import paramiko
# SSH credentials
ssh_host = 'your_ssh_host'
ssh_user = 'your_ssh_username'
ssh_password = 'your_ssh_password'
# Tunnel credentials
tunnel_host = 'your_tunnel_host'
tunnel_port = your_tunnel_port
tunnel_user = 'your_tunnel_username'
tunnel_password = 'your_tunnel_password'
# Establish SSH tunnel
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(tunnel_host, username=tunnel_user, password=tunnel_password)
transport = ssh.get_transport()
local_port = 12345
remote_port = 54321
transport.request_port_forward('', local_port, remote_port)
# Establish SSH connection through the tunnel
ssh_tunnel = paramiko.SSHClient()
ssh_tunnel.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_tunnel.connect(ssh_host, username=ssh_user, password=ssh_password, sock=transport.open_channel('direct-tcpip', ('127.0.0.1', remote_port), ('127.0.0.1', local_port)))
# Execute command
command = 'your_ssh_command'
stdin, stdout, stderr = ssh_tunnel.exec_command(command)
# Print output
print(stdout.read().decode())
# Close SSH connection
ssh_tunnel.close()
transport.close()
ssh.close()
以上就是關(guān)于 Python Paramiko 模塊的文章總結(jié)识脆。