from __future__ import annotations from pathlib import Path from typing import Any, Dict, List from intel.entities import enrich_advisory_record from intel.models import AdvisoryRecord from intel.utils import slugify TOPIC_KEYWORDS = { "xss-output-encoding": ["xss", "cross-site scripting"], "dom-sink-hardening": ["dom xss", "innerhtml", "outerhtml"], "csp-trusted-types": ["trusted types", "content security policy", "csp"], "token-cookie-storage": ["cookie", "token", "session", "jwt"], "authz-server-side-recheck": ["authorization bypass", "auth bypass", "improper authorization", "access control"], "ssrf-url-validation": ["ssrf", "server-side request forgery"], "request-smuggling-boundary": ["request smuggling", "http desync"], "path-traversal-guard": ["path traversal", "directory traversal"], "file-upload-validation": ["file upload", "upload"], "plugin-extension-trust-policy": ["plugin", "extension", "module", "theme"], "dependency-upgrade-policy": ["dependency", "supply chain", "advisory", "package"], "proxy-trust-boundary": ["proxy", "middleware", "reverse proxy", "header trust"], "deserialization-safety": ["deserialization", "serialization"], "template-injection-guard": ["template injection", "ssti"], } HIGH_VALUE_TERMS = [ "rce", "remote code execution", "authorization bypass", "auth bypass", "known_exploited", "known exploited", "ssrf", "deserialization", ] def _pick_topics(system: Dict[str, Any], advisory: AdvisoryRecord) -> List[str]: haystack = " ".join( filter( None, [ advisory.title, advisory.summary, " ".join(advisory.aliases), ], ) ).lower() topics = list(system.get("secure_code_topics", [])) for topic, keywords in TOPIC_KEYWORDS.items(): if any(keyword in haystack for keyword in keywords): topics.append(topic) # preserve order while deduping seen = set() result = [] for topic in topics: if topic not in seen: seen.add(topic) result.append(topic) return result def _should_render(system: Dict[str, Any], advisory: AdvisoryRecord) -> bool: if advisory.status == "triage": return False policy = system.get("render_policy", {}) if advisory.advisory_mode == "core" and policy.get("core_always_markdown", False): return True haystack = f"{advisory.title} {advisory.summary} {advisory.exploit_status}".lower() if advisory.exploit_status and advisory.exploit_status != "unknown": return True if advisory.cvss_score is not None and advisory.cvss_score >= 8.8: return True if advisory.severity in {"critical", "high"} and any(term in haystack for term in HIGH_VALUE_TERMS): return True return False def route_advisories(source_map: Dict[str, Any], advisories: List[AdvisoryRecord]) -> List[AdvisoryRecord]: systems = {system["system_id"]: system for system in source_map["systems"]} routed: List[AdvisoryRecord] = [] for advisory in advisories: system = systems[advisory.system_id] enrich_advisory_record(advisory, system) advisory.secure_code_topics = _pick_topics(system, advisory) advisory.render_markdown = _should_render(system, advisory) if advisory.render_markdown: slug = slugify("-".join(filter(None, [advisory.system_id, advisory.cve_ids[0] if advisory.cve_ids else advisory.ghsa_ids[0] if advisory.ghsa_ids else advisory.title]))) advisory.case_path = str(Path(system["output_dir"]) / "cases" / f"{slug}.md") routed.append(advisory) return routed