from __future__ import annotations from collections import defaultdict from typing import Any, Dict, Iterable, List, Tuple from intel.models import AdvisoryRecord, Candidate from intel.utils import best_severity, short_hash, unique CONFIDENCE_ORDER = { "official": 4, "ecosystem-authority": 3, "research": 2, "triage-only": 1, } def _best_confidence(values: Iterable[str]) -> str: ordered = sorted(values, key=lambda value: CONFIDENCE_ORDER.get(value, 0), reverse=True) return next((value for value in ordered if value), "triage-only") def canonical_key(candidate: Candidate) -> str: for alias in candidate.cve_ids + candidate.ghsa_ids + candidate.osv_ids + candidate.aliases: if alias: return f"{candidate.system_id}::{alias}" return f"{candidate.system_id}::{short_hash(candidate.title, candidate.source_url)}" def normalize_candidates(candidates: List[Candidate]) -> Tuple[List[AdvisoryRecord], List[Dict[str, Any]]]: buckets: Dict[str, List[Candidate]] = defaultdict(list) for candidate in candidates: buckets[canonical_key(candidate)].append(candidate) advisories: List[AdvisoryRecord] = [] triage: List[Dict[str, Any]] = [] for key, items in sorted(buckets.items()): lead = sorted( items, key=lambda item: CONFIDENCE_ORDER.get(item.source_confidence, 0), reverse=True, )[0] confidence = _best_confidence(item.source_confidence for item in items) aliases = unique(alias for item in items for alias in item.aliases) cve_ids = unique(value for item in items for value in item.cve_ids) ghsa_ids = unique(value for item in items for value in item.ghsa_ids) osv_ids = unique(value for item in items for value in item.osv_ids) affected = unique(value for item in items for value in item.affected_versions) fixed = unique(value for item in items for value in item.fixed_versions) references = unique([item.source_url for item in items] + [ref for item in items for ref in item.references]) published = next((item.published_at for item in items if item.published_at), None) updated = next((item.updated_at for item in items if item.updated_at), published) severity = best_severity(item.severity for item in items) cvss = next((item.cvss_score for item in items if item.cvss_score is not None), None) exploit_status = next( (item.exploit_status for item in items if item.exploit_status and item.exploit_status != "unknown"), "unknown", ) official_refs = [ item.source_url for item in items if item.source_confidence in {"official", "ecosystem-authority"} and item.source_url ] triage_reasons = [] status = "generated" if confidence not in {"official", "ecosystem-authority"}: triage_reasons.append("best source confidence below registry threshold") if not official_refs: triage_reasons.append("no official or ecosystem-authority source URL") if not (affected or fixed): triage_reasons.append("missing affected/fixed version details") if triage_reasons: status = "triage" triage.append( { "canonical_id": key.replace("::", "--"), "system_id": lead.system_id, "title": lead.title, "reasons": triage_reasons, "candidate_count": len(items), "references": references, } ) advisories.append( AdvisoryRecord( canonical_id=key.replace("::", "--"), system_id=lead.system_id, display_name=lead.display_name, category=lead.category, advisory_mode=lead.advisory_mode, title=lead.title, summary=lead.summary, published_at=published, updated_at=updated, severity=severity, cvss_score=cvss, exploit_status=exploit_status, source_confidence=confidence, official_source_url=official_refs[0] if official_refs else (references[0] if references else None), secondary_source_urls=references[1:] if len(references) > 1 else [], aliases=aliases, cve_ids=cve_ids, ghsa_ids=ghsa_ids, osv_ids=osv_ids, affected_versions=affected, fixed_versions=fixed, package_name=lead.package_name, render_markdown=False, case_path=None, secure_code_topics=[], status=status, triage_reasons=triage_reasons, metadata={ "source_names": unique(item.source_name for item in items), "source_kinds": unique(item.source_kind for item in items), "candidate_count": len(items), }, ) ) return advisories, triage