from __future__ import annotations import base64 import hashlib import json import os import threading import time from pathlib import Path from typing import Any, Dict import requests from requests.structures import CaseInsensitiveDict from intel.config import DEFAULT_HEALTH_POLICY, DEFAULT_REQUEST_POLICY, STATE_DIR from intel.utils import isoformat, now_utc, parse_dt, read_json, write_json DEFAULT_TIMEOUT = 30 DEFAULT_USER_AGENT = str(DEFAULT_REQUEST_POLICY.get("user_agent") or "Mozilla/5.0") DEFAULT_HTTP_CACHE_TTL_SECONDS = 15 * 60 GET_CACHEABLE_SOURCE_KINDS = {"html-links", "vendor-index", "rss-feed", "atom-feed", "json-feed", "kev-json"} _HTTP_CACHE_DIR = STATE_DIR / "cache" / "http" _MEMORY_CACHE: Dict[str, Dict[str, Any]] = {} _INFLIGHT: Dict[str, threading.Event] = {} _CACHE_LOCK = threading.Lock() def _request_policy(source: Dict[str, Any] | None = None) -> Dict[str, Any]: return {**DEFAULT_REQUEST_POLICY, **((source or {}).get("request_policy") or {})} def _health_policy(source: Dict[str, Any] | None = None) -> Dict[str, Any]: return {**DEFAULT_HEALTH_POLICY, **((source or {}).get("health_policy") or {})} def _http_cache_ttl_seconds() -> int: configured = os.environ.get("WEBSAFE_HTTP_CACHE_TTL_SECONDS") if configured: try: return max(0, int(configured)) except ValueError: return DEFAULT_HTTP_CACHE_TTL_SECONDS return DEFAULT_HTTP_CACHE_TTL_SECONDS def _should_cache_get(method: str, source: Dict[str, Any] | None, headers: Dict[str, Any]) -> bool: if method.upper() != "GET": return False if not source or source.get("kind") not in GET_CACHEABLE_SOURCE_KINDS: return False return "Authorization" not in headers def _cache_key( method: str, url: str, *, headers: Dict[str, Any], params: Dict[str, Any] | None, allow_redirects: bool, verify: bool, ) -> str: normalized = json.dumps( { "method": method.upper(), "url": url, "params": params or {}, "accept": headers.get("Accept", ""), "user_agent": headers.get("User-Agent", ""), "allow_redirects": allow_redirects, "verify": verify, }, sort_keys=True, separators=(",", ":"), ) return hashlib.sha1(normalized.encode("utf-8")).hexdigest() def _cache_path(cache_key: str) -> Path: return _HTTP_CACHE_DIR / f"{cache_key}.json" def _response_payload(response: requests.Response) -> Dict[str, Any]: return { "fetched_at": isoformat(now_utc()), "status_code": response.status_code, "url": response.url, "headers": dict(response.headers), "encoding": response.encoding, "reason": response.reason, "content_b64": base64.b64encode(response.content).decode("ascii"), } def _restore_response(payload: Dict[str, Any]) -> requests.Response | None: try: content = base64.b64decode(payload.get("content_b64", "")) except Exception: return None response = requests.Response() response.status_code = int(payload.get("status_code") or 0) response.url = str(payload.get("url") or "") response.headers = CaseInsensitiveDict(payload.get("headers") or {}) response.encoding = payload.get("encoding") response.reason = payload.get("reason") response._content = content return response def _load_cached_response(cache_key: str) -> requests.Response | None: ttl_seconds = _http_cache_ttl_seconds() if ttl_seconds <= 0: return None cached = _MEMORY_CACHE.get(cache_key) if cached is None: cached = read_json(_cache_path(cache_key), default=None) if isinstance(cached, dict): _MEMORY_CACHE[cache_key] = cached if not isinstance(cached, dict): return None fetched_at = parse_dt(cached.get("fetched_at")) if fetched_at is None: return None age = (now_utc() - fetched_at).total_seconds() if age > ttl_seconds: return None return _restore_response(cached) def _write_cached_response(cache_key: str, response: requests.Response) -> None: payload = _response_payload(response) _MEMORY_CACHE[cache_key] = payload write_json(_cache_path(cache_key), payload) def _acquire_inflight(cache_key: str) -> tuple[bool, threading.Event]: with _CACHE_LOCK: event = _INFLIGHT.get(cache_key) if event is None: event = threading.Event() _INFLIGHT[cache_key] = event return True, event return False, event def _release_inflight(cache_key: str) -> None: with _CACHE_LOCK: event = _INFLIGHT.pop(cache_key, None) if event is not None: event.set() def build_session(source: Dict[str, Any] | None = None) -> requests.Session: session = requests.Session() session.trust_env = True request_policy = _request_policy(source) headers = {"User-Agent": request_policy.get("user_agent") or DEFAULT_USER_AGENT} if request_policy.get("accept"): headers["Accept"] = request_policy["accept"] session.headers.update(headers) return session def request( method: str, url: str, *, source: Dict[str, Any] | None = None, session: requests.Session | None = None, timeout: int = DEFAULT_TIMEOUT, **kwargs: Any, ) -> requests.Response: request_policy = _request_policy(source) health_policy = _health_policy(source) client = session or build_session(source) params = dict(kwargs.get("params") or {}) headers = dict(kwargs.pop("headers", {}) or {}) if "User-Agent" not in headers: headers["User-Agent"] = request_policy.get("user_agent") or DEFAULT_USER_AGENT if request_policy.get("accept") and "Accept" not in headers: headers["Accept"] = request_policy["accept"] timeout_value = timeout if timeout != DEFAULT_TIMEOUT else int(request_policy.get("timeout_seconds") or DEFAULT_TIMEOUT) allow_redirects = kwargs.pop("allow_redirects", bool(request_policy.get("follow_redirects", True))) verify = kwargs.pop("verify", bool(request_policy.get("verify_tls", True))) status_retries = max(1, int(health_policy.get("retries") or 1)) backoff_seconds = float(health_policy.get("backoff_seconds") or 0.5) cache_key = "" cacheable = _should_cache_get(method, source, headers) cache_leader = False if cacheable: cache_key = _cache_key( method, url, headers=headers, params=params, allow_redirects=allow_redirects, verify=verify, ) cached = _load_cached_response(cache_key) if cached is not None: return cached cache_leader, event = _acquire_inflight(cache_key) if not cache_leader: event.wait(timeout=max(1, timeout_value + 5)) cached = _load_cached_response(cache_key) if cached is not None: return cached last_error: Exception | None = None try: for attempt in range(1, status_retries + 1): try: response = client.request( method, url, headers=headers, timeout=timeout_value, allow_redirects=allow_redirects, verify=verify, **kwargs, ) if cacheable and response.ok: _write_cached_response(cache_key, response) return response except requests.exceptions.SSLError as exc: last_error = exc if verify: try: response = client.request( method, url, headers=headers, timeout=timeout_value, allow_redirects=allow_redirects, verify=False, **kwargs, ) if cacheable and response.ok: _write_cached_response(cache_key, response) return response except requests.exceptions.RequestException as fallback_error: last_error = fallback_error if attempt < status_retries: time.sleep(backoff_seconds * attempt) except requests.exceptions.RequestException as exc: last_error = exc if attempt < status_retries: time.sleep(backoff_seconds * attempt) if last_error is not None: raise last_error raise RuntimeError(f"request failed without an exception for {method} {url}") finally: if cacheable and cache_leader: _release_inflight(cache_key)