181 行
5.8 KiB
Python
181 行
5.8 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Iterable, List, Optional
|
|
|
|
|
|
# Timezone object handed to datetime.now() by now_iso().
UTC = timezone.utc

# Default text for the "authorization_scope" field of a report.
DEFAULT_AUTH_SCOPE = "lab-local, lab-public, authorized-third-party"

# Default text for the "minimal_validation" field of a report.
DEFAULT_MINIMAL_VALIDATION = "只读探测、最小化注入、可审计回显、可回滚验证。"
|
|
|
|
|
|
def now_iso() -> str:
    """Return the current UTC time as an ISO 8601 string, truncated to whole seconds."""
    current = datetime.now(tz=UTC)
    # Zero the microseconds so isoformat() emits no fractional part.
    whole_second = current.replace(microsecond=0)
    return whole_second.isoformat()
|
|
|
|
|
|
def add_common_args(parser, include_input: bool = False, include_network: bool = True) -> None:
    """Register the shared command-line options on ``parser``.

    Args:
        parser: Object exposing an argparse-style ``add_argument`` method.
        include_input: When true, also register ``--target-file`` and ``--stdin``.
        include_network: When true, also register ``--header``, ``--proxy``
            and ``--rate``.
    """
    add = parser.add_argument

    # Options that are always registered, in the order they show up in --help.
    add(
        "--ack-authorized",
        action="store_true",
        help="确认目标属于自有资产、测试环境或已明确授权",
    )
    add(
        "--format",
        choices=["text", "json", "ndjson"],
        default="text",
        help="输出格式,默认 text",
    )
    # Plain string options that only differ by flag name and help text.
    for flag, description in (
        ("--output", "将结果写入文件"),
        ("--evidence-dir", "证据目录,工具会把结构化结果写入其中"),
        ("--run-id", "关联的 run bundle ID"),
        ("--case-id", "关联的 advisory/case ID"),
    ):
        add(flag, help=description)

    if include_network:
        add(
            "--header",
            action="append",
            default=[],
            help="附加请求头,可重复,格式 Name: Value",
        )
        add("--proxy", help="调试或实验代理地址,例如 http://127.0.0.1:8080")
        add(
            "--rate",
            type=float,
            default=0.0,
            help="每秒请求数上限,0 表示不额外限速",
        )

    if include_input:
        add("--target-file", help="批量目标文件,每行一个目标")
        add("--stdin", action="store_true", help="从标准输入读取目标,每行一个")
|
|
|
|
|
|
def ensure_authorized(args, parser) -> None:
    """Call ``parser.error`` unless ``args.ack_authorized`` is truthy.

    Args:
        args: Parsed arguments; a missing ``ack_authorized`` attribute is
            treated as not acknowledged.
        parser: Object whose ``error`` method is invoked with the refusal
            message when the acknowledgement is absent.
    """
    acknowledged = getattr(args, "ack_authorized", False)
    if acknowledged:
        return
    parser.error("必须显式提供 --ack-authorized 以确认目标范围合法")
|
|
|
|
|
|
def parse_headers(values: Iterable[str]) -> Dict[str, str]:
    """Turn ``Name: Value`` strings into a header dictionary.

    Each entry is split on its first colon and both sides are stripped.
    Entries without a colon or with an empty name are skipped; when a name
    repeats, the last value wins.

    Args:
        values: Header strings; a falsy value (e.g. ``None``) yields ``{}``.

    Returns:
        Mapping of header name to header value.
    """
    parsed: Dict[str, str] = {}
    for entry in values or []:
        # partition() leaves the separator empty when no colon is present.
        key, colon, content = entry.partition(":")
        key = key.strip()
        if colon and key:
            parsed[key] = content.strip()
    return parsed
|
|
|
|
|
|
def parse_cookie_string(raw: Optional[str]) -> Dict[str, str]:
    """Parse a ``name=value; name2=value2`` cookie string into a dictionary.

    The string is split on ``;`` and each piece on its first ``=``; names and
    values are stripped. Pieces without ``=`` or with an empty name are
    skipped, and a repeated name keeps its last value.

    Args:
        raw: Cookie string; ``None`` or an empty string yields ``{}``.

    Returns:
        Mapping of cookie name to cookie value.
    """
    if not raw:
        return {}
    # partition() yields an empty separator for pieces that lack "=".
    pieces = (fragment.partition("=") for fragment in raw.split(";"))
    return {
        key.strip(): content.strip()
        for key, equals, content in pieces
        if equals and key.strip()
    }
|
|
|
|
|
|
def read_targets(args, fallback: Optional[str] = None) -> List[str]:
    """Collect the ordered, de-duplicated list of targets for a run.

    Sources are consulted in this order: ``fallback``, the file named by
    ``args.target_file``, then standard input when ``args.stdin`` is truthy.
    Lines from the file and stdin are stripped; blank lines and values that
    were already collected are dropped. ``fallback`` is kept verbatim.

    Args:
        args: Parsed arguments; ``target_file`` and ``stdin`` are optional
            attributes and are treated as unset when missing.
        fallback: Single target placed first in the result when truthy.

    Returns:
        Targets in first-seen order without duplicates.

    Raises:
        OSError: If the target file cannot be read.
        UnicodeDecodeError: If the target file is not valid UTF-8.
    """
    targets: List[str] = []
    # Set mirror of ``targets`` so the duplicate check is O(1) per line
    # instead of a linear scan of the list.
    seen = set()

    def _collect(lines: Iterable[str]) -> None:
        # Shared strip/skip-blank/dedupe step for every line-based source.
        for line in lines:
            candidate = line.strip()
            if candidate and candidate not in seen:
                seen.add(candidate)
                targets.append(candidate)

    if fallback:
        seen.add(fallback)
        targets.append(fallback)

    target_file = getattr(args, "target_file", None)
    if target_file:
        # "utf-8-sig" decodes plain UTF-8 too, but also discards a leading
        # BOM, which would otherwise stay glued to the first target.
        _collect(Path(target_file).read_text(encoding="utf-8-sig").splitlines())

    if getattr(args, "stdin", False):
        import sys

        _collect(sys.stdin.read().splitlines())

    return targets
|
|
|
|
|
|
def make_report(
    *,
    tool: str,
    mode: str,
    target: str,
    status: str,
    severity: str,
    payload_or_probe: Any,
    request_summary: Dict[str, Any],
    evidence_refs: List[str],
    destructive_risk: str = "low",
    minimal_validation: str = DEFAULT_MINIMAL_VALIDATION,
    authorization_scope: str = DEFAULT_AUTH_SCOPE,
    extra: Optional[Dict[str, Any]] = None,
    args: Any = None,
) -> Dict[str, Any]:
    """Assemble a report dictionary from the given keyword arguments.

    The result carries each argument under a key of the same name, a
    ``timestamp`` obtained from ``now_iso()``, and ``run_id`` / ``case_id``
    read from ``args``.

    Args:
        tool, mode, target, status, severity: Stored as given.
        payload_or_probe: Stored as given.
        request_summary: Stored as given (not copied).
        evidence_refs: Stored as given (not copied).
        destructive_risk: Stored as given; defaults to ``"low"``.
        minimal_validation: Stored as given.
        authorization_scope: Stored as given.
        extra: Optional mapping merged in last, so its keys override the
            standard ones on collision.
        args: Optional object supplying ``run_id`` and ``case_id``
            attributes; both become ``None`` when ``args`` is falsy or the
            attribute is missing.

    Returns:
        The assembled report dictionary.
    """
    # The correlation IDs are only looked up on a truthy ``args`` object.
    if args:
        run_id = getattr(args, "run_id", None)
        case_id = getattr(args, "case_id", None)
    else:
        run_id = case_id = None

    # Keyword order fixes the key order of the resulting dictionary.
    report: Dict[str, Any] = dict(
        tool=tool,
        mode=mode,
        target=target,
        status=status,
        severity=severity,
        timestamp=now_iso(),
        request_summary=request_summary,
        payload_or_probe=payload_or_probe,
        evidence_refs=evidence_refs,
        minimal_validation=minimal_validation,
        authorization_scope=authorization_scope,
        destructive_risk=destructive_risk,
        run_id=run_id,
        case_id=case_id,
    )
    if extra:
        report.update(extra)
    return report
|
|
|
|
|
|
def write_evidence(args, name: str, data: Any) -> Optional[str]:
    """Write ``data`` to ``<args.evidence_dir>/<name>`` and return the path.

    Strings are written with trailing whitespace removed and a single
    newline appended. Any other value is serialized as indented,
    ASCII-escaped JSON followed by a newline. Missing parent directories
    are created.

    Args:
        args: Parsed arguments; nothing is written when ``evidence_dir`` is
            missing or falsy.
        name: File name (may include sub-directories) relative to the
            evidence directory.
        data: A string, or a JSON-serializable value.

    Returns:
        The written path as a string, or ``None`` when no evidence directory
        is configured.
    """
    base_dir = getattr(args, "evidence_dir", None)
    if not base_dir:
        return None

    destination = Path(base_dir).joinpath(name)
    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open("w", encoding="utf-8") as sink:
        if not isinstance(data, str):
            json.dump(data, sink, indent=2, ensure_ascii=True, sort_keys=False)
            sink.write("\n")
        else:
            sink.write(f"{data.rstrip()}\n")
    return str(destination)
|
|
|
|
|
|
def emit_report(args, report: Dict[str, Any], text_lines: Optional[List[str]] = None) -> int:
    """Render ``report`` in the requested format, print it, and optionally save it.

    Formats, selected by ``args.format``:
      * ``"json"``   - the report as indented, ASCII-escaped JSON.
      * ``"ndjson"`` - one compact JSON document per line; ``report`` may be
        a list of records here, a single report is treated as one record.
      * anything else (including a missing ``format`` attribute) - the
        ``text_lines`` joined by newlines, or the compact JSON form of the
        report when ``text_lines`` is empty or ``None``.

    When ``args.output`` is set, the rendered content is also written to
    that file (parent directories are created) with exactly one trailing
    newline.

    Args:
        args: Parsed arguments; ``format`` and ``output`` are both optional.
        report: Report dictionary (or list of them for ``ndjson``).
        text_lines: Human-readable lines used for the text format.

    Returns:
        Always ``0``.
    """
    # Read ``format`` defensively, like every other args lookup here, so an
    # args object built without the shared options falls back to text output
    # instead of raising AttributeError.
    output_format = getattr(args, "format", "text")

    if output_format == "json":
        content = json.dumps(report, indent=2, ensure_ascii=True)
    elif output_format == "ndjson":
        records = report if isinstance(report, list) else [report]
        content = "\n".join(json.dumps(item, ensure_ascii=True) for item in records)
    else:
        content = "\n".join(text_lines or [json.dumps(report, ensure_ascii=True)])

    output_path = getattr(args, "output", None)
    if output_path:
        output = Path(output_path)
        output.parent.mkdir(parents=True, exist_ok=True)
        # Normalize to a single trailing newline in the saved file.
        output.write_text(content.rstrip() + "\n", encoding="utf-8")

    print(content)
    return 0
|