from __future__ import annotations from datetime import datetime from typing import Any, Dict, List, Optional, Tuple from intel.models import Candidate from intel.utils import parse_dt from . import cisa_kev, github_global, html_links, nvd_api, osv_api, rss_feed HANDLERS = { "ghsa-global": github_global.fetch, "osv-batch": osv_api.fetch, "kev-json": cisa_kev.fetch, "nvd-search": nvd_api.fetch, "rss-feed": rss_feed.fetch, "html-links": html_links.fetch, } def _passes_since(candidate: Candidate, since_dt: Optional[datetime], include_undated: bool) -> bool: if since_dt is None: return True timestamps = [parse_dt(candidate.updated_at), parse_dt(candidate.published_at)] valid = [item for item in timestamps if item is not None] if not valid: return include_undated return max(valid) >= since_dt def collect_candidates( source_map: Dict[str, Any], since_dt: Optional[datetime] = None, tier: Optional[str] = None, include_undated: bool = False, ) -> Tuple[List[Candidate], List[str]]: all_candidates: List[Candidate] = [] failures: List[str] = [] for system in source_map["systems"]: if tier and system.get("tier") != tier: continue for bucket_name in ("official_sources", "ecosystem_sources", "research_sources"): for source in system.get(bucket_name, []): handler = HANDLERS.get(source["kind"]) if handler is None: failures.append(f"Unsupported source kind {source['kind']} for {system['system_id']}") continue try: items = handler(system, source) for item in items: if _passes_since(item, since_dt, include_undated): all_candidates.append(item) except Exception as exc: failures.append(f"{system['system_id']}::{source['name']}::{exc.__class__.__name__}") return all_candidates, failures