1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
|
import os from phply import phplex from phply.phpparse import make_parser from phply import phpast as php
class PHPASTAuditor:
    """Taint-tracking auditor for PHP source files, built on the phply AST.

    The auditor parses a PHP file, follows its literal ``include``/``require``
    targets, marks variables assigned from user input (or from other tainted
    variables) and reports calls to dangerous functions whose arguments carry
    such taint.

    Attributes:
        danger_functions: Names of calls that are reported when an argument
            is tainted.
        safe_functions: Names of functions whose result is treated as clean.
        taint_map: Maps tainted variable names to ``True``.
        visited_files: Absolute paths already parsed during the current audit;
            prevents include cycles.
    """

    # Superglobals treated as attacker-controlled input.
    _USER_INPUT_SOURCES = ('$_GET', '$_POST', '$_REQUEST', '$_COOKIE')

    def __init__(self):
        self._init_parser()
        self.danger_functions = [
            'mysql_query', 'mysqli_query', 'pg_query', 'execute', 'eval',
            'exec', 'shell_exec', 'system', 'passthru', 'query',
        ]
        self.safe_functions = [
            'mysql_real_escape_string', 'addslashes', 'htmlspecialchars',
            'intval', 'mysqli_real_escape_string',
        ]
        self.taint_map = {}
        self.visited_files = set()

    def _init_parser(self):
        """Re-create the lexer and parser so every parse starts at line 1."""
        self.lexer = phplex.lexer.clone()
        self.parser = make_parser()

    def parse_php_code(self, code):
        """Parse PHP source text with the current parser and lexer.

        Returns the parser's result (callers iterate it as a sequence of
        top-level nodes); a falsy result is normalized to an empty list so
        callers can always iterate it.
        """
        return self.parser.parse(code, lexer=self.lexer) or []

    def tag_ast_file(self, node, filename):
        """Recursively store ``filename`` on ``node`` and all its descendants.

        The name is kept in the ``__file__`` attribute, which
        ``check_danger_functions`` reads when formatting a finding.
        """
        if not isinstance(node, php.Node):
            return
        setattr(node, '__file__', filename)
        for child in self.safe_iter_node(node):
            self.tag_ast_file(child, filename)

    def safe_iter_node(self, node):
        """Yield the direct child nodes of ``node``.

        Children are the ``php.Node`` instances found either directly in the
        node's attributes or inside list-valued attributes. Non-node inputs
        yield nothing.
        """
        if not isinstance(node, php.Node):
            return
        for value in getattr(node, '__dict__', {}).values():
            if isinstance(value, php.Node):
                yield value
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, php.Node):
                        yield item

    def get_func_name(self, node):
        """Return the called name for a call node, or ``None``.

        Plain function calls, method calls and static method calls are all
        recognized, so entries such as ``query`` and ``execute`` in
        ``danger_functions`` can match ``$db->query(...)``.
        """
        call_types = (php.FunctionCall, php.MethodCall, php.StaticMethodCall)
        if isinstance(node, call_types):
            name = node.name
            if hasattr(name, 'name'):
                # Dynamic call such as ``$f()``: report the variable's name.
                return name.name
            if isinstance(name, str):
                return name
        return None

    @staticmethod
    def _name_in(func_name, names):
        """Return True if ``func_name`` matches an entry of ``names``.

        The comparison ignores case because PHP function names are
        case-insensitive. Non-string names never match.
        """
        if not isinstance(func_name, str):
            return False
        lowered = func_name.lower()
        return any(lowered == str(candidate).lower() for candidate in names)

    def is_user_input(self, node):
        """Return True if ``node`` is a user-input superglobal or an offset into one."""
        if isinstance(node, php.ArrayOffset) and isinstance(node.node, php.Variable):
            return node.node.name in self._USER_INPUT_SOURCES
        if isinstance(node, php.Variable):
            return node.name in self._USER_INPUT_SOURCES
        return False

    def mark_taint(self, var_name):
        """Record ``var_name`` as tainted.

        Only string names are tracked. Anything else is ignored instead of
        being used as a dictionary key, which would raise for unhashable
        values (presumably the case for a node used as the name of a
        variable-variable -- TODO confirm against phply).
        """
        if isinstance(var_name, str):
            self.taint_map[var_name] = True

    def is_tainted(self, var_name):
        """Return whether ``var_name`` has been marked tainted.

        Non-string names are never tainted (see ``mark_taint``).
        """
        if not isinstance(var_name, str):
            return False
        return self.taint_map.get(var_name, False)

    def contains_user_input_or_taint(self, node):
        """Return True if the expression ``node`` carries user input or taint.

        Calls to a function listed in ``safe_functions`` are treated as
        clean regardless of their arguments; other function calls are
        tainted when any argument is. Any other node is tainted when one of
        its children is.
        """
        if isinstance(node, php.Parameter):
            return self.contains_user_input_or_taint(node.node)
        if isinstance(node, php.Variable):
            return self.is_user_input(node) or self.is_tainted(node.name)
        if isinstance(node, php.ArrayOffset):
            return (self.is_user_input(node)
                    or self.contains_user_input_or_taint(node.node))
        if isinstance(node, php.BinaryOp):
            return (self.contains_user_input_or_taint(node.left)
                    or self.contains_user_input_or_taint(node.right))
        if isinstance(node, php.FunctionCall):
            if self._name_in(self.get_func_name(node), self.safe_functions):
                return False
            params = getattr(node, 'params', None) or []
            return any(self.contains_user_input_or_taint(p) for p in params)
        if isinstance(node, php.Node):
            return any(self.contains_user_input_or_taint(child)
                       for child in self.safe_iter_node(node))
        return False

    def collect_taints_iterative(self, ast_nodes):
        """Propagate taint over several passes until no new variable is added.

        Each pass walks every node reachable from ``ast_nodes`` and marks the
        target of any ``$var = <tainted expression>`` assignment. Repeating
        the pass lets taint flow through chains of assignments regardless of
        the order in which the nodes are visited.
        """
        changed = True
        while changed:
            changed = False
            stack = list(ast_nodes)
            while stack:
                node = stack.pop()
                if (isinstance(node, php.Assignment)
                        and isinstance(node.node, php.Variable)):
                    var_name = node.node.name
                    # Only string names can be recorded; skipping others
                    # also guarantees the outer loop terminates.
                    if (isinstance(var_name, str)
                            and not self.is_tainted(var_name)
                            and self.contains_user_input_or_taint(node.expr)):
                        self.mark_taint(var_name)
                        changed = True
                stack.extend(self.safe_iter_node(node))

    def check_danger_functions(self, node):
        """Return findings for dangerous calls with tainted arguments.

        Walks ``node`` and its descendants. Each offending call produces
        exactly one message, even when several of its arguments are tainted.
        """
        findings = []
        if not isinstance(node, php.Node):
            return findings
        func_name = self.get_func_name(node)
        if self._name_in(func_name, self.danger_functions):
            params = getattr(node, 'params', None) or []
            if any(self.contains_user_input_or_taint(arg) for arg in params):
                source_file = getattr(node, '__file__', 'unknown')
                line = getattr(node, 'lineno', 'unknown')
                findings.append(
                    f"[TAINT] Dangerous function '{func_name}' in {source_file} at line {line}"
                )
        for child in self.safe_iter_node(node):
            findings.extend(self.check_danger_functions(child))
        return findings

    def process_include(self, node, current_dir):
        """Parse the file named by an include/require node and collect its taints.

        Only literal string targets are followed. The path is resolved
        relative to ``current_dir``; files already in ``visited_files`` are
        skipped. Nested includes are processed before this file's own
        assignments so that taint defined deeper in the include chain is
        visible here, matching the order used by ``audit_php_file``.
        Failures are reported as a warning and do not abort the audit.
        """
        expr = node.expr
        included_file = None
        # NOTE(review): phply appears to deliver string literals as plain
        # ``str`` values; a ``Scalar`` wrapper is still honoured when the
        # installed phply provides one -- confirm against the phply version.
        scalar_type = getattr(php, 'Scalar', None)
        if isinstance(expr, str):
            included_file = expr
        elif (scalar_type is not None and isinstance(expr, scalar_type)
                and isinstance(expr.value, str)):
            included_file = expr.value
        if not included_file:
            return

        included_path = os.path.abspath(os.path.join(current_dir, included_file))
        if included_path in self.visited_files or not os.path.isfile(included_path):
            return
        self.visited_files.add(included_path)
        try:
            self._init_parser()
            with open(included_path, "r", encoding="utf-8", errors="ignore") as f:
                code = f.read()
            ast_nodes = self.parse_php_code(code)
            for n in ast_nodes:
                self.tag_ast_file(n, included_path)
            include_dir = os.path.dirname(included_path)
            for n in ast_nodes:
                for child in self.find_include_nodes(n):
                    self.process_include(child, include_dir)
            self.collect_taints_iterative(ast_nodes)
        except Exception as e:
            # Best effort: a broken include must not stop the main audit.
            print(f"[WARN] Failed to parse include file {included_path}: {e}")

    def find_include_nodes(self, node):
        """Return every ``Include``/``Require`` node at or below ``node``."""
        nodes = []
        if isinstance(node, (php.Include, php.Require)):
            nodes.append(node)
        for child in self.safe_iter_node(node):
            nodes.extend(self.find_include_nodes(child))
        return nodes

    def audit_php_file(self, filepath):
        """Audit one PHP file.

        Resets the parser, the taint map and the visited-file set, parses
        ``filepath``, follows its includes, propagates taint and checks for
        dangerous calls.

        Returns:
            A ``(findings, tainted_variable_names)`` tuple. If anything
            fails, ``findings`` holds a single error message and the second
            element is an empty list.
        """
        self._init_parser()
        self.taint_map.clear()
        self.visited_files.clear()

        abs_path = os.path.abspath(filepath)
        self.visited_files.add(abs_path)
        try:
            with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
                code = f.read()
            ast_nodes = self.parse_php_code(code)
            for n in ast_nodes:
                self.tag_ast_file(n, abs_path)

            # Includes first, so their taints are known when this file's
            # assignments are evaluated.
            base_dir = os.path.dirname(filepath)
            for n in ast_nodes:
                for inc in self.find_include_nodes(n):
                    self.process_include(inc, base_dir)

            self.collect_taints_iterative(ast_nodes)

            findings = []
            for n in ast_nodes:
                findings.extend(self.check_danger_functions(n))

            return findings, list(self.taint_map)
        except Exception as e:
            return [f"Error parsing file: {e}"], []
# main.py
"""Command-line driver: audit every ``.php`` file below a folder."""
import os
import sys

from php_auditor import PHPASTAuditor


def scan_folder(folder):
    """Audit each ``.php`` file found under ``folder``.

    Walks ``folder`` recursively with ``os.walk`` and runs one shared
    ``PHPASTAuditor`` over every file whose name ends in ``.php``.

    Returns:
        A dict mapping each file path to
        ``{"taints": [...], "findings": [...]}`` as returned by
        ``audit_php_file``.
    """
    auditor = PHPASTAuditor()
    all_results = {}
    for root, _, files in os.walk(folder):
        for file in files:
            if file.endswith(".php"):
                path = os.path.join(root, file)
                findings, taints = auditor.audit_php_file(path)
                all_results[path] = {"taints": taints, "findings": findings}
    return all_results


def main(folder="./"):
    """Scan ``folder`` and print the taints and findings for each file."""
    results = scan_folder(folder)
    for file, info in results.items():
        print(f"\n[+] File: {file}")
        print("Collected taint variables:", info["taints"])
        for finding in info["findings"]:
            print(finding)


# Run the scan only when executed as a script, so importing this module
# (e.g. to reuse scan_folder) has no side effects. The folder may be given
# as the first command-line argument; it defaults to the current directory.
if __name__ == "__main__":
    main(sys.argv[1] if len(sys.argv) > 1 else "./")
|