#!/usr/bin/env python3 from __future__ import annotations import argparse import sys from pathlib import Path from typing import Any, Dict, List CURRENT_DIR = Path(__file__).resolve().parent SCRIPTS_DIR = CURRENT_DIR.parent if str(SCRIPTS_DIR) not in sys.path: sys.path.insert(0, str(SCRIPTS_DIR)) from lab import attack, baseline, browser, catalog, doctor, evaluate, evidence, provision, render, repro, seed, task_queue, validators # noqa: E402 from lab.config import ADVISORIES_DIR, CASE_RUNS_DIR, ENV_PROFILES_DIR, RUNS_DIR # noqa: E402 from lab.utils import command_available, ensure_dir, isoformat, load_json_dir, now_utc, read_json, read_yaml, write_json # noqa: E402 def _load_advisory(canonical_id: str) -> Dict[str, Any]: advisory = read_json(ADVISORIES_DIR / f"{canonical_id}.json", default=None) if not advisory: raise ValueError(f"Unknown advisory: {canonical_id}") return advisory def _run_dir(run_id: str) -> Path: path = CASE_RUNS_DIR / run_id ensure_dir(path) ensure_dir(path / "logs") ensure_dir(path / "assets") return path def _compose_run_id(advisory: Dict[str, Any]) -> str: return f"{advisory['system_id']}-{advisory['canonical_id']}-{now_utc().strftime('%Y%m%d%H%M%S')}" def _existing_refs(*paths: Path) -> List[str]: return [str(path) for path in paths if path.exists()] def _timeline_event(timeline: List[Dict[str, Any]], step: str, status: str, detail: str = "") -> None: timeline.append( { "at": isoformat(now_utc()), "step": step, "status": status, "detail": detail, } ) def _sync_registry_outputs() -> None: from intel.config import GENERATED_DIR, load_source_map # noqa: E402 from intel.main import _load_existing_advisories, _load_existing_triage # noqa: E402 from intel.render import render_case_pages, render_generated, render_secure_code, render_system_scaffolding # noqa: E402 source_map = load_source_map() advisories = _load_existing_advisories() triage = _load_existing_triage() summary = read_json(GENERATED_DIR / "run-summary.json", default={}) or {} render_system_scaffolding(source_map, advisories) render_case_pages(advisories) render_secure_code(source_map) render_generated(source_map, advisories, triage, summary.get("failures", []), summary) def _resolve_profile(advisory: Dict[str, Any]) -> Dict[str, Any]: profile = repro.resolve_profile(advisory["canonical_id"], advisory) current_profile = read_yaml(ENV_PROFILES_DIR / "core" / advisory["system_id"] / "current.yaml", default={}) or {} merged = dict(current_profile) merged.update(profile) if current_profile.get("services") and not merged.get("services"): merged["services"] = current_profile["services"] if current_profile.get("baseline_urls") and not merged.get("baseline_urls"): merged["baseline_urls"] = current_profile["baseline_urls"] if current_profile.get("artifact_mode") and not merged.get("artifact_mode"): merged["artifact_mode"] = current_profile["artifact_mode"] if current_profile.get("verification_mode") and not merged.get("verification_mode"): merged["verification_mode"] = current_profile["verification_mode"] if current_profile.get("browser_required"): merged.setdefault("browser_assertions", {}) merged["browser_assertions"].setdefault("required", current_profile["browser_required"]) if not profile.get("system_id"): merged["system_id"] = advisory["system_id"] if not profile.get("profile_id"): merged["profile_id"] = advisory["canonical_id"] return merged def _build_run_bundle( advisory: Dict[str, Any], profile: Dict[str, Any], run_id: str, verification_status: str, verification_mode: str, artifact_mode: str, baseline_refs: List[str], attack_steps: List[Dict[str, Any]], browser_refs: List[str], container_log_refs: List[str], request_log_refs: List[str], compose_refs: List[str], browser_evidence: Dict[str, Any], timeline: List[Dict[str, Any]], success_evaluation: Dict[str, Any], started_at: str, finished_at: str, blocked_reason: str | None, ) -> Dict[str, Any]: return { "run_id": run_id, "system_id": advisory["system_id"], "advisory_id": advisory["canonical_id"], "repro_profile_id": profile["profile_id"], "verification_status": verification_status, "verification_mode": verification_mode, "artifact_mode": artifact_mode, "target_env": "local-docker", "compose_services": sorted(profile.get("services", {}).keys()), "baseline_refs": baseline_refs, "attack_steps": attack_steps, "browser_refs": browser_refs, "browser_evidence": browser_evidence, "container_log_refs": container_log_refs, "request_log_refs": request_log_refs, "compose_refs": compose_refs, "timeline": timeline, "success_evaluation": success_evaluation, "historical_status": verification_status, "latest_status": verification_status, "started_at": started_at, "finished_at": finished_at, "blocked_reason": blocked_reason, } def _dry_run_case_plan(advisory: Dict[str, Any], profile: Dict[str, Any], run_id: str) -> Dict[str, Any]: provision_result = provision.prepare(profile, CASE_RUNS_DIR / run_id, dry_run=True) return { "run_id": run_id, "system_id": advisory["system_id"], "advisory_id": advisory["canonical_id"], "repro_profile_id": profile["profile_id"], "verification_mode": profile.get("verification_mode", "synthetic"), "artifact_mode": profile.get("artifact_mode", profile.get("provisioning_mode", "synthetic")), "browser_required": bool(profile.get("browser_assertions", {}).get("required")), "baseline_urls": profile.get("baseline_urls", []), "compose_services": sorted(profile.get("services", {}).keys()), "seed_actions": profile.get("seed_actions", []), "attack_actions": profile.get("attack_actions", []), "runner_id": profile.get("runner_id"), "fixture_path": profile.get("fixture_path"), "success_assertions": profile.get("success_assertions", []), "compose_preview": provision_result.get("compose_preview", {}), "note": "dry-run only; no bundle, report, compose file, or registry update was written", } def _execute_case(canonical_id: str, run_id: str | None = None, dry_run: bool = False, sync_outputs: bool = True) -> Dict[str, Any]: advisory = _load_advisory(canonical_id) profile = _resolve_profile(advisory) resolved_run_id = run_id or _compose_run_id(advisory) if dry_run: return _dry_run_case_plan(advisory, profile, resolved_run_id) started_at = isoformat(now_utc()) timeline: List[Dict[str, Any]] = [] _timeline_event(timeline, "select-advisory", "completed", advisory["canonical_id"]) _timeline_event(timeline, "resolve-repro-profile", "completed", profile["profile_id"]) run_dir = _run_dir(resolved_run_id) doctor_result = doctor.run_checks([profile]) write_json(run_dir / "logs" / "doctor.json", doctor_result) _timeline_event( timeline, "doctor", "completed" if doctor_result.get("ok") else "failed", doctor_result.get("summary", ""), ) if doctor_result.get("ok"): provision_result = provision.prepare(profile, run_dir, dry_run=False) else: provision_result = { "compose_path": str(run_dir / "compose" / "compose.yaml"), "status": "blocked-artifact", "blocked_reason": doctor_result.get("summary", "doctor failed"), } _timeline_event( timeline, "provision-compose-environment", provision_result.get("status", "unknown"), provision_result.get("blocked_reason", ""), ) allow_runtime_steps = provision_result.get("status") not in {"blocked-artifact"} browser_required = bool(profile.get("browser_assertions", {}).get("required")) compose_path = Path(provision_result.get("compose_path", run_dir / "compose" / "compose.yaml")) ready_payload = {"status": "skipped", "detail": "provisioning blocked", "observations": []} if allow_runtime_steps: ready_payload = provision.wait_ready(profile, run_dir, compose_path) allow_runtime_steps = ready_payload.get("status") == "completed" _timeline_event(timeline, "wait-ready", ready_payload.get("status", "unknown"), ready_payload.get("detail", "")) else: _timeline_event(timeline, "wait-ready", "skipped", "provisioning blocked") seed_payload = {"steps": [], "seeded": False} if allow_runtime_steps: seed_payload = seed.run_seed(profile, advisory, run_dir, dry_run=False) seed_failed = any(step.get("status") == "failed" for step in seed_payload.get("steps", [])) _timeline_event( timeline, "seed-environment", "failed" if seed_failed else "completed", f"steps={len(seed_payload.get('steps', []))}", ) else: _timeline_event(timeline, "seed-environment", "skipped", "runtime steps unavailable") baseline_payload = {"observations": []} if profile.get("baseline_urls") and allow_runtime_steps: baseline_payload = baseline.collect(profile, run_dir) baseline_failed = any(item.get("error") for item in baseline_payload.get("observations", [])) _timeline_event( timeline, "baseline-snapshot", "failed" if baseline_failed else "completed", f"urls={len(profile.get('baseline_urls', []))}", ) else: _timeline_event(timeline, "baseline-snapshot", "skipped", "no baseline urls or provisioning blocked") baseline_browser = {"required": browser_required, "present": False, "refs": []} if browser_required and allow_runtime_steps and profile.get("baseline_urls"): baseline_browser = browser.capture(profile["baseline_urls"][0], run_dir, prefix="baseline") _timeline_event( timeline, "browser-replay-before-attack", "completed" if baseline_browser.get("present") else "failed", baseline_browser.get("reason", ""), ) elif browser_required: _timeline_event(timeline, "browser-replay-before-attack", "skipped", "baseline browser capture unavailable") attack_payload = {"steps": []} if allow_runtime_steps: attack_payload = attack.run_attack(profile, advisory, run_dir, dry_run=False) attack_failed = any(step.get("status") == "failed" for step in attack_payload.get("steps", [])) _timeline_event( timeline, "controlled-attack-chain", "failed" if attack_failed else "completed", f"steps={len(attack_payload.get('steps', []))}", ) else: _timeline_event(timeline, "controlled-attack-chain", "skipped", "provisioning blocked") proof_browser = {"required": browser_required, "present": False, "refs": []} if browser_required and allow_runtime_steps and profile.get("baseline_urls"): proof_browser = browser.capture(profile["baseline_urls"][0], run_dir, prefix="proof") _timeline_event( timeline, "browser-replay-after-attack", "completed" if proof_browser.get("present") else "failed", proof_browser.get("reason", ""), ) elif browser_required: _timeline_event(timeline, "browser-replay-after-attack", "skipped", "proof browser capture unavailable") container_logs = evidence.collect_container_logs(run_dir, compose_path) if compose_path.exists() and allow_runtime_steps else [] _timeline_event( timeline, "collect-logs-and-evidence", "completed" if allow_runtime_steps else "skipped", f"container_logs={len(container_logs)}", ) browser_present = bool(baseline_browser.get("present")) and bool(proof_browser.get("present")) browser_payload = { "required": browser_required, "present": browser_present, "refs": baseline_browser.get("refs", []) + proof_browser.get("refs", []), "baseline_refs": baseline_browser.get("refs", []), "proof_refs": proof_browser.get("refs", []), "baseline_title": baseline_browser.get("page_title"), "proof_title": proof_browser.get("page_title"), "error_kind": proof_browser.get("error_kind") or baseline_browser.get("error_kind"), "reason": proof_browser.get("reason") or baseline_browser.get("reason"), } verification_mode = profile.get("verification_mode", "synthetic") artifact_mode = profile.get("artifact_mode", profile.get("provisioning_mode", "synthetic")) success_evaluation = evaluate.evaluate_run( profile=profile, provision_result=provision_result, baseline_payload=baseline_payload, attack_payload=attack_payload, browser_payload=browser_payload, ) verification_status = success_evaluation["verification_status"] blocked_reason = success_evaluation.get("blocked_reason") cleanup_payload = {"status": "skipped", "detail": "cleanup_policy not destroy"} if compose_path.exists() and profile.get("cleanup_policy") == "destroy": cleanup_payload = provision.teardown(run_dir, compose_path) _timeline_event( timeline, "cleanup-compose-environment", cleanup_payload.get("status", "unknown"), cleanup_payload.get("detail", ""), ) finished_at = isoformat(now_utc()) bundle = _build_run_bundle( advisory=advisory, profile=profile, run_id=resolved_run_id, verification_status=verification_status, verification_mode=verification_mode, artifact_mode=artifact_mode, baseline_refs=_existing_refs(run_dir / "logs" / "baseline.json"), attack_steps=attack_payload.get("steps", []), browser_refs=browser_payload["refs"], container_log_refs=container_logs, request_log_refs=_existing_refs(run_dir / "logs" / "attack.json", run_dir / "logs" / "baseline.json"), compose_refs=[str(compose_path)] if compose_path.exists() else [], browser_evidence=browser_payload, timeline=timeline, success_evaluation=success_evaluation, started_at=started_at, finished_at=finished_at, blocked_reason=blocked_reason, ) _timeline_event(bundle["timeline"], "update-registry-and-reports", "completed", resolved_run_id) report_refs = render.render_run(bundle) bundle["report_refs"] = report_refs evidence.write_run_bundle(run_dir, bundle) ensure_dir(RUNS_DIR) write_json(RUNS_DIR / f"{resolved_run_id}.json", bundle) if sync_outputs: _sync_registry_outputs() return bundle def cmd_catalog_sync(args) -> int: summary = catalog.sync_catalog(write_profiles=True, write_repro_map=True) print(summary) return 0 def cmd_compose_generate(args) -> int: advisory = _load_advisory(args.case) profile = _resolve_profile(advisory) run_dir = _run_dir(args.run_id or f"compose-{advisory['canonical_id']}") compose_result = provision.prepare(profile, run_dir, dry_run=True) print(compose_result) return 0 def cmd_provision(args) -> int: advisory = _load_advisory(args.case) profile = _resolve_profile(advisory) run_dir = _run_dir(args.run_id or _compose_run_id(advisory)) result = provision.prepare(profile, run_dir, dry_run=args.dry_run) print(result) return 0 def cmd_seed(args) -> int: advisory = _load_advisory(args.case) profile = _resolve_profile(advisory) run_dir = _run_dir(args.run_id or _compose_run_id(advisory)) print(seed.run_seed(profile, advisory, run_dir, dry_run=args.dry_run)) return 0 def cmd_baseline(args) -> int: advisory = _load_advisory(args.case) profile = _resolve_profile(advisory) run_dir = _run_dir(args.run_id or _compose_run_id(advisory)) result = baseline.collect(profile, run_dir) print(result) return 0 def cmd_attack(args) -> int: advisory = _load_advisory(args.case) profile = _resolve_profile(advisory) run_dir = _run_dir(args.run_id or _compose_run_id(advisory)) result = attack.run_attack(profile, advisory, run_dir, dry_run=args.dry_run) print(result) return 0 def cmd_verify(args) -> int: advisory = _load_advisory(args.case) profile = _resolve_profile(advisory) browser_required = bool(profile.get("browser_assertions", {}).get("required")) payload = { "advisory": advisory["canonical_id"], "profile_id": profile["profile_id"], "browser_required": browser_required, "result": "ready-for-run", } print(payload) return 0 def cmd_run_case(args) -> int: result = _execute_case(args.case, run_id=args.run_id, dry_run=args.dry_run, sync_outputs=not args.dry_run) print(result) return 0 def cmd_run_system(args) -> int: advisories = [item for item in load_json_dir(ADVISORIES_DIR) if item.get("system_id") == args.system] advisories = sorted(advisories, key=lambda item: item.get("canonical_id", "")) selected = advisories if not args.limit or args.limit <= 0 else advisories[: args.limit] for advisory in selected: _execute_case(advisory["canonical_id"], run_id=None, dry_run=args.dry_run, sync_outputs=False) if selected and not args.dry_run: _sync_registry_outputs() print({"system": args.system, "count": len(selected)}) return 0 def cmd_run_batch(args) -> int: if args.from_queue: items = task_queue.dequeue(limit=args.limit) else: task_queue.enqueue_from_registry(only_hotlane=args.only_hotlane, limit=args.limit) items = task_queue.dequeue(limit=args.limit) for item in items: _execute_case(item["advisory_id"], run_id=None, dry_run=args.dry_run, sync_outputs=False) if items and not args.dry_run: _sync_registry_outputs() print({"processed": len(items)}) return 0 def cmd_render_run(args) -> int: run = read_json(RUNS_DIR / f"{args.run_id}.json", default=None) if not run: raise ValueError(f"Unknown run: {args.run_id}") print(render.render_run(run)) return 0 def cmd_serve_dashboard(args) -> int: render.render_dashboard() import http.server import socketserver os_dir = str(render.DASHBOARD_DIR if hasattr(render, "DASHBOARD_DIR") else "") if not os_dir: from lab.config import DASHBOARD_DIR os_dir = str(DASHBOARD_DIR) handler = http.server.SimpleHTTPRequestHandler with socketserver.TCPServer(("127.0.0.1", args.port), handler) as httpd: print(f"serving dashboard at http://127.0.0.1:{args.port}/") import os os.chdir(os_dir) httpd.serve_forever() def cmd_cleanup(args) -> int: run = read_json(RUNS_DIR / f"{args.run_id}.json", default=None) if not run: raise ValueError(f"Unknown run: {args.run_id}") compose_path = Path(run["report_refs"]["bundle_dir"]) / "compose" / "compose.yaml" if command_available("docker") and compose_path.exists(): from lab.utils import run as shell_run shell_run(["docker", "compose", "-f", str(compose_path), "down", "-v"], cwd=compose_path.parent.parent) print({"cleaned": args.run_id}) return 0 def cmd_retry_failures(args) -> int: failed = [ item for item in load_json_dir(RUNS_DIR) if item.get("verification_status") in {"blocked-artifact", "triage-manual"} ] task_queue.enqueue_items( [{"advisory_id": item["advisory_id"], "system_id": item["system_id"], "priority": "retry"} for item in failed[: args.limit]] ) print({"requeued": min(len(failed), args.limit)}) return 0 def cmd_validate(args) -> int: errors = validators.validate_assets() if errors: print("Validation failed:") for error in errors: print(f"- {error}") return 1 print("Validation passed.") return 0 def cmd_doctor(args) -> int: profiles: List[Dict[str, Any]] = [] if getattr(args, "case", None): advisory = _load_advisory(args.case) profiles.append(_resolve_profile(advisory)) elif getattr(args, "system", None): advisories = [item for item in load_json_dir(ADVISORIES_DIR) if item.get("system_id") == args.system] if advisories: profiles.append(_resolve_profile(advisories[0])) result = doctor.run_checks(profiles) print(result) return 0 if result.get("ok") else 1 def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Websafe local lab orchestrator") subparsers = parser.add_subparsers(dest="command", required=True) catalog_sync = subparsers.add_parser("catalog", help="catalog operations") catalog_sub = catalog_sync.add_subparsers(dest="catalog_command", required=True) catalog_sync_cmd = catalog_sub.add_parser("sync", help="sync environment catalog and repro map") catalog_sync_cmd.set_defaults(func=cmd_catalog_sync) compose_generate = subparsers.add_parser("compose", help="compose operations") compose_sub = compose_generate.add_subparsers(dest="compose_command", required=True) compose_generate_cmd = compose_sub.add_parser("generate", help="generate compose file for a case") compose_generate_cmd.add_argument("--case", required=True) compose_generate_cmd.add_argument("--run-id") compose_generate_cmd.set_defaults(func=cmd_compose_generate) for name, func in [ ("provision", cmd_provision), ("seed", cmd_seed), ("baseline", cmd_baseline), ("attack", cmd_attack), ("verify", cmd_verify), ]: sub = subparsers.add_parser(name) sub.add_argument("--case", required=True) sub.add_argument("--run-id") sub.add_argument("--dry-run", action="store_true") sub.set_defaults(func=func) run_case = subparsers.add_parser("run-case", help="run a single advisory through the lab pipeline") run_case.add_argument("--case", required=True) run_case.add_argument("--run-id") run_case.add_argument("--dry-run", action="store_true") run_case.set_defaults(func=cmd_run_case) run_system = subparsers.add_parser("run-system", help="run the first N advisories for a system") run_system.add_argument("--system", required=True) run_system.add_argument("--limit", type=int, default=0) run_system.add_argument("--dry-run", action="store_true") run_system.set_defaults(func=cmd_run_system) run_batch = subparsers.add_parser("run-batch", help="process repro queue or enqueue from registry") run_batch.add_argument("--limit", type=int, default=10) run_batch.add_argument("--only-hotlane", action="store_true") run_batch.add_argument("--from-queue", action="store_true") run_batch.add_argument("--dry-run", action="store_true") run_batch.set_defaults(func=cmd_run_batch) render_run = subparsers.add_parser("render-run", help="re-render a stored run") render_run.add_argument("--run-id", required=True) render_run.set_defaults(func=cmd_render_run) serve = subparsers.add_parser("serve-dashboard", help="serve the static dashboard locally") serve.add_argument("--port", type=int, default=8734) serve.set_defaults(func=cmd_serve_dashboard) cleanup = subparsers.add_parser("cleanup", help="tear down a stored run compose environment") cleanup.add_argument("--run-id", required=True) cleanup.set_defaults(func=cmd_cleanup) retry = subparsers.add_parser("retry-failures", help="requeue blocked or manual runs") retry.add_argument("--limit", type=int, default=50) retry.set_defaults(func=cmd_retry_failures) validate = subparsers.add_parser("validate", help="validate lab assets") validate.set_defaults(func=cmd_validate) doctor_cmd = subparsers.add_parser("doctor", help="run environment preflight checks") doctor_cmd.add_argument("--case") doctor_cmd.add_argument("--system") doctor_cmd.set_defaults(func=cmd_doctor) return parser def main() -> int: parser = build_parser() args = parser.parse_args() return args.func(args) if __name__ == "__main__": raise SystemExit(main())