PyPortScanner
python多線程端口掃描器。
輸出示例:
Github
此端口掃描器的源碼,文檔及詳細(xì)調(diào)用方法見(jiàn)Github PythonPortScanner by Yaokai。
背景
有時(shí)候杀捻,在進(jìn)行網(wǎng)絡(luò)相關(guān)的研究的時(shí)候闺骚,我們需要執(zhí)行一些有目的的參數(shù)測(cè)量。而端口掃描就是其中比較普遍也比較重要的一項(xiàng)称诗。所謂的端口掃描,就是指通過(guò)TCP握手或者別的方式來(lái)判別一個(gè)給定主機(jī)上的某些端口是否處理開(kāi)放头遭,或者說(shuō)監(jiān)聽(tīng)的狀態(tài)≡⒚猓現(xiàn)有的使用比較廣泛的端口掃描工具是nmap。毋庸置疑计维,nmap是一款非常強(qiáng)大且易于使用的軟件袜香。但nmap是一款運(yùn)行于terminal中的軟件,有時(shí)在別的代碼中調(diào)用并不是很方便鲫惶,甚至沒(méi)有相應(yīng)的庫(kù)蜈首。另外,nmap依賴的其他庫(kù)較多,在較老的系統(tǒng)中可能無(wú)法使用較新的nmap欢策,這樣會(huì)造成掃描的不便吆寨。另外,nmap在掃描時(shí)需要root權(quán)限踩寇∽那澹基于這個(gè)原因,我用python2.7自帶的庫(kù)開(kāi)發(fā)了一款高效的多線程端口掃描器來(lái)滿足使用需要姑荷。
具體實(shí)現(xiàn)
I. 利用TCP握手連接掃描一個(gè)給定的(ip,port)
地址對(duì)
為了實(shí)現(xiàn)端口掃描盒延,我們首先明白如何使用python socket
與給定的(ip, port)
進(jìn)行TCP握手。為了完成TCP握手鼠冕,我們需要先初始化一個(gè)TCP socket添寺。在python
中新建一個(gè)TCP socket的代碼如下:
TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #(1)
TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) #(2)
TCP_sock.settimeout(delay) #(3)
其中(1)
是初始化socket的代碼,socket.AF_INTE
參數(shù)表示IPv4 socket
懈费,socket.SOCK_STREAM
參數(shù)表示TCP socket
计露。這樣我們就初始化了一個(gè)使用IPv4,TCP
協(xié)議的socket憎乙。
(2)
使用了socket.setsockopt()
來(lái)設(shè)置socket的另一些參數(shù)票罐。socket.SOL_SOCKET
指定當(dāng)前socket將使用setsockopt()
中后面的參數(shù)。socket.SO_REUSEPORT
表明當(dāng)前socket使用了可復(fù)用端口的設(shè)置泞边。socket.SO_REUSEPORT
具體含義可以參考我的另一篇文章该押。
(3)
將socket的連接超時(shí)時(shí)間設(shè)置為delay
變量所對(duì)應(yīng)的時(shí)間(以秒為單位)。這么做是為了防止我們?cè)谝粋€(gè)連接上等待太久阵谚。
了解了如何新建一個(gè)socket蚕礼,我們就可以開(kāi)始對(duì)給定的(ip,port)
對(duì)進(jìn)行TCP連接。代碼如下:
try:
result = TCP_sock.connect_ex((ip, int(port_number)))
# If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE
if result == 0:
output[port_number] = 'OPEN'
else:
output[port_number] = 'CLOSE'
TCP_sock.close()
except socket.error as e:
output[port_number] = 'CLOSE'
pass
因?yàn)檫@是一個(gè)I/O操作梢什,為了處理可能出現(xiàn)的異常奠蹬,我們需要在try,except
塊處理這部分操作。其次嗡午,我們根據(jù)socket.connect_ex()
方法連接目標(biāo)地址囤躁,通過(guò)該方法返回的狀態(tài)代碼來(lái)判斷連接是否成功。該方法返回0
代表連接成功荔睹。所以當(dāng)返回值為0
的時(shí)候?qū)?dāng)前端口記錄為打開(kāi)狀態(tài)狸演。反之記錄為關(guān)閉。另外僻他,當(dāng)連接操作出現(xiàn)異常的時(shí)候严沥,我們也將端口記錄為關(guān)閉狀態(tài),因?yàn)槠洳⒉荒鼙怀晒B接(可能因?yàn)榉阑饓蛘邤?shù)據(jù)包被過(guò)濾等原因)中姜。
需要注意的是,在連接完成后我們一定要調(diào)用socket.close()
方法來(lái)關(guān)閉與遠(yuǎn)程端口之間的TCP連接。否則的話我們的掃描操作可能會(huì)引起所謂的TCP連接懸掛問(wèn)題(Hanging TCP connection)丢胚。
總結(jié)起來(lái)翩瓜,TCP握手掃描的整體代碼如下:
"""
Perform status checking for a given port on a given ip address using TCP handshake
Keyword arguments:
ip -- the ip address that is being scanned
port_number -- the port that is going to be checked
delay -- the time in seconds that a TCP socket waits until timeout
output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE')
"""
def __TCP_connect(ip, port_number, delay, output):
# Initilize the TCP socket object
TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
TCP_sock.settimeout(delay)
try:
result = TCP_sock.connect_ex((ip, int(port_number)))
# If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE
if result == 0:
output[port_number] = 'OPEN'
else:
output[port_number] = 'CLOSE'
TCP_sock.close()
except socket.error as e:
output[port_number] = 'CLOSE'
pass
II. 多線程掃描端口
單線程掃描雖然邏輯簡(jiǎn)單,但無(wú)疑是及其低效的携龟。因?yàn)樵趻呙柽^(guò)程中要進(jìn)行大量的數(shù)據(jù)包的發(fā)送和接受兔跌,所以這是一個(gè)I/O密集型的操作。如果只是用單線程進(jìn)行掃描的話峡蟋,程序會(huì)在等待回復(fù)的過(guò)程中浪費(fèi)大量的時(shí)間坟桅。因此多線程的操作是很有必要的。這里蕊蝗,一個(gè)很自然的思路就是為每一個(gè)端口單獨(dú)開(kāi)一個(gè)線程進(jìn)行掃描仅乓。
在這里我們將需要掃描的端口列表定為從Nmap中得到的前1000個(gè)使用頻率最高的端口:
__port_list = [1,3,6,9,13,17,19,20,21,22,23,24,25,30,32,37,42,49,53,70,79,80,81,82,83,84,88,89,99,106,109,110,113,119,125,135,139,143,146,161,163,179,199,211,222,254,255,259,264,280,301,306,311,340,366,389,406,416,425,427,443,444,458,464,481,497,500,512,513,514,524,541,543,544,548,554,563,...]
完整的端口表見(jiàn)top 1K commonly used ports
對(duì)于一個(gè)給定的ip地址,掃描的過(guò)程是這樣的:
- 取出一個(gè)端口
- 新建一條線程蓬戚,利用
__TCP_connect()
函數(shù)對(duì)該(ip,port)
進(jìn)行連接操作夸楣。 - 調(diào)用
thread.start()
和thread.join()
方法,使掃描的子線程開(kāi)始工作并且命令主線程等待子線程死亡后再結(jié)束子漩。 - 重復(fù)這個(gè)過(guò)程直到所有的端口都被掃描過(guò)豫喧。
根據(jù)以上思路,多線程掃描的代碼如下:
"""
Open multiple threads to perform port scanning
Keyword arguments:
ip -- the ip address that is being scanned
delay -- the time in seconds that a TCP socket waits until timeout
output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE')
"""
def __scan_ports_helper(ip, delay, output):
'''
Multithreading port scanning
'''
port_index = 0
while port_index < len(__port_list):
# Ensure that the number of cocurrently running threads does not exceed the thread limit
while threading.activeCount() < __thread_limit and port_index < len(__port_list):
# Start threads
thread = threading.Thread(target = __TCP_connect, args = (ip, __port_list[port_index], delay, output))
thread.start()
# lock the thread until all threads complete
thread.join()
port_index = port_index + 1
其中__thread_limit參數(shù)是用來(lái)限制線程數(shù)目的幢泼。output是一個(gè)字典紧显,以(port: status)
的形式保存了掃描的結(jié)果。
thread.join()
保證了主線程只有在所有子線程都結(jié)束之后才會(huì)繼續(xù)執(zhí)行缕棵,從而確保了我們一定會(huì)掃描全部的端口孵班。
III. 多線程掃描多個(gè)網(wǎng)站
在多線程掃描端口的同時(shí),如果我們能夠多線程掃描多個(gè)網(wǎng)站挥吵,那么掃描的效率還將進(jìn)一步提高重父。為了達(dá)到這個(gè)目的,我們需要另一個(gè)線程去管理一個(gè)網(wǎng)站對(duì)應(yīng)的對(duì)其端口進(jìn)行掃描的所有子線程忽匈。
除此之外房午,在這種情況下,我們必須刪去__scan_ports_helper()
中的thread.join()
丹允。否則主線程就會(huì)被端口掃描子線程阻塞郭厌,我們也就無(wú)法多線程掃描多個(gè)網(wǎng)站了。
在不使用join()
的情況下雕蔽,我們?nèi)绾未_保一個(gè)網(wǎng)站的掃描線程只有在完成對(duì)其全部端口的掃描之后才會(huì)返回呢折柠?這里我使用的方法是檢測(cè)output
字典的長(zhǎng)度。因?yàn)樵谌繏呙柰瓿珊螅?code>output的長(zhǎng)度一定與__port_list
的長(zhǎng)度一致批狐。
改變后的代碼如下:
def __scan_ports_helper(ip, delay, output):
'''
Multithreading port scanning
'''
port_index = 0
while port_index < len(__port_list):
# Ensure that the number of cocurrently running threads does not exceed the thread limit
while threading.activeCount() < __thread_limit and port_index < len(__port_list):
# Start threads
thread = threading.Thread(target = __TCP_connect, args = (ip, __port_list[port_index], delay, output))
thread.start()
port_index = port_index + 1
while (len(output) < len(self.target_ports)):
continue
根據(jù)以上掃描線程的代碼扇售,端口掃描的管理線程的代碼如下所示:
"""
Controller of the __scan_ports_helper() function
Keyword arguments:
ip -- the ip address that is being scanned
delay -- the time in seconds that a TCP socket waits until timeout
message -- the message that is going to be included in the scanning packets, in order to prevent
ethical problem (default: '')
"""
def __scan_ports(websites, output_ip, delay):
scan_result = {}
for website in websites:
website = str(website)
scan_result[website] = {}
thread = threading.Thread(target = __scan_ports_helper, args = (ip, delay, scan_result[website]))
thread.start()
# lock the script until all threads complete
thread.join()
return scan_result
至此前塔,我們就完成了一個(gè)多線程端口掃描器的全部代碼。
IV. 總結(jié)承冰!利用這些代碼掃描給定網(wǎng)站并輸出結(jié)果
處于輸出方便的考慮华弓,我并沒(méi)有使用多線程掃描多個(gè)網(wǎng)站,同時(shí)對(duì)每個(gè)網(wǎng)站多線程掃描多個(gè)端口的方法困乒。在這個(gè)例子中只進(jìn)行了多線程掃描端口寂屏,但同時(shí)只掃描一個(gè)網(wǎng)站的操作。整合起來(lái)的代碼如下:
import sys
import subprocess
import socket
import threading
import time
class PortScanner:
# default ports to be scanned
# or put any ports you want to scan here!
__port_list = [1,3,6,9,13,17,19,20,21,22,23,24,25,30,32,37,42,49,53,70,79,80,81,82,83,84,88,89,99,106,109,110,113,119,125,135,139,143,146,161,163,179,199,211,222,254,255,259,264,280,301,306,311,340,366,389,406,416,425,427,443,444,458,464,481,497,500,512,513,514,524,541,543,544,548,554,563]
# default thread number limit
__thread_limit = 1000
# default connection timeout time inseconds
__delay = 10
"""
Constructor of a PortScanner object
Keyword arguments:
target_ports -- the list of ports that is going to be scanned (default self.__port_list)
"""
def __init__(self, target_ports = None):
# If target ports not given in the arguments, use default ports
# If target ports is given in the arguments, use given port lists
if target_ports is None:
self.target_ports = self.__port_list
else:
self.target_ports = target_ports
"""
Return the usage information for invalid input host name.
"""
def __usage(self):
print('python Port Scanner v0.1')
print('please make sure the input host name is in the form of "something.com" or "http://something.com!"\n')
"""
This is the function need to be called to perform port scanning
Keyword arguments:
host_name -- the hostname that is going to be scanned
message -- the message that is going to be included in the scanning packets, in order to prevent
ethical problem (default: '')
"""
def scan(self, host_name, message = ''):
if 'http://' in host_name or 'https://' in host_name:
host_name = host_name[host_name.find('://') + 3 : ]
print('*' * 60 + '\n')
print('start scanning website: ' + str(host_name))
try:
server_ip = socket.gethostbyname(str(host_name))
print('server ip is: ' + str(server_ip))
except socket.error as e:
# If the DNS resolution of a website cannot be finished, abort that website.
#print(e)
print('hostname %s unknown!!!' % host_name)
self.__usage()
return {}
# May need to return specificed values to the DB in the future
start_time = time.time()
output = self.__scan_ports(server_ip, self.__delay, message)
stop_time = time.time()
print('host %s scanned in %f seconds' %(host_name, stop_time - start_time))
print('finish scanning!\n')
return output
"""
Set the maximum number of thread for port scanning
Keyword argument:
num -- the maximum number of thread running concurrently (default 1000)
"""
def set_thread_limit(self, num):
num = int(num)
if num <= 0 or num > 50000:
print('Warning: Invalid thread number limit! Please make sure the thread limit is within the range of (1, 50,000)!')
print('The scanning process will use default thread limit!')
return
self.__thread_limit = num
"""
Set the time out delay for port scanning in seconds
Keyword argument:
delay -- the time in seconds that a TCP socket waits until timeout (default 10)
"""
def set_delay(self, delay):
delay = int(delay)
if delay <= 0 or delay > 100:
print('Warning: Invalid delay value! Please make sure the input delay is within the range of (1, 100)')
print('The scanning process will use the default delay time')
return
self.__delay = delay
"""
Print out the list of ports being scanned
"""
def show_target_ports(self):
print ('Current port list is:')
print (self.target_ports)
"""
Print out the delay in seconds that a TCP socket waits until timeout
"""
def show_delay(self):
print ('Current timeout delay is :%d' %(int(self.__delay)))
"""
Open multiple threads to perform port scanning
Keyword arguments:
ip -- the ip address that is being scanned
delay -- the time in seconds that a TCP socket waits until timeout
output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE')
message -- the message that is going to be included in the scanning packets, in order to prevent
ethical problem (default: '')
"""
def __scan_ports_helper(self, ip, delay, output, message):
'''
Multithreading port scanning
'''
port_index = 0
while port_index < len(self.target_ports):
# Ensure that the number of cocurrently running threads does not exceed the thread limit
while threading.activeCount() < self.__thread_limit and port_index < len(self.target_ports):
# Start threads
thread = threading.Thread(target = self.__TCP_connect, args = (ip, self.target_ports[port_index], delay, output, message))
thread.start()
port_index = port_index + 1
"""
Controller of the __scan_ports_helper() function
Keyword arguments:
ip -- the ip address that is being scanned
delay -- the time in seconds that a TCP socket waits until timeout
message -- the message that is going to be included in the scanning packets, in order to prevent
ethical problem (default: '')
"""
def __scan_ports(self, ip, delay, message):
output = {}
thread = threading.Thread(target = self.__scan_ports_helper, args = (ip, delay, output, message))
thread.start()
# Wait until all port scanning threads finished
while (len(output) < len(self.target_ports)):
continue
# Print openning ports from small to large
for port in self.target_ports:
if output[port] == 'OPEN':
print(str(port) + ': ' + output[port] + '\n')
return output
"""
Perform status checking for a given port on a given ip address using TCP handshake
Keyword arguments:
ip -- the ip address that is being scanned
port_number -- the port that is going to be checked
delay -- the time in seconds that a TCP socket waits until timeout
output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE')
message -- the message that is going to be included in the scanning packets, in order to prevent
ethical problem (default: '')
"""
def __TCP_connect(self, ip, port_number, delay, output, message):
# Initilize the TCP socket object
TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
TCP_sock.settimeout(delay)
# Initilize a UDP socket to send scanning alert message if there exists an non-empty message
if message != '':
UDP_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
UDP_sock.sendto(str(message), (ip, int(port_number)))
try:
result = TCP_sock.connect_ex((ip, int(port_number)))
if message != '':
TCP_sock.sendall(str(message))
# If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE
if result == 0:
output[port_number] = 'OPEN'
else:
output[port_number] = 'CLOSE'
TCP_sock.close()
except socket.error as e:
output[port_number] = 'CLOSE'
pass