from __future__ import annotations import html import json import os import shutil from pathlib import Path from typing import Any, Dict, List from lab.config import ADVISORIES_DIR, CASE_RUNS_DIR, DASHBOARD_DIR, REPRO_MAP_PATH, ROOT, RUNS_DIR, SOURCE_MAP_PATH from lab.repro import annotate_with_latest_run, latest_runs_by_advisory, load_profiles from lab.utils import ensure_dir, isoformat, load_json_dir, now_utc, read_json, read_yaml, unique, write_json, write_text TEMPLATES_DIR = ROOT / "scripts" / "lab" / "dashboard_templates" LOVART_TEMPLATE_DIR = TEMPLATES_DIR / "lovart" LEGACY_TEMPLATE_DIR = TEMPLATES_DIR / "legacy" LOVART_VENDOR_MANIFEST = LOVART_TEMPLATE_DIR / "vendor" / "source-manifest.json" ROOT_JSON_FILES = ["summary.json", "runs.json", "systems.json", "advisories.json", "profiles.json"] ROOT_JSON_FILES.append("architecture.json") SECTION_ROUTE_DIRS = ["overview", "runs", "systems", "architecture", "data"] CATEGORY_LABELS = { "cms": "CMS / 内容平台", "ecommerce": "电商系统", "frameworks": "Web 框架与运行时", "servers": "服务器与边界层", "platforms": "开源平台与后台系统", } TIER_LABELS = { "history-full": "历史全量", "rolling-24m": "近两年全量", } STATUS_LABELS = { "verified-real": "真实版本已实证", "verified-synthetic": "合成靶场已实证", "blocked-artifact": "制品阻塞", "blocked-destructive": "破坏性风险阻塞", "triage-manual": "人工分诊", "suspected": "仅疑似命中", } def mermaid_from_steps(run: Dict[str, Any]) -> str: lines = [ "flowchart LR", 'A["选择 Advisory"] --> B["解析 Repro Profile"]', 'B --> C["生成 Compose 环境"]', 'C --> D["采集基线快照"]', 'D --> E["执行受控攻击步骤"]', 'E --> F["浏览器回放验证"]', 'F --> G["收集日志与证据"]', 'G --> H["回写 Registry 与报告"]', ] if run.get("blocked_reason"): lines.append(f'H --> I["阻塞: {run["blocked_reason"][:60]}"]') return "\n".join(lines) def _relative_ref(run_dir: Path, ref: str) -> str: try: return str(Path(ref).resolve().relative_to(run_dir.resolve())) except ValueError: return ref def _dashboard_ref(run: Dict[str, Any], ref: str) -> str: try: bundle_dir = Path(run["report_refs"]["bundle_dir"]).resolve() relative = Path(ref).resolve().relative_to(bundle_dir) return f"/runs/{run['run_id']}/{relative.as_posix()}" except Exception: return ref def _artifact_kind(href: str) -> str: suffix = Path(href).suffix.lower() if suffix in {".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg"}: return "image" if suffix in {".json", ".log", ".txt", ".yaml", ".yml", ".md", ".mmd", ".html"}: return "text" return "link" def _artifact_item(run: Dict[str, Any], href: str, label: str | None = None) -> Dict[str, Any]: return {"href": href, "label": label or Path(href).name, "kind": _artifact_kind(href)} def _artifact_group(run: Dict[str, Any], key: str, label: str, refs: List[str], use_dashboard_refs: bool = False) -> Dict[str, Any]: items: List[Dict[str, Any]] = [] for ref in refs: href = ref if use_dashboard_refs else _dashboard_ref(run, ref) items.append(_artifact_item(run, href)) return {"key": key, "label": label, "count": len(items), "items": items} def _attack_result_refs(run: Dict[str, Any]) -> List[str]: refs: List[str] = [] for step in run.get("attack_steps", []): result_path = step.get("result_path") if result_path: refs.append(str(result_path)) return unique(refs) def _progress_counts(run: Dict[str, Any]) -> Dict[str, int]: counts = {"completed": 0, "skipped": 0, "failed": 0, "blocked": 0, "planned": 0, "other": 0} for item in run.get("timeline", []): status = item.get("status", "other") if status.startswith("blocked"): counts["blocked"] += 1 elif status in counts: counts[status] += 1 else: counts["other"] += 1 return counts def _advisory_meta(advisory: Dict[str, Any]) -> Dict[str, Any]: if not advisory: return {} return { "canonical_id": advisory.get("canonical_id"), "title": advisory.get("title"), "summary": advisory.get("summary"), "display_name": advisory.get("display_name"), "system_id": advisory.get("system_id"), "category": advisory.get("category"), "severity": advisory.get("severity"), "cvss_score": advisory.get("cvss_score"), "exploit_status": advisory.get("exploit_status"), "published_at": advisory.get("published_at"), "updated_at": advisory.get("updated_at"), "official_source_url": advisory.get("official_source_url"), "secondary_source_urls": advisory.get("secondary_source_urls", []), "aliases": advisory.get("aliases", []), "secure_code_topics": advisory.get("secure_code_topics", []), "verification_status": advisory.get("verification_status"), "verification_mode": advisory.get("verification_mode"), "artifact_mode": advisory.get("artifact_mode"), "blocked_reason": advisory.get("blocked_reason"), "browser_evidence": advisory.get("browser_evidence", {}), } def _profile_meta(profile: Dict[str, Any]) -> Dict[str, Any]: if not profile: return {} return { "profile_id": profile.get("profile_id"), "vuln_family": profile.get("vuln_family"), "provisioning_mode": profile.get("provisioning_mode"), "destructive_risk": profile.get("destructive_risk"), "cleanup_policy": profile.get("cleanup_policy"), "artifact_source": profile.get("artifact_source", {}), "success_criteria": profile.get("success_criteria", []), "success_assertions": profile.get("success_assertions", []), "seed_actions": profile.get("seed_actions", []), "attack_actions": profile.get("attack_actions", []), "browser_assertions": profile.get("browser_assertions", {}), "runner_id": profile.get("runner_id"), "fixture_path": profile.get("fixture_path"), "allowed_target_types": profile.get("allowed_target_types", []), "required_services": profile.get("required_services", []), } def _reasoning_lines(advisory: Dict[str, Any], profile: Dict[str, Any]) -> List[str]: notes: List[str] = [] if advisory.get("summary"): notes.append(advisory["summary"]) for key in ("seed_actions", "attack_actions"): for item in profile.get(key, []): message = item.get("message") if message: notes.append(message) for item in profile.get("success_criteria", []): if item: notes.append(item) if advisory.get("blocked_reason"): notes.append(f"Current blocker: {advisory['blocked_reason']}") return unique(notes) def _display_value(value: Any) -> str: if value is None or value == "": return "-" if isinstance(value, bool): return "是" if value else "否" if isinstance(value, (int, float)): return str(value) if isinstance(value, list): return "\n".join(_display_value(item) for item in value if item not in (None, "")) or "-" if isinstance(value, dict): return json.dumps(value, ensure_ascii=False, indent=2) return str(value) def _field(label: str, value: Any) -> Dict[str, str]: return {"label": label, "value": _display_value(value)} def _stat(label: str, value: Any) -> Dict[str, str]: return {"label": label, "value": _display_value(value)} def _link(label: str, href: str, description: str) -> Dict[str, str]: return {"label": label, "href": href, "description": description} def _status_label(value: str | None) -> str: return STATUS_LABELS.get(value or "", value or "-") def _family_name(advisory: Dict[str, Any], profile_map: Dict[str, Dict[str, Any]]) -> str: profile = profile_map.get(advisory.get("repro_profile_id") or "", {}) return profile.get("vuln_family") or advisory.get("repro_profile_id") or "unknown" def _latest_advisories(advisories: List[Dict[str, Any]]) -> List[Dict[str, Any]]: run_map = latest_runs_by_advisory() return [annotate_with_latest_run(item, run_map.get(item.get("canonical_id"))) for item in advisories] def _build_completeness( advisories: List[Dict[str, Any]], runs: List[Dict[str, Any]], profile_map: Dict[str, Dict[str, Any]], run_summary: Dict[str, Any], ) -> Dict[str, Any]: latest_statuses: Dict[str, int] = {} historical_statuses: Dict[str, int] = {} systems: Dict[str, Dict[str, Any]] = {} for item in advisories: status = item.get("verification_status", "triage-manual") latest_statuses[status] = latest_statuses.get(status, 0) + 1 system = systems.setdefault( item["system_id"], { "system_id": item["system_id"], "display_name": item.get("display_name", item["system_id"]), "total": 0, "verified_real": 0, "verified_synthetic": 0, "blocked": 0, "manual": 0, "families": {}, }, ) system["total"] += 1 family = _family_name(item, profile_map) family_entry = system["families"].setdefault( family, {"family": family, "total": 0, "verified_real": 0, "verified_synthetic": 0, "blocked": 0, "manual": 0}, ) family_entry["total"] += 1 if status == "verified-real": system["verified_real"] += 1 family_entry["verified_real"] += 1 elif status == "verified-synthetic": system["verified_synthetic"] += 1 family_entry["verified_synthetic"] += 1 elif status.startswith("blocked-"): system["blocked"] += 1 family_entry["blocked"] += 1 else: system["manual"] += 1 family_entry["manual"] += 1 for item in runs: status = item.get("verification_status", "triage-manual") historical_statuses[status] = historical_statuses.get(status, 0) + 1 systems_list = [] for entry in sorted(systems.values(), key=lambda value: value["system_id"]): entry["families"] = sorted(entry["families"].values(), key=lambda value: value["family"]) systems_list.append(entry) advisory_total = len(advisories) verified_real = latest_statuses.get("verified-real", 0) verified_synthetic = latest_statuses.get("verified-synthetic", 0) blocked = sum(count for key, count in latest_statuses.items() if key.startswith("blocked-")) manual = advisory_total - verified_real - verified_synthetic - blocked complete = advisory_total > 0 and advisory_total == verified_real return { "generated_at": isoformat(now_utc()), "advisory_total": advisory_total, "latest_statuses": latest_statuses, "historical_statuses": historical_statuses, "verified_real": verified_real, "verified_synthetic": verified_synthetic, "blocked": blocked, "manual": manual, "verified_ratio": round((verified_real / advisory_total) * 100, 1) if advisory_total else 0.0, "complete": complete, "systems": systems_list, "ingest_health": { "failure_count": len(run_summary.get("failures", []) or []), "failures": run_summary.get("failures", []) or [], }, "historical_blockers": [ "Docker daemon unavailable caused provision-compose-environment blocked-artifact.", "Family profiles previously used note-only attack runners and dry-run placeholders.", "Baseline and browser steps were skipped when environment readiness was not enforced.", "Latest completeness now uses one advisory -> latest run semantics instead of historical run piles.", ], } def _write_testing_completeness_report(completeness: Dict[str, Any]) -> None: lines = [ "# 全库 Advisory 完整度报告", "", f"- 生成时间: `{completeness['generated_at']}`", f"- 最新 advisory 完整度: `{completeness['verified_real']}/{completeness['advisory_total']}` `verified-real`", f"- 合成验证数量: `{completeness['verified_synthetic']}`", f"- 阻塞数量: `{completeness['blocked']}`", f"- 人工/待补证据数量: `{completeness['manual']}`", f"- 完整度百分比: `{completeness['verified_ratio']}%`", "", "## 系统覆盖矩阵", "", "| 系统 | 总数 | verified-real | verified-synthetic | blocked | manual | family 覆盖 |", "| --- | ---: | ---: | ---: | ---: | ---: | --- |", ] for system in completeness["systems"]: family_text = ", ".join( f"{item['family']}({item['verified_real']}/{item['total']})" for item in system["families"] ) lines.append( f"| {system['system_id']} | {system['total']} | {system['verified_real']} | {system['verified_synthetic']} | {system['blocked']} | {system['manual']} | {family_text} |" ) lines.extend( [ "", "## 历史阻塞项修复纪要", "", ] ) for item in completeness.get("historical_blockers", []): lines.append(f"- {item}") lines.extend( [ "", "## Ingest / Source 健康度", "", f"- source failures: `{completeness['ingest_health']['failure_count']}`", ] ) for item in completeness["ingest_health"].get("failures", []): lines.append(f"- {item}") lines.extend( [ "", "## 剩余风险说明", "", "- 本报告按 advisory 的最新 run 计算;历史失败 run 仅保留审计价值,不再污染完整度数字。", "- `browser_required=true` 的案例必须同时存在基线与攻击后浏览器证据,缺失则不会进入 `verified-real`。", "- source collector 健康度单独计数;只有当 failures 归零时,报告与 dashboard 才算真正全绿。", ] ) write_text(ROOT / "docs" / "testing-completeness-report.md", "\n".join(lines)) def _build_architecture_data(summary: Dict[str, Any], source_map: Dict[str, Any], repro_map: Dict[str, Any]) -> Dict[str, Any]: source_systems = source_map.get("systems", []) or [] repro_by_system = {item.get("system_id"): item for item in (repro_map.get("systems", []) or []) if item.get("system_id")} route_links = [ _link("总览首页", "/overview/index.html", "工作台总览、最新运行和全局摘要。"), _link("运行中心", "/runs/index.html", "运行队列、详情、证据和日志入口。"), _link("系统分组", "/systems/index.html", "按系统和分类浏览覆盖情况。"), _link("架构库", "/architecture/index.html", "查看控制面、数据层和授权边界。"), _link("文档中心", "/docs/index.html", "集中查看项目文档、本地镜像和说明。"), _link("数据中心", "/data/index.html", "查看 summary、runs、systems 等 JSON 入口。"), _link("旧版工作台", "/legacy/index.html", "保留的 legacy 回退入口。"), _link("项目功能文档", "/docs/project-features.html", "项目能力、目录结构与自动化链路总览。"), _link("前端设计文档", "/docs/frontend-dashboard-design.html", "当前本地工作台的交互与视觉规范。"), _link("完整度报告", "/docs/testing-completeness-report.html", "89 条 advisory 的最新完整度中文报告。"), _link("安全编码索引", "/docs/secure-code-index.html", "secure-code 修复库本地镜像。"), _link("仓库入口镜像", "/docs/root-readme.html", "仓库根 README 的本地镜像。"), _link("授权模型", "/docs/authorization-model.html", "允许目标范围、全局原则与记录要求。"), _link("source-map 真值", "/docs/source-map.html", "系统覆盖、来源和输出目录真值。"), _link("repro-map 真值", "/docs/repro-map.html", "复现族路由、浏览器要求和日志策略。"), _link("覆盖矩阵", "/docs/coverage-matrix.html", "自动生成覆盖摘要的本地镜像。"), _link("设计来源清单", "/docs/design-source.html", "Lovart 模板本地 vendor manifest。"), _link("架构库镜像", "/docs/architecture-library.html", "当前架构库的结构化镜像页。"), ] data_links = [ _link("summary.json", "/summary.json", "全局摘要、状态分布和最近失败。"), _link("completeness.json", "/data/completeness.json", "最新 advisory 完整度、系统/family 进度与 ingest 健康度。"), _link("runs.json", "/runs.json", "最近 run 的结构化详情。"), _link("systems.json", "/systems.json", "系统级覆盖与浏览器证据摘要。"), _link("advisories.json", "/advisories.json", "漏洞条目元数据与来源。"), _link("profiles.json", "/profiles.json", "复现档案元数据。"), _link("architecture.json", "/architecture.json", "当前架构库结构化 JSON。"), ] category_items: List[Dict[str, Any]] = [] family_counts: Dict[str, int] = {} tier_counts = {"history-full": 0, "rolling-24m": 0} for system in source_systems: tier = system.get("tier", "rolling-24m") tier_counts[tier] = tier_counts.get(tier, 0) + 1 repro = repro_by_system.get(system.get("system_id"), {}) family = repro.get("default_repro_family") or "未定义" family_counts[family] = family_counts.get(family, 0) + 1 for category_id in sorted(CATEGORY_LABELS, key=lambda item: CATEGORY_LABELS[item]): systems_in_category = [item for item in source_systems if item.get("category") == category_id] if not systems_in_category: continue history_full = sum(1 for item in systems_in_category if item.get("tier") == "history-full") rolling = sum(1 for item in systems_in_category if item.get("tier") == "rolling-24m") system_nodes: List[Dict[str, Any]] = [] for system in sorted(systems_in_category, key=lambda item: item.get("display_name", item.get("system_id", ""))): repro = repro_by_system.get(system.get("system_id"), {}) official_sources = system.get("official_sources", []) or [] ecosystem_sources = system.get("ecosystem_sources", []) or [] research_sources = system.get("research_sources", []) or [] system_nodes.append( { "title": f"{system.get('display_name', system.get('system_id'))} ({system.get('system_id')})", "summary": f"{TIER_LABELS.get(system.get('tier'), system.get('tier'))} · {', '.join(system.get('advisory_modes', [])) or '未定义模式'}", "open": False, "badges": [ TIER_LABELS.get(system.get("tier"), system.get("tier", "-")), f"官方源 {len(official_sources)}", f"生态源 {len(ecosystem_sources)}", f"研究源 {len(research_sources)}", ], "fields": [ _field("输出目录", system.get("output_dir")), _field("Advisory 模式", system.get("advisory_modes", [])), _field("Secure-Code 主题", system.get("secure_code_topics", [])), _field("CPE 关键字", system.get("cpe_keys", [])), _field("GHSA 关键字", system.get("ghsa_keywords", [])), ], "items": [ { "title": "来源配置", "summary": "官方、生态权威与研究补充来源。", "open": False, "fields": [ _field("官方来源", [entry.get("name") for entry in official_sources]), _field("生态来源", [entry.get("name") for entry in ecosystem_sources]), _field("研究来源", [entry.get("name") for entry in research_sources]), ], }, { "title": "复现默认值", "summary": "repro-map 中的默认攻击族、浏览器要求和日志策略。", "open": False, "fields": [ _field("默认漏洞家族", repro.get("default_repro_family")), _field("浏览器默认要求", repro.get("browser_required_default")), _field("优先制品模式", repro.get("provisioning_mode_preference", [])), _field("种子策略", repro.get("seed_strategy")), _field("日志采集器", repro.get("log_collectors", [])), _field("报告模板", repro.get("report_template")), ], }, ], } ) category_items.append( { "title": CATEGORY_LABELS.get(category_id, category_id), "summary": f"{len(systems_in_category)} 个系统 · 历史全量 {history_full} · 近两年全量 {rolling}", "open": False, "stats": [ _stat("系统数", len(systems_in_category)), _stat("历史全量", history_full), _stat("近两年全量", rolling), ], "items": system_nodes, } ) repro_family_nodes = [ { "title": family, "summary": f"默认路由到该 family 的系统数:{count}", "open": False, "fields": [_field("系统数量", count)], } for family, count in sorted(family_counts.items(), key=lambda item: (-item[1], item[0])) ] recent_failure_nodes = [ { "title": item.get("title") or item.get("advisory_id") or item.get("run_id"), "summary": item.get("blocked_reason") or "无额外阻塞说明。", "open": False, "badges": [_status_label(item.get("status"))], "fields": [ _field("运行 ID", item.get("run_id")), _field("漏洞条目", item.get("advisory_id")), _field("状态", _status_label(item.get("status"))), _field("阻塞原因", item.get("blocked_reason")), ], } for item in summary.get("recent_failures", []) ] status_nodes = [ { "title": _status_label(status), "summary": f"当前累计 {count} 条。", "open": False, "fields": [ _field("状态编码", status), _field("数量", count), ], } for status, count in sorted(summary.get("statuses", {}).items(), key=lambda item: (-item[1], item[0])) ] return { "generated_at": summary.get("generated_at"), "title": "当前架构库", "summary": "工作台、控制面、数据层、授权边界与系统覆盖的当前真值视图。", "sections": [ { "title": "仓库定位与当前状态", "summary": "授权攻防实验与研究知识库;仅适用于自有资产、本地靶场和明确授权目标。", "open": True, "badges": ["LAB ONLY", "AUTHORIZED TARGETS ONLY", "非生产安全基线"], "stats": [ _stat("纳管系统", len(source_systems)), _stat("历史全量系统", tier_counts.get("history-full", 0)), _stat("近两年全量系统", tier_counts.get("rolling-24m", 0)), _stat("当前运行", summary.get("run_count", 0)), _stat("当前漏洞条目", summary.get("advisory_count", 0)), ], "fields": [ _field("仓库根目录", str(ROOT)), _field("默认本地地址", "http://127.0.0.1:8734/"), _field("自动刷新周期", "5 秒"), _field("生成时间", summary.get("generated_at")), ], "links": route_links[:4], }, { "title": "授权边界与目标模型", "summary": "所有实验都绑定到本地、自建公网或明确授权目标,不面向无关第三方资产。", "open": True, "stats": [ _stat("允许目标类型", 3), _stat("禁止类型", 1), ], "fields": [ _field("允许目标", ["lab-local", "lab-public", "authorized-third-party"]), _field("禁止目标", ["out-of-scope", "无归属证明目标", "公共知名站点", "泛互联网枚举"]), _field("全局原则", [ "任何公网验证前先确认资产归属或授权关系。", "优先只读探测、最小化回显验证和低频实验。", "涉及账户、令牌、敏感数据和业务写入时采用最小必要动作。", "不做泛互联网枚举,不对无关公共站点复用同类测试。", ]), ], "links": [ _link("授权模型镜像", "./docs/authorization-model.html", "目标分类、原则与记录要求。"), _link("仓库入口镜像", "./docs/root-readme.html", "仓库定位、能力矩阵与自动化入口。"), ], }, { "title": "控制面与自动化入口", "summary": "Intel 控制面负责情报入库;Lab 控制面负责本地部署、攻击验证、证据收集和看板生成。", "open": True, "items": [ { "title": "情报控制面(Intel)", "summary": "负责 source adapter、规范化、渲染、校验和 PR 流程。", "open": False, "fields": [ _field("CLI 入口", "python3 /Users/x/websafe/scripts/intel/main.py"), _field("主要命令", [ "render", "validate", "hotlane", "ingest --since last-success", "reconcile", "backfill --tier history-full --dry-run", "open-pr --dry-run", ]), _field("定时入口", [ "scripts/intel/run-hourly.sh", "scripts/intel/run-nightly.sh", "scripts/intel/run-weekly-reconcile.sh", ]), ], }, { "title": "实证控制面(Lab)", "summary": "负责 catalog、compose、seed、baseline、attack、browser、evidence、render 和 queue。", "open": False, "fields": [ _field("CLI 入口", "python3 /Users/x/websafe/scripts/lab/main.py"), _field("主要命令", [ "catalog sync", "validate", "run-case", "run-system", "run-batch", "render-run", "serve-dashboard --port 8734", "cleanup", "retry-failures", ]), _field("关键模块", [ "catalog/", "provision/", "compose/", "seed/", "baseline/", "attack/", "browser/", "evidence/", "render/", "queue/", ]), ], }, ], }, { "title": "数据层与本地地址", "summary": "Registry、生成层、run bundle 与 docs 镜像共同构成工作台的本地数据面。", "open": True, "items": [ { "title": "真值层", "summary": "统一的 registry 与 repro/source 配置。", "open": False, "fields": [ _field("漏洞条目 Registry", "08-threat-intel/registry/advisories/*.json"), _field("系统 Registry", "08-threat-intel/registry/systems/*.json"), _field("运行 Registry", "08-threat-intel/registry/runs/*.json"), _field("source-map 真值", "08-threat-intel/source-map.yaml"), _field("repro-map 真值", "08-threat-intel/repro-map.yaml"), ], }, { "title": "生成层与展示层", "summary": "dashboard JSON、run report、docs 镜像与本地静态 UI。", "open": False, "links": route_links + data_links, "fields": [ _field("工作台根目录", "08-threat-intel/generated/dashboard/"), _field("运行归档根目录", "06-case-studies/generated-runs//"), _field("默认入口", "/index.html"), _field("总览入口", "/overview/index.html"), _field("运行入口", "/runs/index.html"), _field("系统入口", "/systems/index.html"), _field("架构入口", "/architecture/index.html"), _field("文档入口", "/docs/index.html"), _field("数据入口", "/data/index.html"), _field("旧版入口", "/legacy/index.html"), ], }, ], }, { "title": "系统覆盖分组", "summary": "基于 source-map 和 repro-map 生成的当前分组视图,可展开查看每个系统的来源、输出目录和复现默认值。", "open": True, "items": category_items, }, { "title": "Repro 路由概览", "summary": "按默认漏洞家族聚合当前系统路由,帮助查看 family runner 覆盖面。", "open": True, "items": repro_family_nodes, }, { "title": "当前生成态与阻塞概览", "summary": "当前 render 后的状态分布、失败摘要与最近可见阻塞。", "open": True, "stats": [ _stat("Run 数", summary.get("run_count", 0)), _stat("Advisory 数", summary.get("advisory_count", 0)), _stat("状态类型", len(summary.get("statuses", {}))), _stat("最近失败", len(summary.get("recent_failures", []))), ], "items": [ { "title": "状态分布", "summary": "verification_status 当前计数。", "open": False, "items": status_nodes, }, { "title": "最近失败", "summary": "当前 dashboard 摘要里可见的失败或人工分诊样本。", "open": False, "items": recent_failure_nodes or [ { "title": "暂无失败样本", "summary": "当前 summary.json 中没有 recent_failures。", "open": False, } ], }, ], }, ], } def _dashboard_doc_page(title: str, body: str, description: str) -> str: return f""" {html.escape(title)}

{html.escape(title)}

{html.escape(description)}
{html.escape(body)}
""" def _remove_path(path: Path) -> None: if not path.exists() and not path.is_symlink(): return if path.is_symlink() or path.is_file(): path.unlink() return shutil.rmtree(path) def _sync_symlink(target: Path, link_path: Path) -> None: ensure_dir(link_path.parent) relative_target = os.path.relpath(target, link_path.parent) if link_path.is_symlink() and os.readlink(link_path) == relative_target: return _remove_path(link_path) os.symlink(relative_target, link_path, target_is_directory=target.is_dir()) def _copy_tree(source: Path, destination: Path) -> None: ensure_dir(destination) for path in source.rglob("*"): relative = path.relative_to(source) target = destination / relative if path.is_dir(): ensure_dir(target) continue ensure_dir(target.parent) shutil.copy2(path, target) def _write_dashboard_docs(architecture: Dict[str, Any]) -> None: docs_dir = DASHBOARD_DIR / "docs" ensure_dir(docs_dir) sources = [ ( "project-features.html", "项目功能与特性总览", (ROOT / "docs" / "project-features.md").read_text(encoding="utf-8"), "工作台内置镜像页:仓库功能、目录和自动化链路说明。", ), ( "frontend-dashboard-design.html", "本地前端工作台设计文档", (ROOT / "docs" / "frontend-dashboard-design.md").read_text(encoding="utf-8"), "工作台内置镜像页:前端交互、展示结构和视觉规范。", ), ( "secure-code-index.html", "安全编码修复库索引", (ROOT / "05-defense" / "secure-code" / "INDEX.md").read_text(encoding="utf-8"), "工作台内置镜像页:secure-code 修复主题索引。", ), ( "root-readme.html", "仓库入口镜像", (ROOT / "README.md").read_text(encoding="utf-8"), "工作台内置镜像页:仓库定位、能力矩阵、入口和自动化入口。", ), ( "authorization-model.html", "授权模型镜像", (ROOT / "09-scope-and-targeting" / "authorization-model.md").read_text(encoding="utf-8"), "工作台内置镜像页:目标范围、授权模型、最小化验证建议和记录要求。", ), ( "source-map.html", "source-map 真值镜像", SOURCE_MAP_PATH.read_text(encoding="utf-8"), "工作台内置镜像页:系统覆盖、来源、输出目录和 secure-code 主题真值。", ), ( "repro-map.html", "repro-map 真值镜像", REPRO_MAP_PATH.read_text(encoding="utf-8"), "工作台内置镜像页:默认漏洞家族、浏览器要求和日志策略真值。", ), ( "coverage-matrix.html", "覆盖矩阵镜像", (ROOT / "08-threat-intel" / "generated" / "coverage-matrix.md").read_text(encoding="utf-8"), "工作台内置镜像页:当前覆盖矩阵生成结果。", ), ( "testing-completeness-report.html", "中文完整度报告", (ROOT / "docs" / "testing-completeness-report.md").read_text(encoding="utf-8"), "工作台内置镜像页:89 条 advisory 最新完整度、family 矩阵与 ingest 健康度。", ), ] manifest_body = LOVART_VENDOR_MANIFEST.read_text(encoding="utf-8") if LOVART_VENDOR_MANIFEST.exists() else "{}" sources.append( ( "design-source.html", "Lovart 设计来源与本地化清单", manifest_body, "工作台内置镜像页:Lovart 来源文件、本地 vendor 路径和本地化说明。", ) ) sources.append( ( "architecture-library.html", "当前架构库镜像", json.dumps(architecture, indent=2, ensure_ascii=False), "工作台内置镜像页:当前架构库结构化数据镜像。", ) ) for filename, title, body, description in sources: write_text(docs_dir / filename, _dashboard_doc_page(title, body, description)) def _write_design_source_manifest() -> None: assets_dir = DASHBOARD_DIR / "assets" ensure_dir(assets_dir) manifest = json.loads(LOVART_VENDOR_MANIFEST.read_text(encoding="utf-8")) if LOVART_VENDOR_MANIFEST.exists() else {} write_json(assets_dir / "design-source.json", manifest) def _render_root_dashboard_shell() -> None: assets_dir = DASHBOARD_DIR / "assets" ensure_dir(assets_dir) for filename in ("index.html",): shutil.copy2(LOVART_TEMPLATE_DIR / filename, DASHBOARD_DIR / filename) _copy_tree(LOVART_TEMPLATE_DIR / "assets", assets_dir) def _render_section_dashboard_shells() -> None: source_index = LOVART_TEMPLATE_DIR / "index.html" for section in SECTION_ROUTE_DIRS: section_dir = DASHBOARD_DIR / section if section == "runs": # Preserve existing /runs// bundles; only refresh the section shell. ensure_dir(section_dir) index_path = section_dir / "index.html" if index_path.exists(): index_path.unlink() shutil.copy2(source_index, index_path) continue _remove_path(section_dir) ensure_dir(section_dir) shutil.copy2(source_index, section_dir / "index.html") docs_dir = DASHBOARD_DIR / "docs" ensure_dir(docs_dir) shutil.copy2(source_index, docs_dir / "index.html") def _render_legacy_dashboard_shell() -> None: legacy_dir = DASHBOARD_DIR / "legacy" _remove_path(legacy_dir) ensure_dir(legacy_dir) shutil.copy2(LEGACY_TEMPLATE_DIR / "index.html", legacy_dir / "index.html") _copy_tree(LEGACY_TEMPLATE_DIR / "assets", legacy_dir / "assets") for json_name in ROOT_JSON_FILES: _sync_symlink(DASHBOARD_DIR / json_name, legacy_dir / json_name) _sync_symlink(DASHBOARD_DIR / "runs", legacy_dir / "runs") _sync_symlink(DASHBOARD_DIR / "docs", legacy_dir / "docs") def _sync_run_bundles(runs: List[Dict[str, Any]]) -> None: runs_dir = DASHBOARD_DIR / "runs" ensure_dir(runs_dir) for item in runs: bundle_dir = Path(item.get("report_refs", {}).get("bundle_dir", "")) if not bundle_dir.exists(): continue _sync_symlink(bundle_dir, runs_dir / item["run_id"]) def render_run(run: Dict[str, Any]) -> Dict[str, str]: run_dir = CASE_RUNS_DIR / run["run_id"] ensure_dir(run_dir / "assets") timeline_path = run_dir / "timeline.mmd" write_text(timeline_path, mermaid_from_steps(run)) screenshot_refs = [ref for ref in run.get("browser_refs", []) if ref.endswith((".png", ".jpg", ".jpeg"))] relative_screenshots = [_relative_ref(run_dir, ref) for ref in screenshot_refs] md_lines = [ f"# 运行 {run['run_id']}", "", "> `LAB ONLY` | `AUTHORIZED TARGETS ONLY` | 自动生成 run bundle", "", f"- 漏洞条目: `{run['advisory_id']}`", f"- 系统: `{run['system_id']}`", f"- Repro Profile: `{run['repro_profile_id']}`", f"- 实证状态: `{run['verification_status']}`", f"- 实证方式: `{run['verification_mode']}`", f"- Artifact 模式: `{run['artifact_mode']}`", f"- 启动时间: `{run['started_at']}`", f"- 完成时间: `{run['finished_at']}`", f"- 阻塞原因: `{run.get('blocked_reason') or '-'}`", f"- Compose 服务: `{', '.join(run.get('compose_services', [])) or '-'}`", "", "## 运行时间线", "", f"- Mermaid: [{timeline_path.name}]({timeline_path})", "", "| 时间 | 步骤 | 状态 | 说明 |", "|------|------|------|------|", ] if run.get("timeline"): for item in run["timeline"]: md_lines.append( f"| `{item.get('at', '')}` | `{item.get('step', '')}` | `{item.get('status', '')}` | {item.get('detail', '') or '-'} |" ) else: md_lines.append("| `-` | `-` | `-` | 无时间线 |") md_lines.extend( [ "", "## Compose 拓扑", "", f"- Compose 文件: `{', '.join(run.get('compose_refs', [])) or '-'}`", f"- 服务列表: `{', '.join(run.get('compose_services', [])) or '-'}`", "", "## 攻击步骤", "", "| 工具/步骤 | 状态 | 结果 |", "|-----------|------|------|", ] ) if run.get("attack_steps"): for step in run["attack_steps"]: outcome = step.get("result_path") or step.get("detail") or "-" md_lines.append(f"| `{step.get('tool') or step.get('kind')}` | `{step.get('status', '-')}` | `{outcome}` |") else: md_lines.append("| `-` | `skipped` | `no attack steps` |") md_lines.extend( [ "", "## 证据摘要", "", f"- Baseline: `{len(run.get('baseline_refs', []))}`", f"- 攻击步骤: `{len(run.get('attack_steps', []))}`", f"- 浏览器证据: `{len(run.get('browser_refs', []))}`", f"- 容器日志: `{len(run.get('container_log_refs', []))}`", f"- 请求日志: `{len(run.get('request_log_refs', []))}`", "", ] ) if relative_screenshots: md_lines.extend(["## 浏览器截图", ""]) for ref in relative_screenshots: md_lines.append(f"![{Path(ref).stem}]({ref})") md_lines.append("") if run.get("browser_refs"): md_lines.extend(["## 浏览器证据", ""]) for ref in run["browser_refs"]: md_lines.append(f"- `{_relative_ref(run_dir, ref)}`") md_lines.append("") if run.get("container_log_refs"): md_lines.extend(["## 容器日志", ""]) for ref in run["container_log_refs"]: md_lines.append(f"- `{_relative_ref(run_dir, ref)}`") md_lines.append("") if run.get("request_log_refs"): md_lines.extend(["## 请求与基线日志", ""]) for ref in run["request_log_refs"]: md_lines.append(f"- `{_relative_ref(run_dir, ref)}`") md_lines.append("") md_lines.extend( [ "## 最小化验证说明", "", "- 仅限自有资产、本地靶场或已授权实验目标。", "- 默认执行 minimal-proof;不会把破坏性或不可回滚动作作为默认路径。", "- 若浏览器证据缺失,前端类案例不会被标为 `verified-*`。", "", ] ) report_md = run_dir / "report.md" write_text(report_md, "\n".join(md_lines)) html_body = [ "", "websafe 运行报告", "", "", f"

运行 {html.escape(run['run_id'])}

", "
", f"
漏洞条目
{html.escape(run['advisory_id'])}
", f"
实证状态
{html.escape(run['verification_status'])}
", f"
复现 Profile
{html.escape(run['repro_profile_id'])}
", f"
Artifact 模式
{html.escape(run['artifact_mode'])}
", "
", "

Mermaid 时间线

", f"
{html.escape(mermaid_from_steps(run))}
", "

运行时间线

", "", ] if run.get("timeline"): for item in run["timeline"]: html_body.append( "" f"" f"" f"" f"" "" ) html_body.extend(["
时间步骤状态说明
{html.escape(item.get('at', ''))}{html.escape(item.get('step', ''))}{html.escape(item.get('status', ''))}{html.escape(item.get('detail', '') or '-')}
", "

攻击步骤

", ""]) if run.get("attack_steps"): for step in run["attack_steps"]: html_body.append( "" f"" f"" f"" "" ) else: html_body.append("") html_body.extend(["
工具状态输出
{html.escape(step.get('tool') or step.get('kind') or '-')}{html.escape(step.get('status', '-'))}{html.escape(step.get('result_path') or '-')}
-skipped当前没有攻击步骤
"]) if relative_screenshots: html_body.extend(["

浏览器截图

", "") html_body.extend(["

证据清单

", ""]) report_html = run_dir / "report.html" write_text(report_html, "\n".join(html_body)) return {"bundle_dir": str(run_dir), "report_md": str(report_md), "report_html": str(report_html), "timeline": str(timeline_path)} def render_dashboard() -> Dict[str, str]: ensure_dir(DASHBOARD_DIR) advisory_records = load_json_dir(ADVISORIES_DIR) runs = load_json_dir(RUNS_DIR) run_summary = read_json(ROOT / "08-threat-intel" / "generated" / "run-summary.json", default={}) or {} source_map = read_yaml(SOURCE_MAP_PATH, default={}) or {} repro_map = read_yaml(REPRO_MAP_PATH, default={}) or {} source_system_map = {item["system_id"]: item for item in source_map.get("systems", []) if item.get("system_id")} merged_advisories = _latest_advisories(advisory_records) advisory_map = {item["canonical_id"]: item for item in merged_advisories if item.get("canonical_id")} profile_map = load_profiles() _sync_run_bundles(runs) systems: Dict[str, Dict[str, Any]] = {} for advisory in merged_advisories: system = systems.setdefault( advisory["system_id"], { "system_id": advisory["system_id"], "display_name": advisory.get("display_name", advisory["system_id"]), "total": 0, "verified_real": 0, "verified_synthetic": 0, "blocked": 0, "manual": 0, "browser_required": 0, "browser_present": 0, "latest_update": "", "category": source_system_map.get(advisory["system_id"], {}).get("category", advisory.get("category")), "tier": source_system_map.get(advisory["system_id"], {}).get("tier"), "output_dir": source_system_map.get(advisory["system_id"], {}).get("output_dir"), "families": {}, }, ) system["total"] += 1 status = advisory.get("verification_status", "triage-manual") if status == "verified-real": system["verified_real"] += 1 elif status == "verified-synthetic": system["verified_synthetic"] += 1 elif status.startswith("blocked-"): system["blocked"] += 1 else: system["manual"] += 1 browser = advisory.get("browser_evidence") or {} if browser.get("required"): system["browser_required"] += 1 if browser.get("present"): system["browser_present"] += 1 latest = advisory.get("updated_at") or advisory.get("published_at") or "" if latest > system["latest_update"]: system["latest_update"] = latest family = _family_name(advisory, profile_map) family_entry = system["families"].setdefault(family, {"family": family, "total": 0, "verified_real": 0, "manual": 0}) family_entry["total"] += 1 if status == "verified-real": family_entry["verified_real"] += 1 elif status not in {"verified-synthetic"}: family_entry["manual"] += 1 recent_runs = sorted(runs, key=lambda item: item.get("finished_at") or "", reverse=True)[:100] decorated_runs: List[Dict[str, Any]] = [] for item in recent_runs: cloned = dict(item) advisory = advisory_map.get(item["advisory_id"], {}) profile = profile_map.get(item["repro_profile_id"], {}) browser_evidence = item.get("browser_evidence") or advisory.get("browser_evidence") or { "required": bool(profile.get("browser_assertions", {}).get("required")), "present": False, "refs": [], } request_only_refs = [ref for ref in item.get("request_log_refs", []) if ref not in item.get("baseline_refs", [])] cloned["dashboard_refs"] = { "report_html": f"/runs/{item['run_id']}/report.html", "report_md": f"/runs/{item['run_id']}/report.md", "timeline": f"/runs/{item['run_id']}/timeline.mmd", "bundle": f"/runs/{item['run_id']}/run.json", } cloned["browser_evidence"] = browser_evidence cloned["browser_links"] = [_dashboard_ref(item, ref) for ref in item.get("browser_refs", [])] cloned["container_links"] = [_dashboard_ref(item, ref) for ref in item.get("container_log_refs", [])] cloned["request_links"] = [_dashboard_ref(item, ref) for ref in item.get("request_log_refs", [])] cloned["advisory_meta"] = _advisory_meta(advisory) cloned["profile_meta"] = _profile_meta(profile) cloned["reasoning_lines"] = _reasoning_lines(advisory, profile) cloned["progress"] = _progress_counts(item) cloned["artifact_groups"] = [ _artifact_group( item, "reports", "报告与运行产物", [ cloned["dashboard_refs"]["report_html"], cloned["dashboard_refs"]["report_md"], cloned["dashboard_refs"]["timeline"], cloned["dashboard_refs"]["bundle"], ], use_dashboard_refs=True, ), _artifact_group(item, "compose", "Compose 编排", item.get("compose_refs", [])), _artifact_group(item, "baseline", "基线快照", item.get("baseline_refs", [])), _artifact_group(item, "attack", "攻击输出", _attack_result_refs(item)), _artifact_group(item, "browser", "浏览器证据", item.get("browser_refs", [])), _artifact_group(item, "container", "容器日志", item.get("container_log_refs", [])), _artifact_group(item, "requests", "请求与探测日志", request_only_refs), ] cloned["artifact_groups"] = [group for group in cloned["artifact_groups"] if group["count"]] decorated_runs.append(cloned) summary = { "generated_at": isoformat(now_utc()), "advisory_count": len(merged_advisories), "run_count": len(runs), "statuses": {}, "run_statuses": {}, "recent_failures": [], } for item in merged_advisories: status = item.get("verification_status", "triage-manual") summary["statuses"][status] = summary["statuses"].get(status, 0) + 1 for item in runs: status = item.get("verification_status", "triage-manual") summary["run_statuses"][status] = summary["run_statuses"].get(status, 0) + 1 summary["systems"] = sorted( [ { **entry, "families": sorted(entry["families"].values(), key=lambda value: value["family"]), } for entry in systems.values() ], key=lambda entry: (-entry["total"], entry["system_id"]), ) summary["recent_failures"] = [ { "run_id": item.get("last_run_id"), "advisory_id": item["canonical_id"], "status": item.get("verification_status"), "title": item.get("title"), "blocked_reason": item.get("blocked_reason"), } for item in sorted(merged_advisories, key=lambda value: value.get("updated_at") or value.get("published_at") or "", reverse=True) if item.get("verification_status") in {"triage-manual", "blocked-artifact", "blocked-destructive"} ][:20] completeness = _build_completeness(merged_advisories, runs, profile_map, run_summary) summary["completeness"] = { "advisory_total": completeness["advisory_total"], "verified_real": completeness["verified_real"], "verified_synthetic": completeness["verified_synthetic"], "blocked": completeness["blocked"], "manual": completeness["manual"], "verified_ratio": completeness["verified_ratio"], "complete": completeness["complete"], } write_json(DASHBOARD_DIR / "summary.json", summary) write_json(DASHBOARD_DIR / "runs.json", decorated_runs) write_json(DASHBOARD_DIR / "systems.json", summary["systems"]) write_json(DASHBOARD_DIR / "advisories.json", {key: _advisory_meta(value) for key, value in advisory_map.items()}) write_json(DASHBOARD_DIR / "profiles.json", {key: _profile_meta(value) for key, value in profile_map.items()}) write_json(DASHBOARD_DIR / "data" / "completeness.json", completeness) _write_testing_completeness_report(completeness) architecture = _build_architecture_data(summary, source_map, repro_map) write_json(DASHBOARD_DIR / "architecture.json", architecture) _write_dashboard_docs(architecture) _write_design_source_manifest() _render_root_dashboard_shell() _render_section_dashboard_shells() _render_legacy_dashboard_shell() return { "dashboard_dir": str(DASHBOARD_DIR), "index": str(DASHBOARD_DIR / "index.html"), "overview_index": str(DASHBOARD_DIR / "overview" / "index.html"), "runs_index": str(DASHBOARD_DIR / "runs" / "index.html"), "legacy_index": str(DASHBOARD_DIR / "legacy" / "index.html"), "summary_json": str(DASHBOARD_DIR / "summary.json"), }