Add gitea-repo-sync skill

这个提交包含在:
X
2026-03-06 18:27:32 -08:00
父节点 71e6c95e23
当前提交 b2c5ef588d
修改 5 个文件,包含 433 行新增0 行删除

查看文件

@@ -0,0 +1,260 @@
#!/usr/bin/env python3
import argparse
import base64
import json
import os
import subprocess
import sys
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Optional
def log(message: str) -> None:
print(f"[gitea-repo-sync] {message}", flush=True)
def api_request(server_url: str, token: str, method: str, path: str, payload=None, ok_not_found: bool = False):
url = f"{server_url.rstrip('/')}/api/v1/{path.lstrip('/')}"
data = None
headers = {
"Authorization": f"token {token}",
"Accept": "application/json",
}
if payload is not None:
data = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
request = urllib.request.Request(url, data=data, method=method.upper(), headers=headers)
try:
with urllib.request.urlopen(request, timeout=30) as response:
raw = response.read().decode("utf-8")
return response.status, json.loads(raw) if raw else {}
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
if ok_not_found and exc.code == 404:
return 404, None
raise RuntimeError(f"HTTP {exc.code} {method} {url}: {body}") from exc
def run_git(args, cwd: Path, auth_header: Optional[str] = None, capture: bool = True) -> str:
cmd = ["git"]
if auth_header:
cmd += ["-c", f"http.extraHeader={auth_header}"]
cmd += args
result = subprocess.run(
cmd,
cwd=str(cwd),
text=True,
capture_output=capture,
check=False,
)
if result.returncode != 0:
stderr = (result.stderr or "").strip()
stdout = (result.stdout or "").strip()
raise RuntimeError(f"git {' '.join(args)} failed: {stderr or stdout}")
return (result.stdout or "").strip()
def is_git_repo(source_dir: Path) -> bool:
result = subprocess.run(
["git", "rev-parse", "--is-inside-work-tree"],
cwd=str(source_dir),
text=True,
capture_output=True,
check=False,
)
return result.returncode == 0 and (result.stdout or "").strip() == "true"
def ensure_git_repo(source_dir: Path, branch: str, init_git: bool) -> None:
if is_git_repo(source_dir):
return
if not init_git:
raise RuntimeError(f"{source_dir} 不是 Git 仓库;如需初始化请传 --init-git")
log(f"初始化 Git 仓库: {source_dir}")
run_git(["init", "-b", branch], cwd=source_dir)
def branch_name_from_head(source_dir: Path) -> str:
result = subprocess.run(
["git", "symbolic-ref", "--short", "HEAD"],
cwd=str(source_dir),
text=True,
capture_output=True,
check=False,
)
if result.returncode == 0:
return (result.stdout or "").strip()
return run_git(["rev-parse", "--abbrev-ref", "HEAD"], cwd=source_dir)
def ensure_branch(source_dir: Path, branch: str) -> None:
current = branch_name_from_head(source_dir)
if current == branch:
return
result = subprocess.run(
["git", "show-ref", "--verify", f"refs/heads/{branch}"],
cwd=str(source_dir),
text=True,
capture_output=True,
check=False,
)
if result.returncode == 0:
run_git(["checkout", branch], cwd=source_dir)
else:
run_git(["checkout", "-b", branch], cwd=source_dir)
def maybe_commit(source_dir: Path, stage_all: bool, commit_message: Optional[str]) -> None:
if not stage_all and not commit_message:
return
if stage_all:
run_git(["add", "-A"], cwd=source_dir)
if commit_message:
status = run_git(["status", "--porcelain"], cwd=source_dir)
if status:
log("创建提交")
run_git(["commit", "-m", commit_message], cwd=source_dir)
def get_current_branch(source_dir: Path) -> str:
return branch_name_from_head(source_dir)
def get_remote_url(source_dir: Path, remote_name: str) -> Optional[str]:
result = subprocess.run(
["git", "remote", "get-url", remote_name],
cwd=str(source_dir),
text=True,
capture_output=True,
check=False,
)
if result.returncode != 0:
return None
return (result.stdout or "").strip()
def ensure_remote(source_dir: Path, remote_name: str, remote_url: str, replace_remote: bool) -> None:
current = get_remote_url(source_dir, remote_name)
if not current:
run_git(["remote", "add", remote_name, remote_url], cwd=source_dir)
return
if current == remote_url:
return
if not replace_remote:
raise RuntimeError(
f"remote {remote_name} 已存在且指向 {current};如需替换请显式传 --replace-remote"
)
run_git(["remote", "set-url", remote_name, remote_url], cwd=source_dir)
def basic_auth_header(username: str, token: str) -> str:
raw = f"{username}:{token}".encode("utf-8")
return "Authorization: Basic " + base64.b64encode(raw).decode("ascii")
def ensure_repo(server_url: str, token: str, owner: str, repo: str, description: str, private: bool, default_branch: str):
_, user = api_request(server_url, token, "GET", "/user")
username = user["login"]
status, existing = api_request(server_url, token, "GET", f"/repos/{owner}/{repo}", ok_not_found=True)
if status == 200 and existing:
return username, existing, False
payload = {
"name": repo,
"description": description,
"private": private,
"auto_init": False,
"default_branch": default_branch,
}
if owner == username:
_, created = api_request(server_url, token, "POST", "/user/repos", payload=payload)
else:
_, created = api_request(server_url, token, "POST", f"/orgs/{owner}/repos", payload=payload)
return username, created, True
def parse_args():
parser = argparse.ArgumentParser(description="Create or reuse a Gitea repository and sync a local project.")
parser.add_argument("--server-url", default=os.getenv("GITEA_URL", ""), help="Gitea base URL")
parser.add_argument("--token", default=os.getenv("GITEA_TOKEN", ""), help="Gitea API token")
parser.add_argument("--owner", required=True, help="Repo owner or organization")
parser.add_argument("--repo", required=True, help="Repository name")
parser.add_argument("--source-dir", required=True, help="Local project path")
parser.add_argument("--description", default="", help="Repository description")
parser.add_argument("--branch", default="main", help="Branch to push")
parser.add_argument("--remote-name", default="origin", help="Git remote name")
parser.add_argument("--private", action="store_true", help="Create private repository")
parser.add_argument("--init-git", action="store_true", help="Initialize git when source-dir is not a repository")
parser.add_argument("--replace-remote", action="store_true", help="Replace remote URL when remote exists but differs")
parser.add_argument("--stage-all", action="store_true", help="Run git add -A before commit")
parser.add_argument("--commit-message", default="", help="Create a commit before pushing")
parser.add_argument("--push-all", action="store_true", help="Push all local branches")
parser.add_argument("--tags", action="store_true", help="Push tags after branch push")
return parser.parse_args()
def main() -> int:
args = parse_args()
if not args.server_url:
raise RuntimeError("缺少 --server-url 或 GITEA_URL")
if not args.token:
raise RuntimeError("缺少 --token 或 GITEA_TOKEN")
source_dir = Path(args.source_dir).expanduser().resolve()
if not source_dir.exists() or not source_dir.is_dir():
raise RuntimeError(f"source-dir 不存在或不是目录: {source_dir}")
username, repo_info, created = ensure_repo(
server_url=args.server_url,
token=args.token,
owner=args.owner,
repo=args.repo,
description=args.description,
private=args.private,
default_branch=args.branch,
)
remote_url = repo_info.get("clone_url") or repo_info.get("html_url", "").rstrip("/") + ".git"
if not remote_url:
raise RuntimeError("未能从 Gitea 响应中获取 clone_url")
ensure_git_repo(source_dir, args.branch, args.init_git)
ensure_branch(source_dir, args.branch)
maybe_commit(source_dir, args.stage_all, args.commit_message or None)
ensure_remote(source_dir, args.remote_name, remote_url, args.replace_remote)
auth_header = basic_auth_header(username, args.token)
branch = get_current_branch(source_dir)
if args.push_all:
log("推送全部分支")
run_git(["push", args.remote_name, "--all"], cwd=source_dir, auth_header=auth_header, capture=True)
else:
log(f"推送分支: {branch}")
run_git(["push", "--set-upstream", args.remote_name, branch], cwd=source_dir, auth_header=auth_header, capture=True)
if args.tags:
log("推送标签")
run_git(["push", args.remote_name, "--tags"], cwd=source_dir, auth_header=auth_header, capture=True)
summary = {
"created": created,
"owner": args.owner,
"repo": args.repo,
"branch": branch,
"source_dir": str(source_dir),
"remote_name": args.remote_name,
"remote_url": remote_url,
"html_url": repo_info.get("html_url"),
}
print(json.dumps(summary, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except Exception as exc:
print(f"ERROR: {exc}", file=sys.stderr)
raise SystemExit(1)