探测网络设备ACL规则

背景:在互联网企业的生产网络中,往往在网络入口处的网络设备上会有成千上万条ACL策略,这么多的ACL导致了网络管理员很难彻底梳理清楚其中的逻辑关系,从而不知道到底对外开放了哪些IP和哪些端口。

解决手段:编写ACL规则探测程序,从公网扫描该网络设备的ACL规则

工作原理:不管是交换机还是路由器或防火墙,在处理数据包时ACL规则总是优先于ICMP规则。即:当网络设备收到一个TTL为0的报文时会先匹配ACL规则之后再向发送者发送 ICMP time exceeded消息,基于此原理就可以在公网发送以IDC内地址为目的IP且TTL到被探测设备时刚好减为0的数据包,如果被探测设备返回了ICMP time exceeded消息则说明它的ACL策略针对此IP及port开放,如果没有返回包则说明数据包被它的ACL阻拦

图示:

程序实现语言:python3

源码:

 # coding:utf-8

 from itertools import groupby
from scapy.all import *
import re
import sys
import IPy class RangeException(Exception):
pass class InputType(Exception):
pass class TargetNotSupport(Exception):
pass class OptionError(Exception):
pass class PortScan(object):
def __init__(self, speed=3):
self.open_port = []
self.speed = speed def __str__(self):
speed_statement = '使用PortScan(*)创建对象时可以在*处指定扫描速率,默认为3,数值越小扫描速度越快\n' \
'注意:随着扫描速度的增加准确率会相应降低!'
return speed_statement # 从本地文件读取IP资源
def __target(self):
try:
open_file = input('请输入要导入资源的文件名字:')
address_file = open(open_file, 'r')
address_list = []
for i in address_file.readlines():
i = i.replace('\n', '')
address_list.append(i)
except FileNotFoundError:
print('\n')
print('请先在本地创建对应名字的IP列表文本文件!!!')
print('\n')
self.scan() except KeyboardInterrupt:
print('')
sys.exit() except Exception as error:
print('打开本地文件有误!!!')
print(error)
self.scan()
else:
return address_list # 获取IP资源
# 输入1从一个文件读取IP,输入2从屏幕输入获取IP
# 获取的IP信息可以是单个IP地址(例:220.12.12.12),也可以是一个地址段(例:192.168.1.0/24)
# 最终返回一个IP地址列表,此列表包含了输入的所有单个IP地址以及地址段中的可用IP
def get_ip(self, option, string): address_store = [] # IP资源存储 # 如果选1则从文件读取IP资源
if option == 1:
# 得到打开IP表文件名字及其IP表
address_list = self.__target() # 如果选2则手动输入IP资源
if option == 2:
# 接收IP数据
address_list = input(string) # 1.1.1.1,2.2.2.0/24
address_list = address_list.split(',') # 1.1.1.1/24 的正则
ip_range_re = r'( *(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \
r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \
r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \
r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d))/' \
r'(3[012]|[12][0-9]|[1-9]) *'
# 1.1.1.1,2.2.2.2,3.3.3.3 的正则
ip_address_re = r'( *(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \
r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \
r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \
r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d))' # 对输入的值进行便利,提取其中的IP地址
for i in address_list:
range_re_result = re.match(ip_range_re, i) # 1.1.1.1/24的正则匹配结果
ip_re_result = re.match(ip_address_re, i) # 1.1.1.1,2.2.2.2,3.3.3.3 的正则匹配结果
if range_re_result:
subnet_mask = range_re_result.group(6)
network_number = re.sub(r'^0+', '', range_re_result.group(1))
address_string = network_number + '/' + subnet_mask # 如果输入1.1.1.1/24类型则address_store为字符串
try:
# 提取网段内所有可用IP地址并加表
address_subset = IPy.IP(address_string)
for i in address_subset:
if i == address_subset[len(address_subset)-1] or i == address_subset[0]:
continue
else:
address_store.append(str(i))
except ValueError:
print('输入有误,请按"网络号/掩码"或"IP地址"格式输入')
self.scan() elif ip_re_result:
ii = re.sub(r'^0+', '', ip_re_result.group())
address_store.append(ii) #对单个IP地址形式的输入直接加表
else:
print('输入有误,请输入正确的IP地址(e.g:1.1.1.1,192.168.1.0/24)!!!')
self.get_ip(option)
return address_store # 通过从屏幕输入获取端口资源
# 输入形式可以为单个端口号(例:3389),也可以是一个端口范围(例:22-25)
# 返回数据为一个列表,其中每个元素都以元组形式存在. 每个元组包含两个整数元素,第一个为端口范围的最小值,第二个为端口范围的最大值
# 注意: 单个端口号形式的输入最后也将以范围形式输出,其最大值与最小值都为他本身
# 返回数据举例: [(22-25),(3389,3389)]
def get_port(self):
port_range = input('请输入要扫描端口范围(e.g: 3389,20-25):')
target_port = [] # 端口资源存储
try:
port_range = port_range.split(',') # 例:['1', '2', '3-10', '11-20']
for i in port_range:
if re.match(' *(\d+)-(\d+).*', i):
# for ii in range(len(open_port_list)):
low_port = int(re.match(' *(\d+)-(\d+).*', i).group(1))
high_port = int(re.match(' *(\d+)-(\d+).*', i).group(2))
if low_port >= high_port or low_port <= 0 or low_port > 65535 or high_port <= 0 or high_port > 65535:
raise RangeException
else:
target_port.append((low_port, high_port)) # 如果是范围则把最小值和最大值以元组形式加表
elif re.match(' *\d+ *', i):
singular = int(re.match(' *(\d+) *', i).group(1))
if 0 < singular <= 65535:
target_port.append((singular, singular)) # 如果是单整数则把它当作范围一样处理,最大值和最小值均为它自己
else:
raise RangeException
else:
raise InputType
except RangeException:
print('端口应为1-65535之间的整数,且输入范围格式应当为从小到大')
self.get_port()
except InputType:
print('端口类型应为整数')
self.get_port()
except KeyboardInterrupt:
print('')
sys.exit()
except Exception as unusual:
print('输入有误!')
print(unusual)
self.get_port()
return target_port # 返回经过处理的目标端口列表 # 对纯数字的列表进行排序且范围切块
# 例:导入[11,22,33,1,2,3,4,5]----->导出[1-5,11,22,33]
@staticmethod
def int_single_to_range(original):
original.sort() # 先排序
open_port_range = []
fun = lambda x: x[1] - x[0]
for k, g in groupby(enumerate(original), fun):
l1 = [j for i, j in g] # 连续数字的列表
if len(l1) > 1:
scop = str(min(l1)) + '-' + str(max(l1)) # 将连续数字范围用"-"连接
else:
scop = l1[0]
open_port_range.append("{}".format(scop))
return open_port_range # TTL自动检测
# 导入一个被探测设备IP列表,返回一个被探测设备IP与相应TTL的字典,例:{'220.2.2.2':15}
def ttl_check(self, address_list):
print('准备中...')
probe_device_ttl = {}
# switch = 0 # 检测返回数据包的源IP是否为被探测设备
try:
for i in address_list:
for ii in range(1, 129):
print(i, ii)
scan_packet = IP(dst=i, ttl=ii) / TCP(dport=8080, flags='S')
ttl_source = sr1(scan_packet, timeout=3, verbose=False)
#while 1:
# time.sleep(0.001)
if ttl_source:
try:
if ttl_source['IP'].fields['src'] == i:
probe_device_ttl[i] = ii
# switch = 1
break
else:
continue
except Exception as receive_error:
print(receive_error)
raise
# if switch == 1:
# break
else:
print('TTL超时!!!') except KeyboardInterrupt:
print('')
sys.exit() except Exception as error:
print('程序出现错误!!!')
print(error)
self.scan()
else:
print('准备完毕')
return probe_device_ttl @staticmethod
def option():
print('请选择导入被扫描信息方式:\n'
'1 从文件导入\n'
'2 在程序中手动输入\n') def scan(self):
# 功能选择
self.option()
try:
option = int(input('我选择: '))
print(option)
if option != 2 and option != 1:
raise OptionError
except OptionError:
print('请输入功能标号!')
self.scan() # 获取要扫描IP列表
address_store = self.get_ip(option, '请输入被探测IP资源:') # 获取要扫描的端口列表
port_range = self.get_port() probe_device = self.get_ip(2, '请输入被探测的安全设备IP地址:') # 自动检测到探测设备的TTL值,该值为一个字典,key为被探测安全设备IP,value为到该设备的TTL值
ttl = self.ttl_check(probe_device) count = 0 # 用作进度百分比的分子. 以每个IP的每个端口为单位进行计数,总数为IP个数*端口个数 if ttl:
# 挨个儿朝被探测设备发送端口探测包
for probe_device_ip, ttl in ttl.items(): print(probe_device_ip + '端口开放情况:') # 创建一个新文件,准备导入结果
write_file = open(probe_device_ip + '-result.txt', 'w') try:
# 为每个被探测设备计算IP资源池中所有的IP资源
for i in address_store: # 为每个IP计算各个输入IP端口范围开放情况
for port in port_range:
(low_port, high_port) = port
scan_packets = IP(dst=i, ttl=ttl) / TCP(dport=(low_port, high_port), flags='S') # 构造检测包
replay_packets_total = sr(scan_packets, timeout=self.speed, verbose=False) # 发送检测包及接收返回包
open_port_list = replay_packets_total[0].res # 开放端口原始对象列表(一个IP不同端口范围回包的集合) # 一个IP有几个端口开放就有几个回包(如果端口被ACL干掉则不会回包),以下遍历回包来读取开放的端口
for ii in range(len(open_port_list)):
try:
if open_port_list[ii][1]['ICMP'].fields['type'] == 11: # ICMP类型为11时为TTL超时包
self.open_port.append(open_port_list[ii][0]['TCP'].fields['dport']) # TTL超时则为开放端口,将开放端口进行加表
continue
else:
if open_port_list[ii][1]['ICMP'].fields['type'] == 3: # 不知为啥有时候会返回类型为3的ICMP包(即:端口不可达包)
continue
else:
# 除11和3外其他类型的ICMP回包,需进行人工排查
print('ICMP返回类型不对')
print(open_port_list[ii][1]['ICMP'].fields)
print(open_port_list[ii][0]['TCP'].fields)
except IndexError: # 如果探测设备IP刚好为要扫描的IP时,开放端口会返回SYN,ACK包
if open_port_list[ii][1]['TCP'].fields['flags'] == 'SA':
self.open_port.append(open_port_list[ii][0]['TCP'].fields['dport'])
continue # 不知为啥有时候交换机会返回RST ACK的包
if open_port_list[ii][1]['TCP'].fields['flags'] == 'RA':
continue
else:
print('返回未知TCP包,需人工分析')
print(open_port_list[ii][1]['TCP'].fields,
open_port_list[ii][1]['TCP'].fields['flags'])
print(open_port_list[ii]) count += 1 # 执行进度+1(每计算完一个IP进度+1)
print(count) # 进度统计
speed_to_progress = count / len(address_list) * len(port_range) * len(ttl) * 100
print('\r已完成:%.2f%% ' % speed_to_progress, end='') self.open_port = self.int_single_to_range(self.open_port) # 对开放端口列表进行排序和范围化
print('针对' + i + '开放端口: ', self.open_port)
write_file.write(str(i) + ':' + str(self.open_port) + '\n') # 每扫描完一个IP就把该IP结果写入文件
self.open_port = [] # 扫尾工作,为下个IP扫描准备一个干净的开放端口列表 write_file.close() except KeyboardInterrupt:
print('')
write_file.close()
sys.exit()
except Exception as error:
write_file.close()
print('程序异常退出!')
print(error)
else:
write_file.close()
print('')
if option == 1:
print('被探测设备%s已完成,结果已导入当前路径''\'%s\'''文件中' % (probe_device_ip, probe_device_ip + '-result.txt'))
if option == 2:
print('扫描已完成!') if __name__ == '__main__': def banner():
print('\n')
print('============================================')
print('\n')
print('\n')
print(' ACL有效性探测系统v1.0 ')
print('\n')
print('\n')
print('============================================')
print('\n') def main():
banner()
a = PortScan()
a.scan() main()

最新文章

  1. BZOJ 1047 二维单调队列
  2. 黄聪:远程序桌面登录的.NET(C#)开发
  3. IIS7配置PHP运行环境
  4. Android布局居中的几种做法
  5. httpClenit的post出现乱码问题
  6. 从零开始学JAVA(05)-连接数据库MSSQL(JDBC代码篇)
  7. smarty 时间格式化date_format
  8. 【转】android中重复连接ble设备导致的连接后直接返回STATE_DISCONNECTED的解决办法---不错不错,重新连接需要花费很长的时间
  9. Mysql 存储过程和函数区别
  10. Excel01-不同的单元格输入同一数据
  11. C#记录日志、获取枚举值 等通用函数列表
  12. 剑指Offer_编程题_25
  13. 正则表达式(_ % regexp_like)
  14. MYSQL基础知识小盲区
  15. JS实现网页背景旋转缩放轮播效果
  16. linux命令(52):usermod 修改账户信息,groupmod
  17. Python: re.compile()
  18. springboot activiti关闭验证自动部署
  19. jpgraph中文使用手册之文本和字体控制教程
  20. ASP.NET WebAPI 01-Demo

热门文章

  1. BZOJ2751 [HAOI2012]容易题
  2. H5移动端项目案例、web手机微商城实战开发
  3. 处女作《Web全栈开发进阶之路》出版了!
  4. Java代码规范与质量检测插件SonarLint
  5. asp.net core系列 56 IS4使用OpenID Connect添加用户认证
  6. [转载]学习Javascript闭包(Closure)
  7. 简单Java类 全网最详细讲解 !!!
  8. 供应链管理为什么要上企业自主可控的免费开源ERP Odoo
  9. Cannot set the value of read-only property &#39;outputFile&#39; for ApkVariantOutputImpl_Decorated{...
  10. Android 程序结构