94 行
3.7 KiB
Python
94 行
3.7 KiB
Python
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List
|
|
|
|
from intel.entities import enrich_advisory_record
|
|
from intel.models import AdvisoryRecord
|
|
from intel.utils import slugify
|
|
|
|
|
|
TOPIC_KEYWORDS = {
|
|
"xss-output-encoding": ["xss", "cross-site scripting"],
|
|
"dom-sink-hardening": ["dom xss", "innerhtml", "outerhtml"],
|
|
"csp-trusted-types": ["trusted types", "content security policy", "csp"],
|
|
"token-cookie-storage": ["cookie", "token", "session", "jwt"],
|
|
"authz-server-side-recheck": ["authorization bypass", "auth bypass", "improper authorization", "access control"],
|
|
"ssrf-url-validation": ["ssrf", "server-side request forgery"],
|
|
"request-smuggling-boundary": ["request smuggling", "http desync"],
|
|
"path-traversal-guard": ["path traversal", "directory traversal"],
|
|
"file-upload-validation": ["file upload", "upload"],
|
|
"plugin-extension-trust-policy": ["plugin", "extension", "module", "theme"],
|
|
"dependency-upgrade-policy": ["dependency", "supply chain", "advisory", "package"],
|
|
"proxy-trust-boundary": ["proxy", "middleware", "reverse proxy", "header trust"],
|
|
"deserialization-safety": ["deserialization", "serialization"],
|
|
"template-injection-guard": ["template injection", "ssti"],
|
|
}
|
|
|
|
HIGH_VALUE_TERMS = [
|
|
"rce",
|
|
"remote code execution",
|
|
"authorization bypass",
|
|
"auth bypass",
|
|
"known_exploited",
|
|
"known exploited",
|
|
"ssrf",
|
|
"deserialization",
|
|
]
|
|
|
|
|
|
def _pick_topics(system: Dict[str, Any], advisory: AdvisoryRecord) -> List[str]:
|
|
haystack = " ".join(
|
|
filter(
|
|
None,
|
|
[
|
|
advisory.title,
|
|
advisory.summary,
|
|
" ".join(advisory.aliases),
|
|
],
|
|
)
|
|
).lower()
|
|
topics = list(system.get("secure_code_topics", []))
|
|
for topic, keywords in TOPIC_KEYWORDS.items():
|
|
if any(keyword in haystack for keyword in keywords):
|
|
topics.append(topic)
|
|
# preserve order while deduping
|
|
seen = set()
|
|
result = []
|
|
for topic in topics:
|
|
if topic not in seen:
|
|
seen.add(topic)
|
|
result.append(topic)
|
|
return result
|
|
|
|
|
|
def _should_render(system: Dict[str, Any], advisory: AdvisoryRecord) -> bool:
|
|
if advisory.status == "triage":
|
|
return False
|
|
policy = system.get("render_policy", {})
|
|
if advisory.advisory_mode == "core" and policy.get("core_always_markdown", False):
|
|
return True
|
|
haystack = f"{advisory.title} {advisory.summary} {advisory.exploit_status}".lower()
|
|
if advisory.exploit_status and advisory.exploit_status != "unknown":
|
|
return True
|
|
if advisory.cvss_score is not None and advisory.cvss_score >= 8.8:
|
|
return True
|
|
if advisory.severity in {"critical", "high"} and any(term in haystack for term in HIGH_VALUE_TERMS):
|
|
return True
|
|
return False
|
|
|
|
|
|
def route_advisories(source_map: Dict[str, Any], advisories: List[AdvisoryRecord]) -> List[AdvisoryRecord]:
|
|
systems = {system["system_id"]: system for system in source_map["systems"]}
|
|
routed: List[AdvisoryRecord] = []
|
|
for advisory in advisories:
|
|
system = systems[advisory.system_id]
|
|
enrich_advisory_record(advisory, system)
|
|
advisory.secure_code_topics = _pick_topics(system, advisory)
|
|
advisory.render_markdown = _should_render(system, advisory)
|
|
if advisory.render_markdown:
|
|
slug = slugify("-".join(filter(None, [advisory.system_id, advisory.cve_ids[0] if advisory.cve_ids else advisory.ghsa_ids[0] if advisory.ghsa_ids else advisory.title])))
|
|
advisory.case_path = str(Path(system["output_dir"]) / "cases" / f"{slug}.md")
|
|
routed.append(advisory)
|
|
return routed
|