文件
websafe-kb/00-environments/templates/fixtures/shared/python_fixture.py

194 行
7.7 KiB
Python

from __future__ import annotations
import json
import os
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import parse_qs, urlparse
from urllib.request import urlopen
SCENARIO_PATH = Path(os.environ["LAB_FIXTURE_SCENARIO"])
PORT = int(os.environ.get("PORT", "3000"))
SCENARIO = json.loads(SCENARIO_PATH.read_text(encoding="utf-8"))
STATE = {
"seeded": False,
"proof": False,
"family": SCENARIO["family"],
"system_id": SCENARIO["system_id"],
"case_id": "",
"detail": "fixture ready",
"uploads": [],
"sink_hits": 0,
"payload": None,
"events": [],
}
def _note(event: str, detail: str) -> None:
STATE["events"].append({"event": event, "detail": detail})
STATE["events"] = STATE["events"][-20:]
def _render_html() -> str:
title = SCENARIO["title"]
proof = STATE["proof"]
banner = f"<div class='proof'>Proof active: {STATE['detail']}</div>" if proof else "<div class='baseline'>Baseline ready</div>"
xss_block = ""
if proof and STATE["family"] == "xss":
xss_block = (
"<script>document.documentElement.setAttribute('data-xss-proof','true');"
f"document.title = {json.dumps(title + ' - proof')};</script>"
f"<div id='xss-proof'>XSS marker executed for {STATE['case_id']}</div>"
)
upload_block = ""
if STATE["uploads"]:
items = "".join(f"<li>{item['filename']}</li>" for item in STATE["uploads"])
upload_block = f"<section><h2>Uploads</h2><ul>{items}</ul></section>"
sink_block = ""
if STATE["sink_hits"]:
sink_block = f"<section id='ssrf-proof'>Local sink hits: {STATE['sink_hits']}</section>"
deserialize_block = ""
if proof and STATE["family"] == "deserialization":
deserialize_block = f"<section id='deserialize-proof'>Decoded marker: {STATE['case_id']}</section>"
admin_block = ""
if proof and STATE["family"] in {"proxy-boundary", "authz-bypass"}:
admin_block = "<section id='admin-proof'>Admin boundary bypass confirmed.</section>"
return f"""<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{title}{' - proof' if proof and STATE['family'] != 'xss' else ''}</title>
<style>
body {{ font-family: sans-serif; background: #0f172a; color: #e2e8f0; margin: 0; padding: 32px; }}
main {{ max-width: 900px; margin: 0 auto; background: #111827; border: 1px solid #334155; border-radius: 16px; padding: 24px; }}
.proof {{ padding: 14px; border-radius: 12px; background: #14532d; color: #dcfce7; }}
.baseline {{ padding: 14px; border-radius: 12px; background: #1e3a8a; color: #dbeafe; }}
code {{ background: rgba(255,255,255,0.08); padding: 2px 6px; border-radius: 6px; }}
</style>
</head>
<body>
<main>
<h1>{title}</h1>
<p>{SCENARIO['subtitle']}</p>
{banner}
<p>System: <code>{SCENARIO['system_id']}</code> / Family: <code>{SCENARIO['family']}</code></p>
{admin_block}
{xss_block}
{upload_block}
{sink_block}
{deserialize_block}
</main>
</body>
</html>"""
class Handler(BaseHTTPRequestHandler):
def log_message(self, format: str, *args) -> None:
return
def _json(self, status_code: int, payload: dict) -> None:
body = json.dumps(payload).encode("utf-8")
self.send_response(status_code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _html(self, payload: str) -> None:
body = payload.encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_GET(self) -> None:
parsed = urlparse(self.path)
if parsed.path == "/healthz":
self._json(200, {"ok": True, "system_id": SCENARIO["system_id"], "family": SCENARIO["family"]})
return
if parsed.path == "/":
self._html(_render_html())
return
if parsed.path == "/admin":
if STATE["proof"] and STATE["family"] in {"proxy-boundary", "authz-bypass"}:
self._json(200, {"ok": True, "detail": STATE["detail"], "case_id": STATE["case_id"]})
else:
self._json(403, {"ok": False, "detail": "admin boundary still enforced"})
return
if parsed.path == "/sink":
STATE["sink_hits"] += 1
_note("sink-hit", parsed.query or "local callback")
self._json(200, {"ok": True, "sink_hits": STATE["sink_hits"]})
return
if parsed.path == "/proof":
self._json(
200,
{
"success": bool(STATE["proof"]),
"detail": STATE["detail"],
"case_id": STATE["case_id"],
"sink_hits": STATE["sink_hits"],
"uploads": STATE["uploads"],
"events": STATE["events"],
},
)
return
self._json(404, {"ok": False, "detail": "not found"})
def do_POST(self) -> None:
parsed = urlparse(self.path)
raw = self.rfile.read(int(self.headers.get("Content-Length", "0") or "0"))
try:
payload = json.loads(raw.decode("utf-8") or "{}")
except Exception:
payload = {}
if parsed.path == "/seed":
STATE["seeded"] = True
STATE["proof"] = False
STATE["case_id"] = str(payload.get("case_id") or "")
STATE["detail"] = "fixture seeded"
STATE["uploads"] = []
STATE["sink_hits"] = 0
STATE["payload"] = None
_note("seed", STATE["case_id"] or "anonymous")
self._json(200, {"ok": True, "detail": "fixture seeded", "case_id": STATE["case_id"]})
return
if parsed.path == "/attack":
family = str(payload.get("family") or STATE["family"])
STATE["case_id"] = str(payload.get("case_id") or STATE["case_id"])
STATE["payload"] = payload
STATE["proof"] = True
if family == "proxy-boundary":
STATE["detail"] = "trusted forwarded headers crossed the boundary"
elif family == "authz-bypass":
STATE["detail"] = "server-side authorization recheck was bypassed"
elif family == "ssrf":
with urlopen(f"http://127.0.0.1:{PORT}/sink?case_id={STATE['case_id']}") as response:
response.read()
STATE["detail"] = "server-side callback reached the local sink"
elif family == "xss":
STATE["detail"] = "stored payload rendered inside the browser proof page"
elif family == "file-upload":
STATE["uploads"].append(
{
"filename": payload.get("filename") or f"{STATE['case_id']}.txt",
"content": payload.get("content") or "",
}
)
STATE["detail"] = "upload marker accepted and listed"
elif family == "deserialization":
STATE["detail"] = "unsafe object graph decoded without gadget execution"
_note("attack", STATE["detail"])
self._json(200, {"ok": True, "detail": STATE["detail"], "case_id": STATE["case_id"]})
return
self._json(404, {"ok": False, "detail": "not found"})
if __name__ == "__main__":
server = ThreadingHTTPServer(("0.0.0.0", PORT), Handler)
server.serve_forever()