IOTA 设备提供 RESTful API,允许直接访问存储在设备上的数据。这对于集成到各种场景中非常有用。在本例中,可以过滤当前捕获内容中的黑名单 IP。用例可能是分析某些内部 IP 是否未传播到特定访问级别之外,或者某些设备是否从不应访问的区域进行访问。
IOTA简介:IOTA 是一款功能强大的网络捕获和分析解决方案,适用于边缘和核心网络。IOTA 系列包括便携式 EDGE 型号、高速 CORE 型号和 IOTA CM 集中设备管理系统。IOTA 解决方案可为分支机构、中小企业和核心网络(如数据中心)提供快速高效的网络分析和故障排除功能。
IOTA 设备提供 RESTful API,允许直接访问存储在设备上的数据。这对于集成到各种场景中非常有用。在本例中,可以过滤当前捕获内容中的黑名单 IP。用例可能是分析某些内部 IP 是否未传播到特定访问级别之外,或者某些设备是否从不应访问的区域进行访问。
一、前提条件
- Python 编程基础知识。
- 从 Profitap 访问 IOTA 设备。
二、设置 IOTA 设备
- 连接 IOTA 设备
- 将 IOTA 设备连接到要监控的网段。
- 确定正确的网段至关重要。通过目视检查 IOTA 设备上的流量,确保在捕获期间使用正确的网段。
- 访问 IOTA 网络界面
- 打开网络浏览器并输入 IOTA 设备的 IP 地址。例如:https://<iota_device_ip_address>(用 IOTA 设备的实际 IP 地址替换)。
- 使用正确的凭证登录。该脚本使用的用户名/密码组合应允许它查看必要的信息。
- 配置捕获设置
- 设置用于捕获数据包的网络接口。从设备的 Web 界面选择要监控的接口。
- 根据需要配置过滤器,以关注特定流量类型。您可以根据需要设置过滤器,只捕获特定类型的流量,如 HTTP 或 FTP。
三、使用 IOTA 数据库查询引擎
查询引擎允许任何用户以机器可读的方式与 IOTA 设备交互,以查询元数据数据库。用户只需输入用户名/密码,即可通过 RESTful API 访问数据库。
图 1IOTA用户管理界面
为确保敏感信息不被传播,以及设备上的任何信息都不会被用户篡改,建议创建一个新的特殊用户。在本例中,创建了一个用户名为 “apitest ”的用户。请注意,该用户已作为 “查看器 ”自动添加到组织中。不需要配置任何其他功能,而且该角色只允许只读访问,可以防止任何误传。
图 2增加一个“apitest”新用户
四、安全考虑
这里使用的解决方案是为使用脚本创建一个专用的用户/密码组合。由于建议保护 IOTA 的直接访问不受任何外部干扰,并使用 HTTPS 连接,因此这样做没有问题。但请注意,良好的做法是确保该用户/密码组合不在任何其他情况下使用,并且该用户仅具有查看器角色(新用户默认具有该角色)。
五、检测黑名单 IP 的 Python 脚本
import requests import re import time import urllib.parse import json import argparse from urllib3.exceptions import InsecureRequestWarning def main(): # CLI options parser = argparse.ArgumentParser(description='Compare IP blacklist files against IOTA metadata. Blacklist files must contain a list of single IPv4/IPv6 addresses or IP subnet in CIDR notation.') required_args = parser.add_argument_group('required') required_args.add_argument('-d', '--device_ip', help='Device IP address') required_args.add_argument('-i', '--infile', nargs='+', help = 'Blacklist file(s)') parser.add_argument('-u', '--device_username', default='admin', help='Device IP username (default: admin)') parser.add_argument('-p', '--device_password', default='admin', help='Device IP password (default: admin)') parser.add_argument('-l', '--query_time_window_s', type=int, default=600, help='Number of seconds from the current time to query IOTA metadata (default: 600)') args = parser.parse_args() if args.device_ip: iota_ip_address = args.device_ip if args.device_username: iota_username = args.device_username if args.device_password: iota_password = args.device_password if args.query_time_window_s: query_time_window_s = args.query_time_window_s if args.infile: blacklist_files = args.infile query_time_window_end = int(time.time()) query_time_window_start = query_time_window_end - query_time_window_s results = {} session = requests.Session() session.auth = (iota_username, iota_password) # Suppress warnings from urllib3 requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) # Open blacklist files, iterate over them for blacklist_file in blacklist_files: count_fail = 0 print(f' - {blacklist_file}') with open(blacklist_file) as f: for line in f: # Skip comments or empty lines if re.match(r"^[;#\s]", line): continue # Current IP address or subnet ip = line.split()[0] print(f' - {ip:45}', end='') # Handle CIDR subnet query if re.match(r"^(?:(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3}/[0-9]{1,2})|(?:[0-9a-fA-F\:]{2,39}/[0-9]{1,2}))$", ip): query = urllib.parse.quote(f"SELECT COUNT(DISTINCT FLOW_ID) AS matching_flows FROM flows.base WHERE (isIPAddressInRange(IP_SRC, '{ip}') OR isIPAddressInRange(IP_DST, '{ip}')) AND DATE_PKT_MS_FIRST >= toDateTime64({query_time_window_start}, 3) AND DATE_PKT_MS_FIRST <= toDateTime64({query_time_window_end}, 3) FORMAT JSON", safe='()*') response = session.get('https://'+iota_ip_address+'/api/datasources/proxy/3/?query='+query, verify=False) matching_flows = json.loads(response.content)['data'][0]['matching_flows'] if matching_flows == "0": print('Ok') else: count_fail += 1 print(f'Match (flows: {matching_flows})') # Handle single IP address query elif re.match(r"^(?:(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F\:]{2,39}))$", ip): query = urllib.parse.quote(f"SELECT COUNT(DISTINCT FLOW_ID) AS matching_flows FROM flows.base WHERE (IP_SRC = '{ip}' OR IP_DST = '{ip}') AND DATE_PKT_MS_FIRST >= toDateTime64({query_time_window_start}, 3) AND DATE_PKT_MS_FIRST <= toDateTime64({query_time_window_end}, 3) FORMAT JSON", safe='()*') response = session.get('https://'+iota_ip_address+'/api/datasources/proxy/3/?query='+query, verify=False) matching_flows = json.loads(response.content)['data'][0]['matching_flows'] if matching_flows == "0": print('Ok') else: count_fail += 1 print(f'Match (flows: {matching_flows})') # Some unknown case else: print('Unhandled case') # Record results after completing file results[blacklist_file] = count_fail # Print results print('\nResults\n===') for file in results: print(f' - {file:47}{str(results[file])} matches') if __name__ == '__main__': main()
六、脚本分步说明
1.导入必要的库
import requests import re import time import urllib.parse import json import argparse from urllib3.exceptions import InsecureRequestWarning
2.设置配置
# CLI options parser = argparse.ArgumentParser(description='Compare IP blacklist files against IOTA metadata. Blacklist files must contain a list of single IPv4/IPv6 addresses or IP subnet in CIDR notation.') required_args = parser.add_argument_group('required') required_args.add_argument('-d', '--device_ip', help='Device IP address') required_args.add_argument('-i', '--infile', nargs='+', help = 'Blacklist file(s)') parser.add_argument('-u', '--device_username', default='apitest', help='Device IP username (default: apitest)') parser.add_argument('-p', '--device_password', default='apitest', help='Device IP password (default: apitest)') parser.add_argument('-l', '--query_time_window_s', type=int, default=600, help='Number of seconds from the current time to query IOTA metadata (default: 600)')
- -d / -device_ip
- IOTA 设备的 IP。
- -i / -infile
- 包含要查找的 IP 地址的文本文件。也可以提供地址范围的 CIDR 子网查询。文件可能包含以 ; 或 # 为前缀的注释。任何空行或以空格开头的行都将被省略。通过多次提供此参数,可以向脚本提交多个输入文件。
- -U / -device_username
- 上一步中定义的用户名。
- -p / -device_password
- 上述步骤中定义的密码。
- -l / -query_time_window_s
- 查询应在过去多少时间内进行。默认值为 600 秒,相当于 10 分钟。
输入文件:
以输入文件为例,它将搜索特定子网以及内部 DNS 服务器的所有 IP 地址。
# IP Address of DNS server 192.168.1.250 # Host range for internal testing 10.40.0.0/24
3.检测黑名单 IP 的主逻辑
主逻辑打开黑名单文件,逐步进行解析,并查询内部数据库,进行相应的子网查询或直接 IP 地址查询。
query = urllib.parse.quote(f"SELECT COUNT(DISTINCT FLOW_ID) AS matching_flows FROM flows.base WHERE (isIPAddressInRange(IP_SRC, '{ip}') OR isIPAddressInRange(IP_DST, '{ip}')) AND DATE_PKT_MS_FIRST >= toDateTime64({query_time_window_start}, 3) AND DATE_PKT_MS_FIRST <= toDateTime64({query_time_window_end}, 3) FORMAT JSON", safe='()*') response = session.get('https://'+iota_ip_address+'/api/datasources/proxy/3/?query='+query, verify=False) matching_flows = json.loads(response.content)['data'][0]['matching_flows']
首先,设置查询。在本例中,我们只需要查询找到的条目的数量,这就是我们执行 COUNT 操作的原因。数据库以 SQL 格式进行查询。
创建查询后,将建立一个会话,该会话将使用 HTTP GET 操作来查询信息。查询结果以 JSON 数据形式显示,目前正在读取第一个条目。
对于找到的每个 IP 地址,将打印匹配条目的数量,对于每个文件,将打印匹配条目的总数。
七、运行脚本
建议在 Python 环境中运行脚本。可以使用以下命令来实现:
python3 -m venv ./Development/iota source ./Development/iota/bin/activate python3 -m pip install argparse requests
这将创建一个 Python3 venv 环境(更多信息请参见此处),并安装默认环境中不存在的所需库。
图3创建python3 venv环境
然后,假设 Python 脚本和 infile 文件都在当前目录下,下面的命令将运行该脚本:
图4运行脚本
在这种情况下,IOTA 的定位是在出站广域网连接上进行捕获。在最后 10 分钟内,内部 DNS 服务器没有进行任何出站 DNS 查询,但内部测试网络通过广域网连接发送了 4 次数据。
在本例中,脚本被用作白名单测试,但也可轻松配置为黑名单测试。
结论
通过本指南,您可以有效地利用 IOTA 设备及其 REST API 来监控网络流量,并使用 Python 检测黑名单 IP 地址。这种方法为在自动化监控环境中维护网络安全提供了强大的工具。