文件
websafe-kb/scripts/intel/monitoring.py
2026-03-18 17:23:40 -07:00

410 行
16 KiB
Python

from __future__ import annotations
from datetime import timedelta
from pathlib import Path
from typing import Any, Dict, List
from intel.config import (
ALERTS_PATH,
MACHINE_READABLE_SOURCE_KINDS,
MONITORING_DIR,
MONITOR_SUMMARY_PATH,
RETIRED_SOURCES_PATH,
SOURCE_CATALOG_AUDIT_MD_PATH,
SOURCE_CATALOG_AUDIT_PATH,
SOURCE_HEALTH_PATH,
iter_all_sources,
)
from intel.utils import ensure_dir, isoformat, now_utc, parse_dt, read_json, write_json, write_text
def source_key(system_id: str, source_name: str) -> str:
    """Return the composite ``"<system_id>::<source_name>"`` key for a source."""
    return "{}::{}".format(system_id, source_name)
def normalize_failures(items: List[Any]) -> List[Dict[str, Any]]:
    """Coerce a mixed list of failure records into a uniform list of dicts.

    Two input shapes are accepted:

    * ``dict`` entries are shallow-copied, so the caller's dict is never
      mutated. A missing or empty ``system_id`` / ``source_name`` is filled
      with ``"unknown"``, and a missing or empty ``summary`` is synthesised
      as ``system_id::source_name::category::(message or exception)``.
    * ``str`` entries are parsed as
      ``"system_id::source_name::exception::message"``; the message keeps any
      further ``::`` it contains. Missing or empty segments fall back to
      ``"unknown"`` (identity), ``"UnknownError"`` (exception) and the
      exception text (message). The category is ``"tls"`` when the exception
      text contains ``ssl``, ``"http_status"`` when it contains ``http``
      (both case-insensitive), otherwise ``"network"``.

    Entries of any other type are dropped.

    Args:
        items: Failure records; ``None`` or an empty list yields ``[]``.

    Returns:
        One dict per accepted entry, in input order. Every returned dict has
        non-empty ``system_id`` and ``source_name`` values and a ``summary``
        key.
    """
    normalized: List[Dict[str, Any]] = []
    for item in items or []:
        if isinstance(item, dict):
            cloned = dict(item)
            # Other functions in this module index these two keys directly,
            # so guarantee them here with the same default the string path uses.
            for identity_key in ("system_id", "source_name"):
                if not cloned.get(identity_key):
                    cloned[identity_key] = "unknown"
            if not cloned.get("summary"):
                cloned["summary"] = f"{cloned.get('system_id')}::{cloned.get('source_name')}::{cloned.get('category')}::{cloned.get('message') or cloned.get('exception')}"
            normalized.append(cloned)
            continue
        if not isinstance(item, str):
            continue

        # Pad to four segments so short strings can be unpacked uniformly;
        # an empty segment is treated the same as a missing one.
        parts = item.split("::", 3)
        parts.extend([""] * (4 - len(parts)))
        system_id = parts[0] or "unknown"
        source_name = parts[1] or "unknown"
        exception = parts[2] or "UnknownError"
        message = parts[3] or exception

        lowered = exception.lower()
        if "ssl" in lowered:
            category = "tls"
        elif "http" in lowered:
            category = "http_status"
        else:
            category = "network"

        normalized.append(
            {
                "system_id": system_id,
                "display_name": system_id,
                "source_name": source_name,
                "source_kind": "",
                "source_bucket": "",
                "category": category,
                "exception": exception,
                "message": message,
                "status_code": None,
                "url": "",
                "summary": item,
            }
        )
    return normalized
def build_source_catalog_audit(source_map: Dict[str, Any]) -> Dict[str, Any]:
    """Summarise the source catalog: per-system coverage plus retired sources.

    Each system under ``source_map["systems"]`` is walked on its own through
    ``iter_all_sources(..., include_retired=True)``. Sources whose ``status``
    is ``"retired"`` are collected into the retired list and the replacement
    map; every other source counts as active and is tallied by bucket. An
    active source whose ``kind`` is in ``MACHINE_READABLE_SOURCE_KINDS`` also
    counts as machine-readable.

    Args:
        source_map: Catalog mapping; a missing or empty ``"systems"`` entry
            produces an audit with zero counts.

    Returns:
        The audit dict: global counters, ``systems`` sorted by ``system_id``,
        ``retired_sources`` sorted by ``(system_id, source_name)`` and
        ``replacement_map`` sorted by ``(system_id, retired_source)``.
    """
    catalog_systems = source_map.get("systems", []) or []
    system_rows: List[Dict[str, Any]] = []
    retired_rows: List[Dict[str, Any]] = []
    replacement_rows: List[Dict[str, Any]] = []
    seen_sources = 0
    seen_active = 0
    official_covered = 0
    machine_covered = 0

    for system in catalog_systems:
        active_by_bucket = {"official_sources": 0, "ecosystem_sources": 0, "research_sources": 0}
        machine_active = 0
        retired_here = 0

        # Feed a one-system map so the tallies below stay scoped to this system.
        for _system, bucket_name, source in iter_all_sources({"systems": [system]}, include_retired=True):
            seen_sources += 1

            if source.get("status") != "retired":
                seen_active += 1
                active_by_bucket[bucket_name] += 1
                if source.get("kind") in MACHINE_READABLE_SOURCE_KINDS:
                    machine_active += 1
                continue

            retired_here += 1
            retired_rows.append(
                {
                    "system_id": system["system_id"],
                    "display_name": system["display_name"],
                    "source_name": source["name"],
                    "bucket": bucket_name,
                    "kind": source["kind"],
                    "retired_reason": source.get("retired_reason") or "",
                    "replacement_sources": source.get("replacement_sources") or [],
                    "url": source.get("url") or "",
                }
            )
            replacement_rows.append(
                {
                    "system_id": system["system_id"],
                    "retired_source": source["name"],
                    "replacement_sources": source.get("replacement_sources") or [],
                }
            )

        official_active = active_by_bucket["official_sources"]
        if official_active:
            official_covered += 1
        if machine_active:
            machine_covered += 1

        active_total = sum(active_by_bucket.values())
        system_rows.append(
            {
                "system_id": system["system_id"],
                "display_name": system["display_name"],
                "category": system.get("category"),
                "tier": system.get("tier"),
                "source_total": active_total + retired_here,
                "active_source_total": active_total,
                "retired_source_total": retired_here,
                "official_active": official_active,
                "ecosystem_active": active_by_bucket["ecosystem_sources"],
                "research_active": active_by_bucket["research_sources"],
                "machine_readable_active": machine_active,
                "has_active_official": bool(official_active),
                "has_machine_readable_source": bool(machine_active),
            }
        )

    return {
        "generated_at": isoformat(now_utc()),
        "system_count": len(catalog_systems),
        "source_count": seen_sources,
        "active_source_count": seen_active,
        "retired_source_count": len(retired_rows),
        "systems_with_active_official": official_covered,
        "systems_with_machine_readable_source": machine_covered,
        "systems": sorted(system_rows, key=lambda row: row["system_id"]),
        "retired_sources": sorted(retired_rows, key=lambda row: (row["system_id"], row["source_name"])),
        "replacement_map": sorted(replacement_rows, key=lambda row: (row["system_id"], row["retired_source"])),
    }
def write_source_catalog_audit(source_map: Dict[str, Any]) -> Dict[str, Any]:
    """Build the source catalog audit and write its three artefacts.

    The audit dict goes to ``SOURCE_CATALOG_AUDIT_PATH`` and its
    ``retired_sources`` list to ``RETIRED_SOURCES_PATH`` (both via
    ``write_json``). A Markdown digest of the headline counters and retired
    sources goes to ``SOURCE_CATALOG_AUDIT_MD_PATH`` via ``write_text``.

    Args:
        source_map: Catalog mapping passed to ``build_source_catalog_audit``.

    Returns:
        The audit dict that was written.
    """
    audit = build_source_catalog_audit(source_map)

    retired_lines = []
    for item in audit["retired_sources"]:
        replacements = ", ".join(item.get("replacement_sources") or []) or "-"
        retired_lines.append(
            f"- `{item['system_id']}` `{item['source_name']}` -> replacements: `{replacements}` | reason: {item.get('retired_reason') or 'n/a'}"
        )
    if not retired_lines:
        # Keep the section non-empty when nothing has been retired.
        retired_lines.append("- none")

    lines = [
        "# Source Catalog Audit",
        "",
        f"- generated_at: `{audit['generated_at']}`",
        f"- systems: `{audit['system_count']}`",
        f"- sources: `{audit['source_count']}`",
        f"- active_sources: `{audit['active_source_count']}`",
        f"- retired_sources: `{audit['retired_source_count']}`",
        f"- systems_with_active_official: `{audit['systems_with_active_official']}/{audit['system_count']}`",
        f"- systems_with_machine_readable_source: `{audit['systems_with_machine_readable_source']}/{audit['system_count']}`",
        "",
        "## Retired Sources",
        "",
        *retired_lines,
    ]

    write_json(SOURCE_CATALOG_AUDIT_PATH, audit)
    write_text(SOURCE_CATALOG_AUDIT_MD_PATH, "\n".join(lines))
    write_json(RETIRED_SOURCES_PATH, audit["retired_sources"])
    return audit
def build_source_health_snapshot(
    source_map: Dict[str, Any],
    probes: List[Dict[str, Any]],
    failures: List[Dict[str, Any]],
    *,
    previous: Dict[str, Any] | None = None,
    retries_performed: int = 0,
) -> Dict[str, Any]:
    """Assemble the source-health snapshot for one monitoring run.

    Args:
        source_map: Catalog mapping; its non-retired sources (as yielded by
            ``iter_all_sources(..., include_retired=False)``) define the
            active totals.
        probes: Successful probe records. Each must carry ``system_id`` and
            ``source_name``; ``elapsed_seconds`` is optional.
        failures: Failure records in any shape ``normalize_failures`` accepts.
        previous: Prior snapshot, consulted only for
            ``last_fully_green_run`` when this run has failures.
        retries_performed: Copied into the snapshot unchanged.

    Returns:
        Snapshot dict with global counters, the sorted probes, the normalized
        failures, the ten slowest timed sources and per-system tallies sorted
        by ``system_id``. ``green_source_count`` is the active total minus
        the failure count, floored at zero.
    """

    def _blank_row(system_id: Any, display_name: Any) -> Dict[str, Any]:
        # Zeroed per-system tally shared by catalog, probe and failure paths.
        return {
            "system_id": system_id,
            "display_name": display_name,
            "active_source_total": 0,
            "green_source_total": 0,
            "failure_count": 0,
        }

    failure_rows = normalize_failures(failures)

    active_source_total = 0
    for _system, _bucket, _source in iter_all_sources(source_map, include_retired=False):
        active_source_total += 1

    green_source_total = active_source_total - len(failure_rows)
    if green_source_total < 0:
        green_source_total = 0

    systems: Dict[str, Dict[str, Any]] = {
        system["system_id"]: _blank_row(system["system_id"], system["display_name"])
        for system in source_map.get("systems", []) or []
    }
    for system, _bucket, _source in iter_all_sources(source_map, include_retired=False):
        systems[system["system_id"]]["active_source_total"] += 1

    # Probes and failures may reference systems absent from the catalog;
    # those get a zeroed row on first sight.
    for probe in probes:
        probe_system = probe["system_id"]
        if probe_system not in systems:
            systems[probe_system] = _blank_row(probe_system, probe.get("display_name", probe_system))
        systems[probe_system]["green_source_total"] += 1
    for failure in failure_rows:
        failed_system = failure["system_id"]
        if failed_system not in systems:
            systems[failed_system] = _blank_row(failed_system, failure.get("display_name", failed_system))
        systems[failed_system]["failure_count"] += 1

    generated_at = isoformat(now_utc())
    all_green = len(failure_rows) == 0
    if all_green:
        last_fully_green_run = generated_at
    else:
        last_fully_green_run = (previous or {}).get("last_fully_green_run")

    # Only records that carry a timing take part in the slow-source ranking.
    timed_rows = [
        {
            "system_id": probe["system_id"],
            "source_name": probe["source_name"],
            "source_kind": probe.get("source_kind"),
            "elapsed_seconds": probe.get("elapsed_seconds"),
            "status": "ok",
        }
        for probe in probes
        if probe.get("elapsed_seconds") is not None
    ]
    timed_rows += [
        {
            "system_id": failure["system_id"],
            "source_name": failure["source_name"],
            "source_kind": failure.get("source_kind"),
            "elapsed_seconds": failure.get("elapsed_seconds"),
            "status": failure.get("category") or "failure",
        }
        for failure in failure_rows
        if failure.get("elapsed_seconds") is not None
    ]
    timed_rows.sort(key=lambda row: float(row.get("elapsed_seconds") or 0), reverse=True)

    return {
        "generated_at": generated_at,
        "active_source_count": active_source_total,
        "green_source_count": green_source_total,
        "failure_count": len(failure_rows),
        "all_green": all_green,
        "last_fully_green_run": last_fully_green_run,
        "retries_performed": retries_performed,
        "probes": sorted(probes, key=lambda row: (row["system_id"], row["source_name"])),
        "failures": failure_rows,
        "slow_sources": timed_rows[:10],
        "systems": sorted(systems.values(), key=lambda row: row["system_id"]),
    }
def write_source_health(snapshot: Dict[str, Any]) -> None:
    """Hand the source-health snapshot to ``write_json`` for ``SOURCE_HEALTH_PATH``."""
    destination = SOURCE_HEALTH_PATH
    write_json(destination, snapshot)
def _open_alert_record(
    alert_id: str,
    failure: Dict[str, Any],
    *,
    opened_at: str,
    updated_at: str,
    failure_streak: int,
) -> Dict[str, Any]:
    """Build one open-alert dict describing *failure*."""
    return {
        "alert_id": alert_id,
        "system_id": failure["system_id"],
        "display_name": failure.get("display_name", failure["system_id"]),
        "source_name": failure["source_name"],
        "source_kind": failure.get("source_kind"),
        "status": "open",
        "opened_at": opened_at,
        "updated_at": updated_at,
        "resolved_at": None,
        "failure_streak": failure_streak,
        "last_category": failure.get("category"),
        "last_failure": failure,
    }


def build_alerts(
    current_failures: List[Dict[str, Any]],
    *,
    previous_alerts: List[Dict[str, Any]] | None = None,
    bootstrap_failures: List[Dict[str, Any]] | None = None,
    generated_at: str | None = None,
) -> List[Dict[str, Any]]:
    """Roll the alert list forward by one monitoring run.

    Alerts are keyed by ``source_key(system_id, source_name)``:

    * A source failing now whose previous alert is open keeps its
      ``opened_at`` and has ``failure_streak`` incremented.
    * A source failing now with no previous alert, or with a non-open one,
      gets a fresh open alert with a streak of 1.
    * A previous open alert whose source is no longer failing is copied and
      marked ``"resolved"``.
    * Any other previous alert is carried over unchanged.

    Args:
        current_failures: Failures from this run, in any shape
            ``normalize_failures`` accepts. When the same source appears more
            than once, the last entry wins.
        previous_alerts: Alerts produced by the previous run, if any.
        bootstrap_failures: Used only when ``previous_alerts`` is empty; each
            is turned into a synthetic open alert (streak 1, stamped with the
            current timestamp) so that a failure also present now continues
            a streak instead of starting one.
        generated_at: Timestamp string to stamp on alerts; defaults to
            ``isoformat(now_utc())``.

    Returns:
        Alerts sorted with open ones first, then by ``system_id`` and
        ``source_name``.
    """
    now_value = generated_at or isoformat(now_utc())
    normalized_current = {
        source_key(item["system_id"], item["source_name"]): item
        for item in normalize_failures(current_failures)
    }

    previous_entries = list(previous_alerts or [])
    if not previous_entries and bootstrap_failures:
        previous_entries.extend(
            _open_alert_record(
                source_key(item["system_id"], item["source_name"]),
                item,
                opened_at=now_value,
                updated_at=now_value,
                failure_streak=1,
            )
            for item in normalize_failures(bootstrap_failures)
        )
    previous_by_key = {item["alert_id"]: item for item in previous_entries if item.get("alert_id")}

    next_alerts: List[Dict[str, Any]] = []
    for alert_id, failure in normalized_current.items():
        previous = previous_by_key.get(alert_id)
        if previous and previous.get("status") == "open":
            streak = int(previous.get("failure_streak") or 0) + 1
            opened_at = previous.get("opened_at") or now_value
        else:
            streak = 1
            opened_at = now_value
        next_alerts.append(
            _open_alert_record(
                alert_id,
                failure,
                opened_at=opened_at,
                updated_at=now_value,
                failure_streak=streak,
            )
        )

    for alert_id, previous in previous_by_key.items():
        if alert_id in normalized_current:
            continue
        if previous.get("status") == "open":
            # Copy before editing so the caller's previous-alert dict is untouched.
            resolved = dict(previous)
            resolved["status"] = "resolved"
            resolved["updated_at"] = now_value
            resolved["resolved_at"] = now_value
            next_alerts.append(resolved)
        else:
            next_alerts.append(previous)

    # Carried-over alerts may lack system_id/source_name (or hold None);
    # fall back to "" so the sort never compares None with str, which would
    # raise TypeError.
    return sorted(
        next_alerts,
        key=lambda item: (
            item.get("status") != "open",
            item.get("system_id") or "",
            item.get("source_name") or "",
        ),
    )
def write_alerts(alerts: List[Dict[str, Any]]) -> None:
    """Hand the alert list to ``write_json`` for ``ALERTS_PATH``."""
    destination = ALERTS_PATH
    write_json(destination, alerts)
def _history_filename(generated_at: str) -> Path:
    """Return the history snapshot path under ``MONITORING_DIR`` for a timestamp.

    Every ``:`` in *generated_at* becomes ``-`` and ``.json`` is appended.
    """
    # NOTE(review): presumably the colons are swapped out to keep the file
    # name portable across filesystems -- confirm.
    stem = "-".join(generated_at.split(":"))
    return MONITORING_DIR / (stem + ".json")
def _prune_monitoring_history(now_value: str) -> None:
    """Delete history snapshots stamped more than 90 days before *now_value*.

    Each ``*.json`` file directly under ``MONITORING_DIR`` is read, and its
    age is judged by its own ``generated_at`` field, not by the file name.
    Files whose ``generated_at`` does not parse are kept. Nothing is deleted
    when *now_value* itself does not parse.
    """
    ensure_dir(MONITORING_DIR)

    reference_dt = parse_dt(now_value)
    if reference_dt is None:
        return
    oldest_kept = reference_dt - timedelta(days=90)

    for snapshot_path in sorted(MONITORING_DIR.glob("*.json")):
        payload = read_json(snapshot_path, default={}) or {}
        stamped_at = parse_dt(payload.get("generated_at"))
        if stamped_at is not None and stamped_at < oldest_kept:
            snapshot_path.unlink()
def write_monitoring_state(
    *,
    audit: Dict[str, Any],
    source_health: Dict[str, Any],
    alerts: List[Dict[str, Any]],
    ingest_summary: Dict[str, Any],
    validation_errors: List[str],
) -> Dict[str, Any]:
    """Write the run summary and a full history snapshot, then prune old history.

    The condensed summary goes to ``MONITOR_SUMMARY_PATH``. A snapshot
    bundling the audit, source health, alerts and that summary goes to the
    file named by ``_history_filename(generated_at)``. Finally
    ``_prune_monitoring_history`` is run with the same timestamp.

    Args:
        audit: Source catalog audit dict.
        source_health: Source-health snapshot; its ``generated_at`` is reused
            as the run timestamp when present, otherwise the current time is
            used.
        alerts: Alert dicts; only their ``status`` is inspected here.
        ingest_summary: Ingest results (``new_count``, ``updated_count``,
            ``failures``, ``systems_touched``).
        validation_errors: Validation error messages; an empty list means
            validation passed.

    Returns:
        The summary dict that was written.
    """
    open_total = 0
    resolved_total = 0
    for alert in alerts:
        status = alert.get("status")
        if status == "open":
            open_total += 1
        elif status == "resolved":
            resolved_total += 1

    generated_at = source_health.get("generated_at")
    if not generated_at:
        generated_at = isoformat(now_utc())

    catalog_section = {
        "system_count": audit.get("system_count", 0),
        "source_count": audit.get("source_count", 0),
        "retired_source_count": audit.get("retired_source_count", 0),
    }
    ingest_failures = normalize_failures(ingest_summary.get("failures", []))
    ingest_section = {
        "new_count": ingest_summary.get("new_count", 0),
        "updated_count": ingest_summary.get("updated_count", 0),
        "failure_count": len(ingest_failures),
        "systems_touched": ingest_summary.get("systems_touched", []),
    }
    validation_section = {
        "passed": not validation_errors,
        "error_count": len(validation_errors),
        "errors": validation_errors,
    }

    summary = {
        "generated_at": generated_at,
        "active_source_count": source_health.get("active_source_count", 0),
        "green_source_count": source_health.get("green_source_count", 0),
        "source_failure_count": source_health.get("failure_count", 0),
        "open_alert_count": open_total,
        "resolved_alert_count": resolved_total,
        "last_fully_green_run": source_health.get("last_fully_green_run"),
        "source_catalog": catalog_section,
        "ingest": ingest_section,
        "validation": validation_section,
    }
    history_snapshot = {
        "generated_at": generated_at,
        "source_catalog_audit": audit,
        "source_health": source_health,
        "alerts": alerts,
        "monitor_summary": summary,
    }

    write_json(MONITOR_SUMMARY_PATH, summary)
    write_json(_history_filename(generated_at), history_snapshot)
    _prune_monitoring_history(generated_at)
    return summary
def read_previous_source_health() -> Dict[str, Any]:
    """Read ``SOURCE_HEALTH_PATH`` via ``read_json``; a falsy result becomes ``{}``."""
    stored = read_json(SOURCE_HEALTH_PATH, default={})
    if not stored:
        return {}
    return stored
def read_previous_alerts() -> List[Dict[str, Any]]:
    """Read ``ALERTS_PATH`` via ``read_json``; a falsy result becomes ``[]``."""
    stored = read_json(ALERTS_PATH, default=[])
    if not stored:
        return []
    return stored