文件
websafe-kb/scripts/intel/config.py

175 行
6.8 KiB
Python

from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, Iterable, List, Tuple
import yaml
ROOT = Path(__file__).resolve().parents[2]
FRAMEWORK_ROOT = ROOT / "07-framework-security"
THREAT_INTEL_ROOT = ROOT / "08-threat-intel"
REGISTRY_ROOT = THREAT_INTEL_ROOT / "registry"
ADVISORIES_DIR = REGISTRY_ROOT / "advisories"
SYSTEMS_DIR = REGISTRY_ROOT / "systems"
RUNS_DIR = REGISTRY_ROOT / "runs"
TRIAGE_DIR = REGISTRY_ROOT / "triage"
GENERATED_DIR = THREAT_INTEL_ROOT / "generated"
SECURE_CODE_ROOT = ROOT / "05-defense" / "secure-code"
SOURCE_MAP_PATH = THREAT_INTEL_ROOT / "source-map.yaml"
REPRO_MAP_PATH = THREAT_INTEL_ROOT / "repro-map.yaml"
REPRO_PROFILES_DIR = THREAT_INTEL_ROOT / "repro-profiles"
MONITORING_DIR = REGISTRY_ROOT / "monitoring"
SOURCE_HEALTH_PATH = GENERATED_DIR / "source-health.json"
ALERTS_PATH = GENERATED_DIR / "alerts.json"
MONITOR_SUMMARY_PATH = GENERATED_DIR / "monitor-summary.json"
SOURCE_CATALOG_AUDIT_PATH = GENERATED_DIR / "source-catalog-audit.json"
SOURCE_CATALOG_AUDIT_MD_PATH = GENERATED_DIR / "source-catalog-audit.md"
RETIRED_SOURCES_PATH = GENERATED_DIR / "retired-sources.json"
STATE_DIR = Path.home() / ".local" / "state" / "websafe-intel"
STATE_PATH = STATE_DIR / "state.json"
SOURCE_BUCKETS = ("official_sources", "ecosystem_sources", "research_sources")
MACHINE_READABLE_SOURCE_KINDS = {"ghsa-global", "osv-batch", "nvd-search", "kev-json", "json-feed", "rss-feed", "atom-feed"}
DEFAULT_REQUEST_POLICY = {
"user_agent": "python-requests/2.31.0",
"accept": "",
"timeout_seconds": 30,
"verify_tls": True,
"http_version": "default",
"follow_redirects": True,
}
DEFAULT_HEALTH_POLICY = {
"retries": 3,
"backoff_seconds": 0.5,
"expected_format": "",
"expected_statuses": [200],
}
DEFAULT_PARSER_HINTS = {
"keywords": [],
"selectors": [],
"include_url_patterns": [],
"exclude_url_patterns": [],
"date_extractors": [],
}
DEFAULT_ACCEPT_BY_KIND = {
"rss-feed": "application/rss+xml, application/xml;q=0.9, text/xml;q=0.9, */*;q=0.8",
"atom-feed": "application/atom+xml, application/xml;q=0.9, text/xml;q=0.9, */*;q=0.8",
"json-feed": "application/json, text/json;q=0.9, */*;q=0.8",
"ghsa-global": "application/vnd.github+json, application/json;q=0.9, */*;q=0.8",
"osv-batch": "application/json, */*;q=0.8",
"nvd-search": "application/json, */*;q=0.8",
"kev-json": "application/json, */*;q=0.8",
}
DEFAULT_FORMAT_BY_KIND = {
"html-links": "html",
"vendor-index": "html",
"rss-feed": "rss",
"atom-feed": "atom",
"json-feed": "json",
"ghsa-global": "json",
"osv-batch": "json",
"nvd-search": "json",
"kev-json": "json",
}
def _normalize_source(source: Dict[str, Any], bucket_name: str) -> Dict[str, Any]:
normalized = dict(source or {})
normalized["status"] = normalized.get("status") or "active"
normalized["retired_reason"] = normalized.get("retired_reason") or ""
normalized["replacement_sources"] = list(normalized.get("replacement_sources") or [])
request_policy = {**DEFAULT_REQUEST_POLICY, **(normalized.get("request_policy") or {})}
if not request_policy.get("accept"):
request_policy["accept"] = DEFAULT_ACCEPT_BY_KIND.get(normalized.get("kind"), "")
request_policy["timeout_seconds"] = int(request_policy.get("timeout_seconds") or DEFAULT_REQUEST_POLICY["timeout_seconds"])
request_policy["follow_redirects"] = bool(request_policy.get("follow_redirects", True))
request_policy["verify_tls"] = bool(request_policy.get("verify_tls", True))
normalized["request_policy"] = request_policy
health_policy = {**DEFAULT_HEALTH_POLICY, **(normalized.get("health_policy") or {})}
if not health_policy.get("expected_format"):
health_policy["expected_format"] = DEFAULT_FORMAT_BY_KIND.get(normalized.get("kind"), "")
statuses = health_policy.get("expected_statuses") or [200]
health_policy["expected_statuses"] = [int(item) for item in statuses]
health_policy["retries"] = int(health_policy.get("retries") or DEFAULT_HEALTH_POLICY["retries"])
health_policy["backoff_seconds"] = float(health_policy.get("backoff_seconds") or DEFAULT_HEALTH_POLICY["backoff_seconds"])
normalized["health_policy"] = health_policy
parser_hints = {**DEFAULT_PARSER_HINTS, **(normalized.get("parser_hints") or {})}
if not parser_hints.get("keywords"):
parser_hints["keywords"] = list(normalized.get("keywords") or [])
normalized["parser_hints"] = parser_hints
normalized["bucket_name"] = bucket_name
return normalized
def load_source_map() -> Dict[str, Any]:
with SOURCE_MAP_PATH.open("r", encoding="utf-8") as handle:
data = yaml.safe_load(handle)
if not isinstance(data, dict) or "systems" not in data:
raise ValueError("source-map.yaml must contain a top-level 'systems' list")
systems = data["systems"]
if not isinstance(systems, list):
raise ValueError("'systems' must be a list")
normalized_systems: List[Dict[str, Any]] = []
for system in systems:
cloned = dict(system or {})
for bucket_name in SOURCE_BUCKETS:
sources = cloned.get(bucket_name) or []
cloned[bucket_name] = [_normalize_source(source, bucket_name) for source in sources]
normalized_systems.append(cloned)
return {**data, "systems": normalized_systems}
def load_repro_map() -> Dict[str, Any]:
if not REPRO_MAP_PATH.exists():
return {"systems": []}
with REPRO_MAP_PATH.open("r", encoding="utf-8") as handle:
data = yaml.safe_load(handle) or {}
if not isinstance(data, dict) or "systems" not in data:
return {"systems": []}
return data
def get_systems_by_group(source_map: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]:
groups: Dict[str, List[Dict[str, Any]]] = {}
for system in source_map["systems"]:
output_dir = Path(system["output_dir"])
parts = output_dir.parts
if len(parts) < 3:
raise ValueError(f"output_dir too short for system {system['system_id']}")
group = parts[1]
groups.setdefault(group, []).append(system)
return groups
def iter_sources(
system: Dict[str, Any],
*,
include_retired: bool = True,
) -> Iterable[Tuple[str, Dict[str, Any]]]:
for bucket_name in SOURCE_BUCKETS:
for source in system.get(bucket_name, []) or []:
if not include_retired and source.get("status") == "retired":
continue
yield bucket_name, source
def iter_all_sources(
source_map: Dict[str, Any],
*,
include_retired: bool = True,
) -> Iterable[Tuple[Dict[str, Any], str, Dict[str, Any]]]:
for system in source_map.get("systems", []) or []:
for bucket_name, source in iter_sources(system, include_retired=include_retired):
yield system, bucket_name, source