from __future__ import annotations import html import json import os import shutil from pathlib import Path from typing import Any, Dict, List from lab.config import ADVISORIES_DIR, CASE_RUNS_DIR, DASHBOARD_DIR, ROOT, RUNS_DIR from lab.repro import load_profiles from lab.utils import ensure_dir, isoformat, load_json_dir, now_utc, unique, write_json, write_text TEMPLATES_DIR = ROOT / "scripts" / "lab" / "dashboard_templates" LOVART_TEMPLATE_DIR = TEMPLATES_DIR / "lovart" LEGACY_TEMPLATE_DIR = TEMPLATES_DIR / "legacy" LOVART_VENDOR_MANIFEST = LOVART_TEMPLATE_DIR / "vendor" / "source-manifest.json" ROOT_JSON_FILES = ["summary.json", "runs.json", "systems.json", "advisories.json", "profiles.json"] def mermaid_from_steps(run: Dict[str, Any]) -> str: lines = [ "flowchart LR", 'A["Select Advisory"] --> B["Resolve Repro Profile"]', 'B --> C["Provision Compose Environment"]', 'C --> D["Baseline Snapshot"]', 'D --> E["Controlled Attack Steps"]', 'E --> F["Browser Replay"]', 'F --> G["Collect Logs and Evidence"]', 'G --> H["Update Registry and Reports"]', ] if run.get("blocked_reason"): lines.append(f'H --> I["Blocked: {run["blocked_reason"][:60]}"]') return "\n".join(lines) def _relative_ref(run_dir: Path, ref: str) -> str: try: return str(Path(ref).resolve().relative_to(run_dir.resolve())) except ValueError: return ref def _dashboard_ref(run: Dict[str, Any], ref: str) -> str: try: bundle_dir = Path(run["report_refs"]["bundle_dir"]).resolve() relative = Path(ref).resolve().relative_to(bundle_dir) return f"./runs/{run['run_id']}/{relative.as_posix()}" except Exception: return ref def _artifact_kind(href: str) -> str: suffix = Path(href).suffix.lower() if suffix in {".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg"}: return "image" if suffix in {".json", ".log", ".txt", ".yaml", ".yml", ".md", ".mmd", ".html"}: return "text" return "link" def _artifact_item(run: Dict[str, Any], href: str, label: str | None = None) -> Dict[str, Any]: return {"href": href, "label": label or Path(href).name, "kind": _artifact_kind(href)} def _artifact_group(run: Dict[str, Any], key: str, label: str, refs: List[str], use_dashboard_refs: bool = False) -> Dict[str, Any]: items: List[Dict[str, Any]] = [] for ref in refs: href = ref if use_dashboard_refs else _dashboard_ref(run, ref) items.append(_artifact_item(run, href)) return {"key": key, "label": label, "count": len(items), "items": items} def _attack_result_refs(run: Dict[str, Any]) -> List[str]: refs: List[str] = [] for step in run.get("attack_steps", []): result_path = step.get("result_path") if result_path: refs.append(str(result_path)) return unique(refs) def _progress_counts(run: Dict[str, Any]) -> Dict[str, int]: counts = {"completed": 0, "skipped": 0, "failed": 0, "blocked": 0, "planned": 0, "other": 0} for item in run.get("timeline", []): status = item.get("status", "other") if status.startswith("blocked"): counts["blocked"] += 1 elif status in counts: counts[status] += 1 else: counts["other"] += 1 return counts def _advisory_meta(advisory: Dict[str, Any]) -> Dict[str, Any]: if not advisory: return {} return { "canonical_id": advisory.get("canonical_id"), "title": advisory.get("title"), "summary": advisory.get("summary"), "display_name": advisory.get("display_name"), "system_id": advisory.get("system_id"), "category": advisory.get("category"), "severity": advisory.get("severity"), "cvss_score": advisory.get("cvss_score"), "exploit_status": advisory.get("exploit_status"), "published_at": advisory.get("published_at"), "updated_at": advisory.get("updated_at"), "official_source_url": advisory.get("official_source_url"), "secondary_source_urls": advisory.get("secondary_source_urls", []), "aliases": advisory.get("aliases", []), "secure_code_topics": advisory.get("secure_code_topics", []), "verification_status": advisory.get("verification_status"), "verification_mode": advisory.get("verification_mode"), "artifact_mode": advisory.get("artifact_mode"), "blocked_reason": advisory.get("blocked_reason"), "browser_evidence": advisory.get("browser_evidence", {}), } def _profile_meta(profile: Dict[str, Any]) -> Dict[str, Any]: if not profile: return {} return { "profile_id": profile.get("profile_id"), "vuln_family": profile.get("vuln_family"), "provisioning_mode": profile.get("provisioning_mode"), "destructive_risk": profile.get("destructive_risk"), "cleanup_policy": profile.get("cleanup_policy"), "artifact_source": profile.get("artifact_source", {}), "success_criteria": profile.get("success_criteria", []), "seed_actions": profile.get("seed_actions", []), "attack_actions": profile.get("attack_actions", []), "browser_assertions": profile.get("browser_assertions", {}), "allowed_target_types": profile.get("allowed_target_types", []), "required_services": profile.get("required_services", []), } def _reasoning_lines(advisory: Dict[str, Any], profile: Dict[str, Any]) -> List[str]: notes: List[str] = [] if advisory.get("summary"): notes.append(advisory["summary"]) for key in ("seed_actions", "attack_actions"): for item in profile.get(key, []): message = item.get("message") if message: notes.append(message) for item in profile.get("success_criteria", []): if item: notes.append(item) if advisory.get("blocked_reason"): notes.append(f"Current blocker: {advisory['blocked_reason']}") return unique(notes) def _dashboard_doc_page(title: str, body: str, description: str) -> str: return f""" {html.escape(title)}

{html.escape(title)}

{html.escape(description)}
{html.escape(body)}
""" def _remove_path(path: Path) -> None: if not path.exists() and not path.is_symlink(): return if path.is_symlink() or path.is_file(): path.unlink() return shutil.rmtree(path) def _sync_symlink(target: Path, link_path: Path) -> None: ensure_dir(link_path.parent) relative_target = os.path.relpath(target, link_path.parent) if link_path.is_symlink() and os.readlink(link_path) == relative_target: return _remove_path(link_path) os.symlink(relative_target, link_path, target_is_directory=target.is_dir()) def _copy_tree(source: Path, destination: Path) -> None: ensure_dir(destination) for path in source.rglob("*"): relative = path.relative_to(source) target = destination / relative if path.is_dir(): ensure_dir(target) continue ensure_dir(target.parent) shutil.copy2(path, target) def _write_dashboard_docs() -> None: docs_dir = DASHBOARD_DIR / "docs" ensure_dir(docs_dir) sources = [ ( "project-features.html", "项目功能与特性总览", (ROOT / "docs" / "project-features.md").read_text(encoding="utf-8"), "Dashboard-local mirror of the repo feature guide.", ), ( "frontend-dashboard-design.html", "本地前端工作台设计文档", (ROOT / "docs" / "frontend-dashboard-design.md").read_text(encoding="utf-8"), "Dashboard-local mirror of the UI and interaction specification.", ), ( "secure-code-index.html", "安全编码修复库索引", (ROOT / "05-defense" / "secure-code" / "INDEX.md").read_text(encoding="utf-8"), "Dashboard-local mirror of the secure-code library index.", ), ] manifest_body = LOVART_VENDOR_MANIFEST.read_text(encoding="utf-8") if LOVART_VENDOR_MANIFEST.exists() else "{}" sources.append( ( "design-source.html", "Lovart 设计来源与本地化清单", manifest_body, "Local vendor manifest for the Lovart-derived dashboard shell.", ) ) for filename, title, body, description in sources: write_text(docs_dir / filename, _dashboard_doc_page(title, body, description)) def _write_design_source_manifest() -> None: assets_dir = DASHBOARD_DIR / "assets" ensure_dir(assets_dir) manifest = json.loads(LOVART_VENDOR_MANIFEST.read_text(encoding="utf-8")) if LOVART_VENDOR_MANIFEST.exists() else {} write_json(assets_dir / "design-source.json", manifest) def _render_root_dashboard_shell() -> None: assets_dir = DASHBOARD_DIR / "assets" ensure_dir(assets_dir) for filename in ("index.html",): shutil.copy2(LOVART_TEMPLATE_DIR / filename, DASHBOARD_DIR / filename) _copy_tree(LOVART_TEMPLATE_DIR / "assets", assets_dir) def _render_legacy_dashboard_shell() -> None: legacy_dir = DASHBOARD_DIR / "legacy" _remove_path(legacy_dir) ensure_dir(legacy_dir) shutil.copy2(LEGACY_TEMPLATE_DIR / "index.html", legacy_dir / "index.html") _copy_tree(LEGACY_TEMPLATE_DIR / "assets", legacy_dir / "assets") for json_name in ROOT_JSON_FILES: _sync_symlink(DASHBOARD_DIR / json_name, legacy_dir / json_name) _sync_symlink(DASHBOARD_DIR / "runs", legacy_dir / "runs") _sync_symlink(DASHBOARD_DIR / "docs", legacy_dir / "docs") def _sync_run_bundles(runs: List[Dict[str, Any]]) -> None: runs_dir = DASHBOARD_DIR / "runs" ensure_dir(runs_dir) for item in runs: bundle_dir = Path(item.get("report_refs", {}).get("bundle_dir", "")) if not bundle_dir.exists(): continue _sync_symlink(bundle_dir, runs_dir / item["run_id"]) def render_run(run: Dict[str, Any]) -> Dict[str, str]: run_dir = CASE_RUNS_DIR / run["run_id"] ensure_dir(run_dir / "assets") timeline_path = run_dir / "timeline.mmd" write_text(timeline_path, mermaid_from_steps(run)) screenshot_refs = [ref for ref in run.get("browser_refs", []) if ref.endswith((".png", ".jpg", ".jpeg"))] relative_screenshots = [_relative_ref(run_dir, ref) for ref in screenshot_refs] md_lines = [ f"# Run {run['run_id']}", "", "> `LAB ONLY` | `AUTHORIZED TARGETS ONLY` | 自动生成 run bundle", "", f"- Advisory: `{run['advisory_id']}`", f"- 系统: `{run['system_id']}`", f"- Repro Profile: `{run['repro_profile_id']}`", f"- 实证状态: `{run['verification_status']}`", f"- 实证方式: `{run['verification_mode']}`", f"- Artifact 模式: `{run['artifact_mode']}`", f"- 启动时间: `{run['started_at']}`", f"- 完成时间: `{run['finished_at']}`", f"- 阻塞原因: `{run.get('blocked_reason') or '-'}`", f"- Compose 服务: `{', '.join(run.get('compose_services', [])) or '-'}`", "", "## 运行时间线", "", f"- Mermaid: [{timeline_path.name}]({timeline_path})", "", "| 时间 | 步骤 | 状态 | 说明 |", "|------|------|------|------|", ] if run.get("timeline"): for item in run["timeline"]: md_lines.append( f"| `{item.get('at', '')}` | `{item.get('step', '')}` | `{item.get('status', '')}` | {item.get('detail', '') or '-'} |" ) else: md_lines.append("| `-` | `-` | `-` | 无时间线 |") md_lines.extend( [ "", "## Compose 拓扑", "", f"- Compose 文件: `{', '.join(run.get('compose_refs', [])) or '-'}`", f"- 服务列表: `{', '.join(run.get('compose_services', [])) or '-'}`", "", "## 攻击步骤", "", "| 工具/步骤 | 状态 | 结果 |", "|-----------|------|------|", ] ) if run.get("attack_steps"): for step in run["attack_steps"]: outcome = step.get("result_path") or step.get("detail") or "-" md_lines.append(f"| `{step.get('tool') or step.get('kind')}` | `{step.get('status', '-')}` | `{outcome}` |") else: md_lines.append("| `-` | `skipped` | `no attack steps` |") md_lines.extend( [ "", "## 证据摘要", "", f"- Baseline: `{len(run.get('baseline_refs', []))}`", f"- 攻击步骤: `{len(run.get('attack_steps', []))}`", f"- 浏览器证据: `{len(run.get('browser_refs', []))}`", f"- 容器日志: `{len(run.get('container_log_refs', []))}`", f"- 请求日志: `{len(run.get('request_log_refs', []))}`", "", ] ) if relative_screenshots: md_lines.extend(["## 浏览器截图", ""]) for ref in relative_screenshots: md_lines.append(f"![{Path(ref).stem}]({ref})") md_lines.append("") if run.get("browser_refs"): md_lines.extend(["## 浏览器证据", ""]) for ref in run["browser_refs"]: md_lines.append(f"- `{_relative_ref(run_dir, ref)}`") md_lines.append("") if run.get("container_log_refs"): md_lines.extend(["## 容器日志", ""]) for ref in run["container_log_refs"]: md_lines.append(f"- `{_relative_ref(run_dir, ref)}`") md_lines.append("") if run.get("request_log_refs"): md_lines.extend(["## 请求与基线日志", ""]) for ref in run["request_log_refs"]: md_lines.append(f"- `{_relative_ref(run_dir, ref)}`") md_lines.append("") md_lines.extend( [ "## 最小化验证说明", "", "- 仅限自有资产、本地靶场或已授权实验目标。", "- 默认执行 minimal-proof;不会把破坏性或不可回滚动作作为默认路径。", "- 若浏览器证据缺失,前端类案例不会被标为 `verified-*`。", "", ] ) report_md = run_dir / "report.md" write_text(report_md, "\n".join(md_lines)) html_body = [ "", "websafe run report", "", "", f"

Run {html.escape(run['run_id'])}

", "
", f"
Advisory
{html.escape(run['advisory_id'])}
", f"
Status
{html.escape(run['verification_status'])}
", f"
Profile
{html.escape(run['repro_profile_id'])}
", f"
Artifact Mode
{html.escape(run['artifact_mode'])}
", "
", "

Mermaid Timeline

", f"
{html.escape(mermaid_from_steps(run))}
", "

Timeline

", "", ] if run.get("timeline"): for item in run["timeline"]: html_body.append( "" f"" f"" f"" f"" "" ) html_body.extend(["
TimeStepStatusDetail
{html.escape(item.get('at', ''))}{html.escape(item.get('step', ''))}{html.escape(item.get('status', ''))}{html.escape(item.get('detail', '') or '-')}
", "

Attack Steps

", ""]) if run.get("attack_steps"): for step in run["attack_steps"]: html_body.append( "" f"" f"" f"" "" ) else: html_body.append("") html_body.extend(["
ToolStatusOutput
{html.escape(step.get('tool') or step.get('kind') or '-')}{html.escape(step.get('status', '-'))}{html.escape(step.get('result_path') or '-')}
-skippedno attack steps
"]) if relative_screenshots: html_body.extend(["

Browser Screenshots

", "") html_body.extend(["

Evidence

", ""]) report_html = run_dir / "report.html" write_text(report_html, "\n".join(html_body)) return {"bundle_dir": str(run_dir), "report_md": str(report_md), "report_html": str(report_html), "timeline": str(timeline_path)} def render_dashboard() -> Dict[str, str]: ensure_dir(DASHBOARD_DIR) advisory_records = load_json_dir(ADVISORIES_DIR) runs = load_json_dir(RUNS_DIR) advisory_map = {item["canonical_id"]: item for item in advisory_records if item.get("canonical_id")} profile_map = load_profiles() _sync_run_bundles(runs) systems: Dict[str, Dict[str, Any]] = {} for advisory in advisory_records: system = systems.setdefault( advisory["system_id"], { "system_id": advisory["system_id"], "display_name": advisory.get("display_name", advisory["system_id"]), "total": 0, "verified_real": 0, "verified_synthetic": 0, "blocked": 0, "manual": 0, "browser_required": 0, "browser_present": 0, "latest_update": "", }, ) system["total"] += 1 status = advisory.get("verification_status", "triage-manual") if status == "verified-real": system["verified_real"] += 1 elif status == "verified-synthetic": system["verified_synthetic"] += 1 elif status.startswith("blocked-"): system["blocked"] += 1 else: system["manual"] += 1 browser = advisory.get("browser_evidence", {}) if browser.get("required"): system["browser_required"] += 1 if browser.get("present"): system["browser_present"] += 1 latest = advisory.get("updated_at") or advisory.get("published_at") or "" if latest > system["latest_update"]: system["latest_update"] = latest recent_runs = sorted(runs, key=lambda item: item.get("finished_at") or "", reverse=True)[:100] decorated_runs: List[Dict[str, Any]] = [] for item in recent_runs: cloned = dict(item) advisory = advisory_map.get(item["advisory_id"], {}) profile = profile_map.get(item["repro_profile_id"], {}) browser_evidence = item.get("browser_evidence") or advisory.get("browser_evidence") or { "required": bool(profile.get("browser_assertions", {}).get("required")), "present": False, "refs": [], } request_only_refs = [ref for ref in item.get("request_log_refs", []) if ref not in item.get("baseline_refs", [])] cloned["dashboard_refs"] = { "report_html": f"./runs/{item['run_id']}/report.html", "report_md": f"./runs/{item['run_id']}/report.md", "timeline": f"./runs/{item['run_id']}/timeline.mmd", "bundle": f"./runs/{item['run_id']}/run.json", } cloned["browser_evidence"] = browser_evidence cloned["browser_links"] = [_dashboard_ref(item, ref) for ref in item.get("browser_refs", [])] cloned["container_links"] = [_dashboard_ref(item, ref) for ref in item.get("container_log_refs", [])] cloned["request_links"] = [_dashboard_ref(item, ref) for ref in item.get("request_log_refs", [])] cloned["advisory_meta"] = _advisory_meta(advisory) cloned["profile_meta"] = _profile_meta(profile) cloned["reasoning_lines"] = _reasoning_lines(advisory, profile) cloned["progress"] = _progress_counts(item) cloned["artifact_groups"] = [ _artifact_group( item, "reports", "Reports", [ cloned["dashboard_refs"]["report_html"], cloned["dashboard_refs"]["report_md"], cloned["dashboard_refs"]["timeline"], cloned["dashboard_refs"]["bundle"], ], use_dashboard_refs=True, ), _artifact_group(item, "compose", "Compose", item.get("compose_refs", [])), _artifact_group(item, "baseline", "Baseline Snapshots", item.get("baseline_refs", [])), _artifact_group(item, "attack", "Attack Outputs", _attack_result_refs(item)), _artifact_group(item, "browser", "Browser Evidence", item.get("browser_refs", [])), _artifact_group(item, "container", "Container Logs", item.get("container_log_refs", [])), _artifact_group(item, "requests", "Request Logs", request_only_refs), ] cloned["artifact_groups"] = [group for group in cloned["artifact_groups"] if group["count"]] decorated_runs.append(cloned) summary = { "generated_at": isoformat(now_utc()), "advisory_count": len(advisory_records), "run_count": len(runs), "statuses": {}, "recent_failures": [], } for item in runs: status = item.get("verification_status", "triage-manual") summary["statuses"][status] = summary["statuses"].get(status, 0) + 1 summary["systems"] = sorted(systems.values(), key=lambda entry: (-entry["total"], entry["system_id"])) summary["recent_failures"] = [ { "run_id": item["run_id"], "advisory_id": item["advisory_id"], "status": item.get("verification_status"), "title": item.get("advisory_meta", {}).get("title"), "blocked_reason": item.get("blocked_reason"), } for item in decorated_runs if item.get("verification_status") in {"triage-manual", "blocked-artifact", "blocked-destructive"} ][:20] write_json(DASHBOARD_DIR / "summary.json", summary) write_json(DASHBOARD_DIR / "runs.json", decorated_runs) write_json(DASHBOARD_DIR / "systems.json", summary["systems"]) write_json(DASHBOARD_DIR / "advisories.json", {key: _advisory_meta(value) for key, value in advisory_map.items()}) write_json(DASHBOARD_DIR / "profiles.json", {key: _profile_meta(value) for key, value in profile_map.items()}) _write_dashboard_docs() _write_design_source_manifest() _render_root_dashboard_shell() _render_legacy_dashboard_shell() return { "dashboard_dir": str(DASHBOARD_DIR), "index": str(DASHBOARD_DIR / "index.html"), "legacy_index": str(DASHBOARD_DIR / "legacy" / "index.html"), "summary_json": str(DASHBOARD_DIR / "summary.json"), }