更新: 421 个文件 - 2026-03-17 18:30:02

这个提交包含在:
hao
2026-03-17 18:30:02 -07:00
父节点 29c3faaa28
当前提交 a3edc88834
修改 421 个文件,包含 12474 行新增5845 行删除

查看文件

@@ -0,0 +1,4 @@
from lab.runners.dispatcher import run_attack, run_seed
__all__ = ["run_seed", "run_attack"]

查看文件

@@ -0,0 +1,169 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List
import requests
from lab.utils import ensure_dir, write_json
@dataclass
class RunnerContext:
profile: Dict[str, Any]
advisory: Dict[str, Any]
run_dir: Path
@property
def base_url(self) -> str:
return str((self.profile.get("baseline_urls") or [""])[0]).rstrip("/")
@property
def family(self) -> str:
return str(self.profile.get("vuln_family") or "").strip()
@property
def runner_id(self) -> str:
return str(self.profile.get("runner_id") or "").strip()
@property
def logs_dir(self) -> Path:
path = self.run_dir / "logs"
ensure_dir(path)
return path
def _request(
context: RunnerContext,
method: str,
path: str,
*,
json_payload: Dict[str, Any] | None = None,
timeout: float = 15.0,
) -> Dict[str, Any]:
response = requests.request(
method=method,
url=f"{context.base_url}{path}",
json=json_payload,
timeout=timeout,
)
body: Dict[str, Any]
try:
parsed = response.json()
body = parsed if isinstance(parsed, dict) else {"data": parsed}
except Exception:
body = {"text": response.text[:1000]}
return {
"status_code": response.status_code,
"ok": response.ok,
"body": body,
}
def _seed_payload(context: RunnerContext) -> Dict[str, Any]:
return {
"system_id": context.advisory["system_id"],
"family": context.family,
"case_id": context.advisory["canonical_id"],
"title": context.advisory.get("title"),
}
def _attack_payload(context: RunnerContext, family: str) -> Dict[str, Any]:
case_id = context.advisory["canonical_id"]
payloads = {
"proxy-boundary": {
"family": family,
"case_id": case_id,
"headers": {"x-forwarded-user": "admin", "x-forwarded-host": "internal.lab"},
},
"authz-bypass": {
"family": family,
"case_id": case_id,
"actor": "guest",
"target": "/admin",
"override": "bypass",
},
"ssrf": {
"family": family,
"case_id": case_id,
"target_url": "http://127.0.0.1/sink",
},
"xss": {
"family": family,
"case_id": case_id,
"payload": f"<script>document.body.dataset.case='{case_id}'</script>",
},
"file-upload": {
"family": family,
"case_id": case_id,
"filename": f"{case_id}.txt",
"content": f"fixture upload marker for {case_id}",
},
"deserialization": {
"family": family,
"case_id": case_id,
"payload": {"marker": case_id, "mode": "inert-object"},
},
}
return payloads[family]
def run_fixture_seed(context: RunnerContext, family: str) -> Dict[str, Any]:
result = _request(context, "POST", "/seed", json_payload=_seed_payload(context))
payload = {
"steps": [
{
"kind": "runner",
"tool": context.runner_id or f"{context.advisory['system_id']}.{family}",
"status": "completed" if result["ok"] else "failed",
"status_code": result["status_code"],
"detail": result["body"].get("detail") or "seed request completed",
}
],
"seeded": bool(result["ok"]),
"result": result,
}
write_json(context.logs_dir / "seed.json", payload)
return payload
def run_fixture_attack(context: RunnerContext, family: str) -> Dict[str, Any]:
before: Dict[str, Any] = {}
if family in {"proxy-boundary", "authz-bypass"}:
before = _request(context, "GET", "/admin")
attack = _request(context, "POST", "/attack", json_payload=_attack_payload(context, family))
proof = _request(context, "GET", "/proof")
after: Dict[str, Any] = {}
if family in {"proxy-boundary", "authz-bypass"}:
after = _request(context, "GET", "/admin")
success = bool(attack["ok"] and proof["ok"] and proof["body"].get("success"))
step = {
"kind": "runner",
"tool": context.runner_id or f"{context.advisory['system_id']}.{family}",
"status": "completed" if success else "failed",
"status_code": attack["status_code"],
"result_path": str(context.logs_dir / "attack.json"),
}
payload = {
"steps": [step],
"success": success,
"detail": proof["body"].get("detail") or attack["body"].get("detail") or "runner attack finished",
"before": before,
"attack": attack,
"after": after,
"proof": proof,
"assertions": [
{
"name": "proof-success",
"kind": "runner-proof",
"passed": success,
"detail": proof["body"].get("detail") or "runner proof endpoint returned success",
}
],
}
write_json(context.logs_dir / "attack.json", payload)
return payload

查看文件

@@ -0,0 +1,36 @@
from __future__ import annotations
import importlib
from pathlib import Path
from typing import Any, Dict
from lab.runners.common import RunnerContext
def _module_name(profile: Dict[str, Any]) -> str:
runner_id = str(profile.get("runner_id") or "").strip()
if runner_id:
system_name, family_name = runner_id.split(".", 1)
else:
system_name = str(profile.get("system_id") or "").strip()
family_name = str(profile.get("vuln_family") or "").strip()
system_name = system_name.replace("-", "_")
family_name = family_name.replace("-", "_")
return f"lab.runners.{system_name}.{family_name}"
def _load_runner(profile: Dict[str, Any]):
return importlib.import_module(_module_name(profile))
def run_seed(profile: Dict[str, Any], advisory: Dict[str, Any], run_dir: Path) -> Dict[str, Any]:
module = _load_runner(profile)
context = RunnerContext(profile=profile, advisory=advisory, run_dir=run_dir)
return module.run_seed(context)
def run_attack(profile: Dict[str, Any], advisory: Dict[str, Any], run_dir: Path) -> Dict[str, Any]:
module = _load_runner(profile)
context = RunnerContext(profile=profile, advisory=advisory, run_dir=run_dir)
return module.run_attack(context)

查看文件

@@ -0,0 +1,2 @@
"""Gitea family runners."""

查看文件

@@ -0,0 +1,10 @@
from lab.runners.common import RunnerContext, run_fixture_attack, run_fixture_seed
def run_seed(context: RunnerContext):
return run_fixture_seed(context, "authz-bypass")
def run_attack(context: RunnerContext):
return run_fixture_attack(context, "authz-bypass")

查看文件

@@ -0,0 +1,10 @@
from lab.runners.common import RunnerContext, run_fixture_attack, run_fixture_seed
def run_seed(context: RunnerContext):
return run_fixture_seed(context, "file-upload")
def run_attack(context: RunnerContext):
return run_fixture_attack(context, "file-upload")

查看文件

@@ -0,0 +1,10 @@
from lab.runners.common import RunnerContext, run_fixture_attack, run_fixture_seed
def run_seed(context: RunnerContext):
return run_fixture_seed(context, "proxy-boundary")
def run_attack(context: RunnerContext):
return run_fixture_attack(context, "proxy-boundary")

查看文件

@@ -0,0 +1,10 @@
from lab.runners.common import RunnerContext, run_fixture_attack, run_fixture_seed
def run_seed(context: RunnerContext):
return run_fixture_seed(context, "ssrf")
def run_attack(context: RunnerContext):
return run_fixture_attack(context, "ssrf")

查看文件

@@ -0,0 +1,10 @@
from lab.runners.common import RunnerContext, run_fixture_attack, run_fixture_seed
def run_seed(context: RunnerContext):
return run_fixture_seed(context, "xss")
def run_attack(context: RunnerContext):
return run_fixture_attack(context, "xss")

查看文件

@@ -0,0 +1,2 @@
"""Next.js family runners."""

查看文件

@@ -0,0 +1,10 @@
from lab.runners.common import RunnerContext, run_fixture_attack, run_fixture_seed
def run_seed(context: RunnerContext):
return run_fixture_seed(context, "authz-bypass")
def run_attack(context: RunnerContext):
return run_fixture_attack(context, "authz-bypass")

查看文件

@@ -0,0 +1,10 @@
from lab.runners.common import RunnerContext, run_fixture_attack, run_fixture_seed
def run_seed(context: RunnerContext):
return run_fixture_seed(context, "deserialization")
def run_attack(context: RunnerContext):
return run_fixture_attack(context, "deserialization")

查看文件

@@ -0,0 +1,10 @@
from lab.runners.common import RunnerContext, run_fixture_attack, run_fixture_seed
def run_seed(context: RunnerContext):
return run_fixture_seed(context, "proxy-boundary")
def run_attack(context: RunnerContext):
return run_fixture_attack(context, "proxy-boundary")

查看文件

@@ -0,0 +1,10 @@
from lab.runners.common import RunnerContext, run_fixture_attack, run_fixture_seed
def run_seed(context: RunnerContext):
return run_fixture_seed(context, "ssrf")
def run_attack(context: RunnerContext):
return run_fixture_attack(context, "ssrf")

查看文件

@@ -0,0 +1,10 @@
from lab.runners.common import RunnerContext, run_fixture_attack, run_fixture_seed
def run_seed(context: RunnerContext):
return run_fixture_seed(context, "xss")
def run_attack(context: RunnerContext):
return run_fixture_attack(context, "xss")

查看文件

@@ -0,0 +1,2 @@
"""Undici family runners."""

查看文件

@@ -0,0 +1,10 @@
from lab.runners.common import RunnerContext, run_fixture_attack, run_fixture_seed
def run_seed(context: RunnerContext):
return run_fixture_seed(context, "ssrf")
def run_attack(context: RunnerContext):
return run_fixture_attack(context, "ssrf")

查看文件

@@ -0,0 +1,2 @@
"""Vite family runners."""

查看文件

@@ -0,0 +1,10 @@
from lab.runners.common import RunnerContext, run_fixture_attack, run_fixture_seed
def run_seed(context: RunnerContext):
return run_fixture_seed(context, "file-upload")
def run_attack(context: RunnerContext):
return run_fixture_attack(context, "file-upload")

查看文件

@@ -0,0 +1,10 @@
from lab.runners.common import RunnerContext, run_fixture_attack, run_fixture_seed
def run_seed(context: RunnerContext):
return run_fixture_seed(context, "proxy-boundary")
def run_attack(context: RunnerContext):
return run_fixture_attack(context, "proxy-boundary")

查看文件

@@ -0,0 +1,10 @@
from lab.runners.common import RunnerContext, run_fixture_attack, run_fixture_seed
def run_seed(context: RunnerContext):
return run_fixture_seed(context, "xss")
def run_attack(context: RunnerContext):
return run_fixture_attack(context, "xss")