文件
websafe-kb/scripts/intel/sources/cisa_kev.py

58 行
2.0 KiB
Python

from __future__ import annotations
from typing import Any, Dict, List
import requests
from intel.models import Candidate
from intel.utils import unique
def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
    """Download the KEV feed at ``source["url"]`` and return matching candidates.

    A feed entry matches when any configured keyword occurs as a
    case-insensitive substring of its ``vendorProject``, ``product``,
    ``vulnerabilityName`` or ``shortDescription``. Keywords are taken from
    ``source["keywords"]``, falling back to ``system["kev_keywords"]`` and
    finally to ``system["display_name"]``. Blank keywords are ignored.

    Args:
        system: System descriptor. ``system_id``, ``display_name`` and
            ``category`` are read for every match; ``kev_keywords`` is
            optional.
        source: Source descriptor. ``url``, ``kind``, ``name`` and
            ``confidence`` are read; ``keywords`` and ``advisory_mode``
            (default ``"core"``) are optional.

    Returns:
        One ``Candidate`` per matching feed entry, in feed order. Every
        candidate is tagged ``severity="critical"`` and
        ``exploit_status="known_exploited"``.

    Raises:
        requests.RequestException: If the request fails or the server
            answers with an error status.
        ValueError: If the response body is not valid JSON.
        KeyError: If a required ``system``/``source`` key is missing when
            it is accessed.
    """
    response = requests.get(
        source["url"],
        headers={"User-Agent": "websafe-intel"},
        timeout=30,
    )
    response.raise_for_status()
    payload = response.json()

    configured = (
        source.get("keywords")
        or system.get("kev_keywords")
        or [system["display_name"]]
    )
    if isinstance(configured, str):
        # A bare string would otherwise be iterated character by character.
        configured = [configured]
    # Blank keywords are dropped: "" is a substring of every haystack and
    # would silently turn the filter into a match-all.
    keywords = {kw.lower() for kw in configured if kw and kw.strip()}

    candidates: List[Candidate] = []
    # "vulnerabilities" may be absent or null; both are treated as empty.
    for vuln in payload.get("vulnerabilities") or []:
        # filter(None, ...) skips missing/empty fields before joining.
        haystack = " ".join(
            filter(
                None,
                (
                    vuln.get("vendorProject"),
                    vuln.get("product"),
                    vuln.get("vulnerabilityName"),
                    vuln.get("shortDescription"),
                ),
            )
        ).lower()
        if not any(keyword in haystack for keyword in keywords):
            continue

        cve = vuln.get("cveID")
        # Entries without a CVE id yield empty id/alias lists, never [None].
        cve_ids = [cve] if cve else []
        candidates.append(
            Candidate(
                system_id=system["system_id"],
                display_name=system["display_name"],
                category=system["category"],
                advisory_mode=source.get("advisory_mode", "core"),
                source_kind=source["kind"],
                source_name=source["name"],
                source_confidence=source["confidence"],
                source_url=source["url"],
                title=(
                    vuln.get("vulnerabilityName")
                    or cve
                    or f"KEV advisory for {system['display_name']}"
                ),
                published_at=vuln.get("dateAdded"),
                # NOTE(review): dueDate looks like CISA's remediation
                # deadline rather than a modification timestamp; confirm
                # this mapping is intended.
                updated_at=vuln.get("dueDate"),
                summary=vuln.get("shortDescription") or "",
                severity="critical",
                exploit_status="known_exploited",
                # A copy is passed in case unique() returns its argument,
                # so aliases and cve_ids never share one list object.
                aliases=unique(list(cve_ids)),
                cve_ids=cve_ids,
                references=[source["url"]],
                raw=vuln,
            )
        )
    return candidates