本文主要講述官方提供的客戶端以及自己寫的增刪查改工具:
ros_tool.py 功能總匯,展示界面用了python的GUI,模塊用的為 tkinter 模塊
其他文件為單個功能文件。
鏈接:https://pan.baidu.com/s/1_NIjG6gCQcnbp9Vwfi9Jyw 密碼:98ex
運行方式:python3 ros_tool.py ip username password (LINUX下,裝過ros,改為自己的 ip,用戶名,密碼 直接運行即可)
ros為python以及其他語言提供了接口,官方給出了一個 TCP客戶端的例子:
# !/usr/bin/env python3
# -*- coding:utf-8 -*-
import sys, time, binascii, socket, select
import hashlib
class ApiRos:
    """Minimal client for the RouterOS API word/sentence wire protocol.

    A *word* is a length-prefixed byte string; a *sentence* is a run of words
    terminated by an empty word.  All socket traffic is handled as bytes and
    only converted to/from ``str`` (UTF-8) at the word boundary, so length
    prefixes above 0x7F and non-ASCII payloads are transmitted correctly.
    """

    def __init__(self, sk):
        """Wrap an already connected stream socket ``sk``."""
        self.sk = sk
        self.currenttag = 0

    def login(self, username, pwd):
        """Authenticate using the challenge/response exchange.

        Sends ``/login``, reads the hex challenge from the ``=ret`` attribute
        and answers with ``00`` + md5(b'\\x00' + password + challenge).

        Raises:
            KeyError: if a reply sentence carries no ``=ret`` attribute.
        """
        # NOTE(review): newer RouterOS releases reportedly use a different
        # login exchange - confirm against the target router version.
        for repl, attrs in self.talk(["/login"]):
            chal = binascii.unhexlify((attrs['=ret']).encode('UTF-8'))
        md = hashlib.md5()
        md.update(b'\x00')
        md.update(pwd.encode('UTF-8'))
        md.update(chal)
        self.talk(["/login", "=name=" + username,
                   "=response=00" + binascii.hexlify(md.digest()).decode('UTF-8')])

    def talk(self, words):
        """Send one sentence and collect the complete reply.

        Returns:
            A list of ``(reply_word, attrs)`` tuples ending with the ``!done``
            sentence, or ``None`` when ``words`` is empty.  Attribute keys keep
            their leading ``=`` (e.g. ``'=ret'``).
        """
        if self.writeSentence(words) == 0:
            return
        r = []
        while True:
            i = self.readSentence()
            if len(i) == 0:
                continue
            reply = i[0]
            attrs = {}
            for w in i[1:]:
                # Search from index 1 so the leading '=' is not taken as the
                # key/value separator.
                j = w.find('=', 1)
                if j == -1:
                    attrs[w] = ''
                else:
                    attrs[w[:j]] = w[j + 1:]
            r.append((reply, attrs))
            if reply == '!done':
                return r

    def writeSentence(self, words):
        """Write every word plus the terminating empty word; return the count."""
        ret = 0
        for w in words:
            self.writeWord(w)
            ret += 1
        self.writeWord('')
        return ret

    def readSentence(self):
        """Read words until the empty terminator and return them as a list."""
        r = []
        while True:
            w = self.readWord()
            if w == '':
                return r
            r.append(w)

    def writeWord(self, w):
        """Send one word: length prefix (in bytes) followed by the payload."""
        print(("<<< " + w))
        data = w.encode('UTF-8')
        # The prefix counts encoded bytes, not characters.
        self.writeLen(len(data))
        self._send_bytes(data)

    def readWord(self):
        """Receive one word and return it decoded as text."""
        ret = self.readStr(self.readLen())
        print((">>> " + ret))
        return ret

    def writeLen(self, l):
        """Send the variable-length (1 to 5 byte) big-endian length prefix."""
        if l < 0x80:
            data = bytes([l])
        elif l < 0x4000:
            data = (l | 0x8000).to_bytes(2, 'big')
        elif l < 0x200000:
            data = (l | 0xC00000).to_bytes(3, 'big')
        elif l < 0x10000000:
            data = (l | 0xE0000000).to_bytes(4, 'big')
        else:
            data = b'\xF0' + (l & 0xFFFFFFFF).to_bytes(4, 'big')
        self._send_bytes(data)

    def readLen(self):
        """Receive and decode a length prefix written in the writeLen format."""
        c = self._recv_bytes(1)[0]
        if (c & 0x80) == 0x00:
            return c
        if (c & 0xC0) == 0x80:
            extra, c = 1, c & 0x3F
        elif (c & 0xE0) == 0xC0:
            extra, c = 2, c & 0x1F
        elif (c & 0xF0) == 0xE0:
            extra, c = 3, c & 0x0F
        elif (c & 0xF8) == 0xF0:
            extra, c = 4, 0
        else:
            # First byte >= 0xF8: returned unchanged, as the original did.
            return c
        for byte in self._recv_bytes(extra):
            c = (c << 8) | byte
        return c

    def writeStr(self, str):
        """Send text encoded as UTF-8.

        Raises:
            RuntimeError: if the peer closed the connection.
        """
        self._send_bytes(str.encode('UTF-8'))

    def readStr(self, length):
        """Receive exactly ``length`` bytes and return them decoded as UTF-8.

        Undecodable bytes are replaced rather than raising.

        Raises:
            RuntimeError: if the peer closed the connection.
        """
        return self._recv_bytes(length).decode('UTF-8', 'replace')

    def _send_bytes(self, data):
        """Send all of ``data``, looping over partial sends."""
        sent = 0
        while sent < len(data):
            r = self.sk.send(data[sent:])
            if r == 0:
                raise RuntimeError("connection closed by remote end")
            sent += r

    def _recv_bytes(self, length):
        """Receive exactly ``length`` raw bytes."""
        chunks = []
        remaining = length
        while remaining > 0:
            chunk = self.sk.recv(remaining)
            # recv() returns b'' (never '') once the peer has closed.
            if not chunk:
                raise RuntimeError("connection closed by remote end")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)
def main():
    """Connect to the router, log in and run one command typed by the user.

    Command line: ``<host> <username> <password>``; the API port is 8728.
    Exits with status 1 when arguments are missing or no connection can be
    opened.
    """
    if len(sys.argv) < 4:
        print('usage: %s <ip> <username> <password>' % sys.argv[0])
        sys.exit(1)
    try:
        # Tries every address returned for the host, like the original
        # getaddrinfo() loop did.
        s = socket.create_connection((sys.argv[1], 8728))
    except OSError:
        print('could not open socket')
        sys.exit(1)
    with s:
        apiros = ApiRos(s)
        apiros.login(sys.argv[2], sys.argv[3])
        # Let the user type a single API command word and send it.
        tmpcommand = input('請輸入你想查詢的命令')
        inputsentence = [tmpcommand]
        apiros.writeSentence(inputsentence)
        while True:
            x = apiros.readSentence()
            # print(x)
            if x == ['!done'] or x == ['!re', '=status=finished']:
                break

        # ---- Reference examples, kept commented out ----
        #
        # Export the router configuration to the router-local file ghg1.rsc:
        # inputsentence = []
        # inputsentence.append('/export')
        # inputsentence.append('=file=ghg1.rsc')
        # apiros.writeSentence(inputsentence)
        # while 1:
        #     x = apiros.readSentence()
        #     # print(x)
        #     if x == ['!done'] or x == ['!re', '=status=finished']:
        #         break
        #
        # Upload a file from the router to an FTP server:
        # inputsentence = []
        # inputsentence.append('/tool/fetch')
        # inputsentence.append('=address=192.168.0.108')
        # inputsentence.append('=src-path=ghg1.rsc')
        # inputsentence.append('=user=xxxxx')
        # inputsentence.append('=mode=ftp')
        # inputsentence.append('=password=xxxxx')
        # inputsentence.append('=dst-path=123.rsc')
        # inputsentence.append('=upload=yes')
        # apiros.writeSentence(inputsentence)
        # inputsentence = []
        # while 1:
        #     x = apiros.readSentence()
        #     # print(x)
        #     if x == ['!done'] or x == ['!re', '=status=finished']:
        #         break
        #
        # Delete a file on the router:
        # inputsentence = []
        # inputsentence.append('/file/remove')
        # inputsentence.append('=numbers=ghg1.rsc')
        # apiros.writeSentence(inputsentence)
        # while 1:
        #     x = apiros.readSentence()
        #     print(x)
        #     if x == ['!done'] or x == ['!re', '=status=finished']:
        #         break
        #
        # Interactive loop from the official example: waits for lines on
        # stdin, useful for trying out API commands by hand.
        # while 1:
        #     r = select.select([s, sys.stdin], [], [], None)
        #     if s in r[0]:
        #         # something to read in socket, read sentence
        #         x = apiros.readSentence()
        #
        #     if sys.stdin in r[0]:
        #         # read line from input and strip off newline
        #         l = sys.stdin.readline()
        #         print(l)
        #         l = l[:-1]
        #         print(l)
        #
        #         # if empty line, send sentence and start with new
        #         # otherwise append to input sentence
        #         if l == '':
        #             apiros.writeSentence(inputsentence)
        #             inputsentence = []
        #         else:
        #             inputsentence.append(l)


if __name__ == '__main__':
    main()
下面是本人寫的一個小工具,可以進行路由的增刪改查,代碼有很多不足之處,刪除功能時容易出現問題,做了死循環,根據時間控制結束,性能非常低,有興趣的可以改下:
運行方式:python3 ros_tool.py ip username password (LINUX下,裝過ros,改為自己的 ip,用戶名,密碼 直接運行即可)
# !/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:yhq
import sys, time, binascii, socket, select
from datetime import datetime
# import posix,
import hashlib
from tkinter import *
# import subprocess

# Main application window; the widgets and callbacks defined further down in
# this file attach to it.
root = Tk()
root.title("ros工具")
root.geometry('1100x800')
class ApiRos:
    """Minimal client for the RouterOS API word/sentence wire protocol.

    A *word* is a length-prefixed byte string; a *sentence* is a run of words
    terminated by an empty word.  All socket traffic is handled as bytes and
    only converted to/from ``str`` (UTF-8) at the word boundary, so length
    prefixes above 0x7F and non-ASCII payloads are transmitted correctly.
    """

    def __init__(self, sk):
        """Store the already connected stream socket ``sk``."""
        self.sk = sk
        self.currenttag = 0

    def login(self, username, pwd):
        """Log in using the challenge/response exchange.

        Sends ``/login``, reads the hex challenge from the ``=ret`` attribute
        and answers with ``00`` + md5(b'\\x00' + password + challenge).

        Raises:
            KeyError: if a reply sentence carries no ``=ret`` attribute.
        """
        # NOTE(review): newer RouterOS releases reportedly use a different
        # login exchange - confirm against the target router version.
        for repl, attrs in self.talk(["/login"]):
            chal = binascii.unhexlify((attrs['=ret']).encode('UTF-8'))
        # Hash the password together with the challenge.
        md = hashlib.md5()
        md.update(b'\x00')
        md.update(pwd.encode('UTF-8'))
        md.update(chal)
        self.talk(["/login", "=name=" + username,
                   "=response=00" + binascii.hexlify(md.digest()).decode('UTF-8')])

    def talk(self, words):
        """Send one sentence and collect the complete reply.

        Returns:
            A list of ``(reply_word, attrs)`` tuples ending with the ``!done``
            sentence, or ``None`` when ``words`` is empty.  Attribute keys keep
            their leading ``=`` (e.g. ``'=ret'``).
        """
        if self.writeSentence(words) == 0:
            return
        r = []
        while True:
            i = self.readSentence()
            if len(i) == 0:
                continue
            reply = i[0]
            attrs = {}
            for w in i[1:]:
                # Search from index 1 so the leading '=' is not taken as the
                # key/value separator.
                j = w.find('=', 1)
                if j == -1:
                    attrs[w] = ''
                else:
                    attrs[w[:j]] = w[j + 1:]
            r.append((reply, attrs))
            if reply == '!done':
                return r

    def writeSentence(self, words):
        """Write every word plus the terminating empty word; return the count."""
        ret = 0
        for w in words:
            self.writeWord(w)
            ret += 1
        self.writeWord('')
        return ret

    def readSentence(self):
        """Read words until the empty terminator and return them as a list."""
        r = []
        while True:
            w = self.readWord()
            if w == '':
                return r
            r.append(w)

    def writeWord(self, w):
        """Send one word: length prefix (in bytes) followed by the payload."""
        print(("<<< " + w))
        data = w.encode('UTF-8')
        # The prefix counts encoded bytes, not characters.
        self.writeLen(len(data))
        self._send_bytes(data)

    def readWord(self):
        """Receive one word and return it decoded as text."""
        ret = self.readStr(self.readLen())
        print((">>> " + ret))
        return ret

    def writeLen(self, l):
        """Send the variable-length (1 to 5 byte) big-endian length prefix."""
        if l < 0x80:
            data = bytes([l])
        elif l < 0x4000:
            data = (l | 0x8000).to_bytes(2, 'big')
        elif l < 0x200000:
            data = (l | 0xC00000).to_bytes(3, 'big')
        elif l < 0x10000000:
            data = (l | 0xE0000000).to_bytes(4, 'big')
        else:
            data = b'\xF0' + (l & 0xFFFFFFFF).to_bytes(4, 'big')
        self._send_bytes(data)

    def readLen(self):
        """Receive and decode a length prefix written in the writeLen format."""
        c = self._recv_bytes(1)[0]
        if (c & 0x80) == 0x00:
            return c
        if (c & 0xC0) == 0x80:
            extra, c = 1, c & 0x3F
        elif (c & 0xE0) == 0xC0:
            extra, c = 2, c & 0x1F
        elif (c & 0xF0) == 0xE0:
            extra, c = 3, c & 0x0F
        elif (c & 0xF8) == 0xF0:
            extra, c = 4, 0
        else:
            # First byte >= 0xF8: returned unchanged, as the original did.
            return c
        for byte in self._recv_bytes(extra):
            c = (c << 8) | byte
        return c

    def writeStr(self, str):
        """Send text encoded as UTF-8.

        Raises:
            RuntimeError: if the peer closed the connection.
        """
        self._send_bytes(str.encode('UTF-8'))

    def readStr(self, length):
        """Receive exactly ``length`` bytes and return them decoded as UTF-8.

        Undecodable bytes are replaced rather than raising.

        Raises:
            RuntimeError: if the peer closed the connection.
        """
        return self._recv_bytes(length).decode('UTF-8', 'replace')

    def _send_bytes(self, data):
        """Send all of ``data``, looping over partial sends."""
        sent = 0
        while sent < len(data):
            r = self.sk.send(data[sent:])
            if r == 0:
                raise RuntimeError("connection closed by remote end")
            sent += r

    def _recv_bytes(self, length):
        """Receive exactly ``length`` raw bytes."""
        chunks = []
        remaining = length
        while remaining > 0:
            chunk = self.sk.recv(remaining)
            # recv() returns b'' (never '') once the peer has closed.
            if not chunk:
                raise RuntimeError("connection closed by remote end")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)
def check_interface():
    """List the router's interface names and prepare the input fields.

    Opens the shared API connection stored in the global ``apiros`` (used by
    ``add_ip`` and ``add_route``), prints the interface names into the text
    widget ``te`` and creates the two global entry widgets ``xls`` (interface
    name / gateway) and ``sheet`` (address prefix).  Exits the program when
    the router cannot be reached.
    """
    global apiros, xls, sheet
    try:
        # Tries every address returned for the host, like the original
        # getaddrinfo() loop did.
        s = socket.create_connection((sys.argv[1], 8728))
    except OSError:
        print('could not open socket')
        sys.exit(1)
    # A repeated click replaces the shared connection; close the old one
    # instead of leaking it.
    previous = globals().get('apiros')
    if previous is not None:
        previous.sk.close()
    apiros = ApiRos(s)
    apiros.login(sys.argv[2], sys.argv[3])
    ls = ['/interface/print', '=.proplist=name']
    apiros.writeSentence(ls)
    ls1 = []
    while True:
        x = apiros.readSentence()
        # Only data sentences carry a name; strip the '=name=' prefix.
        if x[:1] == ['!re'] and len(x) > 1 and x[1].startswith('=name='):
            ls1.append(x[1][6:])
        if x == ['!done'] or x == ['!re', '=status=finished']:
            break
    # print('available interface names: %s' % ls1)
    te.insert('1.0', " 可用interface名字: %s\n" % ls1)
    te.insert('1.0', "*******************************\n")
    # Create the input widgets once so repeated clicks do not stack
    # duplicate labels and entries in the window.
    if globals().get('xls') is None or globals().get('sheet') is None:
        l1 = Label(root, text="輸入interface名:")
        l1.pack()  # side may be LEFT, RIGHT, TOP or BOTTOM
        xls = Entry()
        xls.pack()
        l2 = Label(root, text="輸入ip地址段前三位(如192.168.3):")
        l2.pack()  # side may be LEFT, RIGHT, TOP or BOTTOM
        sheet = Entry()
        sheet.pack()
def add_ip():
    """Add the addresses ``<prefix>.0/24`` .. ``<prefix>.24/24`` to an interface.

    Reads the interface name from ``xls`` and the three-octet prefix from
    ``sheet`` and sends the commands over the shared connection ``apiros``,
    all of which are created by ``check_interface``.  Progress and timing are
    written to the text widget ``te``.
    """
    try:
        api = apiros
        interface = xls.get()
        ip = sheet.get()
    except NameError:
        # The connection and the entry widgets only exist after
        # check_interface has run.
        te.insert('1.0', "請先點擊「查看可用interface」\n")
        return
    n = 0
    t1 = time.time()
    network = '%s.0' % ip
    for i in range(0, 25):
        address = '%s.%s/24' % (ip, i)
        inputsentence = ['/ip/address/add', '=address=%s' % address, '=interface=%s' % interface,
                         '=network=%s' % network, '=comment=%s %s' % (address, interface)]
        # talk() consumes the whole reply up to '!done'; reading a single
        # sentence would leave '!done' unread after a '!trap' and shift every
        # later reply by one.
        replies = api.talk(inputsentence)
        if any(reply == '!trap' for reply, _ in replies):
            te.insert('1.0', "添加ip:%s失敗\n" % address)
            continue
        # print("added address: %s" % address)
        te.insert('1.0', "添加ip:%s成功\n" % address)
        n += 1
    t = time.time() - t1
    # print('number of addresses added: %s' % n)
    te.insert('1.0', "添加ip數量:%s\n" % n)
    print('添加ip用時:%s' % t)
    te.insert('1.0', "添加ip用時:%s\n" % t)
    te.insert('1.0', "*******************************\n")
def remove_ip():
    """Remove every IP address entry that has a comment (one pass).

    Opens its own connection, lists ``.id`` and ``comment`` of all addresses
    and removes those whose listing contains both attributes.  Results are
    published through two globals read by ``start_remove_ip``:

    * ``count`` - number of sentences in the listing (entries found),
    * ``n``     - number of entries removed successfully in this pass.
    """
    global count, n
    try:
        s = socket.create_connection((sys.argv[1], 8728))
    except OSError:
        print('could not open socket')
        sys.exit(1)
    count = -1
    n = 0
    with s:  # close the connection when the pass is finished
        apiros = ApiRos(s)
        apiros.login(sys.argv[2], sys.argv[3])
        apiros.writeSentence(['/ip/address/print', '=.proplist=.id,comment'])
        # Read the complete listing first.  Sending remove commands while
        # the listing is still streaming makes the two replies interleave,
        # so a listing sentence gets mistaken for the remove reply.
        entries = []
        while True:
            x = apiros.readSentence()
            count += 1
            if x == ['!done'] or x == ['!re', '=status=finished']:
                break
            entries.append(x)
        for x in entries:
            if len(x) <= 2:
                # No comment attribute: leave the entry alone.
                continue
            # x[1] is the '=.id=...' word of the address entry.
            replies = apiros.talk(['/ip/address/remove', x[1]])
            if any(reply == '!trap' for reply, _ in replies):
                continue
            n += 1
            # print("removed %s" % x[2])
            te.insert('1.0', "刪除ip:%s成功\n" % x[2])


def start_remove_ip():
    """Run ``remove_ip`` until a pass removes nothing, then report."""
    t1 = time.time()
    while True:
        remove_ip()
        if n == 0:
            break
    t = time.time() - t1
    # print('remaining addresses: %s' % count)
    te.insert('1.0', "剩余ip總數:%s成功\n" % count)
    # print('time used: %s' % t)
    te.insert('1.0', "刪除ip用時:%s\n" % t)
    te.insert('1.0', "*******************************\n")
def backup():
    """Save a router backup named after the current date and time.

    Opens its own connection, sends ``/system/backup/save`` and writes the
    outcome to the text widget ``te``.  Exits the program when the router
    cannot be reached.
    """
    try:
        s = socket.create_connection((sys.argv[1], 8728))
    except OSError:
        print('could not open socket')
        sys.exit(1)
    # File name generated from the current date and time.
    filename = datetime.now().strftime('%Y%m%d %H%M%S')
    failed = False
    with s:  # close the connection once the command has completed
        apiros = ApiRos(s)
        apiros.login(sys.argv[2], sys.argv[3])
        # Attribute words must start with '='; the original word had a
        # leading space in front of '=name='.
        inputsentence = ['/system/backup/save', '=name=%s.backup' % filename]
        apiros.writeSentence(inputsentence)
        while True:
            x = apiros.readSentence()
            print(x)
            if x[:1] == ['!trap']:
                failed = True
            if x == ['!done'] or x == ['!re', '=status=finished']:
                break
    if failed:
        te.insert('1.0', "備份:%s.backup失敗\n" % filename)
    else:
        # print("backup succeeded")
        te.insert('1.0', "備份:%s.backup成功\n" % filename)
    te.insert('1.0', "*******************************\n")
def check_ip():
    """Count the router's IP address entries and show the total in ``te``.

    Opens its own connection for the query and closes it afterwards.  Exits
    the program when the router cannot be reached.
    """
    try:
        s = socket.create_connection((sys.argv[1], 8728))
    except OSError:
        print('could not open socket')
        sys.exit(1)
    with s:  # the original never closed this per-call connection
        apiros = ApiRos(s)
        apiros.login(sys.argv[2], sys.argv[3])
        apiros.writeSentence(['/ip/address/print'])
        # Starts at -1 so the terminating sentence is not counted.
        count1 = -1
        while True:
            x = apiros.readSentence()
            count1 += 1
            if x == ['!done'] or x == ['!re', '=status=finished']:
                break
    print('ip總數:%s' % count1)
    te.insert('1.0', "ip總數:%s\n" % count1)
    te.insert('1.0', "*******************************\n")
def check_route():
    """Count the router's route entries and show the total in ``te``.

    Opens its own connection for the query and closes it afterwards.  Exits
    the program when the router cannot be reached.
    """
    try:
        s = socket.create_connection((sys.argv[1], 8728))
    except OSError:
        print('could not open socket')
        sys.exit(1)
    with s:  # the original never closed this per-call connection
        apiros = ApiRos(s)
        apiros.login(sys.argv[2], sys.argv[3])
        apiros.writeSentence(['/ip/route/print'])
        # Starts at -1 so the terminating sentence is not counted.
        count2 = -1
        while True:
            x = apiros.readSentence()
            count2 += 1
            if x == ['!done'] or x == ['!re', '=status=finished']:
                break
    # print('total routes: %s' % count2)
    te.insert('1.0', "route總數:%s\n" % count2)
    te.insert('1.0', "*******************************\n")
def add_route():
    """Add ten /24 routes derived from the entered prefix.

    The gateway is read from ``xls`` and the prefix from ``sheet``; the last
    dotted component of the prefix is replaced by 0..9 to build the
    destinations.  Commands go over the shared connection ``apiros``; all
    three globals are created by ``check_interface``.
    """
    try:
        api = apiros
        gateway = xls.get()
        ip = sheet.get()
    except NameError:
        # The connection and the entry widgets only exist after
        # check_interface has run.
        te.insert('1.0', "請先點擊「查看可用interface」\n")
        return
    base = ip[:ip.rfind('.')]
    for i in range(0, 10):
        dst_address = '%s.%s.0/24' % (base, i)
        # pref_src = '%s.%s.%s' % (ip, i, i)
        inputsentence = ['/ip/route/add', '=dst-address=%s' % dst_address, '=gateway=%s' % gateway,
                         '=comment=%s ' % dst_address]
        # talk() consumes the whole reply up to '!done'; reading a single
        # sentence would leave '!done' unread after a '!trap' and shift every
        # later reply by one.
        replies = api.talk(inputsentence)
        if any(reply == '!trap' for reply, _ in replies):
            te.insert('1.0', "添加route:%s失敗\n" % dst_address)
            continue
        # print("added route: %s" % dst_address)
        te.insert('1.0', "添加route:%s成功\n" % dst_address)
    te.insert('1.0', "*******************************\n")
def remove_route():
    """Remove every route matched by the ``?>comment=`` query (one pass).

    Opens its own connection, lists ``.id`` and ``gateway`` of the matching
    routes and removes each listed entry.  Results are published through two
    globals read by ``start_remove_route``:

    * ``count`` - number of sentences in the listing (entries found),
    * ``n``     - number of routes removed successfully in this pass.
    """
    global count, n
    try:
        s = socket.create_connection((sys.argv[1], 8728))
    except OSError:
        print('could not open socket')
        sys.exit(1)
    count = -1
    n = 0
    with s:  # close the connection when the pass is finished
        apiros = ApiRos(s)
        apiros.login(sys.argv[2], sys.argv[3])
        apiros.writeSentence(['/ip/route/print', '?>comment=', '=.proplist=.id,gateway'])
        # Read the complete listing before sending any remove command;
        # otherwise listing sentences are mistaken for remove replies.
        entries = []
        while True:
            x = apiros.readSentence()
            count += 1
            if x == ['!done'] or x == ['!re', '=status=finished']:
                break
            entries.append(x)
        for x in entries:
            if len(x) != 3:
                continue
            # x[1] is the '=.id=...' word, x[2] the '=gateway=...' word.
            replies = apiros.talk(['/ip/route/remove', x[1]])
            if any(reply == '!trap' for reply, _ in replies):
                continue
            n += 1
            # print("removed %s" % x[2])
            te.insert('1.0', "刪除%s成功\n" % x[2])


def start_remove_route():
    """Run ``remove_route`` until nothing is left, for at most 15 seconds."""
    deadline = time.time() + 15
    while True:
        remove_route()
        # Stop as soon as a pass removes nothing instead of reconnecting in
        # a tight loop for the whole 15 seconds.
        if n == 0 or time.time() > deadline:
            break
    te.insert('1.0', "*******************************\n")
def remove_re():
    """Remove every route matched by the ``?=active=false`` query (one pass).

    Used for de-duplicating routes.  Opens its own connection, lists ``.id``
    and ``gateway`` of the matching routes and removes each listed entry.
    Results are published through two globals read by ``start_remove_re``:

    * ``count`` - number of sentences in the listing (entries found),
    * ``n``     - number of routes removed successfully in this pass.
    """
    global count, n
    try:
        s = socket.create_connection((sys.argv[1], 8728))
    except OSError:
        print('could not open socket')
        sys.exit(1)
    count = -1
    n = 0
    with s:  # close the connection when the pass is finished
        apiros = ApiRos(s)
        apiros.login(sys.argv[2], sys.argv[3])
        apiros.writeSentence(['/ip/route/print', '?=active=false', '=.proplist=.id,gateway'])
        # Read the complete listing before sending any remove command;
        # otherwise listing sentences are mistaken for remove replies.
        entries = []
        while True:
            x = apiros.readSentence()
            count += 1
            if x == ['!done'] or x == ['!re', '=status=finished']:
                break
            entries.append(x)
        for x in entries:
            if len(x) != 3:
                continue
            # x[1] is the '=.id=...' word, x[2] the '=gateway=...' word.
            replies = apiros.talk(['/ip/route/remove', x[1]])
            if any(reply == '!trap' for reply, _ in replies):
                continue
            n += 1
            # print("removed %s" % x[2])
            te.insert('1.0', "去重%s成功\n" % x[2])


def start_remove_re():
    """Run ``remove_re`` until nothing is left, for at most 15 seconds."""
    deadline = time.time() + 15
    while True:
        remove_re()
        # Stop as soon as a pass removes nothing instead of reconnecting in
        # a tight loop for the whole 15 seconds.
        if n == 0 or time.time() > deadline:
            break
    te.insert('1.0', "*******************************\n")
# Output area: every callback prepends its messages at position '1.0'.
te = Text()
te.pack()

# Read-only actions are packed from the left in green.
Button(root, text="備份設備", bg='green', command=backup).pack(padx=10, pady=10, ipadx=3, side=LEFT)
Button(root, text="查看可用interface", bg='green', command=check_interface).pack(padx=5, pady=20, side=LEFT)
Button(root, text="查看ip", bg='green', command=check_ip).pack(padx=5, pady=10, ipadx=3, side=LEFT)
Button(root, text="查看route", bg='green', command=check_route).pack(padx=5, pady=15, ipadx=3, side=LEFT)

# Destructive actions (red) and additions (purple) are packed from the right.
Button(root, text="刪除ip", bg='#ff3300', command=start_remove_ip).pack(padx=5, pady=10, ipadx=3, side=RIGHT)
Button(root, text="刪除route", bg='#ff3300', command=start_remove_route).pack(padx=5, pady=10, ipadx=3, side=RIGHT)
Button(root, text="route去重", bg='#ff3300', command=start_remove_re).pack(padx=5, pady=10, ipadx=3, side=RIGHT)
Button(root, text="增加ip", bg='#9900ff', command=add_ip).pack(padx=5, pady=10, ipadx=3, side=RIGHT)
Button(root, text="增加route", bg='#9900ff', command=add_route).pack(padx=5, pady=10, ipadx=3, side=RIGHT)

root.mainloop()