65 行
2.7 KiB
Python
65 行
2.7 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any, Dict, List
|
|
|
|
from intel.http_client import request
|
|
from intel.models import Candidate
|
|
from intel.utils import unique
|
|
|
|
|
|
def _refs(item: Dict[str, Any]) -> List[str]:
    """Collect the reference URLs listed under ``item["references"]``.

    Each entry may be a plain string, or a dict carrying a truthy ``"url"``
    value; any other entry is skipped. The collected values are passed
    through ``unique`` before being returned.

    A bare string or a single dict is treated as a one-element list, so a
    feed emitting ``"references": "https://..."`` is not split into
    characters (and a lone dict is not iterated by key). Any other
    non-list value is ignored instead of raising ``TypeError``.
    """
    raw = item.get("references") or []
    if isinstance(raw, (str, dict)):
        # Scalar shorthand for a single reference.
        raw = [raw]
    elif not isinstance(raw, (list, tuple)):
        # Numbers, booleans, etc. carry no usable references.
        raw = []

    values: List[str] = []
    for entry in raw:
        if isinstance(entry, str):
            values.append(entry)
        elif isinstance(entry, dict) and entry.get("url"):
            values.append(entry["url"])
    return unique(values)
|
|
|
|
|
|
def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
    """Fetch a JSON document from ``source["url"]`` and build candidates.

    The payload may be a JSON object holding its entries under ``"items"``,
    ``"entries"`` or ``"advisories"`` (first non-empty one wins), or a
    top-level JSON array of entries. Any other payload shape yields an
    empty list.

    Args:
        system: Must provide ``"system_id"``, ``"display_name"`` and
            ``"category"``; these are copied onto every candidate.
        source: Must provide ``"url"``, ``"kind"``, ``"name"`` and
            ``"confidence"``. Optional keys: ``"parser_hints"`` (a mapping
            whose ``"keywords"`` take precedence over ``source["keywords"]``),
            ``"keywords"``, ``"max_items"`` (default 50) and
            ``"advisory_mode"`` (default ``"core"``).

    Returns:
        One ``Candidate`` per dict entry among the first ``max_items``
        entries that matches the keyword filter (all of them when no
        keywords are configured).

    Raises:
        Whatever ``response.raise_for_status()`` and ``response.json()``
        raise. NOTE(review): presumably HTTP-status and JSON-decoding
        errors of a requests-style response; confirm against
        ``intel.http_client``.
    """
    response = request("GET", source["url"], source=source)
    response.raise_for_status()
    payload = response.json()

    if isinstance(payload, list):
        # Some endpoints return the entries directly as a top-level array.
        items = payload
    elif isinstance(payload, dict):
        items = payload.get("items") or payload.get("entries") or payload.get("advisories") or []
    else:
        return []
    if not isinstance(items, list):
        return []

    parser_hints = source.get("parser_hints") or {}
    # ``or []`` also covers an explicit ``"keywords": None`` in the source.
    raw_keywords = parser_hints.get("keywords") or source.get("keywords") or []
    if isinstance(raw_keywords, str):
        # A single keyword given as a string must not be split into characters.
        raw_keywords = [raw_keywords]
    keywords = {kw.lower() for kw in raw_keywords}

    candidates: List[Candidate] = []
    # The cap is applied before keyword filtering, so fewer than
    # ``max_items`` candidates may be returned.
    for item in items[: source.get("max_items", 50)]:
        if not isinstance(item, dict):
            continue

        title = item.get("title") or item.get("name") or item.get("summary") or f"JSON entry for {system['display_name']}"
        link = item.get("url") or item.get("external_url") or item.get("html_url") or source["url"]
        summary = item.get("summary") or item.get("content_text") or item.get("description") or ""

        if keywords:
            # str() keeps the join from failing on non-string field values.
            haystack = " ".join(str(part) for part in (title, summary, link) if part).lower()
            if not any(keyword in haystack for keyword in keywords):
                continue

        refs = _refs(item)
        if link and link not in refs:
            # The entry's own link always leads the reference list.
            refs.insert(0, link)

        # Fall back to the entry id when no aliases are given; drop None so
        # an entry with neither does not end up with ``[None]``.
        raw_aliases = item.get("aliases") or [item.get("id")]
        if isinstance(raw_aliases, str):
            raw_aliases = [raw_aliases]
        aliases = unique([alias for alias in raw_aliases if alias is not None])

        candidates.append(
            Candidate(
                system_id=system["system_id"],
                display_name=system["display_name"],
                category=system["category"],
                advisory_mode=source.get("advisory_mode", "core"),
                source_kind=source["kind"],
                source_name=source["name"],
                source_confidence=source["confidence"],
                source_url=link,
                title=title,
                published_at=item.get("date_published") or item.get("published_at") or item.get("published") or item.get("created_at"),
                updated_at=item.get("date_modified") or item.get("updated_at") or item.get("modified") or item.get("updated"),
                summary=summary,
                severity=str(item.get("severity") or "unknown").lower(),
                aliases=aliases,
                references=refs,
                raw=item,
            )
        )
    return candidates
|