文件
websafe-kb/scripts/lab/render.py

1125 行
50 KiB
Python
原始文件 Blame 文件历史

此文件含有模棱两可的 Unicode 字符
此文件含有可能会与其他字符混淆的 Unicode 字符。 如果您是想特意这样的,可以安全地忽略该警告。 使用 Escape 按钮显示他们。
from __future__ import annotations
import html
import json
import os
import shutil
from pathlib import Path
from typing import Any, Dict, List
from lab.config import ADVISORIES_DIR, CASE_RUNS_DIR, DASHBOARD_DIR, REPRO_MAP_PATH, ROOT, RUNS_DIR, SOURCE_MAP_PATH
from lab.repro import load_profiles
from lab.utils import ensure_dir, isoformat, load_json_dir, now_utc, read_yaml, unique, write_json, write_text
# Locations of the dashboard templates shipped under the repository root.
TEMPLATES_DIR = ROOT / "scripts" / "lab" / "dashboard_templates"
LOVART_TEMPLATE_DIR = TEMPLATES_DIR / "lovart"
LEGACY_TEMPLATE_DIR = TEMPLATES_DIR / "legacy"
LOVART_VENDOR_MANIFEST = LOVART_TEMPLATE_DIR / "vendor" / "source-manifest.json"

# JSON payloads that live directly under the dashboard root.
ROOT_JSON_FILES = [
    "summary.json",
    "runs.json",
    "systems.json",
    "advisories.json",
    "profiles.json",
    "architecture.json",
]

# Section directories that each receive their own copy of the dashboard shell.
SECTION_ROUTE_DIRS = [
    "overview",
    "runs",
    "systems",
    "architecture",
    "data",
]
# Display label for each system category id.
CATEGORY_LABELS = {
    "cms": "CMS / 内容平台",
    "ecommerce": "电商系统",
    "frameworks": "Web 框架与运行时",
    "servers": "服务器与边界层",
    "platforms": "开源平台与后台系统",
}

# Display label for each coverage tier id.
TIER_LABELS = {
    "history-full": "历史全量",
    "rolling-24m": "近两年全量",
}

# Display label for each verification status code.
STATUS_LABELS = {
    "verified-real": "真实版本已实证",
    "verified-synthetic": "合成靶场已实证",
    "blocked-artifact": "制品阻塞",
    "blocked-destructive": "破坏性风险阻塞",
    "triage-manual": "人工分诊",
    "suspected": "仅疑似命中",
}
def mermaid_from_steps(run: Dict[str, Any]) -> str:
    """Return the Mermaid flowchart source describing a run's fixed pipeline.

    The eight pipeline nodes are always emitted. When ``run`` carries a truthy
    ``blocked_reason``, a ninth node is appended whose label holds the first
    60 characters of that reason.

    Args:
        run: Run record; only the optional ``blocked_reason`` key is read.

    Returns:
        Mermaid source with one statement per line.
    """
    lines = [
        "flowchart LR",
        'A["选择 Advisory"] --> B["解析 Repro Profile"]',
        'B --> C["生成 Compose 环境"]',
        'C --> D["采集基线快照"]',
        'D --> E["执行受控攻击步骤"]',
        'E --> F["浏览器回放验证"]',
        'F --> G["收集日志与证据"]',
        'G --> H["回写 Registry 与报告"]',
    ]
    blocked_reason = run.get("blocked_reason")
    if blocked_reason:
        # The label sits inside a double-quoted Mermaid string on a single line,
        # so newlines are folded into spaces and `"` is replaced by Mermaid's
        # entity code. Truncate first so an entity is never cut in half.
        flattened = " ".join(str(blocked_reason).split())
        label = flattened[:60].replace('"', "#quot;")
        lines.append(f'H --> I["阻塞: {label}"]')
    return "\n".join(lines)
def _relative_ref(run_dir: Path, ref: str) -> str:
try:
return str(Path(ref).resolve().relative_to(run_dir.resolve()))
except ValueError:
return ref
def _dashboard_ref(run: Dict[str, Any], ref: str) -> str:
try:
bundle_dir = Path(run["report_refs"]["bundle_dir"]).resolve()
relative = Path(ref).resolve().relative_to(bundle_dir)
return f"/runs/{run['run_id']}/{relative.as_posix()}"
except Exception:
return ref
def _artifact_kind(href: str) -> str:
suffix = Path(href).suffix.lower()
if suffix in {".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg"}:
return "image"
if suffix in {".json", ".log", ".txt", ".yaml", ".yml", ".md", ".mmd", ".html"}:
return "text"
return "link"
def _artifact_item(run: Dict[str, Any], href: str, label: str | None = None) -> Dict[str, Any]:
    """Build one artifact entry with ``href``, ``label`` and ``kind`` keys.

    ``label`` defaults to the file name of ``href``. ``run`` is accepted for
    signature parity with the other artifact helpers and is not read here.
    """
    display = label if label else Path(href).name
    return dict(href=href, label=display, kind=_artifact_kind(href))
def _artifact_group(run: Dict[str, Any], key: str, label: str, refs: List[str], use_dashboard_refs: bool = False) -> Dict[str, Any]:
    """Bundle ``refs`` into a labelled artifact group.

    When ``use_dashboard_refs`` is true, ``refs`` are used as hrefs as-is;
    otherwise each one is first passed through ``_dashboard_ref``.

    Returns:
        A dict with ``key``, ``label``, ``count`` and ``items`` entries.
    """
    items: List[Dict[str, Any]] = [
        _artifact_item(run, ref if use_dashboard_refs else _dashboard_ref(run, ref))
        for ref in refs
    ]
    return {"key": key, "label": label, "count": len(items), "items": items}
def _attack_result_refs(run: Dict[str, Any]) -> List[str]:
    """Collect the truthy ``result_path`` of every attack step, stringified and passed through ``unique``."""
    outputs = (step.get("result_path") for step in run.get("attack_steps", []))
    return unique([str(path) for path in outputs if path])
def _progress_counts(run: Dict[str, Any]) -> Dict[str, int]:
counts = {"completed": 0, "skipped": 0, "failed": 0, "blocked": 0, "planned": 0, "other": 0}
for item in run.get("timeline", []):
status = item.get("status", "other")
if status.startswith("blocked"):
counts["blocked"] += 1
elif status in counts:
counts[status] += 1
else:
counts["other"] += 1
return counts
def _advisory_meta(advisory: Dict[str, Any]) -> Dict[str, Any]:
if not advisory:
return {}
return {
"canonical_id": advisory.get("canonical_id"),
"title": advisory.get("title"),
"summary": advisory.get("summary"),
"display_name": advisory.get("display_name"),
"system_id": advisory.get("system_id"),
"category": advisory.get("category"),
"severity": advisory.get("severity"),
"cvss_score": advisory.get("cvss_score"),
"exploit_status": advisory.get("exploit_status"),
"published_at": advisory.get("published_at"),
"updated_at": advisory.get("updated_at"),
"official_source_url": advisory.get("official_source_url"),
"secondary_source_urls": advisory.get("secondary_source_urls", []),
"aliases": advisory.get("aliases", []),
"secure_code_topics": advisory.get("secure_code_topics", []),
"verification_status": advisory.get("verification_status"),
"verification_mode": advisory.get("verification_mode"),
"artifact_mode": advisory.get("artifact_mode"),
"blocked_reason": advisory.get("blocked_reason"),
"browser_evidence": advisory.get("browser_evidence", {}),
}
def _profile_meta(profile: Dict[str, Any]) -> Dict[str, Any]:
if not profile:
return {}
return {
"profile_id": profile.get("profile_id"),
"vuln_family": profile.get("vuln_family"),
"provisioning_mode": profile.get("provisioning_mode"),
"destructive_risk": profile.get("destructive_risk"),
"cleanup_policy": profile.get("cleanup_policy"),
"artifact_source": profile.get("artifact_source", {}),
"success_criteria": profile.get("success_criteria", []),
"seed_actions": profile.get("seed_actions", []),
"attack_actions": profile.get("attack_actions", []),
"browser_assertions": profile.get("browser_assertions", {}),
"allowed_target_types": profile.get("allowed_target_types", []),
"required_services": profile.get("required_services", []),
}
def _reasoning_lines(advisory: Dict[str, Any], profile: Dict[str, Any]) -> List[str]:
    """Gather the explanatory notes shown for a run.

    In order: the advisory summary, the ``message`` of each seed and attack
    action, each success criterion, and finally the advisory's blocked reason.
    Falsy entries are skipped and the result is passed through ``unique``.
    """
    collected: List[str] = []
    summary_text = advisory.get("summary")
    if summary_text:
        collected.append(summary_text)
    for bucket in ("seed_actions", "attack_actions"):
        messages = (action.get("message") for action in profile.get(bucket, []))
        collected.extend(message for message in messages if message)
    collected.extend(criterion for criterion in profile.get("success_criteria", []) if criterion)
    blocker = advisory.get("blocked_reason")
    if blocker:
        collected.append(f"Current blocker: {blocker}")
    return unique(collected)
def _display_value(value: Any) -> str:
if value is None or value == "":
return "-"
if isinstance(value, bool):
return "" if value else ""
if isinstance(value, (int, float)):
return str(value)
if isinstance(value, list):
return "\n".join(_display_value(item) for item in value if item not in (None, "")) or "-"
if isinstance(value, dict):
return json.dumps(value, ensure_ascii=False, indent=2)
return str(value)
def _field(label: str, value: Any) -> Dict[str, str]:
    """Build a labelled field entry whose value is rendered with ``_display_value``."""
    rendered = _display_value(value)
    return dict(label=label, value=rendered)
def _stat(label: str, value: Any) -> Dict[str, str]:
    """Build a labelled stat entry whose value is rendered with ``_display_value``."""
    rendered = _display_value(value)
    return dict(label=label, value=rendered)
def _link(label: str, href: str, description: str) -> Dict[str, str]:
return {"label": label, "href": href, "description": description}
def _status_label(value: str | None) -> str:
    """Translate a status code via ``STATUS_LABELS``.

    Unknown codes are returned unchanged; a falsy code yields ``"-"`` unless
    ``STATUS_LABELS`` has an entry for the empty string.
    """
    if not value:
        return STATUS_LABELS.get("", "-")
    return STATUS_LABELS.get(value, value)
def _build_architecture_data(summary: Dict[str, Any], source_map: Dict[str, Any], repro_map: Dict[str, Any]) -> Dict[str, Any]:
    """Assemble the nested "architecture library" payload for the dashboard.

    The payload combines static descriptions (authorization boundary, control
    planes, data layer) with values derived from the inputs: per-category
    system nodes, repro-family counts, status distribution and recent failures.

    Args:
        summary: Dashboard summary; ``generated_at``, ``run_count``,
            ``advisory_count``, ``statuses`` and ``recent_failures`` are read.
        source_map: Source-map document; its ``systems`` list is read.
        repro_map: Repro-map document; its ``systems`` list is read and joined
            to the source-map systems by ``system_id``.

    Returns:
        A dict with ``generated_at``, ``title``, ``summary`` and ``sections``.
    """
    source_systems = source_map.get("systems", []) or []
    repro_by_system = {item.get("system_id"): item for item in (repro_map.get("systems", []) or []) if item.get("system_id")}

    # CLI entry points are derived from ROOT so the displayed commands match
    # the checkout that generated the dashboard, not one developer's machine.
    intel_cli = f"python3 {ROOT / 'scripts' / 'intel' / 'main.py'}"
    lab_cli = f"python3 {ROOT / 'scripts' / 'lab' / 'main.py'}"

    route_links = [
        _link("总览首页", "/overview/index.html", "工作台总览、最新运行和全局摘要。"),
        _link("运行中心", "/runs/index.html", "运行队列、详情、证据和日志入口。"),
        _link("系统分组", "/systems/index.html", "按系统和分类浏览覆盖情况。"),
        _link("架构库", "/architecture/index.html", "查看控制面、数据层和授权边界。"),
        _link("文档中心", "/docs/index.html", "集中查看项目文档、本地镜像和说明。"),
        _link("数据中心", "/data/index.html", "查看 summary、runs、systems 等 JSON 入口。"),
        _link("旧版工作台", "/legacy/index.html", "保留的 legacy 回退入口。"),
        _link("项目功能文档", "/docs/project-features.html", "项目能力、目录结构与自动化链路总览。"),
        _link("前端设计文档", "/docs/frontend-dashboard-design.html", "当前本地工作台的交互与视觉规范。"),
        _link("安全编码索引", "/docs/secure-code-index.html", "secure-code 修复库本地镜像。"),
        _link("仓库入口镜像", "/docs/root-readme.html", "仓库根 README 的本地镜像。"),
        _link("授权模型", "/docs/authorization-model.html", "允许目标范围、全局原则与记录要求。"),
        _link("source-map 真值", "/docs/source-map.html", "系统覆盖、来源和输出目录真值。"),
        _link("repro-map 真值", "/docs/repro-map.html", "复现族路由、浏览器要求和日志策略。"),
        _link("覆盖矩阵", "/docs/coverage-matrix.html", "自动生成覆盖摘要的本地镜像。"),
        _link("设计来源清单", "/docs/design-source.html", "Lovart 模板本地 vendor manifest。"),
        _link("架构库镜像", "/docs/architecture-library.html", "当前架构库的结构化镜像页。"),
    ]
    data_links = [
        _link("summary.json", "/summary.json", "全局摘要、状态分布和最近失败。"),
        _link("runs.json", "/runs.json", "最近 run 的结构化详情。"),
        _link("systems.json", "/systems.json", "系统级覆盖与浏览器证据摘要。"),
        _link("advisories.json", "/advisories.json", "漏洞条目元数据与来源。"),
        _link("profiles.json", "/profiles.json", "复现档案元数据。"),
        _link("architecture.json", "/architecture.json", "当前架构库结构化 JSON。"),
    ]

    # Global tallies across every source-map system.
    category_items: List[Dict[str, Any]] = []
    family_counts: Dict[str, int] = {}
    tier_counts = {"history-full": 0, "rolling-24m": 0}
    for system in source_systems:
        tier = system.get("tier", "rolling-24m")
        tier_counts[tier] = tier_counts.get(tier, 0) + 1
        repro = repro_by_system.get(system.get("system_id"), {})
        family = repro.get("default_repro_family") or "未定义"
        family_counts[family] = family_counts.get(family, 0) + 1

    # One collapsible node per non-empty category, ordered by display label.
    for category_id in sorted(CATEGORY_LABELS, key=lambda item: CATEGORY_LABELS[item]):
        systems_in_category = [item for item in source_systems if item.get("category") == category_id]
        if not systems_in_category:
            continue
        history_full = sum(1 for item in systems_in_category if item.get("tier") == "history-full")
        rolling = sum(1 for item in systems_in_category if item.get("tier") == "rolling-24m")
        system_nodes: List[Dict[str, Any]] = []
        for system in sorted(systems_in_category, key=lambda item: item.get("display_name", item.get("system_id", ""))):
            repro = repro_by_system.get(system.get("system_id"), {})
            official_sources = system.get("official_sources", []) or []
            ecosystem_sources = system.get("ecosystem_sources", []) or []
            research_sources = system.get("research_sources", []) or []
            system_nodes.append(
                {
                    "title": f"{system.get('display_name', system.get('system_id'))} ({system.get('system_id')})",
                    "summary": f"{TIER_LABELS.get(system.get('tier'), system.get('tier'))} · {', '.join(system.get('advisory_modes', [])) or '未定义模式'}",
                    "open": False,
                    "badges": [
                        TIER_LABELS.get(system.get("tier"), system.get("tier", "-")),
                        f"官方源 {len(official_sources)}",
                        f"生态源 {len(ecosystem_sources)}",
                        f"研究源 {len(research_sources)}",
                    ],
                    "fields": [
                        _field("输出目录", system.get("output_dir")),
                        _field("Advisory 模式", system.get("advisory_modes", [])),
                        _field("Secure-Code 主题", system.get("secure_code_topics", [])),
                        _field("CPE 关键字", system.get("cpe_keys", [])),
                        _field("GHSA 关键字", system.get("ghsa_keywords", [])),
                    ],
                    "items": [
                        {
                            "title": "来源配置",
                            "summary": "官方、生态权威与研究补充来源。",
                            "open": False,
                            "fields": [
                                _field("官方来源", [entry.get("name") for entry in official_sources]),
                                _field("生态来源", [entry.get("name") for entry in ecosystem_sources]),
                                _field("研究来源", [entry.get("name") for entry in research_sources]),
                            ],
                        },
                        {
                            "title": "复现默认值",
                            "summary": "repro-map 中的默认攻击族、浏览器要求和日志策略。",
                            "open": False,
                            "fields": [
                                _field("默认漏洞家族", repro.get("default_repro_family")),
                                _field("浏览器默认要求", repro.get("browser_required_default")),
                                _field("优先制品模式", repro.get("provisioning_mode_preference", [])),
                                _field("种子策略", repro.get("seed_strategy")),
                                _field("日志采集器", repro.get("log_collectors", [])),
                                _field("报告模板", repro.get("report_template")),
                            ],
                        },
                    ],
                }
            )
        category_items.append(
            {
                "title": CATEGORY_LABELS.get(category_id, category_id),
                "summary": f"{len(systems_in_category)} 个系统 · 历史全量 {history_full} · 近两年全量 {rolling}",
                "open": False,
                "stats": [
                    _stat("系统数", len(systems_in_category)),
                    _stat("历史全量", history_full),
                    _stat("近两年全量", rolling),
                ],
                "items": system_nodes,
            }
        )

    # Families ordered by descending system count, then by name.
    repro_family_nodes = [
        {
            "title": family,
            "summary": f"默认路由到该 family 的系统数:{count}",
            "open": False,
            "fields": [_field("系统数量", count)],
        }
        for family, count in sorted(family_counts.items(), key=lambda item: (-item[1], item[0]))
    ]
    recent_failure_nodes = [
        {
            "title": item.get("title") or item.get("advisory_id") or item.get("run_id"),
            "summary": item.get("blocked_reason") or "无额外阻塞说明。",
            "open": False,
            "badges": [_status_label(item.get("status"))],
            "fields": [
                _field("运行 ID", item.get("run_id")),
                _field("漏洞条目", item.get("advisory_id")),
                _field("状态", _status_label(item.get("status"))),
                _field("阻塞原因", item.get("blocked_reason")),
            ],
        }
        for item in summary.get("recent_failures", [])
    ]
    # Statuses ordered by descending count, then by code.
    status_nodes = [
        {
            "title": _status_label(status),
            "summary": f"当前累计 {count} 条。",
            "open": False,
            "fields": [
                _field("状态编码", status),
                _field("数量", count),
            ],
        }
        for status, count in sorted(summary.get("statuses", {}).items(), key=lambda item: (-item[1], item[0]))
    ]

    return {
        "generated_at": summary.get("generated_at"),
        "title": "当前架构库",
        "summary": "工作台、控制面、数据层、授权边界与系统覆盖的当前真值视图。",
        "sections": [
            {
                "title": "仓库定位与当前状态",
                "summary": "授权攻防实验与研究知识库;仅适用于自有资产、本地靶场和明确授权目标。",
                "open": True,
                "badges": ["LAB ONLY", "AUTHORIZED TARGETS ONLY", "非生产安全基线"],
                "stats": [
                    _stat("纳管系统", len(source_systems)),
                    _stat("历史全量系统", tier_counts.get("history-full", 0)),
                    _stat("近两年全量系统", tier_counts.get("rolling-24m", 0)),
                    _stat("当前运行", summary.get("run_count", 0)),
                    _stat("当前漏洞条目", summary.get("advisory_count", 0)),
                ],
                "fields": [
                    _field("仓库根目录", str(ROOT)),
                    _field("默认本地地址", "http://127.0.0.1:8734/"),
                    _field("自动刷新周期", "5 秒"),
                    _field("生成时间", summary.get("generated_at")),
                ],
                "links": route_links[:4],
            },
            {
                "title": "授权边界与目标模型",
                "summary": "所有实验都绑定到本地、自建公网或明确授权目标,不面向无关第三方资产。",
                "open": True,
                "stats": [
                    _stat("允许目标类型", 3),
                    _stat("禁止类型", 1),
                ],
                "fields": [
                    _field("允许目标", ["lab-local", "lab-public", "authorized-third-party"]),
                    _field("禁止目标", ["out-of-scope", "无归属证明目标", "公共知名站点", "泛互联网枚举"]),
                    _field("全局原则", [
                        "任何公网验证前先确认资产归属或授权关系。",
                        "优先只读探测、最小化回显验证和低频实验。",
                        "涉及账户、令牌、敏感数据和业务写入时采用最小必要动作。",
                        "不做泛互联网枚举,不对无关公共站点复用同类测试。",
                    ]),
                ],
                "links": [
                    # Root-absolute like every other link in this payload: a
                    # "./docs/..." href would resolve against the section page
                    # (e.g. /architecture/) where no docs directory exists.
                    _link("授权模型镜像", "/docs/authorization-model.html", "目标分类、原则与记录要求。"),
                    _link("仓库入口镜像", "/docs/root-readme.html", "仓库定位、能力矩阵与自动化入口。"),
                ],
            },
            {
                "title": "控制面与自动化入口",
                "summary": "Intel 控制面负责情报入库;Lab 控制面负责本地部署、攻击验证、证据收集和看板生成。",
                "open": True,
                "items": [
                    {
                        "title": "情报控制面Intel",
                        "summary": "负责 source adapter、规范化、渲染、校验和 PR 流程。",
                        "open": False,
                        "fields": [
                            _field("CLI 入口", intel_cli),
                            _field("主要命令", [
                                "render",
                                "validate",
                                "hotlane",
                                "ingest --since last-success",
                                "reconcile",
                                "backfill --tier history-full --dry-run",
                                "open-pr --dry-run",
                            ]),
                            _field("定时入口", [
                                "scripts/intel/run-hourly.sh",
                                "scripts/intel/run-nightly.sh",
                                "scripts/intel/run-weekly-reconcile.sh",
                            ]),
                        ],
                    },
                    {
                        "title": "实证控制面Lab",
                        "summary": "负责 catalog、compose、seed、baseline、attack、browser、evidence、render 和 queue。",
                        "open": False,
                        "fields": [
                            _field("CLI 入口", lab_cli),
                            _field("主要命令", [
                                "catalog sync",
                                "validate",
                                "run-case",
                                "run-system",
                                "run-batch",
                                "render-run",
                                "serve-dashboard --port 8734",
                                "cleanup",
                                "retry-failures",
                            ]),
                            _field("关键模块", [
                                "catalog/",
                                "provision/",
                                "compose/",
                                "seed/",
                                "baseline/",
                                "attack/",
                                "browser/",
                                "evidence/",
                                "render/",
                                "queue/",
                            ]),
                        ],
                    },
                ],
            },
            {
                "title": "数据层与本地地址",
                "summary": "Registry、生成层、run bundle 与 docs 镜像共同构成工作台的本地数据面。",
                "open": True,
                "items": [
                    {
                        "title": "真值层",
                        "summary": "统一的 registry 与 repro/source 配置。",
                        "open": False,
                        "fields": [
                            _field("漏洞条目 Registry", "08-threat-intel/registry/advisories/*.json"),
                            _field("系统 Registry", "08-threat-intel/registry/systems/*.json"),
                            _field("运行 Registry", "08-threat-intel/registry/runs/*.json"),
                            _field("source-map 真值", "08-threat-intel/source-map.yaml"),
                            _field("repro-map 真值", "08-threat-intel/repro-map.yaml"),
                        ],
                    },
                    {
                        "title": "生成层与展示层",
                        "summary": "dashboard JSON、run report、docs 镜像与本地静态 UI。",
                        "open": False,
                        "links": route_links + data_links,
                        "fields": [
                            _field("工作台根目录", "08-threat-intel/generated/dashboard/"),
                            _field("运行归档根目录", "06-case-studies/generated-runs/<run-id>/"),
                            _field("默认入口", "/index.html"),
                            _field("总览入口", "/overview/index.html"),
                            _field("运行入口", "/runs/index.html"),
                            _field("系统入口", "/systems/index.html"),
                            _field("架构入口", "/architecture/index.html"),
                            _field("文档入口", "/docs/index.html"),
                            _field("数据入口", "/data/index.html"),
                            _field("旧版入口", "/legacy/index.html"),
                        ],
                    },
                ],
            },
            {
                "title": "系统覆盖分组",
                "summary": "基于 source-map 和 repro-map 生成的当前分组视图,可展开查看每个系统的来源、输出目录和复现默认值。",
                "open": True,
                "items": category_items,
            },
            {
                "title": "Repro 路由概览",
                "summary": "按默认漏洞家族聚合当前系统路由,帮助查看 family runner 覆盖面。",
                "open": True,
                "items": repro_family_nodes,
            },
            {
                "title": "当前生成态与阻塞概览",
                "summary": "当前 render 后的状态分布、失败摘要与最近可见阻塞。",
                "open": True,
                "stats": [
                    _stat("Run 数", summary.get("run_count", 0)),
                    _stat("Advisory 数", summary.get("advisory_count", 0)),
                    _stat("状态类型", len(summary.get("statuses", {}))),
                    _stat("最近失败", len(summary.get("recent_failures", []))),
                ],
                "items": [
                    {
                        "title": "状态分布",
                        "summary": "verification_status 当前计数。",
                        "open": False,
                        "items": status_nodes,
                    },
                    {
                        "title": "最近失败",
                        "summary": "当前 dashboard 摘要里可见的失败或人工分诊样本。",
                        "open": False,
                        "items": recent_failure_nodes or [
                            {
                                "title": "暂无失败样本",
                                "summary": "当前 summary.json 中没有 recent_failures。",
                                "open": False,
                            }
                        ],
                    },
                ],
            },
        ],
    }
def _dashboard_doc_page(title: str, body: str, description: str) -> str:
return f"""<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{html.escape(title)}</title>
<style>
:root {{
--bg: #08111f;
--panel: rgba(9, 18, 32, 0.9);
--border: rgba(137, 171, 214, 0.2);
--text: #f7fafc;
--muted: #9fb3ca;
--accent: #5eead4;
}}
* {{ box-sizing: border-box; }}
body {{
margin: 0;
min-height: 100vh;
font-family: "IBM Plex Sans", "Segoe UI", sans-serif;
color: var(--text);
background:
radial-gradient(circle at top left, rgba(94, 234, 212, 0.12), transparent 26%),
linear-gradient(160deg, #050c16 0%, #091526 50%, #10233d 100%);
}}
main {{
max-width: 1080px;
margin: 0 auto;
padding: 32px 20px 40px;
}}
.panel {{
background: var(--panel);
border: 1px solid var(--border);
border-radius: 20px;
padding: 24px;
box-shadow: 0 24px 80px rgba(1, 7, 20, 0.45);
}}
.actions {{
display: flex;
flex-wrap: wrap;
gap: 12px;
margin-bottom: 18px;
}}
.chip {{
display: inline-flex;
align-items: center;
gap: 8px;
border-radius: 999px;
border: 1px solid var(--border);
padding: 10px 14px;
color: var(--text);
background: rgba(255,255,255,0.05);
text-decoration: none;
}}
.chip:hover {{ border-color: rgba(94, 234, 212, 0.42); }}
h1 {{
margin: 0 0 12px;
font-family: "IBM Plex Serif", Georgia, serif;
font-size: clamp(1.8rem, 4vw, 3rem);
line-height: 1.08;
}}
.meta {{
color: var(--muted);
margin-bottom: 18px;
}}
pre {{
margin: 0;
padding: 20px;
overflow: auto;
border-radius: 16px;
border: 1px solid rgba(137, 171, 214, 0.12);
background: rgba(2, 8, 22, 0.84);
color: #d6e5f5;
font-family: "IBM Plex Mono", "SFMono-Regular", monospace;
font-size: 0.92rem;
line-height: 1.6;
white-space: pre-wrap;
}}
</style>
</head>
<body>
<main>
<div class="panel">
<div class="actions">
<a class="chip" href="/overview/index.html">返回工作台</a>
</div>
<h1>{html.escape(title)}</h1>
<div class="meta">{html.escape(description)}</div>
<pre>{html.escape(body)}</pre>
</div>
</main>
</body>
</html>
"""
def _remove_path(path: Path) -> None:
if not path.exists() and not path.is_symlink():
return
if path.is_symlink() or path.is_file():
path.unlink()
return
shutil.rmtree(path)
def _sync_symlink(target: Path, link_path: Path) -> None:
    """Make ``link_path`` a relative symlink pointing at ``target``.

    The parent directory is created first. An existing symlink that already
    holds the desired relative target is left alone; anything else found at
    ``link_path`` is removed and replaced.
    """
    parent = link_path.parent
    ensure_dir(parent)
    desired = os.path.relpath(target, parent)
    already_current = link_path.is_symlink() and os.readlink(link_path) == desired
    if not already_current:
        _remove_path(link_path)
        os.symlink(desired, link_path, target_is_directory=target.is_dir())
def _copy_tree(source: Path, destination: Path) -> None:
    """Recursively copy everything under ``source`` into ``destination``.

    Directories are created via ``ensure_dir``; files are copied with
    ``shutil.copy2`` over whatever already sits at the mirrored location.
    """
    ensure_dir(destination)
    for entry in source.rglob("*"):
        mirrored = destination / entry.relative_to(source)
        if entry.is_dir():
            ensure_dir(mirrored)
        else:
            ensure_dir(mirrored.parent)
            shutil.copy2(entry, mirrored)
def _write_dashboard_docs(architecture: Dict[str, Any]) -> None:
    """Write the HTML mirror pages into ``<dashboard>/docs``.

    Mirrors the listed repository documents, the Lovart vendor manifest
    (``"{}"`` when the manifest file is absent) and a JSON dump of
    ``architecture``. All sources are read before any page is written.
    """
    docs_dir = DASHBOARD_DIR / "docs"
    ensure_dir(docs_dir)
    # (output file name, page title, source file, page description)
    mirrored_files = [
        (
            "project-features.html",
            "项目功能与特性总览",
            ROOT / "docs" / "project-features.md",
            "工作台内置镜像页:仓库功能、目录和自动化链路说明。",
        ),
        (
            "frontend-dashboard-design.html",
            "本地前端工作台设计文档",
            ROOT / "docs" / "frontend-dashboard-design.md",
            "工作台内置镜像页:前端交互、展示结构和视觉规范。",
        ),
        (
            "secure-code-index.html",
            "安全编码修复库索引",
            ROOT / "05-defense" / "secure-code" / "INDEX.md",
            "工作台内置镜像页secure-code 修复主题索引。",
        ),
        (
            "root-readme.html",
            "仓库入口镜像",
            ROOT / "README.md",
            "工作台内置镜像页:仓库定位、能力矩阵、入口和自动化入口。",
        ),
        (
            "authorization-model.html",
            "授权模型镜像",
            ROOT / "09-scope-and-targeting" / "authorization-model.md",
            "工作台内置镜像页:目标范围、授权模型、最小化验证建议和记录要求。",
        ),
        (
            "source-map.html",
            "source-map 真值镜像",
            SOURCE_MAP_PATH,
            "工作台内置镜像页:系统覆盖、来源、输出目录和 secure-code 主题真值。",
        ),
        (
            "repro-map.html",
            "repro-map 真值镜像",
            REPRO_MAP_PATH,
            "工作台内置镜像页:默认漏洞家族、浏览器要求和日志策略真值。",
        ),
        (
            "coverage-matrix.html",
            "覆盖矩阵镜像",
            ROOT / "08-threat-intel" / "generated" / "coverage-matrix.md",
            "工作台内置镜像页:当前覆盖矩阵生成结果。",
        ),
    ]
    pages = [
        (filename, title, source_path.read_text(encoding="utf-8"), description)
        for filename, title, source_path, description in mirrored_files
    ]

    if LOVART_VENDOR_MANIFEST.exists():
        manifest_body = LOVART_VENDOR_MANIFEST.read_text(encoding="utf-8")
    else:
        manifest_body = "{}"
    pages.append(
        (
            "design-source.html",
            "Lovart 设计来源与本地化清单",
            manifest_body,
            "工作台内置镜像页Lovart 来源文件、本地 vendor 路径和本地化说明。",
        )
    )
    pages.append(
        (
            "architecture-library.html",
            "当前架构库镜像",
            json.dumps(architecture, indent=2, ensure_ascii=False),
            "工作台内置镜像页:当前架构库结构化数据镜像。",
        )
    )

    for filename, title, body, description in pages:
        write_text(docs_dir / filename, _dashboard_doc_page(title, body, description))
def _write_design_source_manifest() -> None:
    """Copy the Lovart vendor manifest to ``<dashboard>/assets/design-source.json``.

    Writes an empty JSON object when the manifest file does not exist.
    """
    assets_dir = DASHBOARD_DIR / "assets"
    ensure_dir(assets_dir)
    if LOVART_VENDOR_MANIFEST.exists():
        manifest = json.loads(LOVART_VENDOR_MANIFEST.read_text(encoding="utf-8"))
    else:
        manifest = {}
    write_json(assets_dir / "design-source.json", manifest)
def _render_root_dashboard_shell() -> None:
    """Install the Lovart ``index.html`` and its ``assets`` tree at the dashboard root."""
    assets_dir = DASHBOARD_DIR / "assets"
    ensure_dir(assets_dir)
    shutil.copy2(LOVART_TEMPLATE_DIR / "index.html", DASHBOARD_DIR / "index.html")
    _copy_tree(LOVART_TEMPLATE_DIR / "assets", assets_dir)
def _render_section_dashboard_shells() -> None:
    """Place a copy of the Lovart ``index.html`` in every section directory and in ``docs``.

    Section directories are wiped and recreated, except ``runs``: that one
    keeps its contents and only has its ``index.html`` replaced.
    """
    shell = LOVART_TEMPLATE_DIR / "index.html"
    for section in SECTION_ROUTE_DIRS:
        section_dir = DASHBOARD_DIR / section
        if section != "runs":
            _remove_path(section_dir)
            ensure_dir(section_dir)
        else:
            # Preserve existing /runs/<run-id>/ bundles; only refresh the section shell.
            ensure_dir(section_dir)
            stale_index = section_dir / "index.html"
            if stale_index.exists():
                stale_index.unlink()
        shutil.copy2(shell, section_dir / "index.html")

    docs_dir = DASHBOARD_DIR / "docs"
    ensure_dir(docs_dir)
    shutil.copy2(shell, docs_dir / "index.html")
def _render_legacy_dashboard_shell() -> None:
    """Rebuild ``<dashboard>/legacy`` from the legacy template.

    The directory is recreated from scratch, then the root JSON files plus the
    ``runs`` and ``docs`` directories are symlinked into it.
    """
    legacy_dir = DASHBOARD_DIR / "legacy"
    _remove_path(legacy_dir)
    ensure_dir(legacy_dir)
    shutil.copy2(LEGACY_TEMPLATE_DIR / "index.html", legacy_dir / "index.html")
    _copy_tree(LEGACY_TEMPLATE_DIR / "assets", legacy_dir / "assets")
    for shared_name in [*ROOT_JSON_FILES, "runs", "docs"]:
        _sync_symlink(DASHBOARD_DIR / shared_name, legacy_dir / shared_name)
def _sync_run_bundles(runs: List[Dict[str, Any]]) -> None:
    """Symlink each run's bundle directory to ``<dashboard>/runs/<run_id>``.

    Runs without a recorded bundle directory, without a run id, or whose
    bundle directory no longer exists on disk are skipped.

    Args:
        runs: Run records; ``report_refs.bundle_dir`` and ``run_id`` are read.
    """
    runs_dir = DASHBOARD_DIR / "runs"
    ensure_dir(runs_dir)
    for item in runs:
        bundle_ref = (item.get("report_refs") or {}).get("bundle_dir")
        run_id = item.get("run_id")
        # An empty bundle reference must be rejected before building a Path:
        # Path("") is the current directory, which always exists and would be
        # linked as the run's bundle. An empty run id would make the link path
        # the runs directory itself, which the symlink sync would then remove.
        if not bundle_ref or not run_id:
            continue
        bundle_dir = Path(bundle_ref)
        if not bundle_dir.exists():
            continue
        _sync_symlink(bundle_dir, runs_dir / run_id)
def render_run(run: Dict[str, Any]) -> Dict[str, str]:
    """Write the report bundle (timeline, Markdown report, HTML report) for one run.

    Files are written under ``CASE_RUNS_DIR / run["run_id"]``:
    ``timeline.mmd``, ``report.md`` and ``report.html``; an ``assets``
    subdirectory is also ensured.

    Args:
        run: Run record. ``run_id``, ``advisory_id``, ``system_id``,
            ``repro_profile_id``, ``verification_status``,
            ``verification_mode``, ``artifact_mode``, ``started_at`` and
            ``finished_at`` are required keys; the timeline, attack steps and
            the various ``*_refs`` lists are optional.

    Returns:
        String paths keyed by ``bundle_dir``, ``report_md``, ``report_html``
        and ``timeline``.

    Raises:
        KeyError: If one of the required keys is missing from ``run``.
    """

    def esc(value: Any) -> str:
        # Coerce before escaping so None or Path values render (as the Markdown
        # f-strings already do) instead of raising inside html.escape.
        return html.escape(str(value))

    run_dir = CASE_RUNS_DIR / run["run_id"]
    ensure_dir(run_dir / "assets")
    timeline_path = run_dir / "timeline.mmd"
    write_text(timeline_path, mermaid_from_steps(run))
    screenshot_refs = [ref for ref in run.get("browser_refs", []) if ref.endswith((".png", ".jpg", ".jpeg"))]
    relative_screenshots = [_relative_ref(run_dir, ref) for ref in screenshot_refs]

    # ---- Markdown report -------------------------------------------------
    md_lines = [
        f"# 运行 {run['run_id']}",
        "",
        "> `LAB ONLY` | `AUTHORIZED TARGETS ONLY` | 自动生成 run bundle",
        "",
        f"- 漏洞条目: `{run['advisory_id']}`",
        f"- 系统: `{run['system_id']}`",
        f"- Repro Profile: `{run['repro_profile_id']}`",
        f"- 实证状态: `{run['verification_status']}`",
        f"- 实证方式: `{run['verification_mode']}`",
        f"- Artifact 模式: `{run['artifact_mode']}`",
        f"- 启动时间: `{run['started_at']}`",
        f"- 完成时间: `{run['finished_at']}`",
        f"- 阻塞原因: `{run.get('blocked_reason') or '-'}`",
        f"- Compose 服务: `{', '.join(run.get('compose_services', [])) or '-'}`",
        "",
        "## 运行时间线",
        "",
        # The report sits next to timeline.mmd, so link by file name; an
        # absolute filesystem path would not resolve once the bundle is served.
        f"- Mermaid: [{timeline_path.name}]({timeline_path.name})",
        "",
        "| 时间 | 步骤 | 状态 | 说明 |",
        "|------|------|------|------|",
    ]
    if run.get("timeline"):
        for item in run["timeline"]:
            md_lines.append(
                f"| `{item.get('at', '')}` | `{item.get('step', '')}` | `{item.get('status', '')}` | {item.get('detail', '') or '-'} |"
            )
    else:
        md_lines.append("| `-` | `-` | `-` | 无时间线 |")
    md_lines.extend(
        [
            "",
            "## Compose 拓扑",
            "",
            f"- Compose 文件: `{', '.join(run.get('compose_refs', [])) or '-'}`",
            f"- 服务列表: `{', '.join(run.get('compose_services', [])) or '-'}`",
            "",
            "## 攻击步骤",
            "",
            "| 工具/步骤 | 状态 | 结果 |",
            "|-----------|------|------|",
        ]
    )
    if run.get("attack_steps"):
        for step in run["attack_steps"]:
            outcome = step.get("result_path") or step.get("detail") or "-"
            md_lines.append(f"| `{step.get('tool') or step.get('kind')}` | `{step.get('status', '-')}` | `{outcome}` |")
    else:
        md_lines.append("| `-` | `skipped` | `no attack steps` |")
    md_lines.extend(
        [
            "",
            "## 证据摘要",
            "",
            f"- Baseline: `{len(run.get('baseline_refs', []))}`",
            f"- 攻击步骤: `{len(run.get('attack_steps', []))}`",
            f"- 浏览器证据: `{len(run.get('browser_refs', []))}`",
            f"- 容器日志: `{len(run.get('container_log_refs', []))}`",
            f"- 请求日志: `{len(run.get('request_log_refs', []))}`",
            "",
        ]
    )
    if relative_screenshots:
        md_lines.extend(["## 浏览器截图", ""])
        for ref in relative_screenshots:
            md_lines.append(f"![{Path(ref).stem}]({ref})")
        md_lines.append("")
    if run.get("browser_refs"):
        md_lines.extend(["## 浏览器证据", ""])
        for ref in run["browser_refs"]:
            md_lines.append(f"- `{_relative_ref(run_dir, ref)}`")
        md_lines.append("")
    if run.get("container_log_refs"):
        md_lines.extend(["## 容器日志", ""])
        for ref in run["container_log_refs"]:
            md_lines.append(f"- `{_relative_ref(run_dir, ref)}`")
        md_lines.append("")
    if run.get("request_log_refs"):
        md_lines.extend(["## 请求与基线日志", ""])
        for ref in run["request_log_refs"]:
            md_lines.append(f"- `{_relative_ref(run_dir, ref)}`")
        md_lines.append("")
    md_lines.extend(
        [
            "## 最小化验证说明",
            "",
            "- 仅限自有资产、本地靶场或已授权实验目标。",
            "- 默认执行 minimal-proof;不会把破坏性或不可回滚动作作为默认路径。",
            "- 若浏览器证据缺失,前端类案例不会被标为 `verified-*`。",
            "",
        ]
    )
    report_md = run_dir / "report.md"
    write_text(report_md, "\n".join(md_lines))

    # ---- HTML report -----------------------------------------------------
    html_body = [
        "<!doctype html>",
        "<html><head><meta charset='utf-8'><title>websafe 运行报告</title>",
        "<style>body{font-family:ui-sans-serif,system-ui,sans-serif;margin:2rem;line-height:1.55;background:#f8fafc;color:#0f172a;} code,pre{background:#e2e8f0;padding:.2rem .4rem;border-radius:.3rem;} pre{white-space:pre-wrap;} .grid{display:grid;grid-template-columns:repeat(2,minmax(0,1fr));gap:1rem;} .card{border:1px solid #cbd5e1;padding:1rem;border-radius:.75rem;background:#fff;} table{width:100%;border-collapse:collapse;background:#fff;border:1px solid #cbd5e1;border-radius:.75rem;overflow:hidden;} th,td{padding:.75rem;border-bottom:1px solid #e2e8f0;text-align:left;vertical-align:top;} img{max-width:100%;border:1px solid #cbd5e1;border-radius:.5rem;} .gallery{display:grid;grid-template-columns:repeat(auto-fit,minmax(320px,1fr));gap:1rem;}</style>",
        "</head><body>",
        f"<h1>运行 {esc(run['run_id'])}</h1>",
        "<div class='grid'>",
        f"<div class='card'><strong>漏洞条目</strong><br><code>{esc(run['advisory_id'])}</code></div>",
        f"<div class='card'><strong>实证状态</strong><br><code>{esc(run['verification_status'])}</code></div>",
        f"<div class='card'><strong>复现 Profile</strong><br><code>{esc(run['repro_profile_id'])}</code></div>",
        f"<div class='card'><strong>Artifact 模式</strong><br><code>{esc(run['artifact_mode'])}</code></div>",
        "</div>",
        "<h2>Mermaid 时间线</h2>",
        f"<pre>{html.escape(mermaid_from_steps(run))}</pre>",
        "<h2>运行时间线</h2>",
        "<table><thead><tr><th>时间</th><th>步骤</th><th>状态</th><th>说明</th></tr></thead><tbody>",
    ]
    if run.get("timeline"):
        for item in run["timeline"]:
            html_body.append(
                "<tr>"
                f"<td><code>{esc(item.get('at', ''))}</code></td>"
                f"<td><code>{esc(item.get('step', ''))}</code></td>"
                f"<td><code>{esc(item.get('status', ''))}</code></td>"
                f"<td>{esc(item.get('detail', '') or '-')}</td>"
                "</tr>"
            )
    html_body.extend(["</tbody></table>", "<h2>攻击步骤</h2>", "<table><thead><tr><th>工具</th><th>状态</th><th>输出</th></tr></thead><tbody>"])
    if run.get("attack_steps"):
        for step in run["attack_steps"]:
            html_body.append(
                "<tr>"
                f"<td><code>{esc(step.get('tool') or step.get('kind') or '-')}</code></td>"
                f"<td><code>{esc(step.get('status', '-'))}</code></td>"
                f"<td><code>{esc(step.get('result_path') or '-')}</code></td>"
                "</tr>"
            )
    else:
        html_body.append("<tr><td><code>-</code></td><td><code>skipped</code></td><td><code>当前没有攻击步骤</code></td></tr>")
    html_body.extend(["</tbody></table>"])
    if relative_screenshots:
        html_body.extend(["<h2>浏览器截图</h2>", "<div class='gallery'>"])
        for ref in relative_screenshots:
            html_body.append(
                f"<figure><img src='{html.escape(ref)}' alt='{html.escape(Path(ref).stem)}'><figcaption><code>{html.escape(ref)}</code></figcaption></figure>"
            )
        html_body.append("</div>")
    html_body.extend(["<h2>证据清单</h2><ul>"])
    for ref in run.get("compose_refs", []) + run.get("browser_refs", []) + run.get("container_log_refs", []) + run.get("request_log_refs", []):
        html_body.append(f"<li><code>{html.escape(_relative_ref(run_dir, ref))}</code></li>")
    html_body.extend(["</ul>", "</body></html>"])
    report_html = run_dir / "report.html"
    write_text(report_html, "\n".join(html_body))

    return {"bundle_dir": str(run_dir), "report_md": str(report_md), "report_html": str(report_html), "timeline": str(timeline_path)}
def render_dashboard() -> Dict[str, str]:
    """Regenerate every dashboard artifact from the registries on disk.

    Loads advisories, runs, the source/repro maps and the repro profiles,
    links run bundles into the dashboard, writes the root JSON payloads
    (summary, runs, systems, advisories, profiles, architecture), and finally
    renders the doc mirrors and the root, section and legacy shells.

    Returns:
        String paths keyed by ``dashboard_dir``, ``index``, ``overview_index``,
        ``runs_index``, ``legacy_index`` and ``summary_json``.
    """
    ensure_dir(DASHBOARD_DIR)
    advisory_records = load_json_dir(ADVISORIES_DIR)
    runs = load_json_dir(RUNS_DIR)
    source_map = read_yaml(SOURCE_MAP_PATH, default={}) or {}
    repro_map = read_yaml(REPRO_MAP_PATH, default={}) or {}
    source_by_id = {entry["system_id"]: entry for entry in source_map.get("systems", []) if entry.get("system_id")}
    advisory_by_id = {entry["canonical_id"]: entry for entry in advisory_records if entry.get("canonical_id")}
    profiles_by_id = load_profiles()
    _sync_run_bundles(runs)

    # ---- Per-system rollup over all advisories ---------------------------
    systems: Dict[str, Dict[str, Any]] = {}
    for advisory in advisory_records:
        system_id = advisory["system_id"]
        if system_id not in systems:
            configured = source_by_id.get(system_id, {})
            systems[system_id] = {
                "system_id": system_id,
                "display_name": advisory.get("display_name", system_id),
                "total": 0,
                "verified_real": 0,
                "verified_synthetic": 0,
                "blocked": 0,
                "manual": 0,
                "browser_required": 0,
                "browser_present": 0,
                "latest_update": "",
                "category": configured.get("category", advisory.get("category")),
                "tier": configured.get("tier"),
                "output_dir": configured.get("output_dir"),
            }
        rollup = systems[system_id]
        rollup["total"] += 1

        verdict = advisory.get("verification_status", "triage-manual")
        if verdict in ("verified-real", "verified-synthetic"):
            counter = verdict.replace("-", "_")
        elif verdict.startswith("blocked-"):
            counter = "blocked"
        else:
            counter = "manual"
        rollup[counter] += 1

        evidence = advisory.get("browser_evidence", {})
        if evidence.get("required"):
            rollup["browser_required"] += 1
        if evidence.get("present"):
            rollup["browser_present"] += 1

        touched_at = advisory.get("updated_at") or advisory.get("published_at") or ""
        rollup["latest_update"] = max(rollup["latest_update"], touched_at)

    # ---- Decorate the 100 most recently finished runs --------------------
    newest_first = sorted(runs, key=lambda record: record.get("finished_at") or "", reverse=True)
    decorated_runs: List[Dict[str, Any]] = []
    for record in newest_first[:100]:
        run_id = record["run_id"]
        enriched = dict(record)
        advisory = advisory_by_id.get(record["advisory_id"], {})
        profile = profiles_by_id.get(record["repro_profile_id"], {})
        browser_evidence = record.get("browser_evidence") or advisory.get("browser_evidence") or {
            "required": bool(profile.get("browser_assertions", {}).get("required")),
            "present": False,
            "refs": [],
        }
        baseline_refs = record.get("baseline_refs", [])
        request_only_refs = [ref for ref in record.get("request_log_refs", []) if ref not in baseline_refs]

        report_urls = {
            "report_html": f"/runs/{run_id}/report.html",
            "report_md": f"/runs/{run_id}/report.md",
            "timeline": f"/runs/{run_id}/timeline.mmd",
            "bundle": f"/runs/{run_id}/run.json",
        }
        enriched["dashboard_refs"] = report_urls
        enriched["browser_evidence"] = browser_evidence
        enriched["browser_links"] = [_dashboard_ref(record, ref) for ref in record.get("browser_refs", [])]
        enriched["container_links"] = [_dashboard_ref(record, ref) for ref in record.get("container_log_refs", [])]
        enriched["request_links"] = [_dashboard_ref(record, ref) for ref in record.get("request_log_refs", [])]
        enriched["advisory_meta"] = _advisory_meta(advisory)
        enriched["profile_meta"] = _profile_meta(profile)
        enriched["reasoning_lines"] = _reasoning_lines(advisory, profile)
        enriched["progress"] = _progress_counts(record)

        groups = [
            _artifact_group(
                record,
                "reports",
                "报告与运行产物",
                [
                    report_urls["report_html"],
                    report_urls["report_md"],
                    report_urls["timeline"],
                    report_urls["bundle"],
                ],
                use_dashboard_refs=True,
            ),
            _artifact_group(record, "compose", "Compose 编排", record.get("compose_refs", [])),
            _artifact_group(record, "baseline", "基线快照", record.get("baseline_refs", [])),
            _artifact_group(record, "attack", "攻击输出", _attack_result_refs(record)),
            _artifact_group(record, "browser", "浏览器证据", record.get("browser_refs", [])),
            _artifact_group(record, "container", "容器日志", record.get("container_log_refs", [])),
            _artifact_group(record, "requests", "请求与探测日志", request_only_refs),
        ]
        # Empty groups are dropped so the dashboard only lists real artifacts.
        enriched["artifact_groups"] = [group for group in groups if group["count"]]
        decorated_runs.append(enriched)

    # ---- Summary ---------------------------------------------------------
    summary = {
        "generated_at": isoformat(now_utc()),
        "advisory_count": len(advisory_records),
        "run_count": len(runs),
        "statuses": {},
        "recent_failures": [],
    }
    status_totals = summary["statuses"]
    for record in runs:
        verdict = record.get("verification_status", "triage-manual")
        status_totals[verdict] = status_totals.get(verdict, 0) + 1
    summary["systems"] = sorted(systems.values(), key=lambda entry: (-entry["total"], entry["system_id"]))

    failing_statuses = {"triage-manual", "blocked-artifact", "blocked-destructive"}
    failures = [
        {
            "run_id": enriched["run_id"],
            "advisory_id": enriched["advisory_id"],
            "status": enriched.get("verification_status"),
            "title": enriched.get("advisory_meta", {}).get("title"),
            "blocked_reason": enriched.get("blocked_reason"),
        }
        for enriched in decorated_runs
        if enriched.get("verification_status") in failing_statuses
    ]
    summary["recent_failures"] = failures[:20]

    # ---- Write payloads, then render shells ------------------------------
    write_json(DASHBOARD_DIR / "summary.json", summary)
    write_json(DASHBOARD_DIR / "runs.json", decorated_runs)
    write_json(DASHBOARD_DIR / "systems.json", summary["systems"])
    write_json(DASHBOARD_DIR / "advisories.json", {key: _advisory_meta(value) for key, value in advisory_by_id.items()})
    write_json(DASHBOARD_DIR / "profiles.json", {key: _profile_meta(value) for key, value in profiles_by_id.items()})
    architecture = _build_architecture_data(summary, source_map, repro_map)
    write_json(DASHBOARD_DIR / "architecture.json", architecture)
    _write_dashboard_docs(architecture)
    _write_design_source_manifest()
    _render_root_dashboard_shell()
    _render_section_dashboard_shells()
    _render_legacy_dashboard_shell()

    return {
        "dashboard_dir": str(DASHBOARD_DIR),
        "index": str(DASHBOARD_DIR / "index.html"),
        "overview_index": str(DASHBOARD_DIR / "overview" / "index.html"),
        "runs_index": str(DASHBOARD_DIR / "runs" / "index.html"),
        "legacy_index": str(DASHBOARD_DIR / "legacy" / "index.html"),
        "summary_json": str(DASHBOARD_DIR / "summary.json"),
    }