from __future__ import annotations import json from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, Iterable, List, Optional UTC = timezone.utc DEFAULT_AUTH_SCOPE = "lab-local, lab-public, authorized-third-party" DEFAULT_MINIMAL_VALIDATION = "只读探测、最小化注入、可审计回显、可回滚验证。" def now_iso() -> str: return datetime.now(tz=UTC).replace(microsecond=0).isoformat() def add_common_args(parser, include_input: bool = False, include_network: bool = True) -> None: parser.add_argument( "--ack-authorized", action="store_true", help="确认目标属于自有资产、测试环境或已明确授权", ) parser.add_argument( "--format", choices=["text", "json", "ndjson"], default="text", help="输出格式,默认 text", ) parser.add_argument("--output", help="将结果写入文件") parser.add_argument("--evidence-dir", help="证据目录,工具会把结构化结果写入其中") parser.add_argument("--run-id", help="关联的 run bundle ID") parser.add_argument("--case-id", help="关联的 advisory/case ID") if include_network: parser.add_argument( "--header", action="append", default=[], help="附加请求头,可重复,格式 Name: Value", ) parser.add_argument( "--proxy", help="调试或实验代理地址,例如 http://127.0.0.1:8080", ) parser.add_argument( "--rate", type=float, default=0.0, help="每秒请求数上限,0 表示不额外限速", ) if include_input: parser.add_argument("--target-file", help="批量目标文件,每行一个目标") parser.add_argument( "--stdin", action="store_true", help="从标准输入读取目标,每行一个", ) def ensure_authorized(args, parser) -> None: if not getattr(args, "ack_authorized", False): parser.error("必须显式提供 --ack-authorized 以确认目标范围合法") def parse_headers(values: Iterable[str]) -> Dict[str, str]: headers: Dict[str, str] = {} for value in values or []: if ":" not in value: continue name, raw = value.split(":", 1) name = name.strip() raw = raw.strip() if not name: continue headers[name] = raw return headers def parse_cookie_string(raw: Optional[str]) -> Dict[str, str]: cookies: Dict[str, str] = {} if not raw: return cookies for part in raw.split(";"): if "=" not in part: continue name, value = part.split("=", 1) name = name.strip() value = value.strip() if name: cookies[name] = value return cookies def read_targets(args, fallback: Optional[str] = None) -> List[str]: values: List[str] = [] if fallback: values.append(fallback) target_file = getattr(args, "target_file", None) if target_file: for line in Path(target_file).read_text(encoding="utf-8").splitlines(): line = line.strip() if line and line not in values: values.append(line) if getattr(args, "stdin", False): import sys for line in sys.stdin.read().splitlines(): line = line.strip() if line and line not in values: values.append(line) return values def make_report( *, tool: str, mode: str, target: str, status: str, severity: str, payload_or_probe: Any, request_summary: Dict[str, Any], evidence_refs: List[str], destructive_risk: str = "low", minimal_validation: str = DEFAULT_MINIMAL_VALIDATION, authorization_scope: str = DEFAULT_AUTH_SCOPE, extra: Optional[Dict[str, Any]] = None, args: Any = None, ) -> Dict[str, Any]: payload = { "tool": tool, "mode": mode, "target": target, "status": status, "severity": severity, "timestamp": now_iso(), "request_summary": request_summary, "payload_or_probe": payload_or_probe, "evidence_refs": evidence_refs, "minimal_validation": minimal_validation, "authorization_scope": authorization_scope, "destructive_risk": destructive_risk, "run_id": getattr(args, "run_id", None) if args else None, "case_id": getattr(args, "case_id", None) if args else None, } if extra: payload.update(extra) return payload def write_evidence(args, name: str, data: Any) -> Optional[str]: evidence_dir = getattr(args, "evidence_dir", None) if not evidence_dir: return None path = Path(evidence_dir) / name path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as handle: if isinstance(data, str): handle.write(data.rstrip() + "\n") else: json.dump(data, handle, indent=2, ensure_ascii=True, sort_keys=False) handle.write("\n") return str(path) def emit_report(args, report: Dict[str, Any], text_lines: Optional[List[str]] = None) -> int: if args.format == "json": content = json.dumps(report, indent=2, ensure_ascii=True) elif args.format == "ndjson": records = report if isinstance(report, list) else [report] content = "\n".join(json.dumps(item, ensure_ascii=True) for item in records) else: content = "\n".join(text_lines or [json.dumps(report, ensure_ascii=True)]) if getattr(args, "output", None): output = Path(args.output) output.parent.mkdir(parents=True, exist_ok=True) output.write_text(content.rstrip() + "\n", encoding="utf-8") print(content) return 0