194 行
7.7 KiB
Python
194 行
7.7 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
from urllib.parse import parse_qs, urlparse
|
|
from urllib.request import urlopen
|
|
|
|
|
|
# Location of the scenario JSON file; the environment variable is mandatory,
# so a missing value fails at import time with KeyError.
SCENARIO_PATH = Path(os.environ["LAB_FIXTURE_SCENARIO"])

# TCP port the fixture listens on (falls back to 3000 when PORT is unset).
PORT = int(os.getenv("PORT", "3000"))

# Scenario description, parsed once at import time.
SCENARIO = json.loads(SCENARIO_PATH.read_text(encoding="utf-8"))

# Mutable in-memory fixture state, read and updated by the request handlers.
STATE = dict(
    seeded=False,
    proof=False,
    family=SCENARIO["family"],
    system_id=SCENARIO["system_id"],
    case_id="",
    detail="fixture ready",
    uploads=[],
    sink_hits=0,
    payload=None,
    events=[],
)
|
|
|
|
|
|
def _note(event: str, detail: str) -> None:
    """Append an event record to ``STATE["events"]`` and keep only the last 20.

    Args:
        event: Short event name stored under the ``"event"`` key.
        detail: Free-form description stored under the ``"detail"`` key.
    """
    entry = {"event": event, "detail": detail}
    history = STATE["events"]
    history.append(entry)
    # Rebind to a fresh, truncated list so the log never exceeds 20 entries.
    STATE["events"] = history[-20:]
|
|
|
|
|
|
def _render_html() -> str:
    """Render the fixture status page from ``SCENARIO`` and ``STATE``.

    The page always shows the scenario title, subtitle, system id and family,
    plus a banner that reflects whether a proof is active. Extra sections are
    appended depending on state: an XSS marker script, the list of uploaded
    filenames, the local sink hit counter, a deserialization marker, and an
    admin-bypass marker.

    Every dynamic value is HTML-escaped before being placed in a text node.
    ``case_id`` and upload filenames originate from request bodies, so
    interpolating them verbatim would let a client inject arbitrary markup
    into the page.

    Returns:
        The complete HTML document as a string.
    """
    # Imported locally so this function does not depend on the module header.
    from html import escape

    def text(value: object) -> str:
        # All interpolations land in text nodes, so only &, < and > need escaping.
        return escape(str(value), quote=False)

    title = SCENARIO["title"]
    proof = STATE["proof"]
    family = STATE["family"]
    safe_title = text(title)
    safe_case_id = text(STATE["case_id"])

    if proof:
        banner = f"<div class='proof'>Proof active: {text(STATE['detail'])}</div>"
    else:
        banner = "<div class='baseline'>Baseline ready</div>"

    xss_block = ""
    if proof and family == "xss":
        # json.dumps yields a valid JS string literal; "<" is additionally
        # written as \u003c so the value can never close the <script> element.
        script_title = json.dumps(title + " - proof").replace("<", "\\u003c")
        xss_block = (
            "<script>document.documentElement.setAttribute('data-xss-proof','true');"
            f"document.title = {script_title};</script>"
            f"<div id='xss-proof'>XSS marker executed for {safe_case_id}</div>"
        )

    upload_block = ""
    if STATE["uploads"]:
        items = "".join(f"<li>{text(item['filename'])}</li>" for item in STATE["uploads"])
        upload_block = f"<section><h2>Uploads</h2><ul>{items}</ul></section>"

    sink_block = ""
    if STATE["sink_hits"]:
        sink_block = f"<section id='ssrf-proof'>Local sink hits: {text(STATE['sink_hits'])}</section>"

    deserialize_block = ""
    if proof and family == "deserialization":
        deserialize_block = f"<section id='deserialize-proof'>Decoded marker: {safe_case_id}</section>"

    admin_block = ""
    if proof and family in {"proxy-boundary", "authz-bypass"}:
        admin_block = "<section id='admin-proof'>Admin boundary bypass confirmed.</section>"

    # For the XSS family the inline script sets the " - proof" title itself.
    title_suffix = " - proof" if proof and family != "xss" else ""
    subtitle = text(SCENARIO["subtitle"])
    system_id = text(SCENARIO["system_id"])
    scenario_family = text(SCENARIO["family"])

    return f"""<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{safe_title}{title_suffix}</title>
<style>
body {{ font-family: sans-serif; background: #0f172a; color: #e2e8f0; margin: 0; padding: 32px; }}
main {{ max-width: 900px; margin: 0 auto; background: #111827; border: 1px solid #334155; border-radius: 16px; padding: 24px; }}
.proof {{ padding: 14px; border-radius: 12px; background: #14532d; color: #dcfce7; }}
.baseline {{ padding: 14px; border-radius: 12px; background: #1e3a8a; color: #dbeafe; }}
code {{ background: rgba(255,255,255,0.08); padding: 2px 6px; border-radius: 6px; }}
</style>
</head>
<body>
<main>
<h1>{safe_title}</h1>
<p>{subtitle}</p>
{banner}
<p>System: <code>{system_id}</code> / Family: <code>{scenario_family}</code></p>
{admin_block}
{xss_block}
{upload_block}
{sink_block}
{deserialize_block}
</main>
</body>
</html>"""
|
|
|
|
|
|
class Handler(BaseHTTPRequestHandler):
    """HTTP request handler for the lab fixture.

    GET routes: ``/healthz``, ``/`` (HTML status page), ``/admin``, ``/sink``
    and ``/proof``. POST routes: ``/seed`` and ``/attack``. Any other path
    gets a JSON 404. All handlers read and mutate the module-level ``STATE``.
    """

    def log_message(self, format: str, *args) -> None:
        """Silence the base class's per-request logging."""
        return

    def _json(self, status_code: int, payload: dict) -> None:
        """Send ``payload`` as a JSON response with the given status code."""
        body = json.dumps(payload).encode("utf-8")
        self.send_response(status_code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _html(self, payload: str) -> None:
        """Send ``payload`` as a 200 UTF-8 HTML response."""
        body = payload.encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self) -> None:
        """Serve the read-only routes; ``/sink`` additionally counts the hit."""
        parsed = urlparse(self.path)
        if parsed.path == "/healthz":
            self._json(200, {"ok": True, "system_id": SCENARIO["system_id"], "family": SCENARIO["family"]})
            return
        if parsed.path == "/":
            self._html(_render_html())
            return
        if parsed.path == "/admin":
            # The admin route only opens once a boundary-style proof is active.
            if STATE["proof"] and STATE["family"] in {"proxy-boundary", "authz-bypass"}:
                self._json(200, {"ok": True, "detail": STATE["detail"], "case_id": STATE["case_id"]})
            else:
                self._json(403, {"ok": False, "detail": "admin boundary still enforced"})
            return
        if parsed.path == "/sink":
            STATE["sink_hits"] += 1
            _note("sink-hit", parsed.query or "local callback")
            self._json(200, {"ok": True, "sink_hits": STATE["sink_hits"]})
            return
        if parsed.path == "/proof":
            self._json(
                200,
                {
                    "success": bool(STATE["proof"]),
                    "detail": STATE["detail"],
                    "case_id": STATE["case_id"],
                    "sink_hits": STATE["sink_hits"],
                    "uploads": STATE["uploads"],
                    "events": STATE["events"],
                },
            )
            return
        self._json(404, {"ok": False, "detail": "not found"})

    def do_POST(self) -> None:
        """Handle ``/seed`` (reset state) and ``/attack`` (activate a proof).

        The body is parsed as a JSON object. A missing, malformed, or
        non-object body is treated as ``{}``. A ``Content-Length`` header that
        is not a non-negative integer is rejected with 400 before any state
        is touched.
        """
        parsed = urlparse(self.path)

        # A non-numeric length would raise, and a negative one would make
        # rfile.read() consume the stream until EOF; reject both up front.
        try:
            length = int(self.headers.get("Content-Length", "0") or "0")
        except ValueError:
            length = -1
        if length < 0:
            self._json(400, {"ok": False, "detail": "invalid Content-Length"})
            return
        raw = self.rfile.read(length)

        try:
            payload = json.loads(raw.decode("utf-8") or "{}")
        except (ValueError, RecursionError):
            # Covers undecodable bytes, invalid JSON and pathologically nested input.
            payload = {}
        if not isinstance(payload, dict):
            # Valid JSON that is not an object (list, string, number, ...)
            # cannot carry the expected keys.
            payload = {}

        if parsed.path == "/seed":
            STATE["seeded"] = True
            STATE["proof"] = False
            STATE["case_id"] = str(payload.get("case_id") or "")
            STATE["detail"] = "fixture seeded"
            STATE["uploads"] = []
            STATE["sink_hits"] = 0
            STATE["payload"] = None
            _note("seed", STATE["case_id"] or "anonymous")
            self._json(200, {"ok": True, "detail": "fixture seeded", "case_id": STATE["case_id"]})
            return

        if parsed.path == "/attack":
            family = str(payload.get("family") or STATE["family"])
            STATE["case_id"] = str(payload.get("case_id") or STATE["case_id"])
            STATE["payload"] = payload
            STATE["proof"] = True
            if family == "proxy-boundary":
                STATE["detail"] = "trusted forwarded headers crossed the boundary"
            elif family == "authz-bypass":
                STATE["detail"] = "server-side authorization recheck was bypassed"
            elif family == "ssrf":
                # Imported locally so this block does not depend on the module header.
                from urllib.parse import quote

                # Percent-encode the case id so it cannot break or extend the URL.
                sink_url = f"http://127.0.0.1:{PORT}/sink?case_id={quote(STATE['case_id'], safe='')}"
                try:
                    with urlopen(sink_url, timeout=5) as response:
                        response.read()
                except OSError as exc:
                    # The callback never reached the sink, so no proof exists.
                    STATE["proof"] = False
                    STATE["detail"] = f"server-side callback failed: {exc}"
                    _note("attack-error", STATE["detail"])
                    self._json(502, {"ok": False, "detail": STATE["detail"], "case_id": STATE["case_id"]})
                    return
                STATE["detail"] = "server-side callback reached the local sink"
            elif family == "xss":
                STATE["detail"] = "stored payload rendered inside the browser proof page"
            elif family == "file-upload":
                STATE["uploads"].append(
                    {
                        "filename": payload.get("filename") or f"{STATE['case_id']}.txt",
                        "content": payload.get("content") or "",
                    }
                )
                STATE["detail"] = "upload marker accepted and listed"
            elif family == "deserialization":
                STATE["detail"] = "unsafe object graph decoded without gadget execution"
            _note("attack", STATE["detail"])
            self._json(200, {"ok": True, "detail": STATE["detail"], "case_id": STATE["case_id"]})
            return

        self._json(404, {"ok": False, "detail": "not found"})
|
|
|
|
|
|
if __name__ == "__main__":
    # NOTE(review): binds every interface; confirm this fixture is only ever
    # run inside an isolated lab network.
    # The context manager closes the listening socket when serve_forever()
    # exits for any reason (for example on KeyboardInterrupt).
    with ThreadingHTTPServer(("0.0.0.0", PORT), Handler) as server:
        server.serve_forever()
|
|
|