# File: websafe-kb/scripts/intel/pr.py
#
# 132 lines, 4.8 KiB, Python

from __future__ import annotations
import os
import re
from datetime import datetime
from typing import Optional
from urllib.parse import quote
import requests
from intel.config import GENERATED_DIR, ROOT
from intel.utils import read_json, run
# Repository-relative paths that an automated intel PR is allowed to touch.
# The list is passed as the git pathspec both when checking for changes
# (`git status`) and when staging them (`git add`).
PR_PATHS = [
    "README.md",
    "00-environments",
    "01-sql-injection",
    "02-xss",
    "03-authentication",
    "04-server-security",
    "05-defense/secure-code",
    "06-case-studies/generated-runs",
    "07-framework-security",
    "08-threat-intel",
    "requirements-intel.txt",
    "scripts/intel",
    "scripts/lab",
    "scripts/tool_contract.py",
]
def create_branch_name() -> str:
    """Return a new ``codex/intel-<timestamp>`` branch name.

    The timestamp is the current local (naive) time rendered as
    ``YYYYMMDD-HHMMSS``.
    """
    return f"codex/intel-{datetime.now():%Y%m%d-%H%M%S}"
# HTTPS remote of the form ``https://[userinfo@]host/owner/repo[.git][/]``.
# The repo group is lazy so that an optional ``.git`` suffix is stripped while
# dots inside the repository name itself (``my.repo``) are kept.
_ORIGIN_URL_RE = re.compile(
    r"https://(?:[^/@]+@)?(?P<host>[^/]+)/(?P<owner>[^/]+)/(?P<repo>[^/]+?)(?:\.git)?/?"
)


def _parse_origin() -> Optional[dict]:
    """Split the ``origin`` remote URL into host, owner and repository.

    Runs ``git remote get-url origin`` inside ``ROOT`` and parses the result.

    Returns:
        A dict with the keys ``host``, ``owner``, ``repo`` and ``url`` (the
        unmodified remote URL), or ``None`` when git reports an error or the
        URL is not exactly ``https://host/owner/repo`` (optionally with
        embedded credentials, a ``.git`` suffix or a trailing slash).
    """
    result = run(["git", "-C", str(ROOT), "remote", "get-url", "origin"], check=False)
    if result.returncode != 0:
        return None
    url = result.stdout.strip()
    # fullmatch: a URL with extra path segments must be rejected rather than
    # silently mis-read as a different owner/repo pair.
    match = _ORIGIN_URL_RE.fullmatch(url)
    if not match:
        return None
    return {
        # Credentials embedded in the remote are deliberately not part of the
        # host; callers build their own authenticated URLs from it.
        "host": match["host"],
        "owner": match["owner"],
        "repo": match["repo"],
        "url": url,
    }
def _changed_paths() -> list[str]:
    """Return the ``git status --short`` entries limited to ``PR_PATHS``.

    Each element is one raw status line (status code followed by the path)
    with trailing whitespace removed; blank lines are dropped.  The command is
    run with ``check=False``, so presumably a failing git call simply yields
    an empty list -- confirm against ``intel.utils.run``.
    """
    command = ["git", "-C", str(ROOT), "status", "--short", "--"]
    command.extend(PR_PATHS)
    report = run(command, check=False)
    return [entry.rstrip() for entry in report.stdout.splitlines() if entry.strip()]
def _current_branch() -> str:
    """Return the stripped output of ``git branch --show-current`` in ``ROOT``.

    Git prints nothing for a detached HEAD, in which case the result is an
    empty string.
    """
    command = ["git", "-C", str(ROOT), "branch", "--show-current"]
    return run(command, check=False).stdout.strip()
def _push_remote(origin: dict, token: str | None) -> str:
if token and origin["url"].startswith("https://"):
return f"https://{quote(origin['owner'], safe='')}:{quote(token, safe='')}@{origin['host']}/{origin['owner']}/{origin['repo']}.git"
return "origin"
def _build_pr_body(summary: dict, dashboard: dict) -> str:
    """Render the pull-request description from the run and dashboard summaries."""
    # Either value may be present but null in the JSON; treat that like "absent".
    statuses = dashboard.get("statuses") or {}
    failures = summary.get("failures") or []
    lines = [
        "Automated advisory ingest and local verification update.",
        "",
        f"- New advisories: {summary.get('new_count', 0)}",
        f"- Updated advisories: {summary.get('updated_count', 0)}",
        f"- Triage count: {summary.get('triage_count', 0)}",
        f"- Run bundles: {summary.get('run_bundle_count', 0)}",
        f"- verified-real: {statuses.get('verified-real', 0)}",
        f"- verified-synthetic: {statuses.get('verified-synthetic', 0)}",
        f"- blocked-artifact: {statuses.get('blocked-artifact', 0)}",
        f"- triage-manual: {statuses.get('triage-manual', 0)}",
        f"- Failure count: {len(failures)}",
    ]
    systems = summary.get("systems_touched")
    if systems:
        lines.append(f"- Systems touched: {', '.join(str(name) for name in systems)}")
    if failures:
        lines.extend(["", "Failed source adapters:"])
        lines.extend(f"- {failure}" for failure in failures)
    recent = dashboard.get("recent_failures")
    if recent:
        lines.extend(["", "Recent repro blockers:"])
        for failure in recent[:10]:
            # By the time the body is built the branch is already pushed, so a
            # partially filled entry gets a "-" placeholder instead of
            # aborting the PR with a KeyError.
            run_id = failure.get("run_id", "-")
            status = failure.get("status", "-")
            reason = failure.get("blocked_reason") or "-"
            lines.append(f"- {run_id} :: {status} :: {reason}")
    return "\n".join(lines)


def open_pr(base_branch: str = "main", dry_run: bool = False) -> str:
    """Commit intel-related changes, push them and open a pull request.

    The working tree is inspected for changes under ``PR_PATHS``.  If there
    are any, they are committed on a ``codex/`` branch (the current branch is
    reused when it already starts with ``codex/``), pushed, and -- when the
    ``GITEA_TOKEN`` environment variable is set -- a pull request is created
    through the ``/api/v1/repos/{owner}/{repo}/pulls`` endpoint of the origin
    host.

    Args:
        base_branch: Branch the pull request should target.
        dry_run: When true, only report what would be done; nothing is
            checked out, committed or pushed.

    Returns:
        A human-readable status message describing the outcome.

    Raises:
        RuntimeError: If the ``origin`` remote URL cannot be parsed.
        requests.HTTPError: If the pull-request API call returns an error
            status.
        Presumably whatever ``run`` raises when a git command fails with the
        default ``check`` setting -- confirm against ``intel.utils.run``.
    """
    origin = _parse_origin()
    if not origin:
        raise RuntimeError("Unable to parse origin remote URL")
    changed = _changed_paths()
    if not changed:
        return "No intel-related changes to submit"

    current = _current_branch()
    branch = current if current.startswith("codex/") else create_branch_name()
    if dry_run:
        preview = "\n".join(f"- {line}" for line in changed[:40])
        return f"Dry run only; would create branch {branch} with these paths:\n{preview}"

    git = ["git", "-C", str(ROOT)]
    if current != branch:
        run([*git, "checkout", "-b", branch])
    run([*git, "add", "--", *PR_PATHS])
    run([*git, "commit", "-m", f"lab: automated intel and verification sync {branch}"])

    token = os.environ.get("GITEA_TOKEN")
    remote = _push_remote(origin, token)
    # `git push -u <remote>` stores <remote> as branch.<name>.remote in
    # .git/config.  When <remote> is a URL with the token embedded, that would
    # persist the secret on disk, so upstream tracking is only configured for
    # the named remote.
    upstream_flag = ["-u"] if remote == "origin" else []
    run([*git, "push", *upstream_flag, remote, branch])
    if not token:
        return f"Pushed branch {branch}, but GITEA_TOKEN is not set; PR not created"

    summary = read_json(GENERATED_DIR / "run-summary.json", default={}) or {}
    dashboard = read_json(GENERATED_DIR / "dashboard" / "summary.json", default={}) or {}
    payload = {
        "title": f"Intel ingest {branch}",
        "head": branch,
        "base": base_branch,
        "body": _build_pr_body(summary, dashboard),
    }
    response = requests.post(
        f"https://{origin['host']}/api/v1/repos/{origin['owner']}/{origin['repo']}/pulls",
        headers={"Authorization": f"token {token}", "Content-Type": "application/json"},
        json=payload,
        timeout=30,
    )
    response.raise_for_status()
    created = response.json()
    pr_url = created.get("html_url") or created.get("url")
    return f"Created PR: {pr_url}"