diff --git a/scripts/intel/config.py b/scripts/intel/config.py index b3debfec..13eb3555 100644 --- a/scripts/intel/config.py +++ b/scripts/intel/config.py @@ -1,7 +1,7 @@ from __future__ import annotations from pathlib import Path -from typing import Any, Dict, List +from typing import Any, Dict, Iterable, List, Tuple import yaml @@ -19,9 +19,96 @@ SECURE_CODE_ROOT = ROOT / "05-defense" / "secure-code" SOURCE_MAP_PATH = THREAT_INTEL_ROOT / "source-map.yaml" REPRO_MAP_PATH = THREAT_INTEL_ROOT / "repro-map.yaml" REPRO_PROFILES_DIR = THREAT_INTEL_ROOT / "repro-profiles" +MONITORING_DIR = REGISTRY_ROOT / "monitoring" +SOURCE_HEALTH_PATH = GENERATED_DIR / "source-health.json" +ALERTS_PATH = GENERATED_DIR / "alerts.json" +MONITOR_SUMMARY_PATH = GENERATED_DIR / "monitor-summary.json" +SOURCE_CATALOG_AUDIT_PATH = GENERATED_DIR / "source-catalog-audit.json" +SOURCE_CATALOG_AUDIT_MD_PATH = GENERATED_DIR / "source-catalog-audit.md" +RETIRED_SOURCES_PATH = GENERATED_DIR / "retired-sources.json" STATE_DIR = Path.home() / ".local" / "state" / "websafe-intel" STATE_PATH = STATE_DIR / "state.json" +SOURCE_BUCKETS = ("official_sources", "ecosystem_sources", "research_sources") +MACHINE_READABLE_SOURCE_KINDS = {"ghsa-global", "osv-batch", "nvd-search", "kev-json", "json-feed", "rss-feed", "atom-feed"} + +DEFAULT_REQUEST_POLICY = { + "user_agent": "websafe-intel", + "accept": "", + "timeout_seconds": 30, + "verify_tls": True, + "http_version": "default", + "follow_redirects": True, +} + +DEFAULT_HEALTH_POLICY = { + "retries": 3, + "backoff_seconds": 0.5, + "expected_format": "", + "expected_statuses": [200], +} + +DEFAULT_PARSER_HINTS = { + "keywords": [], + "selectors": [], + "include_url_patterns": [], + "exclude_url_patterns": [], + "date_extractors": [], +} + +DEFAULT_ACCEPT_BY_KIND = { + "rss-feed": "application/rss+xml, application/xml;q=0.9, text/xml;q=0.9, */*;q=0.8", + "atom-feed": "application/atom+xml, application/xml;q=0.9, text/xml;q=0.9, */*;q=0.8", + "json-feed": "application/json, text/json;q=0.9, */*;q=0.8", + "ghsa-global": "application/vnd.github+json, application/json;q=0.9, */*;q=0.8", + "osv-batch": "application/json, */*;q=0.8", + "nvd-search": "application/json, */*;q=0.8", + "kev-json": "application/json, */*;q=0.8", +} + +DEFAULT_FORMAT_BY_KIND = { + "html-links": "html", + "vendor-index": "html", + "rss-feed": "rss", + "atom-feed": "atom", + "json-feed": "json", + "ghsa-global": "json", + "osv-batch": "json", + "nvd-search": "json", + "kev-json": "json", +} + + +def _normalize_source(source: Dict[str, Any], bucket_name: str) -> Dict[str, Any]: + normalized = dict(source or {}) + normalized["status"] = normalized.get("status") or "active" + normalized["retired_reason"] = normalized.get("retired_reason") or "" + normalized["replacement_sources"] = list(normalized.get("replacement_sources") or []) + + request_policy = {**DEFAULT_REQUEST_POLICY, **(normalized.get("request_policy") or {})} + if not request_policy.get("accept"): + request_policy["accept"] = DEFAULT_ACCEPT_BY_KIND.get(normalized.get("kind"), "") + request_policy["timeout_seconds"] = int(request_policy.get("timeout_seconds") or DEFAULT_REQUEST_POLICY["timeout_seconds"]) + request_policy["follow_redirects"] = bool(request_policy.get("follow_redirects", True)) + request_policy["verify_tls"] = bool(request_policy.get("verify_tls", True)) + normalized["request_policy"] = request_policy + + health_policy = {**DEFAULT_HEALTH_POLICY, **(normalized.get("health_policy") or {})} + if not health_policy.get("expected_format"): + health_policy["expected_format"] = DEFAULT_FORMAT_BY_KIND.get(normalized.get("kind"), "") + statuses = health_policy.get("expected_statuses") or [200] + health_policy["expected_statuses"] = [int(item) for item in statuses] + health_policy["retries"] = int(health_policy.get("retries") or DEFAULT_HEALTH_POLICY["retries"]) + health_policy["backoff_seconds"] = float(health_policy.get("backoff_seconds") or DEFAULT_HEALTH_POLICY["backoff_seconds"]) + normalized["health_policy"] = health_policy + + parser_hints = {**DEFAULT_PARSER_HINTS, **(normalized.get("parser_hints") or {})} + if not parser_hints.get("keywords"): + parser_hints["keywords"] = list(normalized.get("keywords") or []) + normalized["parser_hints"] = parser_hints + normalized["bucket_name"] = bucket_name + return normalized + def load_source_map() -> Dict[str, Any]: with SOURCE_MAP_PATH.open("r", encoding="utf-8") as handle: @@ -33,7 +120,14 @@ def load_source_map() -> Dict[str, Any]: systems = data["systems"] if not isinstance(systems, list): raise ValueError("'systems' must be a list") - return data + normalized_systems: List[Dict[str, Any]] = [] + for system in systems: + cloned = dict(system or {}) + for bucket_name in SOURCE_BUCKETS: + sources = cloned.get(bucket_name) or [] + cloned[bucket_name] = [_normalize_source(source, bucket_name) for source in sources] + normalized_systems.append(cloned) + return {**data, "systems": normalized_systems} def load_repro_map() -> Dict[str, Any]: @@ -56,3 +150,25 @@ def get_systems_by_group(source_map: Dict[str, Any]) -> Dict[str, List[Dict[str, group = parts[1] groups.setdefault(group, []).append(system) return groups + + +def iter_sources( + system: Dict[str, Any], + *, + include_retired: bool = True, +) -> Iterable[Tuple[str, Dict[str, Any]]]: + for bucket_name in SOURCE_BUCKETS: + for source in system.get(bucket_name, []) or []: + if not include_retired and source.get("status") == "retired": + continue + yield bucket_name, source + + +def iter_all_sources( + source_map: Dict[str, Any], + *, + include_retired: bool = True, +) -> Iterable[Tuple[Dict[str, Any], str, Dict[str, Any]]]: + for system in source_map.get("systems", []) or []: + for bucket_name, source in iter_sources(system, include_retired=include_retired): + yield system, bucket_name, source diff --git a/scripts/intel/http_client.py b/scripts/intel/http_client.py index 6bf88565..313ddef3 100644 --- a/scripts/intel/http_client.py +++ b/scripts/intel/http_client.py @@ -1,24 +1,36 @@ from __future__ import annotations +import time from typing import Any, Dict import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry +from intel.config import DEFAULT_HEALTH_POLICY, DEFAULT_REQUEST_POLICY + DEFAULT_TIMEOUT = 30 DEFAULT_USER_AGENT = "websafe-intel" -def build_session() -> requests.Session: +def _request_policy(source: Dict[str, Any] | None = None) -> Dict[str, Any]: + return {**DEFAULT_REQUEST_POLICY, **((source or {}).get("request_policy") or {})} + + +def _health_policy(source: Dict[str, Any] | None = None) -> Dict[str, Any]: + return {**DEFAULT_HEALTH_POLICY, **((source or {}).get("health_policy") or {})} + + +def build_session(source: Dict[str, Any] | None = None) -> requests.Session: + health_policy = _health_policy(source) session = requests.Session() retry = Retry( - total=3, - connect=3, - read=3, - status=3, - backoff_factor=0.5, + total=int(health_policy.get("retries") or 3), + connect=int(health_policy.get("retries") or 3), + read=int(health_policy.get("retries") or 3), + status=int(health_policy.get("retries") or 3), + backoff_factor=float(health_policy.get("backoff_seconds") or 0.5), allowed_methods=frozenset(["GET", "POST"]), status_forcelist=[429, 500, 502, 503, 504], raise_on_status=False, @@ -26,7 +38,11 @@ def build_session() -> requests.Session: adapter = HTTPAdapter(max_retries=retry) session.mount("https://", adapter) session.mount("http://", adapter) - session.headers.update({"User-Agent": DEFAULT_USER_AGENT}) + request_policy = _request_policy(source) + headers = {"User-Agent": request_policy.get("user_agent") or DEFAULT_USER_AGENT} + if request_policy.get("accept"): + headers["Accept"] = request_policy["accept"] + session.headers.update(headers) return session @@ -34,16 +50,60 @@ def request( method: str, url: str, *, + source: Dict[str, Any] | None = None, session: requests.Session | None = None, timeout: int = DEFAULT_TIMEOUT, **kwargs: Any, ) -> requests.Response: - client = session or build_session() + request_policy = _request_policy(source) + health_policy = _health_policy(source) + client = session or build_session(source) headers = dict(kwargs.pop("headers", {}) or {}) if "User-Agent" not in headers: - headers["User-Agent"] = DEFAULT_USER_AGENT - try: - return client.request(method, url, headers=headers, timeout=timeout, **kwargs) - except requests.exceptions.SSLError: - return client.request(method, url, headers=headers, timeout=timeout, verify=False, **kwargs) + headers["User-Agent"] = request_policy.get("user_agent") or DEFAULT_USER_AGENT + if request_policy.get("accept") and "Accept" not in headers: + headers["Accept"] = request_policy["accept"] + if request_policy.get("http_version") == "1.1" and "Connection" not in headers: + headers["Connection"] = "close" + timeout_value = timeout if timeout != DEFAULT_TIMEOUT else int(request_policy.get("timeout_seconds") or DEFAULT_TIMEOUT) + allow_redirects = kwargs.pop("allow_redirects", bool(request_policy.get("follow_redirects", True))) + verify = kwargs.pop("verify", bool(request_policy.get("verify_tls", True))) + status_retries = max(1, int(health_policy.get("retries") or 1)) + backoff_seconds = float(health_policy.get("backoff_seconds") or 0.5) + last_error: Exception | None = None + for attempt in range(1, status_retries + 1): + try: + return client.request( + method, + url, + headers=headers, + timeout=timeout_value, + allow_redirects=allow_redirects, + verify=verify, + **kwargs, + ) + except requests.exceptions.SSLError as exc: + last_error = exc + if verify: + try: + return client.request( + method, + url, + headers=headers, + timeout=timeout_value, + allow_redirects=allow_redirects, + verify=False, + **kwargs, + ) + except requests.exceptions.RequestException as fallback_error: + last_error = fallback_error + if attempt < status_retries: + time.sleep(backoff_seconds * attempt) + except requests.exceptions.RequestException as exc: + last_error = exc + if attempt < status_retries: + time.sleep(backoff_seconds * attempt) + if last_error is not None: + raise last_error + raise RuntimeError(f"request failed without an exception for {method} {url}") diff --git a/scripts/intel/monitoring.py b/scripts/intel/monitoring.py new file mode 100644 index 00000000..79e1cde4 --- /dev/null +++ b/scripts/intel/monitoring.py @@ -0,0 +1,377 @@ +from __future__ import annotations + +from datetime import timedelta +from pathlib import Path +from typing import Any, Dict, List + +from intel.config import ( + ALERTS_PATH, + MACHINE_READABLE_SOURCE_KINDS, + MONITORING_DIR, + MONITOR_SUMMARY_PATH, + RETIRED_SOURCES_PATH, + SOURCE_CATALOG_AUDIT_MD_PATH, + SOURCE_CATALOG_AUDIT_PATH, + SOURCE_HEALTH_PATH, + iter_all_sources, +) +from intel.utils import ensure_dir, isoformat, now_utc, parse_dt, read_json, write_json, write_text + + +def source_key(system_id: str, source_name: str) -> str: + return f"{system_id}::{source_name}" + + +def normalize_failures(items: List[Any]) -> List[Dict[str, Any]]: + normalized: List[Dict[str, Any]] = [] + for item in items or []: + if isinstance(item, dict): + cloned = dict(item) + if not cloned.get("summary"): + cloned["summary"] = f"{cloned.get('system_id')}::{cloned.get('source_name')}::{cloned.get('category')}::{cloned.get('message') or cloned.get('exception')}" + normalized.append(cloned) + continue + if not isinstance(item, str): + continue + parts = item.split("::", 3) + system_id = parts[0] if len(parts) > 0 else "unknown" + source_name = parts[1] if len(parts) > 1 else "unknown" + exception = parts[2] if len(parts) > 2 else "UnknownError" + message = parts[3] if len(parts) > 3 else exception + category = "tls" if "ssl" in exception.lower() else "http_status" if "http" in exception.lower() else "network" + normalized.append( + { + "system_id": system_id, + "display_name": system_id, + "source_name": source_name, + "source_kind": "", + "source_bucket": "", + "category": category, + "exception": exception, + "message": message, + "status_code": None, + "url": "", + "summary": item, + } + ) + return normalized + + +def build_source_catalog_audit(source_map: Dict[str, Any]) -> Dict[str, Any]: + systems_payload: List[Dict[str, Any]] = [] + retired_sources: List[Dict[str, Any]] = [] + replacement_map: List[Dict[str, Any]] = [] + total_sources = 0 + active_sources = 0 + retired_count = 0 + systems_with_active_official = 0 + systems_with_machine_source = 0 + + for system in source_map.get("systems", []) or []: + active_official = 0 + active_machine = 0 + bucket_counts = {"official_sources": 0, "ecosystem_sources": 0, "research_sources": 0} + retired_for_system: List[Dict[str, Any]] = [] + for bucket_name, source in ((bucket, src) for _system, bucket, src in iter_all_sources({"systems": [system]}, include_retired=True)): + total_sources += 1 + if source.get("status") == "retired": + retired_count += 1 + retired_entry = { + "system_id": system["system_id"], + "display_name": system["display_name"], + "source_name": source["name"], + "bucket": bucket_name, + "kind": source["kind"], + "retired_reason": source.get("retired_reason") or "", + "replacement_sources": source.get("replacement_sources") or [], + "url": source.get("url") or "", + } + retired_sources.append(retired_entry) + retired_for_system.append(retired_entry) + replacement_map.append( + { + "system_id": system["system_id"], + "retired_source": source["name"], + "replacement_sources": source.get("replacement_sources") or [], + } + ) + continue + active_sources += 1 + bucket_counts[bucket_name] += 1 + if bucket_name == "official_sources": + active_official += 1 + if source.get("kind") in MACHINE_READABLE_SOURCE_KINDS: + active_machine += 1 + if active_official: + systems_with_active_official += 1 + if active_machine: + systems_with_machine_source += 1 + systems_payload.append( + { + "system_id": system["system_id"], + "display_name": system["display_name"], + "category": system.get("category"), + "tier": system.get("tier"), + "source_total": sum(bucket_counts.values()) + len(retired_for_system), + "active_source_total": sum(bucket_counts.values()), + "retired_source_total": len(retired_for_system), + "official_active": bucket_counts["official_sources"], + "ecosystem_active": bucket_counts["ecosystem_sources"], + "research_active": bucket_counts["research_sources"], + "machine_readable_active": active_machine, + "has_active_official": bool(active_official), + "has_machine_readable_source": bool(active_machine), + } + ) + + audit = { + "generated_at": isoformat(now_utc()), + "system_count": len(source_map.get("systems", []) or []), + "source_count": total_sources, + "active_source_count": active_sources, + "retired_source_count": retired_count, + "systems_with_active_official": systems_with_active_official, + "systems_with_machine_readable_source": systems_with_machine_source, + "systems": sorted(systems_payload, key=lambda item: item["system_id"]), + "retired_sources": sorted(retired_sources, key=lambda item: (item["system_id"], item["source_name"])), + "replacement_map": sorted(replacement_map, key=lambda item: (item["system_id"], item["retired_source"])), + } + return audit + + +def write_source_catalog_audit(source_map: Dict[str, Any]) -> Dict[str, Any]: + audit = build_source_catalog_audit(source_map) + lines = [ + "# Source Catalog Audit", + "", + f"- generated_at: `{audit['generated_at']}`", + f"- systems: `{audit['system_count']}`", + f"- sources: `{audit['source_count']}`", + f"- active_sources: `{audit['active_source_count']}`", + f"- retired_sources: `{audit['retired_source_count']}`", + f"- systems_with_active_official: `{audit['systems_with_active_official']}/{audit['system_count']}`", + f"- systems_with_machine_readable_source: `{audit['systems_with_machine_readable_source']}/{audit['system_count']}`", + "", + "## Retired Sources", + "", + ] + if audit["retired_sources"]: + for item in audit["retired_sources"]: + replacements = ", ".join(item.get("replacement_sources") or []) or "-" + lines.append( + f"- `{item['system_id']}` `{item['source_name']}` -> replacements: `{replacements}` | reason: {item.get('retired_reason') or 'n/a'}" + ) + else: + lines.append("- none") + write_json(SOURCE_CATALOG_AUDIT_PATH, audit) + write_text(SOURCE_CATALOG_AUDIT_MD_PATH, "\n".join(lines)) + write_json(RETIRED_SOURCES_PATH, audit["retired_sources"]) + return audit + + +def build_source_health_snapshot( + source_map: Dict[str, Any], + probes: List[Dict[str, Any]], + failures: List[Dict[str, Any]], + *, + previous: Dict[str, Any] | None = None, + retries_performed: int = 0, +) -> Dict[str, Any]: + normalized_failures = normalize_failures(failures) + active_source_total = sum(1 for _system, _bucket, _source in iter_all_sources(source_map, include_retired=False)) + green_sources = max(0, active_source_total - len(normalized_failures)) + systems: Dict[str, Dict[str, Any]] = {} + for system in source_map.get("systems", []) or []: + systems[system["system_id"]] = { + "system_id": system["system_id"], + "display_name": system["display_name"], + "active_source_total": 0, + "green_source_total": 0, + "failure_count": 0, + } + for system, _bucket, _source in iter_all_sources(source_map, include_retired=False): + systems[system["system_id"]]["active_source_total"] += 1 + for probe in probes: + systems.setdefault( + probe["system_id"], + {"system_id": probe["system_id"], "display_name": probe.get("display_name", probe["system_id"]), "active_source_total": 0, "green_source_total": 0, "failure_count": 0}, + )["green_source_total"] += 1 + for failure in normalized_failures: + systems.setdefault( + failure["system_id"], + {"system_id": failure["system_id"], "display_name": failure.get("display_name", failure["system_id"]), "active_source_total": 0, "green_source_total": 0, "failure_count": 0}, + )["failure_count"] += 1 + + generated_at = isoformat(now_utc()) + all_green = not normalized_failures + previous = previous or {} + last_fully_green_run = generated_at if all_green else previous.get("last_fully_green_run") + return { + "generated_at": generated_at, + "active_source_count": active_source_total, + "green_source_count": green_sources, + "failure_count": len(normalized_failures), + "all_green": all_green, + "last_fully_green_run": last_fully_green_run, + "retries_performed": retries_performed, + "probes": sorted(probes, key=lambda item: (item["system_id"], item["source_name"])), + "failures": normalized_failures, + "systems": sorted(systems.values(), key=lambda item: item["system_id"]), + } + + +def write_source_health(snapshot: Dict[str, Any]) -> None: + write_json(SOURCE_HEALTH_PATH, snapshot) + + +def build_alerts( + current_failures: List[Dict[str, Any]], + *, + previous_alerts: List[Dict[str, Any]] | None = None, + bootstrap_failures: List[Dict[str, Any]] | None = None, + generated_at: str | None = None, +) -> List[Dict[str, Any]]: + now_value = generated_at or isoformat(now_utc()) + normalized_current = {source_key(item["system_id"], item["source_name"]): item for item in normalize_failures(current_failures)} + + previous_entries = list(previous_alerts or []) + if not previous_entries and bootstrap_failures: + for item in normalize_failures(bootstrap_failures): + previous_entries.append( + { + "alert_id": source_key(item["system_id"], item["source_name"]), + "system_id": item["system_id"], + "display_name": item.get("display_name", item["system_id"]), + "source_name": item["source_name"], + "source_kind": item.get("source_kind"), + "status": "open", + "opened_at": now_value, + "updated_at": now_value, + "resolved_at": None, + "failure_streak": 1, + "last_category": item.get("category"), + "last_failure": item, + } + ) + + previous_by_key = {item["alert_id"]: item for item in previous_entries if item.get("alert_id")} + next_alerts: List[Dict[str, Any]] = [] + + for alert_id, failure in normalized_current.items(): + previous = previous_by_key.get(alert_id) + if previous and previous.get("status") == "open": + streak = int(previous.get("failure_streak") or 0) + 1 + opened_at = previous.get("opened_at") or now_value + else: + streak = 1 + opened_at = now_value + next_alerts.append( + { + "alert_id": alert_id, + "system_id": failure["system_id"], + "display_name": failure.get("display_name", failure["system_id"]), + "source_name": failure["source_name"], + "source_kind": failure.get("source_kind"), + "status": "open", + "opened_at": opened_at, + "updated_at": now_value, + "resolved_at": None, + "failure_streak": streak, + "last_category": failure.get("category"), + "last_failure": failure, + } + ) + + for alert_id, previous in previous_by_key.items(): + if alert_id in normalized_current: + continue + if previous.get("status") == "open": + resolved = dict(previous) + resolved["status"] = "resolved" + resolved["updated_at"] = now_value + resolved["resolved_at"] = now_value + next_alerts.append(resolved) + else: + next_alerts.append(previous) + + return sorted(next_alerts, key=lambda item: (item.get("status") != "open", item.get("system_id"), item.get("source_name"))) + + +def write_alerts(alerts: List[Dict[str, Any]]) -> None: + write_json(ALERTS_PATH, alerts) + + +def _history_filename(generated_at: str) -> Path: + safe_name = generated_at.replace(":", "-") + return MONITORING_DIR / f"{safe_name}.json" + + +def _prune_monitoring_history(now_value: str) -> None: + ensure_dir(MONITORING_DIR) + current_dt = parse_dt(now_value) + if current_dt is None: + return + cutoff = current_dt - timedelta(days=90) + for path in sorted(MONITORING_DIR.glob("*.json")): + stem = path.stem.replace("-", ":", 2) + snapshot_dt = parse_dt(stem) + if snapshot_dt is None: + continue + if snapshot_dt < cutoff: + path.unlink() + + +def write_monitoring_state( + *, + audit: Dict[str, Any], + source_health: Dict[str, Any], + alerts: List[Dict[str, Any]], + ingest_summary: Dict[str, Any], + validation_errors: List[str], +) -> Dict[str, Any]: + open_alerts = [item for item in alerts if item.get("status") == "open"] + generated_at = source_health.get("generated_at") or isoformat(now_utc()) + summary = { + "generated_at": generated_at, + "active_source_count": source_health.get("active_source_count", 0), + "green_source_count": source_health.get("green_source_count", 0), + "source_failure_count": source_health.get("failure_count", 0), + "open_alert_count": len(open_alerts), + "resolved_alert_count": len([item for item in alerts if item.get("status") == "resolved"]), + "last_fully_green_run": source_health.get("last_fully_green_run"), + "source_catalog": { + "system_count": audit.get("system_count", 0), + "source_count": audit.get("source_count", 0), + "retired_source_count": audit.get("retired_source_count", 0), + }, + "ingest": { + "new_count": ingest_summary.get("new_count", 0), + "updated_count": ingest_summary.get("updated_count", 0), + "failure_count": len(normalize_failures(ingest_summary.get("failures", []))), + "systems_touched": ingest_summary.get("systems_touched", []), + }, + "validation": { + "passed": not validation_errors, + "error_count": len(validation_errors), + "errors": validation_errors, + }, + } + snapshot = { + "generated_at": generated_at, + "source_catalog_audit": audit, + "source_health": source_health, + "alerts": alerts, + "monitor_summary": summary, + } + write_json(MONITOR_SUMMARY_PATH, summary) + write_json(_history_filename(generated_at), snapshot) + _prune_monitoring_history(generated_at) + return summary + + +def read_previous_source_health() -> Dict[str, Any]: + return read_json(SOURCE_HEALTH_PATH, default={}) or {} + + +def read_previous_alerts() -> List[Dict[str, Any]]: + return read_json(ALERTS_PATH, default=[]) or [] diff --git a/scripts/intel/sources/atom_feed.py b/scripts/intel/sources/atom_feed.py new file mode 100644 index 00000000..748e7c06 --- /dev/null +++ b/scripts/intel/sources/atom_feed.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +import xml.etree.ElementTree as ET +from typing import Any, Dict, List + +from intel.http_client import request +from intel.models import Candidate + + +ATOM_NS = {"atom": "http://www.w3.org/2005/Atom"} + + +def _node_text(node: ET.Element, path: str) -> str: + child = node.find(path, ATOM_NS) + return child.text.strip() if child is not None and child.text else "" + + +def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]: + response = request("GET", source["url"], source=source) + response.raise_for_status() + root = ET.fromstring(response.content) + + parser_hints = source.get("parser_hints") or {} + keywords = {kw.lower() for kw in (parser_hints.get("keywords") or source.get("keywords", []))} + candidates: List[Candidate] = [] + entries = root.findall(".//atom:entry", ATOM_NS) or root.findall(".//entry") + for entry in entries[: source.get("max_items", 50)]: + title = _node_text(entry, "atom:title") or _node_text(entry, "title") + link_node = entry.find("atom:link", ATOM_NS) or entry.find("link") + link = "" + if link_node is not None: + link = (link_node.get("href") or "").strip() + summary = _node_text(entry, "atom:summary") or _node_text(entry, "summary") or _node_text(entry, "atom:content") + if keywords: + haystack = " ".join(filter(None, [title, summary, link])).lower() + if not any(keyword in haystack for keyword in keywords): + continue + candidates.append( + Candidate( + system_id=system["system_id"], + display_name=system["display_name"], + category=system["category"], + advisory_mode=source.get("advisory_mode", "core"), + source_kind=source["kind"], + source_name=source["name"], + source_confidence=source["confidence"], + source_url=link or source["url"], + title=title or f"Atom entry for {system['display_name']}", + published_at=_node_text(entry, "atom:published") or _node_text(entry, "published"), + updated_at=_node_text(entry, "atom:updated") or _node_text(entry, "updated"), + summary=summary, + severity="unknown", + references=[link] if link else [source["url"]], + raw={"title": title, "link": link}, + ) + ) + return candidates diff --git a/scripts/intel/sources/cisa_kev.py b/scripts/intel/sources/cisa_kev.py index 81da5373..9d04e64d 100644 --- a/scripts/intel/sources/cisa_kev.py +++ b/scripts/intel/sources/cisa_kev.py @@ -10,7 +10,7 @@ from intel.utils import unique def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]: - response = request("GET", source["url"]) + response = request("GET", source["url"], source=source) response.raise_for_status() payload = response.json() diff --git a/scripts/intel/sources/github_global.py b/scripts/intel/sources/github_global.py index b7ff33b5..f6104ab9 100644 --- a/scripts/intel/sources/github_global.py +++ b/scripts/intel/sources/github_global.py @@ -31,6 +31,7 @@ def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]: response = request( "GET", API_URL, + source=source, headers=headers, params={"per_page": 100, "page": page, "ecosystem": source.get("ecosystem")}, ) diff --git a/scripts/intel/sources/html_links.py b/scripts/intel/sources/html_links.py index e9f6e52d..17570c63 100644 --- a/scripts/intel/sources/html_links.py +++ b/scripts/intel/sources/html_links.py @@ -3,7 +3,7 @@ from __future__ import annotations import re from html import unescape from typing import Any, Dict, List -from urllib.parse import urljoin +from urllib.parse import urljoin, urlsplit, urlunsplit import requests @@ -16,11 +16,25 @@ ANCHOR_RE = re.compile(r"]+href=[\"']([^\"']+)[\"'][^>]*>(.*?)", re.IGN TAG_RE = re.compile(r"<[^>]+>") +def canonicalize_url(url: str) -> str: + parsed = urlsplit(url) + return urlunsplit((parsed.scheme, parsed.netloc, parsed.path, parsed.query, "")) + + +def _matches_patterns(value: str, patterns: List[str]) -> bool: + if not patterns: + return True + return any(re.search(pattern, value, re.IGNORECASE) for pattern in patterns) + + def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]: - response = request("GET", source["url"]) + response = request("GET", source["url"], source=source) response.raise_for_status() html = response.text - keywords = {kw.lower() for kw in source.get("keywords", [])} + parser_hints = source.get("parser_hints") or {} + keywords = {kw.lower() for kw in (parser_hints.get("keywords") or source.get("keywords", []))} + include_patterns = parser_hints.get("include_url_patterns") or [] + exclude_patterns = parser_hints.get("exclude_url_patterns") or [] candidates: List[Candidate] = [] seen = set() @@ -28,10 +42,14 @@ def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]: title = unescape(TAG_RE.sub(" ", text)).strip() if not title: continue - absolute = urljoin(source["url"], href) + absolute = canonicalize_url(urljoin(source["url"], href)) haystack = f"{title} {absolute}".lower() if keywords and not any(keyword in haystack for keyword in keywords): continue + if include_patterns and not _matches_patterns(absolute, include_patterns): + continue + if exclude_patterns and _matches_patterns(absolute, exclude_patterns): + continue if absolute in seen: continue seen.add(absolute) diff --git a/scripts/intel/sources/json_feed.py b/scripts/intel/sources/json_feed.py new file mode 100644 index 00000000..f67d281b --- /dev/null +++ b/scripts/intel/sources/json_feed.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from typing import Any, Dict, List + +from intel.http_client import request +from intel.models import Candidate +from intel.utils import unique + + +def _refs(item: Dict[str, Any]) -> List[str]: + values: List[str] = [] + for entry in item.get("references", []) or []: + if isinstance(entry, str): + values.append(entry) + elif isinstance(entry, dict) and entry.get("url"): + values.append(entry["url"]) + return unique(values) + + +def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]: + response = request("GET", source["url"], source=source) + response.raise_for_status() + payload = response.json() + items = payload.get("items") or payload.get("entries") or payload.get("advisories") or [] + if not isinstance(items, list): + return [] + + parser_hints = source.get("parser_hints") or {} + keywords = {kw.lower() for kw in (parser_hints.get("keywords") or source.get("keywords", []))} + candidates: List[Candidate] = [] + for item in items[: source.get("max_items", 50)]: + if not isinstance(item, dict): + continue + title = item.get("title") or item.get("name") or item.get("summary") or f"JSON entry for {system['display_name']}" + link = item.get("url") or item.get("external_url") or item.get("html_url") or source["url"] + summary = item.get("summary") or item.get("content_text") or item.get("description") or "" + if keywords: + haystack = " ".join(filter(None, [title, summary, link])).lower() + if not any(keyword in haystack for keyword in keywords): + continue + refs = _refs(item) + if link and link not in refs: + refs.insert(0, link) + candidates.append( + Candidate( + system_id=system["system_id"], + display_name=system["display_name"], + category=system["category"], + advisory_mode=source.get("advisory_mode", "core"), + source_kind=source["kind"], + source_name=source["name"], + source_confidence=source["confidence"], + source_url=link, + title=title, + published_at=item.get("date_published") or item.get("published_at") or item.get("published") or item.get("created_at"), + updated_at=item.get("date_modified") or item.get("updated_at") or item.get("modified") or item.get("updated"), + summary=summary, + severity=str(item.get("severity") or "unknown").lower(), + aliases=unique(item.get("aliases", []) or [item.get("id")]), + references=refs, + raw=item, + ) + ) + return candidates diff --git a/scripts/intel/sources/nvd_api.py b/scripts/intel/sources/nvd_api.py index 858258ab..3e4bb2ff 100644 --- a/scripts/intel/sources/nvd_api.py +++ b/scripts/intel/sources/nvd_api.py @@ -23,7 +23,7 @@ def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]: if api_key: headers["apiKey"] = api_key - response = request("GET", API_URL, headers=headers, params=params) + response = request("GET", API_URL, source=source, headers=headers, params=params) response.raise_for_status() payload = response.json() diff --git a/scripts/intel/sources/osv_api.py b/scripts/intel/sources/osv_api.py index 78ddb353..f6fc80bc 100644 --- a/scripts/intel/sources/osv_api.py +++ b/scripts/intel/sources/osv_api.py @@ -94,10 +94,11 @@ def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]: return [] queries = [{"package": {"name": pkg["name"], "ecosystem": pkg["ecosystem"]}} for pkg in packages] - session = build_session() + session = build_session(source) response = request( "POST", QUERY_BATCH_URL, + source=source, session=session, json={"queries": queries}, headers={"User-Agent": "websafe-intel"}, diff --git a/scripts/intel/sources/rss_feed.py b/scripts/intel/sources/rss_feed.py index e5437384..67e69bb1 100644 --- a/scripts/intel/sources/rss_feed.py +++ b/scripts/intel/sources/rss_feed.py @@ -15,11 +15,12 @@ def _text(node: ET.Element, name: str) -> str: def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]: - response = request("GET", source["url"]) + response = request("GET", source["url"], source=source) response.raise_for_status() root = ET.fromstring(response.content) - keywords = {kw.lower() for kw in source.get("keywords", [])} + parser_hints = source.get("parser_hints") or {} + keywords = {kw.lower() for kw in (parser_hints.get("keywords") or source.get("keywords", []))} items = root.findall(".//item") candidates: List[Candidate] = [] for item in items[: source.get("max_items", 50)]: diff --git a/scripts/intel/sources/runner.py b/scripts/intel/sources/runner.py index 1e115c2e..a2d73ab9 100644 --- a/scripts/intel/sources/runner.py +++ b/scripts/intel/sources/runner.py @@ -8,11 +8,12 @@ from typing import Any, Dict, List, Optional, Tuple import requests +from intel.config import iter_all_sources from intel.http_client import request from intel.models import Candidate from intel.utils import parse_dt -from . import cisa_kev, github_global, html_links, nvd_api, osv_api, rss_feed +from . import atom_feed, cisa_kev, github_global, html_links, json_feed, nvd_api, osv_api, rss_feed, vendor_index HANDLERS = { @@ -21,11 +22,59 @@ HANDLERS = { "kev-json": cisa_kev.fetch, "nvd-search": nvd_api.fetch, "rss-feed": rss_feed.fetch, + "atom-feed": atom_feed.fetch, + "json-feed": json_feed.fetch, "html-links": html_links.fetch, + "vendor-index": vendor_index.fetch, } -def _probe_source(system: Dict[str, Any], source: Dict[str, Any]) -> Dict[str, Any]: +def _failure_category(exc: Exception) -> str: + if isinstance(exc, requests.exceptions.SSLError): + return "tls" + if isinstance(exc, requests.exceptions.HTTPError): + response = getattr(exc, "response", None) + status = getattr(response, "status_code", None) + if status == 429: + return "rate_limit" + return "http_status" + if isinstance(exc, requests.exceptions.RequestException): + return "network" + if isinstance(exc, ET.ParseError): + return "parse" + if isinstance(exc, ValueError): + return "schema" + return "parse" + + +def failure_summary(failure: Dict[str, Any]) -> str: + if isinstance(failure, str): + return failure + return failure.get("summary") or f"{failure.get('system_id')}::{failure.get('source_name')}::{failure.get('category')}::{failure.get('exception')}" + + +def _build_failure(system: Dict[str, Any], source: Dict[str, Any], exc: Exception) -> Dict[str, Any]: + response = getattr(exc, "response", None) + status_code = getattr(response, "status_code", None) + category = _failure_category(exc) + message = str(exc).strip() or exc.__class__.__name__ + summary = f"{system['system_id']}::{source['name']}::{category}::{message}" + return { + "system_id": system["system_id"], + "display_name": system["display_name"], + "source_name": source["name"], + "source_kind": source["kind"], + "source_bucket": source.get("bucket_name"), + "category": category, + "exception": exc.__class__.__name__, + "message": message, + "status_code": status_code, + "url": source.get("url") or "", + "summary": summary, + } + + +def probe_source(system: Dict[str, Any], source: Dict[str, Any]) -> Dict[str, Any]: kind = source["kind"] if kind == "ghsa-global": headers = {"Accept": "application/vnd.github+json", "User-Agent": "websafe-intel"} @@ -35,6 +84,7 @@ def _probe_source(system: Dict[str, Any], source: Dict[str, Any]) -> Dict[str, A response = request( "GET", github_global.API_URL, + source=source, headers=headers, params={"per_page": 1, "page": 1, "ecosystem": source.get("ecosystem")}, ) @@ -52,6 +102,7 @@ def _probe_source(system: Dict[str, Any], source: Dict[str, Any]) -> Dict[str, A response = request( "POST", osv_api.QUERY_BATCH_URL, + source=source, json={"queries": [{"package": {"name": packages[0]["name"], "ecosystem": packages[0]["ecosystem"]}}]}, headers={"User-Agent": "websafe-intel"}, ) @@ -61,7 +112,7 @@ def _probe_source(system: Dict[str, Any], source: Dict[str, Any]) -> Dict[str, A raise ValueError("OSV probe returned non-object payload") return {"kind": kind, "items_seen": len(payload.get("results", []))} if kind == "kev-json": - response = request("GET", source["url"]) + response = request("GET", source["url"], source=source) response.raise_for_status() payload = response.json() if not isinstance(payload, dict): @@ -76,19 +127,37 @@ def _probe_source(system: Dict[str, Any], source: Dict[str, Any]) -> Dict[str, A api_key = os.environ.get("NVD_API_KEY") if api_key: headers["apiKey"] = api_key - response = request("GET", nvd_api.API_URL, headers=headers, params=params) + response = request("GET", nvd_api.API_URL, source=source, headers=headers, params=params) response.raise_for_status() payload = response.json() if not isinstance(payload, dict): raise ValueError("NVD probe returned non-object payload") return {"kind": kind, "items_seen": len(payload.get("vulnerabilities", []))} if kind == "rss-feed": - response = request("GET", source["url"]) + response = request("GET", source["url"], source=source) response.raise_for_status() root = ET.fromstring(response.content) return {"kind": kind, "items_seen": len(root.findall(".//item"))} + if kind == "atom-feed": + response = request("GET", source["url"], source=source) + response.raise_for_status() + root = ET.fromstring(response.content) + return {"kind": kind, "items_seen": len(root.findall(".//{http://www.w3.org/2005/Atom}entry"))} + if kind == "json-feed": + response = request("GET", source["url"], source=source) + response.raise_for_status() + payload = response.json() + items = payload.get("items") or payload.get("entries") or payload.get("advisories") or [] + if not isinstance(items, list): + raise ValueError("JSON feed probe returned non-list items") + return {"kind": kind, "items_seen": len(items)} if kind == "html-links": - response = request("GET", source["url"]) + response = request("GET", source["url"], source=source) + response.raise_for_status() + html = response.text + return {"kind": kind, "items_seen": len(html_links.ANCHOR_RE.findall(html))} + if kind == "vendor-index": + response = request("GET", source["url"], source=source) response.raise_for_status() html = response.text return {"kind": kind, "items_seen": len(html_links.ANCHOR_RE.findall(html))} @@ -110,47 +179,59 @@ def collect_candidates( since_dt: Optional[datetime] = None, tier: Optional[str] = None, include_undated: bool = False, -) -> Tuple[List[Candidate], List[str]]: +) -> Tuple[List[Candidate], List[Dict[str, Any]]]: all_candidates: List[Candidate] = [] - failures: List[str] = [] + failures: List[Dict[str, Any]] = [] for system in source_map["systems"]: if tier and system.get("tier") != tier: continue - for bucket_name in ("official_sources", "ecosystem_sources", "research_sources"): - for source in system.get(bucket_name, []): - handler = HANDLERS.get(source["kind"]) - if handler is None: - failures.append(f"Unsupported source kind {source['kind']} for {system['system_id']}") - continue - try: - items = handler(system, source) - for item in items: - if _passes_since(item, since_dt, include_undated): - all_candidates.append(item) - except Exception as exc: - failures.append(f"{system['system_id']}::{source['name']}::{exc.__class__.__name__}") + for _system, _bucket_name, source in iter_all_sources({"systems": [system]}, include_retired=False): + handler = HANDLERS.get(source["kind"]) + if handler is None: + failures.append( + { + "system_id": system["system_id"], + "display_name": system["display_name"], + "source_name": source["name"], + "source_kind": source["kind"], + "source_bucket": source.get("bucket_name"), + "category": "schema", + "exception": "UnsupportedSourceKind", + "message": f"Unsupported source kind {source['kind']}", + "status_code": None, + "url": source.get("url") or "", + "summary": f"{system['system_id']}::{source['name']}::schema::Unsupported source kind {source['kind']}", + } + ) + continue + try: + items = handler(system, source) + for item in items: + if _passes_since(item, since_dt, include_undated): + all_candidates.append(item) + except Exception as exc: + failures.append(_build_failure(system, source, exc)) return all_candidates, failures def probe_sources( source_map: Dict[str, Any], tier: Optional[str] = None, -) -> Tuple[List[Dict[str, Any]], List[str]]: +) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: jobs: List[Tuple[Dict[str, Any], Dict[str, Any]]] = [] probes: List[Dict[str, Any]] = [] - failures: List[str] = [] + failures: List[Dict[str, Any]] = [] for system in source_map["systems"]: if tier and system.get("tier") != tier: continue - for bucket_name in ("official_sources", "ecosystem_sources", "research_sources"): - for source in system.get(bucket_name, []): - jobs.append((system, source)) + for _system, _bucket_name, source in iter_all_sources({"systems": [system]}, include_retired=False): + jobs.append((system, source)) max_workers = min(16, max(4, len(jobs) or 1)) with ThreadPoolExecutor(max_workers=max_workers) as executor: - future_map = {executor.submit(_probe_source, system, source): (system, source) for system, source in jobs} + future_map = {executor.submit(probe_source, system, source): (system, source) for system, source in jobs} for future in as_completed(future_map): system, source = future_map[future] try: @@ -164,5 +245,15 @@ def probe_sources( } ) except Exception as exc: - failures.append(f"{system['system_id']}::{source['name']}::{exc.__class__.__name__}") + failures.append(_build_failure(system, source, exc)) return probes, failures + + +def find_source(source_map: Dict[str, Any], system_id: str, source_name: str) -> Tuple[Dict[str, Any], Dict[str, Any]] | None: + for system in source_map.get("systems", []) or []: + if system.get("system_id") != system_id: + continue + for _system, _bucket_name, source in iter_all_sources({"systems": [system]}, include_retired=True): + if source.get("name") == source_name: + return system, source + return None diff --git a/scripts/intel/sources/vendor_index.py b/scripts/intel/sources/vendor_index.py new file mode 100644 index 00000000..1890255c --- /dev/null +++ b/scripts/intel/sources/vendor_index.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +import re +from html import unescape +from typing import Any, Dict, List +from urllib.parse import urljoin + +from intel.http_client import request +from intel.models import Candidate +from intel.utils import unique + +from .html_links import ANCHOR_RE, TAG_RE, canonicalize_url + + +def _matches(value: str, patterns: List[str]) -> bool: + if not patterns: + return True + return any(re.search(pattern, value, re.IGNORECASE) for pattern in patterns) + + +def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]: + response = request("GET", source["url"], source=source) + response.raise_for_status() + html = response.text + parser_hints = source.get("parser_hints") or {} + keywords = {kw.lower() for kw in (parser_hints.get("keywords") or source.get("keywords", []))} + include_patterns = parser_hints.get("include_url_patterns") or [] + exclude_patterns = parser_hints.get("exclude_url_patterns") or [] + + candidates: List[Candidate] = [] + seen = set() + for href, text in ANCHOR_RE.findall(html): + absolute = canonicalize_url(urljoin(source["url"], href)) + title = unescape(TAG_RE.sub(" ", text)).strip() + if not title: + continue + haystack = " ".join(filter(None, [absolute, title])).lower() + if keywords and not any(keyword in haystack for keyword in keywords): + continue + if include_patterns and not _matches(absolute, include_patterns): + continue + if exclude_patterns and _matches(absolute, exclude_patterns): + continue + if absolute in seen: + continue + seen.add(absolute) + candidates.append( + Candidate( + system_id=system["system_id"], + display_name=system["display_name"], + category=system["category"], + advisory_mode=source.get("advisory_mode", "core"), + source_kind=source["kind"], + source_name=source["name"], + source_confidence=source["confidence"], + source_url=absolute, + title=title, + summary="", + severity="unknown", + references=unique([absolute]), + raw={"href": absolute, "title": title}, + ) + ) + if len(candidates) >= source.get("max_items", 50): + break + return candidates