比较提交

..

2 次代码提交

作者 SHA1 备注 提交日期
X
b2c5ef588d Add gitea-repo-sync skill 2026-03-06 18:27:32 -08:00
X
71e6c95e23 Add CN86 SMS keyword verification skill and README 2026-03-06 01:13:09 -08:00
修改 9 个文件,包含 996 行新增0 行删除

47
README.md 普通文件
查看文件

@@ -0,0 +1,47 @@
# Skills Repository
这个仓库收集可直接复用的 Codex skills。每个技能目录至少包含一个 `SKILL.md`,部分技能还会附带 `scripts/``references/``agents/` 等资源,方便在具体任务里稳定复用。
## 使用方式
1. 进入对应技能目录阅读 `SKILL.md`
2. 按说明设置运行时环境变量,不要把真实密钥提交进仓库
3. 优先复用技能自带脚本,而不是重复写临时命令
4. 涉及外部 API 的技能,先做余额/连通性检查,再跑主流程
## 当前技能
| Skill | 说明 |
| --- | --- |
| `captcha-third-party-services` | 统一封装 2Captcha / YesCaptcha / Anti-Captcha 的官方 API 工作流。 |
| `cliproxy-traffic-proxy` | CLIProxy 流量代理相关技能。 |
| `cn86-sms-keyword-verification` | 86 手机号 + 关键词验证码流程,基于 LubanSMS API 完成取号、取码、提码、释放号码。 |
| `email-verification` | 邮箱验证码获取服务,支持临时邮箱 API 拉取验证码、链接和邮件内容。 |
| `gitea-repo-sync` | 在 Gitea 上创建仓库并把本地项目安全同步过去。 |
| `similarweb-analytics` | Similarweb 分析相关技能。 |
| `simple-llm` | 轻量 LLM 调用技能。 |
| `uiuxmax` | UI/UX 设计辅助技能。 |
| `web-reverse` | 网页逆向工程技能,定位签名、加密、解密和接口复现。 |
## 新增技能:`cn86-sms-keyword-verification`
适用场景:
- 先取一个中国手机号,再按关键词轮询验证码
- 对接千问等短信登录/注册流程
- 需要显式的释放号码与历史排查动作
运行示例:
```bash
export LUBAN_SMS_APIKEY='<your_api_key>'
python3 cn86-sms-keyword-verification/scripts/lubansms_keyword_cli.py balance
python3 cn86-sms-keyword-verification/scripts/lubansms_keyword_cli.py demo --keyword 千问 --timeout 300 --interval 5
```
## 维护建议
- 新增 skill 时,保持目录名、`name` 字段和用途一致
- 密钥统一走环境变量,不进仓库
- 需要稳定 API 调用时,优先提交脚本到 `scripts/`
- 新增 skill 后同步更新本 README

查看文件

@@ -0,0 +1,127 @@
---
name: cn86-sms-keyword-verification
description: 86手机号关键词验证码流程,基于 LubanSMS 通用短信接收 API 完成取号、轮询短信、提取验证码、释放号码,并可用“千问”等关键词做短信登录/注册 demo。
type: workflow
domain: utilities
version: 1.0.0
tags: [sms, verification, phone, keyword, china, qwen, lubansms]
triggers:
keywords:
primary: [86 手机号, 86手机号, 手机号获取, 短信验证码, 关键词验证码, 获取验证码, 接码]
secondary: [千问验证码, qwen sms, lubansms, getKeywordNumber, getKeywordSms, keywordSmsHistory]
context_boost: [注册, 登录, 验证码, 短信, 手机号, 关键词]
priority: high
---
# CN86 SMS Keyword Verification
86 手机号 + 关键词获取验证码的标准流程技能。
本技能使用 `https://lubansms.com/v2/api` 的“通用短信接收”接口做 demo。你给的内部参考文档 `docs/sms-register-qwen.md` 里,附录实际也是按这组 LubanSMS 接口在写;文档里提到的千问Qwen示例关键词是 `千问`
如需稳定执行,优先使用打包好的 `scripts/lubansms_keyword_cli.py`,不要手写临时 curl 再去人肉判断状态。
## 什么时候用
- 需要先拿一个 86 号码,再根据短信关键词轮询验证码
- 目标站点会把验证码发到中国手机号,但平台只给“按关键词取短信”的能力
- 要做可重复的注册/登录自动化 demo,例如千问短信登录
- 需要历史记录、余额检查、释放号码这些配套动作
## 需要的输入
- `keyword`:短信中能稳定命中的关键词,例如 `千问`
- `LUBAN_SMS_APIKEY`:运行时环境变量,不要写死进仓库
- 可选 `phone`:想复用已有号码时传入;留空则随机取号
- 可选 `timeout` / `interval`:轮询等待时长和间隔
## API Key 处理
优先使用环境变量:
```bash
export LUBAN_SMS_APIKEY='<your_api_key>'
export LUBAN_SMS_API_BASE='https://lubansms.com/v2/api'
```
只在一次性调试时才用 `--api-key` 直传。
## 标准流程
1. 先查余额,避免轮询到一半才发现 key 无效或余额不足。
2.`getKeywordNumber` 申请号码。
3. 规范化手机号:
- API 内部继续使用原始数字串,例如 `16741251148`
- 对外展示可拼成 `+8616741251148`
- 如果目标站点像千问一样把国家码拆开填,就传 `phoneCode=86` + 原始手机号
4. 在目标站点触发发送短信。
5.`getKeywordSms` 按关键词轮询短信。
6. 从返回短信正文中提取验证码。
7. 成功或失败后都调用 `delKeywordNumber` 释放号码。
8. 排查问题时再查 `keywordSmsHistory`
## 平台状态判断
- `code=0`:成功
- `code=400``msg=尚未收到短信,请稍后重试`:可继续轮询
- `code=400``msg=不正确的apikey`:立即停止,检查 key
- 其他 `code!=0`:视为 API 失败,不要盲目重试到超时
## 千问QwenDemo
千问短信内容在参考文档里使用的关键词是 `千问`
### 1. 先检查余额
```bash
export LUBAN_SMS_APIKEY='<your_api_key>'
python3 scripts/lubansms_keyword_cli.py balance
```
### 2. 一次性 demo取号 → 等短信 → 提取验证码 → 释放号码
```bash
python3 scripts/lubansms_keyword_cli.py demo \
--keyword 千问 \
--timeout 300 \
--interval 5
```
这个命令会:
- 申请一个随机 86 号码
- 输出原始号码和 `+86` 格式
- 等你在目标站点触发短信发送
- 轮询关键词短信并提取验证码
- 默认在结束时释放号码
### 3. 拆开执行
```bash
python3 scripts/lubansms_keyword_cli.py request-number
python3 scripts/lubansms_keyword_cli.py wait-code --phone 16741251148 --keyword 千问
python3 scripts/lubansms_keyword_cli.py release --phone 16741251148
```
## 常见坑
- 不要把 `LUBAN_SMS_APIKEY` 写进提交文件。
- 不要把 `+86``86` 前缀后的完整国际格式直接塞回 `phone=` 参数;平台接口通常要原始号码数字串。
- 不要忘记释放号码;无论成功、失败、超时都应该释放。
- 不要把“尚未收到短信”当成致命错误;这是正常轮询态。
- 不要只看关键词命中,不提取验证码;很多自动化链路最后需要明确的 OTP 数值。
## 推荐脚本
使用 `scripts/lubansms_keyword_cli.py`
- `balance`:查余额
- `request-number`:申请号码
- `get-sms`:查一次关键词短信
- `wait-code`:轮询直到拿到验证码或超时
- `release`:释放号码
- `history`:查历史记录
- `demo`:完整演示流程
## 参考资料
- `references/lubansms-and-qwen-notes.md`

查看文件

@@ -0,0 +1,4 @@
interface:
display_name: "CN86 SMS Keyword Verification"
short_description: "Get 86 phone numbers and OTP codes via LubanSMS keyword APIs"
default_prompt: "Use LubanSMS keyword APIs to request an 86 phone number, poll SMS by keyword, extract the OTP code, and release the number for flows like Qwen SMS login."

查看文件

@@ -0,0 +1,96 @@
# LubanSMS / Qwen Notes
## 来源
这份参考整理自两部分:
1. 官方 API 文档:`https://lubansms.com/api_docs/`
2. 内部参考:`hao/one` 仓库里的 `docs/sms-register-qwen.md`
## 关键结论
- 这次要做的“86 手机号 + 关键词获取验证码”流程,落地接口是 LubanSMS 的 **通用短信接收** 系列。
- 内部参考文档把千问Qwen作为示例站点,关键词给的是 `千问`
- 运行时应该使用环境变量 `LUBAN_SMS_APIKEY`,而不是把真实 key 提交到仓库。
## 官方接口
### 1. 查询余额
`GET /getBalance?apikey=YOUR_APIKEY`
成功示例:
```json
{"code":0,"msg":"","balance":"96.72"}
```
### 2. 请求号码
`GET /getKeywordNumber?apikey=YOUR_APIKEY&phone=&cardType=全部`
成功示例:
```json
{"code":0,"msg":"","phone":"18888888888"}
```
说明:
- `phone` 留空表示随机号码
- `cardType` 在官方文档里标成“已弃用”,但仍可兼容传 `全部`
### 3. 获取关键词短信
`GET /getKeywordSms?apikey=YOUR_APIKEY&phone=<手机号>&keyword=<关键词>`
等待中:
```json
{"code":400,"msg":"尚未收到短信,请稍后重试"}
```
收到短信:
```json
{"code":0,"msg":"【百度】验证码xxxx,您正在进行登陆验证."}
```
### 4. 释放号码
`GET /delKeywordNumber?apikey=YOUR_APIKEY&phone=<手机号>`
成功示例:
```json
{"code":0,"msg":""}
```
### 5. 查询关键词短信历史
`GET /keywordSmsHistory?apikey=YOUR_APIKEY&page=1`
用途:
- 验证关键词是否正确
- 排查目标站点是否实际发过短信
- 辅助确认短信模板和验证码格式
## 千问专用备注
内部参考文档给出的要点:
- 千问示例关键词:`千问`
- 站点如果把国家码和手机号拆开,使用 `phoneCode=86` + 原始手机号
- 接码平台 API 内部仍然按原始手机号查询,不要拼上 `+86`
## 2026-03-06 验证结论
已验证:
- `getBalance` 可正常返回余额
- `keywordSmsHistory` 可正常返回最近的千问短信历史
- 说明本技能里的示例流程和关键词方向是成立的
未在仓库中提交:
- 实际 API key
- 实际号码或敏感历史记录

查看文件

@@ -0,0 +1,290 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import time
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Any, Dict, Optional
DEFAULT_BASE_URL = os.getenv("LUBAN_SMS_API_BASE", "https://lubansms.com/v2/api")
DEFAULT_HTTP_TIMEOUT = 30
WAITING_MESSAGES = (
"尚未收到短信,请稍后重试",
"wait",
)
class LubanSMSError(RuntimeError):
pass
@dataclass
class PhoneFormats:
raw: str
e164: str
class LubanSMSClient:
def __init__(self, api_key: str, base_url: str = DEFAULT_BASE_URL, http_timeout: int = DEFAULT_HTTP_TIMEOUT) -> None:
if not api_key:
raise LubanSMSError("missing api key: set LUBAN_SMS_APIKEY or pass --api-key")
self.api_key = api_key
self.base_url = base_url.rstrip("/")
self.http_timeout = http_timeout
def _request(self, path: str, **params: Any) -> Dict[str, Any]:
query = {"apikey": self.api_key}
for key, value in params.items():
if value is None:
continue
query[key] = value
url = f"{self.base_url}/{path}?{urllib.parse.urlencode(query)}"
request = urllib.request.Request(
url,
headers={
"Accept": "application/json",
"User-Agent": "codex-cn86-sms-keyword-verification/1.0",
},
)
try:
with urllib.request.urlopen(request, timeout=self.http_timeout) as response:
raw = response.read().decode("utf-8", errors="replace")
except Exception as exc:
raise LubanSMSError(f"request failed for {path}: {exc}") from exc
try:
payload = json.loads(raw)
except json.JSONDecodeError as exc:
raise LubanSMSError(f"invalid json from {path}: {raw}") from exc
return payload
def get_balance(self) -> Dict[str, Any]:
return self._request("getBalance")
def request_keyword_number(self, phone: str = "", card_type: str = "全部") -> Dict[str, Any]:
return self._request("getKeywordNumber", phone=normalize_phone_input(phone), cardType=card_type)
def get_keyword_sms(self, phone: str, keyword: str) -> Dict[str, Any]:
return self._request("getKeywordSms", phone=normalize_phone_input(phone), keyword=keyword)
def release_keyword_number(self, phone: str) -> Dict[str, Any]:
return self._request("delKeywordNumber", phone=normalize_phone_input(phone))
def keyword_sms_history(self, page: int = 1) -> Dict[str, Any]:
return self._request("keywordSmsHistory", page=page)
def normalize_phone_input(phone: str) -> str:
digits = re.sub(r"\D", "", phone or "")
if digits.startswith("86") and len(digits) > 11:
digits = digits[2:]
return digits
def format_cn86_phone(phone: str) -> PhoneFormats:
raw = normalize_phone_input(phone)
if not raw:
raise LubanSMSError("phone is required")
return PhoneFormats(raw=raw, e164=f"+86{raw}")
def extract_verification_code(message: str) -> Optional[str]:
patterns = [
r"验证码(?:为|是|[:])?\s*([A-Za-z0-9]{4,8})",
r"code(?: is|[:])?\s*([A-Za-z0-9]{4,8})",
r"\b(\d{6})\b",
r"\b(\d{4,8})\b",
]
for pattern in patterns:
match = re.search(pattern, message, flags=re.IGNORECASE)
if match:
return match.group(1)
return None
def is_waiting_payload(payload: Dict[str, Any]) -> bool:
if payload.get("code") == 0:
return False
message = str(payload.get("msg", ""))
return any(token in message for token in WAITING_MESSAGES)
def require_success(payload: Dict[str, Any], *, allow_wait: bool = False) -> Dict[str, Any]:
if payload.get("code") == 0:
return payload
if allow_wait and is_waiting_payload(payload):
return payload
raise LubanSMSError(f"api error: code={payload.get('code')} msg={payload.get('msg')}")
def wait_for_code(client: LubanSMSClient, phone: str, keyword: str, timeout: int, interval: int) -> Dict[str, Any]:
formats = format_cn86_phone(phone)
started = time.monotonic()
attempts = 0
last_payload: Dict[str, Any] | None = None
while time.monotonic() - started < timeout:
attempts += 1
payload = client.get_keyword_sms(formats.raw, keyword)
last_payload = payload
if payload.get("code") == 0:
message = str(payload.get("msg", ""))
return {
"status": "success",
"keyword": keyword,
"phone": formats.raw,
"phone_e164": formats.e164,
"attempts": attempts,
"elapsed_seconds": round(time.monotonic() - started, 2),
"message": message,
"verification_code": extract_verification_code(message),
}
if not is_waiting_payload(payload):
raise LubanSMSError(f"unexpected sms response: code={payload.get('code')} msg={payload.get('msg')}")
time.sleep(interval)
return {
"status": "timeout",
"keyword": keyword,
"phone": formats.raw,
"phone_e164": formats.e164,
"attempts": attempts,
"elapsed_seconds": round(time.monotonic() - started, 2),
"last_payload": last_payload,
"verification_code": None,
}
def print_json(data: Dict[str, Any]) -> None:
json.dump(data, sys.stdout, ensure_ascii=False, indent=2)
sys.stdout.write("\n")
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="LubanSMS CN86 keyword SMS CLI")
parser.add_argument("--api-key", default=os.getenv("LUBAN_SMS_APIKEY", ""), help="LubanSMS API key; defaults to LUBAN_SMS_APIKEY")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="API base URL")
parser.add_argument("--http-timeout", type=int, default=DEFAULT_HTTP_TIMEOUT, help="HTTP request timeout in seconds")
subparsers = parser.add_subparsers(dest="command", required=True)
subparsers.add_parser("balance", help="Query account balance")
request_number = subparsers.add_parser("request-number", help="Request a keyword phone number")
request_number.add_argument("--phone", default="", help="Reuse a specific phone if supported")
request_number.add_argument("--card-type", default="全部", help="Card type; official docs mark this deprecated")
get_sms = subparsers.add_parser("get-sms", help="Fetch SMS once by keyword")
get_sms.add_argument("--phone", required=True, help="Raw CN phone number or +86 number")
get_sms.add_argument("--keyword", required=True, help="Keyword to match, e.g. 千问")
wait_code = subparsers.add_parser("wait-code", help="Poll until OTP arrives or times out")
wait_code.add_argument("--phone", required=True, help="Raw CN phone number or +86 number")
wait_code.add_argument("--keyword", required=True, help="Keyword to match, e.g. 千问")
wait_code.add_argument("--timeout", type=int, default=300, help="Polling timeout in seconds")
wait_code.add_argument("--interval", type=int, default=5, help="Polling interval in seconds")
wait_code.add_argument("--release-on-exit", action="store_true", help="Release the number after polling finishes")
release = subparsers.add_parser("release", help="Release a phone number")
release.add_argument("--phone", required=True, help="Raw CN phone number or +86 number")
history = subparsers.add_parser("history", help="Fetch keyword SMS history")
history.add_argument("--page", type=int, default=1, help="Page number")
demo = subparsers.add_parser("demo", help="Request number, wait for keyword SMS, then release")
demo.add_argument("--keyword", required=True, help="Keyword to match, e.g. 千问")
demo.add_argument("--phone", default="", help="Reuse a specific phone if supported")
demo.add_argument("--card-type", default="全部", help="Card type; official docs mark this deprecated")
demo.add_argument("--timeout", type=int, default=300, help="Polling timeout in seconds")
demo.add_argument("--interval", type=int, default=5, help="Polling interval in seconds")
demo.add_argument("--keep-number", action="store_true", help="Do not release the number at the end")
return parser
def main() -> int:
parser = build_parser()
args = parser.parse_args()
client = LubanSMSClient(api_key=args.api_key, base_url=args.base_url, http_timeout=args.http_timeout)
try:
if args.command == "balance":
print_json(require_success(client.get_balance()))
return 0
if args.command == "request-number":
payload = require_success(client.request_keyword_number(phone=args.phone, card_type=args.card_type))
formats = format_cn86_phone(payload.get("phone", ""))
payload["phone"] = formats.raw
payload["phone_e164"] = formats.e164
print_json(payload)
return 0
if args.command == "get-sms":
payload = require_success(client.get_keyword_sms(phone=args.phone, keyword=args.keyword), allow_wait=True)
if payload.get("code") == 0:
payload["verification_code"] = extract_verification_code(str(payload.get("msg", "")))
print_json(payload)
return 0
if args.command == "wait-code":
result = wait_for_code(client, phone=args.phone, keyword=args.keyword, timeout=args.timeout, interval=args.interval)
if args.release_on_exit:
try:
result["release"] = require_success(client.release_keyword_number(args.phone))
except Exception as exc:
result["release_error"] = str(exc)
print_json(result)
return 0 if result.get("status") == "success" else 2
if args.command == "release":
print_json(require_success(client.release_keyword_number(args.phone)))
return 0
if args.command == "history":
print_json(require_success(client.keyword_sms_history(page=args.page)))
return 0
if args.command == "demo":
allocation = require_success(client.request_keyword_number(phone=args.phone, card_type=args.card_type))
formats = format_cn86_phone(allocation.get("phone", ""))
result: Dict[str, Any] = {
"status": "allocated",
"keyword": args.keyword,
"phone": formats.raw,
"phone_e164": formats.e164,
"allocation": allocation,
"notes": [
"Trigger the target site to send SMS after allocation.",
"For sites like Qwen, pass phoneCode=86 and the raw phone digits.",
],
}
try:
poll_result = wait_for_code(client, phone=formats.raw, keyword=args.keyword, timeout=args.timeout, interval=args.interval)
result.update(poll_result)
except LubanSMSError as exc:
result["status"] = "error"
result["error"] = str(exc)
finally:
if not args.keep_number:
try:
result["release"] = require_success(client.release_keyword_number(formats.raw))
except Exception as exc:
result["release_error"] = str(exc)
print_json(result)
return 0 if result.get("status") == "success" else 2
parser.error(f"unsupported command: {args.command}")
return 1
except LubanSMSError as exc:
print_json({"error": str(exc), "command": args.command})
return 1
if __name__ == "__main__":
sys.exit(main())

119
gitea-repo-sync/SKILL.md 普通文件
查看文件

@@ -0,0 +1,119 @@
---
name: gitea-repo-sync
description: 在自建或托管 Gitea 上自动创建仓库、初始化或复用本地 Git 项目、配置远程并安全推送分支/标签。适用于“给当前目录建 Gitea 仓库”“同步本地项目到 Gitea”“给组织创建仓库再推送代码”这类请求。
metadata:
short-description: Create Gitea repositories and sync local projects safely
---
# Gitea Repo Sync
在 Gitea 上建仓、连远程、推送代码的标准化技能。
优先使用打包好的 `scripts/gitea_repo_sync.py`,不要重复手写零散 `curl` + `git` 命令。脚本会先探测仓库是否已存在,再按需要创建仓库、初始化本地 Git、配置远程并执行 push。
## 什么时候用
- 用户要把当前目录项目同步到 Gitea
- 用户要在某个 owner 或 organization 下创建新仓库
- 本地目录还不是 Git 仓库,但需要初始化后推送
- 需要复用已有远程仓库,而不是盲目重复创建
- 需要在自动化里稳定处理 token、remote 和 push 顺序
## 需要的输入
- `GITEA_URL`:例如 `https://git.hk.hao.work`
- `GITEA_TOKEN`:具备建仓和推送权限的 token
- `owner`:用户名或组织名,例如 `hao`
- `repo`:仓库名,例如 `demo-project`
- `source_dir`:要同步的本地目录
- 可选 `branch`:默认推送的分支名,常见为 `main`
- 可选 `description` / `private` / `tags`
优先使用环境变量:
```bash
export GITEA_URL='https://git.hk.hao.work'
export GITEA_TOKEN='<your_token>'
```
## 标准流程
1. 先确认本地目录路径、目标 owner 和 repo 名。
2. 调 Gitea API 查询仓库是否已存在。
3. 若不存在,则按 owner 类型创建:
- 当前登录用户自己的仓库:`POST /api/v1/user/repos`
- 组织仓库:`POST /api/v1/orgs/{org}/repos`
4. 进入本地目录检查是否为 Git 仓库:
- 不是 Git 仓库时,只有在明确需要时才初始化
- 已经是 Git 仓库时,优先复用现有提交历史
5. 配置或校验远程:
- 无 remote 时新增
- remote 已存在但 URL 不同,默认停止并提示;只有明确允许时才替换
6. 按请求推送:
- 单分支:`push --set-upstream`
- 全部分支:`push --all`
- 标签:`push --tags`
7. 输出最终仓库 URL、remote URL、本地分支和 push 结果。
## 安全默认值
- 不强制覆盖现有 remote,除非明确传 `--replace-remote`
- 不自动 `git add` / `git commit`,除非明确传 `--stage-all``--commit-message`
- 不自动 `force push`
- 不把 token 写进 git remote URL
- 仓库已存在时优先复用,不重复创建
## 推荐脚本
使用 `scripts/gitea_repo_sync.py`
### 常见用法
```bash
python3 scripts/gitea_repo_sync.py \
--server-url "$GITEA_URL" \
--token "$GITEA_TOKEN" \
--owner hao \
--repo demo-project \
--source-dir /path/to/project \
--init-git \
--branch main \
--stage-all \
--commit-message 'Initial import'
```
如果仓库已存在,只同步当前分支:
```bash
python3 scripts/gitea_repo_sync.py \
--server-url "$GITEA_URL" \
--token "$GITEA_TOKEN" \
--owner hao \
--repo demo-project \
--source-dir /path/to/project \
--branch main
```
推送全部分支和标签:
```bash
python3 scripts/gitea_repo_sync.py \
--server-url "$GITEA_URL" \
--token "$GITEA_TOKEN" \
--owner hao \
--repo demo-project \
--source-dir /path/to/project \
--push-all \
--tags
```
## 常见坑
- 不要把 `GITEA_TOKEN` 提交进仓库。
- 不要在 remote URL 里长期保存带 token 的认证串。
- 不要在 remote 已存在时静默改写 URL。
- 不要在工作区有未确认修改时自动提交;先确认是否需要 `--stage-all`
- 不要默认 `--push-all`;很多场景只需要当前分支。
## 参考资料
- `references/gitea-api-notes.md`

查看文件

@@ -0,0 +1,4 @@
interface:
display_name: "Gitea Repo Sync"
short_description: "Create Gitea repositories and sync local projects safely"
default_prompt: "Create or reuse a Gitea repository, initialize the current project as git if needed, configure a safe remote, and push the requested branches or tags without storing the token in the remote URL."

查看文件

@@ -0,0 +1,49 @@
# Gitea API Notes
本技能默认对接 Gitea v1.25.x,核心只依赖少量稳定接口
- `GET /api/v1/user`
- `GET /api/v1/repos/{owner}/{repo}`
- `POST /api/v1/user/repos`
- `POST /api/v1/orgs/{org}/repos`
## 创建仓库的 owner 选择
1. 先调用 `GET /api/v1/user` 获取当前 token 对应登录名。
2. 如果目标 `owner` 与当前登录名一致,走 `POST /api/v1/user/repos`
3. 如果目标 `owner` 不同,按组织仓库处理,走 `POST /api/v1/orgs/{org}/repos`
## 推荐请求头
```text
Authorization: token <token>
Content-Type: application/json
Accept: application/json
```
## Git Push 认证建议
不要把 token 写入 `git remote -v`
优先使用一次性的 HTTP header
- 用户名Gitea 登录名
- 密码token
- Header`Authorization: Basic <base64(username:token)>`
这样可以在 push 时认证,但不会把 token 持久化到仓库配置里。
## 建议的环境变量
```bash
export GITEA_URL='https://git.hk.hao.work'
export GITEA_TOKEN='<your_token>'
```
## 失败时优先检查
- token 是否有 repo 创建/写入权限
- `owner` 是用户还是组织
- 本地目录是否已经是 Git 仓库
- 当前分支是否存在提交
- 远程 URL 是否已指向别的仓库

查看文件

@@ -0,0 +1,260 @@
#!/usr/bin/env python3
import argparse
import base64
import json
import os
import subprocess
import sys
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Optional
def log(message: str) -> None:
print(f"[gitea-repo-sync] {message}", flush=True)
def api_request(server_url: str, token: str, method: str, path: str, payload=None, ok_not_found: bool = False):
url = f"{server_url.rstrip('/')}/api/v1/{path.lstrip('/')}"
data = None
headers = {
"Authorization": f"token {token}",
"Accept": "application/json",
}
if payload is not None:
data = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
request = urllib.request.Request(url, data=data, method=method.upper(), headers=headers)
try:
with urllib.request.urlopen(request, timeout=30) as response:
raw = response.read().decode("utf-8")
return response.status, json.loads(raw) if raw else {}
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
if ok_not_found and exc.code == 404:
return 404, None
raise RuntimeError(f"HTTP {exc.code} {method} {url}: {body}") from exc
def run_git(args, cwd: Path, auth_header: Optional[str] = None, capture: bool = True) -> str:
cmd = ["git"]
if auth_header:
cmd += ["-c", f"http.extraHeader={auth_header}"]
cmd += args
result = subprocess.run(
cmd,
cwd=str(cwd),
text=True,
capture_output=capture,
check=False,
)
if result.returncode != 0:
stderr = (result.stderr or "").strip()
stdout = (result.stdout or "").strip()
raise RuntimeError(f"git {' '.join(args)} failed: {stderr or stdout}")
return (result.stdout or "").strip()
def is_git_repo(source_dir: Path) -> bool:
result = subprocess.run(
["git", "rev-parse", "--is-inside-work-tree"],
cwd=str(source_dir),
text=True,
capture_output=True,
check=False,
)
return result.returncode == 0 and (result.stdout or "").strip() == "true"
def ensure_git_repo(source_dir: Path, branch: str, init_git: bool) -> None:
if is_git_repo(source_dir):
return
if not init_git:
raise RuntimeError(f"{source_dir} 不是 Git 仓库;如需初始化请传 --init-git")
log(f"初始化 Git 仓库: {source_dir}")
run_git(["init", "-b", branch], cwd=source_dir)
def branch_name_from_head(source_dir: Path) -> str:
result = subprocess.run(
["git", "symbolic-ref", "--short", "HEAD"],
cwd=str(source_dir),
text=True,
capture_output=True,
check=False,
)
if result.returncode == 0:
return (result.stdout or "").strip()
return run_git(["rev-parse", "--abbrev-ref", "HEAD"], cwd=source_dir)
def ensure_branch(source_dir: Path, branch: str) -> None:
current = branch_name_from_head(source_dir)
if current == branch:
return
result = subprocess.run(
["git", "show-ref", "--verify", f"refs/heads/{branch}"],
cwd=str(source_dir),
text=True,
capture_output=True,
check=False,
)
if result.returncode == 0:
run_git(["checkout", branch], cwd=source_dir)
else:
run_git(["checkout", "-b", branch], cwd=source_dir)
def maybe_commit(source_dir: Path, stage_all: bool, commit_message: Optional[str]) -> None:
if not stage_all and not commit_message:
return
if stage_all:
run_git(["add", "-A"], cwd=source_dir)
if commit_message:
status = run_git(["status", "--porcelain"], cwd=source_dir)
if status:
log("创建提交")
run_git(["commit", "-m", commit_message], cwd=source_dir)
def get_current_branch(source_dir: Path) -> str:
return branch_name_from_head(source_dir)
def get_remote_url(source_dir: Path, remote_name: str) -> Optional[str]:
result = subprocess.run(
["git", "remote", "get-url", remote_name],
cwd=str(source_dir),
text=True,
capture_output=True,
check=False,
)
if result.returncode != 0:
return None
return (result.stdout or "").strip()
def ensure_remote(source_dir: Path, remote_name: str, remote_url: str, replace_remote: bool) -> None:
current = get_remote_url(source_dir, remote_name)
if not current:
run_git(["remote", "add", remote_name, remote_url], cwd=source_dir)
return
if current == remote_url:
return
if not replace_remote:
raise RuntimeError(
f"remote {remote_name} 已存在且指向 {current};如需替换请显式传 --replace-remote"
)
run_git(["remote", "set-url", remote_name, remote_url], cwd=source_dir)
def basic_auth_header(username: str, token: str) -> str:
raw = f"{username}:{token}".encode("utf-8")
return "Authorization: Basic " + base64.b64encode(raw).decode("ascii")
def ensure_repo(server_url: str, token: str, owner: str, repo: str, description: str, private: bool, default_branch: str):
_, user = api_request(server_url, token, "GET", "/user")
username = user["login"]
status, existing = api_request(server_url, token, "GET", f"/repos/{owner}/{repo}", ok_not_found=True)
if status == 200 and existing:
return username, existing, False
payload = {
"name": repo,
"description": description,
"private": private,
"auto_init": False,
"default_branch": default_branch,
}
if owner == username:
_, created = api_request(server_url, token, "POST", "/user/repos", payload=payload)
else:
_, created = api_request(server_url, token, "POST", f"/orgs/{owner}/repos", payload=payload)
return username, created, True
def parse_args():
parser = argparse.ArgumentParser(description="Create or reuse a Gitea repository and sync a local project.")
parser.add_argument("--server-url", default=os.getenv("GITEA_URL", ""), help="Gitea base URL")
parser.add_argument("--token", default=os.getenv("GITEA_TOKEN", ""), help="Gitea API token")
parser.add_argument("--owner", required=True, help="Repo owner or organization")
parser.add_argument("--repo", required=True, help="Repository name")
parser.add_argument("--source-dir", required=True, help="Local project path")
parser.add_argument("--description", default="", help="Repository description")
parser.add_argument("--branch", default="main", help="Branch to push")
parser.add_argument("--remote-name", default="origin", help="Git remote name")
parser.add_argument("--private", action="store_true", help="Create private repository")
parser.add_argument("--init-git", action="store_true", help="Initialize git when source-dir is not a repository")
parser.add_argument("--replace-remote", action="store_true", help="Replace remote URL when remote exists but differs")
parser.add_argument("--stage-all", action="store_true", help="Run git add -A before commit")
parser.add_argument("--commit-message", default="", help="Create a commit before pushing")
parser.add_argument("--push-all", action="store_true", help="Push all local branches")
parser.add_argument("--tags", action="store_true", help="Push tags after branch push")
return parser.parse_args()
def main() -> int:
args = parse_args()
if not args.server_url:
raise RuntimeError("缺少 --server-url 或 GITEA_URL")
if not args.token:
raise RuntimeError("缺少 --token 或 GITEA_TOKEN")
source_dir = Path(args.source_dir).expanduser().resolve()
if not source_dir.exists() or not source_dir.is_dir():
raise RuntimeError(f"source-dir 不存在或不是目录: {source_dir}")
username, repo_info, created = ensure_repo(
server_url=args.server_url,
token=args.token,
owner=args.owner,
repo=args.repo,
description=args.description,
private=args.private,
default_branch=args.branch,
)
remote_url = repo_info.get("clone_url") or repo_info.get("html_url", "").rstrip("/") + ".git"
if not remote_url:
raise RuntimeError("未能从 Gitea 响应中获取 clone_url")
ensure_git_repo(source_dir, args.branch, args.init_git)
ensure_branch(source_dir, args.branch)
maybe_commit(source_dir, args.stage_all, args.commit_message or None)
ensure_remote(source_dir, args.remote_name, remote_url, args.replace_remote)
auth_header = basic_auth_header(username, args.token)
branch = get_current_branch(source_dir)
if args.push_all:
log("推送全部分支")
run_git(["push", args.remote_name, "--all"], cwd=source_dir, auth_header=auth_header, capture=True)
else:
log(f"推送分支: {branch}")
run_git(["push", "--set-upstream", args.remote_name, branch], cwd=source_dir, auth_header=auth_header, capture=True)
if args.tags:
log("推送标签")
run_git(["push", args.remote_name, "--tags"], cwd=source_dir, auth_header=auth_header, capture=True)
summary = {
"created": created,
"owner": args.owner,
"repo": args.repo,
"branch": branch,
"source_dir": str(source_dir),
"remote_name": args.remote_name,
"remote_url": remote_url,
"html_url": repo_info.get("html_url"),
}
print(json.dumps(summary, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except Exception as exc:
print(f"ERROR: {exc}", file=sys.stderr)
raise SystemExit(1)