"""Build ``Candidate`` records from a JSON endpoint described by a source config."""

from __future__ import annotations

from typing import Any, Dict, List

from intel.http_client import request
from intel.models import Candidate
from intel.utils import unique


def _refs(item: Dict[str, Any]) -> List[str]:
    """Collect reference URLs from ``item["references"]``.

    Each entry may be a plain string or a dict with a truthy ``"url"``;
    other entry types are ignored.  A bare string is treated as a single
    reference (mirroring ``_list_value``) and any other non-list value is
    treated as "no references" instead of being iterated.

    The collected values are passed through ``unique`` before returning.
    NOTE(review): ``unique`` is presumably an order-preserving de-duplication
    that returns a list -- confirm in ``intel.utils``.
    """
    raw = item.get("references")
    if isinstance(raw, str):
        # A lone string would otherwise be iterated character by character.
        raw = [raw] if raw else []
    elif not isinstance(raw, list):
        raw = []

    values: List[str] = []
    for entry in raw:
        if isinstance(entry, str):
            values.append(entry)
        elif isinstance(entry, dict) and entry.get("url"):
            values.append(entry["url"])
    return unique(values)


def _list_value(item: Dict[str, Any], *keys: str) -> List[str]:
    """Gather string values stored under any of ``keys`` in ``item``.

    A non-empty string contributes itself; a list contributes ``str(entry)``
    for each truthy entry.  Values of any other type are skipped.  The result
    is passed through ``unique``.
    """
    values: List[str] = []
    for key in keys:
        raw = item.get(key)
        if isinstance(raw, str):
            if raw:
                values.append(raw)
        elif isinstance(raw, list):
            values.extend(str(entry) for entry in raw if entry)
    return unique(values)


def _title(item: Dict[str, Any], system: Dict[str, Any]) -> str:
    """Return the first non-blank string among the item's title-like fields.

    Falls back to a generic label built from ``system["display_name"]`` when
    none of the candidate keys holds a non-blank string.
    """
    for key in ("title", "name", "summary", "issue_id", "cve_id", "id"):
        value = item.get(key)
        if isinstance(value, str):
            stripped = value.strip()
            if stripped:
                return stripped
    return f"JSON entry for {system['display_name']}"


def _first_present(item: Dict[str, Any], *keys: str) -> Any:
    """Return the first truthy ``item.get(key)`` for ``keys``, in order.

    When no key yields a truthy value the value looked up for the *last* key
    is returned, which is exactly what ``item.get(a) or item.get(b) or ...``
    evaluates to.
    """
    value: Any = None
    for key in keys:
        value = item.get(key)
        if value:
            return value
    return value


def _matches_keywords(keywords: set, *parts: Any) -> bool:
    """Return True when any keyword is a substring of the joined, lowercased parts.

    Falsy parts are dropped; the rest are coerced with ``str`` so that a
    non-string field (e.g. a nested object used as a summary) cannot make
    ``str.join`` raise.
    """
    haystack = " ".join(str(part) for part in parts if part).lower()
    return any(keyword in haystack for keyword in keywords)


def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
    """Download ``source["url"]`` and convert its JSON entries into candidates.

    The payload may be a JSON list of entries, or an object holding the
    entries under ``"items"``, ``"entries"`` or ``"advisories"``.  Any other
    payload shape yields an empty list.  At most ``source["max_items"]``
    entries (default 50) are considered, non-dict entries are skipped, and,
    when keywords are configured (``source["parser_hints"]["keywords"]`` or
    ``source["keywords"]``), only entries whose title, summary or link
    contains at least one keyword (case-insensitive) are kept.

    Args:
        system: Mapping providing ``system_id``, ``display_name`` and
            ``category`` for the produced candidates.
        source: Mapping providing ``url``, ``kind``, ``name`` and
            ``confidence``; optionally ``advisory_mode``, ``max_items``,
            ``keywords`` and ``parser_hints``.

    Returns:
        One ``Candidate`` per accepted entry, in payload order.

    Raises:
        KeyError: If a required ``system``/``source`` key is missing.
        Exception: Whatever ``request``, ``raise_for_status`` or ``json``
            raise is propagated unchanged.
    """
    response = request("GET", source["url"], source=source)
    response.raise_for_status()
    payload = response.json()

    if isinstance(payload, list):
        items = payload
    elif isinstance(payload, dict):
        items = _first_present(payload, "items", "entries", "advisories") or []
    else:
        # Scalars / null are valid JSON documents but carry no entries.
        return []
    if not isinstance(items, list):
        return []

    parser_hints = source.get("parser_hints") or {}
    # ``or []`` also covers an explicit ``"keywords": null`` in the source.
    raw_keywords = parser_hints.get("keywords") or source.get("keywords") or []
    keywords = {kw.lower() for kw in raw_keywords}

    candidates: List[Candidate] = []
    for item in items[: source.get("max_items", 50)]:
        if not isinstance(item, dict):
            continue

        title = _title(item, system)
        link = _first_present(item, "url", "external_url", "html_url") or source["url"]
        summary = (
            _first_present(item, "summary", "content_text", "description", "details")
            or ""
        )

        if keywords and not _matches_keywords(keywords, title, summary, link):
            continue

        refs = _refs(item)
        if link and link not in refs:
            # Keep the entry's own link as the first reference.
            refs.insert(0, link)

        aliases = _list_value(
            item, "aliases", "id", "issue_id", "cve_id", "ghsa_id", "osv_id"
        )
        cve_ids = [value for value in aliases if value.startswith("CVE-")]
        ghsa_ids = [value for value in aliases if value.startswith("GHSA-")]
        osv_ids = [value for value in aliases if value.startswith("OSV-")]

        candidates.append(
            Candidate(
                system_id=system["system_id"],
                display_name=system["display_name"],
                category=system["category"],
                advisory_mode=source.get("advisory_mode", "core"),
                source_kind=source["kind"],
                source_name=source["name"],
                source_confidence=source["confidence"],
                source_url=link,
                title=title,
                published_at=_first_present(
                    item,
                    "date_published",
                    "published_at",
                    "published",
                    "created_at",
                    "fix_release_date",
                ),
                updated_at=_first_present(
                    item,
                    "date_modified",
                    "updated_at",
                    "modified",
                    "updated",
                    "fix_release_date",
                ),
                summary=summary,
                severity=str(item.get("severity") or "unknown").lower(),
                aliases=aliases,
                cve_ids=cve_ids,
                ghsa_ids=ghsa_ids,
                osv_ids=osv_ids,
                affected_versions=_list_value(item, "affected_versions"),
                fixed_versions=_list_value(item, "fixed_versions", "fix_versions"),
                package_name=_first_present(item, "package_name", "platform"),
                references=refs,
                raw=item,
            )
        )
    return candidates