#!/usr/bin/env python3
"""Websafe threat intel automation CLI.

Subcommands (registered in ``main``) fetch advisories from the configured
sources, normalize and route them to systems, merge them into the on-disk
registry, render generated pages, probe source health, and persist run and
monitoring state. Most subcommands accept a repeatable ``--system`` flag that
limits the work to a subset of the systems in the source map.
"""

from __future__ import annotations

import argparse
import sys
from pathlib import Path
from typing import Any, Dict, List, Tuple

CURRENT_DIR = Path(__file__).resolve().parent
SCRIPTS_DIR = CURRENT_DIR.parent
# Put the scripts directory on sys.path so the sibling ``intel`` package resolves
# when this file is executed directly as a script.
if str(SCRIPTS_DIR) not in sys.path:
    sys.path.insert(0, str(SCRIPTS_DIR))

from intel.config import (  # noqa: E402
    ADVISORIES_DIR,
    GENERATED_DIR,
    STATE_DIR,
    STATE_PATH,
    TRIAGE_DIR,
    load_source_map,
)
from intel.monitoring import (  # noqa: E402
    build_alerts,
    build_source_health_snapshot,
    normalize_failures,
    read_previous_alerts,
    read_previous_source_health,
    write_alerts,
    write_monitoring_state,
    write_source_catalog_audit,
    write_source_health,
)
from intel.models import AdvisoryRecord  # noqa: E402
from intel.normalize import normalize_candidates  # noqa: E402
from intel.pr import open_pr  # noqa: E402
from intel.render import (  # noqa: E402
    render_case_pages,
    render_generated,
    render_registry,
    render_secure_code,
    render_system_scaffolding,
)
from intel.route import route_advisories  # noqa: E402
from intel.sources.runner import (  # noqa: E402
    build_failure,
    collect_candidates,
    failure_summary,
    find_source,
    probe_source,
    probe_sources,
)
from intel.utils import isoformat, load_all_json, now_utc, parse_since, read_json, write_json  # noqa: E402
from intel.validators import validate  # noqa: E402

# Run summary read back by the render and monitor commands
# (presumably produced by ``render_generated`` — confirm there).
RUN_SUMMARY_PATH = GENERATED_DIR / "run-summary.json"

# Exploit statuses that always qualify an advisory for the hotlane.
HOTLANE_EXPLOIT_STATUSES = frozenset({"known_exploited", "active_exploitation", "in_the_wild"})
# Minimum CVSS score (inclusive) that qualifies an advisory for the hotlane.
HOTLANE_MIN_CVSS = 8.8


# ---------------------------------------------------------------------------
# Registry loading and merging
# ---------------------------------------------------------------------------


def _load_existing_advisories() -> List[AdvisoryRecord]:
    """Load every advisory JSON record in ``ADVISORIES_DIR`` as an ``AdvisoryRecord``.

    Loading is best-effort: a record the ``AdvisoryRecord`` constructor rejects
    with ``TypeError`` (e.g. unexpected or missing fields) is skipped. Skipped
    records are excluded from every merge and render in this module, so each
    one is reported on stderr instead of being dropped silently.
    """
    advisories: List[AdvisoryRecord] = []
    for item in load_all_json(ADVISORIES_DIR):
        try:
            advisories.append(AdvisoryRecord(**item))
        except TypeError as exc:
            label = item.get("canonical_id", "<unknown>") if isinstance(item, dict) else "<non-object>"
            print(f"warning: skipping unreadable advisory record {label}: {exc}", file=sys.stderr)
    return advisories


def _load_existing_triage() -> List[Dict[str, Any]]:
    """Load every triage record stored as JSON in ``TRIAGE_DIR``."""
    return load_all_json(TRIAGE_DIR)


def _filter_source_map(source_map: Dict[str, Any], system_ids: List[str] | None) -> Dict[str, Any]:
    """Restrict ``source_map`` to the systems named in ``system_ids``.

    Args:
        source_map: Parsed source map whose ``"systems"`` entries each carry a
            ``"system_id"``.
        system_ids: Values of the repeatable ``--system`` flag; ``None`` or empty
            means "all systems".

    Returns:
        ``source_map`` itself when no filter is given; otherwise a shallow copy
        whose ``"systems"`` list keeps only the requested systems, in source-map
        order.

    Raises:
        ValueError: If any requested system id does not exist in the source map.
    """
    if not system_ids:
        return source_map
    allowed = set(system_ids)
    filtered = [system for system in source_map["systems"] if system["system_id"] in allowed]
    missing = sorted(allowed - {system["system_id"] for system in filtered})
    if missing:
        raise ValueError(f"Unknown system_id(s): {', '.join(missing)}")
    return {**source_map, "systems": filtered}


def _selected_system_ids(
    full_source_map: Dict[str, Any],
    source_map: Dict[str, Any],
) -> set[str] | None:
    """Return the ids of the selected systems, or ``None`` if no subset was selected.

    The selection counts as a subset only when its system count differs from
    the full map's. Explicitly naming every system therefore behaves exactly
    like passing no ``--system`` filter at all.
    """
    if len(source_map["systems"]) == len(full_source_map["systems"]):
        return None
    return {system["system_id"] for system in source_map["systems"]}


def _merge_preserved_records(
    selected_source_map: Dict[str, Any],
    advisories: List[AdvisoryRecord],
    triage: List[Dict[str, Any]],
    existing_advisories: List[AdvisoryRecord] | None = None,
    existing_triage: List[Dict[str, Any]] | None = None,
) -> tuple[List[AdvisoryRecord], List[Dict[str, Any]]]:
    """Prepend stored records of the *unselected* systems to the given records.

    Used when only some systems were selected, so that rendering still sees
    the registry entries of every other system.

    Args:
        selected_source_map: Source map restricted to the selected systems.
        advisories: Advisory records for the selected systems.
        triage: Triage records for the selected systems.
        existing_advisories: Already-loaded registry advisories; loaded from
            disk when omitted.
        existing_triage: Already-loaded registry triage records; loaded from
            disk when omitted.

    Returns:
        ``(preserved_advisories + advisories, preserved_triage + triage)``.
    """
    selected_ids = {system["system_id"] for system in selected_source_map["systems"]}
    if existing_advisories is None:
        existing_advisories = _load_existing_advisories()
    if existing_triage is None:
        existing_triage = _load_existing_triage()
    preserved_advisories = [item for item in existing_advisories if item.system_id not in selected_ids]
    preserved_triage = [item for item in existing_triage if item.get("system_id") not in selected_ids]
    return preserved_advisories + advisories, preserved_triage + triage


def _merge_advisory_records(
    advisories: List[AdvisoryRecord],
    incoming: List[AdvisoryRecord],
) -> List[AdvisoryRecord]:
    """Merge ``incoming`` over ``advisories`` keyed by ``canonical_id``.

    An incoming record replaces an existing one with the same id. The result
    is sorted by ``(system_id, canonical_id)``.
    """
    merged: Dict[str, AdvisoryRecord] = {item.canonical_id: item for item in advisories}
    for item in incoming:
        merged[item.canonical_id] = item
    return sorted(merged.values(), key=lambda item: (item.system_id, item.canonical_id))


def _merge_triage_records(
    triage: List[Dict[str, Any]],
    incoming: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Merge ``incoming`` over ``triage`` keyed by ``"canonical_id"``.

    Later records win, so ``incoming`` overrides ``triage``. Records without a
    truthy ``"canonical_id"`` are dropped. The result is sorted by
    ``(system_id, canonical_id)``, with missing values sorting as ``""``.
    """
    merged: Dict[str, Dict[str, Any]] = {}
    for item in (*triage, *incoming):
        canonical_id = item.get("canonical_id")
        if canonical_id:
            merged[canonical_id] = item
    return sorted(
        merged.values(),
        key=lambda item: (item.get("system_id", ""), item.get("canonical_id", "")),
    )


def _merge_existing_registry(
    advisories: List[AdvisoryRecord],
    triage: List[Dict[str, Any]],
) -> tuple[List[AdvisoryRecord], List[Dict[str, Any]]]:
    """Merge freshly collected records over the full on-disk registry."""
    return (
        _merge_advisory_records(_load_existing_advisories(), advisories),
        _merge_triage_records(_load_existing_triage(), triage),
    )


def _load_existing_selection(
    full_source_map: Dict[str, Any],
    source_map: Dict[str, Any],
) -> tuple[Dict[str, Any], List[AdvisoryRecord], List[Dict[str, Any]]]:
    """Load registry records for re-rendering, without fetching anything new.

    Returns:
        ``(render_map, advisories, triage)``.

        - With no subset selected: ``source_map`` plus only the records of its
          systems.
        - With a subset selected: ``full_source_map`` plus the selected
          systems' records, preceded by the stored records of every other
          system, so registry-wide pages stay complete.
    """
    allowed = {system["system_id"] for system in source_map["systems"]}
    # Load the registry once; the subset path below reuses these lists.
    existing_advisories = _load_existing_advisories()
    existing_triage = _load_existing_triage()
    advisories = [item for item in existing_advisories if item.system_id in allowed]
    triage = [item for item in existing_triage if item.get("system_id") in allowed]
    if _selected_system_ids(full_source_map, source_map) is None:
        return source_map, advisories, triage
    advisories, triage = _merge_preserved_records(
        source_map,
        advisories,
        triage,
        existing_advisories=existing_advisories,
        existing_triage=existing_triage,
    )
    return full_source_map, advisories, triage


def _read_run_summary() -> Dict[str, Any]:
    """Return the stored run summary, or ``{}`` if it is missing or empty."""
    return read_json(RUN_SUMMARY_PATH, default={}) or {}


# ---------------------------------------------------------------------------
# Collection, hotlane selection and rendering
# ---------------------------------------------------------------------------


def _summarize_changes(advisories: List[AdvisoryRecord]) -> Dict[str, Any]:
    """Count advisories that are new or changed relative to their stored JSON file.

    Each advisory is compared with ``ADVISORIES_DIR/<canonical_id>.json``. A
    missing file counts as new; a file whose content differs from
    ``advisory.to_dict()`` counts as updated. ``pipeline`` calls this before
    ``_write_outputs``, so the files on disk act as the pre-run baseline
    (assuming rendering rewrites them — confirm in ``render_registry``).

    Returns:
        ``{"new_count", "updated_count", "systems_touched"}``, where
        ``systems_touched`` is the sorted list of system ids with at least one
        new or updated advisory.
    """
    new_count = 0
    updated_count = 0
    touched: set[str] = set()
    for advisory in advisories:
        existing = read_json(ADVISORIES_DIR / f"{advisory.canonical_id}.json", default=None)
        current = advisory.to_dict()
        if existing is None:
            new_count += 1
        elif existing != current:
            updated_count += 1
        else:
            continue
        touched.add(advisory.system_id)
    return {
        "new_count": new_count,
        "updated_count": updated_count,
        "systems_touched": sorted(touched),
    }


def _is_hotlane(advisory: AdvisoryRecord) -> bool:
    """Return True when an advisory is exploited, high-CVSS, or rated critical."""
    return (
        advisory.exploit_status in HOTLANE_EXPLOIT_STATUSES
        or (advisory.cvss_score is not None and advisory.cvss_score >= HOTLANE_MIN_CVSS)
        or advisory.severity == "critical"
    )


def _select_hotlane(
    advisories: List[AdvisoryRecord],
    triage: List[Dict[str, Any]],
) -> Tuple[List[AdvisoryRecord], List[Dict[str, Any]]]:
    """Keep only hotlane advisories, plus the triage records that belong to them.

    Triage records are matched to the kept advisories by ``canonical_id``.
    """
    filtered = [advisory for advisory in advisories if _is_hotlane(advisory)]
    keep_ids = {advisory.canonical_id for advisory in filtered}
    filtered_triage = [item for item in triage if item.get("canonical_id") in keep_ids]
    return filtered, filtered_triage


def _write_outputs(
    source_map: Dict[str, Any],
    advisories: List[AdvisoryRecord],
    triage: List[Dict[str, Any]],
    failures: List[str],
    change_summary: Dict[str, Any],
    selected_system_ids: set[str] | None = None,
) -> None:
    """Run every renderer over the given registry state.

    ``selected_system_ids`` is forwarded to the per-system renderers (registry,
    scaffolding, case pages). ``None`` means no subset was selected.
    """
    render_registry(source_map, advisories, triage, selected_system_ids=selected_system_ids)
    render_system_scaffolding(source_map, advisories, selected_system_ids=selected_system_ids)
    render_case_pages(advisories, selected_system_ids=selected_system_ids)
    render_secure_code(source_map)
    render_generated(source_map, advisories, triage, failures, change_summary)


def _refresh_render_state(
    full_source_map: Dict[str, Any],
    source_map: Dict[str, Any],
) -> None:
    """Re-render all outputs from the stored registry and the last run summary."""
    render_map, advisories, triage = _load_existing_selection(full_source_map, source_map)
    summary = _read_run_summary()
    _write_outputs(
        render_map,
        advisories,
        triage,
        summary.get("failures", []),
        summary,
        selected_system_ids=_selected_system_ids(full_source_map, source_map),
    )


def _resolve_since(since_arg: str, tier: str | None):
    """Translate a backfill tier or a ``--since`` value into a cutoff datetime.

    Returns ``None`` (no cutoff) for ``history-full``. ``rolling-24m`` uses a
    730-day window. Any other tier, including ``None``, parses ``since_arg``
    with a 30-day default.
    """
    if tier == "history-full":
        return None
    if tier == "rolling-24m":
        return parse_since("730d")
    return parse_since(since_arg, default_days=30)


def _collect_advisories(
    source_map: Dict[str, Any],
    *,
    since_dt,
    tier: str | None,
    include_undated: bool,
    hotlane_only: bool,
) -> tuple[list, List[AdvisoryRecord], List[Dict[str, Any]], list]:
    """Fetch, normalize, route and optionally hotlane-filter candidates.

    Shared by ``pipeline`` and ``backfill --dry-run``, so a dry run previews
    exactly what the real run would collect.

    Returns:
        ``(candidates, advisories, triage, failures)``.
    """
    candidates, failures = collect_candidates(
        source_map, since_dt=since_dt, tier=tier, include_undated=include_undated
    )
    advisories, triage = normalize_candidates(candidates)
    advisories = route_advisories(source_map, advisories)
    if hotlane_only:
        advisories, triage = _select_hotlane(advisories, triage)
    return candidates, advisories, triage, failures


def pipeline(
    full_source_map: Dict[str, Any],
    source_map: Dict[str, Any],
    since_arg: str,
    tier: str | None,
    include_undated: bool,
    hotlane_only: bool = False,
) -> tuple[list[AdvisoryRecord], list[Dict[str, Any]], list[str], Dict[str, Any]]:
    """Collect advisories for ``source_map``, merge them into the registry and render.

    Args:
        full_source_map: The unfiltered source map, used for rendering when only
            a subset of systems is being processed.
        source_map: The (possibly filtered) systems to collect from.
        since_arg: ``--since`` value; only used when ``tier`` is ``None``.
        tier: Backfill tier (``history-full`` / ``rolling-24m``), or ``None``
            for an incremental run.
        include_undated: Forwarded to ``collect_candidates``.
        hotlane_only: Keep only hotlane advisories before merging.

    Returns:
        ``(advisories, triage, failures, change_summary)``. ``advisories`` and
        ``triage`` are the merged registry contents, not just the newly
        collected records.
    """
    _, advisories, triage, failures = _collect_advisories(
        source_map,
        since_dt=_resolve_since(since_arg, tier),
        tier=tier,
        include_undated=include_undated,
        hotlane_only=hotlane_only,
    )
    advisories, triage = _merge_existing_registry(advisories, triage)
    change_summary = _summarize_changes(advisories)
    selected_system_ids = _selected_system_ids(full_source_map, source_map)
    render_map = source_map if selected_system_ids is None else full_source_map
    _write_outputs(render_map, advisories, triage, failures, change_summary, selected_system_ids=selected_system_ids)
    return advisories, triage, failures, change_summary


# ---------------------------------------------------------------------------
# Source health
# ---------------------------------------------------------------------------


def _retry_degraded_sources(
    source_map: Dict[str, Any],
    failures: List[Dict[str, Any]],
) -> tuple[List[Dict[str, Any]], List[Dict[str, Any]], int]:
    """Re-probe each failed source once.

    Failures are normalized and deduplicated by ``(system_id, source_name)``,
    then handled as follows:

    - A failure whose source is no longer in ``source_map`` stays failed as-is.
    - A failure for a ``retired`` source is dropped.
    - Every other source is probed again.

    Returns:
        ``(recovered_probes, remaining_failures, retries_performed)``.
        Recovered probes carry the system/source identity merged with the probe
        result. A source that fails again yields a fresh ``build_failure``
        record.
    """
    recovered_probes: List[Dict[str, Any]] = []
    remaining_failures: List[Dict[str, Any]] = []
    seen = set()
    retries_performed = 0
    for failure in normalize_failures(failures):
        key = (failure.get("system_id"), failure.get("source_name"))
        if key in seen:
            continue
        seen.add(key)
        match = find_source(source_map, failure.get("system_id", ""), failure.get("source_name", ""))
        if match is None:
            remaining_failures.append(failure)
            continue
        system, source = match
        if source.get("status") == "retired":
            continue
        retries_performed += 1
        try:
            result = probe_source(system, source)
        except Exception as exc:  # any adapter error is recorded as a failure, not raised
            remaining_failures.append(build_failure(system, source, exc))
            continue
        recovered_probes.append(
            {
                "system_id": system["system_id"],
                "source_name": source["name"],
                "source_kind": source["kind"],
                **result,
            }
        )
    return recovered_probes, remaining_failures, retries_performed


def _apply_retries(
    source_map: Dict[str, Any],
    probes: List[Dict[str, Any]],
    failures: List[Dict[str, Any]],
) -> tuple[List[Dict[str, Any]], List[Dict[str, Any]], int]:
    """Retry failed sources and fold any recovered probes back into ``probes``.

    Returns:
        ``(probes, remaining_failures, retries_performed)``. When at least one
        source recovered, ``probes`` is re-keyed by
        ``(system_id, source_name)`` (recovered results win) and sorted.
    """
    retried_probes, remaining_failures, retries_performed = _retry_degraded_sources(source_map, failures)
    if retried_probes:
        probe_map = {(item["system_id"], item["source_name"]): item for item in probes}
        for item in retried_probes:
            probe_map[(item["system_id"], item["source_name"])] = item
        probes = sorted(probe_map.values(), key=lambda item: (item["system_id"], item["source_name"]))
    else:
        # NOTE(review): when nothing recovered, the ORIGINAL failures are reported
        # instead of the retry results. That also re-includes failures the retry
        # step had dropped (retired sources, duplicates). Confirm this is intended.
        remaining_failures = normalize_failures(failures)
    return probes, remaining_failures, retries_performed


# ---------------------------------------------------------------------------
# Subcommands
# ---------------------------------------------------------------------------


def cmd_render(args) -> int:
    """Re-render outputs from the stored registry without fetching anything."""
    full_source_map = load_source_map()
    source_map = _filter_source_map(full_source_map, args.system)
    _refresh_render_state(full_source_map, source_map)
    return 0


def cmd_source_health(args) -> int:
    """Probe sources (with one retry) and write a health snapshot; exit 1 on failures."""
    full_source_map = load_source_map()
    source_map = _filter_source_map(full_source_map, args.system)
    previous_source_health = read_previous_source_health()
    probes, failures = probe_sources(source_map, tier=args.tier)
    probes, remaining_failures, retries_performed = _apply_retries(source_map, probes, failures)
    snapshot = build_source_health_snapshot(
        source_map,
        probes,
        remaining_failures,
        previous=previous_source_health,
        retries_performed=retries_performed,
    )
    write_source_health(snapshot)
    print(
        f"Source health checked {len(probes)} active sources across {len(source_map['systems'])} systems; "
        f"failures {snapshot['failure_count']}; retries {retries_performed}"
    )
    for failure in snapshot["failures"]:
        print(f"- {failure_summary(failure)}")
    return 0 if not snapshot["failures"] else 1


def cmd_validate(args) -> int:
    """Validate the (optionally filtered) source map; exit 1 if errors are found."""
    source_map = _filter_source_map(load_source_map(), args.system)
    errors = validate(source_map)
    if errors:
        print("Validation failed:")
        for error in errors:
            print(f"- {error}")
        return 1
    print("Validation passed.")
    return 0


def _write_state(status: str, *, record_success: bool = True) -> None:
    """Persist the run ``status``; also stamp ``last_success`` when ``record_success``."""
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    state = read_json(STATE_PATH, default={}) or {}
    state["status"] = status
    if record_success:
        state["last_success"] = isoformat(now_utc())
    write_json(STATE_PATH, state)


def _last_success_since(default: str = "30d") -> str:
    """Return the stored ``last_success`` timestamp, or ``default`` if unset or empty."""
    state = read_json(STATE_PATH, default={}) or {}
    return state.get("last_success") or default


def _report_sync(
    verb: str,
    advisories: List[AdvisoryRecord],
    triage: List[Dict[str, Any]],
    failures: list,
    summary: Dict[str, Any],
) -> int:
    """Print the one-line outcome of a registry sync and return its exit code."""
    print(
        f"{verb} {len(advisories)} advisories, new {summary['new_count']}, "
        f"updated {summary['updated_count']}, triage {len(triage)}, failures {len(failures)}"
    )
    return 0 if not failures else 1


def _run_sync(args, since: str, verb: str, *, hotlane_only: bool = False) -> int:
    """Run an incremental ``pipeline`` and record success/degraded state.

    ``since="last-success"`` resolves to the stored ``last_success`` timestamp
    (falling back to ``"30d"``). ``last_success`` is only advanced when the run
    had no failures.
    """
    full_source_map = load_source_map()
    source_map = _filter_source_map(full_source_map, args.system)
    if since == "last-success":
        since = _last_success_since()
    advisories, triage, failures, summary = pipeline(
        full_source_map, source_map, since, None, include_undated=False, hotlane_only=hotlane_only
    )
    _write_state("success" if not failures else "degraded", record_success=not failures)
    return _report_sync(verb, advisories, triage, failures, summary)


def cmd_ingest(args) -> int:
    """Fetch advisories published since ``--since`` (default: the last success)."""
    return _run_sync(args, args.since, "Ingested")


def cmd_hotlane(args) -> int:
    """Fetch the last day of advisories and keep only hotlane ones."""
    return _run_sync(args, "1d", "Hotlane synced", hotlane_only=True)


def cmd_reconcile(args) -> int:
    """Re-fetch the last 30 days of advisories."""
    return _run_sync(args, "30d", "Reconciled")


def cmd_backfill(args) -> int:
    """Fetch historical advisories for a tier, or preview the counts with ``--dry-run``.

    Unlike the incremental commands, backfill does not update the run state.
    """
    full_source_map = load_source_map()
    source_map = _filter_source_map(full_source_map, args.system)
    if args.dry_run:
        # Use the same cutoff and filtering as the real backfill below.
        candidates, advisories, triage, failures = _collect_advisories(
            source_map,
            since_dt=_resolve_since("", args.tier),
            tier=args.tier,
            include_undated=True,
            hotlane_only=args.hotlane_only,
        )
        print(
            f"Dry run backfill tier={args.tier}: candidates={len(candidates)} "
            f"advisories={len(advisories)} triage={len(triage)} failures={len(failures)}"
        )
        return 0
    advisories, triage, failures, summary = pipeline(
        full_source_map,
        source_map,
        "",
        args.tier,
        include_undated=True,
        hotlane_only=args.hotlane_only,
    )
    return _report_sync("Backfilled", advisories, triage, failures, summary)


def cmd_monitor(args) -> int:
    """Run the full monitoring cycle.

    Steps, in order: source-catalog audit, source health (with one retry),
    incremental ingest, alerts, monitoring state, re-render, validation.
    Exits 1 if any source failed, ingest failed, or validation found errors.
    """
    full_source_map = load_source_map()
    source_map = _filter_source_map(full_source_map, args.system)
    existing_run_summary = _read_run_summary()
    previous_source_health = read_previous_source_health()
    previous_alerts = read_previous_alerts()
    # Seed alert tracking with the previous failures: prefer the last health
    # snapshot, otherwise fall back to the last run summary.
    bootstrap_failures = previous_source_health.get("failures") or existing_run_summary.get("failures", [])

    audit = write_source_catalog_audit(source_map)
    probes, failures = probe_sources(source_map)
    probes, remaining_failures, retries_performed = _apply_retries(source_map, probes, failures)
    source_health = build_source_health_snapshot(
        source_map,
        probes,
        remaining_failures,
        previous=previous_source_health,
        retries_performed=retries_performed,
    )
    write_source_health(source_health)

    _advisories, _triage, ingest_failures, summary = pipeline(
        full_source_map, source_map, _last_success_since(), None, include_undated=False
    )
    alerts = build_alerts(
        source_health.get("failures", []),
        previous_alerts=previous_alerts,
        bootstrap_failures=bootstrap_failures,
        generated_at=source_health.get("generated_at"),
    )
    write_alerts(alerts)
    # Persist monitoring state before re-render/validation (presumably so a crash
    # there still leaves a record of this run), then again with the real results.
    write_monitoring_state(
        audit=audit,
        source_health=source_health,
        alerts=alerts,
        ingest_summary={**summary, "failures": ingest_failures},
        validation_errors=[],
    )
    _refresh_render_state(full_source_map, source_map)
    validation_errors = validate(source_map)
    write_monitoring_state(
        audit=audit,
        source_health=source_health,
        alerts=alerts,
        ingest_summary={**summary, "failures": ingest_failures},
        validation_errors=validation_errors,
    )

    passed = not source_health.get("failures") and not ingest_failures and not validation_errors
    _write_state("success" if passed else "degraded", record_success=passed)
    open_alert_count = len([item for item in alerts if item.get("status") == "open"])
    print(
        "Monitor completed: "
        f"active_sources={source_health.get('active_source_count', 0)} "
        f"green_sources={source_health.get('green_source_count', 0)} "
        f"open_alerts={open_alert_count} "
        f"ingest_failures={len(ingest_failures)} "
        f"validation_errors={len(validation_errors)}"
    )
    for failure in source_health.get("failures", []):
        print(f"- {failure_summary(failure)}")
    for error in validation_errors:
        print(f"- validate::{error}")
    return 0 if passed else 1


def cmd_open_pr(args) -> int:
    """Open (or dry-run) a pull request for the current changes and print the result."""
    print(open_pr(base_branch=args.base, dry_run=args.dry_run))
    return 0


def main() -> int:
    """Parse the command line and dispatch to the selected subcommand."""
    parser = argparse.ArgumentParser(description="Websafe threat intel automation")
    subparsers = parser.add_subparsers(dest="command", required=True)

    backfill = subparsers.add_parser("backfill", help="Fetch historical advisories")
    backfill.add_argument("--tier", choices=["history-full", "rolling-24m"], required=True)
    backfill.add_argument("--dry-run", action="store_true")
    backfill.add_argument("--hotlane-only", action="store_true")
    backfill.add_argument("--system", action="append")
    backfill.set_defaults(func=cmd_backfill)

    ingest = subparsers.add_parser("ingest", help="Fetch incremental advisories")
    ingest.add_argument("--since", default="last-success")
    ingest.add_argument("--system", action="append")
    ingest.set_defaults(func=cmd_ingest)

    hotlane = subparsers.add_parser("hotlane", help="Fetch only KEV / in-the-wild / critical updates")
    hotlane.add_argument("--system", action="append")
    hotlane.set_defaults(func=cmd_hotlane)

    reconcile = subparsers.add_parser("reconcile", help="Reconcile the last 30 days of updates")
    reconcile.add_argument("--system", action="append")
    reconcile.set_defaults(func=cmd_reconcile)

    render = subparsers.add_parser("render", help="Render structure and secure-code pages")
    render.add_argument("--system", action="append")
    render.set_defaults(func=cmd_render)

    source_health = subparsers.add_parser(
        "source-health", help="Check source adapter health without mutating registry advisories"
    )
    source_health.add_argument("--tier", choices=["history-full", "rolling-24m"])
    source_health.add_argument("--system", action="append")
    source_health.set_defaults(func=cmd_source_health)

    monitor = subparsers.add_parser(
        "monitor", help="Run source audit, health, ingest, render and monitoring state persistence"
    )
    monitor.add_argument("--system", action="append")
    monitor.set_defaults(func=cmd_monitor)

    validate_parser = subparsers.add_parser("validate", help="Validate generated content")
    validate_parser.add_argument("--system", action="append")
    validate_parser.set_defaults(func=cmd_validate)

    open_pr_parser = subparsers.add_parser("open-pr", help="Create Gitea PR from current changes")
    open_pr_parser.add_argument("--base", default="main")
    open_pr_parser.add_argument("--dry-run", action="store_true")
    open_pr_parser.set_defaults(func=cmd_open_pr)

    args = parser.parse_args()
    return args.func(args)


if __name__ == "__main__":
    raise SystemExit(main())