from __future__ import annotations import json import os import re from hashlib import sha1 from pathlib import Path from typing import Any, Dict, List import requests from intel.config import STATE_DIR from intel.http_client import build_session, request from intel.models import Candidate from intel.utils import isoformat, now_utc, parse_dt, read_json, unique, write_json QUERY_BATCH_URL = "https://api.osv.dev/v1/querybatch" DETAIL_URL = "https://api.osv.dev/v1/vulns/{vuln_id}" CVSS_SCORE_RE = re.compile(r"/CVSS:3\.[01]/AV:[A-Z]/AC:[A-Z]/PR:[A-Z]/UI:[A-Z]/S:[A-Z]/C:[A-Z]/I:[A-Z]/A:[A-Z]") NUMERIC_SCORE_RE = re.compile(r"([0-9]+(?:\.[0-9]+)?)") DEFAULT_CACHE_TTL_SECONDS = 6 * 60 * 60 _CACHE_DIR = STATE_DIR / "cache" / "osv" def _cache_ttl_seconds() -> int: configured = os.environ.get("WEBSAFE_OSV_CACHE_TTL_SECONDS") if configured: try: return max(0, int(configured)) except ValueError: return DEFAULT_CACHE_TTL_SECONDS return DEFAULT_CACHE_TTL_SECONDS def _cache_key(value: str) -> str: return sha1(value.encode("utf-8")).hexdigest() def _cache_path(namespace: str, value: str) -> Path: return _CACHE_DIR / f"{namespace}-{_cache_key(value)}.json" def _load_cached_payload(namespace: str, value: str) -> Dict[str, Any] | None: ttl_seconds = _cache_ttl_seconds() if ttl_seconds <= 0: return None path = _cache_path(namespace, value) cached = read_json(path, default=None) if not isinstance(cached, dict): return None fetched_at = parse_dt(cached.get("fetched_at")) if fetched_at is None: return None age = (now_utc() - fetched_at).total_seconds() if age > ttl_seconds: return None payload = cached.get("payload") return payload if isinstance(payload, dict) else None def _write_cached_payload(namespace: str, value: str, payload: Dict[str, Any]) -> None: write_json( _cache_path(namespace, value), { "fetched_at": isoformat(now_utc()), "payload": payload, }, ) def _request_json( method: str, url: str, *, source: Dict[str, Any], cache_namespace: str, cache_key: str, session: requests.Session | None = None, json_body: Dict[str, Any] | None = None, ) -> Dict[str, Any]: cached = _load_cached_payload(cache_namespace, cache_key) if cached is not None: return cached response = request( method, url, source=source, session=session, json=json_body, headers={"User-Agent": "websafe-intel"}, ) response.raise_for_status() payload = response.json() if not isinstance(payload, dict): raise ValueError(f"OSV response payload was not an object for {url}") _write_cached_payload(cache_namespace, cache_key, payload) return payload def request_querybatch_json( source: Dict[str, Any], queries: List[Dict[str, Any]], *, session: requests.Session | None = None, ) -> Dict[str, Any]: body = {"queries": queries} return _request_json( "POST", QUERY_BATCH_URL, source=source, cache_namespace="querybatch", cache_key=json.dumps(body, sort_keys=True, separators=(",", ":")), session=session, json_body=body, ) def request_detail_json( source: Dict[str, Any], vuln_id: str, *, session: requests.Session | None = None, ) -> Dict[str, Any]: return _request_json( "GET", DETAIL_URL.format(vuln_id=vuln_id), source=source, cache_namespace="detail", cache_key=vuln_id, session=session, ) def _fixed_versions(vuln: Dict[str, Any]) -> List[str]: fixed = [] for affected in vuln.get("affected", []): for rng in affected.get("ranges", []): for event in rng.get("events", []): if event.get("fixed"): fixed.append(event["fixed"]) return unique(fixed) def _affected_versions(vuln: Dict[str, Any]) -> List[str]: versions = [] ranges = [] for affected in vuln.get("affected", []): versions.extend(affected.get("versions", [])[:20]) for rng in affected.get("ranges", []): introduced = None fixed = None last_affected = None limit = None for event in rng.get("events", []): introduced = introduced or event.get("introduced") fixed = fixed or event.get("fixed") last_affected = last_affected or event.get("last_affected") limit = limit or event.get("limit") if introduced or fixed or last_affected or limit: parts = [] if introduced: parts.append(f"introduced={introduced}") if last_affected: parts.append(f"last_affected={last_affected}") if fixed: parts.append(f"fixed<{fixed}") if limit: parts.append(f"limit<{limit}") ranges.append(", ".join(parts)) return unique(versions + ranges) def _severity(vuln: Dict[str, Any]) -> tuple[str, float | None]: best_score = None for sev in vuln.get("severity", []): score = sev.get("score", "") match = NUMERIC_SCORE_RE.search(score) if match: try: best_score = float(match.group(1)) break except ValueError: continue if best_score is None: return "unknown", None if best_score >= 9.0: return "critical", best_score if best_score >= 7.0: return "high", best_score if best_score >= 4.0: return "medium", best_score return "low", best_score def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]: packages = system.get("package_names", []) if not packages: return [] since_dt = source.get("_since_dt") queries = [{"package": {"name": pkg["name"], "ecosystem": pkg["ecosystem"]}} for pkg in packages] session = build_session(source) payload = request_querybatch_json(source, queries, session=session) detail_cache: Dict[str, Dict[str, Any]] = {} candidates: List[Candidate] = [] for package, result in zip(packages, payload.get("results", [])): for summary in result.get("vulns", []): vuln_id = summary.get("id") if not vuln_id: continue modified = parse_dt(summary.get("modified")) if since_dt is not None and modified is not None and modified < since_dt: continue if vuln_id not in detail_cache: detail_cache[vuln_id] = request_detail_json(source, vuln_id, session=session) vuln = detail_cache[vuln_id] aliases = unique(vuln.get("aliases", []) + [vuln.get("id")]) refs = [ref.get("url") for ref in vuln.get("references", []) if ref.get("url")] severity, cvss_score = _severity(vuln) package_name = package["name"] if not package_name: for affected in vuln.get("affected", []): pkg = affected.get("package") or {} if pkg.get("name"): package_name = pkg["name"] break candidates.append( Candidate( system_id=system["system_id"], display_name=system["display_name"], category=system["category"], advisory_mode=source.get("advisory_mode", "core"), source_kind=source["kind"], source_name=source["name"], source_confidence=source["confidence"], source_url=refs[0] if refs else DETAIL_URL.format(vuln_id=vuln_id), title=vuln.get("summary") or vuln.get("id") or f"OSV advisory for {package['name']}", published_at=vuln.get("published"), updated_at=vuln.get("modified"), summary=vuln.get("details") or "", severity=severity, cvss_score=cvss_score, aliases=aliases, cve_ids=[item for item in aliases if item and item.startswith("CVE-")], ghsa_ids=[item for item in aliases if item and item.startswith("GHSA-")], osv_ids=[vuln.get("id")] if vuln.get("id") else [], affected_versions=_affected_versions(vuln), fixed_versions=_fixed_versions(vuln), package_name=package_name, references=refs, raw=vuln, ) ) return candidates