from __future__ import annotations import os import re from datetime import datetime from typing import Optional from urllib.parse import quote import requests from intel.config import GENERATED_DIR, ROOT from intel.utils import read_json, run PR_PATHS = [ "README.md", "00-environments", "01-sql-injection", "02-xss", "03-authentication", "04-server-security", "05-defense/secure-code", "06-case-studies/generated-runs", "07-framework-security", "08-threat-intel", "requirements-intel.txt", "scripts/intel", "scripts/lab", "scripts/tool_contract.py", ] def create_branch_name() -> str: return "codex/intel-" + datetime.now().strftime("%Y%m%d-%H%M%S") def _parse_origin() -> Optional[dict]: result = run(["git", "-C", str(ROOT), "remote", "get-url", "origin"], check=False) if result.returncode != 0: return None url = result.stdout.strip() match = re.match(r"https://([^/]+)/([^/]+)/([^/.]+)(?:\.git)?", url) if not match: return None return {"host": match.group(1), "owner": match.group(2), "repo": match.group(3), "url": url} def _changed_paths() -> list[str]: status = run(["git", "-C", str(ROOT), "status", "--short", "--", *PR_PATHS], check=False) lines = [line.rstrip() for line in status.stdout.splitlines() if line.strip()] return lines def _current_branch() -> str: result = run(["git", "-C", str(ROOT), "branch", "--show-current"], check=False) return result.stdout.strip() def _push_remote(origin: dict, token: str | None) -> str: if token and origin["url"].startswith("https://"): return f"https://{quote(origin['owner'], safe='')}:{quote(token, safe='')}@{origin['host']}/{origin['owner']}/{origin['repo']}.git" return "origin" def open_pr(base_branch: str = "main", dry_run: bool = False) -> str: origin = _parse_origin() if not origin: raise RuntimeError("Unable to parse origin remote URL") changed = _changed_paths() if not changed: return "No intel-related changes to submit" branch = _current_branch() if not branch.startswith("codex/"): branch = create_branch_name() if dry_run: preview = "\n".join(f"- {line}" for line in changed[:40]) return f"Dry run only; would create branch {branch} with these paths:\n{preview}" if _current_branch() != branch: run(["git", "-C", str(ROOT), "checkout", "-b", branch]) run(["git", "-C", str(ROOT), "add", "--", *PR_PATHS]) run(["git", "-C", str(ROOT), "commit", "-m", f"lab: automated intel and verification sync {branch}"]) token = os.environ.get("GITEA_TOKEN") run(["git", "-C", str(ROOT), "push", "-u", _push_remote(origin, token), branch]) if not token: return f"Pushed branch {branch}, but GITEA_TOKEN is not set; PR not created" summary = read_json(GENERATED_DIR / "run-summary.json", default={}) or {} dashboard = read_json(GENERATED_DIR / "dashboard" / "summary.json", default={}) or {} body_lines = [ "Automated advisory ingest and local verification update.", "", f"- New advisories: {summary.get('new_count', 0)}", f"- Updated advisories: {summary.get('updated_count', 0)}", f"- Triage count: {summary.get('triage_count', 0)}", f"- Run bundles: {summary.get('run_bundle_count', 0)}", f"- verified-real: {dashboard.get('statuses', {}).get('verified-real', 0)}", f"- verified-synthetic: {dashboard.get('statuses', {}).get('verified-synthetic', 0)}", f"- blocked-artifact: {dashboard.get('statuses', {}).get('blocked-artifact', 0)}", f"- triage-manual: {dashboard.get('statuses', {}).get('triage-manual', 0)}", f"- Failure count: {len(summary.get('failures', []))}", ] if summary.get("systems_touched"): body_lines.append(f"- Systems touched: {', '.join(summary['systems_touched'])}") if summary.get("failures"): body_lines.extend(["", "Failed source adapters:"]) for failure in summary["failures"]: body_lines.append(f"- {failure}") if dashboard.get("recent_failures"): body_lines.extend(["", "Recent repro blockers:"]) for failure in dashboard["recent_failures"][:10]: body_lines.append(f"- {failure['run_id']} :: {failure['status']} :: {failure.get('blocked_reason') or '-'}") payload = { "title": f"Intel ingest {branch}", "head": branch, "base": base_branch, "body": "\n".join(body_lines), } response = requests.post( f"https://{origin['host']}/api/v1/repos/{origin['owner']}/{origin['repo']}/pulls", headers={"Authorization": f"token {token}", "Content-Type": "application/json"}, json=payload, timeout=30, ) response.raise_for_status() pr_url = response.json().get("html_url") or response.json().get("url") return f"Created PR: {pr_url}"