更新: 359 个文件 - 2026-03-16 23:30:01

这个提交包含在:
hao
2026-03-16 23:30:01 -07:00
父节点 527990f535
当前提交 2974cd9ad9
修改 359 个文件,包含 6332 行新增673 行删除

180
scripts/tool_contract.py 普通文件
查看文件

@@ -0,0 +1,180 @@
from __future__ import annotations
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional
UTC = timezone.utc
DEFAULT_AUTH_SCOPE = "lab-local, lab-public, authorized-third-party"
DEFAULT_MINIMAL_VALIDATION = "只读探测、最小化注入、可审计回显、可回滚验证。"
def now_iso() -> str:
return datetime.now(tz=UTC).replace(microsecond=0).isoformat()
def add_common_args(parser, include_input: bool = False, include_network: bool = True) -> None:
parser.add_argument(
"--ack-authorized",
action="store_true",
help="确认目标属于自有资产、测试环境或已明确授权",
)
parser.add_argument(
"--format",
choices=["text", "json", "ndjson"],
default="text",
help="输出格式,默认 text",
)
parser.add_argument("--output", help="将结果写入文件")
parser.add_argument("--evidence-dir", help="证据目录,工具会把结构化结果写入其中")
parser.add_argument("--run-id", help="关联的 run bundle ID")
parser.add_argument("--case-id", help="关联的 advisory/case ID")
if include_network:
parser.add_argument(
"--header",
action="append",
default=[],
help="附加请求头,可重复,格式 Name: Value",
)
parser.add_argument(
"--proxy",
help="调试或实验代理地址,例如 http://127.0.0.1:8080",
)
parser.add_argument(
"--rate",
type=float,
default=0.0,
help="每秒请求数上限,0 表示不额外限速",
)
if include_input:
parser.add_argument("--target-file", help="批量目标文件,每行一个目标")
parser.add_argument(
"--stdin",
action="store_true",
help="从标准输入读取目标,每行一个",
)
def ensure_authorized(args, parser) -> None:
if not getattr(args, "ack_authorized", False):
parser.error("必须显式提供 --ack-authorized 以确认目标范围合法")
def parse_headers(values: Iterable[str]) -> Dict[str, str]:
headers: Dict[str, str] = {}
for value in values or []:
if ":" not in value:
continue
name, raw = value.split(":", 1)
name = name.strip()
raw = raw.strip()
if not name:
continue
headers[name] = raw
return headers
def parse_cookie_string(raw: Optional[str]) -> Dict[str, str]:
cookies: Dict[str, str] = {}
if not raw:
return cookies
for part in raw.split(";"):
if "=" not in part:
continue
name, value = part.split("=", 1)
name = name.strip()
value = value.strip()
if name:
cookies[name] = value
return cookies
def read_targets(args, fallback: Optional[str] = None) -> List[str]:
values: List[str] = []
if fallback:
values.append(fallback)
target_file = getattr(args, "target_file", None)
if target_file:
for line in Path(target_file).read_text(encoding="utf-8").splitlines():
line = line.strip()
if line and line not in values:
values.append(line)
if getattr(args, "stdin", False):
import sys
for line in sys.stdin.read().splitlines():
line = line.strip()
if line and line not in values:
values.append(line)
return values
def make_report(
*,
tool: str,
mode: str,
target: str,
status: str,
severity: str,
payload_or_probe: Any,
request_summary: Dict[str, Any],
evidence_refs: List[str],
destructive_risk: str = "low",
minimal_validation: str = DEFAULT_MINIMAL_VALIDATION,
authorization_scope: str = DEFAULT_AUTH_SCOPE,
extra: Optional[Dict[str, Any]] = None,
args: Any = None,
) -> Dict[str, Any]:
payload = {
"tool": tool,
"mode": mode,
"target": target,
"status": status,
"severity": severity,
"timestamp": now_iso(),
"request_summary": request_summary,
"payload_or_probe": payload_or_probe,
"evidence_refs": evidence_refs,
"minimal_validation": minimal_validation,
"authorization_scope": authorization_scope,
"destructive_risk": destructive_risk,
"run_id": getattr(args, "run_id", None) if args else None,
"case_id": getattr(args, "case_id", None) if args else None,
}
if extra:
payload.update(extra)
return payload
def write_evidence(args, name: str, data: Any) -> Optional[str]:
evidence_dir = getattr(args, "evidence_dir", None)
if not evidence_dir:
return None
path = Path(evidence_dir) / name
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as handle:
if isinstance(data, str):
handle.write(data.rstrip() + "\n")
else:
json.dump(data, handle, indent=2, ensure_ascii=True, sort_keys=False)
handle.write("\n")
return str(path)
def emit_report(args, report: Dict[str, Any], text_lines: Optional[List[str]] = None) -> int:
if args.format == "json":
content = json.dumps(report, indent=2, ensure_ascii=True)
elif args.format == "ndjson":
records = report if isinstance(report, list) else [report]
content = "\n".join(json.dumps(item, ensure_ascii=True) for item in records)
else:
content = "\n".join(text_lines or [json.dumps(report, ensure_ascii=True)])
if getattr(args, "output", None):
output = Path(args.output)
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(content.rstrip() + "\n", encoding="utf-8")
print(content)
return 0