"""Collect candidate links from the anchor tags of a fetched HTML page."""

from __future__ import annotations

import re
from html import unescape
from typing import Any, Dict, List
from urllib.parse import urljoin, urlsplit, urlunsplit

import requests

from intel.http_client import request
from intel.models import Candidate
from intel.utils import unique

# Captures (href, inner HTML) for each ``<a ... href="...">...</a>`` element.
# The opening ``<a`` and the closing ``</a>`` are both required: without the
# closing tag the lazy ``(.*?)`` group would always capture an empty string and
# every link would be discarded for having no title.
ANCHOR_RE = re.compile(
    r"<a\b[^>]+href=[\"']([^\"']+)[\"'][^>]*>(.*?)</a>",
    re.IGNORECASE | re.DOTALL,
)
# Matches any single HTML tag; used to strip markup nested inside link text.
TAG_RE = re.compile(r"<[^>]+>")
# Link texts that carry no information about the target page. They are
# compared against the stripped, lower-cased title.
GENERIC_TITLES = {
    "permalink",
    "discuss this topic",
    "read full topic",
    "read more",
}


def _is_generic_title(title: str) -> bool:
    """Return True if *title* is one of the uninformative ``GENERIC_TITLES``."""
    return title.strip().lower() in GENERIC_TITLES


def canonicalize_url(url: str) -> str:
    """Return *url* with its fragment removed; all other parts are kept."""
    parsed = urlsplit(url)
    return urlunsplit(
        (parsed.scheme, parsed.netloc, parsed.path, parsed.query, "")
    )


def _matches_patterns(value: str, patterns: List[str]) -> bool:
    """Return True if any regex in *patterns* is found in *value*.

    Matching is case-insensitive. An empty pattern list matches everything.
    """
    if not patterns:
        return True
    return any(
        re.search(pattern, value, re.IGNORECASE) for pattern in patterns
    )


def _is_better_title(new: str, current: str) -> bool:
    """Return True if *new* should replace *current* as a link's title.

    A non-generic title always beats a generic one; when both are generic or
    both are non-generic, the strictly longer title wins.
    """
    new_generic = _is_generic_title(new)
    current_generic = _is_generic_title(current)
    if new_generic != current_generic:
        return current_generic
    return len(new) > len(current)


def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
    """Download ``source["url"]`` and turn its links into candidates.

    Each anchor's text is stripped of nested tags and HTML-unescaped to form
    the title; its href is resolved against the page URL and canonicalized.
    Links are filtered by keywords and URL patterns, de-duplicated by URL
    (keeping the most informative title), and returned in first-seen order.

    Args:
        system: Mapping that provides ``system_id``, ``display_name`` and
            ``category``, copied onto every candidate.
        source: Mapping that provides ``url``, ``kind``, ``name`` and
            ``confidence``. Optional keys: ``advisory_mode`` (default
            ``"core"``), ``max_items`` (default 50), ``keywords``, and
            ``parser_hints`` with ``keywords``, ``include_url_patterns`` and
            ``exclude_url_patterns``. Keywords in ``parser_hints`` take
            precedence over ``source["keywords"]``.

    Returns:
        At most ``max_items`` candidates, one per distinct canonical URL.

    Raises:
        Whatever ``response.raise_for_status()`` raises for a failed request
        (presumably ``requests.HTTPError`` -- confirm against
        ``intel.http_client.request``).
    """
    response = request("GET", source["url"], source=source)
    response.raise_for_status()
    html = response.text

    parser_hints = source.get("parser_hints") or {}
    # ``or []`` also covers a "keywords" key that is present but set to None.
    raw_keywords = parser_hints.get("keywords") or source.get("keywords") or []
    keywords = {kw.lower() for kw in raw_keywords}
    include_patterns = parser_hints.get("include_url_patterns") or []
    exclude_patterns = parser_hints.get("exclude_url_patterns") or []

    # Insertion-ordered, and keys are never removed, so iteration order is the
    # order in which each URL was first seen on the page.
    by_url: Dict[str, Candidate] = {}

    for href, text in ANCHOR_RE.findall(html):
        title = unescape(TAG_RE.sub(" ", text)).strip()
        if not title:
            continue

        absolute = canonicalize_url(urljoin(source["url"], href))

        # A keyword may match in either the link text or the URL itself.
        haystack = f"{title} {absolute}".lower()
        if keywords and not any(keyword in haystack for keyword in keywords):
            continue
        if include_patterns and not _matches_patterns(absolute, include_patterns):
            continue
        if exclude_patterns and _matches_patterns(absolute, exclude_patterns):
            continue

        existing = by_url.get(absolute)
        if existing is None:
            by_url[absolute] = Candidate(
                system_id=system["system_id"],
                display_name=system["display_name"],
                category=system["category"],
                advisory_mode=source.get("advisory_mode", "core"),
                source_kind=source["kind"],
                source_name=source["name"],
                source_confidence=source["confidence"],
                source_url=absolute,
                title=title,
                summary="",
                severity="unknown",
                references=unique([absolute]),
                raw={"href": absolute, "title": title},
            )
            continue

        # Same URL linked again: keep whichever title is more informative.
        if _is_better_title(title, existing.title):
            existing.title = title
            existing.raw = {"href": absolute, "title": title}

    return list(by_url.values())[: source.get("max_items", 50)]