文件
websafe-kb/scripts/lab/runners/common.py

170 行
5.1 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List
import requests
from lab.utils import ensure_dir, write_json
@dataclass
class RunnerContext:
    """Inputs shared by the fixture runners for a single lab run.

    Bundles the target profile, the advisory being exercised and the
    directory that receives the run's artifacts.
    """

    # Target profile; the properties below read "baseline_urls",
    # "vuln_family" and "runner_id" from it.
    profile: Dict[str, Any]
    # Advisory record for the case under test.
    advisory: Dict[str, Any]
    # Root directory for this run's artifacts.
    run_dir: Path

    @property
    def base_url(self) -> str:
        """Return the first baseline URL without trailing slashes.

        Returns an empty string when the profile has no baseline URL.
        A bare string is accepted as a one-element list so that a
        mis-shaped profile does not silently yield the URL's first
        character.
        """
        urls = self.profile.get("baseline_urls") or [""]
        if isinstance(urls, str):
            urls = [urls]
        # A falsy first entry (e.g. None) must not become the text "None".
        return str(urls[0] or "").rstrip("/")

    @property
    def family(self) -> str:
        """Return the profile's ``vuln_family``, stripped ("" when unset)."""
        return str(self.profile.get("vuln_family") or "").strip()

    @property
    def runner_id(self) -> str:
        """Return the profile's ``runner_id``, stripped ("" when unset)."""
        return str(self.profile.get("runner_id") or "").strip()

    @property
    def logs_dir(self) -> Path:
        """Return ``run_dir / "logs"`` after passing it to ``ensure_dir``."""
        path = self.run_dir / "logs"
        ensure_dir(path)
        return path
def _request(
    context: RunnerContext,
    method: str,
    path: str,
    *,
    json_payload: Dict[str, Any] | None = None,
    timeout: float = 15.0,
) -> Dict[str, Any]:
    """Send one HTTP request to the fixture and summarise the response.

    Args:
        context: Run context; its ``base_url`` is prefixed to ``path``.
        method: HTTP method name passed to ``requests.request``.
        path: Path appended verbatim to the base URL.
        json_payload: Optional JSON body for the request.
        timeout: Timeout handed to ``requests.request``.

    Returns:
        A dict with ``status_code``, ``ok`` and ``body``. ``body`` is always
        a dict: the decoded JSON object, ``{"data": ...}`` for any other
        JSON value, or ``{"text": ...}`` holding the first 1000 characters
        of the response text when the body cannot be decoded as JSON.
    """
    response = requests.request(
        method=method,
        url=f"{context.base_url}{path}",
        json=json_payload,
        timeout=timeout,
    )
    body: Dict[str, Any]
    try:
        parsed = response.json()
    except Exception:
        # Deliberately broad: any decoding problem falls back to raw text so
        # callers can always use ``body`` as a dict.
        body = {"text": response.text[:1000]}
    else:
        body = parsed if isinstance(parsed, dict) else {"data": parsed}
    return {
        "status_code": response.status_code,
        "ok": response.ok,
        "body": body,
    }
def _seed_payload(context: RunnerContext) -> Dict[str, Any]:
    """Build the JSON body sent to the fixture's ``/seed`` endpoint.

    Raises:
        KeyError: If the advisory lacks ``system_id`` or ``canonical_id``.
    """
    advisory = context.advisory
    return {
        "system_id": advisory["system_id"],
        "family": context.family,
        "case_id": advisory["canonical_id"],
        # The title is optional; a missing one is sent as None.
        "title": advisory.get("title"),
    }
def _attack_payload(context: RunnerContext, family: str) -> Dict[str, Any]:
    """Build the JSON body sent to the fixture's ``/attack`` endpoint.

    Every payload starts with ``family`` and ``case_id``; the remaining keys
    depend on the vulnerability family.

    Args:
        context: Run context; the advisory's ``canonical_id`` is the case id.
        family: Vulnerability family selecting the payload shape.

    Raises:
        KeyError: If the advisory lacks ``canonical_id`` or ``family`` is not
            one of the supported families.
    """
    case_id = context.advisory["canonical_id"]
    # Family-specific fields only; the common header is merged in below.
    extras: Dict[str, Dict[str, Any]] = {
        "proxy-boundary": {
            "headers": {"x-forwarded-user": "admin", "x-forwarded-host": "internal.lab"},
        },
        "authz-bypass": {
            "actor": "guest",
            "target": "/admin",
            "override": "bypass",
        },
        "ssrf": {
            "target_url": "http://127.0.0.1/sink",
        },
        "xss": {
            "payload": f"<script>document.body.dataset.case='{case_id}'</script>",
        },
        "file-upload": {
            "filename": f"{case_id}.txt",
            "content": f"fixture upload marker for {case_id}",
        },
        "deserialization": {
            "payload": {"marker": case_id, "mode": "inert-object"},
        },
    }
    if family not in extras:
        supported = ", ".join(sorted(extras))
        raise KeyError(f"unsupported vuln family {family!r}; expected one of: {supported}")
    return {"family": family, "case_id": case_id, **extras[family]}
def run_fixture_seed(context: RunnerContext, family: str) -> Dict[str, Any]:
    """POST the seed payload to the fixture and log the outcome.

    The result is written to ``seed.json`` in the run's logs directory and
    also returned.

    Args:
        context: Run context describing the target and the advisory.
        family: Vulnerability family, used for the fallback tool name.

    Returns:
        A dict with ``steps`` (one runner step), ``seeded`` and the raw
        request ``result``.
    """
    result = _request(context, "POST", "/seed", json_payload=_seed_payload(context))
    seeded = bool(result["ok"])
    if seeded:
        fallback_detail = "seed request completed"
    else:
        # A failed step must not be described as "completed".
        fallback_detail = f"seed request failed with HTTP {result['status_code']}"
    step = {
        "kind": "runner",
        # Fall back to "<system_id>.<family>" when the profile names no runner.
        "tool": context.runner_id or f"{context.advisory['system_id']}.{family}",
        "status": "completed" if seeded else "failed",
        "status_code": result["status_code"],
        "detail": result["body"].get("detail") or fallback_detail,
    }
    payload = {
        "steps": [step],
        "seeded": seeded,
        "result": result,
    }
    write_json(context.logs_dir / "seed.json", payload)
    return payload
def run_fixture_attack(context: RunnerContext, family: str) -> Dict[str, Any]:
    """Run the attack against the fixture, fetch the proof and log both.

    For the ``proxy-boundary`` and ``authz-bypass`` families, ``/admin`` is
    also requested before and after the attack and both responses are
    included. The result is written to ``attack.json`` in the run's logs
    directory and also returned.

    Args:
        context: Run context describing the target and the advisory.
        family: Vulnerability family selecting the attack payload.

    Returns:
        A dict with ``steps``, ``success``, ``detail``, the ``before`` /
        ``attack`` / ``after`` / ``proof`` responses and one ``assertions``
        entry. ``success`` requires an OK attack response, an OK proof
        response and a truthy ``success`` field in the proof body.
    """
    probes_admin = family in {"proxy-boundary", "authz-bypass"}

    before: Dict[str, Any] = _request(context, "GET", "/admin") if probes_admin else {}
    attack = _request(context, "POST", "/attack", json_payload=_attack_payload(context, family))
    proof = _request(context, "GET", "/proof")
    after: Dict[str, Any] = _request(context, "GET", "/admin") if probes_admin else {}

    success = bool(attack["ok"] and proof["ok"] and proof["body"].get("success"))
    proof_detail = proof["body"].get("detail")
    attack_log = context.logs_dir / "attack.json"

    if success:
        assertion_fallback = "runner proof endpoint returned success"
    else:
        # A failed assertion must not claim that the proof succeeded.
        assertion_fallback = "runner proof endpoint did not report success"

    step = {
        "kind": "runner",
        # Fall back to "<system_id>.<family>" when the profile names no runner.
        "tool": context.runner_id or f"{context.advisory['system_id']}.{family}",
        "status": "completed" if success else "failed",
        "status_code": attack["status_code"],
        "result_path": str(attack_log),
    }
    payload = {
        "steps": [step],
        "success": success,
        "detail": proof_detail or attack["body"].get("detail") or "runner attack finished",
        "before": before,
        "attack": attack,
        "after": after,
        "proof": proof,
        "assertions": [
            {
                "name": "proof-success",
                "kind": "runner-proof",
                "passed": success,
                "detail": proof_detail or assertion_fallback,
            }
        ],
    }
    write_json(attack_log, payload)
    return payload