feat: sync version-driven intel coverage
这个提交包含在:
@@ -269,16 +269,27 @@ def discover_entities(
|
||||
merged_entities = sorted(entities.values(), key=lambda item: item["entity_id"])
|
||||
if write_registry and auto_promoted:
|
||||
_write_entity_records(merged_entities, selected_system_ids={item["root_system_id"] for item in merged_entities})
|
||||
refreshed_views = build_entity_views(source_map, advisories)
|
||||
if auto_promoted:
|
||||
if write_registry and auto_promoted:
|
||||
refreshed_views = build_entity_views(source_map, advisories)
|
||||
candidate_backlog = refreshed_views["candidate_backlog"]
|
||||
else:
|
||||
promoted_urls = {
|
||||
item.get("repo_url") or item.get("package_registry") or item.get("marketplace_url")
|
||||
for item in auto_promoted
|
||||
if item.get("repo_url") or item.get("package_registry") or item.get("marketplace_url")
|
||||
}
|
||||
candidate_backlog = [
|
||||
item
|
||||
for item in base_views["candidate_backlog"]
|
||||
if (item.get("repo_url") or item.get("package_registry") or item.get("marketplace_url")) not in promoted_urls
|
||||
]
|
||||
return {
|
||||
"entities": merged_entities,
|
||||
"candidate_backlog": refreshed_views["candidate_backlog"],
|
||||
"candidate_backlog": candidate_backlog,
|
||||
"auto_promoted": auto_promoted,
|
||||
"summary": {
|
||||
"cataloged_entity_total": len([item for item in merged_entities if item.get("status") == "cataloged"]),
|
||||
"candidate_entity_total": len(refreshed_views["candidate_backlog"]),
|
||||
"candidate_entity_total": len(candidate_backlog),
|
||||
"auto_promoted_count": len(auto_promoted),
|
||||
},
|
||||
}
|
||||
@@ -815,6 +826,19 @@ def sync_versions(
|
||||
entity["version_sync_status"] = best.get("version_sync_status") or entity.get("version_sync_status") or "pending"
|
||||
entity["security_version_count"] = sum(item.get("security_version_count", 0) for item in children)
|
||||
|
||||
for version in version_records:
|
||||
version["is_latest_snapshot"] = False
|
||||
for entity in entities.values():
|
||||
latest_version = (entity.get("latest_version") or "").strip()
|
||||
if not latest_version:
|
||||
continue
|
||||
version_id = version_lookup.get((entity["entity_id"], latest_version))
|
||||
if version_id:
|
||||
for version in versions_by_entity.get(entity["entity_id"], []):
|
||||
if version["version_id"] == version_id:
|
||||
version["is_latest_snapshot"] = True
|
||||
break
|
||||
|
||||
updated_advisories: List[Dict[str, Any]] = []
|
||||
for advisory in advisory_rows:
|
||||
target_id = _entity_target_id(advisory)
|
||||
|
||||
在新工单中引用
屏蔽一个用户