103 行
3.3 KiB
Python
103 行
3.3 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
from html import unescape
|
|
from html.parser import HTMLParser
|
|
from typing import Any, Dict, List
|
|
from urllib.parse import urljoin
|
|
|
|
from intel.http_client import request
|
|
from intel.models import Candidate
|
|
from intel.utils import unique
|
|
|
|
from .html_links import canonicalize_url
|
|
|
|
|
|
class _AnchorCollector(HTMLParser):
|
|
def __init__(self) -> None:
|
|
super().__init__()
|
|
self.links: List[tuple[str, str]] = []
|
|
self._href: str | None = None
|
|
self._chunks: List[str] = []
|
|
|
|
def handle_starttag(self, tag: str, attrs) -> None:
|
|
if tag.lower() != "a":
|
|
return
|
|
href = dict(attrs).get("href")
|
|
if href:
|
|
self._href = href
|
|
self._chunks = []
|
|
|
|
def handle_data(self, data: str) -> None:
|
|
if self._href is not None:
|
|
self._chunks.append(data)
|
|
|
|
def handle_endtag(self, tag: str) -> None:
|
|
if tag.lower() != "a" or self._href is None:
|
|
return
|
|
text = unescape(" ".join(self._chunks)).strip()
|
|
self.links.append((self._href, text))
|
|
self._href = None
|
|
self._chunks = []
|
|
|
|
|
|
def extract_links(html: str) -> List[tuple[str, str]]:
    """Parse *html* and return the ``(href, text)`` pairs of its anchors.

    The pairs are those gathered by ``_AnchorCollector``, in the order the
    collector recorded them.
    """
    collector = _AnchorCollector()
    collector.feed(html)
    # close() makes the parser process any input it is still buffering.
    collector.close()
    return collector.links
|
|
|
|
|
|
def _matches(value: str, patterns: List[str]) -> bool:
|
|
if not patterns:
|
|
return True
|
|
return any(re.search(pattern, value, re.IGNORECASE) for pattern in patterns)
|
|
|
|
|
|
def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
    """Download ``source["url"]`` and turn its matching links into candidates.

    Every anchor returned by ``extract_links`` that has non-empty text is
    resolved against the page URL, canonicalized, and kept only if it passes
    all configured filters:

    * keywords - when any are configured, at least one must occur
      (case-insensitive substring) in the URL or the link text. They come
      from ``parser_hints["keywords"]``, falling back to
      ``source["keywords"]``.
    * ``parser_hints["include_url_patterns"]`` - when given, the URL must
      match at least one of these regexes.
    * ``parser_hints["exclude_url_patterns"]`` - when given, the URL must
      match none of these regexes.

    A URL is emitted at most once, and collection stops as soon as
    ``source["max_items"]`` (default 50) candidates have been gathered.

    Args:
        system: Mapping read for ``system_id``, ``display_name`` and
            ``category``.
        source: Mapping read for ``url``, ``kind``, ``name`` and
            ``confidence`` (required), plus the optional ``advisory_mode``,
            ``parser_hints``, ``keywords`` and ``max_items``.

    Returns:
        The accepted candidates, in the order their links were extracted.

    Raises:
        KeyError: If a required key is missing from *system* or *source*.
        Exception: Whatever ``response.raise_for_status()`` raises for a
            failed request (type depends on the HTTP client in use).
    """
    page_url = source["url"]
    response = request("GET", page_url, source=source)
    response.raise_for_status()
    html = response.text

    parser_hints = source.get("parser_hints") or {}
    # `or []` rather than a .get() default: a key that is present but null
    # (common in hand-written config) must behave like a missing key instead
    # of raising TypeError when iterated.
    raw_keywords = parser_hints.get("keywords") or source.get("keywords") or []
    keywords = {kw.lower() for kw in raw_keywords}
    include_patterns = parser_hints.get("include_url_patterns") or []
    exclude_patterns = parser_hints.get("exclude_url_patterns") or []

    # Resolved once, outside the loop. Only None falls back to the default so
    # that an explicit integer keeps exactly the meaning it had before.
    max_items = source.get("max_items")
    if max_items is None:
        max_items = 50

    candidates: List[Candidate] = []
    seen: set[str] = set()
    for href, text in extract_links(html):
        absolute = canonicalize_url(urljoin(page_url, href))
        # The link text has already had its character references decoded by
        # the HTML parser; unescaping it again would corrupt text that
        # legitimately contains something like "&lt;".
        title = text.strip()
        if not title:
            continue
        haystack = " ".join(filter(None, [absolute, title])).lower()
        if keywords and not any(keyword in haystack for keyword in keywords):
            continue
        if include_patterns and not _matches(absolute, include_patterns):
            continue
        if exclude_patterns and _matches(absolute, exclude_patterns):
            continue
        if absolute in seen:
            continue
        seen.add(absolute)
        candidates.append(
            Candidate(
                system_id=system["system_id"],
                display_name=system["display_name"],
                category=system["category"],
                advisory_mode=source.get("advisory_mode", "core"),
                source_kind=source["kind"],
                source_name=source["name"],
                source_confidence=source["confidence"],
                source_url=absolute,
                title=title,
                summary="",
                severity="unknown",
                references=unique([absolute]),
                raw={"href": absolute, "title": title},
            )
        )
        if len(candidates) >= max_items:
            break
    return candidates
|