文件
websafe-kb/scripts/intel/versioning.py
2026-03-21 17:49:36 -07:00

1017 行
45 KiB
Python

from __future__ import annotations
import os
import re
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from html import unescape
from typing import Any, Dict, Iterable, List, Tuple
from urllib.parse import quote, urlparse
from intel.config import (
ENTITIES_DIR,
LAB_ENQUEUE_SUMMARY_PATH,
RELEASE_INDEX_PATH,
SOURCE_BUCKETS,
VERSION_BACKLOG_PATH,
VERSION_COMPLETENESS_PATH,
VERSION_REPORT_MD_PATH,
VERSIONS_DIR,
iter_all_sources,
)
from intel.entities import (
_display_name,
_entity_id,
_entity_payload,
_github_repo_from_url,
_marketplace_slug,
_merge_entity_overlay,
_package_registry_url,
_repo_url_from_package,
build_entity_views,
)
from intel.http_client import request
from intel.models import AdvisoryRecord, VersionRecord
from intel.sources.runner import HANDLERS
from intel.utils import isoformat, load_all_json, now_utc, parse_dt, read_json, slugify, unique, write_json, write_text
VERSION_RE = re.compile(r"\bv?(?:\d+[._-]){1,5}\d+[A-Za-z0-9._-]*\b")
SECURITY_TERMS = ("security", "advisory", "cve", "ghsa", "osv", "vulnerability", "patched", "fixed")
VERSION_SOURCE_KINDS = {"rss-feed", "atom-feed", "json-feed", "html-links", "vendor-index"}
DISCOVERY_CONFIDENCE = {"official", "ecosystem-authority"}
def _advisory_dict(item: AdvisoryRecord | Dict[str, Any]) -> Dict[str, Any]:
if isinstance(item, AdvisoryRecord):
return item.to_dict()
return dict(item or {})
def _version_sort_key(value: str) -> Tuple[Tuple[int, ...], str]:
normalized = (value or "").strip().lower().lstrip("v")
numeric_parts = [int(part) for part in re.findall(r"\d+", normalized)]
return tuple(numeric_parts), normalized
def _pick_latest_version(values: Iterable[str]) -> str:
options = [str(value).strip() for value in values if str(value).strip()]
if not options:
return ""
return sorted(unique(options), key=_version_sort_key, reverse=True)[0]
def _extract_versions(*values: str) -> List[str]:
results: List[str] = []
for value in values:
if not value:
continue
for match in VERSION_RE.findall(value):
token = match.strip().lstrip("v")
if re.fullmatch(r"\d{4}-\d{2}-\d{2}", token):
continue
if "." not in token and token.count("-") < 2:
continue
results.append(token)
return unique(results)
def _stable_release_source(system: Dict[str, Any], source: Dict[str, Any]) -> bool:
if source.get("status") == "retired":
return False
if source.get("kind") not in VERSION_SOURCE_KINDS:
return False
if source.get("bucket_name") == "research_sources":
return False
return source.get("purpose") in {"release", "discovery", "marketplace", "advisory"}
def _load_entity_registry() -> Dict[str, Dict[str, Any]]:
return {
item.get("entity_id"): item
for item in load_all_json(ENTITIES_DIR)
if item.get("entity_id")
}
def load_version_records() -> List[Dict[str, Any]]:
return load_all_json(VERSIONS_DIR)
def _write_entity_records(entity_records: List[Dict[str, Any]], *, selected_system_ids: set[str] | None = None) -> None:
existing = _load_entity_registry()
for item in entity_records:
if selected_system_ids and item.get("root_system_id") not in selected_system_ids:
continue
existing[item["entity_id"]] = item
for entity_id, payload in sorted(existing.items()):
if selected_system_ids and payload.get("root_system_id") in selected_system_ids:
write_json(ENTITIES_DIR / f"{entity_id}.json", payload)
elif not selected_system_ids:
write_json(ENTITIES_DIR / f"{entity_id}.json", payload)
def _write_version_records(version_records: List[Dict[str, Any]], *, selected_system_ids: set[str] | None = None) -> None:
existing = {
item.get("version_id"): item
for item in load_all_json(VERSIONS_DIR)
if item.get("version_id")
}
keep_ids: set[str] = set()
for item in version_records:
if selected_system_ids and item.get("root_system_id") not in selected_system_ids:
continue
keep_ids.add(item["version_id"])
existing[item["version_id"]] = item
if selected_system_ids:
for version_id, payload in list(existing.items()):
if payload.get("root_system_id") in selected_system_ids and version_id not in keep_ids:
path = VERSIONS_DIR / f"{version_id}.json"
if path.exists():
path.unlink()
existing.pop(version_id, None)
for version_id, payload in sorted(existing.items()):
write_json(VERSIONS_DIR / f"{version_id}.json", payload)
def write_entity_registry(entity_records: List[Dict[str, Any]], *, selected_system_ids: set[str] | None = None) -> None:
_write_entity_records(entity_records, selected_system_ids=selected_system_ids)
def write_version_registry(version_records: List[Dict[str, Any]], *, selected_system_ids: set[str] | None = None) -> None:
_write_version_records(version_records, selected_system_ids=selected_system_ids)
def _candidate_entity_from_source(system: Dict[str, Any], source: Dict[str, Any]) -> Dict[str, Any] | None:
url = (source.get("url") or "").strip()
if not url:
return None
repo_url = _github_repo_from_url(url)
entity_type = source.get("entity_type_hint") or "project"
package_registry = ""
marketplace_url = ""
display_name = ""
canonical_name = ""
if repo_url:
entity_type = source.get("entity_type_hint") or "repo"
match = re.match(r"https://github\.com/([^/]+)/([^/#?]+)", repo_url, re.IGNORECASE)
if match:
display_name = f"{match.group(1)} / {match.group(2)}"
canonical_name = f"{match.group(1)}/{match.group(2)}"
elif "npmjs.com/package/" in url:
entity_type = source.get("entity_type_hint") or "package"
canonical_name = url.split("/package/", 1)[1].split("?", 1)[0].strip("/")
package_registry = f"https://www.npmjs.com/package/{canonical_name}"
display_name = canonical_name
elif "packagist.org/packages/" in url:
entity_type = source.get("entity_type_hint") or "package"
canonical_name = url.split("/packages/", 1)[1].split("?", 1)[0].strip("/")
package_registry = f"https://packagist.org/packages/{canonical_name}"
display_name = canonical_name.replace("/", " / ")
elif any(token in url.lower() for token in ("/plugins/", "/themes/", "/extensions/", "/modules/", "/marketplace/")):
marketplace_url = url
canonical_name = _marketplace_slug(url)
entity_type = source.get("entity_type_hint") or entity_type
display_name = canonical_name.replace("-", " ")
else:
return None
if not canonical_name:
canonical_name = source.get("name") or system.get("display_name") or system.get("system_id")
entity_id = _entity_id(system["system_id"], entity_type, canonical_name)
return {
"entity_id": entity_id,
"entity_type": entity_type,
"display_name": _display_name(display_name or canonical_name, entity_id),
"root_system_id": system["system_id"],
"parent_entity_id": system["system_id"],
"category": system.get("category", "unknown"),
"ecosystem": system.get("category", "unknown"),
"official": source.get("confidence") == "official",
"status": "cataloged",
"history_policy": "history-full",
"repo_url": repo_url,
"package_registry": package_registry,
"marketplace_url": marketplace_url,
"latest_version": "",
"version_scheme": "semver-ish" if entity_type in {"repo", "package", "plugin", "extension", "module", "theme", "project"} else "vendor",
"source_refs": [
{
"name": source.get("name"),
"url": source.get("url"),
"kind": source.get("kind"),
"status": source.get("status"),
"bucket": source.get("bucket_name"),
"official": source.get("bucket_name") == "official_sources",
}
],
"catalog_source": source.get("name") or "",
"catalog_reason": "source catalog exposed a stable security-related object and auto-catalog is enabled",
"auto_cataloged": True,
}
def discover_entities(
source_map: Dict[str, Any],
advisories: List[AdvisoryRecord | Dict[str, Any]],
*,
write_registry: bool = False,
) -> Dict[str, Any]:
base_views = build_entity_views(source_map, advisories)
entities = {item["entity_id"]: item for item in base_views["entities"]}
auto_promoted: List[Dict[str, Any]] = []
seen_urls = {
item.get("repo_url") or item.get("package_registry") or item.get("marketplace_url")
for item in entities.values()
if item.get("repo_url") or item.get("package_registry") or item.get("marketplace_url")
}
for system, _bucket, source in iter_all_sources(source_map, include_retired=False):
if source.get("confidence") not in DISCOVERY_CONFIDENCE:
continue
candidate = _candidate_entity_from_source(system, source)
if not candidate:
continue
stable_url = candidate.get("repo_url") or candidate.get("package_registry") or candidate.get("marketplace_url")
if stable_url and stable_url in seen_urls:
continue
if not source.get("auto_catalog"):
continue
if candidate["entity_id"] in entities:
continue
auto_promoted.append(
_merge_entity_overlay(
_entity_payload(
entity_id=candidate["entity_id"],
entity_type=candidate["entity_type"],
display_name=candidate["display_name"],
parent_entity_id=candidate["parent_entity_id"],
root_system_id=candidate["root_system_id"],
category=candidate["category"],
ecosystem=candidate["ecosystem"],
official=candidate["official"],
status="cataloged",
history_policy="history-full",
repo_url=candidate.get("repo_url") or "",
package_registry=candidate.get("package_registry") or "",
marketplace_url=candidate.get("marketplace_url") or "",
latest_version="",
version_scheme=candidate.get("version_scheme") or "vendor",
source_refs=candidate.get("source_refs") or [],
),
candidate,
)
)
entities[candidate["entity_id"]] = auto_promoted[-1]
if stable_url:
seen_urls.add(stable_url)
merged_entities = sorted(entities.values(), key=lambda item: item["entity_id"])
if write_registry and auto_promoted:
_write_entity_records(merged_entities, selected_system_ids={item["root_system_id"] for item in merged_entities})
refreshed_views = build_entity_views(source_map, advisories)
if auto_promoted:
refreshed_views = build_entity_views(source_map, advisories)
return {
"entities": merged_entities,
"candidate_backlog": refreshed_views["candidate_backlog"],
"auto_promoted": auto_promoted,
"summary": {
"cataloged_entity_total": len([item for item in merged_entities if item.get("status") == "cataloged"]),
"candidate_entity_total": len(refreshed_views["candidate_backlog"]),
"auto_promoted_count": len(auto_promoted),
},
}
def _fetch_source_hits_for_versions(source_map: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]:
jobs: List[Tuple[Dict[str, Any], Dict[str, Any]]] = []
for system, _bucket, source in iter_all_sources(source_map, include_retired=False):
if _stable_release_source(system, source):
jobs.append((system, source))
hits_by_system: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
if not jobs:
return hits_by_system
workers = min(max(4, int(os.environ.get("WEBSAFE_VERSION_SYNC_WORKERS", "8"))), max(4, len(jobs)))
with ThreadPoolExecutor(max_workers=workers) as executor:
future_map = {}
for system, source in jobs:
handler = HANDLERS.get(source["kind"])
if handler is None:
continue
future_map[executor.submit(handler, system, dict(source))] = (system, source)
for future in as_completed(future_map):
system, source = future_map[future]
try:
items = future.result()
except Exception:
continue
for item in items[: source.get("max_items", 50)]:
title = getattr(item, "title", None) if not isinstance(item, dict) else item.get("title")
summary = getattr(item, "summary", None) if not isinstance(item, dict) else item.get("summary")
source_url = getattr(item, "source_url", None) if not isinstance(item, dict) else item.get("source_url")
affected_versions = getattr(item, "affected_versions", None) if not isinstance(item, dict) else item.get("affected_versions")
fixed_versions = getattr(item, "fixed_versions", None) if not isinstance(item, dict) else item.get("fixed_versions")
published_at = getattr(item, "published_at", None) if not isinstance(item, dict) else item.get("published_at")
updated_at = getattr(item, "updated_at", None) if not isinstance(item, dict) else item.get("updated_at")
versions = unique(
_extract_versions(
title or "",
summary or "",
source_url or "",
" ".join(affected_versions or []),
" ".join(fixed_versions or []),
)
)
if not versions:
continue
haystack = " ".join(filter(None, [title or "", summary or "", source_url or ""])).lower()
hits_by_system[system["system_id"]].append(
{
"source_name": source.get("name"),
"source_confidence": source.get("confidence"),
"source_url": source_url or source.get("url") or "",
"published_at": published_at or updated_at or "",
"updated_at": updated_at or published_at or "",
"versions": versions,
"security_related": any(term in haystack for term in SECURITY_TERMS) or source.get("purpose") in {"release", "advisory"},
}
)
return hits_by_system
def _github_latest(repo_url: str) -> Dict[str, str]:
match = re.match(r"https://github\.com/([^/]+)/([^/#?]+)", repo_url, re.IGNORECASE)
if not match:
return {}
owner, repo = match.group(1), match.group(2)
headers = {"Accept": "application/vnd.github+json", "User-Agent": "websafe-intel"}
token = os.environ.get("GITHUB_TOKEN")
if token:
headers["Authorization"] = f"Bearer {token}"
source = {"kind": "json-feed", "request_policy": {"accept": headers["Accept"]}}
for api_url, array_key in (
(f"https://api.github.com/repos/{owner}/{repo}/releases?per_page=10", None),
(f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=10", None),
):
try:
response = request("GET", api_url, source=source, headers=headers)
response.raise_for_status()
payload = response.json()
except Exception:
continue
if not isinstance(payload, list):
continue
for item in payload:
if not isinstance(item, dict):
continue
if item.get("draft") or item.get("prerelease"):
continue
version = _pick_latest_version(
_extract_versions(
item.get("tag_name") or "",
item.get("name") or "",
item.get("html_url") or "",
)
)
if not version:
continue
return {
"version": version,
"released_at": item.get("published_at") or item.get("created_at") or "",
"release_url": item.get("html_url") or repo_url,
"source_name": "GitHub Releases API",
"source_confidence": "official",
}
return {}
def _npm_latest(package_registry: str) -> Dict[str, str]:
package_name = package_registry.split("/package/", 1)[1]
url = f"https://registry.npmjs.org/{quote(package_name, safe='@/')}/latest"
source = {"kind": "json-feed", "request_policy": {"accept": "application/json"}}
try:
response = request("GET", url, source=source)
response.raise_for_status()
payload = response.json()
except Exception:
return {}
version = str(payload.get("version") or "").strip()
if not version:
return {}
return {
"version": version,
"released_at": payload.get("date") or payload.get("time") or "",
"release_url": package_registry,
"source_name": "npm latest",
"source_confidence": "ecosystem-authority",
}
def _packagist_latest(package_registry: str) -> Dict[str, str]:
package_name = package_registry.split("/packages/", 1)[1]
url = f"https://repo.packagist.org/p2/{package_name}.json"
source = {"kind": "json-feed", "request_policy": {"accept": "application/json"}}
try:
response = request("GET", url, source=source)
response.raise_for_status()
payload = response.json()
except Exception:
return {}
packages = (payload.get("packages") or {}).get(package_name) or []
if not packages:
return {}
entry = packages[0]
version = str(entry.get("version") or "").strip().lstrip("v")
if not version:
return {}
return {
"version": version,
"released_at": entry.get("time") or "",
"release_url": package_registry,
"source_name": "Packagist p2",
"source_confidence": "ecosystem-authority",
}
def _latest_from_hits(entity: Dict[str, Any], hits: List[Dict[str, Any]]) -> Dict[str, str]:
candidates = []
for hit in hits:
candidates.extend(hit.get("versions") or [])
version = _pick_latest_version(candidates)
if not version:
return {}
chosen = None
for hit in hits:
if version in (hit.get("versions") or []):
chosen = hit
break
return {
"version": version,
"released_at": (chosen or {}).get("published_at") or (chosen or {}).get("updated_at") or "",
"release_url": (chosen or {}).get("source_url") or "",
"source_name": (chosen or {}).get("source_name") or "",
"source_confidence": (chosen or {}).get("source_confidence") or "unknown",
}
def _latest_from_advisories(advisories: List[Dict[str, Any]]) -> Dict[str, str]:
version = _pick_latest_version(
[
advisory.get("patched_version") or ""
for advisory in advisories
]
+ [
value
for advisory in advisories
for value in (advisory.get("fixed_versions") or [])
]
)
if not version:
return {}
advisory = next(
(
item
for item in advisories
if version == item.get("patched_version") or version in (item.get("fixed_versions") or [])
),
{},
)
return {
"version": version,
"released_at": advisory.get("updated_at") or advisory.get("published_at") or "",
"release_url": advisory.get("official_source_url") or "",
"source_name": "advisory-fixed-version",
"source_confidence": advisory.get("source_confidence") or "unknown",
}
def _strip_html(text: str) -> str:
cleaned = re.sub(r"<script\b[^>]*>.*?</script>", " ", text, flags=re.IGNORECASE | re.DOTALL)
cleaned = re.sub(r"<style\b[^>]*>.*?</style>", " ", cleaned, flags=re.IGNORECASE | re.DOTALL)
cleaned = re.sub(r"<[^>]+>", " ", cleaned)
return unescape(re.sub(r"\s+", " ", cleaned)).strip()
def _extract_page_versions(url: str) -> Dict[str, List[str]]:
source = {"kind": "html-links"}
try:
response = request("GET", url, source=source)
response.raise_for_status()
except Exception:
return {"affected": [], "fixed": [], "sources": []}
text = _strip_html(response.text)
affected: List[str] = []
fixed: List[str] = []
for pattern in (
r"(?:affected|before|prior to|through)\s+([A-Za-z0-9., _-]{3,120})",
r"(?:versions?|range)\s+([A-Za-z0-9., _-]{3,120})\s+(?:are|is)\s+affected",
):
for match in re.finditer(pattern, text, flags=re.IGNORECASE):
affected.extend(_extract_versions(match.group(1)))
for pattern in (
r"(?:fixed|patched|resolved|available in|upgrade to)\s+([A-Za-z0-9., _-]{3,120})",
r"(?:update to|updated to)\s+([A-Za-z0-9., _-]{3,120})",
):
for match in re.finditer(pattern, text, flags=re.IGNORECASE):
fixed.extend(_extract_versions(match.group(1)))
return {"affected": unique(affected), "fixed": unique(fixed), "sources": [url] if (affected or fixed) else []}
def _version_id(entity_id: str, version: str) -> str:
return f"{entity_id}--{slugify(version)}"
def _register_version(
bucket: Dict[Tuple[str, str], Dict[str, Any]],
*,
entity_id: str,
root_system_id: str,
version: str,
released_at: str,
release_url: str,
source_name: str,
source_confidence: str,
reason: str,
advisory_ref: str | None = None,
) -> None:
token = (version or "").strip()
if not token:
return
key = (entity_id, token)
existing = bucket.setdefault(
key,
VersionRecord(
version_id=_version_id(entity_id, token),
entity_id=entity_id,
root_system_id=root_system_id,
version=token,
released_at=released_at or None,
release_url=release_url or None,
source_name=source_name,
source_confidence=source_confidence,
security_relevant=True,
reason=reason,
advisory_refs=[],
is_latest_snapshot=False,
).to_dict(),
)
existing["reason"] = reason if existing.get("reason") == "affected" and reason != "affected" else existing.get("reason")
if released_at and not existing.get("released_at"):
existing["released_at"] = released_at
if release_url and not existing.get("release_url"):
existing["release_url"] = release_url
if advisory_ref and advisory_ref not in existing["advisory_refs"]:
existing["advisory_refs"].append(advisory_ref)
def _entity_target_id(advisory: Dict[str, Any]) -> str:
refs = advisory.get("entity_refs") or []
for ref in refs:
if ref.get("entity_type") != "system":
return ref.get("entity_id") or advisory.get("system_id")
return advisory.get("system_id")
def _resolve_versions_for_advisory(advisory: Dict[str, Any], *, deep: bool) -> Dict[str, Any]:
affected = unique(
list(advisory.get("affected_versions") or []) + list(advisory.get("affected_version_ranges") or [])
)
fixed = unique(
list(advisory.get("fixed_versions") or []) + list(advisory.get("fixed_version_ranges") or [])
)
evidence_sources = list(advisory.get("version_evidence_sources") or [])
if deep and not (affected or fixed) and advisory.get("official_source_url"):
page_versions = _extract_page_versions(advisory["official_source_url"])
if page_versions.get("affected"):
affected = unique(affected + page_versions["affected"])
if page_versions.get("fixed"):
fixed = unique(fixed + page_versions["fixed"])
evidence_sources = unique(evidence_sources + page_versions.get("sources", []))
patched = advisory.get("patched_version") or _pick_latest_version(fixed)
return {
"affected": affected,
"fixed": fixed,
"patched": patched,
"sources": evidence_sources,
}
def _profile_runnable(profile: Dict[str, Any]) -> bool:
if not profile:
return False
if profile.get("resolved_via") == "implicit-fallback":
return False
return bool(
profile.get("runner_id")
or profile.get("fixture_path")
or profile.get("attack_actions")
or profile.get("seed_actions")
or profile.get("services")
or profile.get("success_assertions")
)
def _enqueue_lab_updates(updated_advisories: List[Dict[str, Any]], previous_advisories: Dict[str, Dict[str, Any]], entity_changes: Dict[str, str]) -> Dict[str, Any]:
from lab import repro, task_queue # noqa: WPS433
enqueue_items: List[Dict[str, Any]] = []
pending: List[Dict[str, Any]] = []
for advisory in updated_advisories:
previous = previous_advisories.get(advisory["canonical_id"], {})
changed = previous.get("canonical_id") is None
for field in (
"affected_version_refs",
"fixed_version_refs",
"patched_version_refs",
"patched_version",
"version_resolution_needed",
"version_sync_confidence",
):
if previous.get(field) != advisory.get(field):
changed = True
break
if not changed:
target_entity = _entity_target_id(advisory)
if entity_changes.get(target_entity) and entity_changes.get(target_entity) != previous.get("latest_version"):
changed = True
if not changed:
continue
profile = repro.resolve_profile(advisory["canonical_id"], advisory)
if _profile_runnable(profile):
enqueue_items.append(
{
"advisory_id": advisory["canonical_id"],
"system_id": advisory["system_id"],
"priority": "version-sync",
}
)
else:
pending.append(
{
"advisory_id": advisory["canonical_id"],
"system_id": advisory["system_id"],
"lab_pending_reason": "no-runnable-profile",
"profile_id": profile.get("profile_id"),
}
)
enqueue_result = task_queue.enqueue_items(enqueue_items) if enqueue_items else {"queued": 0, "added": 0}
payload = {
"generated_at": isoformat(now_utc()),
"enqueued": enqueue_result.get("added", 0),
"queue_total": enqueue_result.get("queued", 0),
"items": enqueue_items,
"pending": pending,
}
write_json(LAB_ENQUEUE_SUMMARY_PATH, payload)
return payload
def sync_versions(
source_map: Dict[str, Any],
advisories: List[AdvisoryRecord | Dict[str, Any]],
*,
entity_records: List[Dict[str, Any]] | None = None,
deep: bool = False,
enqueue_lab: bool = False,
write_registry: bool = False,
) -> Dict[str, Any]:
advisory_rows = [_advisory_dict(item) for item in advisories]
previous_advisories = {
item.get("canonical_id"): item
for item in load_all_json(ENTITIES_DIR.parent / "advisories")
if item.get("canonical_id")
}
entity_seed = entity_records or build_entity_views(source_map, advisory_rows)["entities"]
entities = {item["entity_id"]: dict(item) for item in entity_seed}
advisories_by_entity: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
for advisory in advisory_rows:
advisories_by_entity[_entity_target_id(advisory)].append(advisory)
source_hits = _fetch_source_hits_for_versions(source_map)
version_bucket: Dict[Tuple[str, str], Dict[str, Any]] = {}
entity_changes: Dict[str, str] = {}
for entity_id, entity in entities.items():
linked_advisories = advisories_by_entity.get(entity_id, [])
if linked_advisories:
for advisory in linked_advisories:
resolved = _resolve_versions_for_advisory(advisory, deep=deep)
target_id = _entity_target_id(advisory)
for version in resolved["affected"]:
_register_version(
version_bucket,
entity_id=target_id,
root_system_id=entity["root_system_id"],
version=version,
released_at=advisory.get("published_at") or "",
release_url=advisory.get("official_source_url") or "",
source_name=advisory.get("official_source_url") or "advisory-affected",
source_confidence=advisory.get("source_confidence") or "unknown",
reason="affected",
advisory_ref=advisory["canonical_id"],
)
for version in resolved["fixed"]:
_register_version(
version_bucket,
entity_id=target_id,
root_system_id=entity["root_system_id"],
version=version,
released_at=advisory.get("updated_at") or advisory.get("published_at") or "",
release_url=advisory.get("official_source_url") or "",
source_name=advisory.get("official_source_url") or "advisory-fixed",
source_confidence=advisory.get("source_confidence") or "unknown",
reason="fixed",
advisory_ref=advisory["canonical_id"],
)
if resolved["patched"]:
_register_version(
version_bucket,
entity_id=target_id,
root_system_id=entity["root_system_id"],
version=resolved["patched"],
released_at=advisory.get("updated_at") or advisory.get("published_at") or "",
release_url=advisory.get("official_source_url") or "",
source_name=advisory.get("official_source_url") or "advisory-patched",
source_confidence=advisory.get("source_confidence") or "unknown",
reason="patched",
advisory_ref=advisory["canonical_id"],
)
for hit in source_hits.get(entity["root_system_id"], []):
if not hit.get("security_related"):
continue
for version in hit.get("versions") or []:
_register_version(
version_bucket,
entity_id=entity_id,
root_system_id=entity["root_system_id"],
version=version,
released_at=hit.get("published_at") or hit.get("updated_at") or "",
release_url=hit.get("source_url") or "",
source_name=hit.get("source_name") or "",
source_confidence=hit.get("source_confidence") or "unknown",
reason="security-release",
)
version_records = sorted(version_bucket.values(), key=lambda item: (item["root_system_id"], item["entity_id"], _version_sort_key(item["version"])))
versions_by_entity: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
version_lookup: Dict[Tuple[str, str], str] = {}
for item in version_records:
versions_by_entity[item["entity_id"]].append(item)
version_lookup[(item["entity_id"], item["version"])] = item["version_id"]
for entity in entities.values():
previous_latest = entity.get("latest_version") or ""
latest = {}
if entity.get("package_registry", "").startswith("https://www.npmjs.com/package/"):
latest = _npm_latest(entity["package_registry"])
elif entity.get("package_registry", "").startswith("https://packagist.org/packages/"):
latest = _packagist_latest(entity["package_registry"])
elif entity.get("repo_url", "").startswith("https://github.com/"):
latest = _github_latest(entity["repo_url"])
if not latest:
latest = _latest_from_hits(entity, source_hits.get(entity["root_system_id"], []))
if not latest:
latest = _latest_from_advisories(advisories_by_entity.get(entity["entity_id"], []))
entity["latest_version"] = latest.get("version") or entity.get("latest_version") or ""
entity["latest_release_at"] = latest.get("released_at") or entity.get("latest_release_at") or ""
entity["latest_release_url"] = latest.get("release_url") or entity.get("latest_release_url") or ""
entity["version_source_refs"] = unique(
list(entity.get("version_source_refs") or [])
+ [latest.get("release_url") or ""]
+ [item.get("release_url") or "" for item in versions_by_entity.get(entity["entity_id"], [])[:20]]
)
entity["latest_version_evidence"] = unique(
list(entity.get("latest_version_evidence") or [])
+ [latest.get("source_name") or ""]
+ [item.get("source_name") or "" for item in versions_by_entity.get(entity["entity_id"], [])[:20]]
)
entity["security_version_count"] = len(versions_by_entity.get(entity["entity_id"], []))
entity["last_version_synced_at"] = isoformat(now_utc())
entity["version_sync_status"] = "green" if entity.get("latest_version") else "source-gap"
if previous_latest != entity.get("latest_version"):
entity_changes[entity["entity_id"]] = entity.get("latest_version") or ""
for entity in entities.values():
if entity["entity_type"] != "system":
continue
children = [
item
for item in entities.values()
if item.get("root_system_id") == entity["entity_id"] and item.get("entity_id") != entity["entity_id"] and item.get("latest_version")
]
if children:
best = sorted(
children,
key=lambda item: (-int(item.get("advisory_count") or 0), _version_sort_key(item.get("latest_version") or "")),
reverse=False,
)[-1]
entity["latest_version"] = best.get("latest_version") or entity.get("latest_version") or ""
entity["latest_release_at"] = best.get("latest_release_at") or entity.get("latest_release_at") or ""
entity["latest_release_url"] = best.get("latest_release_url") or entity.get("latest_release_url") or ""
entity["version_sync_status"] = best.get("version_sync_status") or entity.get("version_sync_status") or "pending"
entity["security_version_count"] = sum(item.get("security_version_count", 0) for item in children)
updated_advisories: List[Dict[str, Any]] = []
for advisory in advisory_rows:
target_id = _entity_target_id(advisory)
resolved = _resolve_versions_for_advisory(advisory, deep=deep)
affected_refs = [version_lookup[(target_id, version)] for version in resolved["affected"] if (target_id, version) in version_lookup]
fixed_refs = [version_lookup[(target_id, version)] for version in resolved["fixed"] if (target_id, version) in version_lookup]
patched_refs = [version_lookup[(target_id, resolved["patched"])] for _ in [1] if resolved["patched"] and (target_id, resolved["patched"]) in version_lookup]
version_sync_confidence = "high" if (affected_refs and fixed_refs) else "medium" if (fixed_refs or patched_refs or affected_refs) else "low"
updated = dict(advisory)
updated["affected_versions"] = resolved["affected"] or updated.get("affected_versions") or []
updated["fixed_versions"] = resolved["fixed"] or updated.get("fixed_versions") or []
updated["affected_version_ranges"] = resolved["affected"] or updated.get("affected_version_ranges") or []
updated["fixed_version_ranges"] = resolved["fixed"] or updated.get("fixed_version_ranges") or []
updated["patched_version"] = resolved["patched"] or updated.get("patched_version")
updated["affected_version_refs"] = unique(affected_refs)
updated["fixed_version_refs"] = unique(fixed_refs)
updated["patched_version_refs"] = unique(patched_refs)
updated["version_evidence_sources"] = unique(list(updated.get("version_evidence_sources") or []) + resolved["sources"])
updated["version_sync_confidence"] = version_sync_confidence
if version_sync_confidence in {"high", "medium"} and (affected_refs or fixed_refs or patched_refs):
updated["version_confidence"] = "high" if version_sync_confidence == "high" else "medium"
updated["version_gap_reason"] = ""
updated["version_resolution_needed"] = False
updated_advisories.append(updated)
if enqueue_lab:
lab_summary = _enqueue_lab_updates(updated_advisories, previous_advisories, entity_changes)
else:
lab_summary = read_json(LAB_ENQUEUE_SUMMARY_PATH, default={}) or {}
if write_registry:
_write_entity_records(list(entities.values()))
_write_version_records(version_records)
return {
"advisories": updated_advisories,
"entities": sorted(entities.values(), key=lambda item: item["entity_id"]),
"versions": version_records,
"lab_summary": lab_summary,
"summary": {
"cataloged_entity_total": len([item for item in entities.values() if item.get("status") == "cataloged"]),
"latest_version_synced_count": len([item for item in entities.values() if item.get("status") == "cataloged" and item.get("version_sync_status") == "green"]),
"source_gap_count": len([item for item in entities.values() if item.get("status") == "cataloged" and item.get("version_sync_status") == "source-gap"]),
"security_version_total": len(version_records),
"lab_enqueued_count": lab_summary.get("enqueued", 0),
},
}
def build_version_views(
source_map: Dict[str, Any],
advisories: List[AdvisoryRecord | Dict[str, Any]],
entity_records: List[Dict[str, Any]] | None = None,
version_records: List[Dict[str, Any]] | None = None,
lab_summary: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
generated_at = isoformat(now_utc())
advisories_rows = [_advisory_dict(item) for item in advisories]
entities = entity_records or build_entity_views(source_map, advisories_rows)["entities"]
versions = version_records or load_version_records()
lab_summary = lab_summary or (read_json(LAB_ENQUEUE_SUMMARY_PATH, default={}) or {})
versions_by_system: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
for item in versions:
versions_by_system[item["root_system_id"]].append(item)
systems_payload: List[Dict[str, Any]] = []
source_gap_entities: List[Dict[str, Any]] = []
auto_promoted_entities: List[Dict[str, Any]] = []
security_version_entities = 0
latest_version_synced_count = 0
cataloged_entities = [item for item in entities if item.get("status") == "cataloged"]
for entity in cataloged_entities:
if entity.get("security_version_count", 0):
security_version_entities += 1
if entity.get("version_sync_status") == "green":
latest_version_synced_count += 1
if entity.get("version_sync_status") == "source-gap":
source_gap_entities.append(
{
"entity_id": entity["entity_id"],
"display_name": entity["display_name"],
"root_system_id": entity["root_system_id"],
"entity_type": entity["entity_type"],
"latest_version": entity.get("latest_version") or "",
"repo_url": entity.get("repo_url") or "",
"package_registry": entity.get("package_registry") or "",
"marketplace_url": entity.get("marketplace_url") or "",
}
)
if entity.get("auto_cataloged"):
auto_promoted_entities.append(
{
"entity_id": entity["entity_id"],
"display_name": entity["display_name"],
"root_system_id": entity["root_system_id"],
"entity_type": entity["entity_type"],
"catalog_source": entity.get("catalog_source") or "",
}
)
for system in source_map.get("systems", []) or []:
system_entities = [item for item in entities if item.get("root_system_id") == system["system_id"]]
system_versions = versions_by_system.get(system["system_id"], [])
systems_payload.append(
{
"system_id": system["system_id"],
"display_name": system["display_name"],
"cataloged_entity_total": len([item for item in system_entities if item.get("status") == "cataloged"]),
"latest_version_synced_count": len([item for item in system_entities if item.get("version_sync_status") == "green"]),
"source_gap_count": len([item for item in system_entities if item.get("version_sync_status") == "source-gap"]),
"security_version_count": len(system_versions),
"auto_promoted_count": len([item for item in system_entities if item.get("auto_cataloged")]),
"latest_versions": [
{
"entity_id": item["entity_id"],
"display_name": item["display_name"],
"entity_type": item["entity_type"],
"latest_version": item.get("latest_version") or "",
"latest_release_at": item.get("latest_release_at") or "",
"version_sync_status": item.get("version_sync_status") or "pending",
}
for item in sorted(
[item for item in system_entities if item.get("latest_version")],
key=lambda value: (-int(value.get("advisory_count") or 0), value["display_name"].lower()),
)[:5]
],
}
)
unresolved_advisories = [
{
"canonical_id": item.get("canonical_id"),
"system_id": item.get("system_id"),
"title": item.get("title"),
"official_source_url": item.get("official_source_url") or "",
"version_gap_reason": item.get("version_gap_reason") or "",
}
for item in advisories_rows
if item.get("version_resolution_needed")
]
report_lines = [
"# 安全相关版本同步报告",
"",
f"- 生成时间: `{generated_at}`",
f"- 已编目实体: `{len(cataloged_entities)}`",
f"- 最新版本已同步: `{latest_version_synced_count}`",
f"- 版本 source-gap: `{len(source_gap_entities)}`",
f"- 安全相关版本记录: `{len(versions)}`",
f"- 存在安全版本历史的实体: `{security_version_entities}`",
f"- 自动升级实体: `{len(auto_promoted_entities)}`",
f"- 因版本变化触发 lab 入队: `{lab_summary.get('enqueued', 0)}`",
"",
"## 系统版本摘要",
"",
"| 系统 | cataloged | latest synced | source-gap | security versions | auto-promoted |",
"| --- | ---: | ---: | ---: | ---: | ---: |",
]
for item in sorted(systems_payload, key=lambda value: value["system_id"]):
report_lines.append(
f"| {item['system_id']} | {item['cataloged_entity_total']} | {item['latest_version_synced_count']} | {item['source_gap_count']} | {item['security_version_count']} | {item['auto_promoted_count']} |"
)
completeness = {
"generated_at": generated_at,
"cataloged_entity_total": len(cataloged_entities),
"latest_version_synced_count": latest_version_synced_count,
"source_gap_count": len(source_gap_entities),
"security_version_total": len(versions),
"security_version_entity_count": security_version_entities,
"auto_promoted_entity_count": len(auto_promoted_entities),
"lab_enqueued_count": lab_summary.get("enqueued", 0),
"systems": sorted(systems_payload, key=lambda item: item["system_id"]),
}
backlog = {
"generated_at": generated_at,
"source_gap_entities": sorted(source_gap_entities, key=lambda item: (item["root_system_id"], item["entity_type"], item["display_name"]))[:500],
"unresolved_advisories": unresolved_advisories[:500],
"lab_pending": (lab_summary.get("pending") or [])[:500],
"auto_promoted_entities": sorted(auto_promoted_entities, key=lambda item: (item["root_system_id"], item["entity_type"], item["display_name"]))[:500],
}
release_index = {
"generated_at": generated_at,
"version_count": len(versions),
"versions": sorted(versions, key=lambda item: (item["root_system_id"], item["entity_id"], _version_sort_key(item["version"])), reverse=False),
}
return {
"completeness": completeness,
"backlog": backlog,
"release_index": release_index,
"report_markdown": "\n".join(report_lines),
}
def write_version_views(views: Dict[str, Any]) -> None:
write_json(VERSION_COMPLETENESS_PATH, views["completeness"])
write_json(VERSION_BACKLOG_PATH, views["backlog"])
write_json(RELEASE_INDEX_PATH, views["release_index"])
write_text(VERSION_REPORT_MD_PATH, views["report_markdown"])