更新: 2 个文件 - 2026-03-18 16:28:30

这个提交包含在:
hao
2026-03-18 16:28:30 -07:00
父节点 df455d7fb5
当前提交 301d15e91e
修改 2 个文件,包含 71 行新增9 行删除

查看文件

@@ -3,19 +3,24 @@ from __future__ import annotations
import os import os
import threading import threading
import time import time
from hashlib import sha1
from pathlib import Path
from typing import Any, Dict, List from typing import Any, Dict, List
import requests import requests
from intel.http_client import request from intel.http_client import request
from intel.models import Candidate from intel.models import Candidate
from intel.utils import unique from intel.config import STATE_DIR
from intel.utils import isoformat, now_utc, parse_dt, read_json, unique, write_json
API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0" API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"
PUBLIC_INTERVAL_SECONDS = 7.0 PUBLIC_INTERVAL_SECONDS = 7.0
DEFAULT_CACHE_TTL_SECONDS = 6 * 60 * 60
_NVD_RATE_LOCK = threading.Lock() _NVD_RATE_LOCK = threading.Lock()
_NVD_LAST_REQUEST = 0.0 _NVD_LAST_REQUEST = 0.0
_CACHE_DIR = STATE_DIR / "cache" / "nvd"
def _wait_for_slot() -> None: def _wait_for_slot() -> None:
@@ -39,6 +44,69 @@ def request_nvd(source: Dict[str, Any], headers: Dict[str, Any], params: Dict[st
return response return response
def _cache_ttl_seconds() -> int:
configured = os.environ.get("WEBSAFE_NVD_CACHE_TTL_SECONDS")
if configured:
try:
return max(0, int(configured))
except ValueError:
return DEFAULT_CACHE_TTL_SECONDS
return DEFAULT_CACHE_TTL_SECONDS
def _cache_key(params: Dict[str, Any]) -> str:
normalized = "&".join(f"{key}={params[key]}" for key in sorted(params))
return sha1(normalized.encode("utf-8")).hexdigest()
def _cache_path(params: Dict[str, Any]) -> Path:
return _CACHE_DIR / f"{_cache_key(params)}.json"
def _load_cached_payload(params: Dict[str, Any]) -> Dict[str, Any] | None:
ttl_seconds = _cache_ttl_seconds()
if ttl_seconds <= 0:
return None
path = _cache_path(params)
cached = read_json(path, default=None)
if not isinstance(cached, dict):
return None
fetched_at = parse_dt(cached.get("fetched_at"))
if fetched_at is None:
return None
age = (now_utc() - fetched_at).total_seconds()
if age > ttl_seconds:
return None
payload = cached.get("payload")
return payload if isinstance(payload, dict) else None
def _write_cached_payload(params: Dict[str, Any], payload: Dict[str, Any]) -> None:
write_json(
_cache_path(params),
{
"fetched_at": isoformat(now_utc()),
"payload": payload,
},
)
def request_nvd_json(source: Dict[str, Any], headers: Dict[str, Any], params: Dict[str, Any]) -> Dict[str, Any]:
api_key = os.environ.get("NVD_API_KEY")
if not api_key:
cached = _load_cached_payload(params)
if cached is not None:
return cached
response = request_nvd(source, headers, params)
response.raise_for_status()
payload = response.json()
if not isinstance(payload, dict):
raise ValueError("NVD response payload was not an object")
if not api_key:
_write_cached_payload(params, payload)
return payload
def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]: def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
params = { params = {
"keywordSearch": source.get("keyword") or system["display_name"], "keywordSearch": source.get("keyword") or system["display_name"],
@@ -49,9 +117,7 @@ def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
if api_key: if api_key:
headers["apiKey"] = api_key headers["apiKey"] = api_key
response = request_nvd(source, headers, params) payload = request_nvd_json(source, headers, params)
response.raise_for_status()
payload = response.json()
candidates: List[Candidate] = [] candidates: List[Candidate] = []
for item in payload.get("vulnerabilities", []): for item in payload.get("vulnerabilities", []):

查看文件

@@ -192,11 +192,7 @@ def probe_source(system: Dict[str, Any], source: Dict[str, Any]) -> Dict[str, An
api_key = os.environ.get("NVD_API_KEY") api_key = os.environ.get("NVD_API_KEY")
if api_key: if api_key:
headers["apiKey"] = api_key headers["apiKey"] = api_key
response = nvd_api.request_nvd(source, headers, params) payload = nvd_api.request_nvd_json(source, headers, params)
response.raise_for_status()
payload = response.json()
if not isinstance(payload, dict):
raise ValueError("NVD probe returned non-object payload")
return {"kind": kind, "items_seen": len(payload.get("vulnerabilities", []))} return {"kind": kind, "items_seen": len(payload.get("vulnerabilities", []))}
if kind == "rss-feed": if kind == "rss-feed":
response = request("GET", source["url"], source=source) response = request("GET", source["url"], source=source)