文件
websafe-kb/scripts/lab/render.py

405 行
20 KiB
Python

from __future__ import annotations
import html
import os
from pathlib import Path
from typing import Any, Dict, List
from lab.config import ADVISORIES_DIR, CASE_RUNS_DIR, DASHBOARD_DIR, RUNS_DIR
from lab.utils import ensure_dir, isoformat, load_json_dir, now_utc, write_json, write_text
def mermaid_from_steps(run: Dict[str, Any]) -> str:
    """Build the Mermaid flowchart source for a run's fixed pipeline.

    The eight pipeline stages are always emitted. When ``run`` carries a truthy
    ``blocked_reason``, a ninth node is appended whose label holds at most the
    first 60 characters of that reason.

    Args:
        run: Run record; only the optional ``blocked_reason`` key is read.

    Returns:
        Newline-joined Mermaid ``flowchart LR`` source.
    """
    lines = [
        "flowchart LR",
        'A["Select Advisory"] --> B["Resolve Repro Profile"]',
        'B --> C["Provision Compose Environment"]',
        'C --> D["Baseline Snapshot"]',
        'D --> E["Controlled Attack Steps"]',
        'E --> F["Browser Replay"]',
        'F --> G["Collect Logs and Evidence"]',
        'G --> H["Update Registry and Reports"]',
    ]
    blocked_reason = run.get("blocked_reason")
    if blocked_reason:
        # The label sits inside a double-quoted Mermaid node, so a raw quote or
        # line break in the reason would terminate the label early. Collapse
        # whitespace, truncate, and only then escape quotes so an entity code is
        # never cut in half by the 60-character limit.
        label = " ".join(str(blocked_reason).split())[:60].replace('"', "#quot;")
        lines.append(f'H --> I["Blocked: {label}"]')
    return "\n".join(lines)
def _relative_ref(run_dir: Path, ref: str) -> str:
try:
return str(Path(ref).resolve().relative_to(run_dir.resolve()))
except ValueError:
return ref
def _dashboard_ref(run: Dict[str, Any], ref: str) -> str:
try:
bundle_dir = Path(run["report_refs"]["bundle_dir"]).resolve()
relative = Path(ref).resolve().relative_to(bundle_dir)
return f"./runs/{run['run_id']}/{relative.as_posix()}"
except Exception:
return ref
def _esc(value: Any) -> str:
    """HTML-escape ``value``, coercing non-strings with ``str`` first.

    The Markdown report formats record values through f-strings, which accept
    any type; coercing here keeps the HTML report from crashing on the same
    records (``html.escape`` itself only accepts ``str``).
    """
    return html.escape(value if isinstance(value, str) else str(value))


def _run_markdown_lines(
    run: Dict[str, Any],
    run_dir: Path,
    timeline_path: Path,
    relative_screenshots: List[str],
) -> List[str]:
    """Return the lines of the Markdown run report (``report.md``)."""
    md_lines = [
        f"# Run {run['run_id']}",
        "",
        "> `LAB ONLY` | `AUTHORIZED TARGETS ONLY` | 自动生成 run bundle",
        "",
        f"- Advisory: `{run['advisory_id']}`",
        f"- 系统: `{run['system_id']}`",
        f"- Repro Profile: `{run['repro_profile_id']}`",
        f"- 实证状态: `{run['verification_status']}`",
        f"- 实证方式: `{run['verification_mode']}`",
        f"- Artifact 模式: `{run['artifact_mode']}`",
        f"- 启动时间: `{run['started_at']}`",
        f"- 完成时间: `{run['finished_at']}`",
        f"- 阻塞原因: `{run.get('blocked_reason') or '-'}`",
        f"- Compose 服务: `{', '.join(run.get('compose_services', [])) or '-'}`",
        "",
        "## 运行时间线",
        "",
        # The timeline file is written next to report.md, so link to it by file
        # name; this keeps the link valid when the bundle is moved or symlinked.
        f"- Mermaid: [{timeline_path.name}]({timeline_path.name})",
        "",
        "| 时间 | 步骤 | 状态 | 说明 |",
        "|------|------|------|------|",
    ]
    if run.get("timeline"):
        for item in run["timeline"]:
            md_lines.append(
                f"| `{item.get('at', '')}` | `{item.get('step', '')}` | `{item.get('status', '')}` | {item.get('detail', '') or '-'} |"
            )
    else:
        md_lines.append("| `-` | `-` | `-` | 无时间线 |")
    md_lines.extend(
        [
            "",
            "## Compose 拓扑",
            "",
            f"- Compose 文件: `{', '.join(run.get('compose_refs', [])) or '-'}`",
            f"- 服务列表: `{', '.join(run.get('compose_services', [])) or '-'}`",
            "",
            "## 攻击步骤",
            "",
            "| 工具/步骤 | 状态 | 结果 |",
            "|-----------|------|------|",
        ]
    )
    if run.get("attack_steps"):
        for step in run["attack_steps"]:
            outcome = step.get("result_path") or step.get("detail") or "-"
            md_lines.append(f"| `{step.get('tool') or step.get('kind')}` | `{step.get('status', '-')}` | `{outcome}` |")
    else:
        md_lines.append("| `-` | `skipped` | `no attack steps` |")
    md_lines.extend(
        [
            "",
            "## 证据摘要",
            "",
            f"- Baseline: `{len(run.get('baseline_refs', []))}`",
            f"- 攻击步骤: `{len(run.get('attack_steps', []))}`",
            f"- 浏览器证据: `{len(run.get('browser_refs', []))}`",
            f"- 容器日志: `{len(run.get('container_log_refs', []))}`",
            f"- 请求日志: `{len(run.get('request_log_refs', []))}`",
            "",
        ]
    )
    if relative_screenshots:
        md_lines.extend(["## 浏览器截图", ""])
        for ref in relative_screenshots:
            md_lines.append(f"![{Path(ref).stem}]({ref})")
        md_lines.append("")
    # The three evidence listings share one layout: heading, bullets, blank line.
    for heading, key in (
        ("## 浏览器证据", "browser_refs"),
        ("## 容器日志", "container_log_refs"),
        ("## 请求与基线日志", "request_log_refs"),
    ):
        if run.get(key):
            md_lines.extend([heading, ""])
            for ref in run[key]:
                md_lines.append(f"- `{_relative_ref(run_dir, ref)}`")
            md_lines.append("")
    md_lines.extend(
        [
            "## 最小化验证说明",
            "",
            "- 仅限自有资产、本地靶场或已授权实验目标。",
            "- 默认执行 minimal-proof;不会把破坏性或不可回滚动作作为默认路径。",
            "- 若浏览器证据缺失,前端类案例不会被标为 `verified-*`。",
            "",
        ]
    )
    return md_lines


def _run_html_lines(
    run: Dict[str, Any],
    run_dir: Path,
    relative_screenshots: List[str],
    mermaid: str,
) -> List[str]:
    """Return the lines of the standalone HTML run report (``report.html``)."""
    html_body = [
        "<!doctype html>",
        "<html><head><meta charset='utf-8'><title>websafe run report</title>",
        "<style>body{font-family:ui-sans-serif,system-ui,sans-serif;margin:2rem;line-height:1.55;background:#f8fafc;color:#0f172a;} code,pre{background:#e2e8f0;padding:.2rem .4rem;border-radius:.3rem;} pre{white-space:pre-wrap;} .grid{display:grid;grid-template-columns:repeat(2,minmax(0,1fr));gap:1rem;} .card{border:1px solid #cbd5e1;padding:1rem;border-radius:.75rem;background:#fff;} table{width:100%;border-collapse:collapse;background:#fff;border:1px solid #cbd5e1;border-radius:.75rem;overflow:hidden;} th,td{padding:.75rem;border-bottom:1px solid #e2e8f0;text-align:left;vertical-align:top;} img{max-width:100%;border:1px solid #cbd5e1;border-radius:.5rem;} .gallery{display:grid;grid-template-columns:repeat(auto-fit,minmax(320px,1fr));gap:1rem;}</style>",
        "</head><body>",
        f"<h1>Run {_esc(run['run_id'])}</h1>",
        "<div class='grid'>",
        f"<div class='card'><strong>Advisory</strong><br><code>{_esc(run['advisory_id'])}</code></div>",
        f"<div class='card'><strong>Status</strong><br><code>{_esc(run['verification_status'])}</code></div>",
        f"<div class='card'><strong>Profile</strong><br><code>{_esc(run['repro_profile_id'])}</code></div>",
        f"<div class='card'><strong>Artifact Mode</strong><br><code>{_esc(run['artifact_mode'])}</code></div>",
        "</div>",
        "<h2>Mermaid Timeline</h2>",
        f"<pre>{_esc(mermaid)}</pre>",
        "<h2>Timeline</h2>",
        "<table><thead><tr><th>Time</th><th>Step</th><th>Status</th><th>Detail</th></tr></thead><tbody>",
    ]
    if run.get("timeline"):
        for item in run["timeline"]:
            html_body.append(
                "<tr>"
                f"<td><code>{_esc(item.get('at', ''))}</code></td>"
                f"<td><code>{_esc(item.get('step', ''))}</code></td>"
                f"<td><code>{_esc(item.get('status', ''))}</code></td>"
                f"<td>{_esc(item.get('detail', '') or '-')}</td>"
                "</tr>"
            )
    html_body.extend(["</tbody></table>", "<h2>Attack Steps</h2>", "<table><thead><tr><th>Tool</th><th>Status</th><th>Output</th></tr></thead><tbody>"])
    if run.get("attack_steps"):
        for step in run["attack_steps"]:
            # Same fallback chain as the Markdown table, so a step that only
            # recorded a textual detail is not shown as "-" in the HTML report.
            outcome = step.get("result_path") or step.get("detail") or "-"
            html_body.append(
                "<tr>"
                f"<td><code>{_esc(step.get('tool') or step.get('kind') or '-')}</code></td>"
                f"<td><code>{_esc(step.get('status', '-'))}</code></td>"
                f"<td><code>{_esc(outcome)}</code></td>"
                "</tr>"
            )
    else:
        html_body.append("<tr><td><code>-</code></td><td><code>skipped</code></td><td><code>no attack steps</code></td></tr>")
    html_body.extend(["</tbody></table>"])
    if relative_screenshots:
        html_body.extend(["<h2>Browser Screenshots</h2>", "<div class='gallery'>"])
        for ref in relative_screenshots:
            html_body.append(
                f"<figure><img src='{_esc(ref)}' alt='{_esc(Path(ref).stem)}'><figcaption><code>{_esc(ref)}</code></figcaption></figure>"
            )
        html_body.append("</div>")
    html_body.extend(["<h2>Evidence</h2><ul>"])
    evidence_refs = (
        run.get("compose_refs", [])
        + run.get("browser_refs", [])
        + run.get("container_log_refs", [])
        + run.get("request_log_refs", [])
    )
    for ref in evidence_refs:
        html_body.append(f"<li><code>{_esc(_relative_ref(run_dir, ref))}</code></li>")
    html_body.extend(["</ul>", "</body></html>"])
    return html_body


def render_run(run: Dict[str, Any]) -> Dict[str, str]:
    """Write the report bundle for one run under ``CASE_RUNS_DIR / run_id``.

    Three files are written, in this order: ``timeline.mmd`` (Mermaid source),
    ``report.md`` and ``report.html``. An ``assets`` sub-directory is ensured
    first. Entries of ``browser_refs`` ending in ``.png``/``.jpg``/``.jpeg``
    are embedded as screenshots in both reports.

    Args:
        run: Run record. ``run_id``, ``advisory_id``, ``system_id``,
            ``repro_profile_id``, ``verification_status``,
            ``verification_mode``, ``artifact_mode``, ``started_at`` and
            ``finished_at`` are required; the remaining keys are optional.

    Returns:
        Mapping with the string paths ``bundle_dir``, ``report_md``,
        ``report_html`` and ``timeline``.

    Raises:
        KeyError: If a required key is missing from ``run``.
    """
    run_dir = CASE_RUNS_DIR / run["run_id"]
    ensure_dir(run_dir / "assets")

    # Build the Mermaid source once; it is both written out and embedded in HTML.
    mermaid = mermaid_from_steps(run)
    timeline_path = run_dir / "timeline.mmd"
    write_text(timeline_path, mermaid)

    screenshot_refs = [ref for ref in run.get("browser_refs", []) if ref.endswith((".png", ".jpg", ".jpeg"))]
    relative_screenshots = [_relative_ref(run_dir, ref) for ref in screenshot_refs]

    report_md = run_dir / "report.md"
    write_text(report_md, "\n".join(_run_markdown_lines(run, run_dir, timeline_path, relative_screenshots)))

    report_html = run_dir / "report.html"
    write_text(report_html, "\n".join(_run_html_lines(run, run_dir, relative_screenshots, mermaid)))

    return {"bundle_dir": str(run_dir), "report_md": str(report_md), "report_html": str(report_html), "timeline": str(timeline_path)}
def _link_run_bundles(runs: List[Dict[str, Any]], runs_dir: Path) -> None:
    """Ensure ``runs_dir/<run_id>`` is a relative symlink to each run's bundle.

    Runs without a recorded, existing bundle directory are skipped. Link
    maintenance is best-effort: an ``OSError`` for one run skips that run only.
    """
    for item in runs:
        bundle_ref = (item.get("report_refs") or {}).get("bundle_dir")
        if not bundle_ref:
            # An empty path would become Path("."), which always exists and
            # would end up linking the run to the current working directory.
            continue
        bundle_dir = Path(bundle_ref)
        if not bundle_dir.exists():
            continue
        symlink_path = runs_dir / item["run_id"]
        relative_target = os.path.relpath(bundle_dir, symlink_path.parent)
        try:
            if symlink_path.is_symlink():
                if os.readlink(symlink_path) == relative_target:
                    continue  # already points at the right place
                symlink_path.unlink()
            elif symlink_path.exists():
                symlink_path.unlink()
            os.symlink(relative_target, symlink_path, target_is_directory=True)
        except OSError:
            continue


def _system_coverage(advisory_records: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Aggregate per-system advisory counters keyed by ``system_id``."""
    systems: Dict[str, Dict[str, Any]] = {}
    for advisory in advisory_records:
        system = systems.setdefault(
            advisory["system_id"],
            {
                "system_id": advisory["system_id"],
                "display_name": advisory.get("display_name", advisory["system_id"]),
                "total": 0,
                "verified_real": 0,
                "verified_synthetic": 0,
                "blocked": 0,
                "manual": 0,
                "browser_required": 0,
                "browser_present": 0,
                "latest_update": "",
            },
        )
        system["total"] += 1
        status = advisory.get("verification_status", "triage-manual")
        if status == "verified-real":
            system["verified_real"] += 1
        elif status == "verified-synthetic":
            system["verified_synthetic"] += 1
        elif status.startswith("blocked-"):
            system["blocked"] += 1
        else:
            system["manual"] += 1
        browser = advisory.get("browser_evidence", {})
        if browser.get("required"):
            system["browser_required"] += 1
        if browser.get("present"):
            system["browser_present"] += 1
        # Timestamps are compared as strings, newest wins.
        latest = advisory.get("updated_at") or advisory.get("published_at") or ""
        if latest > system["latest_update"]:
            system["latest_update"] = latest
    return systems


def render_dashboard() -> Dict[str, str]:
    """Regenerate the static dashboard under ``DASHBOARD_DIR``.

    Loads advisory and run records, links each run bundle into
    ``DASHBOARD_DIR/runs``, and writes ``summary.json``, ``runs.json`` (the 100
    most recently finished runs, decorated with dashboard-relative links),
    ``systems.json`` and ``index.html``.

    Returns:
        Mapping with the string paths ``dashboard_dir`` and ``index_html``.
    """
    ensure_dir(DASHBOARD_DIR)
    advisory_records = load_json_dir(ADVISORIES_DIR)
    runs = load_json_dir(RUNS_DIR)
    runs_dir = DASHBOARD_DIR / "runs"
    ensure_dir(runs_dir)
    _link_run_bundles(runs, runs_dir)

    systems = _system_coverage(advisory_records)

    recent_runs = sorted(runs, key=lambda item: item.get("finished_at") or "", reverse=True)[:100]
    decorated_runs: List[Dict[str, Any]] = []
    for item in recent_runs:
        cloned = dict(item)
        cloned["dashboard_refs"] = {
            "report_html": f"./runs/{item['run_id']}/report.html",
            "report_md": f"./runs/{item['run_id']}/report.md",
            "timeline": f"./runs/{item['run_id']}/timeline.mmd",
            "bundle": f"./runs/{item['run_id']}/run.json",
        }
        cloned["browser_links"] = [_dashboard_ref(item, ref) for ref in item.get("browser_refs", [])]
        cloned["container_links"] = [_dashboard_ref(item, ref) for ref in item.get("container_log_refs", [])]
        cloned["request_links"] = [_dashboard_ref(item, ref) for ref in item.get("request_log_refs", [])]
        decorated_runs.append(cloned)

    summary = {
        "generated_at": isoformat(now_utc()),
        "advisory_count": len(advisory_records),
        "run_count": len(runs),
        "statuses": {},
        "recent_failures": [],
    }
    for item in runs:
        status = item.get("verification_status", "triage-manual")
        summary["statuses"][status] = summary["statuses"].get(status, 0) + 1
    summary["systems"] = sorted(systems.values(), key=lambda item: (-item["total"], item["system_id"]))
    # Failures are drawn from the recent (decorated) runs only, capped at 20.
    summary["recent_failures"] = [
        {
            "run_id": item["run_id"],
            "advisory_id": item["advisory_id"],
            "status": item.get("verification_status"),
            "blocked_reason": item.get("blocked_reason"),
        }
        for item in decorated_runs
        if item.get("verification_status") in {"triage-manual", "blocked-artifact", "blocked-destructive"}
    ][:20]
    write_json(DASHBOARD_DIR / "summary.json", summary)
    write_json(DASHBOARD_DIR / "runs.json", decorated_runs)
    write_json(DASHBOARD_DIR / "systems.json", summary["systems"])

    # Plain (non-formatted) template: percent signs are written literally, and
    # the page script escapes record strings before assigning to innerHTML.
    html_page = """<!doctype html>
<html>
<head>
<meta charset="utf-8">
<title>websafe dashboard</title>
<style>
body { font-family: ui-sans-serif, system-ui, sans-serif; margin: 2rem; background: #f8fafc; color: #0f172a; }
h1, h2 { margin-bottom: .5rem; }
.cards { display: grid; grid-template-columns: repeat(auto-fit, minmax(180px, 1fr)); gap: 1rem; margin: 1rem 0 2rem; }
.card { background: white; border: 1px solid #cbd5e1; border-radius: 14px; padding: 1rem; box-shadow: 0 4px 18px rgba(15,23,42,.06); }
.filters { display:flex; flex-wrap:wrap; gap:.75rem; margin: 1rem 0; }
input, select { padding: .6rem .75rem; border: 1px solid #cbd5e1; border-radius: 10px; background: white; }
table { width: 100%; border-collapse: collapse; background: white; border-radius: 12px; overflow: hidden; margin-bottom: 2rem; }
th, td { padding: .75rem; border-bottom: 1px solid #e2e8f0; text-align: left; font-size: .92rem; }
code { background: #e2e8f0; padding: .1rem .35rem; border-radius: 6px; }
.muted { color: #475569; }
</style>
</head>
<body>
<h1>websafe Local Lab Dashboard</h1>
<p>LAB ONLY | AUTHORIZED TARGETS ONLY | 本地静态看板</p>
<div id="summary" class="cards"></div>
<h2>System Coverage</h2>
<table>
<thead><tr><th>System</th><th>Total</th><th>Verified Real</th><th>Verified Synthetic</th><th>Blocked</th><th>Manual</th><th>Browser</th><th>Latest</th></tr></thead>
<tbody id="systemRows"></tbody>
</table>
<h2>Recent Runs</h2>
<div class="filters">
<input id="search" placeholder="Search advisory or run id">
<select id="systemFilter"><option value="">All systems</option></select>
<select id="statusFilter"><option value="">All statuses</option></select>
<select id="familyFilter"><option value="">All profiles</option></select>
</div>
<table>
<thead><tr><th>Run</th><th>System</th><th>Advisory</th><th>Status</th><th>Mode</th><th>Profile</th><th>Finished</th><th>Artifacts</th></tr></thead>
<tbody id="rows"></tbody>
</table>
<script>
const esc = (value) => String(value).replace(/[&<>"']/g, ch => ({'&': '&amp;', '<': '&lt;', '>': '&gt;', '"': '&quot;', "'": '&#39;'}[ch]));
async function main() {
const [summary, runs, systems] = await Promise.all([
fetch('./summary.json').then(r => r.json()),
fetch('./runs.json').then(r => r.json()),
fetch('./systems.json').then(r => r.json())
]);
const summaryRoot = document.getElementById('summary');
const cards = [{label: 'Advisories', value: summary.advisory_count}, {label: 'Run Count', value: summary.run_count}];
for (const [key, value] of Object.entries(summary.statuses)) {
cards.push({label: key, value});
}
summaryRoot.innerHTML = cards.map(item => `<div class="card"><strong>${esc(item.label)}</strong><div style="font-size:2rem;margin-top:.5rem;">${esc(item.value)}</div></div>`).join('');
const systemRows = document.getElementById('systemRows');
systemRows.innerHTML = systems.map(item => `<tr><td><code>${esc(item.system_id)}</code></td><td>${esc(item.total)}</td><td>${esc(item.verified_real)}</td><td>${esc(item.verified_synthetic)}</td><td>${esc(item.blocked)}</td><td>${esc(item.manual)}</td><td>${esc(item.browser_present)}/${esc(item.browser_required)}</td><td>${esc(item.latest_update || '')}</td></tr>`).join('');
const systemFilter = document.getElementById('systemFilter');
const statusFilter = document.getElementById('statusFilter');
const familyFilter = document.getElementById('familyFilter');
const search = document.getElementById('search');
const distinct = (values) => Array.from(new Set(values.filter(Boolean))).sort();
const options = (values) => distinct(values).map(value => `<option value="${esc(value)}">${esc(value)}</option>`).join('');
systemFilter.innerHTML += options(runs.map(item => item.system_id));
statusFilter.innerHTML += options(runs.map(item => item.verification_status));
familyFilter.innerHTML += options(runs.map(item => item.repro_profile_id));
const rows = document.getElementById('rows');
function renderRows() {
const query = search.value.trim().toLowerCase();
const filtered = runs.filter(item => {
if (systemFilter.value && item.system_id !== systemFilter.value) return false;
if (statusFilter.value && item.verification_status !== statusFilter.value) return false;
if (familyFilter.value && item.repro_profile_id !== familyFilter.value) return false;
if (query) {
const haystack = `${item.run_id} ${item.advisory_id} ${item.system_id} ${item.repro_profile_id}`.toLowerCase();
if (!haystack.includes(query)) return false;
}
return true;
});
rows.innerHTML = filtered.map(item => {
const links = [];
if (item.dashboard_refs && item.dashboard_refs.report_html) links.push(`<a href="${esc(item.dashboard_refs.report_html)}">report</a>`);
if (item.dashboard_refs && item.dashboard_refs.timeline) links.push(`<a href="${esc(item.dashboard_refs.timeline)}">timeline</a>`);
if (item.dashboard_refs && item.dashboard_refs.bundle) links.push(`<a href="${esc(item.dashboard_refs.bundle)}">bundle</a>`);
if (item.browser_links && item.browser_links.length) links.push(`<a href="${esc(item.browser_links[0])}">browser</a>`);
if (item.container_links && item.container_links.length) links.push(`<a href="${esc(item.container_links[0])}">logs</a>`);
const reason = item.blocked_reason ? `<div class="muted">${esc(item.blocked_reason)}</div>` : '';
return `<tr><td><code>${esc(item.run_id)}</code>${reason}</td><td><code>${esc(item.system_id)}</code></td><td><code>${esc(item.advisory_id)}</code></td><td>${esc(item.verification_status)}</td><td>${esc(item.verification_mode)}</td><td><code>${esc(item.repro_profile_id)}</code></td><td>${esc(item.finished_at || '')}</td><td>${links.join(' | ') || '-'}</td></tr>`;
}).join('');
}
[systemFilter, statusFilter, familyFilter, search].forEach(node => node.addEventListener('input', renderRows));
renderRows();
}
main();
</script>
</body>
</html>
"""
    write_text(DASHBOARD_DIR / "index.html", html_page)
    return {
        "dashboard_dir": str(DASHBOARD_DIR),
        "index_html": str(DASHBOARD_DIR / "index.html"),
    }