# Builds the lab environment catalog, the per-system "current" profiles and
# the repro map from the systems listed in the intel source map.
from __future__ import annotations

from pathlib import Path
from typing import Any, Dict, List, Tuple

import yaml

from intel.config import load_source_map
from lab.config import ENV_CATALOG_DIR, ENV_PROFILES_DIR, REPRO_MAP_PATH
from lab.utils import ensure_dir, read_yaml, slugify, write_yaml

# Provisioning hints keyed by ``system_id``.  A system without an entry here
# gets a synthetic placeholder service (see ``_system_catalog``).
IMAGE_HINTS: Dict[str, Dict[str, Any]] = {
    "wordpress": {
        "artifact_mode": "official-image",
        "services": {
            "app": {"image": "wordpress:php8.2-apache", "ports": ["18080:80"]},
            "db": {
                "image": "mariadb:10.11",
                "environment": {
                    "MARIADB_DATABASE": "wordpress",
                    "MARIADB_USER": "wordpress",
                    "MARIADB_PASSWORD": "wordpress",
                    "MARIADB_ROOT_PASSWORD": "root",
                },
            },
        },
        "browser_required": True,
    },
    "drupal": {
        "artifact_mode": "official-image",
        "services": {
            "app": {"image": "drupal:10-apache", "ports": ["18081:80"]},
            "db": {
                "image": "postgres:15",
                "environment": {
                    "POSTGRES_DB": "drupal",
                    "POSTGRES_USER": "drupal",
                    "POSTGRES_PASSWORD": "drupal",
                },
            },
        },
        "browser_required": True,
    },
    "joomla": {
        "artifact_mode": "official-image",
        "services": {
            "app": {"image": "joomla:latest", "ports": ["18082:80"]},
            "db": {
                "image": "mariadb:10.11",
                "environment": {
                    "MARIADB_DATABASE": "joomla",
                    "MARIADB_USER": "joomla",
                    "MARIADB_PASSWORD": "joomla",
                    "MARIADB_ROOT_PASSWORD": "root",
                },
            },
        },
        "browser_required": True,
    },
    "prestashop": {
        "artifact_mode": "official-image",
        "services": {
            "app": {"image": "prestashop/prestashop:latest", "ports": ["18083:80"]},
            "db": {
                "image": "mariadb:10.11",
                "environment": {
                    "MARIADB_DATABASE": "prestashop",
                    "MARIADB_USER": "prestashop",
                    "MARIADB_PASSWORD": "prestashop",
                    "MARIADB_ROOT_PASSWORD": "root",
                },
            },
        },
        "browser_required": True,
    },
    "opencart": {
        "artifact_mode": "official-image",
        "services": {
            "app": {"image": "bitnami/opencart:latest", "ports": ["18084:8080"]},
            "db": {
                "image": "mariadb:10.11",
                "environment": {
                    "MARIADB_DATABASE": "opencart",
                    "MARIADB_USER": "opencart",
                    "MARIADB_PASSWORD": "opencart",
                    "MARIADB_ROOT_PASSWORD": "root",
                },
            },
        },
        "browser_required": True,
    },
    "gitea": {
        "artifact_mode": "official-image",
        "services": {
            "app": {"image": "gitea/gitea:1.22.6", "ports": ["18085:3000"]},
        },
        "browser_required": True,
    },
    "nginx": {
        "artifact_mode": "official-image",
        "services": {"app": {"image": "nginx:1.27-alpine", "ports": ["18086:80"]}},
        "browser_required": False,
    },
    "apache-httpd": {
        "artifact_mode": "official-image",
        "services": {"app": {"image": "httpd:2.4", "ports": ["18087:80"]}},
        "browser_required": False,
    },
    "apache-tomcat": {
        "artifact_mode": "official-image",
        "services": {"app": {"image": "tomcat:10.1", "ports": ["18088:8080"]}},
        "browser_required": False,
    },
    "nodejs": {
        "artifact_mode": "official-source",
        "services": {"app": {"image": "node:22-alpine", "ports": ["18089:3000"]}},
        "browser_required": False,
    },
    "nextjs": {
        "artifact_mode": "official-source",
        "services": {"app": {"image": "node:22-alpine", "ports": ["18090:3000"]}},
        "browser_required": True,
    },
    "vue": {
        "artifact_mode": "official-source",
        "services": {"app": {"image": "node:22-alpine", "ports": ["18091:5173"]}},
        "browser_required": True,
    },
    "nuxt": {
        "artifact_mode": "official-source",
        "services": {"app": {"image": "node:22-alpine", "ports": ["18092:3000"]}},
        "browser_required": True,
    },
    "vite": {
        "artifact_mode": "official-source",
        "services": {"app": {"image": "node:22-alpine", "ports": ["18093:5173"]}},
        "browser_required": True,
    },
}

# Ordered (keywords, family) rules: the first rule with any keyword occurring
# in the system's descriptive text wins.
_REPRO_FAMILY_RULES: Tuple[Tuple[Tuple[str, ...], str], ...] = (
    (("xss", "trusted types"), "xss-generic"),
    (("proxy", "middleware"), "proxy-boundary-generic"),
    (("upload",), "file-upload-generic"),
    (("ssrf",), "ssrf-generic"),
    (("deserialization",), "deserialization-generic"),
    (("template",), "template-injection-generic"),
    (("token", "cookie", "session"), "session-token-generic"),
)

# Advisory modes that select the plugin/extension repro family.
_EXTENSION_ADVISORY_MODES = frozenset({"plugin", "module", "extension"})

# Categories that default to requiring a browser when no hint says otherwise.
_BROWSER_CATEGORIES = frozenset({"cms", "ecommerce", "frameworks", "platforms"})

# Host ports for systems without a hint fall in [BASE, BASE + SPAN).
_FALLBACK_PORT_BASE = 18100
_FALLBACK_PORT_SPAN = 700


def _default_repro_family(system: Dict[str, Any]) -> str:
    """Pick the default repro family for *system* by keyword matching.

    The display name, system id and secure-code topics are joined into one
    lower-cased string and checked against ``_REPRO_FAMILY_RULES`` in order.
    If no keyword rule matches, a plugin/module/extension advisory mode
    selects ``"plugin-extension-generic"``; otherwise the result is
    ``"authz-bypass-generic"``.

    Args:
        system: Source-map entry; must contain ``display_name`` and
            ``system_id``. ``secure_code_topics`` and ``advisory_modes`` are
            optional and may be missing or ``None``.

    Returns:
        The repro family identifier.
    """
    # De-duplicate while keeping declaration order.  A plain set would make
    # the joined text order depend on string hash randomisation, which can
    # flip matches for keywords that contain a space ("trusted types").
    topics = list(dict.fromkeys(system.get("secure_code_topics") or []))
    text = " ".join([system["display_name"], system["system_id"], *topics]).lower()

    for keywords, family in _REPRO_FAMILY_RULES:
        if any(keyword in text for keyword in keywords):
            return family

    advisory_modes = system.get("advisory_modes") or []
    if any(mode in _EXTENSION_ADVISORY_MODES for mode in advisory_modes):
        return "plugin-extension-generic"
    return "authz-bypass-generic"


def _fallback_port(system_id: str) -> str:
    """Return a ``"HOST:80"`` port mapping derived from *system_id*.

    The host port is a checksum of the id's code points folded into
    ``[18100, 18800)``, so it is stable across runs.  Distinct ids can still
    map to the same host port (e.g. ids that are permutations of each other).
    """
    checksum = sum(ord(ch) for ch in system_id)
    host_port = _FALLBACK_PORT_BASE + checksum % _FALLBACK_PORT_SPAN
    return f"{host_port}:80"


def _host_port(port_spec: Any) -> str:
    """Extract the host-port field from a short-syntax port mapping.

    ``"18080:80"`` and ``"127.0.0.1:18080:80"`` both yield ``"18080"``: the
    host port is the field immediately before the container port.  A spec
    without any ``:`` is returned unchanged.
    """
    fields = str(port_spec).split(":")
    return fields[-2] if len(fields) >= 2 else fields[0]


def _system_catalog(system: Dict[str, Any]) -> Dict[str, Any]:
    """Build the catalog document for one source-map system.

    Args:
        system: Source-map entry; must contain ``system_id``,
            ``display_name``, ``category`` and ``tier``.

    Returns:
        The catalog mapping.  ``services`` comes from ``IMAGE_HINTS`` when the
        system has a hint (the same dict object, not a copy); otherwise it is
        a single ``nginxdemos/hello`` placeholder on a fallback port.
    """
    system_id = system["system_id"]
    hint = IMAGE_HINTS.get(system_id, {})
    artifact_mode = hint.get("artifact_mode", "synthetic")

    if "services" in hint:
        services = hint["services"]
    else:
        services = {
            "app": {
                "image": "nginxdemos/hello:latest",
                "ports": [_fallback_port(system_id)],
            }
        }

    # Preferred mode first, then the generic fallbacks.  dict.fromkeys drops
    # repeats while keeping order, so e.g. "official-source" yields
    # ["official-source", "synthetic"] rather than listing "synthetic" twice.
    mode_preference = list(dict.fromkeys((artifact_mode, "official-source", "synthetic")))

    category_default = system["category"] in _BROWSER_CATEGORIES
    browser_required = bool(hint.get("browser_required", category_default))

    return {
        "system_id": system_id,
        "display_name": system["display_name"],
        "category": system["category"],
        "tier": system["tier"],
        "artifact_mode_preference": mode_preference,
        "default_repro_family": _default_repro_family(system),
        "browser_required_default": browser_required,
        "log_collectors": ["docker-logs", "http-snapshot"],
        "report_template": "default-lab-report",
        "services": services,
        # Only the first two official sources are recorded.
        "source_reference": (system.get("official_sources") or [])[:2],
    }


def _profile_for_system(system: Dict[str, Any], catalog: Dict[str, Any]) -> Dict[str, Any]:
    """Build the ``current`` profile document for *system* from its catalog.

    Args:
        system: Source-map entry; only ``system_id`` is read.
        catalog: Catalog mapping as produced by ``_system_catalog``.

    Returns:
        The profile mapping.  ``baseline_urls`` holds one loopback URL built
        from the first port mapping found across the services, or is empty
        when no service publishes a port.
    """
    system_id = system["system_id"]
    published_ports = [
        port
        for service in catalog["services"].values()
        for port in service.get("ports", [])
    ]
    baseline_urls = (
        [f"http://127.0.0.1:{_host_port(published_ports[0])}/"] if published_ports else []
    )
    primary_mode = catalog["artifact_mode_preference"][0]

    return {
        "profile_id": f"{system_id}-core-current",
        "system_id": system_id,
        "version": "current",
        "artifact_mode": primary_mode,
        "verification_mode": "synthetic" if primary_mode == "synthetic" else "real",
        "browser_required": catalog["browser_required_default"],
        "services": catalog["services"],
        "baseline_urls": baseline_urls,
        "seed_actions": [
            {
                "kind": "note",
                "message": "Use default seed strategy derived from repro profile.",
            }
        ],
        "cleanup_policy": "destroy",
    }


def _build_repro_map_entry(system: Dict[str, Any], catalog: Dict[str, Any]) -> Dict[str, Any]:
    """Build the repro-map entry for *system* from its catalog.

    The seed strategy is ``"default-seed"`` when the catalog requires a
    browser by default and ``"minimal-seed"`` otherwise.
    """
    browser_required = catalog["browser_required_default"]
    return {
        "system_id": system["system_id"],
        "default_repro_family": catalog["default_repro_family"],
        "provisioning_mode_preference": catalog["artifact_mode_preference"],
        "browser_required_default": browser_required,
        "seed_strategy": "default-seed" if browser_required else "minimal-seed",
        "log_collectors": catalog["log_collectors"],
        "report_template": catalog["report_template"],
    }


def sync_catalog(write_profiles: bool = True, write_repro_map: bool = True) -> Dict[str, Any]:
    """Regenerate catalog files (and optionally profiles and the repro map).

    One ``<system_id>.yaml`` catalog is always written per source-map system
    under ``ENV_CATALOG_DIR``.

    Args:
        write_profiles: Also write ``core/<system_id>/current.yaml`` under
            ``ENV_PROFILES_DIR`` for every system.
        write_repro_map: Also write all repro entries to ``REPRO_MAP_PATH``.

    Returns:
        Summary with the number of systems, the catalogs and profiles
        written, and whether the repro map was written.
    """
    source_map = load_source_map()
    systems = source_map["systems"]
    core_profiles_dir = ENV_PROFILES_DIR / "core"
    ensure_dir(ENV_CATALOG_DIR)
    ensure_dir(core_profiles_dir)

    written_catalogs = 0
    written_profiles = 0
    repro_entries: List[Dict[str, Any]] = []

    for system in systems:
        system_id = system["system_id"]
        catalog = _system_catalog(system)
        write_yaml(ENV_CATALOG_DIR / f"{system_id}.yaml", catalog)
        written_catalogs += 1

        if write_profiles:
            profile_dir = core_profiles_dir / system_id
            # NOTE(review): assumes write_yaml does not create parent
            # directories (the catalog dir is ensured before writing too),
            # so the per-system directory is ensured explicitly.
            ensure_dir(profile_dir)
            write_yaml(profile_dir / "current.yaml", _profile_for_system(system, catalog))
            written_profiles += 1

        repro_entries.append(_build_repro_map_entry(system, catalog))

    if write_repro_map:
        write_yaml(REPRO_MAP_PATH, {"systems": repro_entries})

    return {
        "systems": len(systems),
        "catalogs_written": written_catalogs,
        "profiles_written": written_profiles,
        "repro_map_written": write_repro_map,
    }