33 行
1.1 KiB
Python
33 行
1.1 KiB
Python
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List
|
|
|
|
from lab.utils import command_available, run, write_json
|
|
|
|
|
|
def collect_container_logs(run_dir: Path, compose_path: Path) -> List[str]:
    """Save the logs of each docker compose service under ``run_dir``.

    Services are listed with ``docker compose -f <compose_path> ps --services``
    and, for every non-blank service name, the output of
    ``docker compose ... logs --no-color <service>`` is written to
    ``run_dir/logs/docker/<service>.log`` (UTF-8).

    Args:
        run_dir: Run directory; used both as the working directory for the
            docker commands and as the root for the ``logs/docker`` folder.
        compose_path: Compose file handed to ``docker compose -f``.

    Returns:
        The written log file paths as strings, in the order the services
        were listed. Empty when ``docker`` is not available or when listing
        the services exits with a non-zero return code.
    """
    if not command_available("docker"):
        return []

    # The directory is created before services are listed, so it exists
    # even when the listing below fails.
    log_dir = run_dir / "logs" / "docker"
    log_dir.mkdir(parents=True, exist_ok=True)

    # Prefix shared by both compose invocations.
    compose_cmd = ["docker", "compose", "-f", str(compose_path)]

    ps = run([*compose_cmd, "ps", "--services"], cwd=run_dir)
    refs: List[str] = []
    if ps.returncode != 0:
        return refs

    # Guard against a missing stdout the same way the per-service output is
    # guarded below (presumably `run` returns a CompletedProcess-like
    # object whose streams may be None -- confirm against lab.utils.run).
    for service in (ps.stdout or "").splitlines():
        service = service.strip()
        if not service:
            continue
        result = run([*compose_cmd, "logs", "--no-color", service], cwd=run_dir)
        path = log_dir / f"{service}.log"
        # Best effort: prefer stdout, fall back to stderr, and still create
        # an (empty) file when neither stream produced anything.
        path.write_text(result.stdout or result.stderr or "", encoding="utf-8")
        refs.append(str(path))
    return refs
|
|
|
|
|
|
def write_run_bundle(run_dir: Path, bundle: Dict[str, Any]) -> Path:
    """Write ``bundle`` to ``run_dir/run.json`` and return that path.

    Serialization is delegated to ``write_json``.
    """
    bundle_file = run_dir.joinpath("run.json")
    write_json(bundle_file, bundle)
    return bundle_file
|