URL Collection: Ideas and Implementation
2024-08-12 10:54:39

Idle Chatter

Recently my company asked me to check whether hidden links ("dark links") had been planted on the homepage of the company website. A search online turned up few suitable scripts, so I wrote one to extract the links on a site. As the requirements kept growing, and with a few wild ideas thrown in, it eventually became a URL collector.

Preface

URL collection is an important job: it lets us quickly gather URLs that match a given need. Most URL collectors on the market, however, all work the same way: they call the interfaces of several search engines with a keyword (to collect recruitment sites, for example, you would search for keywords like 求职/招聘, i.e. "job hunting"/"recruitment"), pull as many URLs as possible from each interface, apply a user-defined URL blacklist, and finally deduplicate.
That means wiring up as many interfaces as possible (Google, Baidu, and so on), passing the keyword in, extracting URLs from the returned pages, filtering some of them out against the blacklist, and then iterating through the result pages.
On the surface that looks fine: type a keyword, get related URLs. But it hides several drawbacks:
1. Only URLs already indexed by the search engines can be collected, so many URLs that actually fit the need are missed.
2. Filtering is coarse. With nothing but deduplication and a blacklist, there is no guarantee the collected sites are the ones you need.
3. Anyone can run these collectors, and the keywords people use are much the same, so the results end up much the same too. That is bad news for security folks: the vulnerable site you finally dig up has probably been worked over by plenty of people already.

Features

To address the drawbacks above, I decided to write a deep URL collection script. The features planned up front:
1. Two entry points: a search-engine interface, or importing a file of URLs collected elsewhere.
2. Crawl sites matching the keyword, then automatically crawl their friend links (linked sites) as well.
3. Imported text can first be filtered down to qualifying sites, with friend-link crawling then enabled or disabled as desired.
4. User-definable black/white lists for the URL itself, for the site title, and for the page content.
A simplified flow chart accompanies the original post (image omitted).
Banner

title = '''
       __   _______   __    __  .______       __
      |  | |   ____| |  |  |  | |   _  \     |  |
      |  | |  |__    |  |  |  | |  |_)  |    |  |
.--.  |  | |   __|   |  |  |  | |      /     |  |
|  `--'  | |  |      |  `--'  | |  |\  \----.|  `----.
 \______/  |__|  _____\______/  | _| `._____||_______|
                |______|
                                        Author:JF
                                        Version:V1.0
        '''

URL collector source code
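
The code blocks that follow reference a few names that only appear in the full script: the headerss User-Agent pool, timeout, plus the lock and filename used later in the entry function. A minimal setup sketch, with assumed values wherever the original is not shown:

import random
import re
import threading
import time

import requests
import urllib3
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor

# verify=False is used throughout, so silence the certificate warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# a small User-Agent pool to rotate through (extend as needed)
headerss = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.1 Safari/605.1.15',
]

timeout = 5                  # overridden by config.ini in the real script
lock = threading.Lock()      # guards result collection in the entry function
filename = 'result_url.txt'  # output file for "import text" mode (name assumed)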

Friend-link collection

Method 1: regex filtering

def GetLink(url):
    UA = random.choice(headerss)
    headers = {'User-Agent': UA, 'Connection': 'close'}
    link_urls = []
    try:
        r = requests.get(url, headers=headers, verify=False, timeout=timeout)
        encoing = requests.utils.get_encodings_from_content(r.text)[0]
        content = r.content.decode(encoing)
        urls = [f"{urlparse(url).scheme}://{urlparse(url).netloc}"  for url in re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*,]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', content, re.I)]
        for url in list(set(urls)):
            url = url.replace("')", '')  # strip a stray ') captured from JS string literals
            link_urls.append(url)
            # optional liveness check:
            # try:
            #     r = requests.get(url, timeout=5, verify=False)
            #     if b'Service Unavailable' not in r.content and b'The requested URL was not found on' not in r.content and b'The server encountered an internal error or miscon' not in r.content:
            #         if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
            #             link_urls.append(url)
            # except Exception as error:
            #     pass
    except:
        pass
    return list(set(link_urls))

Method 2: bs4 filtering

def GetLink(url):
    UA = random.choice(headerss)
    headers = {'User-Agent': UA, 'Connection': 'close'}
    bs4_urls = set()  # defined before the try block so it exists even if the request fails
    try:
        r = requests.get(url, headers=headers, verify=False, timeout=timeout)
        encoding = requests.utils.get_encodings_from_content(r.text)[0]
        content = r.content.decode(encoding)

        # parse the HTML with BeautifulSoup
        soup = BeautifulSoup(content, 'html.parser')

        # pull the tag attributes that commonly carry URLs
        for tag in ['a', 'img', 'script', 'link']:
            for attr in ['href', 'src']:
                for element in soup.find_all(tag):
                    if attr in element.attrs:
                        href = element.get(attr)
                        if href and (href.startswith('http://') or href.startswith('https://')):
                            parsed = urlparse(href)
                            bs4_urls.add(f"{parsed.scheme}://{parsed.netloc}")
    except Exception as e:
        pass

    # keep only the sites that respond
    link_urls = []
    for bs4_url in bs4_urls:
        try:
            r = requests.head(bs4_url, timeout=5, headers=headers, verify=False)
            if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                link_urls.append(bs4_url)
        except Exception as error:
            pass
    return link_urls
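
Either variant can be used as a drop-in; a quick smoke test might look like this (the seed URL below is just a placeholder):

if __name__ == '__main__':
    links = GetLink('https://www.example.com')  # placeholder seed site
    for link in links:
        print(link)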

Keyword collection

Call the Baidu search interface with the keyword entered by the user and extract URLs from the first eight result pages (pn = 0 through 70).

def BDUrl(key):
    cookie = input('请输入cookie:')
    bd_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36",
        "Referer": "https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&rsv_idx=2&ch=&tn=baiduhome_pg&bar=&wd=123&oq=123&rsv_pq=896f886f000184f4&rsv_t=fdd2CqgBgjaepxfhicpCfrqeWVSXu9DOQY5WyyWqQYmsKOC%2Fl286S248elzxl%2BJhOKe2&rqlang=cn",
        # "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
        "Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
        "Sec-Fetch-Mode": "navigate",
        "Cookie": cookie,
        "Connection": "Keep-Alive",
    }
    bd_url = []
    for page in range(0, 8):
        url = 'http://www.baidu.com/s?wd={}&pn={}0'
        try:
            r = requests.get(url.format(key, page), headers=bd_headers, verify=False)
            encoing = requests.utils.get_encodings_from_content(r.text)[0]
            content = r.content.decode(encoing)
            result = [f"{urlparse(url).scheme}://{urlparse(url).netloc}" for url in re.findall('mu="(.*?)"', content)[1:]]
            #result = [url.split('//')[1].split('/')[0] for url in re.findall('mu="(.*?)"', content)[1:]]
            for res_url in list(set(result)):
                bd_url.append(res_url)
                # optional liveness check:
                # try:
                #     r = requests.get(res_url, timeout=5, verify=False)
                #     if b'Service Unavailable' not in r.content and b'The requested URL was not found on' not in r.content and b'The server encountered an internal error or miscon' not in r.content:
                #         if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                #             bd_url.append(res_url)
                # except Exception as error:
                #     pass
        except:
            pass
    return list(set(bd_url))
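
The mu="..." attribute that the regex pulls out is where Baidu's result markup appears to carry the real destination URL, which is why a plain regex is enough here; this depends on Baidu's current page structure (and on a valid cookie), so it may need adjusting if the markup changes. A quick call, with the keyword only as an example:

if __name__ == '__main__':
    urls = BDUrl('教育')  # example keyword; you will be prompted for a Baidu cookie
    print(f'collected {len(urls)} sites from the first eight result pages')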

ini config file

The core of the script: the user filters for the URLs they want through the contents of a custom config file.

[User]
# user name shown by the program
whoami = JF

# state: friend-link crawling, 0 = off, 1 = on
# Other notes:
# None disables the check for that keyword field
# OR logic is supported, via |
# Priority: URL blacklist > URL whitelist > title blacklist > title whitelist > content blacklist > content whitelist

[Config]
# friend-link crawling
state = 0

# URL blacklist
black_url = None

# URL whitelist
white_url = None

# title blacklist
black_title = None

# title whitelist
white_title = 安全狗

# page-content blacklist
black_content = None

# page-content whitelist
white_content = None

# connection timeout, in seconds
timeout = 5
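
The script reads these values into module-level names (whoami, state, black_url, and so on) that the filtering functions below use directly. A minimal loading sketch with configparser, assuming the packaged script does something similar:

import configparser

config = configparser.ConfigParser()
config.read('config.ini', encoding='utf-8')

whoami        = config.get('User', 'whoami')
state         = config.get('Config', 'state')        # kept as a string: '0' or '1'
black_url     = config.get('Config', 'black_url')    # the literal text 'None' disables a check
white_url     = config.get('Config', 'white_url')
black_title   = config.get('Config', 'black_title')
white_title   = config.get('Config', 'white_title')
black_content = config.get('Config', 'black_content')
white_content = config.get('Config', 'white_content')
timeout       = config.getint('Config', 'timeout')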

User rule tree

The filtering function driven by the config file.

Method 1: the function takes a list of URLs

def RuleUrl(urls):
    ruleurls = []
    UA = random.choice(headerss)
    header = {'User-Agent': UA, 'Connection': 'close'}
    # Step 1: URL blacklist - anything matching the blacklist is dropped
    black_url_or = []
    for url in urls:
        if black_url == 'None':
            black_url_or.append(url)
        elif '|' in black_url:
            black_url_key = black_url.split('|')
            if all(key not in url for key in black_url_key):
                black_url_or.append(url)
        else:
            black_url_key = [black_url]  # single keyword: wrap it in a list, otherwise the check below iterates over its characters
            if any(key not in url for key in black_url_key):
                black_url_or.append(url)

    # Step 2: URL whitelist - only URLs matching the whitelist are kept
    white_url_or = []
    for url in black_url_or:
        if white_url == 'None':
            white_url_or.append(url)
        elif '|' in white_url:
            white_url_key = white_url.split('|')
            if any(key in url for key in white_url_key):
                white_url_or.append(url)
        else:
            white_url_key = [white_url]  # single keyword: wrap it in a list, otherwise the check below iterates over its characters
            if all(key in url for key in white_url_key):
                white_url_or.append(url)

    # Step 3: title blacklist - sites whose title matches are dropped
    black_title_or = []
    for url in white_url_or:
        if black_title == 'None':
            black_title_or.append(url)
        elif '|' in black_title:
            black_title_key = black_title.split('|')
            try:
                r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
                encoing = requests.utils.get_encodings_from_content(r.text)[0]
                content = r.content.decode(encoing)
                if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                    title = re.findall('<title>(.*?)</title>', content, re.S)
                    if all(key not in title[0] for key in black_title_key):
                        black_title_or.append(url)
            except:
                pass
        else:
            black_title_key = [black_title]  # single keyword: wrap it in a list, otherwise the check below iterates over its characters
            try:
                r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
                encoing = requests.utils.get_encodings_from_content(r.text)[0]
                content = r.content.decode(encoing)
                if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                    title = re.findall('<title>(.*?)</title>', content, re.S)
                    if all(key not in title[0] for key in black_title_key):
                        black_title_or.append(url)
            except:
                pass

    # Step 4: title whitelist - only sites whose title matches are kept
    white_title_or = []
    for url in black_title_or:
        if white_title == 'None':
            white_title_or.append(url)
        elif '|' in white_title:
            white_title_key = white_title.split('|')
            try:
                r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
                encoing = requests.utils.get_encodings_from_content(r.text)[0]
                content = r.content.decode(encoing)
                if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                    title = re.findall('<title>(.*?)</title>', content, re.S)
                    if any(key in title[0] for key in white_title_key):
                        white_title_or.append(url)
            except:
                pass
        else:
            white_title_key = [white_title]  # single keyword: wrap it in a list, otherwise the check below iterates over its characters
            try:
                r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
                encoing = requests.utils.get_encodings_from_content(r.text)[0]
                content = r.content.decode(encoing)
                if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                    title = re.findall('<title>(.*?)</title>', content, re.S)
                    if all(key in title[0] for key in white_title_key):
                        white_title_or.append(url)
            except:
                pass

    # Step 5: page-content blacklist - pages whose content matches are dropped
    black_content_or = []
    for url in white_title_or:
        if black_content == 'None':
            black_content_or.append(url)
        elif '|' in black_content:
            black_content_key = black_content.split('|')
            try:
                r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
                encoing = requests.utils.get_encodings_from_content(r.text)[0]
                content = r.content.decode(encoing)
                if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                    if all(key not in content for key in black_content_key):
                        black_content_or.append(url)
            except:
                pass
        else:
            black_content_key = [black_content]  # single keyword: wrap it in a list, otherwise the check below iterates over its characters
            try:
                r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
                encoing = requests.utils.get_encodings_from_content(r.text)[0]
                content = r.content.decode(encoing)
                if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                    if any(key not in content for key in black_content_key):
                        black_content_or.append(url)
            except:
                pass
    # Step 6: page-content whitelist - only pages whose content matches are kept
    white_content_or = []
    for url in black_content_or:
        if white_content == 'None':
            white_content_or.append(url)
        elif '|' in white_content:
            white_content_key = white_content.split('|')
            try:
                r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
                encoing = requests.utils.get_encodings_from_content(r.text)[0]
                content = r.content.decode(encoing)
                if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                    if any(key in content for key in white_content_key):
                        white_content_or.append(url)
            except:
                pass
        else:
            white_content_key = [white_content]  # single keyword: wrap it in a list, otherwise the check below iterates over its characters
            try:
                r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
                encoing = requests.utils.get_encodings_from_content(r.text)[0]
                content = r.content.decode(encoing)
                if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                    if all(key in content for key in white_content_key):
                        white_content_or.append(url)
            except:
                pass

    return white_content_or

Method 2: the function takes a single URL

The first version implements the overall logic, but it is far too slow, so this version reworks it: the function now takes a single URL and returns the URL (or False) depending on whether it passes the rules, which then makes it easy to run the checks concurrently.

def rule_url(url):
    # Step 1: URL blacklist - reject if any blacklist keyword appears in the URL
    if black_url != 'None' and (any(key in url for key in black_url.split('|'))):
        return False

    # Step 2: URL whitelist - reject if none of the whitelist keywords appear in the URL
    if white_url != 'None' and(all(key not in url for key in white_url.split('|'))):
        return False

    try:
        UA = random.choice(headerss)
        header = {'User-Agent': UA, 'Connection': 'close'}
        r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
        if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
            encoing = requests.utils.get_encodings_from_content(r.text)[0]
            content = r.content.decode(encoing)
            title = re.findall('<title>(.*?)</title>', content, re.S)[0]
            # Step 3: title blacklist - reject if any blacklist keyword appears in the title
            if black_title != 'None' and (any(key in title for key in black_title.split('|'))):
                return False
            # Step 4: title whitelist - reject if none of the whitelist keywords appear in the title
            if white_title != 'None' and (all(key not in title for key in white_title.split('|'))):
                return False

            # Step 5: content blacklist - reject if any blacklist keyword appears in the page content
            if black_content != 'None' and (any(key in content for key in black_content.split('|'))):
                return False
            # Step 6: content whitelist - reject if none of the whitelist keywords appear in the page content
            if white_content != 'None' and (all(key not in content for key in white_content.split('|'))):
                return False

            return url
        else:
            return False
    except:
        return False
    return False
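
Because rule_url now takes one URL and returns either the URL or False, it slots straight into a thread pool. A minimal sketch of that pattern (filter_urls is an illustrative helper, not part of the original script):

from concurrent.futures import ThreadPoolExecutor

def filter_urls(urls, workers=10):
    # run the rule check over a batch of URLs concurrently and keep the ones that pass
    kept = set()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        for result in executor.map(rule_url, urls):
            if result:
                kept.add(result)
    return kept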

Entry function

By this point the helpers go by their final names: bd_urls and get_links correspond to the BDUrl and GetLink functions shown above, and lock / filename come from the shared setup of the full script.

# program entry point
def Result():
    print(f'当前用户:{whoami}')
    if state == '0':
        print(f'[-]友链爬行:关闭')
    elif state == '1':
        print(f'[+]友链爬行:开启')
    else:
        print(f'[x]友链爬行:输入0/1!')
    if black_url == 'None':
        print(f'[-]网址黑名单:关闭', end='')
    else:
        print(f'[+]网址黑名单:开启', end='')
    if white_url == 'None':
        print(f'  [-]网址白名单:关闭')
    else:
        print(f'  [+]网址白名单:开启')

    if black_title == 'None':
        print(f'[-]标题黑名单:关闭', end='')
    else:
        print(f'[+]标题黑名单:开启', end='')
    if white_title == 'None':
        print(f'  [-]标题白名单:关闭')
    else:
        print(f'  [+]标题白名单:开启')

    if black_content == 'None':
        print(f'[-]网页黑名单:关闭', end='')
    else:
        print(f'[+]网页黑名单:开启', end='')
    if white_content == 'None':
        print(f'  [-]网页白名单:关闭')
    else:
        print(f'  [+]网页白名单:开启')
    print('='*50)
    print('0:关键字扫描       1:导入文本扫描')
    try:
        num = int(input('请选择启动方式(0/1):'))
        if num == 0:
            rurls = set()
            keywor = input('请输入关键字:')
            t1 = time.time()

            bd = bd_urls(keywor,num)
            rule_bd_urls = set()
            with ThreadPoolExecutor(max_workers=10) as executor:
                results_bd = executor.map(rule_url, bd)
            with lock:
                for url in results_bd:
                    if url:
                        rule_bd_urls.add(url)

            # crawl friend links for each URL that passed the rules
            l_urls = set()
            rule_link_urls = set()
            for url in rule_bd_urls:
                link_urls = get_links(url)
                l_urls.update(link_urls)
            with ThreadPoolExecutor(max_workers=10) as executor:
                results_link = executor.map(rule_url, l_urls)
            with lock:
                for url in results_link:
                    if url:
                        rule_link_urls.add(url)
            # the whole set can be written out in one go to reduce IO
            with open(f'{keywor}_url.txt', 'a+', encoding='utf-8') as a:
                a.write('\n'.join(rule_link_urls))
            t2 = time.time()
            print(f'扫描结束,耗时:{t2-t1},结果已保存至【{keywor}_url.txt】中')
        elif num == 1:
            print('提示:文本中的网址需带有协议类型,如:http/https')
            urls = set([url.strip() for url in open(input('将需要扫描的url拖到此窗口:'),'r',encoding='utf-8')])
            print(f'文本内共发现{len(urls)}个站点,扫描中。。。')
            result = set()
            if state == '0':
                t1 = time.time()
                with ThreadPoolExecutor(max_workers=10) as executor:
                    results_link = executor.map(rule_url, urls)
                with lock:
                    for url in results_link:
                        if url:
                            result.add(url)
            elif state == '1':
                t1 = time.time()
                l_url = set()
                for url in urls:
                    l = get_links(url)
                    l_url.update(l)

                with ThreadPoolExecutor(max_workers=10) as executor:
                    results_link = executor.map(rule_url, l_url)
                with lock:
                    for url in results_link:
                        if url:
                            result.add(url)
            with open(filename, 'a+', encoding='utf-8') as a:
                a.write('\n'.join(result))
            t2 = time.time()
            t = str(t2-t1).split('.')[0]
            print(f'扫描结束,耗时:{t}s,结果已保存至【{filename}】中')

        else:
            print('输入有误,程序结束!')

    except Exception as e:
        print(f'输入有误,程序结束,错误类型:{e}')

if __name__ == '__main__':
    title = '''
           _ ______   _    _ _____  _
          | |  ____| | |  | |  __ \| |
          | | |__    | |  | | |__) | |
      _   | |  __|   | |  | |  _  /| |
     | |__| | |      | |__| | | \ \| |____ 
      \____/|_|       \____/|_|  \_\______|
    '''
    #print(title)
    Result()

Usage and testing

The script has been run against 10,000+ sites so far and works as expected.

Config file: config.ini

  • None means the keyword check is skipped; only OR logic is supported, via the | symbol

  • Use None to disable a check; a field must not be left empty, otherwise the script will not run properly

  • state only accepts 0/1: 0 disables friend-link crawling for imported text, 1 enables it

  • Keyword priority: URL blacklist > URL whitelist > title blacklist > title whitelist > content blacklist > content whitelist

Demo:
Note: when crawling finishes, the results are saved as a txt file in the current directory.

1. Crawling education-related sites via the search engine

(screenshot omitted)

2. Filtering education-related sites from an imported file, without friend-link crawling

(screenshot omitted)

3. Filtering education-related sites from an imported file, then crawling their friend links

(screenshot omitted)

Conclusion

With the code above you can assemble the script yourself; if you would rather not bother, you can grab my packaged version. The script has been uploaded to GitHub: https://github.com/JiangFengSec/JF_URL. Interested readers are welcome to try it out, and if it helps you, a star would be appreciated. Thanks!
