diff --git a/scripts/intel/entities.py b/scripts/intel/entities.py
index a3481d79..a0b184d3 100644
--- a/scripts/intel/entities.py
+++ b/scripts/intel/entities.py
@@ -358,9 +358,17 @@ def build_advisory_extensions(advisory: Dict[str, Any], system: Dict[str, Any])
}
)
- affected = list(advisory.get("affected_versions") or [])
- fixed = list(advisory.get("fixed_versions") or [])
+ affected = unique(list(advisory.get("affected_version_ranges") or []) + list(advisory.get("affected_versions") or []))
+ fixed = unique(list(advisory.get("fixed_version_ranges") or []) + list(advisory.get("fixed_versions") or []))
version_confidence, gap_reason, version_resolution_needed = _version_confidence(affected, fixed)
+ affected_version_refs = unique(list(advisory.get("affected_version_refs") or []))
+ fixed_version_refs = unique(list(advisory.get("fixed_version_refs") or []))
+ patched_version_refs = unique(list(advisory.get("patched_version_refs") or []))
+ version_sync_confidence = advisory.get("version_sync_confidence") or version_confidence
+ if version_sync_confidence in {"medium", "high"} and (affected_version_refs or fixed_version_refs or patched_version_refs):
+ version_confidence = version_sync_confidence
+ gap_reason = advisory.get("version_gap_reason") or ""
+ version_resolution_needed = False
affected_components = [
{
"name": _display_name(package_name, system.get("display_name", root_system_id)),
@@ -372,7 +380,9 @@ def build_advisory_extensions(advisory: Dict[str, Any], system: Dict[str, Any])
]
version_sources = unique(
- [advisory.get("official_source_url")] + list(advisory.get("secondary_source_urls") or [])
+ list(advisory.get("version_evidence_sources") or [])
+ + [advisory.get("official_source_url")]
+ + list(advisory.get("secondary_source_urls") or [])
)
enriched = {
"entity_refs": entity_refs,
@@ -380,8 +390,12 @@ def build_advisory_extensions(advisory: Dict[str, Any], system: Dict[str, Any])
"affected_version_ranges": affected,
"fixed_version_ranges": fixed,
"introduced_version": _pick_version_boundary(affected),
- "patched_version": _pick_version_boundary(fixed, prefer_fixed=True),
+ "patched_version": advisory.get("patched_version") or _pick_version_boundary(fixed, prefer_fixed=True),
"version_evidence_sources": version_sources,
+ "affected_version_refs": affected_version_refs,
+ "fixed_version_refs": fixed_version_refs,
+ "patched_version_refs": patched_version_refs,
+ "version_sync_confidence": version_sync_confidence,
"advisory_scope": advisory_scope,
"version_confidence": version_confidence,
"version_gap_reason": gap_reason,
diff --git a/scripts/intel/versioning.py b/scripts/intel/versioning.py
new file mode 100644
index 00000000..e0cb19c0
--- /dev/null
+++ b/scripts/intel/versioning.py
@@ -0,0 +1,1016 @@
+from __future__ import annotations
+
+import os
+import re
+from collections import defaultdict
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from html import unescape
+from typing import Any, Dict, Iterable, List, Tuple
+from urllib.parse import quote, urlparse
+
+from intel.config import (
+ ENTITIES_DIR,
+ LAB_ENQUEUE_SUMMARY_PATH,
+ RELEASE_INDEX_PATH,
+ SOURCE_BUCKETS,
+ VERSION_BACKLOG_PATH,
+ VERSION_COMPLETENESS_PATH,
+ VERSION_REPORT_MD_PATH,
+ VERSIONS_DIR,
+ iter_all_sources,
+)
+from intel.entities import (
+ _display_name,
+ _entity_id,
+ _entity_payload,
+ _github_repo_from_url,
+ _marketplace_slug,
+ _merge_entity_overlay,
+ _package_registry_url,
+ _repo_url_from_package,
+ build_entity_views,
+)
+from intel.http_client import request
+from intel.models import AdvisoryRecord, VersionRecord
+from intel.sources.runner import HANDLERS
+from intel.utils import isoformat, load_all_json, now_utc, parse_dt, read_json, slugify, unique, write_json, write_text
+
+
+VERSION_RE = re.compile(r"\bv?(?:\d+[._-]){1,5}\d+[A-Za-z0-9._-]*\b")
+SECURITY_TERMS = ("security", "advisory", "cve", "ghsa", "osv", "vulnerability", "patched", "fixed")
+VERSION_SOURCE_KINDS = {"rss-feed", "atom-feed", "json-feed", "html-links", "vendor-index"}
+DISCOVERY_CONFIDENCE = {"official", "ecosystem-authority"}
+
+
+def _advisory_dict(item: AdvisoryRecord | Dict[str, Any]) -> Dict[str, Any]:
+ if isinstance(item, AdvisoryRecord):
+ return item.to_dict()
+ return dict(item or {})
+
+
+def _version_sort_key(value: str) -> Tuple[Tuple[int, ...], str]:
+ normalized = (value or "").strip().lower().lstrip("v")
+ numeric_parts = [int(part) for part in re.findall(r"\d+", normalized)]
+ return tuple(numeric_parts), normalized
+
+
+def _pick_latest_version(values: Iterable[str]) -> str:
+ options = [str(value).strip() for value in values if str(value).strip()]
+ if not options:
+ return ""
+ return sorted(unique(options), key=_version_sort_key, reverse=True)[0]
+
+
+def _extract_versions(*values: str) -> List[str]:
+ results: List[str] = []
+ for value in values:
+ if not value:
+ continue
+ for match in VERSION_RE.findall(value):
+ token = match.strip().lstrip("v")
+ if re.fullmatch(r"\d{4}-\d{2}-\d{2}", token):
+ continue
+ if "." not in token and token.count("-") < 2:
+ continue
+ results.append(token)
+ return unique(results)
+
+
+def _stable_release_source(system: Dict[str, Any], source: Dict[str, Any]) -> bool:
+ if source.get("status") == "retired":
+ return False
+ if source.get("kind") not in VERSION_SOURCE_KINDS:
+ return False
+ if source.get("bucket_name") == "research_sources":
+ return False
+ return source.get("purpose") in {"release", "discovery", "marketplace", "advisory"}
+
+
+def _load_entity_registry() -> Dict[str, Dict[str, Any]]:
+ return {
+ item.get("entity_id"): item
+ for item in load_all_json(ENTITIES_DIR)
+ if item.get("entity_id")
+ }
+
+
+def load_version_records() -> List[Dict[str, Any]]:
+ return load_all_json(VERSIONS_DIR)
+
+
+def _write_entity_records(entity_records: List[Dict[str, Any]], *, selected_system_ids: set[str] | None = None) -> None:
+ existing = _load_entity_registry()
+ for item in entity_records:
+ if selected_system_ids and item.get("root_system_id") not in selected_system_ids:
+ continue
+ existing[item["entity_id"]] = item
+ for entity_id, payload in sorted(existing.items()):
+ if selected_system_ids and payload.get("root_system_id") in selected_system_ids:
+ write_json(ENTITIES_DIR / f"{entity_id}.json", payload)
+ elif not selected_system_ids:
+ write_json(ENTITIES_DIR / f"{entity_id}.json", payload)
+
+
+def _write_version_records(version_records: List[Dict[str, Any]], *, selected_system_ids: set[str] | None = None) -> None:
+ existing = {
+ item.get("version_id"): item
+ for item in load_all_json(VERSIONS_DIR)
+ if item.get("version_id")
+ }
+ keep_ids: set[str] = set()
+ for item in version_records:
+ if selected_system_ids and item.get("root_system_id") not in selected_system_ids:
+ continue
+ keep_ids.add(item["version_id"])
+ existing[item["version_id"]] = item
+ if selected_system_ids:
+ for version_id, payload in list(existing.items()):
+ if payload.get("root_system_id") in selected_system_ids and version_id not in keep_ids:
+ path = VERSIONS_DIR / f"{version_id}.json"
+ if path.exists():
+ path.unlink()
+ existing.pop(version_id, None)
+ for version_id, payload in sorted(existing.items()):
+ write_json(VERSIONS_DIR / f"{version_id}.json", payload)
+
+
+def write_entity_registry(entity_records: List[Dict[str, Any]], *, selected_system_ids: set[str] | None = None) -> None:
+ _write_entity_records(entity_records, selected_system_ids=selected_system_ids)
+
+
+def write_version_registry(version_records: List[Dict[str, Any]], *, selected_system_ids: set[str] | None = None) -> None:
+ _write_version_records(version_records, selected_system_ids=selected_system_ids)
+
+
+def _candidate_entity_from_source(system: Dict[str, Any], source: Dict[str, Any]) -> Dict[str, Any] | None:
+ url = (source.get("url") or "").strip()
+ if not url:
+ return None
+ repo_url = _github_repo_from_url(url)
+ entity_type = source.get("entity_type_hint") or "project"
+ package_registry = ""
+ marketplace_url = ""
+ display_name = ""
+ canonical_name = ""
+
+ if repo_url:
+ entity_type = source.get("entity_type_hint") or "repo"
+ match = re.match(r"https://github\.com/([^/]+)/([^/#?]+)", repo_url, re.IGNORECASE)
+ if match:
+ display_name = f"{match.group(1)} / {match.group(2)}"
+ canonical_name = f"{match.group(1)}/{match.group(2)}"
+ elif "npmjs.com/package/" in url:
+ entity_type = source.get("entity_type_hint") or "package"
+ canonical_name = url.split("/package/", 1)[1].split("?", 1)[0].strip("/")
+ package_registry = f"https://www.npmjs.com/package/{canonical_name}"
+ display_name = canonical_name
+ elif "packagist.org/packages/" in url:
+ entity_type = source.get("entity_type_hint") or "package"
+ canonical_name = url.split("/packages/", 1)[1].split("?", 1)[0].strip("/")
+ package_registry = f"https://packagist.org/packages/{canonical_name}"
+ display_name = canonical_name.replace("/", " / ")
+ elif any(token in url.lower() for token in ("/plugins/", "/themes/", "/extensions/", "/modules/", "/marketplace/")):
+ marketplace_url = url
+ canonical_name = _marketplace_slug(url)
+ entity_type = source.get("entity_type_hint") or entity_type
+ display_name = canonical_name.replace("-", " ")
+ else:
+ return None
+
+ if not canonical_name:
+ canonical_name = source.get("name") or system.get("display_name") or system.get("system_id")
+ entity_id = _entity_id(system["system_id"], entity_type, canonical_name)
+ return {
+ "entity_id": entity_id,
+ "entity_type": entity_type,
+ "display_name": _display_name(display_name or canonical_name, entity_id),
+ "root_system_id": system["system_id"],
+ "parent_entity_id": system["system_id"],
+ "category": system.get("category", "unknown"),
+ "ecosystem": system.get("category", "unknown"),
+ "official": source.get("confidence") == "official",
+ "status": "cataloged",
+ "history_policy": "history-full",
+ "repo_url": repo_url,
+ "package_registry": package_registry,
+ "marketplace_url": marketplace_url,
+ "latest_version": "",
+ "version_scheme": "semver-ish" if entity_type in {"repo", "package", "plugin", "extension", "module", "theme", "project"} else "vendor",
+ "source_refs": [
+ {
+ "name": source.get("name"),
+ "url": source.get("url"),
+ "kind": source.get("kind"),
+ "status": source.get("status"),
+ "bucket": source.get("bucket_name"),
+ "official": source.get("bucket_name") == "official_sources",
+ }
+ ],
+ "catalog_source": source.get("name") or "",
+ "catalog_reason": "source catalog exposed a stable security-related object and auto-catalog is enabled",
+ "auto_cataloged": True,
+ }
+
+
+def discover_entities(
+ source_map: Dict[str, Any],
+ advisories: List[AdvisoryRecord | Dict[str, Any]],
+ *,
+ write_registry: bool = False,
+) -> Dict[str, Any]:
+ base_views = build_entity_views(source_map, advisories)
+ entities = {item["entity_id"]: item for item in base_views["entities"]}
+ auto_promoted: List[Dict[str, Any]] = []
+ seen_urls = {
+ item.get("repo_url") or item.get("package_registry") or item.get("marketplace_url")
+ for item in entities.values()
+ if item.get("repo_url") or item.get("package_registry") or item.get("marketplace_url")
+ }
+ for system, _bucket, source in iter_all_sources(source_map, include_retired=False):
+ if source.get("confidence") not in DISCOVERY_CONFIDENCE:
+ continue
+ candidate = _candidate_entity_from_source(system, source)
+ if not candidate:
+ continue
+ stable_url = candidate.get("repo_url") or candidate.get("package_registry") or candidate.get("marketplace_url")
+ if stable_url and stable_url in seen_urls:
+ continue
+ if not source.get("auto_catalog"):
+ continue
+ if candidate["entity_id"] in entities:
+ continue
+ auto_promoted.append(
+ _merge_entity_overlay(
+ _entity_payload(
+ entity_id=candidate["entity_id"],
+ entity_type=candidate["entity_type"],
+ display_name=candidate["display_name"],
+ parent_entity_id=candidate["parent_entity_id"],
+ root_system_id=candidate["root_system_id"],
+ category=candidate["category"],
+ ecosystem=candidate["ecosystem"],
+ official=candidate["official"],
+ status="cataloged",
+ history_policy="history-full",
+ repo_url=candidate.get("repo_url") or "",
+ package_registry=candidate.get("package_registry") or "",
+ marketplace_url=candidate.get("marketplace_url") or "",
+ latest_version="",
+ version_scheme=candidate.get("version_scheme") or "vendor",
+ source_refs=candidate.get("source_refs") or [],
+ ),
+ candidate,
+ )
+ )
+ entities[candidate["entity_id"]] = auto_promoted[-1]
+ if stable_url:
+ seen_urls.add(stable_url)
+
+ merged_entities = sorted(entities.values(), key=lambda item: item["entity_id"])
+ if write_registry and auto_promoted:
+ _write_entity_records(merged_entities, selected_system_ids={item["root_system_id"] for item in merged_entities})
+ refreshed_views = build_entity_views(source_map, advisories)
+ if auto_promoted:
+ refreshed_views = build_entity_views(source_map, advisories)
+ return {
+ "entities": merged_entities,
+ "candidate_backlog": refreshed_views["candidate_backlog"],
+ "auto_promoted": auto_promoted,
+ "summary": {
+ "cataloged_entity_total": len([item for item in merged_entities if item.get("status") == "cataloged"]),
+ "candidate_entity_total": len(refreshed_views["candidate_backlog"]),
+ "auto_promoted_count": len(auto_promoted),
+ },
+ }
+
+
+def _fetch_source_hits_for_versions(source_map: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]:
+ jobs: List[Tuple[Dict[str, Any], Dict[str, Any]]] = []
+ for system, _bucket, source in iter_all_sources(source_map, include_retired=False):
+ if _stable_release_source(system, source):
+ jobs.append((system, source))
+
+ hits_by_system: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
+ if not jobs:
+ return hits_by_system
+
+ workers = min(max(4, int(os.environ.get("WEBSAFE_VERSION_SYNC_WORKERS", "8"))), max(4, len(jobs)))
+ with ThreadPoolExecutor(max_workers=workers) as executor:
+ future_map = {}
+ for system, source in jobs:
+ handler = HANDLERS.get(source["kind"])
+ if handler is None:
+ continue
+ future_map[executor.submit(handler, system, dict(source))] = (system, source)
+
+ for future in as_completed(future_map):
+ system, source = future_map[future]
+ try:
+ items = future.result()
+ except Exception:
+ continue
+ for item in items[: source.get("max_items", 50)]:
+ title = getattr(item, "title", None) if not isinstance(item, dict) else item.get("title")
+ summary = getattr(item, "summary", None) if not isinstance(item, dict) else item.get("summary")
+ source_url = getattr(item, "source_url", None) if not isinstance(item, dict) else item.get("source_url")
+ affected_versions = getattr(item, "affected_versions", None) if not isinstance(item, dict) else item.get("affected_versions")
+ fixed_versions = getattr(item, "fixed_versions", None) if not isinstance(item, dict) else item.get("fixed_versions")
+ published_at = getattr(item, "published_at", None) if not isinstance(item, dict) else item.get("published_at")
+ updated_at = getattr(item, "updated_at", None) if not isinstance(item, dict) else item.get("updated_at")
+ versions = unique(
+ _extract_versions(
+ title or "",
+ summary or "",
+ source_url or "",
+ " ".join(affected_versions or []),
+ " ".join(fixed_versions or []),
+ )
+ )
+ if not versions:
+ continue
+ haystack = " ".join(filter(None, [title or "", summary or "", source_url or ""])).lower()
+ hits_by_system[system["system_id"]].append(
+ {
+ "source_name": source.get("name"),
+ "source_confidence": source.get("confidence"),
+ "source_url": source_url or source.get("url") or "",
+ "published_at": published_at or updated_at or "",
+ "updated_at": updated_at or published_at or "",
+ "versions": versions,
+ "security_related": any(term in haystack for term in SECURITY_TERMS) or source.get("purpose") in {"release", "advisory"},
+ }
+ )
+ return hits_by_system
+
+
+def _github_latest(repo_url: str) -> Dict[str, str]:
+ match = re.match(r"https://github\.com/([^/]+)/([^/#?]+)", repo_url, re.IGNORECASE)
+ if not match:
+ return {}
+ owner, repo = match.group(1), match.group(2)
+ headers = {"Accept": "application/vnd.github+json", "User-Agent": "websafe-intel"}
+ token = os.environ.get("GITHUB_TOKEN")
+ if token:
+ headers["Authorization"] = f"Bearer {token}"
+ source = {"kind": "json-feed", "request_policy": {"accept": headers["Accept"]}}
+ for api_url, array_key in (
+ (f"https://api.github.com/repos/{owner}/{repo}/releases?per_page=10", None),
+ (f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=10", None),
+ ):
+ try:
+ response = request("GET", api_url, source=source, headers=headers)
+ response.raise_for_status()
+ payload = response.json()
+ except Exception:
+ continue
+ if not isinstance(payload, list):
+ continue
+ for item in payload:
+ if not isinstance(item, dict):
+ continue
+ if item.get("draft") or item.get("prerelease"):
+ continue
+ version = _pick_latest_version(
+ _extract_versions(
+ item.get("tag_name") or "",
+ item.get("name") or "",
+ item.get("html_url") or "",
+ )
+ )
+ if not version:
+ continue
+ return {
+ "version": version,
+ "released_at": item.get("published_at") or item.get("created_at") or "",
+ "release_url": item.get("html_url") or repo_url,
+ "source_name": "GitHub Releases API",
+ "source_confidence": "official",
+ }
+ return {}
+
+
+def _npm_latest(package_registry: str) -> Dict[str, str]:
+ package_name = package_registry.split("/package/", 1)[1]
+ url = f"https://registry.npmjs.org/{quote(package_name, safe='@/')}/latest"
+ source = {"kind": "json-feed", "request_policy": {"accept": "application/json"}}
+ try:
+ response = request("GET", url, source=source)
+ response.raise_for_status()
+ payload = response.json()
+ except Exception:
+ return {}
+ version = str(payload.get("version") or "").strip()
+ if not version:
+ return {}
+ return {
+ "version": version,
+ "released_at": payload.get("date") or payload.get("time") or "",
+ "release_url": package_registry,
+ "source_name": "npm latest",
+ "source_confidence": "ecosystem-authority",
+ }
+
+
+def _packagist_latest(package_registry: str) -> Dict[str, str]:
+ package_name = package_registry.split("/packages/", 1)[1]
+ url = f"https://repo.packagist.org/p2/{package_name}.json"
+ source = {"kind": "json-feed", "request_policy": {"accept": "application/json"}}
+ try:
+ response = request("GET", url, source=source)
+ response.raise_for_status()
+ payload = response.json()
+ except Exception:
+ return {}
+ packages = (payload.get("packages") or {}).get(package_name) or []
+ if not packages:
+ return {}
+ entry = packages[0]
+ version = str(entry.get("version") or "").strip().lstrip("v")
+ if not version:
+ return {}
+ return {
+ "version": version,
+ "released_at": entry.get("time") or "",
+ "release_url": package_registry,
+ "source_name": "Packagist p2",
+ "source_confidence": "ecosystem-authority",
+ }
+
+
+def _latest_from_hits(entity: Dict[str, Any], hits: List[Dict[str, Any]]) -> Dict[str, str]:
+ candidates = []
+ for hit in hits:
+ candidates.extend(hit.get("versions") or [])
+ version = _pick_latest_version(candidates)
+ if not version:
+ return {}
+ chosen = None
+ for hit in hits:
+ if version in (hit.get("versions") or []):
+ chosen = hit
+ break
+ return {
+ "version": version,
+ "released_at": (chosen or {}).get("published_at") or (chosen or {}).get("updated_at") or "",
+ "release_url": (chosen or {}).get("source_url") or "",
+ "source_name": (chosen or {}).get("source_name") or "",
+ "source_confidence": (chosen or {}).get("source_confidence") or "unknown",
+ }
+
+
+def _latest_from_advisories(advisories: List[Dict[str, Any]]) -> Dict[str, str]:
+ version = _pick_latest_version(
+ [
+ advisory.get("patched_version") or ""
+ for advisory in advisories
+ ]
+ + [
+ value
+ for advisory in advisories
+ for value in (advisory.get("fixed_versions") or [])
+ ]
+ )
+ if not version:
+ return {}
+ advisory = next(
+ (
+ item
+ for item in advisories
+ if version == item.get("patched_version") or version in (item.get("fixed_versions") or [])
+ ),
+ {},
+ )
+ return {
+ "version": version,
+ "released_at": advisory.get("updated_at") or advisory.get("published_at") or "",
+ "release_url": advisory.get("official_source_url") or "",
+ "source_name": "advisory-fixed-version",
+ "source_confidence": advisory.get("source_confidence") or "unknown",
+ }
+
+
+def _strip_html(text: str) -> str:
+ cleaned = re.sub(r"", " ", text, flags=re.IGNORECASE | re.DOTALL)
+ cleaned = re.sub(r"", " ", cleaned, flags=re.IGNORECASE | re.DOTALL)
+ cleaned = re.sub(r"<[^>]+>", " ", cleaned)
+ return unescape(re.sub(r"\s+", " ", cleaned)).strip()
+
+
+def _extract_page_versions(url: str) -> Dict[str, List[str]]:
+ source = {"kind": "html-links"}
+ try:
+ response = request("GET", url, source=source)
+ response.raise_for_status()
+ except Exception:
+ return {"affected": [], "fixed": [], "sources": []}
+ text = _strip_html(response.text)
+ affected: List[str] = []
+ fixed: List[str] = []
+ for pattern in (
+ r"(?:affected|before|prior to|through)\s+([A-Za-z0-9., _-]{3,120})",
+ r"(?:versions?|range)\s+([A-Za-z0-9., _-]{3,120})\s+(?:are|is)\s+affected",
+ ):
+ for match in re.finditer(pattern, text, flags=re.IGNORECASE):
+ affected.extend(_extract_versions(match.group(1)))
+ for pattern in (
+ r"(?:fixed|patched|resolved|available in|upgrade to)\s+([A-Za-z0-9., _-]{3,120})",
+ r"(?:update to|updated to)\s+([A-Za-z0-9., _-]{3,120})",
+ ):
+ for match in re.finditer(pattern, text, flags=re.IGNORECASE):
+ fixed.extend(_extract_versions(match.group(1)))
+ return {"affected": unique(affected), "fixed": unique(fixed), "sources": [url] if (affected or fixed) else []}
+
+
+def _version_id(entity_id: str, version: str) -> str:
+ return f"{entity_id}--{slugify(version)}"
+
+
+def _register_version(
+ bucket: Dict[Tuple[str, str], Dict[str, Any]],
+ *,
+ entity_id: str,
+ root_system_id: str,
+ version: str,
+ released_at: str,
+ release_url: str,
+ source_name: str,
+ source_confidence: str,
+ reason: str,
+ advisory_ref: str | None = None,
+) -> None:
+ token = (version or "").strip()
+ if not token:
+ return
+ key = (entity_id, token)
+ existing = bucket.setdefault(
+ key,
+ VersionRecord(
+ version_id=_version_id(entity_id, token),
+ entity_id=entity_id,
+ root_system_id=root_system_id,
+ version=token,
+ released_at=released_at or None,
+ release_url=release_url or None,
+ source_name=source_name,
+ source_confidence=source_confidence,
+ security_relevant=True,
+ reason=reason,
+ advisory_refs=[],
+ is_latest_snapshot=False,
+ ).to_dict(),
+ )
+ existing["reason"] = reason if existing.get("reason") == "affected" and reason != "affected" else existing.get("reason")
+ if released_at and not existing.get("released_at"):
+ existing["released_at"] = released_at
+ if release_url and not existing.get("release_url"):
+ existing["release_url"] = release_url
+ if advisory_ref and advisory_ref not in existing["advisory_refs"]:
+ existing["advisory_refs"].append(advisory_ref)
+
+
+def _entity_target_id(advisory: Dict[str, Any]) -> str:
+ refs = advisory.get("entity_refs") or []
+ for ref in refs:
+ if ref.get("entity_type") != "system":
+ return ref.get("entity_id") or advisory.get("system_id")
+ return advisory.get("system_id")
+
+
+def _resolve_versions_for_advisory(advisory: Dict[str, Any], *, deep: bool) -> Dict[str, Any]:
+ affected = unique(
+ list(advisory.get("affected_versions") or []) + list(advisory.get("affected_version_ranges") or [])
+ )
+ fixed = unique(
+ list(advisory.get("fixed_versions") or []) + list(advisory.get("fixed_version_ranges") or [])
+ )
+ evidence_sources = list(advisory.get("version_evidence_sources") or [])
+ if deep and not (affected or fixed) and advisory.get("official_source_url"):
+ page_versions = _extract_page_versions(advisory["official_source_url"])
+ if page_versions.get("affected"):
+ affected = unique(affected + page_versions["affected"])
+ if page_versions.get("fixed"):
+ fixed = unique(fixed + page_versions["fixed"])
+ evidence_sources = unique(evidence_sources + page_versions.get("sources", []))
+ patched = advisory.get("patched_version") or _pick_latest_version(fixed)
+ return {
+ "affected": affected,
+ "fixed": fixed,
+ "patched": patched,
+ "sources": evidence_sources,
+ }
+
+
+def _profile_runnable(profile: Dict[str, Any]) -> bool:
+ if not profile:
+ return False
+ if profile.get("resolved_via") == "implicit-fallback":
+ return False
+ return bool(
+ profile.get("runner_id")
+ or profile.get("fixture_path")
+ or profile.get("attack_actions")
+ or profile.get("seed_actions")
+ or profile.get("services")
+ or profile.get("success_assertions")
+ )
+
+
+def _enqueue_lab_updates(updated_advisories: List[Dict[str, Any]], previous_advisories: Dict[str, Dict[str, Any]], entity_changes: Dict[str, str]) -> Dict[str, Any]:
+ from lab import repro, task_queue # noqa: WPS433
+
+ enqueue_items: List[Dict[str, Any]] = []
+ pending: List[Dict[str, Any]] = []
+ for advisory in updated_advisories:
+ previous = previous_advisories.get(advisory["canonical_id"], {})
+ changed = previous.get("canonical_id") is None
+ for field in (
+ "affected_version_refs",
+ "fixed_version_refs",
+ "patched_version_refs",
+ "patched_version",
+ "version_resolution_needed",
+ "version_sync_confidence",
+ ):
+ if previous.get(field) != advisory.get(field):
+ changed = True
+ break
+ if not changed:
+ target_entity = _entity_target_id(advisory)
+ if entity_changes.get(target_entity) and entity_changes.get(target_entity) != previous.get("latest_version"):
+ changed = True
+ if not changed:
+ continue
+ profile = repro.resolve_profile(advisory["canonical_id"], advisory)
+ if _profile_runnable(profile):
+ enqueue_items.append(
+ {
+ "advisory_id": advisory["canonical_id"],
+ "system_id": advisory["system_id"],
+ "priority": "version-sync",
+ }
+ )
+ else:
+ pending.append(
+ {
+ "advisory_id": advisory["canonical_id"],
+ "system_id": advisory["system_id"],
+ "lab_pending_reason": "no-runnable-profile",
+ "profile_id": profile.get("profile_id"),
+ }
+ )
+ enqueue_result = task_queue.enqueue_items(enqueue_items) if enqueue_items else {"queued": 0, "added": 0}
+ payload = {
+ "generated_at": isoformat(now_utc()),
+ "enqueued": enqueue_result.get("added", 0),
+ "queue_total": enqueue_result.get("queued", 0),
+ "items": enqueue_items,
+ "pending": pending,
+ }
+ write_json(LAB_ENQUEUE_SUMMARY_PATH, payload)
+ return payload
+
+
+def sync_versions(
+ source_map: Dict[str, Any],
+ advisories: List[AdvisoryRecord | Dict[str, Any]],
+ *,
+ entity_records: List[Dict[str, Any]] | None = None,
+ deep: bool = False,
+ enqueue_lab: bool = False,
+ write_registry: bool = False,
+) -> Dict[str, Any]:
+ advisory_rows = [_advisory_dict(item) for item in advisories]
+ previous_advisories = {
+ item.get("canonical_id"): item
+ for item in load_all_json(ENTITIES_DIR.parent / "advisories")
+ if item.get("canonical_id")
+ }
+ entity_seed = entity_records or build_entity_views(source_map, advisory_rows)["entities"]
+ entities = {item["entity_id"]: dict(item) for item in entity_seed}
+ advisories_by_entity: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
+ for advisory in advisory_rows:
+ advisories_by_entity[_entity_target_id(advisory)].append(advisory)
+ source_hits = _fetch_source_hits_for_versions(source_map)
+ version_bucket: Dict[Tuple[str, str], Dict[str, Any]] = {}
+ entity_changes: Dict[str, str] = {}
+
+ for entity_id, entity in entities.items():
+ linked_advisories = advisories_by_entity.get(entity_id, [])
+ if linked_advisories:
+ for advisory in linked_advisories:
+ resolved = _resolve_versions_for_advisory(advisory, deep=deep)
+ target_id = _entity_target_id(advisory)
+ for version in resolved["affected"]:
+ _register_version(
+ version_bucket,
+ entity_id=target_id,
+ root_system_id=entity["root_system_id"],
+ version=version,
+ released_at=advisory.get("published_at") or "",
+ release_url=advisory.get("official_source_url") or "",
+ source_name=advisory.get("official_source_url") or "advisory-affected",
+ source_confidence=advisory.get("source_confidence") or "unknown",
+ reason="affected",
+ advisory_ref=advisory["canonical_id"],
+ )
+ for version in resolved["fixed"]:
+ _register_version(
+ version_bucket,
+ entity_id=target_id,
+ root_system_id=entity["root_system_id"],
+ version=version,
+ released_at=advisory.get("updated_at") or advisory.get("published_at") or "",
+ release_url=advisory.get("official_source_url") or "",
+ source_name=advisory.get("official_source_url") or "advisory-fixed",
+ source_confidence=advisory.get("source_confidence") or "unknown",
+ reason="fixed",
+ advisory_ref=advisory["canonical_id"],
+ )
+ if resolved["patched"]:
+ _register_version(
+ version_bucket,
+ entity_id=target_id,
+ root_system_id=entity["root_system_id"],
+ version=resolved["patched"],
+ released_at=advisory.get("updated_at") or advisory.get("published_at") or "",
+ release_url=advisory.get("official_source_url") or "",
+ source_name=advisory.get("official_source_url") or "advisory-patched",
+ source_confidence=advisory.get("source_confidence") or "unknown",
+ reason="patched",
+ advisory_ref=advisory["canonical_id"],
+ )
+
+ for hit in source_hits.get(entity["root_system_id"], []):
+ if not hit.get("security_related"):
+ continue
+ for version in hit.get("versions") or []:
+ _register_version(
+ version_bucket,
+ entity_id=entity_id,
+ root_system_id=entity["root_system_id"],
+ version=version,
+ released_at=hit.get("published_at") or hit.get("updated_at") or "",
+ release_url=hit.get("source_url") or "",
+ source_name=hit.get("source_name") or "",
+ source_confidence=hit.get("source_confidence") or "unknown",
+ reason="security-release",
+ )
+
+ version_records = sorted(version_bucket.values(), key=lambda item: (item["root_system_id"], item["entity_id"], _version_sort_key(item["version"])))
+ versions_by_entity: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
+ version_lookup: Dict[Tuple[str, str], str] = {}
+ for item in version_records:
+ versions_by_entity[item["entity_id"]].append(item)
+ version_lookup[(item["entity_id"], item["version"])] = item["version_id"]
+
+ for entity in entities.values():
+ previous_latest = entity.get("latest_version") or ""
+ latest = {}
+ if entity.get("package_registry", "").startswith("https://www.npmjs.com/package/"):
+ latest = _npm_latest(entity["package_registry"])
+ elif entity.get("package_registry", "").startswith("https://packagist.org/packages/"):
+ latest = _packagist_latest(entity["package_registry"])
+ elif entity.get("repo_url", "").startswith("https://github.com/"):
+ latest = _github_latest(entity["repo_url"])
+ if not latest:
+ latest = _latest_from_hits(entity, source_hits.get(entity["root_system_id"], []))
+ if not latest:
+ latest = _latest_from_advisories(advisories_by_entity.get(entity["entity_id"], []))
+ entity["latest_version"] = latest.get("version") or entity.get("latest_version") or ""
+ entity["latest_release_at"] = latest.get("released_at") or entity.get("latest_release_at") or ""
+ entity["latest_release_url"] = latest.get("release_url") or entity.get("latest_release_url") or ""
+ entity["version_source_refs"] = unique(
+ list(entity.get("version_source_refs") or [])
+ + [latest.get("release_url") or ""]
+ + [item.get("release_url") or "" for item in versions_by_entity.get(entity["entity_id"], [])[:20]]
+ )
+ entity["latest_version_evidence"] = unique(
+ list(entity.get("latest_version_evidence") or [])
+ + [latest.get("source_name") or ""]
+ + [item.get("source_name") or "" for item in versions_by_entity.get(entity["entity_id"], [])[:20]]
+ )
+ entity["security_version_count"] = len(versions_by_entity.get(entity["entity_id"], []))
+ entity["last_version_synced_at"] = isoformat(now_utc())
+ entity["version_sync_status"] = "green" if entity.get("latest_version") else "source-gap"
+ if previous_latest != entity.get("latest_version"):
+ entity_changes[entity["entity_id"]] = entity.get("latest_version") or ""
+
+ for entity in entities.values():
+ if entity["entity_type"] != "system":
+ continue
+ children = [
+ item
+ for item in entities.values()
+ if item.get("root_system_id") == entity["entity_id"] and item.get("entity_id") != entity["entity_id"] and item.get("latest_version")
+ ]
+ if children:
+ best = sorted(
+ children,
+ key=lambda item: (-int(item.get("advisory_count") or 0), _version_sort_key(item.get("latest_version") or "")),
+ reverse=False,
+ )[-1]
+ entity["latest_version"] = best.get("latest_version") or entity.get("latest_version") or ""
+ entity["latest_release_at"] = best.get("latest_release_at") or entity.get("latest_release_at") or ""
+ entity["latest_release_url"] = best.get("latest_release_url") or entity.get("latest_release_url") or ""
+ entity["version_sync_status"] = best.get("version_sync_status") or entity.get("version_sync_status") or "pending"
+ entity["security_version_count"] = sum(item.get("security_version_count", 0) for item in children)
+
+ updated_advisories: List[Dict[str, Any]] = []
+ for advisory in advisory_rows:
+ target_id = _entity_target_id(advisory)
+ resolved = _resolve_versions_for_advisory(advisory, deep=deep)
+ affected_refs = [version_lookup[(target_id, version)] for version in resolved["affected"] if (target_id, version) in version_lookup]
+ fixed_refs = [version_lookup[(target_id, version)] for version in resolved["fixed"] if (target_id, version) in version_lookup]
+ patched_refs = [version_lookup[(target_id, resolved["patched"])] for _ in [1] if resolved["patched"] and (target_id, resolved["patched"]) in version_lookup]
+ version_sync_confidence = "high" if (affected_refs and fixed_refs) else "medium" if (fixed_refs or patched_refs or affected_refs) else "low"
+ updated = dict(advisory)
+ updated["affected_versions"] = resolved["affected"] or updated.get("affected_versions") or []
+ updated["fixed_versions"] = resolved["fixed"] or updated.get("fixed_versions") or []
+ updated["affected_version_ranges"] = resolved["affected"] or updated.get("affected_version_ranges") or []
+ updated["fixed_version_ranges"] = resolved["fixed"] or updated.get("fixed_version_ranges") or []
+ updated["patched_version"] = resolved["patched"] or updated.get("patched_version")
+ updated["affected_version_refs"] = unique(affected_refs)
+ updated["fixed_version_refs"] = unique(fixed_refs)
+ updated["patched_version_refs"] = unique(patched_refs)
+ updated["version_evidence_sources"] = unique(list(updated.get("version_evidence_sources") or []) + resolved["sources"])
+ updated["version_sync_confidence"] = version_sync_confidence
+ if version_sync_confidence in {"high", "medium"} and (affected_refs or fixed_refs or patched_refs):
+ updated["version_confidence"] = "high" if version_sync_confidence == "high" else "medium"
+ updated["version_gap_reason"] = ""
+ updated["version_resolution_needed"] = False
+ updated_advisories.append(updated)
+
+ if enqueue_lab:
+ lab_summary = _enqueue_lab_updates(updated_advisories, previous_advisories, entity_changes)
+ else:
+ lab_summary = read_json(LAB_ENQUEUE_SUMMARY_PATH, default={}) or {}
+
+ if write_registry:
+ _write_entity_records(list(entities.values()))
+ _write_version_records(version_records)
+
+ return {
+ "advisories": updated_advisories,
+ "entities": sorted(entities.values(), key=lambda item: item["entity_id"]),
+ "versions": version_records,
+ "lab_summary": lab_summary,
+ "summary": {
+ "cataloged_entity_total": len([item for item in entities.values() if item.get("status") == "cataloged"]),
+ "latest_version_synced_count": len([item for item in entities.values() if item.get("status") == "cataloged" and item.get("version_sync_status") == "green"]),
+ "source_gap_count": len([item for item in entities.values() if item.get("status") == "cataloged" and item.get("version_sync_status") == "source-gap"]),
+ "security_version_total": len(version_records),
+ "lab_enqueued_count": lab_summary.get("enqueued", 0),
+ },
+ }
+
+
+def build_version_views(
+ source_map: Dict[str, Any],
+ advisories: List[AdvisoryRecord | Dict[str, Any]],
+ entity_records: List[Dict[str, Any]] | None = None,
+ version_records: List[Dict[str, Any]] | None = None,
+ lab_summary: Dict[str, Any] | None = None,
+) -> Dict[str, Any]:
+ generated_at = isoformat(now_utc())
+ advisories_rows = [_advisory_dict(item) for item in advisories]
+ entities = entity_records or build_entity_views(source_map, advisories_rows)["entities"]
+ versions = version_records or load_version_records()
+ lab_summary = lab_summary or (read_json(LAB_ENQUEUE_SUMMARY_PATH, default={}) or {})
+
+ versions_by_system: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
+ for item in versions:
+ versions_by_system[item["root_system_id"]].append(item)
+
+ systems_payload: List[Dict[str, Any]] = []
+ source_gap_entities: List[Dict[str, Any]] = []
+ auto_promoted_entities: List[Dict[str, Any]] = []
+ security_version_entities = 0
+ latest_version_synced_count = 0
+ cataloged_entities = [item for item in entities if item.get("status") == "cataloged"]
+ for entity in cataloged_entities:
+ if entity.get("security_version_count", 0):
+ security_version_entities += 1
+ if entity.get("version_sync_status") == "green":
+ latest_version_synced_count += 1
+ if entity.get("version_sync_status") == "source-gap":
+ source_gap_entities.append(
+ {
+ "entity_id": entity["entity_id"],
+ "display_name": entity["display_name"],
+ "root_system_id": entity["root_system_id"],
+ "entity_type": entity["entity_type"],
+ "latest_version": entity.get("latest_version") or "",
+ "repo_url": entity.get("repo_url") or "",
+ "package_registry": entity.get("package_registry") or "",
+ "marketplace_url": entity.get("marketplace_url") or "",
+ }
+ )
+ if entity.get("auto_cataloged"):
+ auto_promoted_entities.append(
+ {
+ "entity_id": entity["entity_id"],
+ "display_name": entity["display_name"],
+ "root_system_id": entity["root_system_id"],
+ "entity_type": entity["entity_type"],
+ "catalog_source": entity.get("catalog_source") or "",
+ }
+ )
+
+ for system in source_map.get("systems", []) or []:
+ system_entities = [item for item in entities if item.get("root_system_id") == system["system_id"]]
+ system_versions = versions_by_system.get(system["system_id"], [])
+ systems_payload.append(
+ {
+ "system_id": system["system_id"],
+ "display_name": system["display_name"],
+ "cataloged_entity_total": len([item for item in system_entities if item.get("status") == "cataloged"]),
+ "latest_version_synced_count": len([item for item in system_entities if item.get("version_sync_status") == "green"]),
+ "source_gap_count": len([item for item in system_entities if item.get("version_sync_status") == "source-gap"]),
+ "security_version_count": len(system_versions),
+ "auto_promoted_count": len([item for item in system_entities if item.get("auto_cataloged")]),
+ "latest_versions": [
+ {
+ "entity_id": item["entity_id"],
+ "display_name": item["display_name"],
+ "entity_type": item["entity_type"],
+ "latest_version": item.get("latest_version") or "",
+ "latest_release_at": item.get("latest_release_at") or "",
+ "version_sync_status": item.get("version_sync_status") or "pending",
+ }
+ for item in sorted(
+ [item for item in system_entities if item.get("latest_version")],
+ key=lambda value: (-int(value.get("advisory_count") or 0), value["display_name"].lower()),
+ )[:5]
+ ],
+ }
+ )
+
+ unresolved_advisories = [
+ {
+ "canonical_id": item.get("canonical_id"),
+ "system_id": item.get("system_id"),
+ "title": item.get("title"),
+ "official_source_url": item.get("official_source_url") or "",
+ "version_gap_reason": item.get("version_gap_reason") or "",
+ }
+ for item in advisories_rows
+ if item.get("version_resolution_needed")
+ ]
+ report_lines = [
+ "# 安全相关版本同步报告",
+ "",
+ f"- 生成时间: `{generated_at}`",
+ f"- 已编目实体: `{len(cataloged_entities)}`",
+ f"- 最新版本已同步: `{latest_version_synced_count}`",
+ f"- 版本 source-gap: `{len(source_gap_entities)}`",
+ f"- 安全相关版本记录: `{len(versions)}`",
+ f"- 存在安全版本历史的实体: `{security_version_entities}`",
+ f"- 自动升级实体: `{len(auto_promoted_entities)}`",
+ f"- 因版本变化触发 lab 入队: `{lab_summary.get('enqueued', 0)}`",
+ "",
+ "## 系统版本摘要",
+ "",
+ "| 系统 | cataloged | latest synced | source-gap | security versions | auto-promoted |",
+ "| --- | ---: | ---: | ---: | ---: | ---: |",
+ ]
+ for item in sorted(systems_payload, key=lambda value: value["system_id"]):
+ report_lines.append(
+ f"| {item['system_id']} | {item['cataloged_entity_total']} | {item['latest_version_synced_count']} | {item['source_gap_count']} | {item['security_version_count']} | {item['auto_promoted_count']} |"
+ )
+
+ completeness = {
+ "generated_at": generated_at,
+ "cataloged_entity_total": len(cataloged_entities),
+ "latest_version_synced_count": latest_version_synced_count,
+ "source_gap_count": len(source_gap_entities),
+ "security_version_total": len(versions),
+ "security_version_entity_count": security_version_entities,
+ "auto_promoted_entity_count": len(auto_promoted_entities),
+ "lab_enqueued_count": lab_summary.get("enqueued", 0),
+ "systems": sorted(systems_payload, key=lambda item: item["system_id"]),
+ }
+ backlog = {
+ "generated_at": generated_at,
+ "source_gap_entities": sorted(source_gap_entities, key=lambda item: (item["root_system_id"], item["entity_type"], item["display_name"]))[:500],
+ "unresolved_advisories": unresolved_advisories[:500],
+ "lab_pending": (lab_summary.get("pending") or [])[:500],
+ "auto_promoted_entities": sorted(auto_promoted_entities, key=lambda item: (item["root_system_id"], item["entity_type"], item["display_name"]))[:500],
+ }
+ release_index = {
+ "generated_at": generated_at,
+ "version_count": len(versions),
+ "versions": sorted(versions, key=lambda item: (item["root_system_id"], item["entity_id"], _version_sort_key(item["version"])), reverse=False),
+ }
+ return {
+ "completeness": completeness,
+ "backlog": backlog,
+ "release_index": release_index,
+ "report_markdown": "\n".join(report_lines),
+ }
+
+
+def write_version_views(views: Dict[str, Any]) -> None:
+ write_json(VERSION_COMPLETENESS_PATH, views["completeness"])
+ write_json(VERSION_BACKLOG_PATH, views["backlog"])
+ write_json(RELEASE_INDEX_PATH, views["release_index"])
+ write_text(VERSION_REPORT_MD_PATH, views["report_markdown"])