from __future__ import annotations from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List import requests from lab.utils import ensure_dir, write_json @dataclass class RunnerContext: profile: Dict[str, Any] advisory: Dict[str, Any] run_dir: Path @property def base_url(self) -> str: return str((self.profile.get("baseline_urls") or [""])[0]).rstrip("/") @property def family(self) -> str: return str(self.profile.get("vuln_family") or "").strip() @property def runner_id(self) -> str: return str(self.profile.get("runner_id") or "").strip() @property def logs_dir(self) -> Path: path = self.run_dir / "logs" ensure_dir(path) return path def _request( context: RunnerContext, method: str, path: str, *, json_payload: Dict[str, Any] | None = None, timeout: float = 15.0, ) -> Dict[str, Any]: response = requests.request( method=method, url=f"{context.base_url}{path}", json=json_payload, timeout=timeout, ) body: Dict[str, Any] try: parsed = response.json() body = parsed if isinstance(parsed, dict) else {"data": parsed} except Exception: body = {"text": response.text[:1000]} return { "status_code": response.status_code, "ok": response.ok, "body": body, } def _seed_payload(context: RunnerContext) -> Dict[str, Any]: return { "system_id": context.advisory["system_id"], "family": context.family, "case_id": context.advisory["canonical_id"], "title": context.advisory.get("title"), } def _attack_payload(context: RunnerContext, family: str) -> Dict[str, Any]: case_id = context.advisory["canonical_id"] payloads = { "proxy-boundary": { "family": family, "case_id": case_id, "headers": {"x-forwarded-user": "admin", "x-forwarded-host": "internal.lab"}, }, "authz-bypass": { "family": family, "case_id": case_id, "actor": "guest", "target": "/admin", "override": "bypass", }, "ssrf": { "family": family, "case_id": case_id, "target_url": "http://127.0.0.1/sink", }, "xss": { "family": family, "case_id": case_id, "payload": f"", }, "file-upload": { "family": family, "case_id": case_id, "filename": f"{case_id}.txt", "content": f"fixture upload marker for {case_id}", }, "deserialization": { "family": family, "case_id": case_id, "payload": {"marker": case_id, "mode": "inert-object"}, }, } return payloads[family] def run_fixture_seed(context: RunnerContext, family: str) -> Dict[str, Any]: result = _request(context, "POST", "/seed", json_payload=_seed_payload(context)) payload = { "steps": [ { "kind": "runner", "tool": context.runner_id or f"{context.advisory['system_id']}.{family}", "status": "completed" if result["ok"] else "failed", "status_code": result["status_code"], "detail": result["body"].get("detail") or "seed request completed", } ], "seeded": bool(result["ok"]), "result": result, } write_json(context.logs_dir / "seed.json", payload) return payload def run_fixture_attack(context: RunnerContext, family: str) -> Dict[str, Any]: before: Dict[str, Any] = {} if family in {"proxy-boundary", "authz-bypass"}: before = _request(context, "GET", "/admin") attack = _request(context, "POST", "/attack", json_payload=_attack_payload(context, family)) proof = _request(context, "GET", "/proof") after: Dict[str, Any] = {} if family in {"proxy-boundary", "authz-bypass"}: after = _request(context, "GET", "/admin") success = bool(attack["ok"] and proof["ok"] and proof["body"].get("success")) step = { "kind": "runner", "tool": context.runner_id or f"{context.advisory['system_id']}.{family}", "status": "completed" if success else "failed", "status_code": attack["status_code"], "result_path": str(context.logs_dir / "attack.json"), } payload = { "steps": [step], "success": success, "detail": proof["body"].get("detail") or attack["body"].get("detail") or "runner attack finished", "before": before, "attack": attack, "after": after, "proof": proof, "assertions": [ { "name": "proof-success", "kind": "runner-proof", "passed": success, "detail": proof["body"].get("detail") or "runner proof endpoint returned success", } ], } write_json(context.logs_dir / "attack.json", payload) return payload