# 169 lines · 7.3 KiB · Python
from __future__ import annotations
|
|
|
|
from functools import lru_cache
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from intel.utils import load_all_json
|
|
from lab.config import ADVISORIES_DIR, ENV_CATALOG_DIR, REPRO_MAP_PATH, REPRO_PROFILES_DIR, RUNS_DIR
|
|
from lab.utils import read_json, read_yaml, unique
|
|
|
|
|
|
# Keyword table used by resolve_repro_family() to classify an advisory.
# Each family id maps to the phrases that select it. Matching there is a
# substring test on lower-cased advisory text, and families are tried in the
# order declared here, so the first family with a hit wins: keep the order.
FAMILY_KEYWORDS: Dict[str, List[str]] = {
    "xss-generic": [
        "xss",
        "cross-site scripting",
        "dom xss",
        "trusted types",
        "content injection",
    ],
    "sqli-generic": [
        "sql injection",
        "sqli",
    ],
    "authz-bypass-generic": [
        "authorization bypass",
        "auth bypass",
        "access control",
        "permission",
    ],
    "ssrf-generic": [
        "ssrf",
        "server-side request forgery",
    ],
    "file-upload-generic": [
        "file upload",
        "attachment",
        "extension bypass",
    ],
    "request-smuggling-generic": [
        "request smuggling",
        "http desync",
    ],
    "template-injection-generic": [
        "template injection",
        "ssti",
    ],
    "deserialization-generic": [
        "deserialization",
        "serialization",
    ],
    "proxy-boundary-generic": [
        "proxy",
        "middleware",
        "header trust",
    ],
    "plugin-extension-generic": [
        "plugin",
        "module",
        "extension",
        "theme",
    ],
    "session-token-generic": [
        "token",
        "cookie",
        "session",
        "jwt",
        "localstorage",
    ],
    "path-traversal-generic": [
        "path traversal",
        "directory traversal",
    ],
    "misconfiguration-generic": [
        "misconfiguration",
        "default credentials",
        "admin panel",
        "debug",
    ],
}
|
|
|
|
|
|
@lru_cache(maxsize=1)
def load_repro_map() -> Dict[str, Any]:
    """Load the repro map from ``REPRO_MAP_PATH``.

    The result is memoised by ``lru_cache``, so the file is read once and
    every later call receives the same object.

    Returns:
        Whatever ``read_yaml`` yields for the map file, or ``{"systems": []}``
        when that result is falsy (for example an empty document).
    """
    loaded = read_yaml(REPRO_MAP_PATH, default={"systems": []})
    if loaded:
        return loaded
    # A falsy result gets the same skeleton as the read_yaml default.
    return {"systems": []}
|
|
|
|
|
|
@lru_cache(maxsize=1)
def load_profiles() -> Dict[str, Dict[str, Any]]:
    """Index every repro profile under ``REPRO_PROFILES_DIR`` by ``profile_id``.

    All ``*.yaml`` files below the directory are read recursively. Documents
    that are not mappings, or that have no truthy ``profile_id``, are skipped.
    Files are visited in sorted path order, so when two files declare the same
    ``profile_id`` the one with the later path wins.

    The result is memoised by ``lru_cache``; every call returns the same dict.

    Returns:
        Mapping of profile id to the parsed profile document; empty when the
        profiles directory does not exist.
    """
    if not REPRO_PROFILES_DIR.exists():
        return {}

    by_id: Dict[str, Dict[str, Any]] = {}
    for yaml_path in sorted(REPRO_PROFILES_DIR.rglob("*.yaml")):
        document = read_yaml(yaml_path, default=None)
        key = document.get("profile_id") if isinstance(document, dict) else None
        if key:
            by_id[key] = document
    return by_id
|
|
|
|
|
|
def latest_runs_by_advisory() -> Dict[str, Dict[str, Any]]:
    """Return the most recently finished run record for each advisory.

    Records come from ``load_all_json(RUNS_DIR)``. A record without a truthy
    ``advisory_id`` is ignored. Records are ranked by ``finished_at``, with a
    missing or falsy value treated as ``""``; on a tie the record seen later
    replaces the earlier one.

    NOTE(review): ranking compares the raw ``finished_at`` values, which is
    only chronological if they are uniformly formatted (presumably ISO-8601
    strings; verify against the run writer).

    Returns:
        Mapping of advisory id to its winning run record.
    """
    newest: Dict[str, Dict[str, Any]] = {}
    for record in load_all_json(RUNS_DIR):
        key = record.get("advisory_id")
        if not key:
            continue
        incumbent = newest.get(key)
        if incumbent is not None:
            candidate_stamp = record.get("finished_at") or ""
            incumbent_stamp = incumbent.get("finished_at") or ""
            if not candidate_stamp >= incumbent_stamp:
                continue
        newest[key] = record
    return newest
|
|
|
|
|
|
def resolve_repro_family(advisory: Dict[str, Any], system_map: Dict[str, Any]) -> str:
    """Classify an advisory into a repro family by keyword matching.

    The advisory's ``title``, ``summary``, ``system_id``, ``aliases`` and
    ``secure_code_topics`` are joined with spaces and lower-cased. Families in
    ``FAMILY_KEYWORDS`` are tried in declaration order and the first family
    with any keyword occurring as a substring of that text is returned.

    Args:
        advisory: Advisory record. Every consulted field is optional and may
            be missing or null.
        system_map: Per-system settings; only ``default_repro_family`` is read.

    Returns:
        The matched family id, otherwise the system's ``default_repro_family``
        when it is set to a truthy value, otherwise ``"authz-bypass-generic"``.
    """
    fragments = []

    for field in ("title", "summary", "system_id"):
        value = advisory.get(field)
        if value:
            # str() keeps a non-string value (e.g. a numeric id) from breaking join().
            fragments.append(str(value))

    for field in ("aliases", "secure_code_topics"):
        # `.get(field, [])` only covers a missing key; an explicit null would
        # reach join() as None and raise TypeError, so normalise it here.
        values = advisory.get(field) or ()
        fragments.extend(str(entry) for entry in values if entry is not None)

    haystack = " ".join(fragments).lower()

    for family, keywords in FAMILY_KEYWORDS.items():
        if any(keyword in haystack for keyword in keywords):
            return family

    # `or` also covers a key that is present but null/empty, so the declared
    # `str` return type always holds.
    return system_map.get("default_repro_family") or "authz-bypass-generic"
|
|
|
|
|
|
def resolve_profile(advisory_id: str, advisory: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Resolve the repro profile for an advisory, most specific match first.

    Lookup order against the table from ``load_profiles()``:

    1. a profile whose id equals ``advisory_id``;
    2. a profile with id ``"<system_id>-<advisory_id>"``;
    3. a profile with id ``"<system_id>-<family>"``, where ``family`` is the
       result of ``resolve_repro_family`` without its ``-generic`` suffix
       (``resolved_via`` defaults to ``"system-family"``);
    4. a profile whose id is the family id itself (``resolved_via`` defaults
       to ``"family-generic"``);
    5. a synthesized placeholder with ``resolved_via == "implicit-fallback"``.

    Args:
        advisory_id: Canonical id of the advisory.
        advisory: Advisory record. When omitted or empty it is read from
            ``ADVISORIES_DIR / "<advisory_id>.json"``.

    Returns:
        A profile dict. Profile hits are returned as shallow copies so callers
        cannot alter the top-level keys of the cached profile table.
    """
    profiles = load_profiles()
    if advisory_id in profiles:
        # load_profiles() is lru_cached; hand out a copy, not the cached dict.
        return dict(profiles[advisory_id])

    advisory = advisory or read_json(ADVISORIES_DIR / f"{advisory_id}.json", default={}) or {}
    # A null system_id would otherwise be formatted as the literal "None".
    system_id = advisory.get("system_id") or ""

    # Tolerate `systems: null` and entries lacking `system_id` instead of
    # raising TypeError/KeyError while building the index.
    systems = load_repro_map().get("systems") or []
    systems_by_id = {
        entry["system_id"]: entry
        for entry in systems
        if isinstance(entry, dict) and "system_id" in entry
    }
    system_map = systems_by_id.get(system_id, {})

    direct_profile = profiles.get(f"{system_id}-{advisory_id}")
    if direct_profile:
        return dict(direct_profile)

    family = resolve_repro_family(advisory, system_map)
    family_stem = family.replace("-generic", "")

    system_family_profile = profiles.get(f"{system_id}-{family_stem}")
    if system_family_profile:
        resolved = dict(system_family_profile)
        resolved.setdefault("resolved_via", "system-family")
        return resolved

    generic_profile = profiles.get(family)
    if generic_profile:
        resolved = dict(generic_profile)
        resolved.setdefault("resolved_via", "family-generic")
        resolved.setdefault("profile_id", family)
        return resolved

    # Nothing on disk matches: return a placeholder with no actions or assertions.
    return {
        "profile_id": family,
        "resolved_via": "implicit-fallback",
        "vuln_family": family_stem,
        "provisioning_mode": "synthetic",
        "browser_assertions": {"required": bool(system_map.get("browser_required_default"))},
        "attack_actions": [],
        "baseline_actions": [],
        "success_criteria": ["manual triage required"],
        "success_assertions": [],
        "cleanup_policy": "destroy",
        "destructive_risk": "medium",
        "allowed_target_types": ["lab-local", "lab-public", "authorized-third-party"],
    }
|
|
|
|
|
|
def advisory_defaults(advisory: Dict[str, Any]) -> Dict[str, Any]:
    """Compute the verification-related fields for an advisory.

    Each field keeps the advisory's own value when that value is truthy and
    otherwise falls back to a default derived from the resolved repro profile
    and the advisory's entry in the repro map.

    Args:
        advisory: Advisory record; ``canonical_id`` is required.

    Returns:
        Dict with the keys ``verification_status``, ``verification_mode``,
        ``last_verified_at``, ``last_run_id``, ``evidence_bundle``,
        ``browser_evidence``, ``repro_profile_id``, ``artifact_mode`` and
        ``blocked_reason``.

    Raises:
        KeyError: if ``advisory`` has no ``canonical_id``.
    """
    profile = resolve_profile(advisory["canonical_id"], advisory)

    # Tolerate `systems: null` and entries lacking `system_id` instead of
    # raising TypeError/KeyError while building the index.
    systems = load_repro_map().get("systems") or []
    systems_by_id = {
        entry["system_id"]: entry
        for entry in systems
        if isinstance(entry, dict) and "system_id" in entry
    }
    system_map = systems_by_id.get(advisory.get("system_id", ""), {})

    mode = "synthetic" if profile.get("provisioning_mode") == "synthetic" else "real"

    # `browser_assertions: null` in a profile must behave like an absent key.
    browser_assertions = profile.get("browser_assertions") or {}
    browser_required = bool(
        browser_assertions.get("required", system_map.get("browser_required_default", False))
    )

    # An empty or null preference list would make `[0]` raise; fall back to
    # the same default that is used when the key is missing.
    mode_preference = system_map.get("provisioning_mode_preference") or ["synthetic"]

    return {
        "verification_status": advisory.get("verification_status") or "triage-manual",
        "verification_mode": advisory.get("verification_mode") or mode,
        "last_verified_at": advisory.get("last_verified_at"),
        "last_run_id": advisory.get("last_run_id"),
        "evidence_bundle": advisory.get("evidence_bundle"),
        "browser_evidence": advisory.get("browser_evidence")
        or {
            "required": browser_required,
            "present": False,
            "refs": [],
        },
        "repro_profile_id": advisory.get("repro_profile_id") or profile.get("profile_id"),
        "artifact_mode": advisory.get("artifact_mode") or mode_preference[0],
        "blocked_reason": advisory.get("blocked_reason"),
    }
|
|
|
|
|
|
def annotate_with_latest_run(advisory: Dict[str, Any], run: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Merge an advisory with its defaults and, if given, its latest run.

    The result starts as a shallow copy of ``advisory`` overlaid with
    ``advisory_defaults(advisory)``. When ``run`` is truthy, the run's values
    then override the verification fields, and ``historical_status`` and
    ``latest_status`` are both set to the run's verification status.

    Args:
        advisory: Advisory record; it is not modified.
        run: Latest run record for the advisory, or ``None``/empty when the
            advisory has never been run.

    Returns:
        A new dict combining the advisory, its defaults and the run overlay.
    """
    merged = dict(advisory)
    merged.update(advisory_defaults(advisory))
    if not run:
        return merged

    # `.get(key, default)` only covers a missing key; a run record carrying an
    # explicit null would otherwise raise on `.get` or leak None into `refs`.
    report_refs = run.get("report_refs") or {}
    browser_refs = run.get("browser_refs") or []
    browser_evidence = merged.get("browser_evidence") or {}

    run_status = run.get("verification_status", merged["verification_status"])

    merged.update(
        {
            "verification_status": run_status,
            "verification_mode": run.get("verification_mode", merged["verification_mode"]),
            "last_verified_at": run.get("finished_at", merged["last_verified_at"]),
            "last_run_id": run.get("run_id"),
            "evidence_bundle": report_refs.get("bundle_dir"),
            "browser_evidence": {
                # Whether evidence is required comes from the advisory/profile;
                # whether it is present comes from the run.
                "required": browser_evidence.get("required", False),
                "present": bool(browser_refs),
                "refs": browser_refs,
            },
            "repro_profile_id": run.get("repro_profile_id", merged["repro_profile_id"]),
            "artifact_mode": run.get("artifact_mode", merged["artifact_mode"]),
            "blocked_reason": run.get("blocked_reason"),
            "historical_status": run_status,
            "latest_status": run_status,
        }
    )
    return merged
|