更新: 3 个文件 - 2026-03-21 17:44:22

这个提交包含在:
hao
2026-03-21 17:44:22 -07:00
父节点 e13c138232
当前提交 a0a5067ae1
修改 3 个文件,包含 238 行新增和 29 行删除

查看文件

@@ -13,6 +13,7 @@ REGISTRY_ROOT = THREAT_INTEL_ROOT / "registry"
# Sub-directories of REGISTRY_ROOT (presumably one per record type -- confirm
# against the code that reads and writes them).
ADVISORIES_DIR = REGISTRY_ROOT / "advisories"
SYSTEMS_DIR = REGISTRY_ROOT / "systems"
ENTITIES_DIR = REGISTRY_ROOT / "entities"
VERSIONS_DIR = REGISTRY_ROOT / "versions"
RUNS_DIR = REGISTRY_ROOT / "runs"
TRIAGE_DIR = REGISTRY_ROOT / "triage"
# Directory for generated artifacts, located directly under THREAT_INTEL_ROOT.
GENERATED_DIR = THREAT_INTEL_ROOT / "generated"
@@ -32,6 +33,11 @@ ENTITY_BACKLOG_PATH = GENERATED_DIR / "entity-discovery-backlog.json"
# Entity-related artifact paths under GENERATED_DIR (JSON data and Markdown reports).
ENTITY_QUEUES_PATH = GENERATED_DIR / "entity-queues.json"
ENTITY_CATALOG_REPORT_MD_PATH = GENERATED_DIR / "entity-catalog-report.md"
ENTITY_BACKLOG_REPORT_MD_PATH = GENERATED_DIR / "entity-discovery-backlog.md"
# Version/release artifact paths under GENERATED_DIR.
VERSION_COMPLETENESS_PATH = GENERATED_DIR / "version-completeness.json"
VERSION_BACKLOG_PATH = GENERATED_DIR / "version-backlog.json"
RELEASE_INDEX_PATH = GENERATED_DIR / "release-index.json"
VERSION_REPORT_MD_PATH = GENERATED_DIR / "version-sync-report.md"
LAB_ENQUEUE_SUMMARY_PATH = GENERATED_DIR / "lab-enqueue-summary.json"
# State file location under the current user's home directory
# (~/.local/state/websafe-intel/state.json).
STATE_DIR = Path.home() / ".local" / "state" / "websafe-intel"
STATE_PATH = STATE_DIR / "state.json"
@@ -62,6 +68,12 @@ DEFAULT_PARSER_HINTS = {
"date_extractors": [],
}
# Baseline release-selector settings. _normalize_source merges a source's own
# "release_selector" mapping over these keys, so every normalized source carries
# all three lists even when it configures none of them.
DEFAULT_RELEASE_SELECTOR = {
"version_patterns": [],
"date_patterns": [],
"release_url_patterns": [],
}
DEFAULT_ACCEPT_BY_KIND = {
"rss-feed": "application/rss+xml, application/xml;q=0.9, text/xml;q=0.9, */*;q=0.8",
"atom-feed": "application/atom+xml, application/xml;q=0.9, text/xml;q=0.9, */*;q=0.8",
@@ -85,6 +97,41 @@ DEFAULT_FORMAT_BY_KIND = {
}
def _infer_source_purpose(bucket_name: str, source: Dict[str, Any]) -> str:
configured = (source.get("purpose") or "").strip()
if configured:
return configured
url = (source.get("url") or "").lower()
kind = source.get("kind") or ""
if any(token in url for token in ("/plugins/", "/themes/", "/marketplace/", "/extensions/", "/modules/")):
return "marketplace"
if any(token in url for token in ("/releases", "/tags", "/release", "release-notes", "security-releases", "/feed/", ".rss", ".xml")):
return "release"
if kind in {"rss-feed", "atom-feed", "json-feed", "vendor-index", "html-links"} and bucket_name != "research_sources":
return "discovery"
return "advisory"
def _infer_entity_type_hint(source: Dict[str, Any]) -> str:
configured = (source.get("entity_type_hint") or "").strip()
if configured:
return configured
url = (source.get("url") or "").lower()
if "/plugins/" in url:
return "plugin"
if "/themes/" in url:
return "theme"
if "/extensions/" in url:
return "extension"
if "/modules/" in url:
return "module"
if "github.com/" in url:
return "repo"
if "npmjs.com/package/" in url or "packagist.org/packages/" in url:
return "package"
return "project"
def _normalize_source(source: Dict[str, Any], bucket_name: str) -> Dict[str, Any]:
normalized = dict(source or {})
normalized["status"] = normalized.get("status") or "active"
@@ -112,6 +159,11 @@ def _normalize_source(source: Dict[str, Any], bucket_name: str) -> Dict[str, Any
if not parser_hints.get("keywords"):
parser_hints["keywords"] = list(normalized.get("keywords") or [])
normalized["parser_hints"] = parser_hints
normalized["purpose"] = _infer_source_purpose(bucket_name, normalized)
normalized["entity_type_hint"] = _infer_entity_type_hint(normalized)
normalized["auto_catalog"] = bool(normalized.get("auto_catalog", bucket_name in {"official_sources", "ecosystem_sources"}))
normalized["version_mode"] = normalized.get("version_mode") or "security-related"
normalized["release_selector"] = {**DEFAULT_RELEASE_SELECTOR, **(normalized.get("release_selector") or {})}
normalized["bucket_name"] = bucket_name
return normalized

查看文件

@@ -4,8 +4,9 @@ import re
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple
from intel.config import ENTITIES_DIR
from intel.models import AdvisoryRecord
from intel.utils import isoformat, now_utc, parse_dt, slugify, unique
from intel.utils import isoformat, load_all_json, now_utc, parse_dt, slugify, unique
FAMILY_KEYWORDS = {
@@ -133,6 +134,20 @@ def _repo_url_from_package(package_name: str) -> str:
return ""
def _github_repo_from_url(url: str) -> str:
match = re.match(r"https://github\.com/([^/]+)/([^/#?]+)", (url or "").strip(), re.IGNORECASE)
if not match:
return ""
return f"https://github.com/{match.group(1)}/{match.group(2)}"
def _marketplace_slug(url: str) -> str:
parts = [part for part in re.split(r"[/?#]+", (url or "").strip()) if part]
if not parts:
return ""
return parts[-1]
def _package_registry_url(package_name: str) -> str:
normalized = _strip_package_version_suffix(package_name)
if not normalized:
@@ -442,6 +457,16 @@ def _entity_payload(
"marketplace_url": marketplace_url,
"latest_version": latest_version,
"version_scheme": version_scheme,
"latest_release_at": "",
"latest_release_url": "",
"version_source_refs": [],
"version_sync_status": "pending",
"security_version_count": 0,
"last_version_synced_at": "",
"latest_version_evidence": [],
"catalog_source": "",
"catalog_reason": "",
"auto_cataloged": False,
"last_discovered_at": "",
"last_synced_at": "",
"history_backfill_status": "pending",
@@ -457,6 +482,63 @@ def _entity_payload(
}
def _merge_source_refs(primary: List[Dict[str, Any]], secondary: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
merged: List[Dict[str, Any]] = []
seen = set()
for item in (primary or []) + (secondary or []):
if not isinstance(item, dict):
continue
key = (
item.get("name") or "",
item.get("url") or "",
item.get("kind") or "",
item.get("bucket") or "",
)
if key in seen:
continue
seen.add(key)
merged.append(item)
return merged
def _merge_entity_overlay(entity: Dict[str, Any], overlay: Dict[str, Any] | None) -> Dict[str, Any]:
if not overlay:
return entity
merged = dict(entity)
for key in (
"status",
"history_policy",
"repo_url",
"package_registry",
"marketplace_url",
"latest_version",
"version_scheme",
"latest_release_at",
"latest_release_url",
"version_source_refs",
"version_sync_status",
"security_version_count",
"last_version_synced_at",
"latest_version_evidence",
"catalog_source",
"catalog_reason",
"auto_cataloged",
"last_discovered_at",
"last_synced_at",
"history_backfill_status",
"latest_sync_status",
"official_source_covered",
):
if key not in overlay:
continue
value = overlay.get(key)
if value in (None, "", [], {}):
continue
merged[key] = value
merged["source_refs"] = _merge_source_refs(entity.get("source_refs", []), overlay.get("source_refs", []))
return merged
def _update_entity_stats(entity: Dict[str, Any], advisories: List[Dict[str, Any]]) -> None:
advisory_ids = [item.get("canonical_id") for item in advisories if item.get("canonical_id")]
workflow_count = len([item for item in advisories if item.get("workflow", {}).get("workflow_id")])
@@ -499,22 +581,57 @@ def _update_entity_stats(entity: Dict[str, Any], advisories: List[Dict[str, Any]
def _candidate_from_source(system: Dict[str, Any], source: Dict[str, Any], known_repo_urls: set[str]) -> Dict[str, Any] | None:
    """Build a discovery-candidate record for a source URL, or return ``None``.

    The source URL is classified in this order:

    * GitHub repository (via ``_github_repo_from_url``) -- skipped when the
      canonical repo URL is already in *known_repo_urls*;
    * npm package page (``npmjs.com/package/``);
    * Packagist package page (``packagist.org/packages/``);
    * marketplace-style page (plugins/themes/extensions/modules/marketplace).

    Any other URL yields ``None``. The source's ``entity_type_hint`` wins over
    the per-branch default entity type. The display name falls back to the
    source name, then the system display name, then the system id.
    """
    url = (source.get("url") or "").strip()
    entity_type = source.get("entity_type_hint") or "project"
    repo_url = _github_repo_from_url(url)
    package_registry = ""
    marketplace_url = ""
    display_name = ""
    # URL used to derive a stable candidate id; set per branch below.
    stable_url = repo_url
    if repo_url:
        if repo_url in known_repo_urls:
            return None
        entity_type = source.get("entity_type_hint") or "repo"
        match = re.match(r"https://github\.com/([^/]+)/([^/#?]+)", repo_url, re.IGNORECASE)
        if match:
            display_name = f"{match.group(1)} / {match.group(2)}"
    elif "npmjs.com/package/" in url:
        entity_type = source.get("entity_type_hint") or "package"
        package_name = url.split("/package/", 1)[1].split("?", 1)[0].strip("/")
        package_registry = f"https://www.npmjs.com/package/{package_name}"
        display_name = package_name
        stable_url = package_registry
    elif "packagist.org/packages/" in url:
        entity_type = source.get("entity_type_hint") or "package"
        package_name = url.split("/packages/", 1)[1].split("?", 1)[0].strip("/")
        package_registry = f"https://packagist.org/packages/{package_name}"
        display_name = package_name.replace("/", " / ")
        stable_url = package_registry
    elif any(token in url.lower() for token in ("/plugins/", "/themes/", "/extensions/", "/modules/", "/marketplace/")):
        marketplace_url = url
        slug = _marketplace_slug(url)
        display_name = slug.replace("-", " ")
        stable_url = marketplace_url
    else:
        return None
    if not display_name:
        display_name = source.get("name") or system.get("display_name") or system.get("system_id")
    return {
        "candidate_id": f"{system.get('system_id')}--{entity_type}-candidate--{slugify(stable_url or display_name)}",
        "root_system_id": system.get("system_id"),
        "display_name": display_name,
        "entity_type": entity_type,
        "status": "candidate",
        "reason": "source catalog exposed a stable security-related object that is not yet cataloged as an entity",
        "source": url,
        "source_name": source.get("name") or "",
        "source_confidence": source.get("confidence") or "unknown",
        "source_bucket": source.get("bucket_name") or "",
        "auto_catalog": bool(source.get("auto_catalog")),
        "repo_url": repo_url,
        "package_registry": package_registry,
        "marketplace_url": marketplace_url,
        "risk": "medium",
        "waiting_for": "确认是否应升级为 cataloged repo/plugin/package 实体并补齐安全相关版本与历史漏洞",
        "canonical_id": "",
    }
@@ -523,12 +640,18 @@ def build_entity_views(source_map: Dict[str, Any], advisories: List[AdvisoryReco
generated_at = isoformat(now_utc())
systems = {item["system_id"]: item for item in source_map.get("systems", []) or [] if item.get("system_id")}
advisory_rows = [_advisory_dict(item) for item in advisories]
existing_entities = {
item.get("entity_id"): item
for item in load_all_json(ENTITIES_DIR)
if item.get("entity_id") and item.get("root_system_id") in systems
}
entities: Dict[str, Dict[str, Any]] = {}
advisories_by_entity: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
for system_id, system in systems.items():
history_policy = system.get("tier") or "history-full"
entities[system_id] = _entity_payload(
entities[system_id] = _merge_entity_overlay(
_entity_payload(
entity_id=system_id,
entity_type="system",
display_name=system.get("display_name", system_id),
@@ -545,6 +668,8 @@ def build_entity_views(source_map: Dict[str, Any], advisories: List[AdvisoryReco
latest_version="",
version_scheme="vendor",
source_refs=_source_refs(system),
),
existing_entities.get(system_id),
)
for advisory in advisory_rows:
@@ -558,28 +683,37 @@ def build_entity_views(source_map: Dict[str, Any], advisories: List[AdvisoryReco
if entity_id not in entities:
package_name = advisory.get("package_name") or advisory.get("title") or entity_id
entity_type = ref.get("entity_type") or infer_entity_type(advisory)
entities[entity_id] = _entity_payload(
entity_id=entity_id,
entity_type=entity_type,
display_name=_display_name(package_name, entity_id),
parent_entity_id=root_system_id,
root_system_id=root_system_id,
category=system.get("category", advisory.get("category", "unknown")),
ecosystem=advisory.get("package_name", "").split("/", 1)[0] if advisory.get("package_name") else system.get("category", "unknown"),
official=entity_type in {"project", "repo"} and entity_type != "package",
status="cataloged",
history_policy="history-full",
repo_url=_repo_url_from_package(advisory.get("package_name") or ""),
package_registry=_package_registry_url(advisory.get("package_name") or ""),
marketplace_url="",
latest_version=advisory.get("patched_version") or "",
version_scheme="semver-ish" if advisory.get("package_name") else "vendor",
source_refs=[],
entities[entity_id] = _merge_entity_overlay(
_entity_payload(
entity_id=entity_id,
entity_type=entity_type,
display_name=_display_name(package_name, entity_id),
parent_entity_id=root_system_id,
root_system_id=root_system_id,
category=system.get("category", advisory.get("category", "unknown")),
ecosystem=advisory.get("package_name", "").split("/", 1)[0] if advisory.get("package_name") else system.get("category", "unknown"),
official=entity_type in {"project", "repo"} and entity_type != "package",
status="cataloged",
history_policy="history-full",
repo_url=_repo_url_from_package(advisory.get("package_name") or ""),
package_registry=_package_registry_url(advisory.get("package_name") or ""),
marketplace_url="",
latest_version=advisory.get("patched_version") or "",
version_scheme="semver-ish" if advisory.get("package_name") else "vendor",
source_refs=[],
),
existing_entities.get(entity_id),
)
advisories_by_entity[entity_id].append(advisory)
for entity_id, advisories_for_entity in advisories_by_entity.items():
_update_entity_stats(entities[entity_id], advisories_for_entity)
entities[entity_id] = _merge_entity_overlay(entities[entity_id], existing_entities.get(entity_id))
for entity_id, item in existing_entities.items():
if entity_id in entities:
continue
entities[entity_id] = item
known_repo_urls = {entity.get("repo_url") for entity in entities.values() if entity.get("repo_url")}
candidate_map: Dict[str, Dict[str, Any]] = {}

查看文件

@@ -71,6 +71,10 @@ class AdvisoryRecord:
introduced_version: Optional[str] = None
patched_version: Optional[str] = None
version_evidence_sources: List[str] = field(default_factory=list)
affected_version_refs: List[str] = field(default_factory=list)
fixed_version_refs: List[str] = field(default_factory=list)
patched_version_refs: List[str] = field(default_factory=list)
version_sync_confidence: str = "low"
advisory_scope: str = "core"
version_confidence: str = "low"
version_gap_reason: str = ""
@@ -97,3 +101,22 @@ class AdvisoryRecord:
def to_dict(self) -> Dict[str, Any]:
    """Return this record's fields as a plain dict, converted with ``asdict``."""
    as_mapping = asdict(self)
    return as_mapping
@dataclass
class VersionRecord:
    """Data record for one version of an entity.

    The first four fields are required and positional; every other field has
    a default. ``advisory_refs`` uses a per-instance list factory, so instances
    never share the same list object.
    """

    # Required identifiers.
    version_id: str
    entity_id: str
    root_system_id: str
    version: str

    # Optional release metadata; None when not set.
    released_at: Optional[str] = None
    release_url: Optional[str] = None

    # Provenance of the record.
    source_name: str = ""
    source_confidence: str = "unknown"

    # Flags and free-text explanation; records are security-relevant by default.
    security_relevant: bool = True
    reason: str = ""
    advisory_refs: List[str] = field(default_factory=list)
    is_latest_snapshot: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict, converted with ``asdict``."""
        as_mapping = asdict(self)
        return as_mapping