53 行
1.8 KiB
Python
53 行
1.8 KiB
Python
from __future__ import annotations
|
|
|
|
import xml.etree.ElementTree as ET
|
|
from typing import Any, Dict, List
|
|
|
|
import requests
|
|
|
|
from intel.http_client import request
|
|
from intel.models import Candidate
|
|
|
|
|
|
def _text(node: ET.Element, name: str) -> str:
|
|
child = node.find(name)
|
|
return child.text.strip() if child is not None and child.text else ""
|
|
|
|
|
|
def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
|
|
response = request("GET", source["url"])
|
|
response.raise_for_status()
|
|
root = ET.fromstring(response.content)
|
|
|
|
keywords = {kw.lower() for kw in source.get("keywords", [])}
|
|
items = root.findall(".//item")
|
|
candidates: List[Candidate] = []
|
|
for item in items[: source.get("max_items", 50)]:
|
|
title = _text(item, "title")
|
|
link = _text(item, "link") or source["url"]
|
|
description = _text(item, "description")
|
|
if keywords:
|
|
haystack = " ".join([title, description]).lower()
|
|
if not any(keyword in haystack for keyword in keywords):
|
|
continue
|
|
candidates.append(
|
|
Candidate(
|
|
system_id=system["system_id"],
|
|
display_name=system["display_name"],
|
|
category=system["category"],
|
|
advisory_mode=source.get("advisory_mode", "core"),
|
|
source_kind=source["kind"],
|
|
source_name=source["name"],
|
|
source_confidence=source["confidence"],
|
|
source_url=link,
|
|
title=title or f"RSS entry for {system['display_name']}",
|
|
published_at=_text(item, "pubDate"),
|
|
updated_at=_text(item, "pubDate"),
|
|
summary=description,
|
|
severity="unknown",
|
|
references=[link],
|
|
raw={"title": title, "link": link},
|
|
)
|
|
)
|
|
return candidates
|