From a0a5067ae143fbc0feefad46a6543dba0b10d554 Mon Sep 17 00:00:00 2001 From: hao Date: Sat, 21 Mar 2026 17:44:22 -0700 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0:=203=20=E4=B8=AA=E6=96=87?= =?UTF-8?q?=E4=BB=B6=20-=202026-03-21=2017:44:22?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/intel/config.py | 52 +++++++++++ scripts/intel/entities.py | 192 ++++++++++++++++++++++++++++++++------ scripts/intel/models.py | 23 +++++ 3 files changed, 238 insertions(+), 29 deletions(-) diff --git a/scripts/intel/config.py b/scripts/intel/config.py index 99326209..41b03d43 100644 --- a/scripts/intel/config.py +++ b/scripts/intel/config.py @@ -13,6 +13,7 @@ REGISTRY_ROOT = THREAT_INTEL_ROOT / "registry" ADVISORIES_DIR = REGISTRY_ROOT / "advisories" SYSTEMS_DIR = REGISTRY_ROOT / "systems" ENTITIES_DIR = REGISTRY_ROOT / "entities" +VERSIONS_DIR = REGISTRY_ROOT / "versions" RUNS_DIR = REGISTRY_ROOT / "runs" TRIAGE_DIR = REGISTRY_ROOT / "triage" GENERATED_DIR = THREAT_INTEL_ROOT / "generated" @@ -32,6 +33,11 @@ ENTITY_BACKLOG_PATH = GENERATED_DIR / "entity-discovery-backlog.json" ENTITY_QUEUES_PATH = GENERATED_DIR / "entity-queues.json" ENTITY_CATALOG_REPORT_MD_PATH = GENERATED_DIR / "entity-catalog-report.md" ENTITY_BACKLOG_REPORT_MD_PATH = GENERATED_DIR / "entity-discovery-backlog.md" +VERSION_COMPLETENESS_PATH = GENERATED_DIR / "version-completeness.json" +VERSION_BACKLOG_PATH = GENERATED_DIR / "version-backlog.json" +RELEASE_INDEX_PATH = GENERATED_DIR / "release-index.json" +VERSION_REPORT_MD_PATH = GENERATED_DIR / "version-sync-report.md" +LAB_ENQUEUE_SUMMARY_PATH = GENERATED_DIR / "lab-enqueue-summary.json" STATE_DIR = Path.home() / ".local" / "state" / "websafe-intel" STATE_PATH = STATE_DIR / "state.json" @@ -62,6 +68,12 @@ DEFAULT_PARSER_HINTS = { "date_extractors": [], } +DEFAULT_RELEASE_SELECTOR = { + "version_patterns": [], + "date_patterns": [], + "release_url_patterns": [], +} + DEFAULT_ACCEPT_BY_KIND = { "rss-feed": "application/rss+xml, application/xml;q=0.9, text/xml;q=0.9, */*;q=0.8", "atom-feed": "application/atom+xml, application/xml;q=0.9, text/xml;q=0.9, */*;q=0.8", @@ -85,6 +97,41 @@ DEFAULT_FORMAT_BY_KIND = { } +def _infer_source_purpose(bucket_name: str, source: Dict[str, Any]) -> str: + configured = (source.get("purpose") or "").strip() + if configured: + return configured + url = (source.get("url") or "").lower() + kind = source.get("kind") or "" + if any(token in url for token in ("/plugins/", "/themes/", "/marketplace/", "/extensions/", "/modules/")): + return "marketplace" + if any(token in url for token in ("/releases", "/tags", "/release", "release-notes", "security-releases", "/feed/", ".rss", ".xml")): + return "release" + if kind in {"rss-feed", "atom-feed", "json-feed", "vendor-index", "html-links"} and bucket_name != "research_sources": + return "discovery" + return "advisory" + + +def _infer_entity_type_hint(source: Dict[str, Any]) -> str: + configured = (source.get("entity_type_hint") or "").strip() + if configured: + return configured + url = (source.get("url") or "").lower() + if "/plugins/" in url: + return "plugin" + if "/themes/" in url: + return "theme" + if "/extensions/" in url: + return "extension" + if "/modules/" in url: + return "module" + if "github.com/" in url: + return "repo" + if "npmjs.com/package/" in url or "packagist.org/packages/" in url: + return "package" + return "project" + + def _normalize_source(source: Dict[str, Any], bucket_name: str) -> Dict[str, Any]: normalized = dict(source or {}) normalized["status"] = normalized.get("status") or "active" @@ -112,6 +159,11 @@ def _normalize_source(source: Dict[str, Any], bucket_name: str) -> Dict[str, Any if not parser_hints.get("keywords"): parser_hints["keywords"] = list(normalized.get("keywords") or []) normalized["parser_hints"] = parser_hints + normalized["purpose"] = _infer_source_purpose(bucket_name, normalized) + normalized["entity_type_hint"] = _infer_entity_type_hint(normalized) + normalized["auto_catalog"] = bool(normalized.get("auto_catalog", bucket_name in {"official_sources", "ecosystem_sources"})) + normalized["version_mode"] = normalized.get("version_mode") or "security-related" + normalized["release_selector"] = {**DEFAULT_RELEASE_SELECTOR, **(normalized.get("release_selector") or {})} normalized["bucket_name"] = bucket_name return normalized diff --git a/scripts/intel/entities.py b/scripts/intel/entities.py index 233bf944..a3481d79 100644 --- a/scripts/intel/entities.py +++ b/scripts/intel/entities.py @@ -4,8 +4,9 @@ import re from collections import defaultdict from typing import Any, Dict, Iterable, List, Tuple +from intel.config import ENTITIES_DIR from intel.models import AdvisoryRecord -from intel.utils import isoformat, now_utc, parse_dt, slugify, unique +from intel.utils import isoformat, load_all_json, now_utc, parse_dt, slugify, unique FAMILY_KEYWORDS = { @@ -133,6 +134,20 @@ def _repo_url_from_package(package_name: str) -> str: return "" +def _github_repo_from_url(url: str) -> str: + match = re.match(r"https://github\.com/([^/]+)/([^/#?]+)", (url or "").strip(), re.IGNORECASE) + if not match: + return "" + return f"https://github.com/{match.group(1)}/{match.group(2)}" + + +def _marketplace_slug(url: str) -> str: + parts = [part for part in re.split(r"[/?#]+", (url or "").strip()) if part] + if not parts: + return "" + return parts[-1] + + def _package_registry_url(package_name: str) -> str: normalized = _strip_package_version_suffix(package_name) if not normalized: @@ -442,6 +457,16 @@ def _entity_payload( "marketplace_url": marketplace_url, "latest_version": latest_version, "version_scheme": version_scheme, + "latest_release_at": "", + "latest_release_url": "", + "version_source_refs": [], + "version_sync_status": "pending", + "security_version_count": 0, + "last_version_synced_at": "", + "latest_version_evidence": [], + "catalog_source": "", + "catalog_reason": "", + "auto_cataloged": False, "last_discovered_at": "", "last_synced_at": "", "history_backfill_status": "pending", @@ -457,6 +482,63 @@ def _entity_payload( } +def _merge_source_refs(primary: List[Dict[str, Any]], secondary: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + merged: List[Dict[str, Any]] = [] + seen = set() + for item in (primary or []) + (secondary or []): + if not isinstance(item, dict): + continue + key = ( + item.get("name") or "", + item.get("url") or "", + item.get("kind") or "", + item.get("bucket") or "", + ) + if key in seen: + continue + seen.add(key) + merged.append(item) + return merged + + +def _merge_entity_overlay(entity: Dict[str, Any], overlay: Dict[str, Any] | None) -> Dict[str, Any]: + if not overlay: + return entity + merged = dict(entity) + for key in ( + "status", + "history_policy", + "repo_url", + "package_registry", + "marketplace_url", + "latest_version", + "version_scheme", + "latest_release_at", + "latest_release_url", + "version_source_refs", + "version_sync_status", + "security_version_count", + "last_version_synced_at", + "latest_version_evidence", + "catalog_source", + "catalog_reason", + "auto_cataloged", + "last_discovered_at", + "last_synced_at", + "history_backfill_status", + "latest_sync_status", + "official_source_covered", + ): + if key not in overlay: + continue + value = overlay.get(key) + if value in (None, "", [], {}): + continue + merged[key] = value + merged["source_refs"] = _merge_source_refs(entity.get("source_refs", []), overlay.get("source_refs", [])) + return merged + + def _update_entity_stats(entity: Dict[str, Any], advisories: List[Dict[str, Any]]) -> None: advisory_ids = [item.get("canonical_id") for item in advisories if item.get("canonical_id")] workflow_count = len([item for item in advisories if item.get("workflow", {}).get("workflow_id")]) @@ -499,22 +581,57 @@ def _update_entity_stats(entity: Dict[str, Any], advisories: List[Dict[str, Any] def _candidate_from_source(system: Dict[str, Any], source: Dict[str, Any], known_repo_urls: set[str]) -> Dict[str, Any] | None: url = (source.get("url") or "").strip() - match = re.match(r"https://github\.com/([^/]+)/([^/#?]+)", url) - if not match: - return None - repo_url = f"https://github.com/{match.group(1)}/{match.group(2)}" - if repo_url in known_repo_urls: + entity_type = source.get("entity_type_hint") or "project" + repo_url = _github_repo_from_url(url) + package_registry = "" + marketplace_url = "" + display_name = "" + stable_url = repo_url + if repo_url: + if repo_url in known_repo_urls: + return None + entity_type = source.get("entity_type_hint") or "repo" + match = re.match(r"https://github\.com/([^/]+)/([^/#?]+)", repo_url, re.IGNORECASE) + if match: + display_name = f"{match.group(1)} / {match.group(2)}" + elif "npmjs.com/package/" in url: + entity_type = source.get("entity_type_hint") or "package" + package_name = url.split("/package/", 1)[1].split("?", 1)[0].strip("/") + package_registry = f"https://www.npmjs.com/package/{package_name}" + display_name = package_name + stable_url = package_registry + elif "packagist.org/packages/" in url: + entity_type = source.get("entity_type_hint") or "package" + package_name = url.split("/packages/", 1)[1].split("?", 1)[0].strip("/") + package_registry = f"https://packagist.org/packages/{package_name}" + display_name = package_name.replace("/", " / ") + stable_url = package_registry + elif any(token in url.lower() for token in ("/plugins/", "/themes/", "/extensions/", "/modules/", "/marketplace/")): + marketplace_url = url + slug = _marketplace_slug(url) + display_name = slug.replace("-", " ") + stable_url = marketplace_url + else: return None + if not display_name: + display_name = source.get("name") or system.get("display_name") or system.get("system_id") return { - "candidate_id": f"{system.get('system_id')}--repo-candidate--{slugify(repo_url)}", + "candidate_id": f"{system.get('system_id')}--{entity_type}-candidate--{slugify(stable_url or display_name)}", "root_system_id": system.get("system_id"), - "display_name": f"{match.group(1)} / {match.group(2)}", - "entity_type": "repo", + "display_name": display_name, + "entity_type": entity_type, "status": "candidate", - "reason": "source catalog exposed a repo-like URL that is not yet cataloged as an entity", + "reason": "source catalog exposed a stable security-related object that is not yet cataloged as an entity", "source": url, + "source_name": source.get("name") or "", + "source_confidence": source.get("confidence") or "unknown", + "source_bucket": source.get("bucket_name") or "", + "auto_catalog": bool(source.get("auto_catalog")), + "repo_url": repo_url, + "package_registry": package_registry, + "marketplace_url": marketplace_url, "risk": "medium", - "waiting_for": "确认是否应升级为 cataloged repo/package 实体并补齐历史漏洞", + "waiting_for": "确认是否应升级为 cataloged repo/plugin/package 实体并补齐安全相关版本与历史漏洞", "canonical_id": "", } @@ -523,12 +640,18 @@ def build_entity_views(source_map: Dict[str, Any], advisories: List[AdvisoryReco generated_at = isoformat(now_utc()) systems = {item["system_id"]: item for item in source_map.get("systems", []) or [] if item.get("system_id")} advisory_rows = [_advisory_dict(item) for item in advisories] + existing_entities = { + item.get("entity_id"): item + for item in load_all_json(ENTITIES_DIR) + if item.get("entity_id") and item.get("root_system_id") in systems + } entities: Dict[str, Dict[str, Any]] = {} advisories_by_entity: Dict[str, List[Dict[str, Any]]] = defaultdict(list) for system_id, system in systems.items(): history_policy = system.get("tier") or "history-full" - entities[system_id] = _entity_payload( + entities[system_id] = _merge_entity_overlay( + _entity_payload( entity_id=system_id, entity_type="system", display_name=system.get("display_name", system_id), @@ -545,6 +668,8 @@ def build_entity_views(source_map: Dict[str, Any], advisories: List[AdvisoryReco latest_version="", version_scheme="vendor", source_refs=_source_refs(system), + ), + existing_entities.get(system_id), ) for advisory in advisory_rows: @@ -558,28 +683,37 @@ def build_entity_views(source_map: Dict[str, Any], advisories: List[AdvisoryReco if entity_id not in entities: package_name = advisory.get("package_name") or advisory.get("title") or entity_id entity_type = ref.get("entity_type") or infer_entity_type(advisory) - entities[entity_id] = _entity_payload( - entity_id=entity_id, - entity_type=entity_type, - display_name=_display_name(package_name, entity_id), - parent_entity_id=root_system_id, - root_system_id=root_system_id, - category=system.get("category", advisory.get("category", "unknown")), - ecosystem=advisory.get("package_name", "").split("/", 1)[0] if advisory.get("package_name") else system.get("category", "unknown"), - official=entity_type in {"project", "repo"} and entity_type != "package", - status="cataloged", - history_policy="history-full", - repo_url=_repo_url_from_package(advisory.get("package_name") or ""), - package_registry=_package_registry_url(advisory.get("package_name") or ""), - marketplace_url="", - latest_version=advisory.get("patched_version") or "", - version_scheme="semver-ish" if advisory.get("package_name") else "vendor", - source_refs=[], + entities[entity_id] = _merge_entity_overlay( + _entity_payload( + entity_id=entity_id, + entity_type=entity_type, + display_name=_display_name(package_name, entity_id), + parent_entity_id=root_system_id, + root_system_id=root_system_id, + category=system.get("category", advisory.get("category", "unknown")), + ecosystem=advisory.get("package_name", "").split("/", 1)[0] if advisory.get("package_name") else system.get("category", "unknown"), + official=entity_type in {"project", "repo"} and entity_type != "package", + status="cataloged", + history_policy="history-full", + repo_url=_repo_url_from_package(advisory.get("package_name") or ""), + package_registry=_package_registry_url(advisory.get("package_name") or ""), + marketplace_url="", + latest_version=advisory.get("patched_version") or "", + version_scheme="semver-ish" if advisory.get("package_name") else "vendor", + source_refs=[], + ), + existing_entities.get(entity_id), ) advisories_by_entity[entity_id].append(advisory) for entity_id, advisories_for_entity in advisories_by_entity.items(): _update_entity_stats(entities[entity_id], advisories_for_entity) + entities[entity_id] = _merge_entity_overlay(entities[entity_id], existing_entities.get(entity_id)) + + for entity_id, item in existing_entities.items(): + if entity_id in entities: + continue + entities[entity_id] = item known_repo_urls = {entity.get("repo_url") for entity in entities.values() if entity.get("repo_url")} candidate_map: Dict[str, Dict[str, Any]] = {} diff --git a/scripts/intel/models.py b/scripts/intel/models.py index c95792c9..c80e0296 100644 --- a/scripts/intel/models.py +++ b/scripts/intel/models.py @@ -71,6 +71,10 @@ class AdvisoryRecord: introduced_version: Optional[str] = None patched_version: Optional[str] = None version_evidence_sources: List[str] = field(default_factory=list) + affected_version_refs: List[str] = field(default_factory=list) + fixed_version_refs: List[str] = field(default_factory=list) + patched_version_refs: List[str] = field(default_factory=list) + version_sync_confidence: str = "low" advisory_scope: str = "core" version_confidence: str = "low" version_gap_reason: str = "" @@ -97,3 +101,22 @@ class AdvisoryRecord: def to_dict(self) -> Dict[str, Any]: return asdict(self) + + +@dataclass +class VersionRecord: + version_id: str + entity_id: str + root_system_id: str + version: str + released_at: Optional[str] = None + release_url: Optional[str] = None + source_name: str = "" + source_confidence: str = "unknown" + security_relevant: bool = True + reason: str = "" + advisory_refs: List[str] = field(default_factory=list) + is_latest_snapshot: bool = False + + def to_dict(self) -> Dict[str, Any]: + return asdict(self)