"""Build candidates from the anchor links found on an HTML page."""

from __future__ import annotations

import re
from html import unescape  # noqa: F401  (no longer used here; kept in case other modules import it from this module)
from html.parser import HTMLParser
from typing import Any, Dict, List
from urllib.parse import urljoin

from intel.http_client import request
from intel.models import Candidate
from intel.utils import unique

from .html_links import canonicalize_url

# Cap applied when the source config does not provide ``max_items``.
_DEFAULT_MAX_ITEMS = 50


class _AnchorCollector(HTMLParser):
    """Record an ``(href, text)`` pair for every closed ``<a href=...>`` element.

    Results accumulate in ``self.links`` in document order. Anchors without
    a non-empty ``href`` are ignored. A new ``<a href>`` that opens before the
    previous one was closed replaces the pending anchor.
    """

    def __init__(self) -> None:
        # convert_charrefs=True is the default; it is spelled out because the
        # text handling below relies on handle_data() receiving text whose
        # character references are already decoded.
        super().__init__(convert_charrefs=True)
        self.links: List[tuple[str, str]] = []
        # href of the anchor currently being collected, or None outside one.
        self._href: str | None = None
        # Text fragments seen since the current anchor was opened.
        self._chunks: List[str] = []

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        """Start collecting text when an ``<a>`` tag with an href opens."""
        if tag.lower() != "a":
            return
        href = dict(attrs).get("href")
        if href:
            self._href = href
            self._chunks = []

    def handle_data(self, data: str) -> None:
        """Buffer text that appears inside the anchor being collected."""
        if self._href is not None:
            self._chunks.append(data)

    def handle_endtag(self, tag: str) -> None:
        """Emit the pending ``(href, text)`` pair when its ``</a>`` is reached."""
        if tag.lower() != "a" or self._href is None:
            return
        # The parser already decoded character references, so decoding again
        # would corrupt text such as "a=1&copy=2". Fragments are joined with a
        # space and every whitespace run (including newlines/indentation from
        # the markup) is collapsed to a single space.
        text = " ".join(" ".join(self._chunks).split())
        self.links.append((self._href, text))
        self._href = None
        self._chunks = []


def extract_links(html: str) -> List[tuple[str, str]]:
    """Return ``(href, text)`` pairs for the anchors in *html*, in document order.

    ``href`` is the raw attribute value; ``text`` is the anchor's visible text
    with entities decoded once and whitespace collapsed. Anchors with empty
    text are still returned (with ``text == ""``).
    """
    collector = _AnchorCollector()
    collector.feed(html)
    collector.close()
    return collector.links


def _matches(value: str, patterns: List[str]) -> bool:
    """Return True if any regex in *patterns* is found in *value* (case-insensitive).

    An empty *patterns* list matches everything.
    """
    if not patterns:
        return True
    for pattern in patterns:
        if re.search(pattern, value, re.IGNORECASE):
            return True
    return False


def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
    """Fetch ``source["url"]`` and turn its matching links into candidates.

    Links are resolved against the page URL, canonicalized, filtered and
    de-duplicated by URL. A link is kept when it has non-empty text, contains
    at least one keyword in its URL or text (if keywords are configured),
    matches an include pattern (if any) and matches no exclude pattern.

    Args:
        system: Mapping providing ``system_id``, ``display_name`` and
            ``category``, which are copied onto every candidate.
        source: Source configuration. Reads ``url``, ``kind``, ``name`` and
            ``confidence`` (required) and ``parser_hints``, ``keywords``,
            ``advisory_mode`` (default ``"core"``) and ``max_items``
            (default 50) when present.

    Returns:
        At most ``max_items`` candidates in document order; an empty list when
        ``max_items`` is zero or negative.

    Raises:
        KeyError: If a required key is missing from *system* or *source*.
        Exception: Whatever ``request`` / ``response.raise_for_status()``
            raise on a failed request (defined outside this module).
    """
    base_url = source["url"]
    response = request("GET", base_url, source=source)
    response.raise_for_status()

    hints = source.get("parser_hints") or {}
    # `or []` also covers a "keywords" key that is present but set to None.
    raw_keywords = hints.get("keywords") or source.get("keywords") or []
    keywords = {keyword.lower() for keyword in raw_keywords}
    include_patterns = hints.get("include_url_patterns") or []
    exclude_patterns = hints.get("exclude_url_patterns") or []

    # Resolve the cap once; an explicit None falls back to the default
    # instead of failing the integer comparison below.
    max_items = source.get("max_items")
    if max_items is None:
        max_items = _DEFAULT_MAX_ITEMS

    candidates: List[Candidate] = []
    if max_items <= 0:
        # Never return more than the configured cap, even when it is zero.
        return candidates

    seen: set[str] = set()
    for href, title in extract_links(response.text):
        # extract_links() already decoded entities and trimmed whitespace.
        if not title:
            continue
        absolute = canonicalize_url(urljoin(base_url, href))

        haystack = " ".join(filter(None, [absolute, title])).lower()
        if keywords and not any(keyword in haystack for keyword in keywords):
            continue
        if include_patterns and not _matches(absolute, include_patterns):
            continue
        if exclude_patterns and _matches(absolute, exclude_patterns):
            continue
        if absolute in seen:
            continue
        seen.add(absolute)

        candidates.append(
            Candidate(
                system_id=system["system_id"],
                display_name=system["display_name"],
                category=system["category"],
                advisory_mode=source.get("advisory_mode", "core"),
                source_kind=source["kind"],
                source_name=source["name"],
                source_confidence=source["confidence"],
                source_url=absolute,
                title=title,
                summary="",
                severity="unknown",
                references=unique([absolute]),
                raw={"href": absolute, "title": title},
            )
        )
        if len(candidates) >= max_items:
            break
    return candidates