比较提交

...

3 次代码提交

作者 SHA1 备注 提交日期
Codex
fc8ad7c145 Add codex-task-server-sync skill 2026-03-16 23:46:45 -07:00
X
b2c5ef588d Add gitea-repo-sync skill 2026-03-06 18:27:32 -08:00
X
71e6c95e23 Add CN86 SMS keyword verification skill and README 2026-03-06 01:13:09 -08:00
修改 16 个文件,包含 2253 行新增0 行删除

48
README.md 普通文件
查看文件

@@ -0,0 +1,48 @@
# Skills Repository
这个仓库收集可直接复用的 Codex skills。每个技能目录至少包含一个 `SKILL.md`,部分技能还会附带 `scripts/``references/``agents/` 等资源,方便在具体任务里稳定复用。
## 使用方式
1. 进入对应技能目录阅读 `SKILL.md`
2. 按说明设置运行时环境变量,不要把真实密钥提交进仓库
3. 优先复用技能自带脚本,而不是重复写临时命令
4. 涉及外部 API 的技能,先做余额/连通性检查,再跑主流程
## 当前技能
| Skill | 说明 |
| --- | --- |
| `captcha-third-party-services` | 统一封装 2Captcha / YesCaptcha / Anti-Captcha 的官方 API 工作流。 |
| `cliproxy-traffic-proxy` | CLIProxy 流量代理相关技能。 |
| `codex-task-server-sync` | 把本地 Codex 工作区迁移到服务器并通过 Gitea 持续双向同步。 |
| `cn86-sms-keyword-verification` | 86 手机号 + 关键词验证码流程,基于 LubanSMS API 完成取号、取码、提码、释放号码。 |
| `email-verification` | 邮箱验证码获取服务,支持临时邮箱 API 拉取验证码、链接和邮件内容。 |
| `gitea-repo-sync` | 在 Gitea 上创建仓库并把本地项目安全同步过去。 |
| `similarweb-analytics` | Similarweb 分析相关技能。 |
| `simple-llm` | 轻量 LLM 调用技能。 |
| `uiuxmax` | UI/UX 设计辅助技能。 |
| `web-reverse` | 网页逆向工程技能,定位签名、加密、解密和接口复现。 |
## 新增技能:`cn86-sms-keyword-verification`
适用场景:
- 先取一个中国手机号,再按关键词轮询验证码
- 对接千问等短信登录/注册流程
- 需要显式的释放号码与历史排查动作
运行示例:
```bash
export LUBAN_SMS_APIKEY='<your_api_key>'
python3 cn86-sms-keyword-verification/scripts/lubansms_keyword_cli.py balance
python3 cn86-sms-keyword-verification/scripts/lubansms_keyword_cli.py demo --keyword 千问 --timeout 300 --interval 5
```
## 维护建议
- 新增 skill 时,保持目录名、`name` 字段和用途一致
- 密钥统一走环境变量,不进仓库
- 需要稳定 API 调用时,优先提交脚本到 `scripts/`
- 新增 skill 后同步更新本 README

查看文件

@@ -0,0 +1,127 @@
---
name: cn86-sms-keyword-verification
description: 86手机号关键词验证码流程,基于 LubanSMS 通用短信接收 API 完成取号、轮询短信、提取验证码、释放号码,并可用“千问”等关键词做短信登录/注册 demo。
type: workflow
domain: utilities
version: 1.0.0
tags: [sms, verification, phone, keyword, china, qwen, lubansms]
triggers:
keywords:
primary: [86 手机号, 86手机号, 手机号获取, 短信验证码, 关键词验证码, 获取验证码, 接码]
secondary: [千问验证码, qwen sms, lubansms, getKeywordNumber, getKeywordSms, keywordSmsHistory]
context_boost: [注册, 登录, 验证码, 短信, 手机号, 关键词]
priority: high
---
# CN86 SMS Keyword Verification
86 手机号 + 关键词获取验证码的标准流程技能。
本技能使用 `https://lubansms.com/v2/api` 的“通用短信接收”接口做 demo。你给的内部参考文档 `docs/sms-register-qwen.md` 里,附录实际也是按这组 LubanSMS 接口在写;文档里提到的千问Qwen示例关键词是 `千问`
如需稳定执行,优先使用打包好的 `scripts/lubansms_keyword_cli.py`,不要手写临时 curl 再去人肉判断状态。
## 什么时候用
- 需要先拿一个 86 号码,再根据短信关键词轮询验证码
- 目标站点会把验证码发到中国手机号,但平台只给“按关键词取短信”的能力
- 要做可重复的注册/登录自动化 demo,例如千问短信登录
- 需要历史记录、余额检查、释放号码这些配套动作
## 需要的输入
- `keyword`:短信中能稳定命中的关键词,例如 `千问`
- `LUBAN_SMS_APIKEY`:运行时环境变量,不要写死进仓库
- 可选 `phone`:想复用已有号码时传入;留空则随机取号
- 可选 `timeout` / `interval`:轮询等待时长和间隔
## API Key 处理
优先使用环境变量:
```bash
export LUBAN_SMS_APIKEY='<your_api_key>'
export LUBAN_SMS_API_BASE='https://lubansms.com/v2/api'
```
只在一次性调试时才用 `--api-key` 直传。
## 标准流程
1. 先查余额,避免轮询到一半才发现 key 无效或余额不足。
2.`getKeywordNumber` 申请号码。
3. 规范化手机号:
- API 内部继续使用原始数字串,例如 `16741251148`
- 对外展示可拼成 `+8616741251148`
- 如果目标站点像千问一样把国家码拆开填,就传 `phoneCode=86` + 原始手机号
4. 在目标站点触发发送短信。
5.`getKeywordSms` 按关键词轮询短信。
6. 从返回短信正文中提取验证码。
7. 成功或失败后都调用 `delKeywordNumber` 释放号码。
8. 排查问题时再查 `keywordSmsHistory`
## 平台状态判断
- `code=0`:成功
- `code=400``msg=尚未收到短信,请稍后重试`:可继续轮询
- `code=400``msg=不正确的apikey`:立即停止,检查 key
- 其他 `code!=0`:视为 API 失败,不要盲目重试到超时
## 千问QwenDemo
千问短信内容在参考文档里使用的关键词是 `千问`
### 1. 先检查余额
```bash
export LUBAN_SMS_APIKEY='<your_api_key>'
python3 scripts/lubansms_keyword_cli.py balance
```
### 2. 一次性 demo取号 → 等短信 → 提取验证码 → 释放号码
```bash
python3 scripts/lubansms_keyword_cli.py demo \
--keyword 千问 \
--timeout 300 \
--interval 5
```
这个命令会:
- 申请一个随机 86 号码
- 输出原始号码和 `+86` 格式
- 等你在目标站点触发短信发送
- 轮询关键词短信并提取验证码
- 默认在结束时释放号码
### 3. 拆开执行
```bash
python3 scripts/lubansms_keyword_cli.py request-number
python3 scripts/lubansms_keyword_cli.py wait-code --phone 16741251148 --keyword 千问
python3 scripts/lubansms_keyword_cli.py release --phone 16741251148
```
## 常见坑
- 不要把 `LUBAN_SMS_APIKEY` 写进提交文件。
- 不要把 `+86``86` 前缀后的完整国际格式直接塞回 `phone=` 参数;平台接口通常要原始号码数字串。
- 不要忘记释放号码;无论成功、失败、超时都应该释放。
- 不要把“尚未收到短信”当成致命错误;这是正常轮询态。
- 不要只看关键词命中,不提取验证码;很多自动化链路最后需要明确的 OTP 数值。
## 推荐脚本
使用 `scripts/lubansms_keyword_cli.py`
- `balance`:查余额
- `request-number`:申请号码
- `get-sms`:查一次关键词短信
- `wait-code`:轮询直到拿到验证码或超时
- `release`:释放号码
- `history`:查历史记录
- `demo`:完整演示流程
## 参考资料
- `references/lubansms-and-qwen-notes.md`

查看文件

@@ -0,0 +1,4 @@
interface:
display_name: "CN86 SMS Keyword Verification"
short_description: "Get 86 phone numbers and OTP codes via LubanSMS keyword APIs"
default_prompt: "Use LubanSMS keyword APIs to request an 86 phone number, poll SMS by keyword, extract the OTP code, and release the number for flows like Qwen SMS login."

查看文件

@@ -0,0 +1,96 @@
# LubanSMS / Qwen Notes
## 来源
这份参考整理自两部分:
1. 官方 API 文档:`https://lubansms.com/api_docs/`
2. 内部参考:`hao/one` 仓库里的 `docs/sms-register-qwen.md`
## 关键结论
- 这次要做的“86 手机号 + 关键词获取验证码”流程,落地接口是 LubanSMS 的 **通用短信接收** 系列。
- 内部参考文档把千问Qwen作为示例站点,关键词给的是 `千问`
- 运行时应该使用环境变量 `LUBAN_SMS_APIKEY`,而不是把真实 key 提交到仓库。
## 官方接口
### 1. 查询余额
`GET /getBalance?apikey=YOUR_APIKEY`
成功示例:
```json
{"code":0,"msg":"","balance":"96.72"}
```
### 2. 请求号码
`GET /getKeywordNumber?apikey=YOUR_APIKEY&phone=&cardType=全部`
成功示例:
```json
{"code":0,"msg":"","phone":"18888888888"}
```
说明:
- `phone` 留空表示随机号码
- `cardType` 在官方文档里标成“已弃用”,但仍可兼容传 `全部`
### 3. 获取关键词短信
`GET /getKeywordSms?apikey=YOUR_APIKEY&phone=<手机号>&keyword=<关键词>`
等待中:
```json
{"code":400,"msg":"尚未收到短信,请稍后重试"}
```
收到短信:
```json
{"code":0,"msg":"【百度】验证码xxxx,您正在进行登陆验证."}
```
### 4. 释放号码
`GET /delKeywordNumber?apikey=YOUR_APIKEY&phone=<手机号>`
成功示例:
```json
{"code":0,"msg":""}
```
### 5. 查询关键词短信历史
`GET /keywordSmsHistory?apikey=YOUR_APIKEY&page=1`
用途:
- 验证关键词是否正确
- 排查目标站点是否实际发过短信
- 辅助确认短信模板和验证码格式
## 千问专用备注
内部参考文档给出的要点:
- 千问示例关键词:`千问`
- 站点如果把国家码和手机号拆开,使用 `phoneCode=86` + 原始手机号
- 接码平台 API 内部仍然按原始手机号查询,不要拼上 `+86`
## 2026-03-06 验证结论
已验证:
- `getBalance` 可正常返回余额
- `keywordSmsHistory` 可正常返回最近的千问短信历史
- 说明本技能里的示例流程和关键词方向是成立的
未在仓库中提交:
- 实际 API key
- 实际号码或敏感历史记录

查看文件

@@ -0,0 +1,290 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import time
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Any, Dict, Optional
DEFAULT_BASE_URL = os.getenv("LUBAN_SMS_API_BASE", "https://lubansms.com/v2/api")
DEFAULT_HTTP_TIMEOUT = 30
WAITING_MESSAGES = (
"尚未收到短信,请稍后重试",
"wait",
)
class LubanSMSError(RuntimeError):
pass
@dataclass
class PhoneFormats:
raw: str
e164: str
class LubanSMSClient:
def __init__(self, api_key: str, base_url: str = DEFAULT_BASE_URL, http_timeout: int = DEFAULT_HTTP_TIMEOUT) -> None:
if not api_key:
raise LubanSMSError("missing api key: set LUBAN_SMS_APIKEY or pass --api-key")
self.api_key = api_key
self.base_url = base_url.rstrip("/")
self.http_timeout = http_timeout
def _request(self, path: str, **params: Any) -> Dict[str, Any]:
query = {"apikey": self.api_key}
for key, value in params.items():
if value is None:
continue
query[key] = value
url = f"{self.base_url}/{path}?{urllib.parse.urlencode(query)}"
request = urllib.request.Request(
url,
headers={
"Accept": "application/json",
"User-Agent": "codex-cn86-sms-keyword-verification/1.0",
},
)
try:
with urllib.request.urlopen(request, timeout=self.http_timeout) as response:
raw = response.read().decode("utf-8", errors="replace")
except Exception as exc:
raise LubanSMSError(f"request failed for {path}: {exc}") from exc
try:
payload = json.loads(raw)
except json.JSONDecodeError as exc:
raise LubanSMSError(f"invalid json from {path}: {raw}") from exc
return payload
def get_balance(self) -> Dict[str, Any]:
return self._request("getBalance")
def request_keyword_number(self, phone: str = "", card_type: str = "全部") -> Dict[str, Any]:
return self._request("getKeywordNumber", phone=normalize_phone_input(phone), cardType=card_type)
def get_keyword_sms(self, phone: str, keyword: str) -> Dict[str, Any]:
return self._request("getKeywordSms", phone=normalize_phone_input(phone), keyword=keyword)
def release_keyword_number(self, phone: str) -> Dict[str, Any]:
return self._request("delKeywordNumber", phone=normalize_phone_input(phone))
def keyword_sms_history(self, page: int = 1) -> Dict[str, Any]:
return self._request("keywordSmsHistory", page=page)
def normalize_phone_input(phone: str) -> str:
digits = re.sub(r"\D", "", phone or "")
if digits.startswith("86") and len(digits) > 11:
digits = digits[2:]
return digits
def format_cn86_phone(phone: str) -> PhoneFormats:
raw = normalize_phone_input(phone)
if not raw:
raise LubanSMSError("phone is required")
return PhoneFormats(raw=raw, e164=f"+86{raw}")
def extract_verification_code(message: str) -> Optional[str]:
patterns = [
r"验证码(?:为|是|[:])?\s*([A-Za-z0-9]{4,8})",
r"code(?: is|[:])?\s*([A-Za-z0-9]{4,8})",
r"\b(\d{6})\b",
r"\b(\d{4,8})\b",
]
for pattern in patterns:
match = re.search(pattern, message, flags=re.IGNORECASE)
if match:
return match.group(1)
return None
def is_waiting_payload(payload: Dict[str, Any]) -> bool:
if payload.get("code") == 0:
return False
message = str(payload.get("msg", ""))
return any(token in message for token in WAITING_MESSAGES)
def require_success(payload: Dict[str, Any], *, allow_wait: bool = False) -> Dict[str, Any]:
if payload.get("code") == 0:
return payload
if allow_wait and is_waiting_payload(payload):
return payload
raise LubanSMSError(f"api error: code={payload.get('code')} msg={payload.get('msg')}")
def wait_for_code(client: LubanSMSClient, phone: str, keyword: str, timeout: int, interval: int) -> Dict[str, Any]:
formats = format_cn86_phone(phone)
started = time.monotonic()
attempts = 0
last_payload: Dict[str, Any] | None = None
while time.monotonic() - started < timeout:
attempts += 1
payload = client.get_keyword_sms(formats.raw, keyword)
last_payload = payload
if payload.get("code") == 0:
message = str(payload.get("msg", ""))
return {
"status": "success",
"keyword": keyword,
"phone": formats.raw,
"phone_e164": formats.e164,
"attempts": attempts,
"elapsed_seconds": round(time.monotonic() - started, 2),
"message": message,
"verification_code": extract_verification_code(message),
}
if not is_waiting_payload(payload):
raise LubanSMSError(f"unexpected sms response: code={payload.get('code')} msg={payload.get('msg')}")
time.sleep(interval)
return {
"status": "timeout",
"keyword": keyword,
"phone": formats.raw,
"phone_e164": formats.e164,
"attempts": attempts,
"elapsed_seconds": round(time.monotonic() - started, 2),
"last_payload": last_payload,
"verification_code": None,
}
def print_json(data: Dict[str, Any]) -> None:
json.dump(data, sys.stdout, ensure_ascii=False, indent=2)
sys.stdout.write("\n")
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="LubanSMS CN86 keyword SMS CLI")
parser.add_argument("--api-key", default=os.getenv("LUBAN_SMS_APIKEY", ""), help="LubanSMS API key; defaults to LUBAN_SMS_APIKEY")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="API base URL")
parser.add_argument("--http-timeout", type=int, default=DEFAULT_HTTP_TIMEOUT, help="HTTP request timeout in seconds")
subparsers = parser.add_subparsers(dest="command", required=True)
subparsers.add_parser("balance", help="Query account balance")
request_number = subparsers.add_parser("request-number", help="Request a keyword phone number")
request_number.add_argument("--phone", default="", help="Reuse a specific phone if supported")
request_number.add_argument("--card-type", default="全部", help="Card type; official docs mark this deprecated")
get_sms = subparsers.add_parser("get-sms", help="Fetch SMS once by keyword")
get_sms.add_argument("--phone", required=True, help="Raw CN phone number or +86 number")
get_sms.add_argument("--keyword", required=True, help="Keyword to match, e.g. 千问")
wait_code = subparsers.add_parser("wait-code", help="Poll until OTP arrives or times out")
wait_code.add_argument("--phone", required=True, help="Raw CN phone number or +86 number")
wait_code.add_argument("--keyword", required=True, help="Keyword to match, e.g. 千问")
wait_code.add_argument("--timeout", type=int, default=300, help="Polling timeout in seconds")
wait_code.add_argument("--interval", type=int, default=5, help="Polling interval in seconds")
wait_code.add_argument("--release-on-exit", action="store_true", help="Release the number after polling finishes")
release = subparsers.add_parser("release", help="Release a phone number")
release.add_argument("--phone", required=True, help="Raw CN phone number or +86 number")
history = subparsers.add_parser("history", help="Fetch keyword SMS history")
history.add_argument("--page", type=int, default=1, help="Page number")
demo = subparsers.add_parser("demo", help="Request number, wait for keyword SMS, then release")
demo.add_argument("--keyword", required=True, help="Keyword to match, e.g. 千问")
demo.add_argument("--phone", default="", help="Reuse a specific phone if supported")
demo.add_argument("--card-type", default="全部", help="Card type; official docs mark this deprecated")
demo.add_argument("--timeout", type=int, default=300, help="Polling timeout in seconds")
demo.add_argument("--interval", type=int, default=5, help="Polling interval in seconds")
demo.add_argument("--keep-number", action="store_true", help="Do not release the number at the end")
return parser
def main() -> int:
parser = build_parser()
args = parser.parse_args()
client = LubanSMSClient(api_key=args.api_key, base_url=args.base_url, http_timeout=args.http_timeout)
try:
if args.command == "balance":
print_json(require_success(client.get_balance()))
return 0
if args.command == "request-number":
payload = require_success(client.request_keyword_number(phone=args.phone, card_type=args.card_type))
formats = format_cn86_phone(payload.get("phone", ""))
payload["phone"] = formats.raw
payload["phone_e164"] = formats.e164
print_json(payload)
return 0
if args.command == "get-sms":
payload = require_success(client.get_keyword_sms(phone=args.phone, keyword=args.keyword), allow_wait=True)
if payload.get("code") == 0:
payload["verification_code"] = extract_verification_code(str(payload.get("msg", "")))
print_json(payload)
return 0
if args.command == "wait-code":
result = wait_for_code(client, phone=args.phone, keyword=args.keyword, timeout=args.timeout, interval=args.interval)
if args.release_on_exit:
try:
result["release"] = require_success(client.release_keyword_number(args.phone))
except Exception as exc:
result["release_error"] = str(exc)
print_json(result)
return 0 if result.get("status") == "success" else 2
if args.command == "release":
print_json(require_success(client.release_keyword_number(args.phone)))
return 0
if args.command == "history":
print_json(require_success(client.keyword_sms_history(page=args.page)))
return 0
if args.command == "demo":
allocation = require_success(client.request_keyword_number(phone=args.phone, card_type=args.card_type))
formats = format_cn86_phone(allocation.get("phone", ""))
result: Dict[str, Any] = {
"status": "allocated",
"keyword": args.keyword,
"phone": formats.raw,
"phone_e164": formats.e164,
"allocation": allocation,
"notes": [
"Trigger the target site to send SMS after allocation.",
"For sites like Qwen, pass phoneCode=86 and the raw phone digits.",
],
}
try:
poll_result = wait_for_code(client, phone=formats.raw, keyword=args.keyword, timeout=args.timeout, interval=args.interval)
result.update(poll_result)
except LubanSMSError as exc:
result["status"] = "error"
result["error"] = str(exc)
finally:
if not args.keep_number:
try:
result["release"] = require_success(client.release_keyword_number(formats.raw))
except Exception as exc:
result["release_error"] = str(exc)
print_json(result)
return 0 if result.get("status") == "success" else 2
parser.error(f"unsupported command: {args.command}")
return 1
except LubanSMSError as exc:
print_json({"error": str(exc), "command": args.command})
return 1
if __name__ == "__main__":
sys.exit(main())

查看文件

@@ -0,0 +1,49 @@
---
name: codex-task-server-sync
description: Migrate a macOS Codex workspace or running task to a Linux server, create or reuse a Gitea repo, sync it into /root/continue/project-name, and keep local and remote changes synchronized for uninterrupted continuation.
---
# Auto Sync Codex Task To Server
Use this skill when a local macOS Codex task needs to keep running from a Linux server without manually re-copying files or rebuilding context each time.
## Do This
1. Run the bootstrap script against the target workspace.
2. Let the script create or reuse the Gitea repo, push the current workspace snapshot, provision the server checkout, and install background sync on both hosts.
3. Continue work from the server by reading `.codex-sync/handoff.md` and resuming inside the migrated checkout.
## Command
```bash
./scripts/run.sh \
--project-dir /absolute/path/to/workspace \
--project-name workspace-slug \
--gitea-token "$GITEA_TOKEN"
```
Use `--gitea-token-file` instead of `--gitea-token` when the token already lives on disk.
## What The Bootstrap Writes
- `.codex-sync/manifest.json`
- `.codex-sync/handoff.md`
- `.codex-sync/README.md`
- `.codex-sync/bin/sync-once.sh`
- `.codex-sync/bin/continue-task.sh`
The runtime directory `.codex-sync/runtime/` is created on demand and stays ignored by git.
## Recovery
- Inspect `.codex-sync/runtime/status.json` first when sync stops.
- Inspect `.codex-sync/runtime/last-error.log` next when the status says `blocked` or `error`.
- Clear a resolved conflict by removing `.codex-sync/runtime/blocked` and re-running `.codex-sync/bin/sync-once.sh`.
- Continue on the server with:
```bash
cd /root/continue/<project>
./.codex-sync/bin/continue-task.sh
```
Read [references/recovery.md](./references/recovery.md) for the exact recovery flow and service commands.

查看文件

@@ -0,0 +1,4 @@
interface:
display_name: "Codex Task Sync"
short_description: "Migrate a Codex workspace to a server and keep it synced"
default_prompt: "Use $codex-task-server-sync to migrate this macOS Codex workspace to my Linux server, create or reuse a Gitea repo, and keep both sides synchronized."

查看文件

@@ -0,0 +1,60 @@
# Recovery Reference
Use this reference after bootstrap when the synced project needs inspection, conflict recovery, or server-side continuation.
## Inspect Status
Run these from the migrated project root:
```bash
cat .codex-sync/manifest.json
cat .codex-sync/runtime/status.json
cat .codex-sync/runtime/last-error.log
git status -sb
```
If `status.json` says `blocked`, the last sync hit a rebase or merge conflict and automatic sync has paused itself.
## Resolve A Conflict
1. Open the project and inspect `git status`.
2. Resolve the conflict or decide which side to keep.
3. Finish the git operation or abort it explicitly.
4. Remove `.codex-sync/runtime/blocked`.
5. Re-run `.codex-sync/bin/sync-once.sh`.
## Re-run Sync Manually
```bash
./.codex-sync/bin/sync-once.sh --project-dir "$PWD"
```
Pass `--host-label local-manual` or `--host-label remote-manual` when you want the autosave commit to identify where it was triggered.
## Continue The Task On The Server
```bash
cd /root/continue/<project>
./.codex-sync/bin/continue-task.sh
```
That command starts Codex with a prompt that tells it to read `.codex-sync/handoff.md`, inspect sync status, and continue work from the server checkout.
## Service Control
The exact service names are written into `.codex-sync/manifest.json` and `.codex-sync/README.md`.
Local macOS:
```bash
launchctl print "gui/$(id -u)/<launchd-label>"
launchctl kickstart -k "gui/$(id -u)/<launchd-label>"
```
Remote Linux:
```bash
systemctl status <systemd-service> <systemd-timer>
systemctl restart <systemd-service>
systemctl restart <systemd-timer>
```

查看文件

@@ -0,0 +1,18 @@
#!/usr/bin/env bash
set -euo pipefail
script_dir="$(cd "$(dirname "$0")" && pwd)"
project_dir="$(cd "$script_dir/../.." && pwd)"
if ! command -v codex >/dev/null 2>&1; then
echo "codex is not installed or not on PATH" >&2
exit 1
fi
default_prompt="Read .codex-sync/handoff.md, inspect .codex-sync/runtime/status.json and git status -sb, then continue the task from this server checkout."
if [[ $# -gt 0 ]]; then
exec codex -C "$project_dir" "$*"
else
exec codex -C "$project_dir" "$default_prompt"
fi

查看文件

@@ -0,0 +1,273 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from textwrap import shorten
IGNORE_PREFIXES = (
"# AGENTS.md instructions",
"<environment_context>",
"<turn_aborted>",
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Generate a Codex handoff summary for a project.")
parser.add_argument("--project-dir", required=True, help="Target project directory")
parser.add_argument("--output", required=True, help="Output markdown path")
parser.add_argument(
"--sessions-root",
default=str(Path.home() / ".codex" / "sessions"),
help="Codex sessions root directory",
)
parser.add_argument("--limit", type=int, default=4, help="Number of recent turns to include")
return parser.parse_args()
def normalize_path(path: str | Path) -> Path:
return Path(path).expanduser().resolve()
def read_first_json_line(path: Path) -> dict | None:
try:
with path.open("r", encoding="utf-8") as handle:
line = handle.readline()
return json.loads(line) if line else None
except (OSError, json.JSONDecodeError):
return None
def find_latest_session(project_dir: Path, sessions_root: Path) -> Path | None:
if not sessions_root.exists():
return None
candidates: list[tuple[float, Path]] = []
for session_file in sessions_root.rglob("*.jsonl"):
first = read_first_json_line(session_file)
if not first:
continue
cwd = first.get("payload", {}).get("cwd")
if not cwd:
continue
try:
if normalize_path(cwd) == project_dir:
candidates.append((session_file.stat().st_mtime, session_file))
except OSError:
continue
if not candidates:
return None
candidates.sort(key=lambda item: item[0], reverse=True)
return candidates[0][1]
def extract_message_text(payload: dict) -> str:
parts = []
for item in payload.get("content", []):
if item.get("type") in {"input_text", "output_text"}:
text = (item.get("text") or "").strip()
if text:
parts.append(text)
return "\n\n".join(parts).strip()
def is_substantive(text: str) -> bool:
if not text:
return False
stripped = text.strip()
if any(stripped.startswith(prefix) for prefix in IGNORE_PREFIXES):
return False
if stripped.startswith("<environment_context>") or stripped.startswith("<turn_aborted>"):
return False
return True
def compact(text: str, width: int = 220) -> str:
return shorten(" ".join(text.split()), width=width, placeholder="...")
def parse_session(session_path: Path, limit: int) -> tuple[list[str], list[str]]:
user_turns: list[str] = []
assistant_turns: list[str] = []
with session_path.open("r", encoding="utf-8") as handle:
for raw_line in handle:
try:
obj = json.loads(raw_line)
except json.JSONDecodeError:
continue
if obj.get("type") != "response_item":
continue
payload = obj.get("payload", {})
if payload.get("type") != "message":
continue
role = payload.get("role")
text = extract_message_text(payload)
if not is_substantive(text):
continue
summary = compact(text, 300 if role == "user" else 220)
if role == "user":
user_turns.append(summary)
elif role == "assistant":
assistant_turns.append(summary)
return user_turns[-limit:], assistant_turns[-limit:]
def run_git(project_dir: Path, *args: str) -> str:
result = subprocess.run(
["git", *args],
cwd=str(project_dir),
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
return ""
return result.stdout.strip()
def repo_state(project_dir: Path) -> dict[str, str]:
return {
"branch": run_git(project_dir, "rev-parse", "--abbrev-ref", "HEAD"),
"status": run_git(project_dir, "status", "--short", "--branch"),
"recent_commits": run_git(project_dir, "log", "-3", "--oneline", "--decorate", "--no-color"),
}
def render_with_session(project_dir: Path, session_path: Path, limit: int) -> str:
user_turns, assistant_turns = parse_session(session_path, limit)
state = repo_state(project_dir)
goal = user_turns[-1] if user_turns else "Continue the latest Codex task for this workspace."
lines = [
"# Codex Handoff",
"",
f"- Generated at: {datetime.now(timezone.utc).isoformat()}",
f"- Project root: `{project_dir}`",
f"- Session file: `{session_path}`",
"",
"## Goal",
"",
goal,
"",
"## Current State",
"",
f"- Branch: `{state['branch'] or 'unknown'}`",
"- Git status:",
"```text",
state["status"] or "(clean or unavailable)",
"```",
"- Recent commits:",
"```text",
state["recent_commits"] or "(no commits yet)",
"```",
"",
"## Recent User Turns",
"",
]
if user_turns:
lines.extend(f"- {turn}" for turn in user_turns)
else:
lines.append("- No substantive user turns were found in the matching session.")
lines.extend(
[
"",
"## Recent Assistant Turns",
"",
]
)
if assistant_turns:
lines.extend(f"- {turn}" for turn in assistant_turns)
else:
lines.append("- No substantive assistant turns were found in the matching session.")
next_action = (
"Open the project, inspect `.codex-sync/runtime/status.json` and `git status -sb`, then continue from the latest user request above."
)
if state["status"] and state["status"] != "## HEAD (no branch)":
next_action = (
"Read `.codex-sync/runtime/status.json`, inspect the git status block above, and continue from the latest user request while preserving any uncommitted work."
)
lines.extend(
[
"",
"## Next Action",
"",
next_action,
"",
]
)
return "\n".join(lines)
def render_fallback(project_dir: Path) -> str:
state = repo_state(project_dir)
lines = [
"# Codex Handoff",
"",
f"- Generated at: {datetime.now(timezone.utc).isoformat()}",
f"- Project root: `{project_dir}`",
"- Session file: `(no matching Codex session found)`",
"",
"## Goal",
"",
"Continue this repository from its current git state. No matching local Codex session was found for this workspace, so treat the repository contents and recent commits as the source of truth.",
"",
"## Current State",
"",
f"- Branch: `{state['branch'] or 'unknown'}`",
"- Git status:",
"```text",
state["status"] or "(clean or unavailable)",
"```",
"- Recent commits:",
"```text",
state["recent_commits"] or "(no commits yet)",
"```",
"",
"## Next Action",
"",
"Inspect the repository, read `.codex-sync/runtime/status.json`, and decide the next concrete implementation step from the current files and recent commits.",
"",
]
return "\n".join(lines)
def main() -> int:
args = parse_args()
project_dir = normalize_path(args.project_dir)
output_path = normalize_path(args.output)
sessions_root = normalize_path(args.sessions_root)
output_path.parent.mkdir(parents=True, exist_ok=True)
session_path = find_latest_session(project_dir, sessions_root)
if session_path:
content = render_with_session(project_dir, session_path, args.limit)
else:
content = render_fallback(project_dir)
output_path.write_text(content, encoding="utf-8")
return 0
if __name__ == "__main__":
sys.exit(main())

查看文件

@@ -0,0 +1,635 @@
#!/usr/bin/env bash
set -euo pipefail
project_dir=""
project_name=""
server_host="root@8.211.173.24"
server_root="/root/continue"
gitea_base_url="https://git.hk.hao.work"
repo_owner="zt"
visibility="private"
branch="main"
sync_interval="15"
continuity_mode="handoff-summary"
overwrite_existing="backup"
gitea_token="${GITEA_TOKEN:-}"
gitea_token_file=""
install_launchd=1
install_systemd=1
enable_services=1
skill_dir="$(cd "$(dirname "$0")/.." && pwd)"
usage() {
cat <<'EOF'
Usage: run.sh --project-dir <path> [options]
Bootstrap continuous sync for a local Codex workspace:
- create or reuse a Gitea repo
- push the current workspace snapshot
- provision /root/continue/<project> on the server
- generate .codex-sync metadata and handoff files
- install local launchd and remote systemd timers
Options:
--project-dir <path> Local workspace to migrate (required)
--project-name <name> Repo and server project name (default: basename of project dir)
--server-host <user@host> Remote SSH target (default: root@8.211.173.24)
--server-root <path> Remote parent directory (default: /root/continue)
--gitea-base-url <url> Gitea base URL (default: https://git.hk.hao.work)
--repo-owner <name> Gitea owner/org (default: zt)
--visibility <value> private|public|internal (default: private)
--branch <name> Git branch to synchronize (default: main)
--sync-interval <seconds> Background sync interval (default: 15)
--gitea-token <token> Gitea API token
--gitea-token-file <path> Read Gitea API token from file
--overwrite-existing <mode> backup|replace|abort (default: backup)
--no-launchd Skip local launchd installation
--no-systemd Skip remote systemd installation
--no-enable-services Install services but do not enable/start them
-h, --help Show this message
EOF
}
die() {
echo "$*" >&2
exit 1
}
require_cmd() {
command -v "$1" >/dev/null 2>&1 || die "Missing required command: $1"
}
sanitize_slug() {
printf '%s' "$1" | tr '[:upper:]' '[:lower:]' | sed -E 's/[^a-z0-9]+/-/g; s/^-+//; s/-+$//; s/-+/-/g'
}
abs_path() {
python3 - "$1" <<'PY'
from pathlib import Path
import sys
print(Path(sys.argv[1]).expanduser().resolve())
PY
}
append_unique_line() {
local file_path="$1"
local line="$2"
mkdir -p "$(dirname "$file_path")"
touch "$file_path"
if ! grep -Fxq "$line" "$file_path"; then
printf '%s\n' "$line" >>"$file_path"
fi
}
while [[ $# -gt 0 ]]; do
case "$1" in
--project-dir)
project_dir="$2"
shift 2
;;
--project-name)
project_name="$2"
shift 2
;;
--server-host)
server_host="$2"
shift 2
;;
--server-root)
server_root="$2"
shift 2
;;
--gitea-base-url)
gitea_base_url="$2"
shift 2
;;
--repo-owner)
repo_owner="$2"
shift 2
;;
--visibility)
visibility="$2"
shift 2
;;
--branch)
branch="$2"
shift 2
;;
--sync-interval)
sync_interval="$2"
shift 2
;;
--gitea-token)
gitea_token="$2"
shift 2
;;
--gitea-token-file)
gitea_token_file="$2"
shift 2
;;
--overwrite-existing)
overwrite_existing="$2"
shift 2
;;
--no-launchd)
install_launchd=0
shift
;;
--no-systemd)
install_systemd=0
shift
;;
--no-enable-services)
enable_services=0
shift
;;
-h|--help)
usage
exit 0
;;
*)
die "Unknown argument: $1"
;;
esac
done
[[ -n "$project_dir" ]] || die "--project-dir is required"
case "$visibility" in
private|public|internal) ;;
*) die "--visibility must be private, public, or internal" ;;
esac
if [[ "$visibility" == "internal" ]]; then
echo "Warning: this Gitea instance does not expose an internal repository visibility field in the create/edit API; the repo will be created as private." >&2
fi
case "$overwrite_existing" in
backup|replace|abort) ;;
*) die "--overwrite-existing must be backup, replace, or abort" ;;
esac
[[ "$sync_interval" =~ ^[0-9]+$ ]] || die "--sync-interval must be an integer number of seconds"
if [[ -n "$gitea_token_file" ]]; then
gitea_token_file="$(abs_path "$gitea_token_file")"
[[ -f "$gitea_token_file" ]] || die "Token file not found: $gitea_token_file"
gitea_token="$(<"$gitea_token_file")"
fi
gitea_token="${gitea_token#"${gitea_token%%[![:space:]]*}"}"
gitea_token="${gitea_token%"${gitea_token##*[![:space:]]}"}"
[[ -n "$gitea_token" ]] || die "Provide --gitea-token, --gitea-token-file, or GITEA_TOKEN"
require_cmd bash
require_cmd curl
require_cmd git
require_cmd jq
require_cmd python3
require_cmd rsync
require_cmd ssh
if [[ "$install_launchd" -eq 1 ]]; then
require_cmd launchctl
fi
project_dir="$(abs_path "$project_dir")"
[[ -d "$project_dir" ]] || die "Project directory not found: $project_dir"
if [[ -z "$project_name" ]]; then
project_name="$(basename "$project_dir")"
fi
project_slug="$(sanitize_slug "$project_name")"
[[ -n "$project_slug" ]] || die "Failed to derive a valid slug from project name: $project_name"
remote_dir="${server_root%/}/$project_name"
local_launchd_label="com.codex.sync.${project_slug}"
local_launchd_plist="$HOME/Library/LaunchAgents/${local_launchd_label}.plist"
remote_service_name="codex-sync-${project_slug}.service"
remote_timer_name="codex-sync-${project_slug}.timer"
ssh_base=(
ssh
-o BatchMode=yes
-o StrictHostKeyChecking=accept-new
"$server_host"
)
for remote_cmd in git python3 bash; do
"${ssh_base[@]}" "command -v $remote_cmd >/dev/null 2>&1" || die "Remote host is missing required command: $remote_cmd"
done
if [[ "$install_systemd" -eq 1 ]]; then
"${ssh_base[@]}" "command -v systemctl >/dev/null 2>&1" || die "Remote host is missing required command: systemctl"
fi
repo_info_json="$(python3 - "$gitea_base_url" "$gitea_token" "$repo_owner" "$project_name" "$visibility" "$branch" <<'PY'
import json
import sys
import urllib.error
import urllib.request
base_url, token, owner, repo_name, visibility, branch = sys.argv[1:]
headers = {
"Authorization": f"token {token}",
"Accept": "application/json",
"Content-Type": "application/json",
}
def request_json(url, method="GET", payload=None):
req = urllib.request.Request(url, method=method, headers=headers, data=payload)
with urllib.request.urlopen(req, timeout=60) as response:
return json.loads(response.read().decode("utf-8"))
user = request_json(f"{base_url}/api/v1/user")
login = user["login"]
repo_endpoint = f"{base_url}/api/v1/repos/{owner}/{repo_name}"
try:
repo = request_json(repo_endpoint)
except urllib.error.HTTPError as exc:
if exc.code != 404:
raise
payload = {
"name": repo_name,
"default_branch": branch,
"description": f"Auto-synced Codex workspace for {repo_name}",
"auto_init": False,
"private": visibility != "public",
}
create_url = (
f"{base_url}/api/v1/user/repos"
if owner == login
else f"{base_url}/api/v1/orgs/{owner}/repos"
)
repo = request_json(create_url, method="POST", payload=json.dumps(payload).encode("utf-8"))
print(json.dumps({"login": login, "repo": repo}))
PY
)"
repo_login="$(printf '%s' "$repo_info_json" | jq -r '.login')"
repo_ssh_url="$(printf '%s' "$repo_info_json" | jq -r '.repo.ssh_url')"
repo_http_url="$(printf '%s' "$repo_info_json" | jq -r '.repo.html_url')"
repo_clone_url="$(printf '%s' "$repo_info_json" | jq -r '.repo.clone_url')"
transport_url="$repo_ssh_url"
transport_mode="ssh"
check_remote_access() {
local url="$1"
local mode="$2"
if [[ "$mode" == "local" ]]; then
git ls-remote "$url" >/dev/null 2>&1
else
"${ssh_base[@]}" "git ls-remote '$url' >/dev/null 2>&1"
fi
}
if ! check_remote_access "$repo_ssh_url" "local" || ! check_remote_access "$repo_ssh_url" "remote"; then
transport_url="$(python3 - "$repo_clone_url" "$repo_login" "$gitea_token" <<'PY'
from urllib.parse import urlsplit, urlunsplit, quote
import sys
clone_url, username, token = sys.argv[1:]
parts = urlsplit(clone_url)
netloc = f"{quote(username, safe='')}:{quote(token, safe='')}@{parts.hostname}"
if parts.port:
netloc += f":{parts.port}"
print(urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment)))
PY
)"
transport_mode="https-token"
if ! check_remote_access "$transport_url" "local"; then
die "Neither SSH nor HTTPS token transport can access the repo from the local host"
fi
fi
if [[ ! -d "$project_dir/.git" ]]; then
git -C "$project_dir" init -b "$branch" >/dev/null
fi
git -C "$project_dir" checkout -B "$branch" >/dev/null
if [[ -z "$(git -C "$project_dir" config user.name || true)" ]]; then
git -C "$project_dir" config user.name "Codex Sync"
fi
if [[ -z "$(git -C "$project_dir" config user.email || true)" ]]; then
git -C "$project_dir" config user.email "codex-sync@local"
fi
mkdir -p "$project_dir/.codex-sync/bin"
mkdir -p "$project_dir/.codex-sync/runtime"
cp "$skill_dir/scripts/sync_once.sh" "$project_dir/.codex-sync/bin/sync-once.sh"
cp "$skill_dir/scripts/continue_task.sh" "$project_dir/.codex-sync/bin/continue-task.sh"
chmod +x "$project_dir/.codex-sync/bin/sync-once.sh" "$project_dir/.codex-sync/bin/continue-task.sh"
append_unique_line "$project_dir/.gitignore" ".codex-sync/runtime/"
python3 - "$project_dir/.codex-sync/manifest.json" "$project_name" "$project_slug" "$branch" "$repo_owner" "$repo_http_url" "$repo_ssh_url" "$transport_mode" "$server_host" "$remote_dir" "$sync_interval" "$continuity_mode" "$local_launchd_label" "$local_launchd_plist" "$remote_service_name" "$remote_timer_name" <<'PY'
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
(
manifest_path,
project_name,
project_slug,
branch,
repo_owner,
repo_http_url,
repo_ssh_url,
transport_mode,
server_host,
remote_dir,
sync_interval,
continuity_mode,
local_launchd_label,
local_launchd_plist,
remote_service_name,
remote_timer_name,
) = sys.argv[1:]
manifest = {
"version": 1,
"generatedAt": datetime.now(timezone.utc).isoformat(),
"projectName": project_name,
"projectSlug": project_slug,
"branch": branch,
"repoOwner": repo_owner,
"repoHttpUrl": repo_http_url,
"repoSshUrl": repo_ssh_url,
"remoteTransport": transport_mode,
"serverHost": server_host,
"serverPath": remote_dir,
"syncIntervalSeconds": int(sync_interval),
"continuityMode": continuity_mode,
"conflictPolicy": "stop-and-surface-status",
"gitIdentity": {
"name": "Codex Sync",
"email": "codex-sync@local",
},
"services": {
"launchdLabel": local_launchd_label,
"launchdPlist": local_launchd_plist,
"systemdService": remote_service_name,
"systemdTimer": remote_timer_name,
},
}
path = Path(manifest_path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(manifest, indent=2) + "\n", encoding="utf-8")
PY
python3 "$skill_dir/scripts/generate_handoff.py" \
--project-dir "$project_dir" \
--output "$project_dir/.codex-sync/handoff.md"
cat >"$project_dir/.codex-sync/README.md" <<EOF
# Codex Sync
This workspace is synchronized through Gitea and mirrored to \`${remote_dir}\` on \`${server_host}\`.
## Inspect Status
\`\`\`bash
cat .codex-sync/manifest.json
cat .codex-sync/runtime/status.json
cat .codex-sync/runtime/last-error.log
git status -sb
\`\`\`
## Local launchd
- Label: \`${local_launchd_label}\`
- Plist: \`${local_launchd_plist}\`
\`\`\`bash
launchctl print "gui/\$(id -u)/${local_launchd_label}"
launchctl kickstart -k "gui/\$(id -u)/${local_launchd_label}"
\`\`\`
## Remote systemd
- Service: \`${remote_service_name}\`
- Timer: \`${remote_timer_name}\`
\`\`\`bash
ssh ${server_host} 'systemctl status ${remote_service_name} ${remote_timer_name}'
ssh ${server_host} 'systemctl restart ${remote_service_name}'
ssh ${server_host} 'systemctl restart ${remote_timer_name}'
\`\`\`
## Continue On The Server
\`\`\`bash
ssh ${server_host} 'cd ${remote_dir} && ./.codex-sync/bin/continue-task.sh'
\`\`\`
If sync stops after a conflict, resolve the git state, remove \`.codex-sync/runtime/blocked\`, and run \`./.codex-sync/bin/sync-once.sh\` again.
EOF
git -C "$project_dir" add .gitignore .codex-sync
if ! git -C "$project_dir" diff --cached --quiet || [[ -z "$(git -C "$project_dir" rev-parse --verify HEAD 2>/dev/null || true)" ]]; then
git -C "$project_dir" add -A
git -C "$project_dir" commit -m "Bootstrap Codex task sync" >/dev/null
fi
if git -C "$project_dir" remote get-url origin >/dev/null 2>&1; then
git -C "$project_dir" remote set-url origin "$transport_url"
else
git -C "$project_dir" remote add origin "$transport_url"
fi
git -C "$project_dir" push -u origin "$branch" >/dev/null 2>&1 || "$project_dir/.codex-sync/bin/sync-once.sh" --project-dir "$project_dir" --host-label "bootstrap-local"
"${ssh_base[@]}" bash -s -- "$remote_dir" "$branch" "$transport_url" "$repo_ssh_url" "$repo_clone_url" "$overwrite_existing" <<'EOF_REMOTE_REPO'
set -euo pipefail
remote_dir="$1"
branch="$2"
transport_url="$3"
repo_ssh_url="$4"
repo_clone_url="$5"
overwrite_existing="$6"
normalize_remote() {
python3 - "$1" <<'PY'
import re
import sys
from urllib.parse import urlsplit, urlunsplit
raw = sys.argv[1]
if raw.startswith("ssh://"):
parts = urlsplit(raw)
print(f"{parts.hostname}:{parts.path.lstrip('/')}")
elif re.match(r"^[^@]+@[^:]+:.+$", raw):
print(raw.split("@", 1)[1])
else:
parts = urlsplit(raw)
host = parts.hostname or ""
print(f"{host}:{parts.path.lstrip('/')}")
PY
}
expected_ssh="$(normalize_remote "$repo_ssh_url")"
expected_http="$(normalize_remote "$repo_clone_url")"
if [[ -d "$remote_dir" ]]; then
current_remote=""
if [[ -d "$remote_dir/.git" ]]; then
current_remote="$(git -C "$remote_dir" config --get remote.origin.url || true)"
fi
if [[ -n "$current_remote" ]]; then
current_norm="$(normalize_remote "$current_remote")"
else
current_norm=""
fi
if [[ "$current_norm" != "$expected_ssh" && "$current_norm" != "$expected_http" ]]; then
case "$overwrite_existing" in
backup)
mv "$remote_dir" "${remote_dir}.bak.$(date +%Y%m%d%H%M%S)"
;;
replace)
rm -rf "$remote_dir"
;;
abort)
echo "Remote path exists and is unrelated: $remote_dir" >&2
exit 1
;;
esac
fi
fi
mkdir -p "$(dirname "$remote_dir")"
if [[ ! -d "$remote_dir/.git" ]]; then
git clone "$transport_url" "$remote_dir" >/dev/null 2>&1
fi
if [[ -n "$(git -C "$remote_dir" status --porcelain || true)" ]]; then
echo "Remote checkout is dirty and cannot be updated safely: $remote_dir" >&2
exit 1
fi
git -C "$remote_dir" fetch origin "$branch" >/dev/null 2>&1 || true
if git -C "$remote_dir" show-ref --verify --quiet "refs/remotes/origin/$branch"; then
git -C "$remote_dir" checkout -B "$branch" "origin/$branch" >/dev/null 2>&1
else
git -C "$remote_dir" checkout -B "$branch" >/dev/null 2>&1
fi
if [[ -z "$(git -C "$remote_dir" config user.name || true)" ]]; then
git -C "$remote_dir" config user.name "Codex Sync"
fi
if [[ -z "$(git -C "$remote_dir" config user.email || true)" ]]; then
git -C "$remote_dir" config user.email "codex-sync@remote"
fi
EOF_REMOTE_REPO
if [[ "$install_launchd" -eq 1 ]]; then
mkdir -p "$(dirname "$local_launchd_plist")"
cat >"$local_launchd_plist" <<EOF
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>${local_launchd_label}</string>
<key>ProgramArguments</key>
<array>
<string>/bin/bash</string>
<string>${project_dir}/.codex-sync/bin/sync-once.sh</string>
<string>--project-dir</string>
<string>${project_dir}</string>
<string>--host-label</string>
<string>local-$(hostname -s)</string>
</array>
<key>WorkingDirectory</key>
<string>${project_dir}</string>
<key>StartInterval</key>
<integer>${sync_interval}</integer>
<key>RunAtLoad</key>
<true/>
<key>StandardOutPath</key>
<string>${project_dir}/.codex-sync/runtime/launchd.out.log</string>
<key>StandardErrorPath</key>
<string>${project_dir}/.codex-sync/runtime/launchd.err.log</string>
</dict>
</plist>
EOF
if [[ "$enable_services" -eq 1 ]]; then
launchctl bootout "gui/$(id -u)" "$local_launchd_plist" >/dev/null 2>&1 || true
if ! launchctl bootstrap "gui/$(id -u)" "$local_launchd_plist" >/dev/null 2>&1; then
launchctl load -w "$local_launchd_plist" >/dev/null 2>&1
fi
launchctl kickstart -k "gui/$(id -u)/${local_launchd_label}" >/dev/null 2>&1 || true
fi
fi
if [[ "$install_systemd" -eq 1 ]]; then
"${ssh_base[@]}" bash -s -- "$remote_service_name" "$remote_timer_name" "$remote_dir" "$sync_interval" "$enable_services" <<'EOF_REMOTE_SYSTEMD'
set -euo pipefail
service_name="$1"
timer_name="$2"
remote_dir="$3"
sync_interval="$4"
enable_services="$5"
service_path="/etc/systemd/system/$service_name"
timer_path="/etc/systemd/system/$timer_name"
cat >"$service_path" <<EOF_SERVICE
[Unit]
Description=Codex task sync for $(basename "$remote_dir")
After=network-online.target
Wants=network-online.target
[Service]
Type=oneshot
WorkingDirectory=$remote_dir
ExecStart=/usr/bin/bash -lc 'cd "$remote_dir" && ./.codex-sync/bin/sync-once.sh --project-dir "$remote_dir" --host-label "remote-$(hostname -s)"'
EOF_SERVICE
cat >"$timer_path" <<EOF_TIMER
[Unit]
Description=Run $service_name every ${sync_interval}s
[Timer]
OnBootSec=15sec
OnUnitActiveSec=${sync_interval}s
Persistent=true
Unit=$service_name
[Install]
WantedBy=timers.target
EOF_TIMER
systemctl daemon-reload
if [[ "$enable_services" -eq 1 ]]; then
systemctl enable --now "$timer_name" >/dev/null 2>&1
systemctl start "$service_name" >/dev/null 2>&1 || true
fi
EOF_REMOTE_SYSTEMD
fi
"$project_dir/.codex-sync/bin/sync-once.sh" --project-dir "$project_dir" --host-label "bootstrap-local"
"${ssh_base[@]}" "cd '$remote_dir' && ./.codex-sync/bin/sync-once.sh --project-dir '$remote_dir' --host-label 'bootstrap-remote'" || true
cat <<EOF
Bootstrap complete.
Project: $project_name
Project root: $project_dir
Repo owner: $repo_owner
Repo URL: $repo_http_url
Transport: $transport_mode
Remote path: $remote_dir
Branch: $branch
Launchd: $local_launchd_label
Systemd: $remote_service_name / $remote_timer_name
EOF

查看文件

@@ -0,0 +1,217 @@
#!/usr/bin/env bash
set -euo pipefail
project_dir=""
host_label=""
usage() {
cat <<'EOF'
Usage: sync-once.sh --project-dir <path> [--host-label <label>]
Synchronize one git-backed Codex workspace against its configured origin:
- autosave dirty work into a host-stamped commit
- fetch origin
- rebase onto the configured branch
- push
- retry once after a non-fast-forward rejection
- stop and surface runtime status on conflicts
EOF
}
while [[ $# -gt 0 ]]; do
case "$1" in
--project-dir)
project_dir="$2"
shift 2
;;
--host-label)
host_label="$2"
shift 2
;;
-h|--help)
usage
exit 0
;;
*)
echo "Unknown argument: $1" >&2
usage >&2
exit 1
;;
esac
done
if [[ -z "$project_dir" ]]; then
script_dir="$(cd "$(dirname "$0")" && pwd)"
project_dir="$(cd "$script_dir/../.." && pwd)"
fi
project_dir="$(cd "$project_dir" && pwd)"
host_label="${host_label:-$(hostname -s 2>/dev/null || hostname)}"
manifest_path="$project_dir/.codex-sync/manifest.json"
runtime_dir="$project_dir/.codex-sync/runtime"
status_path="$runtime_dir/status.json"
error_log="$runtime_dir/last-error.log"
blocked_path="$runtime_dir/blocked"
lock_dir="$runtime_dir/lock"
mkdir -p "$runtime_dir"
write_status() {
local state="$1"
local message="$2"
python3 - "$status_path" "$state" "$host_label" "$message" "$project_dir" <<'PY'
import json
import sys
from datetime import datetime, timezone
path, state, host_label, message, project_dir = sys.argv[1:]
payload = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"state": state,
"host": host_label,
"projectDir": project_dir,
"message": message,
}
with open(path, "w", encoding="utf-8") as handle:
json.dump(payload, handle, indent=2)
handle.write("\n")
PY
}
read_manifest_field() {
local expression="$1"
python3 - "$manifest_path" "$expression" <<'PY'
import json
import sys
path = sys.argv[1]
expr = sys.argv[2]
data = json.load(open(path, "r", encoding="utf-8"))
value = data
for part in expr.split("."):
value = value.get(part, "") if isinstance(value, dict) else ""
print(value if value is not None else "")
PY
}
abort_rebase_if_needed() {
if [[ -d "$project_dir/.git/rebase-apply" || -d "$project_dir/.git/rebase-merge" ]]; then
git -C "$project_dir" rebase --abort >>"$error_log" 2>&1 || true
fi
}
block_sync() {
local reason="$1"
abort_rebase_if_needed
{
printf 'timestamp=%s\n' "$(date -u +%Y-%m-%dT%H:%M:%SZ)"
printf 'host=%s\n' "$host_label"
printf 'reason=%s\n' "$reason"
} >"$blocked_path"
write_status "blocked" "$reason"
exit 1
}
run_git() {
git -C "$project_dir" "$@" >>"$error_log" 2>&1
}
if [[ ! -f "$manifest_path" ]]; then
write_status "error" "Missing manifest at $manifest_path"
echo "Missing manifest at $manifest_path" >"$error_log"
exit 1
fi
if ! mkdir "$lock_dir" 2>/dev/null; then
write_status "skipped" "Another sync run is already in progress"
exit 0
fi
trap 'rmdir "$lock_dir" 2>/dev/null || true' EXIT
if [[ -f "$blocked_path" ]]; then
write_status "blocked" "Sync is blocked until .codex-sync/runtime/blocked is cleared"
exit 0
fi
: >"$error_log"
write_status "running" "Synchronizing workspace"
branch="$(read_manifest_field branch)"
git_name="$(read_manifest_field gitIdentity.name)"
git_email="$(read_manifest_field gitIdentity.email)"
branch="${branch:-main}"
git_name="${git_name:-Codex Sync}"
git_email="${git_email:-codex-sync@local}"
if [[ ! -d "$project_dir/.git" ]]; then
write_status "error" "Project is not a git repository"
echo "Project is not a git repository: $project_dir" >"$error_log"
exit 1
fi
if [[ -z "$(git -C "$project_dir" config user.name || true)" ]]; then
git -C "$project_dir" config user.name "$git_name"
fi
if [[ -z "$(git -C "$project_dir" config user.email || true)" ]]; then
git -C "$project_dir" config user.email "$git_email"
fi
if git -C "$project_dir" show-ref --verify --quiet "refs/heads/$branch"; then
git -C "$project_dir" checkout "$branch" >>"$error_log" 2>&1
else
git -C "$project_dir" checkout -B "$branch" >>"$error_log" 2>&1
fi
if [[ -n "$(git -C "$project_dir" status --porcelain)" ]]; then
git -C "$project_dir" add -A >>"$error_log" 2>&1
if ! git -C "$project_dir" diff --cached --quiet; then
stamp="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
git -C "$project_dir" commit -m "autosync(${host_label}): save work ${stamp}" >>"$error_log" 2>&1 || {
write_status "error" "Failed to commit local changes before sync"
exit 1
}
fi
fi
if ! run_git fetch origin; then
write_status "error" "Failed to fetch origin"
exit 1
fi
if git -C "$project_dir" show-ref --verify --quiet "refs/remotes/origin/$branch"; then
if ! git -C "$project_dir" rebase "origin/$branch" >>"$error_log" 2>&1; then
block_sync "Rebase conflict while replaying local commits onto origin/$branch"
fi
fi
push_ok=0
for attempt in 1 2; do
if git -C "$project_dir" push -u origin "$branch" >>"$error_log" 2>&1; then
push_ok=1
break
fi
if [[ "$attempt" -eq 2 ]]; then
break
fi
if ! run_git fetch origin; then
break
fi
if git -C "$project_dir" show-ref --verify --quiet "refs/remotes/origin/$branch"; then
if ! git -C "$project_dir" rebase "origin/$branch" >>"$error_log" 2>&1; then
block_sync "Push retry failed because origin/$branch caused a rebase conflict"
fi
fi
done
if [[ "$push_ok" -ne 1 ]]; then
write_status "error" "Push failed after retry"
exit 1
fi
rm -f "$blocked_path"
head_commit="$(git -C "$project_dir" rev-parse --short HEAD 2>/dev/null || true)"
write_status "ok" "Synchronized successfully at ${head_commit:-unknown}"

119
gitea-repo-sync/SKILL.md 普通文件
查看文件

@@ -0,0 +1,119 @@
---
name: gitea-repo-sync
description: 在自建或托管 Gitea 上自动创建仓库、初始化或复用本地 Git 项目、配置远程并安全推送分支/标签。适用于“给当前目录建 Gitea 仓库”“同步本地项目到 Gitea”“给组织创建仓库再推送代码”这类请求。
metadata:
short-description: Create Gitea repositories and sync local projects safely
---
# Gitea Repo Sync
在 Gitea 上建仓、连远程、推送代码的标准化技能。
优先使用打包好的 `scripts/gitea_repo_sync.py`,不要重复手写零散 `curl` + `git` 命令。脚本会先探测仓库是否已存在,再按需要创建仓库、初始化本地 Git、配置远程并执行 push。
## 什么时候用
- 用户要把当前目录项目同步到 Gitea
- 用户要在某个 owner 或 organization 下创建新仓库
- 本地目录还不是 Git 仓库,但需要初始化后推送
- 需要复用已有远程仓库,而不是盲目重复创建
- 需要在自动化里稳定处理 token、remote 和 push 顺序
## 需要的输入
- `GITEA_URL`:例如 `https://git.hk.hao.work`
- `GITEA_TOKEN`:具备建仓和推送权限的 token
- `owner`:用户名或组织名,例如 `hao`
- `repo`:仓库名,例如 `demo-project`
- `source_dir`:要同步的本地目录
- 可选 `branch`:默认推送的分支名,常见为 `main`
- 可选 `description` / `private` / `tags`
优先使用环境变量:
```bash
export GITEA_URL='https://git.hk.hao.work'
export GITEA_TOKEN='<your_token>'
```
## 标准流程
1. 先确认本地目录路径、目标 owner 和 repo 名。
2. 调 Gitea API 查询仓库是否已存在。
3. 若不存在,则按 owner 类型创建:
- 当前登录用户自己的仓库:`POST /api/v1/user/repos`
- 组织仓库:`POST /api/v1/orgs/{org}/repos`
4. 进入本地目录检查是否为 Git 仓库:
- 不是 Git 仓库时,只有在明确需要时才初始化
- 已经是 Git 仓库时,优先复用现有提交历史
5. 配置或校验远程:
- 无 remote 时新增
- remote 已存在但 URL 不同,默认停止并提示;只有明确允许时才替换
6. 按请求推送:
- 单分支:`push --set-upstream`
- 全部分支:`push --all`
- 标签:`push --tags`
7. 输出最终仓库 URL、remote URL、本地分支和 push 结果。
## 安全默认值
- 不强制覆盖现有 remote,除非明确传 `--replace-remote`
- 不自动 `git add` / `git commit`,除非明确传 `--stage-all``--commit-message`
- 不自动 `force push`
- 不把 token 写进 git remote URL
- 仓库已存在时优先复用,不重复创建
## 推荐脚本
使用 `scripts/gitea_repo_sync.py`
### 常见用法
```bash
python3 scripts/gitea_repo_sync.py \
--server-url "$GITEA_URL" \
--token "$GITEA_TOKEN" \
--owner hao \
--repo demo-project \
--source-dir /path/to/project \
--init-git \
--branch main \
--stage-all \
--commit-message 'Initial import'
```
如果仓库已存在,只同步当前分支:
```bash
python3 scripts/gitea_repo_sync.py \
--server-url "$GITEA_URL" \
--token "$GITEA_TOKEN" \
--owner hao \
--repo demo-project \
--source-dir /path/to/project \
--branch main
```
推送全部分支和标签:
```bash
python3 scripts/gitea_repo_sync.py \
--server-url "$GITEA_URL" \
--token "$GITEA_TOKEN" \
--owner hao \
--repo demo-project \
--source-dir /path/to/project \
--push-all \
--tags
```
## 常见坑
- 不要把 `GITEA_TOKEN` 提交进仓库。
- 不要在 remote URL 里长期保存带 token 的认证串。
- 不要在 remote 已存在时静默改写 URL。
- 不要在工作区有未确认修改时自动提交;先确认是否需要 `--stage-all`
- 不要默认 `--push-all`;很多场景只需要当前分支。
## 参考资料
- `references/gitea-api-notes.md`

查看文件

@@ -0,0 +1,4 @@
interface:
display_name: "Gitea Repo Sync"
short_description: "Create Gitea repositories and sync local projects safely"
default_prompt: "Create or reuse a Gitea repository, initialize the current project as git if needed, configure a safe remote, and push the requested branches or tags without storing the token in the remote URL."

查看文件

@@ -0,0 +1,49 @@
# Gitea API Notes
本技能默认对接 Gitea v1.25.x,核心只依赖少量稳定接口
- `GET /api/v1/user`
- `GET /api/v1/repos/{owner}/{repo}`
- `POST /api/v1/user/repos`
- `POST /api/v1/orgs/{org}/repos`
## 创建仓库的 owner 选择
1. 先调用 `GET /api/v1/user` 获取当前 token 对应登录名。
2. 如果目标 `owner` 与当前登录名一致,走 `POST /api/v1/user/repos`
3. 如果目标 `owner` 不同,按组织仓库处理,走 `POST /api/v1/orgs/{org}/repos`
## 推荐请求头
```text
Authorization: token <token>
Content-Type: application/json
Accept: application/json
```
## Git Push 认证建议
不要把 token 写入 `git remote -v`
优先使用一次性的 HTTP header
- 用户名Gitea 登录名
- 密码token
- Header`Authorization: Basic <base64(username:token)>`
这样可以在 push 时认证,但不会把 token 持久化到仓库配置里。
## 建议的环境变量
```bash
export GITEA_URL='https://git.hk.hao.work'
export GITEA_TOKEN='<your_token>'
```
## 失败时优先检查
- token 是否有 repo 创建/写入权限
- `owner` 是用户还是组织
- 本地目录是否已经是 Git 仓库
- 当前分支是否存在提交
- 远程 URL 是否已指向别的仓库

查看文件

@@ -0,0 +1,260 @@
#!/usr/bin/env python3
import argparse
import base64
import json
import os
import subprocess
import sys
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Optional
def log(message: str) -> None:
print(f"[gitea-repo-sync] {message}", flush=True)
def api_request(server_url: str, token: str, method: str, path: str, payload=None, ok_not_found: bool = False):
url = f"{server_url.rstrip('/')}/api/v1/{path.lstrip('/')}"
data = None
headers = {
"Authorization": f"token {token}",
"Accept": "application/json",
}
if payload is not None:
data = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
request = urllib.request.Request(url, data=data, method=method.upper(), headers=headers)
try:
with urllib.request.urlopen(request, timeout=30) as response:
raw = response.read().decode("utf-8")
return response.status, json.loads(raw) if raw else {}
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
if ok_not_found and exc.code == 404:
return 404, None
raise RuntimeError(f"HTTP {exc.code} {method} {url}: {body}") from exc
def run_git(args, cwd: Path, auth_header: Optional[str] = None, capture: bool = True) -> str:
cmd = ["git"]
if auth_header:
cmd += ["-c", f"http.extraHeader={auth_header}"]
cmd += args
result = subprocess.run(
cmd,
cwd=str(cwd),
text=True,
capture_output=capture,
check=False,
)
if result.returncode != 0:
stderr = (result.stderr or "").strip()
stdout = (result.stdout or "").strip()
raise RuntimeError(f"git {' '.join(args)} failed: {stderr or stdout}")
return (result.stdout or "").strip()
def is_git_repo(source_dir: Path) -> bool:
result = subprocess.run(
["git", "rev-parse", "--is-inside-work-tree"],
cwd=str(source_dir),
text=True,
capture_output=True,
check=False,
)
return result.returncode == 0 and (result.stdout or "").strip() == "true"
def ensure_git_repo(source_dir: Path, branch: str, init_git: bool) -> None:
if is_git_repo(source_dir):
return
if not init_git:
raise RuntimeError(f"{source_dir} 不是 Git 仓库;如需初始化请传 --init-git")
log(f"初始化 Git 仓库: {source_dir}")
run_git(["init", "-b", branch], cwd=source_dir)
def branch_name_from_head(source_dir: Path) -> str:
result = subprocess.run(
["git", "symbolic-ref", "--short", "HEAD"],
cwd=str(source_dir),
text=True,
capture_output=True,
check=False,
)
if result.returncode == 0:
return (result.stdout or "").strip()
return run_git(["rev-parse", "--abbrev-ref", "HEAD"], cwd=source_dir)
def ensure_branch(source_dir: Path, branch: str) -> None:
current = branch_name_from_head(source_dir)
if current == branch:
return
result = subprocess.run(
["git", "show-ref", "--verify", f"refs/heads/{branch}"],
cwd=str(source_dir),
text=True,
capture_output=True,
check=False,
)
if result.returncode == 0:
run_git(["checkout", branch], cwd=source_dir)
else:
run_git(["checkout", "-b", branch], cwd=source_dir)
def maybe_commit(source_dir: Path, stage_all: bool, commit_message: Optional[str]) -> None:
if not stage_all and not commit_message:
return
if stage_all:
run_git(["add", "-A"], cwd=source_dir)
if commit_message:
status = run_git(["status", "--porcelain"], cwd=source_dir)
if status:
log("创建提交")
run_git(["commit", "-m", commit_message], cwd=source_dir)
def get_current_branch(source_dir: Path) -> str:
return branch_name_from_head(source_dir)
def get_remote_url(source_dir: Path, remote_name: str) -> Optional[str]:
result = subprocess.run(
["git", "remote", "get-url", remote_name],
cwd=str(source_dir),
text=True,
capture_output=True,
check=False,
)
if result.returncode != 0:
return None
return (result.stdout or "").strip()
def ensure_remote(source_dir: Path, remote_name: str, remote_url: str, replace_remote: bool) -> None:
current = get_remote_url(source_dir, remote_name)
if not current:
run_git(["remote", "add", remote_name, remote_url], cwd=source_dir)
return
if current == remote_url:
return
if not replace_remote:
raise RuntimeError(
f"remote {remote_name} 已存在且指向 {current};如需替换请显式传 --replace-remote"
)
run_git(["remote", "set-url", remote_name, remote_url], cwd=source_dir)
def basic_auth_header(username: str, token: str) -> str:
raw = f"{username}:{token}".encode("utf-8")
return "Authorization: Basic " + base64.b64encode(raw).decode("ascii")
def ensure_repo(server_url: str, token: str, owner: str, repo: str, description: str, private: bool, default_branch: str):
_, user = api_request(server_url, token, "GET", "/user")
username = user["login"]
status, existing = api_request(server_url, token, "GET", f"/repos/{owner}/{repo}", ok_not_found=True)
if status == 200 and existing:
return username, existing, False
payload = {
"name": repo,
"description": description,
"private": private,
"auto_init": False,
"default_branch": default_branch,
}
if owner == username:
_, created = api_request(server_url, token, "POST", "/user/repos", payload=payload)
else:
_, created = api_request(server_url, token, "POST", f"/orgs/{owner}/repos", payload=payload)
return username, created, True
def parse_args():
parser = argparse.ArgumentParser(description="Create or reuse a Gitea repository and sync a local project.")
parser.add_argument("--server-url", default=os.getenv("GITEA_URL", ""), help="Gitea base URL")
parser.add_argument("--token", default=os.getenv("GITEA_TOKEN", ""), help="Gitea API token")
parser.add_argument("--owner", required=True, help="Repo owner or organization")
parser.add_argument("--repo", required=True, help="Repository name")
parser.add_argument("--source-dir", required=True, help="Local project path")
parser.add_argument("--description", default="", help="Repository description")
parser.add_argument("--branch", default="main", help="Branch to push")
parser.add_argument("--remote-name", default="origin", help="Git remote name")
parser.add_argument("--private", action="store_true", help="Create private repository")
parser.add_argument("--init-git", action="store_true", help="Initialize git when source-dir is not a repository")
parser.add_argument("--replace-remote", action="store_true", help="Replace remote URL when remote exists but differs")
parser.add_argument("--stage-all", action="store_true", help="Run git add -A before commit")
parser.add_argument("--commit-message", default="", help="Create a commit before pushing")
parser.add_argument("--push-all", action="store_true", help="Push all local branches")
parser.add_argument("--tags", action="store_true", help="Push tags after branch push")
return parser.parse_args()
def main() -> int:
args = parse_args()
if not args.server_url:
raise RuntimeError("缺少 --server-url 或 GITEA_URL")
if not args.token:
raise RuntimeError("缺少 --token 或 GITEA_TOKEN")
source_dir = Path(args.source_dir).expanduser().resolve()
if not source_dir.exists() or not source_dir.is_dir():
raise RuntimeError(f"source-dir 不存在或不是目录: {source_dir}")
username, repo_info, created = ensure_repo(
server_url=args.server_url,
token=args.token,
owner=args.owner,
repo=args.repo,
description=args.description,
private=args.private,
default_branch=args.branch,
)
remote_url = repo_info.get("clone_url") or repo_info.get("html_url", "").rstrip("/") + ".git"
if not remote_url:
raise RuntimeError("未能从 Gitea 响应中获取 clone_url")
ensure_git_repo(source_dir, args.branch, args.init_git)
ensure_branch(source_dir, args.branch)
maybe_commit(source_dir, args.stage_all, args.commit_message or None)
ensure_remote(source_dir, args.remote_name, remote_url, args.replace_remote)
auth_header = basic_auth_header(username, args.token)
branch = get_current_branch(source_dir)
if args.push_all:
log("推送全部分支")
run_git(["push", args.remote_name, "--all"], cwd=source_dir, auth_header=auth_header, capture=True)
else:
log(f"推送分支: {branch}")
run_git(["push", "--set-upstream", args.remote_name, branch], cwd=source_dir, auth_header=auth_header, capture=True)
if args.tags:
log("推送标签")
run_git(["push", args.remote_name, "--tags"], cwd=source_dir, auth_header=auth_header, capture=True)
summary = {
"created": created,
"owner": args.owner,
"repo": args.repo,
"branch": branch,
"source_dir": str(source_dir),
"remote_name": args.remote_name,
"remote_url": remote_url,
"html_url": repo_info.get("html_url"),
}
print(json.dumps(summary, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except Exception as exc:
print(f"ERROR: {exc}", file=sys.stderr)
raise SystemExit(1)