# Listing metadata: 1041 lines, 46 KiB, Python.
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
from collections import defaultdict
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from html import unescape
|
|
from typing import Any, Dict, Iterable, List, Tuple
|
|
from urllib.parse import quote, urlparse
|
|
|
|
from intel.config import (
|
|
ENTITIES_DIR,
|
|
LAB_ENQUEUE_SUMMARY_PATH,
|
|
RELEASE_INDEX_PATH,
|
|
SOURCE_BUCKETS,
|
|
VERSION_BACKLOG_PATH,
|
|
VERSION_COMPLETENESS_PATH,
|
|
VERSION_REPORT_MD_PATH,
|
|
VERSIONS_DIR,
|
|
iter_all_sources,
|
|
)
|
|
from intel.entities import (
|
|
_display_name,
|
|
_entity_id,
|
|
_entity_payload,
|
|
_github_repo_from_url,
|
|
_marketplace_slug,
|
|
_merge_entity_overlay,
|
|
_package_registry_url,
|
|
_repo_url_from_package,
|
|
build_entity_views,
|
|
)
|
|
from intel.http_client import request
|
|
from intel.models import AdvisoryRecord, VersionRecord
|
|
from intel.sources.runner import HANDLERS
|
|
from intel.utils import isoformat, load_all_json, now_utc, parse_dt, read_json, slugify, unique, write_json, write_text
|
|
|
|
|
|
# Version-like token: optional leading "v", two to six numeric components
# joined by ".", "_" or "-", followed by an optional alphanumeric suffix.
VERSION_RE = re.compile(r"\bv?(?:\d+[._-]){1,5}\d+[A-Za-z0-9._-]*\b")

# Lower-case substrings that mark a fetched item as security related.
SECURITY_TERMS = (
    "security",
    "advisory",
    "cve",
    "ghsa",
    "osv",
    "vulnerability",
    "patched",
    "fixed",
)

# Source kinds that are polled when collecting version hits.
VERSION_SOURCE_KINDS = {
    "rss-feed",
    "atom-feed",
    "json-feed",
    "html-links",
    "vendor-index",
}

# Source confidence levels accepted for automatic entity discovery.
DISCOVERY_CONFIDENCE = {
    "official",
    "ecosystem-authority",
}
|
|
|
|
|
|
def _advisory_dict(item: AdvisoryRecord | Dict[str, Any]) -> Dict[str, Any]:
    """Return *item* as a plain dictionary.

    ``AdvisoryRecord`` instances are converted through ``to_dict()``; anything
    else is shallow-copied with ``dict()``, and a falsy value yields ``{}``.
    """
    if not isinstance(item, AdvisoryRecord):
        return dict(item or {})
    return item.to_dict()
|
|
|
|
|
|
def _version_sort_key(value: str) -> Tuple[Tuple[int, ...], str]:
|
|
normalized = (value or "").strip().lower().lstrip("v")
|
|
numeric_parts = [int(part) for part in re.findall(r"\d+", normalized)]
|
|
return tuple(numeric_parts), normalized
|
|
|
|
|
|
def _pick_latest_version(values: Iterable[str]) -> str:
|
|
options = [str(value).strip() for value in values if str(value).strip()]
|
|
if not options:
|
|
return ""
|
|
return sorted(unique(options), key=_version_sort_key, reverse=True)[0]
|
|
|
|
|
|
def _extract_versions(*values: str) -> List[str]:
    """Collect version tokens found in the given texts.

    Every ``VERSION_RE`` match is trimmed and stripped of leading ``v``
    characters. Tokens shaped like an ISO date (``YYYY-MM-DD``) are skipped,
    as are tokens that contain no dot and fewer than two hyphens. The result
    is passed through ``unique`` before being returned.
    """
    found: List[str] = []
    for text in values:
        if not text:
            continue
        for raw in VERSION_RE.findall(text):
            candidate = raw.strip().lstrip("v")
            looks_like_date = re.fullmatch(r"\d{4}-\d{2}-\d{2}", candidate) is not None
            too_few_separators = "." not in candidate and candidate.count("-") < 2
            if looks_like_date or too_few_separators:
                continue
            found.append(candidate)
    return unique(found)
|
|
|
|
|
|
def _stable_release_source(system: Dict[str, Any], source: Dict[str, Any]) -> bool:
|
|
if source.get("status") == "retired":
|
|
return False
|
|
if source.get("kind") not in VERSION_SOURCE_KINDS:
|
|
return False
|
|
if source.get("bucket_name") == "research_sources":
|
|
return False
|
|
return source.get("purpose") in {"release", "discovery", "marketplace", "advisory"}
|
|
|
|
|
|
def _load_entity_registry() -> Dict[str, Dict[str, Any]]:
    """Load the records under ``ENTITIES_DIR`` keyed by their ``entity_id``.

    Records without a truthy ``entity_id`` are ignored; when an id repeats,
    the record loaded last wins.
    """
    registry: Dict[str, Dict[str, Any]] = {}
    for record in load_all_json(ENTITIES_DIR):
        if record.get("entity_id"):
            registry[record.get("entity_id")] = record
    return registry
|
|
|
|
|
|
def load_version_records() -> List[Dict[str, Any]]:
    """Return everything ``load_all_json`` reads from ``VERSIONS_DIR``."""
    records = load_all_json(VERSIONS_DIR)
    return records
|
|
|
|
|
|
def _write_entity_records(entity_records: List[Dict[str, Any]], *, selected_system_ids: set[str] | None = None) -> None:
    """Merge *entity_records* into the on-disk entity registry and write it.

    When *selected_system_ids* is non-empty, only records whose
    ``root_system_id`` is in that set are merged and written; otherwise every
    registry entry is written. Files are written in ``entity_id`` order as
    ``<entity_id>.json`` under ``ENTITIES_DIR``.
    """
    registry = _load_entity_registry()
    restricted = bool(selected_system_ids)

    for record in entity_records:
        if restricted and record.get("root_system_id") not in selected_system_ids:
            continue
        registry[record["entity_id"]] = record

    for entity_id in sorted(registry):
        payload = registry[entity_id]
        if restricted and payload.get("root_system_id") not in selected_system_ids:
            continue
        write_json(ENTITIES_DIR / f"{entity_id}.json", payload)
|
|
|
|
|
|
def _write_version_records(version_records: List[Dict[str, Any]], *, selected_system_ids: set[str] | None = None) -> None:
    """Merge *version_records* into the on-disk version registry and write it.

    When *selected_system_ids* is non-empty, incoming records for other
    systems are ignored, and stored records that belong to a selected system
    but are absent from *version_records* are deleted from disk and from the
    merged set. Every remaining record is then written in ``version_id``
    order as ``<version_id>.json`` under ``VERSIONS_DIR``.
    """
    merged: Dict[str, Dict[str, Any]] = {}
    for record in load_all_json(VERSIONS_DIR):
        if record.get("version_id"):
            merged[record.get("version_id")] = record

    fresh_ids: set[str] = set()
    for record in version_records:
        if selected_system_ids and record.get("root_system_id") not in selected_system_ids:
            continue
        fresh_ids.add(record["version_id"])
        merged[record["version_id"]] = record

    if selected_system_ids:
        # Collect first so the mapping is not mutated while it is iterated.
        stale_ids = [
            version_id
            for version_id, payload in merged.items()
            if payload.get("root_system_id") in selected_system_ids and version_id not in fresh_ids
        ]
        for version_id in stale_ids:
            stale_path = VERSIONS_DIR / f"{version_id}.json"
            if stale_path.exists():
                stale_path.unlink()
            merged.pop(version_id, None)

    for version_id in sorted(merged):
        write_json(VERSIONS_DIR / f"{version_id}.json", merged[version_id])
|
|
|
|
|
|
def write_entity_registry(entity_records: List[Dict[str, Any]], *, selected_system_ids: set[str] | None = None) -> None:
    """Public entry point that forwards to ``_write_entity_records``."""
    _write_entity_records(
        entity_records,
        selected_system_ids=selected_system_ids,
    )
|
|
|
|
|
|
def write_version_registry(version_records: List[Dict[str, Any]], *, selected_system_ids: set[str] | None = None) -> None:
    """Public entry point that forwards to ``_write_version_records``."""
    _write_version_records(
        version_records,
        selected_system_ids=selected_system_ids,
    )
|
|
|
|
|
|
def _candidate_entity_from_source(system: Dict[str, Any], source: Dict[str, Any]) -> Dict[str, Any] | None:
|
|
url = (source.get("url") or "").strip()
|
|
if not url:
|
|
return None
|
|
repo_url = _github_repo_from_url(url)
|
|
entity_type = source.get("entity_type_hint") or "project"
|
|
package_registry = ""
|
|
marketplace_url = ""
|
|
display_name = ""
|
|
canonical_name = ""
|
|
|
|
if repo_url:
|
|
entity_type = source.get("entity_type_hint") or "repo"
|
|
match = re.match(r"https://github\.com/([^/]+)/([^/#?]+)", repo_url, re.IGNORECASE)
|
|
if match:
|
|
display_name = f"{match.group(1)} / {match.group(2)}"
|
|
canonical_name = f"{match.group(1)}/{match.group(2)}"
|
|
elif "npmjs.com/package/" in url:
|
|
entity_type = source.get("entity_type_hint") or "package"
|
|
canonical_name = url.split("/package/", 1)[1].split("?", 1)[0].strip("/")
|
|
package_registry = f"https://www.npmjs.com/package/{canonical_name}"
|
|
display_name = canonical_name
|
|
elif "packagist.org/packages/" in url:
|
|
entity_type = source.get("entity_type_hint") or "package"
|
|
canonical_name = url.split("/packages/", 1)[1].split("?", 1)[0].strip("/")
|
|
package_registry = f"https://packagist.org/packages/{canonical_name}"
|
|
display_name = canonical_name.replace("/", " / ")
|
|
elif any(token in url.lower() for token in ("/plugins/", "/themes/", "/extensions/", "/modules/", "/marketplace/")):
|
|
marketplace_url = url
|
|
canonical_name = _marketplace_slug(url)
|
|
entity_type = source.get("entity_type_hint") or entity_type
|
|
display_name = canonical_name.replace("-", " ")
|
|
else:
|
|
return None
|
|
|
|
if not canonical_name:
|
|
canonical_name = source.get("name") or system.get("display_name") or system.get("system_id")
|
|
entity_id = _entity_id(system["system_id"], entity_type, canonical_name)
|
|
return {
|
|
"entity_id": entity_id,
|
|
"entity_type": entity_type,
|
|
"display_name": _display_name(display_name or canonical_name, entity_id),
|
|
"root_system_id": system["system_id"],
|
|
"parent_entity_id": system["system_id"],
|
|
"category": system.get("category", "unknown"),
|
|
"ecosystem": system.get("category", "unknown"),
|
|
"official": source.get("confidence") == "official",
|
|
"status": "cataloged",
|
|
"history_policy": "history-full",
|
|
"repo_url": repo_url,
|
|
"package_registry": package_registry,
|
|
"marketplace_url": marketplace_url,
|
|
"latest_version": "",
|
|
"version_scheme": "semver-ish" if entity_type in {"repo", "package", "plugin", "extension", "module", "theme", "project"} else "vendor",
|
|
"source_refs": [
|
|
{
|
|
"name": source.get("name"),
|
|
"url": source.get("url"),
|
|
"kind": source.get("kind"),
|
|
"status": source.get("status"),
|
|
"bucket": source.get("bucket_name"),
|
|
"official": source.get("bucket_name") == "official_sources",
|
|
}
|
|
],
|
|
"catalog_source": source.get("name") or "",
|
|
"catalog_reason": "source catalog exposed a stable security-related object and auto-catalog is enabled",
|
|
"auto_cataloged": True,
|
|
}
|
|
|
|
|
|
def discover_entities(
    source_map: Dict[str, Any],
    advisories: List[AdvisoryRecord | Dict[str, Any]],
    *,
    write_registry: bool = False,
) -> Dict[str, Any]:
    """Build entity views and auto-promote eligible sources to entities.

    A source is promoted when its confidence is in ``DISCOVERY_CONFIDENCE``,
    a candidate can be derived from its URL, that URL is not already attached
    to a known entity, the source sets ``auto_catalog``, and the candidate's
    id is not already taken.

    When *write_registry* is true and something was promoted, the merged
    entities are written and the candidate backlog is rebuilt from fresh
    views; otherwise promoted URLs are filtered out of the existing backlog.

    Returns a dictionary with ``entities``, ``candidate_backlog``,
    ``auto_promoted`` and a ``summary`` of counts.
    """

    def _stable_url(record: Dict[str, Any]) -> Any:
        # First non-empty locator, in the precedence used throughout this function.
        return record.get("repo_url") or record.get("package_registry") or record.get("marketplace_url")

    base_views = build_entity_views(source_map, advisories)
    entities = {item["entity_id"]: item for item in base_views["entities"]}
    auto_promoted: List[Dict[str, Any]] = []
    seen_urls = {_stable_url(item) for item in entities.values() if _stable_url(item)}

    for system, _bucket, source in iter_all_sources(source_map, include_retired=False):
        if source.get("confidence") not in DISCOVERY_CONFIDENCE:
            continue
        candidate = _candidate_entity_from_source(system, source)
        if not candidate:
            continue
        stable_url = _stable_url(candidate)
        if stable_url and stable_url in seen_urls:
            continue
        if not source.get("auto_catalog"):
            continue
        candidate_id = candidate["entity_id"]
        if candidate_id in entities:
            continue

        payload = _entity_payload(
            entity_id=candidate_id,
            entity_type=candidate["entity_type"],
            display_name=candidate["display_name"],
            parent_entity_id=candidate["parent_entity_id"],
            root_system_id=candidate["root_system_id"],
            category=candidate["category"],
            ecosystem=candidate["ecosystem"],
            official=candidate["official"],
            status="cataloged",
            history_policy="history-full",
            repo_url=candidate.get("repo_url") or "",
            package_registry=candidate.get("package_registry") or "",
            marketplace_url=candidate.get("marketplace_url") or "",
            latest_version="",
            version_scheme=candidate.get("version_scheme") or "vendor",
            source_refs=candidate.get("source_refs") or [],
        )
        promoted = _merge_entity_overlay(payload, candidate)
        auto_promoted.append(promoted)
        entities[candidate_id] = promoted
        if stable_url:
            seen_urls.add(stable_url)

    merged_entities = sorted(entities.values(), key=lambda item: item["entity_id"])

    if write_registry and auto_promoted:
        _write_entity_records(
            merged_entities,
            selected_system_ids={item["root_system_id"] for item in merged_entities},
        )
        # Rebuild the views so the backlog reflects what was just written.
        candidate_backlog = build_entity_views(source_map, advisories)["candidate_backlog"]
    else:
        promoted_urls = {_stable_url(item) for item in auto_promoted if _stable_url(item)}
        candidate_backlog = [
            item
            for item in base_views["candidate_backlog"]
            if _stable_url(item) not in promoted_urls
        ]

    cataloged_total = sum(1 for item in merged_entities if item.get("status") == "cataloged")
    return {
        "entities": merged_entities,
        "candidate_backlog": candidate_backlog,
        "auto_promoted": auto_promoted,
        "summary": {
            "cataloged_entity_total": cataloged_total,
            "candidate_entity_total": len(candidate_backlog),
            "auto_promoted_count": len(auto_promoted),
        },
    }
|
|
|
|
|
|
def _fetch_source_hits_for_versions(source_map: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]:
    """Poll every stable release source and collect version hits per system.

    Each source accepted by ``_stable_release_source`` is handed to the
    handler registered for its kind in ``HANDLERS``; sources without a handler
    are skipped. Handlers run on a thread pool sized by the
    ``WEBSAFE_VERSION_SYNC_WORKERS`` environment variable (default 8, never
    fewer than 4 and never more than ``max(4, number of jobs)``).

    For each returned item (a dict or an object with matching attributes),
    version tokens are extracted from its title, summary, URL and declared
    affected/fixed versions. Items without any version token are dropped.

    Returns a mapping of ``system_id`` to a list of hit dictionaries. Hits are
    appended in source submission order so repeated runs produce the same
    ordering.
    """

    def _field(item: Any, name: str) -> Any:
        # Handlers may return plain dicts or record objects.
        if isinstance(item, dict):
            return item.get(name)
        return getattr(item, name, None)

    jobs: List[Tuple[Dict[str, Any], Dict[str, Any]]] = [
        (system, source)
        for system, _bucket, source in iter_all_sources(source_map, include_retired=False)
        if _stable_release_source(system, source)
    ]

    hits_by_system: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    if not jobs:
        return hits_by_system

    try:
        configured_workers = int(os.environ.get("WEBSAFE_VERSION_SYNC_WORKERS", "8"))
    except ValueError:
        # A malformed setting should not abort the whole sync.
        configured_workers = 8
    workers = min(max(4, configured_workers), max(4, len(jobs)))

    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_map = {}
        for system, source in jobs:
            handler = HANDLERS.get(source["kind"])
            if handler is None:
                continue
            future_map[executor.submit(handler, system, dict(source))] = (system, source)

        # Walk futures in submission order (dicts keep insertion order) rather
        # than completion order, so hit ordering does not depend on timing.
        for future, (system, source) in future_map.items():
            try:
                # Materialize inside the try: a handler may return None or a
                # lazy iterable that only fails while being consumed.
                items = list(future.result() or [])
            except Exception:
                # Best effort: one failing source must not block the others.
                continue

            for item in items[: source.get("max_items", 50)]:
                title = _field(item, "title")
                summary = _field(item, "summary")
                source_url = _field(item, "source_url")
                affected_versions = _field(item, "affected_versions")
                fixed_versions = _field(item, "fixed_versions")
                published_at = _field(item, "published_at")
                updated_at = _field(item, "updated_at")

                # _extract_versions already de-duplicates its result.
                versions = _extract_versions(
                    title or "",
                    summary or "",
                    source_url or "",
                    " ".join(str(value) for value in (affected_versions or [])),
                    " ".join(str(value) for value in (fixed_versions or [])),
                )
                if not versions:
                    continue

                haystack = " ".join(filter(None, [title or "", summary or "", source_url or ""])).lower()
                hits_by_system[system["system_id"]].append(
                    {
                        "source_name": source.get("name"),
                        "source_confidence": source.get("confidence"),
                        "source_url": source_url or source.get("url") or "",
                        "published_at": published_at or updated_at or "",
                        "updated_at": updated_at or published_at or "",
                        "versions": versions,
                        "security_related": any(term in haystack for term in SECURITY_TERMS)
                        or source.get("purpose") in {"release", "advisory"},
                    }
                )
    return hits_by_system
|
|
|
|
|
|
def _github_latest(repo_url: str) -> Dict[str, str]:
|
|
match = re.match(r"https://github\.com/([^/]+)/([^/#?]+)", repo_url, re.IGNORECASE)
|
|
if not match:
|
|
return {}
|
|
owner, repo = match.group(1), match.group(2)
|
|
headers = {"Accept": "application/vnd.github+json", "User-Agent": "websafe-intel"}
|
|
token = os.environ.get("GITHUB_TOKEN")
|
|
if token:
|
|
headers["Authorization"] = f"Bearer {token}"
|
|
source = {"kind": "json-feed", "request_policy": {"accept": headers["Accept"]}}
|
|
for api_url, array_key in (
|
|
(f"https://api.github.com/repos/{owner}/{repo}/releases?per_page=10", None),
|
|
(f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=10", None),
|
|
):
|
|
try:
|
|
response = request("GET", api_url, source=source, headers=headers)
|
|
response.raise_for_status()
|
|
payload = response.json()
|
|
except Exception:
|
|
continue
|
|
if not isinstance(payload, list):
|
|
continue
|
|
for item in payload:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
if item.get("draft") or item.get("prerelease"):
|
|
continue
|
|
version = _pick_latest_version(
|
|
_extract_versions(
|
|
item.get("tag_name") or "",
|
|
item.get("name") or "",
|
|
item.get("html_url") or "",
|
|
)
|
|
)
|
|
if not version:
|
|
continue
|
|
return {
|
|
"version": version,
|
|
"released_at": item.get("published_at") or item.get("created_at") or "",
|
|
"release_url": item.get("html_url") or repo_url,
|
|
"source_name": "GitHub Releases API",
|
|
"source_confidence": "official",
|
|
}
|
|
return {}
|
|
|
|
|
|
def _npm_latest(package_registry: str) -> Dict[str, str]:
|
|
package_name = package_registry.split("/package/", 1)[1]
|
|
url = f"https://registry.npmjs.org/{quote(package_name, safe='@/')}/latest"
|
|
source = {"kind": "json-feed", "request_policy": {"accept": "application/json"}}
|
|
try:
|
|
response = request("GET", url, source=source)
|
|
response.raise_for_status()
|
|
payload = response.json()
|
|
except Exception:
|
|
return {}
|
|
version = str(payload.get("version") or "").strip()
|
|
if not version:
|
|
return {}
|
|
return {
|
|
"version": version,
|
|
"released_at": payload.get("date") or payload.get("time") or "",
|
|
"release_url": package_registry,
|
|
"source_name": "npm latest",
|
|
"source_confidence": "ecosystem-authority",
|
|
}
|
|
|
|
|
|
def _packagist_latest(package_registry: str) -> Dict[str, str]:
|
|
package_name = package_registry.split("/packages/", 1)[1]
|
|
url = f"https://repo.packagist.org/p2/{package_name}.json"
|
|
source = {"kind": "json-feed", "request_policy": {"accept": "application/json"}}
|
|
try:
|
|
response = request("GET", url, source=source)
|
|
response.raise_for_status()
|
|
payload = response.json()
|
|
except Exception:
|
|
return {}
|
|
packages = (payload.get("packages") or {}).get(package_name) or []
|
|
if not packages:
|
|
return {}
|
|
entry = packages[0]
|
|
version = str(entry.get("version") or "").strip().lstrip("v")
|
|
if not version:
|
|
return {}
|
|
return {
|
|
"version": version,
|
|
"released_at": entry.get("time") or "",
|
|
"release_url": package_registry,
|
|
"source_name": "Packagist p2",
|
|
"source_confidence": "ecosystem-authority",
|
|
}
|
|
|
|
|
|
def _latest_from_hits(entity: Dict[str, Any], hits: List[Dict[str, Any]]) -> Dict[str, str]:
    """Pick the highest version mentioned across *hits*.

    Metadata is taken from the first hit whose ``versions`` list contains the
    winning version. *entity* is accepted but not consulted. Returns ``{}``
    when the hits carry no versions.
    """
    pool = [version for hit in hits for version in (hit.get("versions") or [])]
    version = _pick_latest_version(pool)
    if not version:
        return {}

    origin = next((hit for hit in hits if version in (hit.get("versions") or [])), None) or {}
    return {
        "version": version,
        "released_at": origin.get("published_at") or origin.get("updated_at") or "",
        "release_url": origin.get("source_url") or "",
        "source_name": origin.get("source_name") or "",
        "source_confidence": origin.get("source_confidence") or "unknown",
    }
|
|
|
|
|
|
def _latest_from_advisories(advisories: List[Dict[str, Any]]) -> Dict[str, str]:
    """Pick the highest patched/fixed version declared by *advisories*.

    Candidates are every advisory's ``patched_version`` followed by every
    entry of every ``fixed_versions`` list. Metadata comes from the first
    advisory that names the winning version. Returns ``{}`` when no advisory
    declares a version.
    """
    patched_candidates = [advisory.get("patched_version") or "" for advisory in advisories]
    fixed_candidates = [
        value
        for advisory in advisories
        for value in (advisory.get("fixed_versions") or [])
    ]
    version = _pick_latest_version(patched_candidates + fixed_candidates)
    if not version:
        return {}

    origin: Dict[str, Any] = {}
    for advisory in advisories:
        if version == advisory.get("patched_version") or version in (advisory.get("fixed_versions") or []):
            origin = advisory
            break

    return {
        "version": version,
        "released_at": origin.get("updated_at") or origin.get("published_at") or "",
        "release_url": origin.get("official_source_url") or "",
        "source_name": "advisory-fixed-version",
        "source_confidence": origin.get("source_confidence") or "unknown",
    }
|
|
|
|
|
|
def _strip_html(text: str) -> str:
|
|
cleaned = re.sub(r"<script\b[^>]*>.*?</script>", " ", text, flags=re.IGNORECASE | re.DOTALL)
|
|
cleaned = re.sub(r"<style\b[^>]*>.*?</style>", " ", cleaned, flags=re.IGNORECASE | re.DOTALL)
|
|
cleaned = re.sub(r"<[^>]+>", " ", cleaned)
|
|
return unescape(re.sub(r"\s+", " ", cleaned)).strip()
|
|
|
|
|
|
def _extract_page_versions(url: str) -> Dict[str, List[str]]:
    """Scrape affected and fixed version tokens from the page at *url*.

    The page text is scanned for phrases such as "affected ...", "prior to
    ...", "fixed ..." or "upgrade to ...", and version tokens are extracted
    from the text that follows each phrase.

    Returns a dictionary with ``affected``, ``fixed`` and ``sources`` lists.
    ``sources`` holds *url* only when at least one version was found; all
    three lists are empty when the request fails.
    """
    affected_patterns = (
        r"(?:affected|before|prior to|through)\s+([A-Za-z0-9., _-]{3,120})",
        r"(?:versions?|range)\s+([A-Za-z0-9., _-]{3,120})\s+(?:are|is)\s+affected",
    )
    fixed_patterns = (
        r"(?:fixed|patched|resolved|available in|upgrade to)\s+([A-Za-z0-9., _-]{3,120})",
        r"(?:update to|updated to)\s+([A-Za-z0-9., _-]{3,120})",
    )

    try:
        response = request("GET", url, source={"kind": "html-links"})
        response.raise_for_status()
    except Exception:
        return {"affected": [], "fixed": [], "sources": []}

    text = _strip_html(response.text)

    def _collect(patterns: Tuple[str, ...]) -> List[str]:
        # Gather version tokens from the captured tail of every phrase match.
        tokens: List[str] = []
        for pattern in patterns:
            for phrase in re.finditer(pattern, text, flags=re.IGNORECASE):
                tokens.extend(_extract_versions(phrase.group(1)))
        return tokens

    affected = _collect(affected_patterns)
    fixed = _collect(fixed_patterns)
    sources = [url] if (affected or fixed) else []
    return {"affected": unique(affected), "fixed": unique(fixed), "sources": sources}
|
|
|
|
|
|
def _version_id(entity_id: str, version: str) -> str:
    """Compose a version record id: the entity id, ``--``, and the slugified version."""
    version_slug = slugify(version)
    return f"{entity_id}--{version_slug}"
|
|
|
|
|
|
def _register_version(
|
|
bucket: Dict[Tuple[str, str], Dict[str, Any]],
|
|
*,
|
|
entity_id: str,
|
|
root_system_id: str,
|
|
version: str,
|
|
released_at: str,
|
|
release_url: str,
|
|
source_name: str,
|
|
source_confidence: str,
|
|
reason: str,
|
|
advisory_ref: str | None = None,
|
|
) -> None:
|
|
token = (version or "").strip()
|
|
if not token:
|
|
return
|
|
key = (entity_id, token)
|
|
existing = bucket.setdefault(
|
|
key,
|
|
VersionRecord(
|
|
version_id=_version_id(entity_id, token),
|
|
entity_id=entity_id,
|
|
root_system_id=root_system_id,
|
|
version=token,
|
|
released_at=released_at or None,
|
|
release_url=release_url or None,
|
|
source_name=source_name,
|
|
source_confidence=source_confidence,
|
|
security_relevant=True,
|
|
reason=reason,
|
|
advisory_refs=[],
|
|
is_latest_snapshot=False,
|
|
).to_dict(),
|
|
)
|
|
existing["reason"] = reason if existing.get("reason") == "affected" and reason != "affected" else existing.get("reason")
|
|
if released_at and not existing.get("released_at"):
|
|
existing["released_at"] = released_at
|
|
if release_url and not existing.get("release_url"):
|
|
existing["release_url"] = release_url
|
|
if advisory_ref and advisory_ref not in existing["advisory_refs"]:
|
|
existing["advisory_refs"].append(advisory_ref)
|
|
|
|
|
|
def _entity_target_id(advisory: Dict[str, Any]) -> str:
|
|
refs = advisory.get("entity_refs") or []
|
|
for ref in refs:
|
|
if ref.get("entity_type") != "system":
|
|
return ref.get("entity_id") or advisory.get("system_id")
|
|
return advisory.get("system_id")
|
|
|
|
|
|
def _resolve_versions_for_advisory(advisory: Dict[str, Any], *, deep: bool) -> Dict[str, Any]:
    """Gather the affected, fixed and patched versions of an advisory.

    Declared versions and version ranges are merged. When *deep* is true, the
    advisory declares neither affected nor fixed versions, and it has an
    ``official_source_url``, that page is scraped for versions as well.

    Returns a dictionary with ``affected``, ``fixed``, ``patched`` (the
    declared ``patched_version`` or else the highest fixed version) and
    ``sources`` (evidence URLs).
    """
    affected = unique(
        list(advisory.get("affected_versions") or []) + list(advisory.get("affected_version_ranges") or [])
    )
    fixed = unique(
        list(advisory.get("fixed_versions") or []) + list(advisory.get("fixed_version_ranges") or [])
    )
    evidence_sources = list(advisory.get("version_evidence_sources") or [])

    nothing_declared = not (affected or fixed)
    if deep and nothing_declared and advisory.get("official_source_url"):
        scraped = _extract_page_versions(advisory["official_source_url"])
        if scraped.get("affected"):
            affected = unique(affected + scraped["affected"])
        if scraped.get("fixed"):
            fixed = unique(fixed + scraped["fixed"])
        evidence_sources = unique(evidence_sources + scraped.get("sources", []))

    return {
        "affected": affected,
        "fixed": fixed,
        "patched": advisory.get("patched_version") or _pick_latest_version(fixed),
        "sources": evidence_sources,
    }
|
|
|
|
|
|
def _profile_runnable(profile: Dict[str, Any]) -> bool:
|
|
if not profile:
|
|
return False
|
|
if profile.get("resolved_via") == "implicit-fallback":
|
|
return False
|
|
return bool(
|
|
profile.get("runner_id")
|
|
or profile.get("fixture_path")
|
|
or profile.get("attack_actions")
|
|
or profile.get("seed_actions")
|
|
or profile.get("services")
|
|
or profile.get("success_assertions")
|
|
)
|
|
|
|
|
|
def _enqueue_lab_updates(updated_advisories: List[Dict[str, Any]], previous_advisories: Dict[str, Dict[str, Any]], entity_changes: Dict[str, str]) -> Dict[str, Any]:
    """Queue lab runs for advisories whose version data changed.

    An advisory counts as changed when it has no previous record, when any
    tracked version field differs from the previous record, or when its target
    entity has a new latest version that differs from the previous record's
    ``latest_version``. Changed advisories with a runnable profile are
    enqueued; the rest are reported as pending.

    The resulting summary is written to ``LAB_ENQUEUE_SUMMARY_PATH`` and
    returned.
    """
    from lab import repro, task_queue  # noqa: WPS433

    tracked_fields = (
        "affected_version_refs",
        "fixed_version_refs",
        "patched_version_refs",
        "patched_version",
        "version_resolution_needed",
        "version_sync_confidence",
    )

    enqueue_items: List[Dict[str, Any]] = []
    pending: List[Dict[str, Any]] = []
    for advisory in updated_advisories:
        advisory_id = advisory["canonical_id"]
        previous = previous_advisories.get(advisory_id, {})

        changed = previous.get("canonical_id") is None or any(
            previous.get(field) != advisory.get(field) for field in tracked_fields
        )
        if not changed:
            new_latest = entity_changes.get(_entity_target_id(advisory))
            changed = bool(new_latest) and new_latest != previous.get("latest_version")
        if not changed:
            continue

        profile = repro.resolve_profile(advisory_id, advisory)
        if _profile_runnable(profile):
            enqueue_items.append(
                {
                    "advisory_id": advisory_id,
                    "system_id": advisory["system_id"],
                    "priority": "version-sync",
                }
            )
        else:
            pending.append(
                {
                    "advisory_id": advisory_id,
                    "system_id": advisory["system_id"],
                    "lab_pending_reason": "no-runnable-profile",
                    # _profile_runnable accepts a falsy profile, so tolerate
                    # None here instead of failing on .get().
                    "profile_id": (profile or {}).get("profile_id"),
                }
            )

    if enqueue_items:
        enqueue_result = task_queue.enqueue_items(enqueue_items)
    else:
        enqueue_result = {"queued": 0, "added": 0}

    payload = {
        "generated_at": isoformat(now_utc()),
        "enqueued": enqueue_result.get("added", 0),
        "queue_total": enqueue_result.get("queued", 0),
        "items": enqueue_items,
        "pending": pending,
    }
    write_json(LAB_ENQUEUE_SUMMARY_PATH, payload)
    return payload
|
|
|
|
|
|
def sync_versions(
    source_map: Dict[str, Any],
    advisories: List[AdvisoryRecord | Dict[str, Any]],
    *,
    entity_records: List[Dict[str, Any]] | None = None,
    deep: bool = False,
    enqueue_lab: bool = False,
    write_registry: bool = False,
) -> Dict[str, Any]:
    """Synchronize version records, entity latest versions and advisory refs.

    Steps, in order:

    1. Register a version record for every affected, fixed and patched
       version of each advisory linked to a known entity, and for every
       security-related source hit of the entity's root system.
    2. Determine each entity's latest version from npm, Packagist or GitHub
       (chosen by its URLs), falling back to source hits and then advisories.
    3. Let each system entity take its latest-version fields from one of its
       child entities.
    4. Flag the version record matching each entity's latest version.
    5. Attach version references and a sync confidence to every advisory.

    Args:
        source_map: Source catalog passed to the source iteration helpers.
        advisories: Advisory records or plain advisory dictionaries.
        entity_records: Entities to start from; built from the views if empty.
        deep: Allow scraping an advisory's official page when it declares no
            versions.
        enqueue_lab: Enqueue lab runs for changed advisories; otherwise the
            stored lab summary is read back.
        write_registry: Persist the entity and version records.

    Returns:
        A dictionary with ``advisories``, ``entities``, ``versions``,
        ``lab_summary`` and a ``summary`` of counts.
    """
    advisory_rows = [_advisory_dict(item) for item in advisories]
    previous_advisories = {
        item.get("canonical_id"): item
        for item in load_all_json(ENTITIES_DIR.parent / "advisories")
        if item.get("canonical_id")
    }
    entity_seed = entity_records or build_entity_views(source_map, advisory_rows)["entities"]
    entities = {item["entity_id"]: dict(item) for item in entity_seed}
    advisories_by_entity: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for advisory in advisory_rows:
        advisories_by_entity[_entity_target_id(advisory)].append(advisory)
    source_hits = _fetch_source_hits_for_versions(source_map)
    version_bucket: Dict[Tuple[str, str], Dict[str, Any]] = {}
    entity_changes: Dict[str, str] = {}

    # Resolution may fetch a web page when deep=True, so resolve each advisory
    # at most once. The rows in advisory_rows stay alive for the whole call,
    # which keeps id() a stable key.
    resolved_cache: Dict[int, Dict[str, Any]] = {}

    def _resolved(row: Dict[str, Any]) -> Dict[str, Any]:
        key = id(row)
        if key not in resolved_cache:
            resolved_cache[key] = _resolve_versions_for_advisory(row, deep=deep)
        return resolved_cache[key]

    # 1. Register security-relevant versions per entity.
    for entity_id, entity in entities.items():
        for advisory in advisories_by_entity.get(entity_id, []):
            resolved = _resolved(advisory)
            target_id = _entity_target_id(advisory)
            advisory_url = advisory.get("official_source_url") or ""
            fixed_at = advisory.get("updated_at") or advisory.get("published_at") or ""
            patched = resolved["patched"]
            groups = (
                ("affected", resolved["affected"], advisory.get("published_at") or ""),
                ("fixed", resolved["fixed"], fixed_at),
                ("patched", [patched] if patched else [], fixed_at),
            )
            for reason, versions, released_at in groups:
                for version in versions:
                    _register_version(
                        version_bucket,
                        entity_id=target_id,
                        root_system_id=entity["root_system_id"],
                        version=version,
                        released_at=released_at,
                        release_url=advisory_url,
                        source_name=advisory_url or f"advisory-{reason}",
                        source_confidence=advisory.get("source_confidence") or "unknown",
                        reason=reason,
                        advisory_ref=advisory["canonical_id"],
                    )

        # NOTE(review): nesting reconstructed from a listing without
        # indentation; this loop is assumed to run for every entity, not only
        # for entities with linked advisories. Confirm against the original.
        for hit in source_hits.get(entity["root_system_id"], []):
            if not hit.get("security_related"):
                continue
            for version in hit.get("versions") or []:
                _register_version(
                    version_bucket,
                    entity_id=entity_id,
                    root_system_id=entity["root_system_id"],
                    version=version,
                    released_at=hit.get("published_at") or hit.get("updated_at") or "",
                    release_url=hit.get("source_url") or "",
                    source_name=hit.get("source_name") or "",
                    source_confidence=hit.get("source_confidence") or "unknown",
                    reason="security-release",
                )

    version_records = sorted(
        version_bucket.values(),
        key=lambda item: (item["root_system_id"], item["entity_id"], _version_sort_key(item["version"])),
    )
    versions_by_entity: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    version_lookup: Dict[Tuple[str, str], str] = {}
    for item in version_records:
        versions_by_entity[item["entity_id"]].append(item)
        version_lookup[(item["entity_id"], item["version"])] = item["version_id"]

    # 2. Resolve each entity's latest version.
    for entity in entities.values():
        previous_latest = entity.get("latest_version") or ""
        latest: Dict[str, str] = {}
        if entity.get("package_registry", "").startswith("https://www.npmjs.com/package/"):
            latest = _npm_latest(entity["package_registry"])
        elif entity.get("package_registry", "").startswith("https://packagist.org/packages/"):
            latest = _packagist_latest(entity["package_registry"])
        elif entity.get("repo_url", "").startswith("https://github.com/"):
            latest = _github_latest(entity["repo_url"])
        if not latest:
            latest = _latest_from_hits(entity, source_hits.get(entity["root_system_id"], []))
        if not latest:
            latest = _latest_from_advisories(advisories_by_entity.get(entity["entity_id"], []))

        own_versions = versions_by_entity.get(entity["entity_id"], [])
        entity["latest_version"] = latest.get("version") or entity.get("latest_version") or ""
        entity["latest_release_at"] = latest.get("released_at") or entity.get("latest_release_at") or ""
        entity["latest_release_url"] = latest.get("release_url") or entity.get("latest_release_url") or ""
        entity["version_source_refs"] = unique(
            list(entity.get("version_source_refs") or [])
            + [latest.get("release_url") or ""]
            + [item.get("release_url") or "" for item in own_versions[:20]]
        )
        entity["latest_version_evidence"] = unique(
            list(entity.get("latest_version_evidence") or [])
            + [latest.get("source_name") or ""]
            + [item.get("source_name") or "" for item in own_versions[:20]]
        )
        entity["security_version_count"] = len(own_versions)
        entity["last_version_synced_at"] = isoformat(now_utc())
        entity["version_sync_status"] = "green" if entity.get("latest_version") else "source-gap"
        if previous_latest != entity.get("latest_version"):
            entity_changes[entity["entity_id"]] = entity.get("latest_version") or ""

    # 3. System entities take their latest-version fields from a child entity.
    for entity in entities.values():
        if entity["entity_type"] != "system":
            continue
        children = [
            item
            for item in entities.values()
            if item.get("root_system_id") == entity["entity_id"]
            and item.get("entity_id") != entity["entity_id"]
            and item.get("latest_version")
        ]
        if children:
            # The last element of the ascending sort is used, exactly as before.
            best = sorted(
                children,
                key=lambda item: (
                    -int(item.get("advisory_count") or 0),
                    _version_sort_key(item.get("latest_version") or ""),
                ),
                reverse=False,
            )[-1]
            entity["latest_version"] = best.get("latest_version") or entity.get("latest_version") or ""
            entity["latest_release_at"] = best.get("latest_release_at") or entity.get("latest_release_at") or ""
            entity["latest_release_url"] = best.get("latest_release_url") or entity.get("latest_release_url") or ""
            entity["version_sync_status"] = best.get("version_sync_status") or entity.get("version_sync_status") or "pending"
            # NOTE(review): assumed to sit inside ``if children`` (nesting
            # reconstructed from a listing without indentation) — confirm.
            entity["security_version_count"] = sum(item.get("security_version_count", 0) for item in children)

    # 4. Flag the record matching each entity's latest version.
    for version in version_records:
        version["is_latest_snapshot"] = False
    for entity in entities.values():
        latest_version = (entity.get("latest_version") or "").strip()
        if not latest_version:
            continue
        version_id = version_lookup.get((entity["entity_id"], latest_version))
        if version_id:
            for version in versions_by_entity.get(entity["entity_id"], []):
                if version["version_id"] == version_id:
                    version["is_latest_snapshot"] = True
                    break

    # 5. Attach version references to every advisory.
    updated_advisories: List[Dict[str, Any]] = []
    for advisory in advisory_rows:
        target_id = _entity_target_id(advisory)
        resolved = _resolved(advisory)
        patched = resolved["patched"]
        affected_refs = [
            version_lookup[(target_id, version)]
            for version in resolved["affected"]
            if (target_id, version) in version_lookup
        ]
        fixed_refs = [
            version_lookup[(target_id, version)]
            for version in resolved["fixed"]
            if (target_id, version) in version_lookup
        ]
        if patched and (target_id, patched) in version_lookup:
            patched_refs = [version_lookup[(target_id, patched)]]
        else:
            patched_refs = []

        if affected_refs and fixed_refs:
            version_sync_confidence = "high"
        elif fixed_refs or patched_refs or affected_refs:
            version_sync_confidence = "medium"
        else:
            version_sync_confidence = "low"

        updated = dict(advisory)
        updated["affected_versions"] = resolved["affected"] or updated.get("affected_versions") or []
        updated["fixed_versions"] = resolved["fixed"] or updated.get("fixed_versions") or []
        updated["affected_version_ranges"] = resolved["affected"] or updated.get("affected_version_ranges") or []
        updated["fixed_version_ranges"] = resolved["fixed"] or updated.get("fixed_version_ranges") or []
        updated["patched_version"] = patched or updated.get("patched_version")
        updated["affected_version_refs"] = unique(affected_refs)
        updated["fixed_version_refs"] = unique(fixed_refs)
        updated["patched_version_refs"] = unique(patched_refs)
        updated["version_evidence_sources"] = unique(
            list(updated.get("version_evidence_sources") or []) + resolved["sources"]
        )
        updated["version_sync_confidence"] = version_sync_confidence
        if version_sync_confidence in {"high", "medium"} and (affected_refs or fixed_refs or patched_refs):
            updated["version_confidence"] = "high" if version_sync_confidence == "high" else "medium"
            updated["version_gap_reason"] = ""
            updated["version_resolution_needed"] = False
        updated_advisories.append(updated)

    if enqueue_lab:
        lab_summary = _enqueue_lab_updates(updated_advisories, previous_advisories, entity_changes)
    else:
        lab_summary = read_json(LAB_ENQUEUE_SUMMARY_PATH, default={}) or {}

    if write_registry:
        _write_entity_records(list(entities.values()))
        _write_version_records(version_records)

    cataloged = [item for item in entities.values() if item.get("status") == "cataloged"]
    return {
        "advisories": updated_advisories,
        "entities": sorted(entities.values(), key=lambda item: item["entity_id"]),
        "versions": version_records,
        "lab_summary": lab_summary,
        "summary": {
            "cataloged_entity_total": len(cataloged),
            "latest_version_synced_count": len([item for item in cataloged if item.get("version_sync_status") == "green"]),
            "source_gap_count": len([item for item in cataloged if item.get("version_sync_status") == "source-gap"]),
            "security_version_total": len(version_records),
            "lab_enqueued_count": lab_summary.get("enqueued", 0),
        },
    }
|
|
|
|
|
|
def build_version_views(
    source_map: Dict[str, Any],
    advisories: List[AdvisoryRecord | Dict[str, Any]],
    entity_records: List[Dict[str, Any]] | None = None,
    version_records: List[Dict[str, Any]] | None = None,
    lab_summary: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
    """Build the version completeness, backlog, release-index and report views.

    Args:
        source_map: Source configuration; only its ``"systems"`` list is read
            here (each system needs ``system_id`` and ``display_name``).
        advisories: Advisory records or plain dicts; normalised through
            ``_advisory_dict`` before use.
        entity_records: Entity rows to summarise. When falsy (``None`` or
            empty) they are rebuilt via ``build_entity_views``.
        version_records: Version rows to index. When falsy they are loaded
            via ``load_version_records``.
        lab_summary: Lab enqueue summary. When falsy it is read from
            ``LAB_ENQUEUE_SUMMARY_PATH``.

    Returns:
        A dict with four keys: ``"completeness"`` (global and per-system
        counters), ``"backlog"`` (source-gap entities, unresolved advisories,
        pending lab items and auto-promoted entities, each capped at 500
        rows), ``"release_index"`` (all version rows in sorted order) and
        ``"report_markdown"`` (the rendered Markdown report).

    Note:
        Every per-system counter is computed over *cataloged* entities only,
        matching the global totals, so the per-system rows can never add up
        to more than the headline numbers.
    """
    generated_at = isoformat(now_utc())
    advisories_rows = [_advisory_dict(item) for item in advisories]
    # ``or`` is intentional: an empty argument is treated like a missing one.
    entities = entity_records or build_entity_views(source_map, advisories_rows)["entities"]
    versions = version_records or load_version_records()
    lab_summary = lab_summary or (read_json(LAB_ENQUEUE_SUMMARY_PATH, default={}) or {})

    versions_by_system: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for item in versions:
        versions_by_system[item["root_system_id"]].append(item)

    # Group entities once so the per-system loop below does not have to
    # re-scan the whole entity list for every system.
    entities_by_system: Dict[Any, List[Dict[str, Any]]] = defaultdict(list)
    for item in entities:
        entities_by_system[item.get("root_system_id")].append(item)

    systems_payload: List[Dict[str, Any]] = []
    source_gap_entities: List[Dict[str, Any]] = []
    auto_promoted_entities: List[Dict[str, Any]] = []
    security_version_entities = 0
    latest_version_synced_count = 0
    cataloged_entities = [item for item in entities if item.get("status") == "cataloged"]
    for entity in cataloged_entities:
        if entity.get("security_version_count", 0):
            security_version_entities += 1
        if entity.get("version_sync_status") == "green":
            latest_version_synced_count += 1
        if entity.get("version_sync_status") == "source-gap":
            source_gap_entities.append(
                {
                    "entity_id": entity["entity_id"],
                    "display_name": entity["display_name"],
                    "root_system_id": entity["root_system_id"],
                    "entity_type": entity["entity_type"],
                    "latest_version": entity.get("latest_version") or "",
                    "repo_url": entity.get("repo_url") or "",
                    "package_registry": entity.get("package_registry") or "",
                    "marketplace_url": entity.get("marketplace_url") or "",
                }
            )
        if entity.get("auto_cataloged"):
            auto_promoted_entities.append(
                {
                    "entity_id": entity["entity_id"],
                    "display_name": entity["display_name"],
                    "root_system_id": entity["root_system_id"],
                    "entity_type": entity["entity_type"],
                    "catalog_source": entity.get("catalog_source") or "",
                }
            )

    for system in source_map.get("systems", []) or []:
        system_id = system["system_id"]
        system_entities = entities_by_system.get(system_id, [])
        # Counters use the cataloged subset so they agree with the global
        # totals computed above.
        system_cataloged = [item for item in system_entities if item.get("status") == "cataloged"]
        system_versions = versions_by_system.get(system_id, [])
        # Top five entities with a known latest version: most advisories
        # first, ties broken by case-insensitive display name.
        top_entities = sorted(
            [item for item in system_entities if item.get("latest_version")],
            key=lambda value: (-int(value.get("advisory_count") or 0), value["display_name"].lower()),
        )[:5]
        systems_payload.append(
            {
                "system_id": system_id,
                "display_name": system["display_name"],
                "cataloged_entity_total": len(system_cataloged),
                "latest_version_synced_count": sum(
                    1 for item in system_cataloged if item.get("version_sync_status") == "green"
                ),
                "source_gap_count": sum(
                    1 for item in system_cataloged if item.get("version_sync_status") == "source-gap"
                ),
                "security_version_count": len(system_versions),
                "auto_promoted_count": sum(1 for item in system_cataloged if item.get("auto_cataloged")),
                "latest_versions": [
                    {
                        "entity_id": item["entity_id"],
                        "display_name": item["display_name"],
                        "entity_type": item["entity_type"],
                        "latest_version": item.get("latest_version") or "",
                        "latest_release_at": item.get("latest_release_at") or "",
                        "version_sync_status": item.get("version_sync_status") or "pending",
                    }
                    for item in top_entities
                ],
            }
        )

    unresolved_advisories = [
        {
            "canonical_id": item.get("canonical_id"),
            "system_id": item.get("system_id"),
            "title": item.get("title"),
            "official_source_url": item.get("official_source_url") or "",
            "version_gap_reason": item.get("version_gap_reason") or "",
        }
        for item in advisories_rows
        if item.get("version_resolution_needed")
    ]

    # Sorted once; reused by both the Markdown table and the completeness view.
    systems_sorted = sorted(systems_payload, key=lambda item: item["system_id"])

    report_lines = [
        "# 安全相关版本同步报告",
        "",
        f"- 生成时间: `{generated_at}`",
        f"- 已编目实体: `{len(cataloged_entities)}`",
        f"- 最新版本已同步: `{latest_version_synced_count}`",
        f"- 版本 source-gap: `{len(source_gap_entities)}`",
        f"- 安全相关版本记录: `{len(versions)}`",
        f"- 存在安全版本历史的实体: `{security_version_entities}`",
        f"- 自动升级实体: `{len(auto_promoted_entities)}`",
        f"- 因版本变化触发 lab 入队: `{lab_summary.get('enqueued', 0)}`",
        "",
        "## 系统版本摘要",
        "",
        "| 系统 | cataloged | latest synced | source-gap | security versions | auto-promoted |",
        "| --- | ---: | ---: | ---: | ---: | ---: |",
    ]
    for item in systems_sorted:
        report_lines.append(
            f"| {item['system_id']} | {item['cataloged_entity_total']} | {item['latest_version_synced_count']} | {item['source_gap_count']} | {item['security_version_count']} | {item['auto_promoted_count']} |"
        )

    def _backlog_order(item: Dict[str, Any]) -> Tuple[Any, Any, Any]:
        """Order backlog rows by system, entity type, then display name."""
        return (item["root_system_id"], item["entity_type"], item["display_name"])

    completeness = {
        "generated_at": generated_at,
        "cataloged_entity_total": len(cataloged_entities),
        "latest_version_synced_count": latest_version_synced_count,
        "source_gap_count": len(source_gap_entities),
        "security_version_total": len(versions),
        "security_version_entity_count": security_version_entities,
        "auto_promoted_entity_count": len(auto_promoted_entities),
        "lab_enqueued_count": lab_summary.get("enqueued", 0),
        "systems": systems_sorted,
    }
    backlog = {
        "generated_at": generated_at,
        # Each backlog list is capped at 500 rows.
        "source_gap_entities": sorted(source_gap_entities, key=_backlog_order)[:500],
        "unresolved_advisories": unresolved_advisories[:500],
        "lab_pending": (lab_summary.get("pending") or [])[:500],
        "auto_promoted_entities": sorted(auto_promoted_entities, key=_backlog_order)[:500],
    }
    release_index = {
        "generated_at": generated_at,
        "version_count": len(versions),
        "versions": sorted(
            versions,
            key=lambda item: (item["root_system_id"], item["entity_id"], _version_sort_key(item["version"])),
        ),
    }
    return {
        "completeness": completeness,
        "backlog": backlog,
        "release_index": release_index,
        "report_markdown": "\n".join(report_lines),
    }
|
|
|
|
|
|
def write_version_views(views: Dict[str, Any]) -> None:
    """Write the version views to their configured output paths.

    Args:
        views: Mapping that must contain the keys ``"completeness"``,
            ``"backlog"``, ``"release_index"`` and ``"report_markdown"``.

    The three JSON documents are written first, in that order, followed by
    the Markdown report.
    """
    json_outputs = (
        (VERSION_COMPLETENESS_PATH, "completeness"),
        (VERSION_BACKLOG_PATH, "backlog"),
        (RELEASE_INDEX_PATH, "release_index"),
    )
    for target_path, view_key in json_outputs:
        write_json(target_path, views[view_key])
    write_text(VERSION_REPORT_MD_PATH, views["report_markdown"])
|