"""Shared paths, default policies and loaders for the threat-intel registry.

This module defines the on-disk layout used by the threat-intel tooling,
the default request/health/parser settings applied to every source entry,
and helpers that load ``source-map.yaml`` / ``repro-map.yaml`` and iterate
over the sources they describe.
"""

from __future__ import annotations

from pathlib import Path
from typing import Any, Dict, Iterable, List, Tuple

import yaml

# Repository layout, anchored at the third ancestor of this file.
ROOT = Path(__file__).resolve().parents[2]
FRAMEWORK_ROOT = ROOT / "07-framework-security"
THREAT_INTEL_ROOT = ROOT / "08-threat-intel"
REGISTRY_ROOT = THREAT_INTEL_ROOT / "registry"
ADVISORIES_DIR = REGISTRY_ROOT / "advisories"
SYSTEMS_DIR = REGISTRY_ROOT / "systems"
ENTITIES_DIR = REGISTRY_ROOT / "entities"
RUNS_DIR = REGISTRY_ROOT / "runs"
TRIAGE_DIR = REGISTRY_ROOT / "triage"
GENERATED_DIR = THREAT_INTEL_ROOT / "generated"
SECURE_CODE_ROOT = ROOT / "05-defense" / "secure-code"
SOURCE_MAP_PATH = THREAT_INTEL_ROOT / "source-map.yaml"
REPRO_MAP_PATH = THREAT_INTEL_ROOT / "repro-map.yaml"
REPRO_PROFILES_DIR = THREAT_INTEL_ROOT / "repro-profiles"
MONITORING_DIR = REGISTRY_ROOT / "monitoring"

# Generated report / artifact locations.
SOURCE_HEALTH_PATH = GENERATED_DIR / "source-health.json"
ALERTS_PATH = GENERATED_DIR / "alerts.json"
MONITOR_SUMMARY_PATH = GENERATED_DIR / "monitor-summary.json"
SOURCE_CATALOG_AUDIT_PATH = GENERATED_DIR / "source-catalog-audit.json"
SOURCE_CATALOG_AUDIT_MD_PATH = GENERATED_DIR / "source-catalog-audit.md"
RETIRED_SOURCES_PATH = GENERATED_DIR / "retired-sources.json"
ENTITY_COMPLETENESS_PATH = GENERATED_DIR / "entity-completeness.json"
ENTITY_BACKLOG_PATH = GENERATED_DIR / "entity-discovery-backlog.json"
ENTITY_QUEUES_PATH = GENERATED_DIR / "entity-queues.json"
ENTITY_CATALOG_REPORT_MD_PATH = GENERATED_DIR / "entity-catalog-report.md"
ENTITY_BACKLOG_REPORT_MD_PATH = GENERATED_DIR / "entity-discovery-backlog.md"

# Per-user state kept outside the repository.
STATE_DIR = Path.home() / ".local" / "state" / "websafe-intel"
STATE_PATH = STATE_DIR / "state.json"

# Keys of a system entry that hold lists of sources.
SOURCE_BUCKETS = ("official_sources", "ecosystem_sources", "research_sources")
MACHINE_READABLE_SOURCE_KINDS = {
    "ghsa-global",
    "osv-batch",
    "nvd-search",
    "kev-json",
    "json-feed",
    "rss-feed",
    "atom-feed",
}

DEFAULT_REQUEST_POLICY = {
    "user_agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/137.0.0.0 Safari/537.36"
    ),
    "accept": "",
    "timeout_seconds": 30,
    "verify_tls": True,
    "http_version": "default",
    "follow_redirects": True,
}

DEFAULT_HEALTH_POLICY = {
    "retries": 3,
    "backoff_seconds": 0.5,
    "expected_format": "",
    "expected_statuses": [200],
}

DEFAULT_PARSER_HINTS = {
    "keywords": [],
    "selectors": [],
    "include_url_patterns": [],
    "exclude_url_patterns": [],
    "date_extractors": [],
}

# Accept header used when a source does not set one, keyed by source kind.
DEFAULT_ACCEPT_BY_KIND = {
    "rss-feed": "application/rss+xml, application/xml;q=0.9, text/xml;q=0.9, */*;q=0.8",
    "atom-feed": "application/atom+xml, application/xml;q=0.9, text/xml;q=0.9, */*;q=0.8",
    "json-feed": "application/json, text/json;q=0.9, */*;q=0.8",
    "ghsa-global": "application/vnd.github+json, application/json;q=0.9, */*;q=0.8",
    "osv-batch": "application/json, */*;q=0.8",
    "nvd-search": "application/json, */*;q=0.8",
    "kev-json": "application/json, */*;q=0.8",
}

# Expected response format used when a source does not set one, keyed by kind.
DEFAULT_FORMAT_BY_KIND = {
    "html-links": "html",
    "vendor-index": "html",
    "rss-feed": "rss",
    "atom-feed": "atom",
    "json-feed": "json",
    "ghsa-global": "json",
    "osv-batch": "json",
    "nvd-search": "json",
    "kev-json": "json",
}


def _build_request_policy(source: Dict[str, Any]) -> Dict[str, Any]:
    """Return ``source``'s request policy merged over DEFAULT_REQUEST_POLICY."""
    policy = {**DEFAULT_REQUEST_POLICY, **(source.get("request_policy") or {})}
    if not policy.get("accept"):
        policy["accept"] = DEFAULT_ACCEPT_BY_KIND.get(source.get("kind"), "")
    # A missing or falsy timeout (including 0) falls back to the default.
    policy["timeout_seconds"] = int(
        policy.get("timeout_seconds") or DEFAULT_REQUEST_POLICY["timeout_seconds"]
    )
    policy["follow_redirects"] = bool(policy.get("follow_redirects", True))
    policy["verify_tls"] = bool(policy.get("verify_tls", True))
    return policy


def _build_health_policy(source: Dict[str, Any]) -> Dict[str, Any]:
    """Return ``source``'s health policy merged over DEFAULT_HEALTH_POLICY."""
    policy = {**DEFAULT_HEALTH_POLICY, **(source.get("health_policy") or {})}
    if not policy.get("expected_format"):
        policy["expected_format"] = DEFAULT_FORMAT_BY_KIND.get(source.get("kind"), "")
    statuses = policy.get("expected_statuses") or [200]
    # Always a fresh list, so the default's list is never shared.
    policy["expected_statuses"] = [int(item) for item in statuses]
    # NOTE(review): because of ``or``, an explicit 0 for ``retries`` or
    # ``backoff_seconds`` is replaced by the default. Kept as-is; confirm
    # whether "no retries" / "no backoff" should be expressible.
    policy["retries"] = int(policy.get("retries") or DEFAULT_HEALTH_POLICY["retries"])
    policy["backoff_seconds"] = float(
        policy.get("backoff_seconds") or DEFAULT_HEALTH_POLICY["backoff_seconds"]
    )
    return policy


def _build_parser_hints(source: Dict[str, Any]) -> Dict[str, Any]:
    """Return ``source``'s parser hints merged over DEFAULT_PARSER_HINTS."""
    # Copy the default lists: a plain ``{**DEFAULT_PARSER_HINTS}`` would hand
    # the same list objects to every source, so an in-place mutation on one
    # source would leak into all others and into the module-level default.
    fresh_defaults = {
        key: list(value) if isinstance(value, list) else value
        for key, value in DEFAULT_PARSER_HINTS.items()
    }
    hints = {**fresh_defaults, **(source.get("parser_hints") or {})}
    if not hints.get("keywords"):
        # Fall back to the source's top-level ``keywords`` when no hint is set.
        hints["keywords"] = list(source.get("keywords") or [])
    return hints


def _normalize_source(source: Dict[str, Any], bucket_name: str) -> Dict[str, Any]:
    """Return a copy of ``source`` with defaults filled in.

    The result always carries ``status`` (default ``"active"``),
    ``retired_reason``, ``replacement_sources``, ``request_policy``,
    ``health_policy``, ``parser_hints`` and ``bucket_name``. The input
    mapping itself is not modified; ``None`` is treated as an empty source.
    """
    normalized = dict(source or {})
    normalized["status"] = normalized.get("status") or "active"
    normalized["retired_reason"] = normalized.get("retired_reason") or ""
    normalized["replacement_sources"] = list(normalized.get("replacement_sources") or [])
    normalized["request_policy"] = _build_request_policy(normalized)
    normalized["health_policy"] = _build_health_policy(normalized)
    normalized["parser_hints"] = _build_parser_hints(normalized)
    normalized["bucket_name"] = bucket_name
    return normalized


def load_source_map() -> Dict[str, Any]:
    """Load ``source-map.yaml`` and normalize every source it contains.

    Returns:
        The parsed document with ``systems`` replaced by copies whose
        SOURCE_BUCKETS lists hold normalized source dicts.

    Raises:
        ValueError: if the document is not a mapping with a ``systems``
            list, or if an entry of ``systems`` is not a mapping.
    """
    with SOURCE_MAP_PATH.open("r", encoding="utf-8") as handle:
        data = yaml.safe_load(handle)
    if not isinstance(data, dict) or "systems" not in data:
        raise ValueError("source-map.yaml must contain a top-level 'systems' list")
    systems = data["systems"]
    if not isinstance(systems, list):
        raise ValueError("'systems' must be a list")

    normalized_systems: List[Dict[str, Any]] = []
    for index, system in enumerate(systems):
        # ``None`` (an empty YAML item) is tolerated; anything else that is
        # not a mapping gets a clear error instead of an opaque dict() failure.
        if system is not None and not isinstance(system, dict):
            raise ValueError(f"'systems' entry {index} must be a mapping")
        cloned = dict(system or {})
        for bucket_name in SOURCE_BUCKETS:
            sources = cloned.get(bucket_name) or []
            cloned[bucket_name] = [
                _normalize_source(source, bucket_name) for source in sources
            ]
        normalized_systems.append(cloned)
    return {**data, "systems": normalized_systems}


def load_repro_map() -> Dict[str, Any]:
    """Load ``repro-map.yaml``, or return an empty map if it is unusable.

    A missing file, an empty document, or a document without a top-level
    ``systems`` key all yield ``{"systems": []}``.
    """
    if not REPRO_MAP_PATH.exists():
        return {"systems": []}
    with REPRO_MAP_PATH.open("r", encoding="utf-8") as handle:
        data = yaml.safe_load(handle) or {}
    if not isinstance(data, dict) or "systems" not in data:
        return {"systems": []}
    return data


def get_systems_by_group(source_map: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]:
    """Group systems by the second path component of their ``output_dir``.

    Raises:
        ValueError: if a system's ``output_dir`` has fewer than three
            path components.
    """
    groups: Dict[str, List[Dict[str, Any]]] = {}
    for system in source_map["systems"]:
        parts = Path(system["output_dir"]).parts
        if len(parts) < 3:
            # Use .get so a missing system_id cannot mask this error
            # with a KeyError.
            system_id = system.get("system_id", "<unknown>")
            raise ValueError(f"output_dir too short for system {system_id}")
        groups.setdefault(parts[1], []).append(system)
    return groups


def iter_sources(
    system: Dict[str, Any],
    *,
    include_retired: bool = True,
) -> Iterable[Tuple[str, Dict[str, Any]]]:
    """Yield ``(bucket_name, source)`` for each source of ``system``.

    Buckets are visited in SOURCE_BUCKETS order. When ``include_retired``
    is false, sources whose ``status`` is ``"retired"`` are skipped.
    """
    for bucket_name in SOURCE_BUCKETS:
        for source in system.get(bucket_name, []) or []:
            if not include_retired and source.get("status") == "retired":
                continue
            yield bucket_name, source


def iter_all_sources(
    source_map: Dict[str, Any],
    *,
    include_retired: bool = True,
) -> Iterable[Tuple[Dict[str, Any], str, Dict[str, Any]]]:
    """Yield ``(system, bucket_name, source)`` for every source in the map."""
    for system in source_map.get("systems", []) or []:
        for bucket_name, source in iter_sources(system, include_retired=include_retired):
            yield system, bucket_name, source