前言
之前學(xué)習(xí)python2的時(shí)候?qū)懥硕丝趻呙璧囊恍┠_本有鹿,寫(xiě)的不怎么樣。抽空又重新捋了一遍,使用python3從簡(jiǎn)單的單線程單個(gè)IP到編寫(xiě)多線程枫振、隊(duì)列加線程。也算是以端口掃描為目標(biāo)又重新學(xué)了一遍python3萤彩。
端口掃描
V0.1
python3寫(xiě)的單線程掃描自定義端口粪滤。也是最簡(jiǎn)單的探測(cè)端口開(kāi)放狀態(tài)的腳本。
注解:
1.s.settimeout(0.1) 設(shè)置了超時(shí)時(shí)間乒疏;
2.用最簡(jiǎn)單的connect_ex方法進(jìn)行socket連接额衙,成功返回0,失敗返回error怕吴;
3.使用sys模塊來(lái)接收參數(shù)窍侧;
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket
import sys
def scan(host,port):
s = socket.socket()
s.settimeout(0.1)
# 設(shè)置超時(shí)時(shí)間
if s.connect_ex((host, port)) == 0:
# 與connect一樣,成功返回0.失敗返回error
print(port,'open')
s.close()
if __name__ == '__main__':
if(len(sys.argv) == 2):
print('User: please port_scan.py IP')
host = sys.argv[1]
port = [22,23,25,135,137,445,3306,1541,3389,1080,9200]
for p in port:
scan(host,p)
else:
print('User: please port_scan.py IP')
V0.2
python3寫(xiě)的單線程掃描端口段转绷。適用于探測(cè)一些web資產(chǎn)伟件,比如8000-10000之間的端口。
注解:
相比較V0.1腳本出現(xiàn)了一下變動(dòng):
1.使用內(nèi)置的map函數(shù)议经,對(duì)每一個(gè)元素調(diào)用function函數(shù)斧账。即每一個(gè)port調(diào)用scan函數(shù)谴返;
2.返回的結(jié)果在python2位list類(lèi)型,python3手動(dòng)添加list咧织;
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket
import sys
def scan(port):
s = socket.socket()
s.settimeout(0.1)
if s.connect_ex((sys.argv[1], port)) == 0:
print(port,'open')
s.close()
if __name__ == '__main__':
if(len(sys.argv) == 4):
print('User: please port_scan.py IP start_port end_port')
start_port = int(sys.argv[2])
end_port = int(sys.argv[3])
list(map(scan,range(start_port,end_port)))
# python2 可以直接map返回list嗓袱,python3 需要手動(dòng)list
else:
print('User: please port_scan.py IP start_port end_port')
V0.2.1
python3寫(xiě)的單線程掃描端口段。
注解:
1.相比較V0.2腳本习绢,引入了optparse 模塊用于處理命令行參數(shù)渠抹;
#!/usr/bin/env python3
#-*- coding:utf-8 -*-
import socket
import sys
import optparse
# optparse 模塊用于處理命令行參數(shù)
def scan(host,port):
s = socket.socket()
s.settimeout(0.1)
if s.connect_ex((host, port)) == 0:
print(port,'open')
s.close()
if __name__ == '__main__':
parser = optparse.OptionParser("port_scan.py -H host -s start_port -e end_port")
parser.add_option("-H",action="store",type="string",dest="host",default="127.0.0.1",help="ip")
parser.add_option("-s",action="store",type="int",dest="start_port",default=0,help="start_port")
parser.add_option("-e",action="store",type="int",dest="end_port",default=512,help="end_port")
(options,args) = parser.parse_args()
#print(options)
host = options.host
start_port = options.start_port
end_port = options.end_port
for p in range(start_port,end_port):
scan(host,p)
V1.0
python3寫(xiě)的多線程掃描自定義端口。
適用于更快更高效的快速掃描一些端口闪萄。
注解:
1.首先使用了threading模塊多線程梧却,將port進(jìn)行循環(huán)添加線程中,有多少了port就有多少線程數(shù)败去。在port_list列表的端口掃描夠了放航;
2.沒(méi)有加Lock鎖;
# !/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import socket
import sys
def scan(host,port):
s = socket.socket()
s.settimeout(0.1)
if s.connect_ex((host, port)) == 0:
print(port,'open')
s.close()
def thread():
thread_list = []
port_list = [21,22,23,25,80,135,137,139,445,1433,1521,3306,3389,8080,9015]
for port in port_list:
t = threading.Thread(target = scan,args = (host,port))
thread_list.append(t)
for t in thread_list:
t.start() #啟動(dòng)線程
for t in thread_list:
t.join() #等待到線程終止
if __name__ == '__main__':
if(len(sys.argv) == 2):
print('User: please port_scan_thread.py IP')
host = sys.argv[1]
thread()
else:
print('User: please port_scan_thread.py IP')
V1.0.1
python3 寫(xiě)的多線程自定義端口掃描圆裕。這次使用繼承threading類(lèi)的方式來(lái)編寫(xiě)广鳍。
注解:
1.在對(duì)run方法進(jìn)行重寫(xiě)的時(shí)候,python2 使用的apply()函數(shù)葫辐,python3使用星號(hào)標(biāo)記搜锰。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import sys
import socket
class Mythread(threading.Thread):
def __init__(self,fun,args):
# 初始化
threading.Thread.__init__(self)
self.fun = fun
self.args = args
def run(self):
# 重寫(xiě)run方法
self.fun(*self.args)
# python2 apply(self.fun,self.args)
def scan(host,port):
s = socket.socket()
s.settimeout(0.1)
if s.connect_ex((host, port)) == 0:
ip_port = str(host)+':'+str(port)+'\t'+'open'
print(ip_port)
save_result('ip_scan_port_result.txt',ip_port)
s.close()
def save_result(filename,ip_port):
with open(filename,'a+') as f:
f.write(str(ip_port)+'\n')
def thread(host):
port_list = [21,22,23,25,80,135,137,139,445,1433,1521,3306,3389,8080,9015]
thread_list = []
for port in port_list:
t = Mythread(scan,(host,port))
thread_list.append(t)
for t in thread_list:
t.start()
for t in thread_list:
t.join()
if __name__=='__main__':
if(len(sys.argv) == 2):
print('User: please port_scan_thread.py IP')
host = sys.argv[1]
thread(host)
else:
print('User: please port_scan_thread.py IP')
V1.1
python3寫(xiě)的多線程掃描指定端口范圍。
適用于更快更高效的快速掃描端口耿战,但是端口范圍不易過(guò)大。
注解:
1.使用threading模塊焊傅,將port進(jìn)行循環(huán)添加線程中剂陡,有多少了port就有多少線程數(shù)。端口范圍不易過(guò)大狐胎,否則意味著線程過(guò)高鸭栖。雖然加了Lock鎖,當(dāng)線程過(guò)高的時(shí)候握巢,程序依然會(huì)崩潰;
2.注意加鎖的位置,在connect_ex之后芥映;
3.也加了時(shí)間方便比較單線程崇败;
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import socket
import sys
import time
lock = threading.Lock()
# 創(chuàng)建鎖
def scan(host,port):
s = socket.socket()
s.settimeout(0.1)
if s.connect_ex((host, port)) == 0:
if lock.acquire(): #鎖定
print(port,'open')
lock.release() #釋放鎖
s.close()
def thread(port_list):
thread_list = []
for port in port_list:
t = threading.Thread(target = scan,args = (host,port))
thread_list.append(t)
for t in thread_list:
t.start()
for t in thread_list:
t.join()
if __name__ == '__main__':
if(len(sys.argv) == 4):
print('User: please port_scan_thread.py IP start_port end_port')
host = sys.argv[1]
start_port = int(sys.argv[2])
end_port = int(sys.argv[3])
port_list = list(range(start_port,end_port))
t1 = time.time()
thread(port_list)
print('intime:',time.time()-t1)
else:
print('User: please port_scan_thread.py IP start_port end_port')
V1.2
python3 寫(xiě)的多線程掃描端口段。
注解
1.相比較V1.1歌焦,添加了隊(duì)列解決線程過(guò)高導(dǎo)致程序崩潰的問(wèn)題飞几;
2.利用隊(duì)列先入先出;
3.設(shè)置線程數(shù)為50独撇;
#!/usr/bin/env python3
#-*- coding:utf-8 -*-
import threading
import socket
import sys
import time
import queue
def scan(host,port):
s = socket.socket()
s.settimeout(0.1)
if s.connect_ex((host, port)) == 0:
print(port,'open')
s.close()
def worker(host):
while not q.empty():
port = q.get() # 取出隊(duì)列
try:
scan(host,port)
finally:
q.task_done()
def thread(host):
thread_list = []
for t in range(50):
t = threading.Thread(target = worker,args = (host,))
thread_list.append(t)
for x in thread_list:
x.start()
for x in thread_list:
x.join()
if __name__ == '__main__':
if(len(sys.argv) == 4):
print('User: please port_scan_thread.py IP start_port end_port')
host = sys.argv[1]
start_port = int(sys.argv[2])
end_port = int(sys.argv[3])
t1 = time.time()
q = queue.Queue() # 創(chuàng)建隊(duì)列的對(duì)象
for port in range(start_port,end_port):
q.put(port) #put()方法將值port放入隊(duì)列
thread(host)
q.join()
print("end time:",time.time()-t1)
else:
print('User: please port_scan_thread.py IP start_port end_port')
V1.2.1
python3 寫(xiě)的多線程掃描端口段屑墨。
注解
1.只是把thread線程設(shè)置為自定義參數(shù)躁锁。當(dāng)然不建議自定義的線程過(guò)高;
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import socket
import sys
import time
import queue
def scan(host,port):
s = socket.socket()
s.settimeout(0.1)
if s.connect_ex((host, port)) == 0:
print(port,'open')
s.close()
def worker(host):
while not q.empty():
port = q.get() # 取出隊(duì)列
try:
scan(host,port)
finally:
q.task_done()
def thread(host,threads):
thread_list = []
for t in range(threads):
t = threading.Thread(target = worker,args = (host,))
thread_list.append(t)
for x in thread_list:
x.start()
for x in thread_list:
x.join()
if __name__ == '__main__':
if(len(sys.argv) == 5):
print('User: please port_scan_thread.py IP start_port end_port threads')
host = sys.argv[1]
start_port = int(sys.argv[2])
end_port = int(sys.argv[3])
threads = int(sys.argv[4])
t1 = time.time()
q = queue.Queue() # 創(chuàng)建隊(duì)列的對(duì)象
for port in range(start_port,end_port):
q.put(port) #put()方法將值port放入隊(duì)列
thread(host,threads)
q.join()
print("end time:",time.time()-t1)
else:
print('User: please port_scan_thread.py IP start_port end_port threads')
V1.3
python3 寫(xiě)的多線程掃描多個(gè)端口卵史。
注解
1.增加了輸出端口战转,保存在ip_scan_port_result.txt 文件中
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import socket
import sys
import time
import queue
def scan(host,port):
s = socket.socket()
s.settimeout(0.1)
if s.connect_ex((host, port)) == 0:
ip_port = str(host)+':'+str(port)+'\t'+'open'
print(ip_port)
save_result('ip_scan_port_result.txt',ip_port)
s.close()
def save_result(filename,ip_port):
with open(filename,'a+') as f:
f.write(str(ip_port)+'\n')
def worker(host):
while not q.empty():
port = q.get() # 取出隊(duì)列
try:
scan(host,port)
finally:
q.task_done()
def thread(host,threads):
thread_list = []
for t in range(threads):
t = threading.Thread(target = worker,args = (host,))
thread_list.append(t)
for x in thread_list:
x.start()
for x in thread_list:
x.join()
if __name__ == '__main__':
if(len(sys.argv) == 5):
print('User: please port_scan_thread.py IP start_port end_port threads')
host = sys.argv[1]
start_port = int(sys.argv[2])
end_port = int(sys.argv[3])
threads = int(sys.argv[4])
t1 = time.time()
q = queue.Queue() # 創(chuàng)建隊(duì)列的對(duì)象
for port in range(start_port,end_port):
q.put(port) #put()方法將值port放入隊(duì)列
thread(host,threads)
q.join()
print("end time:",time.time()-t1)
else:
print('User: please port_scan_thread.py IP start_port end_port threads')
V1.4
python3寫(xiě)的多線程全端口掃描。
注解
1.默認(rèn)也是50個(gè)線程以躯;
2.把start_port匣吊、end_port寫(xiě)死就行;
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import socket
import sys
import time
import queue
def scan(host,port):
s = socket.socket()
s.settimeout(0.1)
if s.connect_ex((host, port)) == 0:
print(port,'open')
s.close()
def worker(host):
while not q.empty():
port = q.get() # 取出隊(duì)列
try:
scan(host,port)
finally:
q.task_done()
def thread(host):
thread_list = []
for t in range(50):
t = threading.Thread(target = worker,args = (host,))
thread_list.append(t)
for x in thread_list:
x.start()
for x in thread_list:
x.join()
if __name__ == '__main__':
if(len(sys.argv) == 2):
print('User: please port_scan_thread.py IP')
host = sys.argv[1]
t1 = time.time()
q = queue.Queue() # 創(chuàng)建隊(duì)列的對(duì)象
for port in range(1,65535):
q.put(port) #put()方法將值port放入隊(duì)列
thread(host)
q.join()
print("end time:",time.time()-t1)
else:
print('User: please port_scan_thread.py IP')
參考資料
https://github.com/windard/Port_Scan
https://blog.csdn.net/handsomekang/article/details/39826729