from __future__ import annotations

import re
from html import unescape
from typing import Any, Dict, List
from urllib.parse import urljoin

import requests

from intel.http_client import request
from intel.models import Candidate
from intel.utils import unique

# Matches one <a ... href="...">inner</a> element and captures (href, inner markup).
# Both the opening "<a" and the closing "</a>" are required: without the closing
# tag the lazy inner group matches the empty string, every title comes out empty
# and no link is ever emitted.
ANCHOR_RE = re.compile(
    r"<a\s[^>]*?href=[\"']([^\"']+)[\"'][^>]*>(.*?)</a>",
    re.IGNORECASE | re.DOTALL,
)
# Any HTML tag; used to strip markup nested inside an anchor's text.
TAG_RE = re.compile(r"<[^>]+>")


def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
    """Scrape the links on ``source["url"]`` and return them as candidates.

    The page is downloaded with a GET request, every ``<a href=...>`` element
    is extracted, and each link whose text is non-empty becomes one
    ``Candidate``. Links are de-duplicated by their absolute URL.

    Args:
        system: Mapping that must provide ``system_id``, ``display_name`` and
            ``category``; the values are copied onto each candidate.
        source: Mapping that must provide ``url``, ``kind``, ``name`` and
            ``confidence``. Optional keys: ``keywords`` (if non-empty, a link
            is kept only when at least one keyword occurs, case-insensitively,
            in its title or absolute URL), ``advisory_mode`` (default
            ``"core"``) and ``max_items`` (default ``50``; upper bound on the
            number of candidates returned).

    Returns:
        The candidates in page order, at most ``max_items`` of them.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an unsuccessful
        HTTP response, and ``KeyError`` if a required key is missing.
    """
    base_url = source["url"]
    response = request("GET", base_url)
    response.raise_for_status()

    keywords = {kw.lower() for kw in source.get("keywords", [])}
    # Read the limit once instead of on every iteration.
    max_items = source.get("max_items", 50)

    candidates: List[Candidate] = []
    seen = set()
    for href, text in ANCHOR_RE.findall(response.text):
        # Checked before doing any work so a limit of 0 yields no candidates.
        if len(candidates) >= max_items:
            break

        # Replace nested tags with spaces, then decode HTML entities.
        title = unescape(TAG_RE.sub(" ", text)).strip()
        if not title:
            continue

        # Resolve relative hrefs against the page URL.
        absolute = urljoin(base_url, href)

        if keywords:
            haystack = f"{title} {absolute}".lower()
            if not any(keyword in haystack for keyword in keywords):
                continue

        if absolute in seen:
            continue
        seen.add(absolute)

        candidates.append(
            Candidate(
                system_id=system["system_id"],
                display_name=system["display_name"],
                category=system["category"],
                advisory_mode=source.get("advisory_mode", "core"),
                source_kind=source["kind"],
                source_name=source["name"],
                source_confidence=source["confidence"],
                source_url=absolute,
                title=title,
                summary="",
                severity="unknown",
                references=unique([absolute]),
                raw={"href": absolute, "title": title},
            )
        )

    return candidates