端口扫描工具编写
目录- 端口扫描工具编写
- 0x01:实现端口扫描的方式
- 一、TCP扫描:
- 二、SYN扫描:
- 三、UDP扫描:
- 0x02:使用python实现端口扫描
- 一、使用socket库的connect()方法扫描
- 1、核心代码
- 2、批量端口扫描
- 3、使用协程提高效率
- 一、使用socket库的connect()方法扫描
- 0x03:总结
- 0x01:实现端口扫描的方式
0x01:实现端口扫描的方式
一、TCP扫描:
TCP connect扫描,也称为全连接扫描,这种方式直接连接到目标端口,完成了TCP三次握手的过程,这种方式扫描结果比较准确,但速度比较慢而且可轻易被目标系统检测到。
二、SYN扫描:
TCP SYN扫描是另一种TCP扫描。端口扫描工具不使用操作系统原生网络功能,而是自行生成、发送IP数据包,并监控其回应。这种扫描模式被称为“半开放扫描”,因为它从不建立完整的TCP连接。端口扫描工具生成一个SYN包,如果目标端口开放,则会返回SYN-ACK包。扫描端回应一个RST包,然后在握手完成前关闭连接。如果端口关闭了但未使用过滤,目标端口应该会持续返回RST包。
三、UDP扫描:
UDP扫描发送空的(没有数据)UDP报头到每个目标端口。 如果返回ICMP端口不可到达错误(类型3,代码3), 该端口是closed(关闭的)。 其它ICMP不可到达错误(类型3, 代码1,2,9,10,或者13)表明该端口是filtered(被过滤的)。 偶尔地,某服务会响应一个UDP报文,证明该端口是open(开放的)。
(资料图片仅供参考)
0x02:使用python实现端口扫描
一、使用socket库的connect()方法扫描
原理:指定IP和端口号,连接到特定主机对应的远程socket,连接失败会抛出timeout错误,通过判断是否连接成功来判断端口是否开放。
1、核心代码
首先使用socket.socket(socket.AF_INET, socket.SOCK_STREAM)
建立一个基于网络并且使用TCP协议的套接字;
之后使用connect()尝试建立连接,需要的参数为IP和端口号;
最后根据判断返回的结果是成功还是超时,来判断端口是否开放。
就这三步,转化成实际的代码就是下面这样:
import socketdef star_scan(ip, port): try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建一个基于网络并且使用tcp协议的套接字,用于通信。 s.settimeout(0.02)# 设置超时时间 s.connect((ip, port)) except Exception as e: print(e) else: res = s.recv(3096).decode("utf-8").encode() print(res) print("[+]{}:{} \topen".format(ip, port)) finally: s.close() ip = "192.168.247.135"port = 22star_scan(ip, port)
主要代码都在star_scan()函数里,往里面传入指定的IP和端口,就可以看到该端口是否开放。
上图是程序运行的结果。
只做成这样肯定是不够的,下一步是实现批量端口的扫描,但是探测的原理还是这个。
2、批量端口扫描
要实现批量端口的扫描,主要的问题就是如何让计算机知道你想要扫的端口有哪些。
最简单的方法是一个一个指定,把要扫的端口全输入进去或者放到文件里让程序去读。一般情况下这都是一种很笨的方法,假如我们要指定1000个端口去扫描,那要输1000个端口,要是扫描全端口就更加麻烦。但是有的时候也是会使用这种方法的,有些端口就是比其他端口更常使用,这些算是常用端口;有些端口打开后假如被恶意利用,会造成非常严重的后果(比如3389端口),这些算是高危端口。
在知道这些端口的存在后,我们可以把它们存到列表里,每次扫描时都默认扫描这些端口,因为它们的利用价值更大,这样扫效率也更高。
把哪些端口定义为默认扫描端口也是门学问,太多太少都不太好,可以去GitHub上找一下大佬写的端口扫描工具或者综合扫描工具,看看他们默认都扫描哪些端口,或者看看nmap等工具是怎么定义的,这里就不列出来了。
还有一种常用的方式是指定端口范围,一般使用-
指定,比如1-100
这样。
通过设定默认扫描端口和输入端口范围,可以满足大部分使用需求了。由于设定默认端口在代码的实现上比较简单,所以这里主要实现指定端口范围,也就是让程序能够解析1-100
。具体实现代码如下:
import socketdef make_port_list(ports): new_port_list = [] if "," in ports: temp_list = ports.split(",") for port in temp_list: if "-" in port: for p in range(int(port.split("-")[0]), int(port.split("-")[1]) + 1): new_port_list.append(p) else: new_port_list.append(port) elif "-" in ports: for p in range(int(ports.split("-")[0]), int(ports.split("-")[1]) + 1): new_port_list.append(p) else: new_port_list.append(ports) return new_port_listdef star_scan(ip, port): try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建一个基于网络并且使用tcp协议的套接字,用于通信。 s.settimeout(0.02)# 设置超时时间 s.connect((ip, port)) except Exception as e: pass #print(e) #print("[+]{}:{} \tclosed".format(ip, port)) else: res = s.recv(3096).decode("utf-8").encode() print(res) print("[+]{}:{} \topen".format(ip, port)) finally: s.close() ip = "192.168.247.135"port = "21,22-24"port_list = make_port_list(port)print("total port num:{}".format(len(port_list)))for port in port_list: star_scan(ip, int(port))
正常情况下,要扫描的端口应以命令参数的形式输入进去,这里为了方便直接以变量形式写进了代码。
这里扫描了指定端口21和端口范围22-24,以,
为分隔符区分,通过make_port_list()函数解析,返回一个端口列表,之后循环调用star_scan函数来实现批量扫描,运行结果如下:
这里把扫描失败时的输出提示给注释掉了,因为要是扫描的端口过多这里会看着非常乱。
程序到这里还没有完成,可以看到这里在扫描4个端口的情况下用了快1秒的时间,实际扫描的时候肯定不会只扫几个端口,那时候会扫描成千上万个端口,这时的耗时会非常大:
这是由于网络I/O操作非常的耗时,程序大部分时间都耗在了等待上面,下面就要解决这个问题。
3、使用协程提高效率
关于协程,这里也不会做太多描述(我也不太懂)。大概就是在一个协程进入IO等待时,会自动切换到其他协程继续执行,也就是最大化的利用了IO等待时间,这个过程一直都是单线程,只是程序在一个协程进入等待后切换到另一个协程给CPU处理,而对于CPU来说则一直是同一个程序在给它发任务,CPU是感受不到区别的。
套用到这个程序里,可以理解为:我们每扫一个端口都要建立一次连接,而每次建立连接都会有一次等待,那我们就把每一个要建立的连接都放到一个协程里,在一个连接也就是协程进入IO等待后,系统会自动切换去建立下一个连接,这样效率就会大大提升,这个切换时自动的,完全不需要我们去做什么。
python中可以使用gevent库来实现协程,操作也非常简单。
1、使用gevent.spawn(函数名)
创建协程
2、使用gevent.join()
或者gevent.joinall()
执行协程
于是这里就用我们程序里的star_scan()
函数创建协程,协程的数量等于我们要扫描的次数,最后一起执行就可以了。
还有一个问题就是每一个star_scan()
函数从哪里获取我们要扫描的IP和端口号信息,这里我们使用queue(队列)来解决这个问题。先把所有的IP和端口号放入队列,根据队列长度创建协程,之后每个协程在执行任务的时候就把队列里的信息提取出来一个,当把队列空了的时候,任务也就处理完了。
下面是完整代码:
import socketimport sysfrom queue import Queuefrom gevent import monkey; monkey.patch_socket()import geventdef make_port_list(ports): new_port_list = [] if "," in ports: temp_list = ports.split(",") for port in temp_list: if "-" in port: for p in range(int(port.split("-")[0]), int(port.split("-")[1]) + 1): new_port_list.append(p) else: new_port_list.append(port) elif "-" in ports: for p in range(int(ports.split("-")[0]), int(ports.split("-")[1]) + 1): new_port_list.append(p) else: new_port_list.append(ports) return new_port_listdef coroutines(): # 开启多协程 cos = [] num = ip_port.qsize() print(num) for i in range(num): # 调用工作函数 cor = gevent.spawn(star_scan) cos.append(cor) gevent.joinall(cos)def star_scan(): sockets = ip_port.get() ip = sockets[0] port = int(sockets[1]) try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建一个基于网络并且使用tcp协议的套接字,用于通信。 s.settimeout(0.02)# 设置超时时间 s.connect((ip, port)) except Exception as e: pass #print("[+]{}:{} \tclosed".format(ip, port)) else: res = s.recv(3096).decode("utf-8").encode() print(res) print("[+]{}:{} \topen".format(ip, port)) finally: s.close()ip_port = Queue()ip = "192.168.247.135"port = "21,22-2400"port_list = make_port_list(port)print("total port num:{}".format(len(port_list)))for port in port_list: ip_port.put([ip, port])coroutines()
主要是加了使用ip_port = Queue()
创建队列,再用put()
函数将参数放入队列。最后调用coroutines()
函数开启协程,star_scan()
里获取参数的方式也变成了用get()
获取队列中的数据。
可以看到速度确实是快了,但是准确性降低了,22端口明明是开着的这里却没有扫到......
这里不太清楚具体原因,但是也是可以解决的,把超时时间设置的长一点就好了,也就是修改star_scan()
里的settimeout()
里面的值。还有一种解决方案是修改每次创建协程的最大值,超时时间少点也能扫到。
我稍微改了下代码,把直接创建所有协程改成50个50个创建,最后代码如下:
import socketimport sysfrom queue import Queuefrom gevent import monkey; monkey.patch_socket()import geventdef make_port_list(ports): new_port_list = [] if "," in ports: temp_list = ports.split(",") for port in temp_list: if "-" in port: for p in range(int(port.split("-")[0]), int(port.split("-")[1]) + 1): new_port_list.append(p) else: new_port_list.append(port) elif "-" in ports: for p in range(int(ports.split("-")[0]), int(ports.split("-")[1]) + 1): new_port_list.append(p) else: new_port_list.append(ports) return new_port_listdef coroutines(num): # 开启多协程 # print("开启协程{}个".format(num)) cos = [] for i in range(num): # 调用工作函数 cor = gevent.spawn(star_scan) cos.append(cor) gevent.joinall(cos)def star_scan(): sockets = ip_port.get() ip = sockets[0] port = int(sockets[1]) try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建一个基于网络并且使用tcp协议的套接字,用于通信。 s.settimeout(0.02)# 设置超时时间 s.connect((ip, port)) except Exception as e: pass #print("[+]{}:{} \tclosed".format(ip, port)) else: res = s.recv(3096).decode("utf-8").encode() print(res) print("[+]{}:{} \topen".format(ip, port)) finally: s.close()ip = "192.168.247.135"port = "21,22-2400"port_list = make_port_list(port)print("total port num:{}".format(len(port_list)))ip_port = Queue() # 创建队列for port in port_list: ip_port.put([ip, port])num = 50if len(port_list)%num == 0: turn = int(len(port_list)/num) for i in range(turn): coroutines(num)else: turn = int(len(port_list)/num)+1 for i in range(turn): if i == turn-1: coroutines(len(port_list)%num) else: coroutines(num)
最后结果就可以扫出来了,后续可以把每次创建协程数当作参数输入进去,这样方便调整。
0x03:总结
到目前为止程序就写的差不多了,只能说基本能用了。接下来就该是实现命令行参数的解析,不能每次换目标都要改代码,这里可以使用argparse模块来解析命令行参数,也可以使用gooey弄个简单的图形化界面,因为懒我就没接着往下写了。
这次是使用的socket库来实现的TCP扫描,其实方法不是唯一的,用scapy发包来扫描也是可以的,甚至直接用nmap模块调nmap来扫描也行。