From 826a9074558a8b6af9cd6273b6e162b3e262665b Mon Sep 17 00:00:00 2001
From: hao
Date: Thu, 19 Mar 2026 02:29:34 -0700
Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0:=203=20=E4=B8=AA=E6=96=87?=
 =?UTF-8?q?=E4=BB=B6=20-=202026-03-19=2002:29:34?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 08-threat-intel/README.md       |   2 +
 08-threat-intel/source-map.yaml |   9 ++
 scripts/intel/http_client.py    | 248 +++++++++++++++++++++++++++-----
 3 files changed, 223 insertions(+), 36 deletions(-)

diff --git a/08-threat-intel/README.md b/08-threat-intel/README.md
index ead4748e..54b50c11 100644
--- a/08-threat-intel/README.md
+++ b/08-threat-intel/README.md
@@ -72,6 +72,8 @@ python3 /Users/x/websafe/scripts/lab/main.py serve-dashboard --port 8734
   - GitHub Global Advisories 在未认证状态下很容易碰到 rate limit;配置后能提高配额。
 - `NVD_API_KEY`
   - 可选,用于提高 NVD 查询配额和稳定性。
+- `WEBSAFE_HTTP_CACHE_TTL_SECONDS`
+  - 可选,控制 HTML / RSS / Atom / JSON / KEV GET 抓取缓存秒数;默认 `900`,用于降低 probe 与 ingest 的重复网络开销。
 
 本机私有环境文件:
 
diff --git a/08-threat-intel/source-map.yaml b/08-threat-intel/source-map.yaml
index 1e170b29..f928e7e3 100644
--- a/08-threat-intel/source-map.yaml
+++ b/08-threat-intel/source-map.yaml
@@ -2228,6 +2228,9 @@ systems:
         advisory_mode: core
         keywords: [security release, gitlab]
         max_items: 50
+        status: retired
+        retired_reason: GitLab Security Releases Atom is the official machine-readable replacement; keeping both active adds duplicate cold-start cost without added coverage.
+        replacement_sources: [GitLab Security Releases Atom]
       - name: GitLab Security Releases Atom
         kind: atom-feed
         url: https://about.gitlab.com/security-releases.xml
@@ -2275,6 +2278,9 @@ systems:
         advisory_mode: core
         keywords: [jenkins]
         max_items: 60
+        status: retired
+        retired_reason: Jenkins Security Advisories RSS is the official machine-readable replacement; keeping both active adds duplicate cold-start cost without added coverage.
+        replacement_sources: [Jenkins Security Advisories RSS]
       - name: Jenkins Security Advisories RSS
         kind: rss-feed
         url: https://www.jenkins.io/security/advisories/rss.xml
@@ -2346,6 +2352,9 @@ systems:
         advisory_mode: core
         keywords: [kibana, elastic, security]
         max_items: 60
+        status: retired
+        retired_reason: Elastic Security Announcements RSS is the official machine-readable replacement; keeping both active adds duplicate cold-start cost without added coverage.
+        replacement_sources: [Elastic Security Announcements RSS]
       - name: Elastic Security Announcements RSS
         kind: rss-feed
         url: https://discuss.elastic.co/c/announcements/security-announcements/31.rss
diff --git a/scripts/intel/http_client.py b/scripts/intel/http_client.py
index 1bc9c2d1..f1d3dc11 100644
--- a/scripts/intel/http_client.py
+++ b/scripts/intel/http_client.py
@@ -1,15 +1,29 @@
 from __future__ import annotations
 
+import base64
+import hashlib
+import json
+import os
+import threading
 import time
+from pathlib import Path
 from typing import Any, Dict
 
 import requests
+from requests.structures import CaseInsensitiveDict
 
-from intel.config import DEFAULT_HEALTH_POLICY, DEFAULT_REQUEST_POLICY
+from intel.config import DEFAULT_HEALTH_POLICY, DEFAULT_REQUEST_POLICY, STATE_DIR
+from intel.utils import isoformat, now_utc, parse_dt, read_json, write_json
 
 
 DEFAULT_TIMEOUT = 30
 DEFAULT_USER_AGENT = str(DEFAULT_REQUEST_POLICY.get("user_agent") or "Mozilla/5.0")
+DEFAULT_HTTP_CACHE_TTL_SECONDS = 15 * 60
+GET_CACHEABLE_SOURCE_KINDS = {"html-links", "vendor-index", "rss-feed", "atom-feed", "json-feed", "kev-json"}
+_HTTP_CACHE_DIR = STATE_DIR / "cache" / "http"
+_MEMORY_CACHE: Dict[str, Dict[str, Any]] = {}
+_INFLIGHT: Dict[str, threading.Event] = {}
+_CACHE_LOCK = threading.Lock()
 
 
 def _request_policy(source: Dict[str, Any] | None = None) -> Dict[str, Any]:
@@ -20,6 +34,136 @@ def _health_policy(source: Dict[str, Any] | None = None) -> Dict[str, Any]:
     return {**DEFAULT_HEALTH_POLICY, **((source or {}).get("health_policy") or {})}
 
 
+def _http_cache_ttl_seconds() -> int:
+    """Return the cache TTL in seconds from the environment, or the default if unset or invalid."""
+    configured = os.environ.get("WEBSAFE_HTTP_CACHE_TTL_SECONDS")
+    if configured:
+        try:
+            return max(0, int(configured))
+        except ValueError:
+            return DEFAULT_HTTP_CACHE_TTL_SECONDS
+    return DEFAULT_HTTP_CACHE_TTL_SECONDS
+
+
+def _should_cache_get(method: str, source: Dict[str, Any] | None, headers: Dict[str, Any]) -> bool:
+    """Return True for an unauthenticated GET against a cacheable source kind."""
+    if method.upper() != "GET":
+        return False
+    if not source or source.get("kind") not in GET_CACHEABLE_SOURCE_KINDS:
+        return False
+    # Header names are case-insensitive, so "authorization" must opt out of caching too.
+    return not any(str(name).lower() == "authorization" for name in headers)
+
+
+def _cache_key(
+    method: str,
+    url: str,
+    *,
+    headers: Dict[str, Any],
+    params: Dict[str, Any] | None,
+    allow_redirects: bool,
+    verify: bool,
+) -> str:
+    """Return a SHA-1 hex digest over the request attributes used to identify a cache entry."""
+    normalized = json.dumps(
+        {
+            "method": method.upper(),
+            "url": url,
+            "params": params or {},
+            "accept": headers.get("Accept", ""),
+            "user_agent": headers.get("User-Agent", ""),
+            "allow_redirects": allow_redirects,
+            "verify": verify,
+        },
+        sort_keys=True,
+        separators=(",", ":"),
+    )
+    return hashlib.sha1(normalized.encode("utf-8")).hexdigest()
+
+
+def _cache_path(cache_key: str) -> Path:
+    """Return the on-disk JSON path for a cache key."""
+    return _HTTP_CACHE_DIR / f"{cache_key}.json"
+
+
+def _response_payload(response: requests.Response) -> Dict[str, Any]:
+    """Serialize a response into a dict with a fetch timestamp and base64-encoded body."""
+    return {
+        "fetched_at": isoformat(now_utc()),
+        "status_code": response.status_code,
+        "url": response.url,
+        "headers": dict(response.headers),
+        "encoding": response.encoding,
+        "reason": response.reason,
+        "content_b64": base64.b64encode(response.content).decode("ascii"),
+    }
+
+
+def _restore_response(payload: Dict[str, Any]) -> requests.Response | None:
+    """Rebuild a Response from a cached payload; return None if the payload is malformed."""
+    try:
+        content = base64.b64decode(payload.get("content_b64", ""))
+        status_code = int(payload.get("status_code") or 0)
+    except (TypeError, ValueError):
+        # binascii.Error subclasses ValueError; a malformed entry is treated as a cache miss.
+        return None
+    response = requests.Response()
+    response.status_code = status_code
+    response.url = str(payload.get("url") or "")
+    response.headers = CaseInsensitiveDict(payload.get("headers") or {})
+    response.encoding = payload.get("encoding")
+    response.reason = payload.get("reason")
+    response._content = content
+    return response
+
+
+def _load_cached_response(cache_key: str) -> requests.Response | None:
+    """Return an unexpired cached response from memory or disk, or None on a miss."""
+    ttl_seconds = _http_cache_ttl_seconds()
+    if ttl_seconds <= 0:
+        return None
+    cached = _MEMORY_CACHE.get(cache_key)
+    if cached is None:
+        cached = read_json(_cache_path(cache_key), default=None)
+        if isinstance(cached, dict):
+            _MEMORY_CACHE[cache_key] = cached
+    if not isinstance(cached, dict):
+        return None
+    fetched_at = parse_dt(cached.get("fetched_at"))
+    if fetched_at is None:
+        return None
+    age = (now_utc() - fetched_at).total_seconds()
+    if age > ttl_seconds:
+        return None
+    return _restore_response(cached)
+
+
+def _write_cached_response(cache_key: str, response: requests.Response) -> None:
+    """Store the serialized response in the memory cache and in its JSON file."""
+    payload = _response_payload(response)
+    _MEMORY_CACHE[cache_key] = payload
+    write_json(_cache_path(cache_key), payload)
+
+
+def _acquire_inflight(cache_key: str) -> tuple[bool, threading.Event]:
+    """Return (is_leader, event); the first caller for a key registers the event and leads."""
+    with _CACHE_LOCK:
+        event = _INFLIGHT.get(cache_key)
+        if event is None:
+            event = threading.Event()
+            _INFLIGHT[cache_key] = event
+            return True, event
+        return False, event
+
+
+def _release_inflight(cache_key: str) -> None:
+    """Remove the in-flight event for a key and set it so waiting callers resume."""
+    with _CACHE_LOCK:
+        event = _INFLIGHT.pop(cache_key, None)
+    if event is not None:
+        event.set()
+
+
 def build_session(source: Dict[str, Any] | None = None) -> requests.Session:
     session = requests.Session()
     session.trust_env = True
@@ -43,6 +187,7 @@ def request(
     request_policy = _request_policy(source)
     health_policy = _health_policy(source)
     client = session or build_session(source)
+    params = dict(kwargs.get("params") or {})
     headers = dict(kwargs.pop("headers", {}) or {})
     if "User-Agent" not in headers:
         headers["User-Agent"] = request_policy.get("user_agent") or DEFAULT_USER_AGENT
@@ -53,40 +198,71 @@ def request(
     verify = kwargs.pop("verify", bool(request_policy.get("verify_tls", True)))
     status_retries = max(1, int(health_policy.get("retries") or 1))
     backoff_seconds = float(health_policy.get("backoff_seconds") or 0.5)
+    cache_key = ""
+    cacheable = _should_cache_get(method, source, headers)
+    cache_leader = False
+    if cacheable:
+        cache_key = _cache_key(
+            method,
+            url,
+            headers=headers,
+            params=params,
+            allow_redirects=allow_redirects,
+            verify=verify,
+        )
+        cached = _load_cached_response(cache_key)
+        if cached is not None:
+            return cached
+        cache_leader, event = _acquire_inflight(cache_key)
+        if not cache_leader:
+            event.wait(timeout=max(1, timeout_value + 5))
+            cached = _load_cached_response(cache_key)
+            if cached is not None:
+                return cached
 
     last_error: Exception | None = None
-    for attempt in range(1, status_retries + 1):
-        try:
-            return client.request(
-                method,
-                url,
-                headers=headers,
-                timeout=timeout_value,
-                allow_redirects=allow_redirects,
-                verify=verify,
-                **kwargs,
-            )
-        except requests.exceptions.SSLError as exc:
-            last_error = exc
-            if verify:
-                try:
-                    return client.request(
-                        method,
-                        url,
-                        headers=headers,
-                        timeout=timeout_value,
-                        allow_redirects=allow_redirects,
-                        verify=False,
-                        **kwargs,
-                    )
-                except requests.exceptions.RequestException as fallback_error:
-                    last_error = fallback_error
-            if attempt < status_retries:
-                time.sleep(backoff_seconds * attempt)
-        except requests.exceptions.RequestException as exc:
-            last_error = exc
-            if attempt < status_retries:
-                time.sleep(backoff_seconds * attempt)
-    if last_error is not None:
-        raise last_error
-    raise RuntimeError(f"request failed without an exception for {method} {url}")
+    try:
+        for attempt in range(1, status_retries + 1):
+            try:
+                response = client.request(
+                    method,
+                    url,
+                    headers=headers,
+                    timeout=timeout_value,
+                    allow_redirects=allow_redirects,
+                    verify=verify,
+                    **kwargs,
+                )
+                if cacheable and response.ok:
+                    _write_cached_response(cache_key, response)
+                return response
+            except requests.exceptions.SSLError as exc:
+                last_error = exc
+                if verify:
+                    try:
+                        response = client.request(
+                            method,
+                            url,
+                            headers=headers,
+                            timeout=timeout_value,
+                            allow_redirects=allow_redirects,
+                            verify=False,
+                            **kwargs,
+                        )
+                        if cacheable and response.ok:
+                            _write_cached_response(cache_key, response)
+                        return response
+                    except requests.exceptions.RequestException as fallback_error:
+                        last_error = fallback_error
+                if attempt < status_retries:
+                    time.sleep(backoff_seconds * attempt)
+            except requests.exceptions.RequestException as exc:
+                last_error = exc
+                if attempt < status_retries:
+                    time.sleep(backoff_seconds * attempt)
+        if last_error is not None:
+            raise last_error
+        raise RuntimeError(f"request failed without an exception for {method} {url}")
+    finally:
+        if cacheable and cache_leader:
+            _release_inflight(cache_key)