实现分层实体漏洞知识库与实体级完整度监控
这个提交包含在:
@@ -6,6 +6,7 @@ from typing import Any, Dict, List
|
||||
|
||||
from intel.config import (
|
||||
ALERTS_PATH,
|
||||
ENTITY_COMPLETENESS_PATH,
|
||||
MACHINE_READABLE_SOURCE_KINDS,
|
||||
MONITORING_DIR,
|
||||
MONITOR_SUMMARY_PATH,
|
||||
@@ -363,6 +364,7 @@ def write_monitoring_state(
|
||||
) -> Dict[str, Any]:
|
||||
open_alerts = [item for item in alerts if item.get("status") == "open"]
|
||||
generated_at = source_health.get("generated_at") or isoformat(now_utc())
|
||||
entity_completeness = read_json(ENTITY_COMPLETENESS_PATH, default={}) or {}
|
||||
summary = {
|
||||
"generated_at": generated_at,
|
||||
"active_source_count": source_health.get("active_source_count", 0),
|
||||
@@ -387,6 +389,14 @@ def write_monitoring_state(
|
||||
"error_count": len(validation_errors),
|
||||
"errors": validation_errors,
|
||||
},
|
||||
"entity_coverage": {
|
||||
"cataloged_entity_total": entity_completeness.get("cataloged_entity_total", 0),
|
||||
"candidate_entity_total": entity_completeness.get("candidate_entity_total", 0),
|
||||
"history_full_complete_count": entity_completeness.get("history_full_complete_count", 0),
|
||||
"workflow_complete_count": entity_completeness.get("workflow_complete_count", 0),
|
||||
"version_mapped_count": entity_completeness.get("version_mapped_count", 0),
|
||||
"official_source_covered_count": entity_completeness.get("official_source_covered_count", 0),
|
||||
},
|
||||
}
|
||||
snapshot = {
|
||||
"generated_at": generated_at,
|
||||
|
||||
在新工单中引用
屏蔽一个用户