文件
websafe-kb/scripts/intel/route.py

94 行
3.7 KiB
Python

from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List
from intel.entities import enrich_advisory_record
from intel.models import AdvisoryRecord
from intel.utils import slugify
TOPIC_KEYWORDS = {
"xss-output-encoding": ["xss", "cross-site scripting"],
"dom-sink-hardening": ["dom xss", "innerhtml", "outerhtml"],
"csp-trusted-types": ["trusted types", "content security policy", "csp"],
"token-cookie-storage": ["cookie", "token", "session", "jwt"],
"authz-server-side-recheck": ["authorization bypass", "auth bypass", "improper authorization", "access control"],
"ssrf-url-validation": ["ssrf", "server-side request forgery"],
"request-smuggling-boundary": ["request smuggling", "http desync"],
"path-traversal-guard": ["path traversal", "directory traversal"],
"file-upload-validation": ["file upload", "upload"],
"plugin-extension-trust-policy": ["plugin", "extension", "module", "theme"],
"dependency-upgrade-policy": ["dependency", "supply chain", "advisory", "package"],
"proxy-trust-boundary": ["proxy", "middleware", "reverse proxy", "header trust"],
"deserialization-safety": ["deserialization", "serialization"],
"template-injection-guard": ["template injection", "ssti"],
}
HIGH_VALUE_TERMS = [
"rce",
"remote code execution",
"authorization bypass",
"auth bypass",
"known_exploited",
"known exploited",
"ssrf",
"deserialization",
]
def _pick_topics(system: Dict[str, Any], advisory: AdvisoryRecord) -> List[str]:
haystack = " ".join(
filter(
None,
[
advisory.title,
advisory.summary,
" ".join(advisory.aliases),
],
)
).lower()
topics = list(system.get("secure_code_topics", []))
for topic, keywords in TOPIC_KEYWORDS.items():
if any(keyword in haystack for keyword in keywords):
topics.append(topic)
# preserve order while deduping
seen = set()
result = []
for topic in topics:
if topic not in seen:
seen.add(topic)
result.append(topic)
return result
def _should_render(system: Dict[str, Any], advisory: AdvisoryRecord) -> bool:
if advisory.status == "triage":
return False
policy = system.get("render_policy", {})
if advisory.advisory_mode == "core" and policy.get("core_always_markdown", False):
return True
haystack = f"{advisory.title} {advisory.summary} {advisory.exploit_status}".lower()
if advisory.exploit_status and advisory.exploit_status != "unknown":
return True
if advisory.cvss_score is not None and advisory.cvss_score >= 8.8:
return True
if advisory.severity in {"critical", "high"} and any(term in haystack for term in HIGH_VALUE_TERMS):
return True
return False
def route_advisories(source_map: Dict[str, Any], advisories: List[AdvisoryRecord]) -> List[AdvisoryRecord]:
systems = {system["system_id"]: system for system in source_map["systems"]}
routed: List[AdvisoryRecord] = []
for advisory in advisories:
system = systems[advisory.system_id]
enrich_advisory_record(advisory, system)
advisory.secure_code_topics = _pick_topics(system, advisory)
advisory.render_markdown = _should_render(system, advisory)
if advisory.render_markdown:
slug = slugify("-".join(filter(None, [advisory.system_id, advisory.cve_ids[0] if advisory.cve_ids else advisory.ghsa_ids[0] if advisory.ghsa_ids else advisory.title])))
advisory.case_path = str(Path(system["output_dir"]) / "cases" / f"{slug}.md")
routed.append(advisory)
return routed