更新: 5 个文件 - 2026-03-18 09:50:04

这个提交包含在:
hao
2026-03-18 09:50:04 -07:00
父节点 dc31e6e80f
当前提交 1d5cb533e3
修改 5 个文件,包含 277 行新增39 行删除

查看文件

@@ -12,12 +12,23 @@ if str(SCRIPTS_DIR) not in sys.path:
sys.path.insert(0, str(SCRIPTS_DIR))
from intel.config import ADVISORIES_DIR, GENERATED_DIR, STATE_DIR, STATE_PATH, TRIAGE_DIR, load_source_map # noqa: E402
from intel.monitoring import ( # noqa: E402
build_alerts,
build_source_health_snapshot,
normalize_failures,
read_previous_alerts,
read_previous_source_health,
write_alerts,
write_monitoring_state,
write_source_catalog_audit,
write_source_health,
)
from intel.models import AdvisoryRecord # noqa: E402
from intel.normalize import normalize_candidates # noqa: E402
from intel.pr import open_pr # noqa: E402
from intel.render import render_case_pages, render_generated, render_registry, render_secure_code, render_system_scaffolding # noqa: E402
from intel.route import route_advisories # noqa: E402
from intel.sources.runner import collect_candidates, probe_sources # noqa: E402
from intel.sources.runner import build_failure, collect_candidates, failure_summary, find_source, probe_source, probe_sources # noqa: E402
from intel.utils import isoformat, load_all_json, now_utc, parse_since, read_json, write_json # noqa: E402
from intel.validators import validate # noqa: E402
@@ -131,6 +142,52 @@ def _write_outputs(
render_generated(source_map, advisories, triage, failures, change_summary)
def _refresh_render_state(
full_source_map: Dict[str, Any],
source_map: Dict[str, Any],
) -> None:
render_map, advisories, triage = _load_existing_selection(full_source_map, source_map)
summary = read_json(GENERATED_DIR / "run-summary.json", default={}) or {}
_write_outputs(render_map, advisories, triage, summary.get("failures", []), summary)
def _retry_degraded_sources(
source_map: Dict[str, Any],
failures: List[Dict[str, Any]],
) -> tuple[List[Dict[str, Any]], List[Dict[str, Any]], int]:
recovered_probes: List[Dict[str, Any]] = []
remaining_failures: List[Dict[str, Any]] = []
seen = set()
retries_performed = 0
for failure in normalize_failures(failures):
key = (failure.get("system_id"), failure.get("source_name"))
if key in seen:
continue
seen.add(key)
match = find_source(source_map, failure.get("system_id", ""), failure.get("source_name", ""))
if match is None:
remaining_failures.append(failure)
continue
system, source = match
if source.get("status") == "retired":
continue
retries_performed += 1
try:
result = probe_source(system, source)
recovered_probes.append(
{
"system_id": system["system_id"],
"source_name": source["name"],
"source_kind": source["kind"],
**result,
}
)
except Exception as exc:
remaining_failures.append(build_failure(system, source, exc))
return recovered_probes, remaining_failures, retries_performed
def pipeline(
full_source_map: Dict[str, Any],
source_map: Dict[str, Any],
@@ -167,14 +224,33 @@ def cmd_render(args) -> int:
def cmd_source_health(args) -> int:
full_source_map = load_source_map()
source_map = _filter_source_map(full_source_map, args.system)
previous_source_health = read_previous_source_health()
probes, failures = probe_sources(source_map, tier=args.tier)
retried_probes, remaining_failures, retries_performed = _retry_degraded_sources(source_map, failures)
if retried_probes:
probe_map = {(item["system_id"], item["source_name"]): item for item in probes}
for item in retried_probes:
probe_map[(item["system_id"], item["source_name"])] = item
probes = sorted(probe_map.values(), key=lambda item: (item["system_id"], item["source_name"]))
else:
remaining_failures = normalize_failures(failures)
snapshot = build_source_health_snapshot(
source_map,
probes,
remaining_failures,
previous=previous_source_health,
retries_performed=retries_performed,
)
write_source_health(snapshot)
render_map, advisories, triage = _load_existing_selection(full_source_map, source_map)
existing_summary = read_json(GENERATED_DIR / "run-summary.json", default={}) or {}
render_generated(render_map, advisories, triage, failures, existing_summary)
print(f"Source health checked {len(probes)} sources across {len(source_map['systems'])} systems; failures {len(failures)}")
for failure in failures:
print(f"- {failure}")
return 0 if not failures else 1
render_generated(render_map, advisories, triage, snapshot.get("failures", []), existing_summary)
print(
f"Source health checked {len(probes)} active sources across {len(source_map['systems'])} systems; failures {snapshot['failure_count']}; retries {retries_performed}"
)
for failure in snapshot["failures"]:
print(f"- {failure_summary(failure)}")
return 0 if not snapshot["failures"] else 1
def cmd_validate(args) -> int:
@@ -189,11 +265,12 @@ def cmd_validate(args) -> int:
return 0
def _write_state(status: str) -> None:
def _write_state(status: str, *, record_success: bool = True) -> None:
STATE_DIR.mkdir(parents=True, exist_ok=True)
state = read_json(STATE_PATH, default={}) or {}
state["last_success"] = isoformat(now_utc())
state["status"] = status
if record_success:
state["last_success"] = isoformat(now_utc())
write_json(STATE_PATH, state)
@@ -205,33 +282,33 @@ def cmd_ingest(args) -> int:
state = read_json(STATE_PATH, default={}) or {}
since = state.get("last_success", "30d")
advisories, triage, failures, summary = pipeline(full_source_map, source_map, since, None, include_undated=False)
_write_state("success")
_write_state("success" if not failures else "degraded", record_success=not failures)
print(
f"Ingested {len(advisories)} advisories, new {summary['new_count']}, updated {summary['updated_count']}, triage {len(triage)}, failures {len(failures)}"
)
return 0
return 0 if not failures else 1
def cmd_hotlane(args) -> int:
full_source_map = load_source_map()
source_map = _filter_source_map(full_source_map, args.system)
advisories, triage, failures, summary = pipeline(full_source_map, source_map, "1d", None, include_undated=False, hotlane_only=True)
_write_state("success")
_write_state("success" if not failures else "degraded", record_success=not failures)
print(
f"Hotlane synced {len(advisories)} advisories, new {summary['new_count']}, updated {summary['updated_count']}, triage {len(triage)}, failures {len(failures)}"
)
return 0
return 0 if not failures else 1
def cmd_reconcile(args) -> int:
full_source_map = load_source_map()
source_map = _filter_source_map(full_source_map, args.system)
advisories, triage, failures, summary = pipeline(full_source_map, source_map, "30d", None, include_undated=False)
_write_state("success")
_write_state("success" if not failures else "degraded", record_success=not failures)
print(
f"Reconciled {len(advisories)} advisories, new {summary['new_count']}, updated {summary['updated_count']}, triage {len(triage)}, failures {len(failures)}"
)
return 0
return 0 if not failures else 1
def cmd_backfill(args) -> int:
@@ -258,7 +335,73 @@ def cmd_backfill(args) -> int:
print(
f"Backfilled {len(advisories)} advisories, new {summary['new_count']}, updated {summary['updated_count']}, triage {len(triage)}, failures {len(failures)}"
)
return 0
return 0 if not failures else 1
def cmd_monitor(args) -> int:
full_source_map = load_source_map()
source_map = _filter_source_map(full_source_map, args.system)
existing_run_summary = read_json(GENERATED_DIR / "run-summary.json", default={}) or {}
previous_source_health = read_previous_source_health()
previous_alerts = read_previous_alerts()
bootstrap_failures = previous_source_health.get("failures") or existing_run_summary.get("failures", [])
audit = write_source_catalog_audit(source_map)
probes, failures = probe_sources(source_map)
retried_probes, remaining_failures, retries_performed = _retry_degraded_sources(source_map, failures)
if retried_probes:
probe_map = {(item["system_id"], item["source_name"]): item for item in probes}
for item in retried_probes:
probe_map[(item["system_id"], item["source_name"])] = item
probes = sorted(probe_map.values(), key=lambda item: (item["system_id"], item["source_name"]))
else:
remaining_failures = normalize_failures(failures)
source_health = build_source_health_snapshot(
source_map,
probes,
remaining_failures,
previous=previous_source_health,
retries_performed=retries_performed,
)
write_source_health(source_health)
state = read_json(STATE_PATH, default={}) or {}
since = state.get("last_success", "30d")
advisories, triage, ingest_failures, summary = pipeline(full_source_map, source_map, since, None, include_undated=False)
alerts = build_alerts(
source_health.get("failures", []),
previous_alerts=previous_alerts,
bootstrap_failures=bootstrap_failures,
generated_at=source_health.get("generated_at"),
)
write_alerts(alerts)
validation_errors = validate(source_map)
write_monitoring_state(
audit=audit,
source_health=source_health,
alerts=alerts,
ingest_summary={**summary, "failures": ingest_failures},
validation_errors=validation_errors,
)
_refresh_render_state(full_source_map, source_map)
passed = not source_health.get("failures") and not ingest_failures and not validation_errors
_write_state("success" if passed else "degraded", record_success=passed)
print(
"Monitor completed: "
f"active_sources={source_health.get('active_source_count', 0)} "
f"green_sources={source_health.get('green_source_count', 0)} "
f"open_alerts={len([item for item in alerts if item.get('status') == 'open'])} "
f"ingest_failures={len(ingest_failures)} "
f"validation_errors={len(validation_errors)}"
)
for failure in source_health.get("failures", []):
print(f"- {failure_summary(failure)}")
for error in validation_errors:
print(f"- validate::{error}")
return 0 if passed else 1
def cmd_open_pr(args) -> int:
@@ -299,6 +442,10 @@ def main() -> int:
source_health.add_argument("--system", action="append")
source_health.set_defaults(func=cmd_source_health)
monitor = subparsers.add_parser("monitor", help="Run source audit, health, ingest, render and monitoring state persistence")
monitor.add_argument("--system", action="append")
monitor.set_defaults(func=cmd_monitor)
validate_parser = subparsers.add_parser("validate", help="Validate generated content")
validate_parser.add_argument("--system", action="append")
validate_parser.set_defaults(func=cmd_validate)

查看文件

@@ -94,12 +94,22 @@ BAD_GOOD_SNIPPETS = {
),
}
def _failure_text(item: Any) -> str:
if isinstance(item, dict):
return item.get("summary") or f"{item.get('system_id')}::{item.get('source_name')}::{item.get('category')}::{item.get('message')}"
return str(item)
SOURCE_KIND_URLS = {
"ghsa-global": "https://github.com/advisories",
"osv-batch": "https://osv.dev/",
"nvd-search": "https://nvd.nist.gov/vuln/search",
"kev-json": "https://www.cisa.gov/known-exploited-vulnerabilities-catalog",
"rss-feed": "https://www.rssboard.org/rss-specification",
"atom-feed": "https://datatracker.ietf.org/doc/html/rfc4287",
"json-feed": "https://www.jsonfeed.org/version/1.1/",
"vendor-index": "https://example.com/vendor-index",
}
TARGET_TYPES = ["lab-local", "lab-public", "authorized-third-party"]
@@ -498,7 +508,7 @@ def render_generated(
if failures:
latest_lines.extend(["## 失败列表", ""])
for failure in failures:
latest_lines.append(f"- {failure}")
latest_lines.append(f"- {_failure_text(failure)}")
write_text(GENERATED_DIR / "latest-ingest.md", "\n".join(latest_lines))
write_json(
GENERATED_DIR / "run-summary.json",

查看文件

@@ -53,7 +53,7 @@ def failure_summary(failure: Dict[str, Any]) -> str:
return failure.get("summary") or f"{failure.get('system_id')}::{failure.get('source_name')}::{failure.get('category')}::{failure.get('exception')}"
def _build_failure(system: Dict[str, Any], source: Dict[str, Any], exc: Exception) -> Dict[str, Any]:
def build_failure(system: Dict[str, Any], source: Dict[str, Any], exc: Exception) -> Dict[str, Any]:
response = getattr(exc, "response", None)
status_code = getattr(response, "status_code", None)
category = _failure_category(exc)
@@ -211,7 +211,7 @@ def collect_candidates(
if _passes_since(item, since_dt, include_undated):
all_candidates.append(item)
except Exception as exc:
failures.append(_build_failure(system, source, exc))
failures.append(build_failure(system, source, exc))
return all_candidates, failures
@@ -245,7 +245,7 @@ def probe_sources(
}
)
except Exception as exc:
failures.append(_build_failure(system, source, exc))
failures.append(build_failure(system, source, exc))
return probes, failures

查看文件

@@ -43,6 +43,8 @@ const DOC_HUB_ITEMS = [
{ title: "仓库入口镜像", href: "/docs/root-readme.html", description: "根 README 的本地镜像,包含能力矩阵与主入口。", badge: "readme" },
{ title: "授权模型", href: "/docs/authorization-model.html", description: "目标范围、授权模型、最小化验证建议和记录要求。", badge: "scope" },
{ title: "source-map 镜像", href: "/docs/source-map.html", description: "系统覆盖、来源、输出目录和 secure-code 主题真值。", badge: "source-map" },
{ title: "source catalog audit", href: "/docs/source-catalog-audit.html", description: "active/retired source、replacement map 与覆盖摘要。", badge: "audit" },
{ title: "retired sources", href: "/docs/retired-sources.html", description: "退役源、退役原因和 replacement_sources 真值。", badge: "retired" },
{ title: "repro-map 镜像", href: "/docs/repro-map.html", description: "默认漏洞家族、浏览器要求和日志策略真值。", badge: "repro-map" },
{ title: "覆盖矩阵镜像", href: "/docs/coverage-matrix.html", description: "当前全库覆盖矩阵的本地镜像。", badge: "coverage" },
{ title: "安全编码索引", href: "/docs/secure-code-index.html", description: "secure-code 修复主题索引镜像。", badge: "secure-code" },
@@ -52,6 +54,10 @@ const DOC_HUB_ITEMS = [
const DATA_HUB_ITEMS = [
{ title: "summary.json", href: "/summary.json", description: "全局摘要、状态分布、最近失败与系统汇总。", badge: "json" },
{ title: "completeness.json", href: "/data/completeness.json", description: "最新 advisory 完整度、系统/family 进度与 ingest 健康度。", badge: "json" },
{ title: "source-health.json", href: "/data/source-health.json", description: "active source 健康度、失败分类与系统分布。", badge: "json" },
{ title: "alerts.json", href: "/data/alerts.json", description: "source 告警状态机、failure streak 与 resolved 记录。", badge: "json" },
{ title: "monitor-summary.json", href: "/data/monitor-summary.json", description: "每日监控摘要、open alerts 与最近全绿时间。", badge: "json" },
{ title: "source-catalog-audit.json", href: "/data/source-catalog-audit.json", description: "source catalog 审计真值与 retired/replacement 关系。", badge: "json" },
{ title: "runs.json", href: "/runs.json", description: "最近运行的结构化详情,可用于 UI 和调试。", badge: "json" },
{ title: "systems.json", href: "/systems.json", description: "系统级覆盖、分类、更新时间和浏览器证据统计。", badge: "json" },
{ title: "advisories.json", href: "/advisories.json", description: "漏洞条目元数据、来源和 secure-code 主题。", badge: "json" },
@@ -87,6 +93,9 @@ const state = {
profiles: {},
architecture: null,
completeness: null,
sourceHealth: null,
alerts: [],
monitorSummary: null,
selectedRunId: null,
selectedArtifact: null,
refreshHandle: null,
@@ -279,38 +288,41 @@ function familyOptions() {
function metricCards() {
const completeness = state.completeness || state.summary?.completeness || {};
const successCount = Number(completeness.verified_real || 0) + Number(completeness.verified_synthetic || 0);
const blockedCount = Number(completeness.blocked || 0);
const inProgressCount = Number(completeness.manual || 0);
const monitoring = state.monitorSummary || state.summary?.monitoring || {};
const advisoryTotal = Number(completeness.advisory_total || state.summary?.advisory_count || 0);
const advisorySuccess = Number(completeness.verified_real || 0);
const activeSources = Number(monitoring.active_source_count || state.sourceHealth?.active_source_count || 0);
const greenSources = Number(monitoring.green_source_count || state.sourceHealth?.green_source_count || 0);
const openAlerts = Number(monitoring.open_alert_count || state.sourceHealth?.open_alert_count || 0);
const lastFullyGreen = monitoring.last_fully_green_run || state.sourceHealth?.last_fully_green_run || "";
return [
{
label: "最新 advisory",
value: advisoryTotal,
label: "advisory 完整度",
value: `${advisorySuccess}/${advisoryTotal}`,
note: `历史运行 ${state.summary?.run_count || 0}`,
color: "var(--accent-purple)",
color: "var(--accent-green)",
iconName: "report"
},
{
label: "实证成功",
value: successCount,
note: "真实版本 + 合成靶场",
color: "var(--accent-green)",
label: "active sources",
value: activeSources,
note: `green ${greenSources}`,
color: "var(--accent-blue)",
iconName: "shield"
},
{
label: "当前阻塞",
value: blockedCount,
note: "latest advisory 状态里的 blocked-*",
label: "open alerts",
value: openAlerts,
note: "source-health 告警状态机",
color: "var(--accent-red)",
iconName: "failure"
},
{
label: "待处理 / 进行中",
value: inProgressCount,
note: "人工分诊或待补证据的 latest advisory",
color: "var(--accent-blue)",
label: "最近全绿",
value: lastFullyGreen ? formatDateTime(lastFullyGreen) : "-",
note: "active source 集合最近一次全绿",
color: "var(--accent-purple)",
iconName: "timeline"
}
];

查看文件

@@ -43,6 +43,18 @@ STATUS_LABELS = {
}
def _failure_text(item: Any) -> str:
if isinstance(item, dict):
return item.get("summary") or f"{item.get('system_id')}::{item.get('source_name')}::{item.get('category')}::{item.get('message')}"
return str(item)
def _safe_read_text(path: Path, default: str = "") -> str:
if not path.exists():
return default
return path.read_text(encoding="utf-8")
def mermaid_from_steps(run: Dict[str, Any]) -> str:
lines = [
"flowchart LR",
@@ -229,6 +241,9 @@ def _build_completeness(
runs: List[Dict[str, Any]],
profile_map: Dict[str, Dict[str, Any]],
run_summary: Dict[str, Any],
source_health: Dict[str, Any],
alerts: List[Dict[str, Any]],
monitor_summary: Dict[str, Any],
) -> Dict[str, Any]:
latest_statuses: Dict[str, int] = {}
historical_statuses: Dict[str, int] = {}
@@ -284,7 +299,9 @@ def _build_completeness(
verified_synthetic = latest_statuses.get("verified-synthetic", 0)
blocked = sum(count for key, count in latest_statuses.items() if key.startswith("blocked-"))
manual = advisory_total - verified_real - verified_synthetic - blocked
complete = advisory_total > 0 and advisory_total == verified_real
source_failure_count = int(source_health.get("failure_count", 0))
open_alert_count = len([item for item in alerts if item.get("status") == "open"])
complete = advisory_total > 0 and advisory_total == verified_real and source_failure_count == 0
return {
"generated_at": isoformat(now_utc()),
"advisory_total": advisory_total,
@@ -299,13 +316,23 @@ def _build_completeness(
"systems": systems_list,
"ingest_health": {
"failure_count": len(run_summary.get("failures", []) or []),
"failures": run_summary.get("failures", []) or [],
"failures": [_failure_text(item) for item in (run_summary.get("failures", []) or [])],
},
"source_health": {
"active_source_count": int(source_health.get("active_source_count", 0)),
"green_source_count": int(source_health.get("green_source_count", 0)),
"failure_count": source_failure_count,
"last_fully_green_run": source_health.get("last_fully_green_run"),
"open_alert_count": open_alert_count,
"resolved_alert_count": len([item for item in alerts if item.get("status") == "resolved"]),
},
"monitor_summary": monitor_summary or {},
"historical_blockers": [
"Docker daemon unavailable caused provision-compose-environment blocked-artifact.",
"Family profiles previously used note-only attack runners and dry-run placeholders.",
"Baseline and browser steps were skipped when environment readiness was not enforced.",
"Latest completeness now uses one advisory -> latest run semantics instead of historical run piles.",
"Source health now counts only status=active sources; retired sources are audited separately with replacement links.",
],
}
@@ -320,6 +347,9 @@ def _write_testing_completeness_report(completeness: Dict[str, Any]) -> None:
f"- 阻塞数量: `{completeness['blocked']}`",
f"- 人工/待补证据数量: `{completeness['manual']}`",
f"- 完整度百分比: `{completeness['verified_ratio']}%`",
f"- active source 全绿: `{completeness['source_health']['green_source_count']}/{completeness['source_health']['active_source_count']}`",
f"- source open alerts: `{completeness['source_health']['open_alert_count']}`",
f"- 最近一次 source 全绿: `{completeness['source_health'].get('last_fully_green_run') or '-'}`",
"",
"## 系统覆盖矩阵",
"",
@@ -348,6 +378,9 @@ def _write_testing_completeness_report(completeness: Dict[str, Any]) -> None:
"## Ingest / Source 健康度",
"",
f"- source failures: `{completeness['ingest_health']['failure_count']}`",
f"- active sources: `{completeness['source_health']['active_source_count']}`",
f"- green sources: `{completeness['source_health']['green_source_count']}`",
f"- open alerts: `{completeness['source_health']['open_alert_count']}`",
]
)
for item in completeness["ingest_health"].get("failures", []):
@@ -384,6 +417,8 @@ def _build_architecture_data(summary: Dict[str, Any], source_map: Dict[str, Any]
_link("仓库入口镜像", "/docs/root-readme.html", "仓库根 README 的本地镜像。"),
_link("授权模型", "/docs/authorization-model.html", "允许目标范围、全局原则与记录要求。"),
_link("source-map 真值", "/docs/source-map.html", "系统覆盖、来源和输出目录真值。"),
_link("source catalog audit", "/docs/source-catalog-audit.html", "active/retired source 审计、替代关系与覆盖摘要。"),
_link("retired sources", "/docs/retired-sources.html", "退役源、退役原因与 replacement map。"),
_link("repro-map 真值", "/docs/repro-map.html", "复现族路由、浏览器要求和日志策略。"),
_link("覆盖矩阵", "/docs/coverage-matrix.html", "自动生成覆盖摘要的本地镜像。"),
_link("设计来源清单", "/docs/design-source.html", "Lovart 模板本地 vendor manifest。"),
@@ -393,11 +428,15 @@ def _build_architecture_data(summary: Dict[str, Any], source_map: Dict[str, Any]
data_links = [
_link("summary.json", "/summary.json", "全局摘要、状态分布和最近失败。"),
_link("completeness.json", "/data/completeness.json", "最新 advisory 完整度、系统/family 进度与 ingest 健康度。"),
_link("source-health.json", "/data/source-health.json", "active source 健康度、系统分布与失败分类。"),
_link("alerts.json", "/data/alerts.json", "source 告警状态机、failure streak 与 resolved 记录。"),
_link("monitor-summary.json", "/data/monitor-summary.json", "每日监控摘要、open alerts 与最近全绿时间。"),
_link("runs.json", "/runs.json", "最近 run 的结构化详情。"),
_link("systems.json", "/systems.json", "系统级覆盖与浏览器证据摘要。"),
_link("advisories.json", "/advisories.json", "漏洞条目元数据与来源。"),
_link("profiles.json", "/profiles.json", "复现档案元数据。"),
_link("architecture.json", "/architecture.json", "当前架构库结构化 JSON。"),
_link("source-catalog-audit.json", "/data/source-catalog-audit.json", "source catalog 审计真值与 retired/replacement 关系。"),
]
category_items: List[Dict[str, Any]] = []
@@ -880,6 +919,18 @@ def _write_dashboard_docs(architecture: Dict[str, Any]) -> None:
SOURCE_MAP_PATH.read_text(encoding="utf-8"),
"工作台内置镜像页:系统覆盖、来源、输出目录和 secure-code 主题真值。",
),
(
"source-catalog-audit.html",
"Source Catalog Audit",
_safe_read_text(ROOT / "08-threat-intel" / "generated" / "source-catalog-audit.md", "source catalog audit has not been generated yet."),
"工作台内置镜像页active/retired source、replacement map 与覆盖摘要。",
),
(
"retired-sources.html",
"Retired Sources & Replacement Map",
json.dumps(read_json(ROOT / "08-threat-intel" / "generated" / "retired-sources.json", default=[]), indent=2, ensure_ascii=False),
"工作台内置镜像页:退役源、退役原因和 replacement_sources 真值。",
),
(
"repro-map.html",
"repro-map 真值镜像",
@@ -1144,6 +1195,10 @@ def render_dashboard() -> Dict[str, str]:
advisory_records = load_json_dir(ADVISORIES_DIR)
runs = load_json_dir(RUNS_DIR)
run_summary = read_json(ROOT / "08-threat-intel" / "generated" / "run-summary.json", default={}) or {}
source_health = read_json(ROOT / "08-threat-intel" / "generated" / "source-health.json", default={}) or {}
alerts = read_json(ROOT / "08-threat-intel" / "generated" / "alerts.json", default=[]) or []
monitor_summary = read_json(ROOT / "08-threat-intel" / "generated" / "monitor-summary.json", default={}) or {}
source_catalog_audit = read_json(ROOT / "08-threat-intel" / "generated" / "source-catalog-audit.json", default={}) or {}
source_map = read_yaml(SOURCE_MAP_PATH, default={}) or {}
repro_map = read_yaml(REPRO_MAP_PATH, default={}) or {}
source_system_map = {item["system_id"]: item for item in source_map.get("systems", []) if item.get("system_id")}
@@ -1256,6 +1311,13 @@ def render_dashboard() -> Dict[str, str]:
"statuses": {},
"run_statuses": {},
"recent_failures": [],
"monitoring": {
"active_source_count": int(source_health.get("active_source_count", 0)),
"green_source_count": int(source_health.get("green_source_count", 0)),
"source_failure_count": int(source_health.get("failure_count", 0)),
"open_alert_count": len([item for item in alerts if item.get("status") == "open"]),
"last_fully_green_run": source_health.get("last_fully_green_run"),
},
}
for item in merged_advisories:
status = item.get("verification_status", "triage-manual")
@@ -1284,7 +1346,7 @@ def render_dashboard() -> Dict[str, str]:
for item in sorted(merged_advisories, key=lambda value: value.get("updated_at") or value.get("published_at") or "", reverse=True)
if item.get("verification_status") in {"triage-manual", "blocked-artifact", "blocked-destructive"}
][:20]
completeness = _build_completeness(merged_advisories, runs, profile_map, run_summary)
completeness = _build_completeness(merged_advisories, runs, profile_map, run_summary, source_health, alerts, monitor_summary)
summary["completeness"] = {
"advisory_total": completeness["advisory_total"],
"verified_real": completeness["verified_real"],
@@ -1293,6 +1355,9 @@ def render_dashboard() -> Dict[str, str]:
"manual": completeness["manual"],
"verified_ratio": completeness["verified_ratio"],
"complete": completeness["complete"],
"source_failure_count": completeness["source_health"]["failure_count"],
"active_source_count": completeness["source_health"]["active_source_count"],
"open_alert_count": completeness["source_health"]["open_alert_count"],
}
write_json(DASHBOARD_DIR / "summary.json", summary)
@@ -1301,6 +1366,10 @@ def render_dashboard() -> Dict[str, str]:
write_json(DASHBOARD_DIR / "advisories.json", {key: _advisory_meta(value) for key, value in advisory_map.items()})
write_json(DASHBOARD_DIR / "profiles.json", {key: _profile_meta(value) for key, value in profile_map.items()})
write_json(DASHBOARD_DIR / "data" / "completeness.json", completeness)
write_json(DASHBOARD_DIR / "data" / "source-health.json", source_health)
write_json(DASHBOARD_DIR / "data" / "alerts.json", alerts)
write_json(DASHBOARD_DIR / "data" / "monitor-summary.json", monitor_summary)
write_json(DASHBOARD_DIR / "data" / "source-catalog-audit.json", source_catalog_audit)
_write_testing_completeness_report(completeness)
architecture = _build_architecture_data(summary, source_map, repro_map)
write_json(DASHBOARD_DIR / "architecture.json", architecture)