from __future__ import annotations import os from typing import Any, Dict, List import requests from intel.models import Candidate from intel.utils import unique API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0" def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]: params = { "keywordSearch": source.get("keyword") or system["display_name"], "resultsPerPage": source.get("results_per_page", 50), } headers = {"User-Agent": "websafe-intel"} api_key = os.environ.get("NVD_API_KEY") if api_key: headers["apiKey"] = api_key response = requests.get(API_URL, headers=headers, params=params, timeout=30) response.raise_for_status() payload = response.json() candidates: List[Candidate] = [] for item in payload.get("vulnerabilities", []): cve = item.get("cve", {}) descriptions = cve.get("descriptions", []) description = next((d.get("value") for d in descriptions if d.get("lang") == "en"), "") metrics = cve.get("metrics", {}) severity = "unknown" cvss_score = None for key in ("cvssMetricV31", "cvssMetricV30", "cvssMetricV2"): entries = metrics.get(key, []) if entries: data = entries[0].get("cvssData", {}) severity = (entries[0].get("baseSeverity") or data.get("baseSeverity") or "unknown").lower() cvss_score = data.get("baseScore") break refs = [ref.get("url") for ref in cve.get("references", []) if ref.get("url")] candidates.append( Candidate( system_id=system["system_id"], display_name=system["display_name"], category=system["category"], advisory_mode=source.get("advisory_mode", "core"), source_kind=source["kind"], source_name=source["name"], source_confidence=source["confidence"], source_url=refs[0] if refs else API_URL, title=cve.get("id") or f"NVD advisory for {system['display_name']}", published_at=cve.get("published"), updated_at=cve.get("lastModified"), summary=description or "", severity=severity, cvss_score=cvss_score, aliases=unique([cve.get("id")]), cve_ids=[cve.get("id")] if cve.get("id") else [], references=refs, raw=item, ) ) return candidates