文件
websafe-kb/scripts/lab/baseline.py

65 行
2.5 KiB
Python

from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List
import requests
from lab.utils import write_json
def collect(profile: Dict[str, Any], run_dir: Path, timeout: float = 8.0) -> Dict[str, Any]:
observations: List[Dict[str, Any]] = []
steps: List[Dict[str, Any]] = []
for url in profile.get("baseline_urls", []):
try:
response = requests.get(url, timeout=timeout, verify=False)
observations.append(
{
"url": url,
"status_code": response.status_code,
"headers": dict(response.headers),
"body_excerpt": response.text[:400],
}
)
except Exception as exc:
observations.append({"url": url, "error": str(exc)})
base_url = str((profile.get("baseline_urls") or [""])[0]).rstrip("/")
for action in profile.get("baseline_actions", []) or []:
kind = action.get("kind", "note")
if kind == "note":
steps.append({"kind": kind, "status": "recorded", "message": action.get("message", "")})
continue
try:
if kind == "http-get":
path = action.get("path", "/")
response = requests.get(f"{base_url}{path}", timeout=timeout)
steps.append(
{
"kind": kind,
"status": "completed",
"path": path,
"status_code": response.status_code,
"body_excerpt": response.text[:200],
}
)
elif kind == "http-post":
path = action.get("path", "/")
response = requests.post(f"{base_url}{path}", json=action.get("json", {}), timeout=timeout)
steps.append(
{
"kind": kind,
"status": "completed",
"path": path,
"status_code": response.status_code,
"body_excerpt": response.text[:200],
}
)
else:
steps.append({"kind": kind, "status": "skipped", "message": "baseline action type not automated"})
except Exception as exc:
steps.append({"kind": kind, "status": "failed", "message": str(exc)})
payload = {"observations": observations, "steps": steps}
write_json(run_dir / "logs" / "baseline.json", payload)
return payload