127 行
5.1 KiB
Python
127 行
5.1 KiB
Python
from __future__ import annotations
|
|
|
|
from collections import defaultdict
|
|
from typing import Any, Dict, Iterable, List, Tuple
|
|
|
|
from intel.models import AdvisoryRecord, Candidate
|
|
from intel.utils import best_severity, short_hash, unique
|
|
|
|
|
|
CONFIDENCE_ORDER = {
|
|
"official": 4,
|
|
"ecosystem-authority": 3,
|
|
"research": 2,
|
|
"triage-only": 1,
|
|
}
|
|
|
|
|
|
def _best_confidence(values: Iterable[str]) -> str:
|
|
ordered = sorted(values, key=lambda value: CONFIDENCE_ORDER.get(value, 0), reverse=True)
|
|
return next((value for value in ordered if value), "triage-only")
|
|
|
|
|
|
def canonical_key(candidate: Candidate) -> str:
|
|
for alias in candidate.cve_ids + candidate.ghsa_ids + candidate.osv_ids + candidate.aliases:
|
|
if alias:
|
|
return f"{candidate.system_id}::{alias}"
|
|
return f"{candidate.system_id}::{short_hash(candidate.title, candidate.source_url)}"
|
|
|
|
|
|
def normalize_candidates(candidates: List[Candidate]) -> Tuple[List[AdvisoryRecord], List[Dict[str, Any]]]:
|
|
buckets: Dict[str, List[Candidate]] = defaultdict(list)
|
|
for candidate in candidates:
|
|
buckets[canonical_key(candidate)].append(candidate)
|
|
|
|
advisories: List[AdvisoryRecord] = []
|
|
triage: List[Dict[str, Any]] = []
|
|
|
|
for key, items in sorted(buckets.items()):
|
|
lead = sorted(
|
|
items,
|
|
key=lambda item: CONFIDENCE_ORDER.get(item.source_confidence, 0),
|
|
reverse=True,
|
|
)[0]
|
|
confidence = _best_confidence(item.source_confidence for item in items)
|
|
aliases = unique(alias for item in items for alias in item.aliases)
|
|
cve_ids = unique(value for item in items for value in item.cve_ids)
|
|
ghsa_ids = unique(value for item in items for value in item.ghsa_ids)
|
|
osv_ids = unique(value for item in items for value in item.osv_ids)
|
|
affected = unique(value for item in items for value in item.affected_versions)
|
|
fixed = unique(value for item in items for value in item.fixed_versions)
|
|
references = unique([item.source_url for item in items] + [ref for item in items for ref in item.references])
|
|
|
|
published = next((item.published_at for item in items if item.published_at), None)
|
|
updated = next((item.updated_at for item in items if item.updated_at), published)
|
|
severity = best_severity(item.severity for item in items)
|
|
cvss = next((item.cvss_score for item in items if item.cvss_score is not None), None)
|
|
exploit_status = next(
|
|
(item.exploit_status for item in items if item.exploit_status and item.exploit_status != "unknown"),
|
|
"unknown",
|
|
)
|
|
|
|
official_refs = [
|
|
item.source_url
|
|
for item in items
|
|
if item.source_confidence in {"official", "ecosystem-authority"} and item.source_url
|
|
]
|
|
triage_reasons = []
|
|
status = "generated"
|
|
if confidence not in {"official", "ecosystem-authority"}:
|
|
triage_reasons.append("best source confidence below registry threshold")
|
|
if not official_refs:
|
|
triage_reasons.append("no official or ecosystem-authority source URL")
|
|
if not (affected or fixed):
|
|
triage_reasons.append("missing affected/fixed version details")
|
|
|
|
if triage_reasons:
|
|
status = "triage"
|
|
triage.append(
|
|
{
|
|
"canonical_id": key.replace("::", "--"),
|
|
"system_id": lead.system_id,
|
|
"title": lead.title,
|
|
"reasons": triage_reasons,
|
|
"candidate_count": len(items),
|
|
"references": references,
|
|
}
|
|
)
|
|
|
|
advisories.append(
|
|
AdvisoryRecord(
|
|
canonical_id=key.replace("::", "--"),
|
|
system_id=lead.system_id,
|
|
display_name=lead.display_name,
|
|
category=lead.category,
|
|
advisory_mode=lead.advisory_mode,
|
|
title=lead.title,
|
|
summary=lead.summary,
|
|
published_at=published,
|
|
updated_at=updated,
|
|
severity=severity,
|
|
cvss_score=cvss,
|
|
exploit_status=exploit_status,
|
|
source_confidence=confidence,
|
|
official_source_url=official_refs[0] if official_refs else (references[0] if references else None),
|
|
secondary_source_urls=references[1:] if len(references) > 1 else [],
|
|
aliases=aliases,
|
|
cve_ids=cve_ids,
|
|
ghsa_ids=ghsa_ids,
|
|
osv_ids=osv_ids,
|
|
affected_versions=affected,
|
|
fixed_versions=fixed,
|
|
package_name=lead.package_name,
|
|
render_markdown=False,
|
|
case_path=None,
|
|
secure_code_topics=[],
|
|
status=status,
|
|
triage_reasons=triage_reasons,
|
|
metadata={
|
|
"source_names": unique(item.source_name for item in items),
|
|
"source_kinds": unique(item.source_kind for item in items),
|
|
"candidate_count": len(items),
|
|
},
|
|
)
|
|
)
|
|
|
|
return advisories, triage
|