From 301d15e91e8cd5108ad29000a2d6292f17f2b22b Mon Sep 17 00:00:00 2001 From: hao Date: Wed, 18 Mar 2026 16:28:30 -0700 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0:=202=20=E4=B8=AA=E6=96=87?= =?UTF-8?q?=E4=BB=B6=20-=202026-03-18=2016:28:30?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/intel/sources/nvd_api.py | 74 ++++++++++++++++++++++++++++++-- scripts/intel/sources/runner.py | 6 +-- 2 files changed, 71 insertions(+), 9 deletions(-) diff --git a/scripts/intel/sources/nvd_api.py b/scripts/intel/sources/nvd_api.py index 1ec59b43..bea3bee1 100644 --- a/scripts/intel/sources/nvd_api.py +++ b/scripts/intel/sources/nvd_api.py @@ -3,19 +3,24 @@ from __future__ import annotations import os import threading import time +from hashlib import sha1 +from pathlib import Path from typing import Any, Dict, List import requests from intel.http_client import request from intel.models import Candidate -from intel.utils import unique +from intel.config import STATE_DIR +from intel.utils import isoformat, now_utc, parse_dt, read_json, unique, write_json API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0" PUBLIC_INTERVAL_SECONDS = 7.0 +DEFAULT_CACHE_TTL_SECONDS = 6 * 60 * 60 _NVD_RATE_LOCK = threading.Lock() _NVD_LAST_REQUEST = 0.0 +_CACHE_DIR = STATE_DIR / "cache" / "nvd" def _wait_for_slot() -> None: @@ -39,6 +44,69 @@ def request_nvd(source: Dict[str, Any], headers: Dict[str, Any], params: Dict[st return response +def _cache_ttl_seconds() -> int: + configured = os.environ.get("WEBSAFE_NVD_CACHE_TTL_SECONDS") + if configured: + try: + return max(0, int(configured)) + except ValueError: + return DEFAULT_CACHE_TTL_SECONDS + return DEFAULT_CACHE_TTL_SECONDS + + +def _cache_key(params: Dict[str, Any]) -> str: + normalized = "&".join(f"{key}={params[key]}" for key in sorted(params)) + return sha1(normalized.encode("utf-8")).hexdigest() + + +def _cache_path(params: Dict[str, Any]) -> Path: + return _CACHE_DIR / f"{_cache_key(params)}.json" + + +def _load_cached_payload(params: Dict[str, Any]) -> Dict[str, Any] | None: + ttl_seconds = _cache_ttl_seconds() + if ttl_seconds <= 0: + return None + path = _cache_path(params) + cached = read_json(path, default=None) + if not isinstance(cached, dict): + return None + fetched_at = parse_dt(cached.get("fetched_at")) + if fetched_at is None: + return None + age = (now_utc() - fetched_at).total_seconds() + if age > ttl_seconds: + return None + payload = cached.get("payload") + return payload if isinstance(payload, dict) else None + + +def _write_cached_payload(params: Dict[str, Any], payload: Dict[str, Any]) -> None: + write_json( + _cache_path(params), + { + "fetched_at": isoformat(now_utc()), + "payload": payload, + }, + ) + + +def request_nvd_json(source: Dict[str, Any], headers: Dict[str, Any], params: Dict[str, Any]) -> Dict[str, Any]: + api_key = os.environ.get("NVD_API_KEY") + if not api_key: + cached = _load_cached_payload(params) + if cached is not None: + return cached + response = request_nvd(source, headers, params) + response.raise_for_status() + payload = response.json() + if not isinstance(payload, dict): + raise ValueError("NVD response payload was not an object") + if not api_key: + _write_cached_payload(params, payload) + return payload + + def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]: params = { "keywordSearch": source.get("keyword") or system["display_name"], @@ -49,9 +117,7 @@ def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]: if api_key: headers["apiKey"] = api_key - response = request_nvd(source, headers, params) - response.raise_for_status() - payload = response.json() + payload = request_nvd_json(source, headers, params) candidates: List[Candidate] = [] for item in payload.get("vulnerabilities", []): diff --git a/scripts/intel/sources/runner.py b/scripts/intel/sources/runner.py index 9489a7dd..88634c64 100644 --- a/scripts/intel/sources/runner.py +++ b/scripts/intel/sources/runner.py @@ -192,11 +192,7 @@ def probe_source(system: Dict[str, Any], source: Dict[str, Any]) -> Dict[str, An api_key = os.environ.get("NVD_API_KEY") if api_key: headers["apiKey"] = api_key - response = nvd_api.request_nvd(source, headers, params) - response.raise_for_status() - payload = response.json() - if not isinstance(payload, dict): - raise ValueError("NVD probe returned non-object payload") + payload = nvd_api.request_nvd_json(source, headers, params) return {"kind": kind, "items_seen": len(payload.get("vulnerabilities", []))} if kind == "rss-feed": response = request("GET", source["url"], source=source)