freeBuf
主站

分类

漏洞 工具 极客 Web安全 系统安全 网络安全 无线安全 设备/客户端安全 数据安全 安全管理 企业安全 工控安全

特色

头条 人物志 活动 视频 观点 招聘 报告 资讯 区块链安全 标准与合规 容器安全 公开课

官方公众号企业安全新浪微博

FreeBuf.COM网络安全行业门户,每日发布专业的安全资讯、技术剖析。

FreeBuf+小程序

FreeBuf+小程序

flask安全指南
2025-01-11 21:44:16
所属地 四川省

前记

非专业web开发,以flask框架为基础记录Web开发过程的部分安全问题。

Flask

escape

When returning HTML (the default response type in Flask), any user-provided values rendered in the output must be escaped to protect from injection attacks. HTML templates rendered with Jinja, introduced later, will do this automatically.

escape(), shown here, can be used manually. It is omitted in most examples for brevity, but you should always be aware of how you’re using untrusted data.

from markupsafe import escape

@app.route("/<path:name>")
def hello(name):
    #return f"Hello, {name}!"
    return f"Hello, {escape(name)}!"
#http://127.0.0.1:5000/<script>alert("a")</script>

这里默认path是为了接受slash(/),即Converter types:

string(default) accepts any text without a slash
intaccepts positive integers
floataccepts positive floating point values
pathlikestringbut also accepts slashes
uuidaccepts UUID strings

|safe

Automatic escaping is enabled, so if person contains HTML it will be escaped automatically. If you can trust a variable and you know that it will be safe HTML (for example because it came from a module that converts wiki markup to HTML) you can mark it as safe by using the Markup class or by using the |safe filter in the template. Head over to the Jinja 2 documentation for more examples.

from flask import render_template

@app.route("/wel/<name>")
def hello(name):
        return render_template('welcome.html', person=name)       

welcome.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Hello from Flask</title>
</head>
<body>
{% if person %}
    <!--<h1>Hello {{ person}}!</h1>-->
    <h1>Hello {{ person|safe }}!</h1>
{% else %}
  <h1>Hello, World!</h1>
{% endif %}
</body>
</html>

http://127.0.0.1:5000/wel/<img src=x onerror=alert('XSS')>

secure_filename

If you want to know how the file was named on the client before it was uploaded to your application, you can access thefilenameattribute. However please keep in mind that this value can be forged so never ever trust that value. If you want to use the filename of the client to store the file on the server, pass it through thesecure_filename()function that Werkzeug provides for you:

from werkzeug.utils import secure_filename

@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
    if request.method == 'POST':
        file = request.files['the_file']
        file.save(f"/var/www/uploads/{secure_filename(file.filename)}")

errorhandler/make_response

If you want to get hold of the resulting response object inside the view you can use the make_response() function.

errorhandler:Register a function to handle errors by code or exception class.

自己处理错误返回信息可以防止报错导致信息泄露,特别是调试模式下的报错

@app.errorhandler(404)
def not_found(error):
    resp = make_response(render_template('error.html'), 404)
    resp.headers['X-Something'] = 'A value'
    return resp

session

In addition to the request object there is also a second object called session which allows you to store information specific to a user from one request to the next. This is implemented on top of cookies for you and signs the cookies cryptographically. What this means is that the user could look at the contents of your cookie but not modify it, unless they know the secret key used for signing.

app.secret_key = "dev"
counter = 0

@app.route('/')
@app.route('/<id>')
def index(id=0):
    if 'username' in session:
        return f'{id}Logged in as {session["username"]}'
    return f'{id}You are not logged in'

@app.route('/login', methods=['GET', 'POST'])
def login():
    global counter
    if request.method == 'POST':
        counter+=1
        session['username'] = request.form['username']
        return redirect(url_for('index',id=counter))
    return '''
        <form method="post">
            <p><input type=text name=username>
            <p><input type=submit value=Login>
        </form>
    '''

@app.route('/logout')
def logout():
    # remove the username from the session if it's there
    session.pop('username', None)
    return redirect(url_for('index'))

random secret_key

import secrets
print(secrets.token_hex())

伪造session

eyJ1c2VyX2lkIjoxfQ.Z1-oSg.ZKFyFEbpDBKPH16nWfEusSHUoPY
session三段式:base64 encode,时间戳,安全签名
import hashlib

from flask.json.tag import TaggedJSONSerializer

from itsdangerous import *

session = {"user_id":2}

secret = 'dev'

print(URLSafeSerializer(secret_key=secret,

                        salt='cookie-session',  # Flask固定的盐,盐和secret会先经过一轮sha1运算,其结果作为下一轮盐和cookie内容生成签名。

                        serializer=TaggedJSONSerializer(),

                        signer=TimestampSigner,

                        signer_kwargs={

                            'key_derivation': 'hmac',

                            'digest_method': hashlib.sha1

                        }

                        ).dumps(session))

伪造后修改cookie后完成越权:eyJ1c2VyX2lkIjoyfQ.Z1-rsA.CIWG3nrubHedU_5s-zo4fz_rmrU

sql-execute

db.execute takes a SQL query with ? placeholders for any user input, and a tuple of values to replace the placeholders with. The database library will take care of escaping the values so you are not vulnerable to a SQL injection attack.

For security, passwords should never be stored in the database directly. Instead, generate_password_hash() is used to securely hash the password, and that hash is stored. Since this query modifies data, db.commit() needs to be called afterwards to save the changes.

db.execute(
                    "INSERT INTO user (username, password) VALUES (?, ?)",
                    (username, generate_password_hash(password)),
                )

generate_password_hash、check_password_hash

db.execute(
                    "INSERT INTO user (username, password) VALUES (?, ?)",
                    (username, generate_password_hash(password)),
                )
                
check_password_hash(user["password"], password)

send_from_directory

  • sending out HTML from uploaded files, never do that, use theContent-Disposition: attachmentheader to prevent that problem.

return send_from_directory("../"+current_app.config['UPLOAD_FOLDER'],filename)
Content-Disposition: inline; filename=a.png
Content-Type: image/png
    
return send_from_directory("../"+current_app.config['UPLOAD_FOLDER'],filename, as_attachment=True)
Content-Disposition: attachment; filename=a.png
Content-Type: image/png

如果未指定as_attachment=True,浏览器将尝试直接在页面中显示文件内容;如果as_attachment=True,则强制下载文件

Content-Type

  • 自动 MIME 类型检测:

    • send_from_directory使用mimetypes.guess_type()根据文件扩展名推测Content-Type

    • 例如:

      • .txttext/plain

      • .jpgimage/jpeg

      • .pngimage/png

      • .pdfapplication/pdf

  • 手动覆盖:

    • 可通过mimetype参数手动指定内容类型:

      return send_from_directory(
          directory="uploads",
          filename="example.txt",
          mimetype="application/octet-stream",
      )
      

对上传文件进行检验和修改文件名

from flask import request, redirect, url_for, render_template, flash, Blueprint, send_from_directory
import uuid
from werkzeug.utils import secure_filename
import os
from PIL import Image
from flask import current_app
bp = Blueprint("file", __name__, url_prefix="/file")

# 检查文件扩展名
def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in current_app.config['ALLOWED_EXTENSIONS']

# 检查文件是否为图片
def is_image(file_path):
    try:
        Image.open(file_path).verify()
        return True
    except:
        return False

# 上传图片
@bp.route('/upload', methods=['GET', 'POST'])
def upload_file():
    if request.method == 'POST':
        if 'file' not in request.files:
            flash('No file part')
            return redirect(request.url)

        file = request.files['file']
        if file.filename == '':
            flash('No selected file')
            return redirect(request.url)

        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)
            ext = filename.rsplit('.', 1)[1].lower()  # 获取文件扩展名
            random_filename = f"{uuid.uuid4().hex}.{ext}"  # 生成随机文件名
            file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], random_filename)
            file.save(file_path)

            if not is_image(file_path):
                os.remove(file_path)
                flash('Uploaded file is not a valid image')
                return redirect(request.url)

            flash('File uploaded successfully!')
            return redirect(url_for('index'))
        else:
            flash('Allowed file types are: png, a.jpg, jpeg, gif')
            return redirect(request.url)

    return render_template('file/file.html')

# 下载图片
@bp.route('/download/<filename>', methods=['GET'])
def uploaded_file(filename):
    file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], filename)
    try:
        # return send_from_directory("../"+current_app.config['UPLOAD_FOLDER'],filename)

        return send_from_directory("../"+current_app.config['UPLOAD_FOLDER'],filename, as_attachment=True)
    except FileNotFoundError:
        flash("File not found.")
        return redirect(url_for("file.upload_file"))

    
'''app.config.from_mapping(
        # a default secret that should be overridden by instance config
        SECRET_KEY="dev",
        # store the database in the instance folder
        DATABASE=os.path.join(app.instance_path, "flaskr.sqlite"),
        UPLOAD_FOLDER = 'uploads',
        ALLOWED_EXTENSIONS= {'png', 'jpg', 'jpeg', 'gif'}
    )'''

file.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Upload Image</title>
</head>
<body>
    <h1>Upload an Image</h1>

    {% with messages = get_flashed_messages() %}
      {% if messages %}
        <ul>
          {% for message in messages %}
            <li>{{ message }}</li>
          {% endfor %}
        </ul>
      {% endif %}
    {% endwith %}

    <form method="POST" enctype="multipart/form-data">
        <label for="file">Select image:</label>
        <input type="file" name="file" accept="image/*" required>
        <input type="submit" value="Upload">
    </form>
</body>
</html>

CSRFProtect

CSRFProtect初始化后会自动保护所有带有POST,PUT,PATCH,DELETE等修改性请求。在 HTML 的form表单中添加 CSRF 令牌:

from flask_wtf import CSRFProtect
CSRFProtect(app)

<input type="hidden" name="csrf_token" value="{{ csrf_token() }}" />

AJAX Requests with CSRF-Token

base.html

<head>
  <meta name="csrf-token" content="{{ csrf_token() }}">
  <script src="{{ url_for('static', filename='js/like.js') }}"></script>
</head>

like.js

document.addEventListener('DOMContentLoaded', function () {
  // Find all like buttons by class
  const buttons = document.querySelectorAll('.like-button');
  buttons.forEach(button => {
    // Add click event listener to each button
    button.addEventListener('click', function () {
      const postId = this.dataset.postId;
      likePost(postId); // Call likePost function with postId
    });
  });
});

// Function to handle the "like" action
function likePost(postId) {
  const csrfToken = document.querySelector('meta[name="csrf-token"]').getAttribute('content');
  fetch(`${postId}/like`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'X-CSRFToken': csrfToken
    }
  })
    .then(response => response.json())
    .then(data => {
      if (data.status === 'success') {
        const likesSpan = document.getElementById(`likes-${postId}`);
        likesSpan.textContent = data.likes;
      } else if (data.status === 'error') {
        alert(data.message); // 显示错误消息
      }
    })
    .catch(error => console.error('Error:', error));
}

jsonify

@bp.route('/data', methods=['GET'])
def get_data():
    data = [1,"<img src=x onerror=alert('XSS')>",'xiao',77]
    # return data
    return json.dumps(data) #xss
    # return jsonify(data)

json.dumps: Content-Type: text/html; charset=utf-8

jsonify: application/json

safe_load、defusedxml

def safe_load(stream):
    """
    Parse the first YAML document in a stream
    and produce the corresponding Python object.

    Resolve only basic YAML tags. This is known
    to be safe for untrusted input.
    """
    return load(stream, SafeLoader)

import yamls
yaml_data = """
!!python/object/apply:os.system
  args: ['calc']
"""
try:
    # This prevents execution of unsafe tags
    # parsed_data = yaml.safe_load(yaml_data)
    parsed_data = yaml.load(yaml_data, Loader=yaml.UnsafeLoader)
    print("Parsed safely:", parsed_data)
except yaml.YAMLError as e:
    print(f"Error: {e}")
from defusedxml import ElementTree as ET

# Correct XML payload without external entities
safe_xml_data = """<?xml version="1.0"?>
<root>
  <message>Hello, safe world!</message>
</root>
"""

# Safe XML Parsing Example
try:
    print("\nUsing safe XML parsing")
    root_safe = ET.fromstring(safe_xml_data, forbid_dtd=True)
    print(f"Parsed (Safe): {ET.tostring(root_safe, encoding='unicode')}")
except ET.ParseError as e:
    print(f"Error: {e}")

flask_talisman/csp

flask --app .\flaskr\ run --cert=cert.pem --key=key.pem:通过证书默认开启https

from flask_talisman import Talisman
    csp = {
        'default-src': '\'self\'',
        'script-src': '\'self\'',
        'style-src': '\'self\''
    }
    Talisman(app, force_https=True, force_https_permanent=True,content_security_policy=csp)

并发

@bp.route('/<int:id>/like', methods=['POST'])
@login_required
def like_post(id):
    """Handle a like action for a post."""
    db = get_db()

    # 使用事务开始并立即锁定数据库
    #db.execute('BEGIN IMMEDIATE')

    try:
        # 检查用户是否已经给该文章点过赞
        like_exists = db.execute(
            "SELECT 1 FROM post_likes WHERE user_id = ? AND post_id = ?",
            (g.user['id'], id)
        ).fetchone()

        if like_exists:
            return jsonify(status='error', message='You have already liked this post.')

        # 增加点赞记录
        db.execute(
            "INSERT INTO post_likes (user_id, post_id) VALUES (?, ?)",
            (g.user['id'], id)
        )

        # 更新文章的点赞数量
        db.execute(
            "UPDATE post SET likes = likes + 1 WHERE id = ?",
            (id,)
        )
        db.commit()  # 提交事务

        # 获取更新后的点赞数量
        likes = db.execute(
            "SELECT likes FROM post WHERE id = ?",
            (id,)
        ).fetchone()['likes']

        return jsonify(status='success', likes=likes)
    except Exception as e:
        db.rollback()  # 如果发生异常,回滚事务
        return jsonify(status='error', message=str(e))

image-20241229122650178.png

解决上述并发漏洞的方法如下:

  • db.execute('BEGIN IMMEDIATE'):使用 BEGIN IMMEDIATE来显式开始事务并锁定表。在事务中完成所有操作(检查是否点赞、插入点赞、更新点赞数),确保这些操作在同一事务中执行。

  • 加一个额外的唯一约束,在post_likes表上建立(user_id, post_id)唯一约束来防止重复点赞

CREATE TABLE post_likes (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL,
    post_id INTEGER NOT NULL,
    UNIQUE(user_id, post_id),  -- 确保同一用户对同一篇文章只能点赞一次
    FOREIGN KEY (user_id) REFERENCES user (id),
    FOREIGN KEY (post_id) REFERENCES post (id)
);
  • 利用缓存层批量写入

每次点赞时,写入 Redis 哈希表;定时批量同步到数据库(例如每五秒)

越权

  • 在增删查改处加入鉴权和登录校验

def login_required(view):
    """View decorator that redirects anonymous users to the login page."""
    @functools.wraps(view)
    def wrapped_view(**kwargs):
        if g.user is None:
            return redirect(url_for("auth.login"))

        return view(**kwargs)

    return wrapped_view
    
def get_post(id, check_author=True):
    ...
    if post is None:
        abort(404, f"Post id {id} doesn't exist.")

    if check_author and post["author_id"] != g.user["id"]:
        abort(403)
    return post

@bp.route("/<int:id>/update", methods=("GET", "POST"))
@login_required
def update(id):
    """Update a post if the current user is the author."""
    post = get_post(id)
  • 使用session机制验证用户身份

@bp.route("/login", methods=("GET", "POST"))
def login():
    ...
    if error is None:
            # store the user id in a new session and return to the index
            session.clear()
            session["user_id"] = user["id"]

@bp.route("/logout")
def logout():
    """Clear the current session, including the stored user id."""
    session.clear()

@bp.before_app_request
def load_logged_in_user():
    """If a user id is stored in the session, load the user object from
    the database into ``g.user``."""
    user_id = session.get("user_id")

    if user_id is None:
        g.user = None
    else:
        g.user = (
            get_db().execute("SELECT * FROM user WHERE id = ?", (user_id,)).fetchone()
        )

oss

对象存储(Object-Based Storage),也可以叫做面向对象的存储,现在也有不少厂商直接把它叫做云存储。

说到对象存储就不得不提 Amazon,Amazon S3 (Simple Storage Service) 简单存储服务,是 Amazon 的公开云存储服务,与之对应的协议被称为 S3 协议,目前 S3 协议已经被视为公认的行业标准协议,因此目前国内主流的对象存储厂商基本上都会支持 S3 协议。

在 Amazon S3 标准下中,对象存储中可以有多个桶(Bucket),然后把对象(Object)放在桶里,对象又包含了三个部分:Key、Data 和 Metadata

Key 是指存储桶中的唯一标识符

Data 是存储的数据本体

Metadata 即元数据,可以简单的理解成数据的标签、描述之类的信息

wooflugf4zwoe_20240815_717909714f994bfa90a698443ece9d5b.png

1、Bucket权限配置

公开访问:在只配置读写权限设置为公有读或公共读写的情况下,无法列出对象(AccessDenied)。但是可以爆破key,从而访问对应的KEY路径(xxx.oss-cn-hangzhou.aliyuncs.com/img.png)

ListObject:列出Object对象,访问存储桶域名可以把存储桶的东西列出来

公开写:管理员将存储桶权限配置为可写,则攻击者可上传任意文件到存储桶中,或覆盖已经存在的文件。PUT上传文件名aa,内容为aabb数据包如下

PUT /aa HTTP/1.1
Host: coleakspictures.oss-cn-hangzhou.aliyuncs.com
Cache-Control: max-age=0
Sec-Ch-Ua: "Chromium";v="97", " Not;A Brand";v="99"
Sec-Ch-Ua-Mobile: ?0
Sec-Ch-Ua-Platform: "Windows"
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
Sec-Fetch-Site: none
Sec-Fetch-Mode: navigate
Sec-Fetch-User: ?1
Sec-Fetch-Dest: document
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9
Connection: close

aabb

2、Bucket桶爆破

Bucket 不存在:InvalidBucketName 或 NoSuchBucket

Bucket 存在:列出Object 或 AccessDenied

3、策略配置

如果管理员设置了某些IP,UA才可以请求该存储桶的话,此时如果错误的配置了GetBucketPolicy,可导致攻击者获取策略配置

aliyun.exe oss bucket-policy oss://testpictures --method get

4、AccessKeyId,SecretAccessKey泄露

  • GitHub等开源平台中的源代码泄露Key

  • 反编译APK,找敏感信息

  • 目标网站JS源代码中

5、Bucket接管

Bucket 显示 NoSuchBucket 说明是可以接管的,创建一个同名的 Bucket ,创建完 Bucket 后,再次访问发现就显示 AccessDenied 了,说明该 Bucket 已经被我们接管了, Bucket 设置为公开并上传个文件

CORS

漏洞规则

Access-Control-Allow-OriginAccess-Control-Allow-Credentials结果
*true不存在漏洞
<all-host></all-host>true存在漏洞
<safe_host>true安全-一般不存在漏洞
nulltrue存在漏洞

全局设置

from flask import Flask
from flask_cors import CORS

app = Flask(__name__)
CORS(app, origins='http://example.com')

单个接口

from flask import Flask
from flask_cors import CORS, cross_origin

@bp.route('/api/some_endpoint')
@cross_origin(origins='https://localhost:5000', methods=['GET', 'POST'],supports_credentials=True)
def some_endpoint():
    # 处理接口逻辑
    # return jsonify({"message": f"Hello, {g.user['id']}!"})
    id=session['user_id']
    return jsonify({"message": f"Hello, {id}!"})

@bp.route('/test')
def test():
    return render_template("blog/test.html")

Flask-CORS提供了许多配置选项,用于控制CORS的行为。以下是一些常用的配置选项:

  • origins:指定允许的源。您可以使用通配符来表示所有的源,或者指定具体的源。例如,origins='*'表示允许所有的源,而origins='example.com'表示只允许example.com这个源。

  • methods:指定允许的HTTP方法。默认情况下,所有的方法(GET、POST等)都是允许的。您可以通过设置methods=['GET']来只允许GET请求。

  • headers:指定允许的请求头。默认情况下,所有的请求头都是允许的。您可以通过设置headers=['Content-Type']来只允许Content-Type请求头。

  • expose_headers:指定允许浏览器访问的响应头。默认情况下,浏览器只能访问一些基本的响应头,如Content-TypeCache-Control等。您可以通过设置expose_headers=['Authorization']来允许浏览器访问Authorization响应头。

  • supports_credentials:指定是否允许使用凭据进行跨域请求。如果设置为True,浏览器会在请求中添加Cookie等凭据信息。默认情况下,该选项是禁用的。

攻击演示

test.html

<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta http-equiv="X-UA-Compatible" content="IE=edge">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <meta name="csrf-token" content="your-csrf-token-here">
  <title>CORS Test</title>
  <script>
    // 发送 GET 请求来测试 CORS
    function testCORS() {
fetch('https://127.0.0.1:5000/api/some_endpoint', {
  method: 'GET',
  headers: {
    'Content-Type': 'application/json',
    'X-CSRFToken': document.querySelector('meta[name="csrf-token"]').getAttribute('content'),
    'Origin': 'https://localhost:5000',
  },
  credentials: 'include'  // 确保携带 Cookies
})
      .then(response => {
        // 检查是否支持 CORS
        if (response.ok) {
          return response.json();
        } else {
          throw new Error('CORS test failed');
        }
      })
      .then(data => {
        console.log('Response:', data);
        alert('CORS request successful: ' + data.message);
      })
      .catch(error => {
        console.error('Error:', error);
        alert('CORS request failed');
      });
    }
  </script>
</head>
<body>
  <h1>CORS Test</h1>
  <button onclick="testCORS()">Test CORS</button>
</body>
</html>

数据包及返回包如下

OPTIONS /api/some_endpoint HTTP/1.1
Host: 127.0.0.1:5000
Accept: */*
Access-Control-Request-Method: GET
Access-Control-Request-Headers: content-type,x-csrftoken
Origin: https://localhost:5000
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36
Sec-Fetch-Mode: cors
Sec-Fetch-Site: cross-site
Sec-Fetch-Dest: empty
Referer: https://localhost:5000/
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9
Connection: close

HTTP/1.1 200 OK
Server: Werkzeug/3.1.3 Python/3.10.8
Date: Mon, 30 Dec 2024 08:17:19 GMT
Content-Type: text/html; charset=utf-8
Allow: GET, HEAD, OPTIONS
Access-Control-Allow-Origin: https://localhost:5000
Access-Control-Allow-Credentials: true
Access-Control-Allow-Headers: content-type, x-csrftoken
Access-Control-Allow-Methods: GET, POST
Vary: Cookie
Content-Length: 0
Connection: close

GET /api/some_endpoint HTTP/1.1
Host: 127.0.0.1:5000
Cookie: session=eyJjc3JmX3Rva2VuIjoiYzUxYjFjY2UwNTBlNjQzYzQxZWVjMzFlODQyMDM4Y2QzZmVhZDVlOSIsInVzZXJfaWQiOjJ9.Z3JVhg.4mgF6x-gP1KYKloTpC7X12oluKU
Sec-Ch-Ua: "Chromium";v="97", " Not;A Brand";v="99"
X-Csrftoken: your-csrf-token-here
Sec-Ch-Ua-Mobile: ?0
Content-Type: application/json
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36
Sec-Ch-Ua-Platform: "Windows"
Accept: */*
Origin: https://localhost:5000
Sec-Fetch-Site: cross-site
Sec-Fetch-Mode: cors
Sec-Fetch-Dest: empty
Referer: https://localhost:5000/
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9
Connection: close

HTTP/1.1 200 OK
Server: Werkzeug/3.1.3 Python/3.10.8
Date: Mon, 30 Dec 2024 08:17:20 GMT
Content-Type: application/json
Content-Length: 24
Access-Control-Allow-Origin: https://localhost:5000
Access-Control-Allow-Credentials: true
Vary: Cookie
Connection: close

{"message":"Hello, 2!"}

防御措施

  • 配置csp以阻止CORS

  • origins设置可信任的白名单

  • 不开启supports_credentials(默认)

  • 浏览器samesite:Strict/Lax,而不配置为有风险的none(还将cookie加上Secure

SSTI

render_template自动转义不存在SSTI,而render_template_string会出现该漏洞

@bp.route('/cc',methods=['GET', 'POST'])
def cc():
    template = '''
        <div class="center-content error">
            <h1>Oops! That page doesn't exist.</h1>
            <h3>%s</h3>
        </div> 
    ''' %(request.url)
    return render_template_string(template)
https://127.0.0.1:5000/cc?{{7+8}}
{{self.__init__.__globals__.__builtins__['__import__']('os').popen('ipconfig').read()}}
{{''.__class__.__base__.__subclasses__()[128].__init__.__globals__['__builtins__']['eval']('__import__("os").popen("dir").read()')}}
{{().__class__.__base__.__subclasses__()[155].__init__.__globals__.__builtins__['eval']('__import__("os").popen("dir").read()')}}

魔术方法

魔术方法作用
init对象的初始化方法
class返回对象所属的类
module返回类所在的模块
mro返回类的调用顺序,可以此找到其父类(用于找父类
base获取类的直接父类(用于找父类
bases获取父类的元组,按它们出现的先后排序(用于找父类
dict返回当前类的函数、属性、全局变量等
subclasses返回所有仍处于活动状态的引用的列表,列表按定义顺序排列(用于找子类
globals获取函数所属空间下可使用的模块、方法及变量(用于访问全局变量
import用于导入模块,经常用于导入os模块
builtins返回Python中的内置函数,如eval

获取子类

# 获取子类
''.__class__.__base__.__subclasses__()
''.__class__.__bases__[0].__subclasses__()
''.__class__.__mro__[-1].__subclasses__()

data = r'''
    [<class 'type'>, <class 'async_generator'>, <class 'int'>, <class 'bytearray_iterator'>...]
'''
userful_class = ['linecache', 'os._wrap_close', 'subprocess.Popen', 'warnings.catch_warnings', '_frozen_importlib._ModuleLock', '_frozen_importlib._DummyModuleLock', '_frozen_importlib._ModuleLockManager', '_frozen_importlib.ModuleSpec']
k=data.split(',')
for i in range(len(k)):
    for j in userful_class:
        if j in k[i]:
            print(i,":",k[i])
            
100 :  <class '_frozen_importlib._ModuleLock'>
101 :  <class '_frozen_importlib._DummyModuleLock'>
102 :  <class '_frozen_importlib._ModuleLockManager'>
102 :  <class '_frozen_importlib._ModuleLockManager'>
103 :  <class '_frozen_importlib.ModuleSpec'>
139 :  <class 'os._wrap_close'>
155 :  <class 'warnings.catch_warnings'>
266 :  <class 'subprocess.Popen'>
{{().__class__.__base__.__subclasses__()[155].__init__.__globals__}}

后记

flask生命周期

接收请求
创建请求对象
请求钩子(before)
路由匹配
执行视图函数
生成响应对象
响应钩子(after 和 teardown)
返回响应

Flask 提供了 4 个钩子函数,允许你在处理请求的不同阶段执行自定义逻辑:

钩子函数作用
before_first_request第一次请求前执行一次
before_request每个请求处理前执行
after_request每个请求处理后执行(无异常)
teardown_request每个请求处理结束后执行(无论是否异常)

WSGI/ASGI

特性WSGIASGI
请求处理同步 (阻塞)异步与同步支持
实时通信支持不支持 WebSocket支持 WebSocket、HTTP/2
性能与扩展性更适合简单 Web 应用更适合高并发与异步应用
典型框架FlaskFastAPI、Django (新版)
Web 服务器Gunicorn、uWSGIUvicorn、Daphne

enev

创建:python -m venv myenv
激活:myenv\Scripts\activate.bat
退出:deactivate
删除:删除整个环境的安装目录即可。

生产环境

pyproject.toml

[project]
name = "flaskr"
version = "1.0.0"
description = "The basic blog app built in the Flask tutorial."
dependencies = [
    "flask",
]

[build-system]
requires = ["flit_core<4"]
build-backend = "flit_core.buildapi"

本地打包

pip install build

python -m build --wheel

服务安装

python -m venv myenv

myenv\Scripts\activate.bat

pip install flaskr-1.0.0-py2.py3-none-any.whl

flask --app flaskr init-db

pip install waitress

waitress-serve --call flaskr:create_app

自签证书

openssl req -x509 -newkey rsa:4096 -nodes -out cert.pem -keyout key.pem -days 365

oss

# -*- coding: utf-8 -*-
import oss2
from itertools import islice
import logging
import time
import random

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


access_key_id=""
access_key_secret=""

auth = oss2.AuthV4(access_key_id, access_key_secret)

# 设置Endpoint和Region
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"
region = "cn-hangzhou"


def generate_unique_bucket_name():
    # 获取当前时间戳
    timestamp = int(time.time())
    # 生成0到9999之间的随机数
    random_number = random.randint(0, 9999)
    # 构建唯一的Bucket名称
    bucket_name = f"demo-{timestamp}-{random_number}"
    return bucket_name


# 生成唯一的Bucket名称
bucket_name = ""
bucket = oss2.Bucket(auth, endpoint, bucket_name, region=region)


def create_bucket(bucket):
    try:
        bucket.create_bucket(oss2.models.BUCKET_ACL_PRIVATE)
        logging.info("Bucket created successfully")
    except oss2.exceptions.OssError as e:
        logging.error(f"Failed to create bucket: {e}")


def upload_file(bucket, object_name, data):
    try:
        result = bucket.put_object(object_name, data)
        logging.info(f"File uploaded successfully, status code: {result.status}")
    except oss2.exceptions.OssError as e:
        logging.error(f"Failed to upload file: {e}")


def download_file(bucket, object_name):
    try:
        file_obj = bucket.get_object(object_name)
        content = file_obj.read()
        with open("test.png", 'wb') as f:  # 将数据写入本地文件
            f.write(content)
        return content
    except oss2.exceptions.OssError as e:
        logging.error(f"Failed to download file: {e}")


def list_objects(bucket):
    try:
        objects = list(islice(oss2.ObjectIterator(bucket), 10))
        for obj in objects:
            logging.info(obj.key)
    except oss2.exceptions.OssError as e:
        logging.error(f"Failed to list objects: {e}")


def delete_objects(bucket):
    try:
        objects = list(islice(oss2.ObjectIterator(bucket), 100))
        if objects:
            for obj in objects:
                bucket.delete_object(obj.key)
                logging.info(f"Deleted object: {obj.key}")
        else:
            logging.info("No objects to delete")
    except oss2.exceptions.OssError as e:
        logging.error(f"Failed to delete objects: {e}")


def delete_bucket(bucket):
    try:
        bucket.delete_bucket()
        logging.info("Bucket deleted successfully")
    except oss2.exceptions.OssError as e:
        logging.error(f"Failed to delete bucket: {e}")


# 主流程
if __name__ == '__main__':
    # 1. 创建Bucket
    create_bucket(bucket)
    with open("img.png","rb") as f:
        con=f.read()
    # # 2. 上传文件
    upload_file(bucket, 'img.png', con)
    # 3. 下载文件
    download_file(bucket, 'img.png')
    # 4. 列出Bucket中的对象
    list_objects(bucket)
    # 5. 删除Bucket中的对象
    delete_objects(bucket)
    # 6. 删除Bucket
    delete_bucket(bucket)

CORS/CSRF/CSP

特性CORSCSRFCSP
目的允许或限制跨域资源访问,控制浏览器的跨域请求防止伪造请求,避免攻击者通过已认证的用户发起请求防止 XSS 攻击,限制资源加载,防止脚本注入
工作原理通过 HTTP 头Access-Control-Allow-Origin控制跨域通过 CSRF token 和 SameSite Cookies 防止伪造请求通过设置Content-Security-Policy控制资源加载
适用场景允许跨域请求或阻止不可信的跨域请求防止恶意网站利用已登录用户身份执行未授权操作防止 XSS 和数据注入攻击,限制脚本和资源的加载来源
解决的攻击跨站资源共享漏洞(XSS 的一种变体)跨站请求伪造(伪造用户的请求)跨站脚本攻击(XSS)和恶意数据注入(注入攻击)

CORS主要处理浏览器跨域请求的问题,控制哪些外部网站可以访问你的资源。

CSRF防止跨站请求伪造,确保用户请求的合法性,避免恶意网站伪造用户请求。

CSP防止 XSS 攻击,控制页面可以加载和执行的资源,减少注入攻击。

reference

https://flask.palletsprojects.com/en/stable/
https://www.secpulse.com/archives/97707.html
https://flask.palletsprojects.com/en/stable/templating/
https://escape.tech/blog/best-practices-protect-flask-applications/#what-is-flask
https://github.com/GoogleCloudPlatform/flask-talisman
https://developer.aliyun.com/article/875653
https://segmentfault.com/a/1190000041525436
https://xz.aliyun.com/t/12001
# web安全 # python # Flask框架 # Flask
本文为 独立观点,未经允许不得转载,授权请联系FreeBuf客服小蜜蜂,微信:freebee2022
被以下专辑收录,发现更多精彩内容
+ 收入我的专辑
+ 加入我的收藏
相关推荐
  • 0 文章数
  • 0 关注者
文章目录