from __future__ import annotations

import json
import os
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import parse_qs, quote, urlparse
from urllib.request import urlopen
# Location of the scenario JSON; LAB_FIXTURE_SCENARIO must be set or import fails.
SCENARIO_PATH = Path(os.environ["LAB_FIXTURE_SCENARIO"])

# Port the fixture listens on; "3000" is used when PORT is not set.
PORT = int(os.getenv("PORT", "3000"))

# Scenario metadata, loaded once at import time.
SCENARIO = json.loads(SCENARIO_PATH.read_text(encoding="utf-8"))

# Mutable fixture state shared by every request handler.
STATE = dict(
    seeded=False,
    proof=False,
    family=SCENARIO["family"],
    system_id=SCENARIO["system_id"],
    case_id="",
    detail="fixture ready",
    uploads=[],
    sink_hits=0,
    payload=None,
    events=[],
)
def _note(event: str, detail: str) -> None:
    """Append an event record to ``STATE["events"]`` and keep the last 20."""
    history = STATE["events"]
    history.append(dict(event=event, detail=detail))
    # Rebind to a fresh list holding only the tail so the log stays bounded.
    STATE["events"] = history[-20:]
# Build the page body served for GET "/" from SCENARIO and the current STATE.
# Returns the page as one string. The optional sections (XSS marker, upload
# list, sink-hit counter, decoded marker, admin notice) stay empty unless the
# matching STATE condition holds.
# NOTE(review): as shown here the single-quoted f-strings span several physical
# lines, and `upload_block` is assigned an empty f-string right after `items`
# is built -- it looks like the markup inside these literals was lost. Confirm
# against the original file before editing this function.
# NOTE(review): case_id and upload filenames originate from POST payloads and
# are interpolated without escaping -- presumably deliberate for this fixture;
# verify before reusing this code elsewhere.
def _render_html() -> str:
title = SCENARIO["title"]
# The banner and the XSS, deserialization and admin sections depend on this flag.
proof = STATE["proof"]
banner = f"
Proof active: {STATE['detail']}
" if proof else "Baseline ready
"
# XSS section: only when proof is active and the family is "xss".
xss_block = ""
if proof and STATE["family"] == "xss":
xss_block = (
""
f"XSS marker executed for {STATE['case_id']}
"
)
# Upload section: only when at least one upload has been recorded.
upload_block = ""
if STATE["uploads"]:
items = "".join(f"{item['filename']}" for item in STATE["uploads"])
upload_block = f""
# Sink section: only when the sink-hit counter is non-zero.
sink_block = ""
if STATE["sink_hits"]:
sink_block = f"Local sink hits: {STATE['sink_hits']}"
# Deserialization section: only when proof is active for that family.
deserialize_block = ""
if proof and STATE["family"] == "deserialization":
deserialize_block = f"Decoded marker: {STATE['case_id']}"
# Admin section: shown for the two boundary-bypass families once proof is active.
admin_block = ""
if proof and STATE["family"] in {"proxy-boundary", "authz-bypass"}:
admin_block = "Admin boundary bypass confirmed."
# Assemble the page; the " - proof" suffix is added when proof is active for
# every family except "xss".
return f"""
{title}{' - proof' if proof and STATE['family'] != 'xss' else ''}
{title}
{SCENARIO['subtitle']}
{banner}
System: {SCENARIO['system_id']} / Family: {SCENARIO['family']}
{admin_block}
{xss_block}
{upload_block}
{sink_block}
{deserialize_block}
"""
class Handler(BaseHTTPRequestHandler):
def log_message(self, format: str, *args) -> None:
return
def _json(self, status_code: int, payload: dict) -> None:
body = json.dumps(payload).encode("utf-8")
self.send_response(status_code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _html(self, payload: str) -> None:
body = payload.encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_GET(self) -> None:
parsed = urlparse(self.path)
if parsed.path == "/healthz":
self._json(200, {"ok": True, "system_id": SCENARIO["system_id"], "family": SCENARIO["family"]})
return
if parsed.path == "/":
self._html(_render_html())
return
if parsed.path == "/admin":
if STATE["proof"] and STATE["family"] in {"proxy-boundary", "authz-bypass"}:
self._json(200, {"ok": True, "detail": STATE["detail"], "case_id": STATE["case_id"]})
else:
self._json(403, {"ok": False, "detail": "admin boundary still enforced"})
return
if parsed.path == "/sink":
STATE["sink_hits"] += 1
_note("sink-hit", parsed.query or "local callback")
self._json(200, {"ok": True, "sink_hits": STATE["sink_hits"]})
return
if parsed.path == "/proof":
self._json(
200,
{
"success": bool(STATE["proof"]),
"detail": STATE["detail"],
"case_id": STATE["case_id"],
"sink_hits": STATE["sink_hits"],
"uploads": STATE["uploads"],
"events": STATE["events"],
},
)
return
self._json(404, {"ok": False, "detail": "not found"})
def do_POST(self) -> None:
parsed = urlparse(self.path)
raw = self.rfile.read(int(self.headers.get("Content-Length", "0") or "0"))
try:
payload = json.loads(raw.decode("utf-8") or "{}")
except Exception:
payload = {}
if parsed.path == "/seed":
STATE["seeded"] = True
STATE["proof"] = False
STATE["case_id"] = str(payload.get("case_id") or "")
STATE["detail"] = "fixture seeded"
STATE["uploads"] = []
STATE["sink_hits"] = 0
STATE["payload"] = None
_note("seed", STATE["case_id"] or "anonymous")
self._json(200, {"ok": True, "detail": "fixture seeded", "case_id": STATE["case_id"]})
return
if parsed.path == "/attack":
family = str(payload.get("family") or STATE["family"])
STATE["case_id"] = str(payload.get("case_id") or STATE["case_id"])
STATE["payload"] = payload
STATE["proof"] = True
if family == "proxy-boundary":
STATE["detail"] = "trusted forwarded headers crossed the boundary"
elif family == "authz-bypass":
STATE["detail"] = "server-side authorization recheck was bypassed"
elif family == "ssrf":
with urlopen(f"http://127.0.0.1:{PORT}/sink?case_id={STATE['case_id']}") as response:
response.read()
STATE["detail"] = "server-side callback reached the local sink"
elif family == "xss":
STATE["detail"] = "stored payload rendered inside the browser proof page"
elif family == "file-upload":
STATE["uploads"].append(
{
"filename": payload.get("filename") or f"{STATE['case_id']}.txt",
"content": payload.get("content") or "",
}
)
STATE["detail"] = "upload marker accepted and listed"
elif family == "deserialization":
STATE["detail"] = "unsafe object graph decoded without gadget execution"
_note("attack", STATE["detail"])
self._json(200, {"ok": True, "detail": STATE["detail"], "case_id": STATE["case_id"]})
return
self._json(404, {"ok": False, "detail": "not found"})
if __name__ == "__main__":
    # Bind on every interface at PORT; serve_forever blocks while handling requests.
    bind_address = ("0.0.0.0", PORT)
    server = ThreadingHTTPServer(bind_address, Handler)
    server.serve_forever()