#!/usr/bin/env python3
# File-viewer metadata captured with this copy: 261 lines, 9.5 KiB, Python, executable file.
|
|
import argparse
|
|
import base64
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
|
|
def log(message: str) -> None:
    """Write a progress line to stdout, prefixed with the tool name.

    The stream is flushed on every call so the line appears immediately.
    """
    line = f"[gitea-repo-sync] {message}"
    print(line, flush=True)
|
|
|
|
|
|
def api_request(server_url: str, token: str, method: str, path: str, payload=None, ok_not_found: bool = False):
    """Send one token-authenticated request to the server's ``/api/v1`` API.

    Args:
        server_url: Base URL of the server; trailing slashes are ignored.
        token: API token, sent as ``Authorization: token <token>``.
        method: HTTP verb; it is upper-cased before the request is built.
        path: Path below ``/api/v1/``; leading slashes are ignored.
        payload: Optional object serialised to a JSON request body.
        ok_not_found: When true, an HTTP 404 returns ``(404, None)``
            instead of raising.

    Returns:
        A ``(status, body)`` tuple. ``body`` is the decoded JSON response,
        or ``{}`` when the response body is empty.

    Raises:
        RuntimeError: On any HTTP error status other than a tolerated 404.
            The message carries the status, method, URL and response body.
    """
    base = server_url.rstrip("/")
    endpoint = path.lstrip("/")
    url = f"{base}/api/v1/{endpoint}"

    headers = {"Authorization": f"token {token}", "Accept": "application/json"}
    data = None
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"

    request = urllib.request.Request(url, data=data, method=method.upper(), headers=headers)
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            text = response.read().decode("utf-8")
            if not text:
                return response.status, {}
            return response.status, json.loads(text)
    except urllib.error.HTTPError as exc:
        # Read the error body up front so it can be reported in the message.
        body = exc.read().decode("utf-8", errors="replace")
        if exc.code == 404 and ok_not_found:
            return 404, None
        raise RuntimeError(f"HTTP {exc.code} {method} {url}: {body}") from exc
|
|
|
|
|
|
def run_git(args, cwd: Path, auth_header: Optional[str] = None, capture: bool = True) -> str:
    """Run ``git <args>`` in ``cwd`` and return its stripped stdout.

    Args:
        args: Git arguments (sub-command and options), without the
            leading ``git``.
        cwd: Directory in which the command is executed.
        auth_header: Optional HTTP header line; when given it is passed to
            git as ``-c http.extraHeader=<header>``.
        capture: Whether stdout/stderr are captured. When false, the
            returned string is empty.

    Returns:
        The command's stdout with surrounding whitespace removed.

    Raises:
        RuntimeError: When git exits non-zero. The message contains
            ``args`` (not the auth header) and stderr, or stdout when
            stderr is empty.
    """
    config_args = ["-c", f"http.extraHeader={auth_header}"] if auth_header else []
    cmd = ["git", *config_args, *args]
    completed = subprocess.run(
        cmd,
        cwd=str(cwd),
        text=True,
        capture_output=capture,
        check=False,
    )
    output = (completed.stdout or "").strip()
    if completed.returncode == 0:
        return output
    error_text = (completed.stderr or "").strip()
    raise RuntimeError(f"git {' '.join(args)} failed: {error_text or output}")
|
|
|
|
|
|
def is_git_repo(source_dir: Path) -> bool:
    """Return True when ``git rev-parse --is-inside-work-tree`` says ``true``.

    The command is run inside ``source_dir``. A non-zero exit status or
    any other output yields False.
    """
    probe = subprocess.run(
        ["git", "rev-parse", "--is-inside-work-tree"],
        cwd=str(source_dir),
        text=True,
        capture_output=True,
        check=False,
    )
    if probe.returncode != 0:
        return False
    answer = (probe.stdout or "").strip()
    return answer == "true"
|
|
|
|
|
|
def ensure_git_repo(source_dir: Path, branch: str, init_git: bool) -> None:
    """Make sure ``source_dir`` is a Git repository, initialising it if allowed.

    Args:
        source_dir: Directory that must be a Git work tree.
        branch: Initial branch name passed to ``git init -b``.
        init_git: Whether a missing repository may be created.

    Raises:
        RuntimeError: When the directory is not a repository and
            ``init_git`` is false, or when ``git init`` fails.
    """
    if not is_git_repo(source_dir):
        if init_git:
            log(f"初始化 Git 仓库: {source_dir}")
            run_git(["init", "-b", branch], cwd=source_dir)
        else:
            raise RuntimeError(f"{source_dir} 不是 Git 仓库;如需初始化请传 --init-git")
|
|
|
|
|
|
def branch_name_from_head(source_dir: Path) -> str:
    """Return the short name that HEAD refers to in ``source_dir``.

    ``git symbolic-ref --short HEAD`` is tried first. If it exits
    non-zero, the result of ``git rev-parse --abbrev-ref HEAD`` is
    returned instead.

    Raises:
        RuntimeError: When the fallback ``rev-parse`` command fails.
    """
    symbolic = subprocess.run(
        ["git", "symbolic-ref", "--short", "HEAD"],
        cwd=str(source_dir),
        text=True,
        capture_output=True,
        check=False,
    )
    if symbolic.returncode != 0:
        return run_git(["rev-parse", "--abbrev-ref", "HEAD"], cwd=source_dir)
    return (symbolic.stdout or "").strip()
|
|
|
|
|
|
def ensure_branch(source_dir: Path, branch: str) -> None:
    """Switch the repository in ``source_dir`` to ``branch``.

    Nothing happens when HEAD already names ``branch``. Otherwise the
    branch is checked out if ``refs/heads/<branch>`` exists, or created
    with ``git checkout -b`` if it does not.

    Raises:
        RuntimeError: When the checkout command fails.
    """
    if branch_name_from_head(source_dir) == branch:
        return

    lookup = subprocess.run(
        ["git", "show-ref", "--verify", f"refs/heads/{branch}"],
        cwd=str(source_dir),
        text=True,
        capture_output=True,
        check=False,
    )
    branch_exists = lookup.returncode == 0
    checkout_args = ["checkout", branch] if branch_exists else ["checkout", "-b", branch]
    run_git(checkout_args, cwd=source_dir)
|
|
|
|
|
|
def maybe_commit(source_dir: Path, stage_all: bool, commit_message: Optional[str]) -> None:
    """Optionally stage everything and create a commit in ``source_dir``.

    Args:
        source_dir: Repository working directory.
        stage_all: When true, run ``git add -A`` first.
        commit_message: When non-empty, commit the staged changes with
            this message. When empty or None, no commit is made.

    Raises:
        RuntimeError: When any of the git commands fails.
    """
    if stage_all:
        run_git(["add", "-A"], cwd=source_dir)
    if not commit_message:
        return

    # Decide from the index, not from `git status --porcelain`: the latter is
    # also non-empty for unstaged/untracked changes, in which case `git commit`
    # would fail with "nothing added to commit". `git diff --cached` also works
    # on a repository without commits.
    staged = run_git(["diff", "--cached", "--name-only"], cwd=source_dir)
    if not staged:
        log("没有已暂存的变更,跳过提交")
        return

    log("创建提交")
    run_git(["commit", "-m", commit_message], cwd=source_dir)
|
|
|
|
|
|
def get_current_branch(source_dir: Path) -> str:
    """Return the current branch name of ``source_dir``.

    This delegates to ``branch_name_from_head``.
    """
    current = branch_name_from_head(source_dir)
    return current
|
|
|
|
|
|
def get_remote_url(source_dir: Path, remote_name: str) -> Optional[str]:
    """Return the URL reported by ``git remote get-url <remote_name>``.

    Returns:
        The stripped URL, or None when the command exits non-zero.
    """
    lookup = subprocess.run(
        ["git", "remote", "get-url", remote_name],
        cwd=str(source_dir),
        text=True,
        capture_output=True,
        check=False,
    )
    return (lookup.stdout or "").strip() if lookup.returncode == 0 else None
|
|
|
|
|
|
def ensure_remote(source_dir: Path, remote_name: str, remote_url: str, replace_remote: bool) -> None:
    """Make ``remote_name`` point at ``remote_url`` in ``source_dir``.

    The remote is added when no URL is found for it and left alone when
    it already matches. A differing URL is replaced only when
    ``replace_remote`` is true.

    Raises:
        RuntimeError: When the remote points elsewhere and
            ``replace_remote`` is false, or when a git command fails.
    """
    current = get_remote_url(source_dir, remote_name)
    if not current:
        run_git(["remote", "add", remote_name, remote_url], cwd=source_dir)
    elif current != remote_url:
        if not replace_remote:
            raise RuntimeError(
                f"remote {remote_name} 已存在且指向 {current};如需替换请显式传 --replace-remote"
            )
        run_git(["remote", "set-url", remote_name, remote_url], cwd=source_dir)
|
|
|
|
|
|
def basic_auth_header(username: str, token: str) -> str:
    """Build an ``Authorization: Basic ...`` header line.

    The credentials ``username:token`` are UTF-8 encoded and then
    base64-encoded.
    """
    encoded = base64.b64encode(f"{username}:{token}".encode("utf-8"))
    return f"Authorization: Basic {encoded.decode('ascii')}"
|
|
|
|
|
|
def ensure_repo(server_url: str, token: str, owner: str, repo: str, description: str, private: bool, default_branch: str):
    """Return the repository ``owner/repo``, creating it when it does not exist.

    Args:
        server_url: Base URL of the Gitea server.
        token: API token used for every request.
        owner: User or organisation that owns the repository.
        repo: Repository name.
        description: Description used when the repository is created.
        private: Whether a newly created repository is private.
        default_branch: Default branch requested for a new repository.

    Returns:
        ``(username, repo_info, created)``: the login of the token's user,
        the repository JSON object from the API, and whether this call
        created the repository.

    Raises:
        RuntimeError: When an API request fails (a 404 on the existence
            check is not treated as a failure).
    """
    _, user = api_request(server_url, token, "GET", "/user")
    username = user["login"]

    # Percent-encode the path segments so characters such as "/", "?" or
    # spaces in the supplied names cannot change the request path.
    owner_segment = urllib.parse.quote(owner, safe="")
    repo_segment = urllib.parse.quote(repo, safe="")

    status, existing = api_request(
        server_url,
        token,
        "GET",
        f"/repos/{owner_segment}/{repo_segment}",
        ok_not_found=True,
    )
    if status == 200 and existing:
        return username, existing, False

    payload = {
        "name": repo,
        "description": description,
        "private": private,
        "auto_init": False,
        "default_branch": default_branch,
    }
    # Compare case-insensitively so that "--owner Alice" with login "alice"
    # uses the personal endpoint instead of the organisation endpoint.
    # NOTE(review): assumes Gitea matches login names case-insensitively — confirm.
    if owner.casefold() == username.casefold():
        endpoint = "/user/repos"
    else:
        endpoint = f"/orgs/{owner_segment}/repos"
    _, created = api_request(server_url, token, "POST", endpoint, payload=payload)
    return username, created, True
|
|
|
|
|
|
def parse_args():
    """Define the command-line options and parse the process arguments.

    ``--server-url`` and ``--token`` default to the ``GITEA_URL`` and
    ``GITEA_TOKEN`` environment variables (empty string when unset).
    ``--owner``, ``--repo`` and ``--source-dir`` are required.

    Returns:
        The ``argparse.Namespace`` produced by ``parser.parse_args()``.
    """
    parser = argparse.ArgumentParser(description="Create or reuse a Gitea repository and sync a local project.")

    # One entry per option, in the order they should appear in --help.
    option_table = (
        ("--server-url", {"default": os.getenv("GITEA_URL", ""), "help": "Gitea base URL"}),
        ("--token", {"default": os.getenv("GITEA_TOKEN", ""), "help": "Gitea API token"}),
        ("--owner", {"required": True, "help": "Repo owner or organization"}),
        ("--repo", {"required": True, "help": "Repository name"}),
        ("--source-dir", {"required": True, "help": "Local project path"}),
        ("--description", {"default": "", "help": "Repository description"}),
        ("--branch", {"default": "main", "help": "Branch to push"}),
        ("--remote-name", {"default": "origin", "help": "Git remote name"}),
        ("--private", {"action": "store_true", "help": "Create private repository"}),
        ("--init-git", {"action": "store_true", "help": "Initialize git when source-dir is not a repository"}),
        ("--replace-remote", {"action": "store_true", "help": "Replace remote URL when remote exists but differs"}),
        ("--stage-all", {"action": "store_true", "help": "Run git add -A before commit"}),
        ("--commit-message", {"default": "", "help": "Create a commit before pushing"}),
        ("--push-all", {"action": "store_true", "help": "Push all local branches"}),
        ("--tags", {"action": "store_true", "help": "Push tags after branch push"}),
    )
    for flag, settings in option_table:
        parser.add_argument(flag, **settings)

    return parser.parse_args()
|
|
|
|
|
|
def main() -> int:
    """Create or reuse the remote repository, then push the local project.

    Steps: validate the arguments, make sure the source directory is a
    Git repository, look up or create the remote repository, switch to
    the requested branch, optionally stage and commit, configure the
    remote, push (one branch or all branches, optionally tags), and
    print a JSON summary to stdout.

    Returns:
        ``0`` on success.

    Raises:
        RuntimeError: On missing configuration, an invalid source
            directory, an unusable API response, or a failing API/git
            step.
    """
    args = parse_args()
    if not args.server_url:
        raise RuntimeError("缺少 --server-url 或 GITEA_URL")
    if not args.token:
        raise RuntimeError("缺少 --token 或 GITEA_TOKEN")

    source_dir = Path(args.source_dir).expanduser().resolve()
    # is_dir() is already False for a path that does not exist.
    if not source_dir.is_dir():
        raise RuntimeError(f"source-dir 不存在或不是目录: {source_dir}")

    # Validate (or initialise) the local repository before touching the
    # server, so a missing local repo cannot leave an orphan remote repo.
    ensure_git_repo(source_dir, args.branch, args.init_git)

    username, repo_info, created = ensure_repo(
        server_url=args.server_url,
        token=args.token,
        owner=args.owner,
        repo=args.repo,
        description=args.description,
        private=args.private,
        default_branch=args.branch,
    )

    # Fall back to html_url + ".git" only when html_url is actually present;
    # otherwise the result would be the bare string ".git", which is truthy
    # and would slip past the guard below.
    clone_url = repo_info.get("clone_url")
    html_url = (repo_info.get("html_url") or "").rstrip("/")
    remote_url = clone_url or (f"{html_url}.git" if html_url else "")
    if not remote_url:
        raise RuntimeError("未能从 Gitea 响应中获取 clone_url")

    ensure_branch(source_dir, args.branch)
    maybe_commit(source_dir, args.stage_all, args.commit_message or None)
    ensure_remote(source_dir, args.remote_name, remote_url, args.replace_remote)

    auth_header = basic_auth_header(username, args.token)
    branch = get_current_branch(source_dir)
    if args.push_all:
        log("推送全部分支")
        run_git(["push", args.remote_name, "--all"], cwd=source_dir, auth_header=auth_header, capture=True)
    else:
        log(f"推送分支: {branch}")
        run_git(["push", "--set-upstream", args.remote_name, branch], cwd=source_dir, auth_header=auth_header, capture=True)

    if args.tags:
        log("推送标签")
        run_git(["push", args.remote_name, "--tags"], cwd=source_dir, auth_header=auth_header, capture=True)

    summary = {
        "created": created,
        "owner": args.owner,
        "repo": args.repo,
        "branch": branch,
        "source_dir": str(source_dir),
        "remote_name": args.remote_name,
        "remote_url": remote_url,
        "html_url": repo_info.get("html_url"),
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: exit with main()'s return code, or report the
    # error on stderr and exit with status 1.
    try:
        exit_code = main()
    except Exception as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        exit_code = 1
    raise SystemExit(exit_code)
|