259 行
8.7 KiB
Python
259 行
8.7 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
from hashlib import sha1
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List
|
|
|
|
import requests
|
|
|
|
from intel.config import STATE_DIR
|
|
from intel.http_client import build_session, request
|
|
from intel.models import Candidate
|
|
from intel.utils import isoformat, now_utc, parse_dt, read_json, unique, write_json
|
|
|
|
|
|
# OSV endpoint that accepts a batch of package queries in a single POST.
QUERY_BATCH_URL = "https://api.osv.dev/v1/querybatch"

# URL template for one vulnerability record; format with ``vuln_id=...``.
DETAIL_URL = "https://api.osv.dev/v1/vulns/{vuln_id}"

# Pattern for a CVSS v3.0/v3.1 base vector.
# NOTE(review): the pattern starts with "/", so it only matches a vector that
# is preceded by a slash -- confirm this is intended.
CVSS_SCORE_RE = re.compile(r"/CVSS:3\.[01]/AV:[A-Z]/AC:[A-Z]/PR:[A-Z]/UI:[A-Z]/S:[A-Z]/C:[A-Z]/I:[A-Z]/A:[A-Z]")

# Captures the first (optionally decimal) number found in a string.
NUMERIC_SCORE_RE = re.compile(r"([0-9]+(?:\.[0-9]+)?)")

# Six hours, in seconds; used when no valid TTL override is configured.
DEFAULT_CACHE_TTL_SECONDS = 6 * 60 * 60

# Directory under the state dir that holds cached OSV responses.
_CACHE_DIR = STATE_DIR / "cache" / "osv"
|
|
|
|
|
|
def _cache_ttl_seconds() -> int:
|
|
configured = os.environ.get("WEBSAFE_OSV_CACHE_TTL_SECONDS")
|
|
if configured:
|
|
try:
|
|
return max(0, int(configured))
|
|
except ValueError:
|
|
return DEFAULT_CACHE_TTL_SECONDS
|
|
return DEFAULT_CACHE_TTL_SECONDS
|
|
|
|
|
|
def _cache_key(value: str) -> str:
|
|
return sha1(value.encode("utf-8")).hexdigest()
|
|
|
|
|
|
def _cache_path(namespace: str, value: str) -> Path:
    """Return the cache file path for *value* within *namespace*.

    The file name is ``<namespace>-<digest>.json`` inside ``_CACHE_DIR``,
    where the digest comes from ``_cache_key(value)``.
    """
    file_name = f"{namespace}-{_cache_key(value)}.json"
    return _CACHE_DIR / file_name
|
|
|
|
|
|
def _load_cached_payload(namespace: str, value: str) -> Dict[str, Any] | None:
    """Return the cached payload for *value*, or ``None`` on a cache miss.

    A miss is reported when caching is disabled (TTL of zero or less), the
    cache file does not hold a dict, its ``fetched_at`` stamp cannot be
    parsed, the entry is older than the TTL, or the stored payload is not a
    dict.
    """
    max_age = _cache_ttl_seconds()
    if max_age <= 0:
        return None

    entry = read_json(_cache_path(namespace, value), default=None)
    if not isinstance(entry, dict):
        return None

    stamp = parse_dt(entry.get("fetched_at"))
    if stamp is None:
        return None

    elapsed = (now_utc() - stamp).total_seconds()
    if elapsed > max_age:
        return None

    body = entry.get("payload")
    if isinstance(body, dict):
        return body
    return None
|
|
|
|
|
|
def _write_cached_payload(namespace: str, value: str, payload: Dict[str, Any]) -> None:
    """Store *payload* in the cache file for *value* within *namespace*.

    The payload is wrapped together with a ``fetched_at`` timestamp taken at
    write time.
    """
    destination = _cache_path(namespace, value)
    envelope = {
        "fetched_at": isoformat(now_utc()),
        "payload": payload,
    }
    write_json(destination, envelope)
|
|
|
|
|
|
def _request_json(
    method: str,
    url: str,
    *,
    source: Dict[str, Any],
    cache_namespace: str,
    cache_key: str,
    session: requests.Session | None = None,
    json_body: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
    """Return the JSON object for a request, serving it from cache if possible.

    On a cache miss the request is sent through ``request`` with a fixed
    ``User-Agent`` header, the decoded JSON is stored in the cache, and then
    returned.

    Raises:
        ValueError: If the decoded response body is not a JSON object.
        Exception: Whatever ``response.raise_for_status()`` raises for an
            error status.
    """
    hit = _load_cached_payload(cache_namespace, cache_key)
    if hit is not None:
        return hit

    user_agent = {"User-Agent": "websafe-intel"}
    response = request(
        method,
        url,
        source=source,
        session=session,
        json=json_body,
        headers=user_agent,
    )
    response.raise_for_status()

    decoded = response.json()
    if isinstance(decoded, dict):
        _write_cached_payload(cache_namespace, cache_key, decoded)
        return decoded
    raise ValueError(f"OSV response payload was not an object for {url}")
|
|
|
|
|
|
def request_querybatch_json(
    source: Dict[str, Any],
    queries: List[Dict[str, Any]],
    *,
    session: requests.Session | None = None,
) -> Dict[str, Any]:
    """POST *queries* to the OSV batch endpoint and return the JSON object.

    The cache key is the request body serialized as compact JSON with sorted
    keys.
    """
    body = {"queries": queries}
    serialized = json.dumps(body, sort_keys=True, separators=(",", ":"))
    return _request_json(
        "POST",
        QUERY_BATCH_URL,
        source=source,
        cache_namespace="querybatch",
        cache_key=serialized,
        session=session,
        json_body=body,
    )
|
|
|
|
|
|
def request_detail_json(
    source: Dict[str, Any],
    vuln_id: str,
    *,
    session: requests.Session | None = None,
) -> Dict[str, Any]:
    """GET the full OSV record for *vuln_id* and return the JSON object.

    Responses are cached under the ``detail`` namespace, keyed by *vuln_id*.
    """
    detail_url = DETAIL_URL.format(vuln_id=vuln_id)
    return _request_json(
        "GET",
        detail_url,
        source=source,
        cache_namespace="detail",
        cache_key=vuln_id,
        session=session,
    )
|
|
|
|
|
|
def _fixed_versions(vuln: Dict[str, Any]) -> List[str]:
    """Return the ``fixed`` values from all range events of *vuln*.

    Every ``affected[].ranges[].events[]`` entry is visited in document
    order; the collected values are passed through ``unique``.
    """
    collected = [
        event["fixed"]
        for entry in vuln.get("affected", [])
        for span in entry.get("ranges", [])
        for event in span.get("events", [])
        if event.get("fixed")
    ]
    return unique(collected)
|
|
|
|
|
|
def _affected_versions(vuln: Dict[str, Any]) -> List[str]:
    """Return explicit versions and range descriptions for *vuln*.

    For each ``affected`` entry, at most the first 20 listed versions are
    kept. Each range contributes one comma-separated description built from
    the first truthy ``introduced``, ``last_affected``, ``fixed`` and
    ``limit`` values among its events; ranges with none of these are
    skipped. Explicit versions come before range descriptions, and the
    combined list is passed through ``unique``.
    """
    explicit = []
    described = []
    for entry in vuln.get("affected", []):
        explicit.extend(entry.get("versions", [])[:20])
        for span in entry.get("ranges", []):
            start = None
            fix = None
            last = None
            bound = None
            # Keep only the first truthy value of each event kind in a range.
            for event in span.get("events", []):
                start = start or event.get("introduced")
                fix = fix or event.get("fixed")
                last = last or event.get("last_affected")
                bound = bound or event.get("limit")
            if not (start or fix or last or bound):
                continue
            pieces = []
            if start:
                pieces.append(f"introduced={start}")
            if last:
                pieces.append(f"last_affected={last}")
            if fix:
                pieces.append(f"fixed<{fix}")
            if bound:
                pieces.append(f"limit<{bound}")
            described.append(", ".join(pieces))
    return unique(explicit + described)
|
|
|
|
|
|
def _severity(vuln: Dict[str, Any]) -> tuple[str, float | None]:
|
|
best_score = None
|
|
for sev in vuln.get("severity", []):
|
|
score = sev.get("score", "")
|
|
match = NUMERIC_SCORE_RE.search(score)
|
|
if match:
|
|
try:
|
|
best_score = float(match.group(1))
|
|
break
|
|
except ValueError:
|
|
continue
|
|
if best_score is None:
|
|
return "unknown", None
|
|
if best_score >= 9.0:
|
|
return "critical", best_score
|
|
if best_score >= 7.0:
|
|
return "high", best_score
|
|
if best_score >= 4.0:
|
|
return "medium", best_score
|
|
return "low", best_score
|
|
|
|
|
|
def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
    """Return one ``Candidate`` per OSV vulnerability found for *system*.

    Each entry of ``system["package_names"]`` becomes one query in a single
    batch request. Summaries without an id are skipped, as are those whose
    ``modified`` stamp is earlier than ``source["_since_dt"]``. The full
    record of every remaining vulnerability is fetched once per id and turned
    into a ``Candidate``. Returns an empty list when the system lists no
    packages.
    """
    packages = system.get("package_names", [])
    if not packages:
        return []
    since_dt = source.get("_since_dt")

    queries = []
    for pkg in packages:
        queries.append({"package": {"name": pkg["name"], "ecosystem": pkg["ecosystem"]}})
    session = build_session(source)
    batch = request_querybatch_json(source, queries, session=session)

    details_by_id: Dict[str, Dict[str, Any]] = {}
    found: List[Candidate] = []
    for package, result in zip(packages, batch.get("results", [])):
        for stub in result.get("vulns", []):
            vuln_id = stub.get("id")
            if not vuln_id:
                continue
            modified = parse_dt(stub.get("modified"))
            too_old = since_dt is not None and modified is not None and modified < since_dt
            if too_old:
                continue

            # Fetch each full record only once, even if several packages share it.
            vuln = details_by_id.get(vuln_id)
            if vuln is None:
                vuln = request_detail_json(source, vuln_id, session=session)
                details_by_id[vuln_id] = vuln

            aliases = unique(vuln.get("aliases", []) + [vuln.get("id")])
            refs = [link.get("url") for link in vuln.get("references", []) if link.get("url")]
            severity, cvss_score = _severity(vuln)

            # Use the first named affected package when the queried one has no name.
            package_name = package["name"]
            if not package_name:
                for entry in vuln.get("affected", []):
                    named = (entry.get("package") or {}).get("name")
                    if named:
                        package_name = named
                        break

            # Keys are in the same order as the original keyword arguments,
            # so the expressions are evaluated in the same sequence.
            fields = {
                "system_id": system["system_id"],
                "display_name": system["display_name"],
                "category": system["category"],
                "advisory_mode": source.get("advisory_mode", "core"),
                "source_kind": source["kind"],
                "source_name": source["name"],
                "source_confidence": source["confidence"],
                "source_url": refs[0] if refs else DETAIL_URL.format(vuln_id=vuln_id),
                "title": vuln.get("summary") or vuln.get("id") or f"OSV advisory for {package['name']}",
                "published_at": vuln.get("published"),
                "updated_at": vuln.get("modified"),
                "summary": vuln.get("details") or "",
                "severity": severity,
                "cvss_score": cvss_score,
                "aliases": aliases,
                "cve_ids": [item for item in aliases if item and item.startswith("CVE-")],
                "ghsa_ids": [item for item in aliases if item and item.startswith("GHSA-")],
                "osv_ids": [vuln.get("id")] if vuln.get("id") else [],
                "affected_versions": _affected_versions(vuln),
                "fixed_versions": _fixed_versions(vuln),
                "package_name": package_name,
                "references": refs,
                "raw": vuln,
            }
            found.append(Candidate(**fields))
    return found
|