323 行
12 KiB
Python
323 行
12 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import xml.etree.ElementTree as ET
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from datetime import datetime
|
|
from time import perf_counter
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import requests
|
|
|
|
from intel.config import iter_all_sources
|
|
from intel.http_client import request
|
|
from intel.models import Candidate
|
|
from intel.utils import parse_dt
|
|
|
|
from . import atom_feed, cisa_kev, github_global, html_links, json_feed, nvd_api, osv_api, rss_feed, vendor_index
|
|
|
|
|
|
HANDLERS = {
|
|
"ghsa-global": github_global.fetch,
|
|
"osv-batch": osv_api.fetch,
|
|
"kev-json": cisa_kev.fetch,
|
|
"nvd-search": nvd_api.fetch,
|
|
"rss-feed": rss_feed.fetch,
|
|
"atom-feed": atom_feed.fetch,
|
|
"json-feed": json_feed.fetch,
|
|
"html-links": html_links.fetch,
|
|
"vendor-index": vendor_index.fetch,
|
|
}
|
|
|
|
DEFAULT_MAX_WORKERS = 20
|
|
MAX_WORKER_CAP = 32
|
|
|
|
|
|
def _failure_category(exc: Exception) -> str:
|
|
if isinstance(exc, requests.exceptions.SSLError):
|
|
return "tls"
|
|
if isinstance(exc, requests.exceptions.HTTPError):
|
|
response = getattr(exc, "response", None)
|
|
status = getattr(response, "status_code", None)
|
|
if status == 429:
|
|
return "rate_limit"
|
|
return "http_status"
|
|
if isinstance(exc, requests.exceptions.RequestException):
|
|
return "network"
|
|
if isinstance(exc, ET.ParseError):
|
|
return "parse"
|
|
if isinstance(exc, ValueError):
|
|
return "schema"
|
|
return "parse"
|
|
|
|
|
|
def failure_summary(failure: Dict[str, Any]) -> str:
|
|
if isinstance(failure, str):
|
|
return failure
|
|
return failure.get("summary") or f"{failure.get('system_id')}::{failure.get('source_name')}::{failure.get('category')}::{failure.get('exception')}"
|
|
|
|
|
|
def build_failure(
|
|
system: Dict[str, Any],
|
|
source: Dict[str, Any],
|
|
exc: Exception,
|
|
*,
|
|
elapsed_seconds: float | None = None,
|
|
) -> Dict[str, Any]:
|
|
response = getattr(exc, "response", None)
|
|
status_code = getattr(response, "status_code", None)
|
|
category = _failure_category(exc)
|
|
message = str(exc).strip() or exc.__class__.__name__
|
|
summary = f"{system['system_id']}::{source['name']}::{category}::{message}"
|
|
failure = {
|
|
"system_id": system["system_id"],
|
|
"display_name": system["display_name"],
|
|
"source_name": source["name"],
|
|
"source_kind": source["kind"],
|
|
"source_bucket": source.get("bucket_name"),
|
|
"category": category,
|
|
"exception": exc.__class__.__name__,
|
|
"message": message,
|
|
"status_code": status_code,
|
|
"url": source.get("url") or "",
|
|
"summary": summary,
|
|
}
|
|
if elapsed_seconds is not None:
|
|
failure["elapsed_seconds"] = round(elapsed_seconds, 3)
|
|
return failure
|
|
|
|
|
|
def _collect_jobs(
|
|
source_map: Dict[str, Any],
|
|
*,
|
|
tier: Optional[str] = None,
|
|
) -> List[Tuple[Dict[str, Any], Dict[str, Any]]]:
|
|
jobs: List[Tuple[Dict[str, Any], Dict[str, Any]]] = []
|
|
for system in source_map["systems"]:
|
|
if tier and system.get("tier") != tier:
|
|
continue
|
|
for _system, _bucket_name, source in iter_all_sources({"systems": [system]}, include_retired=False):
|
|
jobs.append((system, source))
|
|
return jobs
|
|
|
|
|
|
def _max_workers(job_count: int) -> int:
|
|
if job_count <= 0:
|
|
return 4
|
|
configured = os.environ.get("WEBSAFE_INTEL_MAX_WORKERS")
|
|
if configured:
|
|
try:
|
|
value = int(configured)
|
|
except ValueError:
|
|
value = DEFAULT_MAX_WORKERS
|
|
else:
|
|
value = DEFAULT_MAX_WORKERS
|
|
value = max(4, min(MAX_WORKER_CAP, value))
|
|
return min(value, job_count)
|
|
|
|
|
|
def _collect_source_candidates(
|
|
system: Dict[str, Any],
|
|
source: Dict[str, Any],
|
|
*,
|
|
since_dt: Optional[datetime],
|
|
include_undated: bool,
|
|
) -> Tuple[List[Candidate], Optional[Dict[str, Any]]]:
|
|
started = perf_counter()
|
|
handler = HANDLERS.get(source["kind"])
|
|
if handler is None:
|
|
return (
|
|
[],
|
|
{
|
|
"system_id": system["system_id"],
|
|
"display_name": system["display_name"],
|
|
"source_name": source["name"],
|
|
"source_kind": source["kind"],
|
|
"source_bucket": source.get("bucket_name"),
|
|
"category": "schema",
|
|
"exception": "UnsupportedSourceKind",
|
|
"message": f"Unsupported source kind {source['kind']}",
|
|
"status_code": None,
|
|
"url": source.get("url") or "",
|
|
"summary": f"{system['system_id']}::{source['name']}::schema::Unsupported source kind {source['kind']}",
|
|
},
|
|
)
|
|
try:
|
|
items = handler(system, source)
|
|
filtered = [item for item in items if _passes_since(item, since_dt, include_undated)]
|
|
return filtered, None
|
|
except Exception as exc:
|
|
return [], build_failure(system, source, exc, elapsed_seconds=perf_counter() - started)
|
|
|
|
|
|
def probe_source(system: Dict[str, Any], source: Dict[str, Any]) -> Dict[str, Any]:
|
|
kind = source["kind"]
|
|
if kind == "ghsa-global":
|
|
headers = {"Accept": "application/vnd.github+json", "User-Agent": "websafe-intel"}
|
|
token = os.environ.get("GITHUB_TOKEN")
|
|
if token:
|
|
headers["Authorization"] = f"Bearer {token}"
|
|
response = request(
|
|
"GET",
|
|
github_global.API_URL,
|
|
source=source,
|
|
headers=headers,
|
|
params={"per_page": 1, "page": 1, "ecosystem": source.get("ecosystem")},
|
|
)
|
|
if response.status_code == 403 and "rate limit" in response.text.lower():
|
|
raise requests.HTTPError("GitHub advisory rate limit exceeded; set GITHUB_TOKEN for higher quota", response=response)
|
|
response.raise_for_status()
|
|
payload = response.json()
|
|
if not isinstance(payload, list):
|
|
raise ValueError("GitHub advisory probe returned non-list payload")
|
|
return {"kind": kind, "items_seen": len(payload)}
|
|
if kind == "osv-batch":
|
|
packages = system.get("package_names", [])
|
|
if not packages:
|
|
return {"kind": kind, "items_seen": 0}
|
|
response = request(
|
|
"POST",
|
|
osv_api.QUERY_BATCH_URL,
|
|
source=source,
|
|
json={"queries": [{"package": {"name": packages[0]["name"], "ecosystem": packages[0]["ecosystem"]}}]},
|
|
headers={"User-Agent": "websafe-intel"},
|
|
)
|
|
response.raise_for_status()
|
|
payload = response.json()
|
|
if not isinstance(payload, dict):
|
|
raise ValueError("OSV probe returned non-object payload")
|
|
return {"kind": kind, "items_seen": len(payload.get("results", []))}
|
|
if kind == "kev-json":
|
|
response = request("GET", source["url"], source=source)
|
|
response.raise_for_status()
|
|
payload = response.json()
|
|
if not isinstance(payload, dict):
|
|
raise ValueError("KEV probe returned non-object payload")
|
|
return {"kind": kind, "items_seen": len(payload.get("vulnerabilities", []))}
|
|
if kind == "nvd-search":
|
|
params = {
|
|
"keywordSearch": source.get("keyword") or system["display_name"],
|
|
"resultsPerPage": 1,
|
|
}
|
|
headers = {"User-Agent": "websafe-intel"}
|
|
api_key = os.environ.get("NVD_API_KEY")
|
|
if api_key:
|
|
headers["apiKey"] = api_key
|
|
payload = nvd_api.request_nvd_json(source, headers, params)
|
|
return {"kind": kind, "items_seen": len(payload.get("vulnerabilities", []))}
|
|
if kind == "rss-feed":
|
|
response = request("GET", source["url"], source=source)
|
|
response.raise_for_status()
|
|
root = ET.fromstring(response.content)
|
|
return {"kind": kind, "items_seen": len(root.findall(".//item"))}
|
|
if kind == "atom-feed":
|
|
response = request("GET", source["url"], source=source)
|
|
response.raise_for_status()
|
|
root = ET.fromstring(response.content)
|
|
return {"kind": kind, "items_seen": len(root.findall(".//{http://www.w3.org/2005/Atom}entry"))}
|
|
if kind == "json-feed":
|
|
response = request("GET", source["url"], source=source)
|
|
response.raise_for_status()
|
|
payload = response.json()
|
|
if isinstance(payload, list):
|
|
items = payload
|
|
elif isinstance(payload, dict):
|
|
items = payload.get("items") or payload.get("entries") or payload.get("advisories") or []
|
|
else:
|
|
raise ValueError("JSON feed probe returned unsupported payload type")
|
|
if not isinstance(items, list):
|
|
raise ValueError("JSON feed probe returned non-list items")
|
|
return {"kind": kind, "items_seen": len(items)}
|
|
if kind == "html-links":
|
|
response = request("GET", source["url"], source=source)
|
|
response.raise_for_status()
|
|
html = response.text
|
|
return {"kind": kind, "items_seen": len(html_links.ANCHOR_RE.findall(html))}
|
|
if kind == "vendor-index":
|
|
response = request("GET", source["url"], source=source)
|
|
response.raise_for_status()
|
|
html = response.text
|
|
return {"kind": kind, "items_seen": len(vendor_index.extract_links(html))}
|
|
raise ValueError(f"Unsupported source kind {kind}")
|
|
|
|
|
|
def _passes_since(candidate: Candidate, since_dt: Optional[datetime], include_undated: bool) -> bool:
|
|
if since_dt is None:
|
|
return True
|
|
timestamps = [parse_dt(candidate.updated_at), parse_dt(candidate.published_at)]
|
|
valid = [item for item in timestamps if item is not None]
|
|
if not valid:
|
|
return include_undated
|
|
return max(valid) >= since_dt
|
|
|
|
|
|
def collect_candidates(
|
|
source_map: Dict[str, Any],
|
|
since_dt: Optional[datetime] = None,
|
|
tier: Optional[str] = None,
|
|
include_undated: bool = False,
|
|
) -> Tuple[List[Candidate], List[Dict[str, Any]]]:
|
|
all_candidates: List[Candidate] = []
|
|
failures: List[Dict[str, Any]] = []
|
|
jobs = _collect_jobs(source_map, tier=tier)
|
|
with ThreadPoolExecutor(max_workers=_max_workers(len(jobs))) as executor:
|
|
future_map = {
|
|
executor.submit(
|
|
_collect_source_candidates,
|
|
system,
|
|
source,
|
|
since_dt=since_dt,
|
|
include_undated=include_undated,
|
|
): (system, source)
|
|
for system, source in jobs
|
|
}
|
|
for future in as_completed(future_map):
|
|
items, failure = future.result()
|
|
if items:
|
|
all_candidates.extend(items)
|
|
if failure:
|
|
failures.append(failure)
|
|
all_candidates.sort(key=lambda item: (item.system_id, item.published_at or "", item.title, item.source_name))
|
|
failures.sort(key=lambda item: (item.get("system_id", ""), item.get("source_name", ""), item.get("category", "")))
|
|
return all_candidates, failures
|
|
|
|
|
|
def probe_sources(
|
|
source_map: Dict[str, Any],
|
|
tier: Optional[str] = None,
|
|
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
|
|
jobs = _collect_jobs(source_map, tier=tier)
|
|
probes: List[Dict[str, Any]] = []
|
|
failures: List[Dict[str, Any]] = []
|
|
with ThreadPoolExecutor(max_workers=_max_workers(len(jobs))) as executor:
|
|
future_map = {executor.submit(probe_source, system, source): (system, source, perf_counter()) for system, source in jobs}
|
|
for future in as_completed(future_map):
|
|
system, source, started = future_map[future]
|
|
elapsed = perf_counter() - started
|
|
try:
|
|
result = future.result()
|
|
probes.append(
|
|
{
|
|
"system_id": system["system_id"],
|
|
"source_name": source["name"],
|
|
"source_kind": source["kind"],
|
|
"elapsed_seconds": round(elapsed, 3),
|
|
**result,
|
|
}
|
|
)
|
|
except Exception as exc:
|
|
failures.append(build_failure(system, source, exc, elapsed_seconds=elapsed))
|
|
probes.sort(key=lambda item: (item["system_id"], item["source_name"]))
|
|
failures.sort(key=lambda item: (item.get("system_id", ""), item.get("source_name", ""), item.get("category", "")))
|
|
return probes, failures
|
|
|
|
|
|
def find_source(source_map: Dict[str, Any], system_id: str, source_name: str) -> Tuple[Dict[str, Any], Dict[str, Any]] | None:
|
|
for system in source_map.get("systems", []) or []:
|
|
if system.get("system_id") != system_id:
|
|
continue
|
|
for _system, _bucket_name, source in iter_all_sources({"systems": [system]}, include_retired=True):
|
|
if source.get("name") == source_name:
|
|
return system, source
|
|
return None
|