Add CN86 SMS keyword verification skill and README

这个提交包含在:
X
2026-03-06 01:13:09 -08:00
父节点 be5f7c8808
当前提交 71e6c95e23
修改 5 个文件,包含 563 行新增0 行删除

查看文件

@@ -0,0 +1,290 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import time
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Any, Dict, Optional
DEFAULT_BASE_URL = os.getenv("LUBAN_SMS_API_BASE", "https://lubansms.com/v2/api")
DEFAULT_HTTP_TIMEOUT = 30
WAITING_MESSAGES = (
"尚未收到短信,请稍后重试",
"wait",
)
class LubanSMSError(RuntimeError):
pass
@dataclass
class PhoneFormats:
raw: str
e164: str
class LubanSMSClient:
def __init__(self, api_key: str, base_url: str = DEFAULT_BASE_URL, http_timeout: int = DEFAULT_HTTP_TIMEOUT) -> None:
if not api_key:
raise LubanSMSError("missing api key: set LUBAN_SMS_APIKEY or pass --api-key")
self.api_key = api_key
self.base_url = base_url.rstrip("/")
self.http_timeout = http_timeout
def _request(self, path: str, **params: Any) -> Dict[str, Any]:
query = {"apikey": self.api_key}
for key, value in params.items():
if value is None:
continue
query[key] = value
url = f"{self.base_url}/{path}?{urllib.parse.urlencode(query)}"
request = urllib.request.Request(
url,
headers={
"Accept": "application/json",
"User-Agent": "codex-cn86-sms-keyword-verification/1.0",
},
)
try:
with urllib.request.urlopen(request, timeout=self.http_timeout) as response:
raw = response.read().decode("utf-8", errors="replace")
except Exception as exc:
raise LubanSMSError(f"request failed for {path}: {exc}") from exc
try:
payload = json.loads(raw)
except json.JSONDecodeError as exc:
raise LubanSMSError(f"invalid json from {path}: {raw}") from exc
return payload
def get_balance(self) -> Dict[str, Any]:
return self._request("getBalance")
def request_keyword_number(self, phone: str = "", card_type: str = "全部") -> Dict[str, Any]:
return self._request("getKeywordNumber", phone=normalize_phone_input(phone), cardType=card_type)
def get_keyword_sms(self, phone: str, keyword: str) -> Dict[str, Any]:
return self._request("getKeywordSms", phone=normalize_phone_input(phone), keyword=keyword)
def release_keyword_number(self, phone: str) -> Dict[str, Any]:
return self._request("delKeywordNumber", phone=normalize_phone_input(phone))
def keyword_sms_history(self, page: int = 1) -> Dict[str, Any]:
return self._request("keywordSmsHistory", page=page)
def normalize_phone_input(phone: str) -> str:
digits = re.sub(r"\D", "", phone or "")
if digits.startswith("86") and len(digits) > 11:
digits = digits[2:]
return digits
def format_cn86_phone(phone: str) -> PhoneFormats:
raw = normalize_phone_input(phone)
if not raw:
raise LubanSMSError("phone is required")
return PhoneFormats(raw=raw, e164=f"+86{raw}")
def extract_verification_code(message: str) -> Optional[str]:
patterns = [
r"验证码(?:为|是|[:])?\s*([A-Za-z0-9]{4,8})",
r"code(?: is|[:])?\s*([A-Za-z0-9]{4,8})",
r"\b(\d{6})\b",
r"\b(\d{4,8})\b",
]
for pattern in patterns:
match = re.search(pattern, message, flags=re.IGNORECASE)
if match:
return match.group(1)
return None
def is_waiting_payload(payload: Dict[str, Any]) -> bool:
if payload.get("code") == 0:
return False
message = str(payload.get("msg", ""))
return any(token in message for token in WAITING_MESSAGES)
def require_success(payload: Dict[str, Any], *, allow_wait: bool = False) -> Dict[str, Any]:
if payload.get("code") == 0:
return payload
if allow_wait and is_waiting_payload(payload):
return payload
raise LubanSMSError(f"api error: code={payload.get('code')} msg={payload.get('msg')}")
def wait_for_code(client: LubanSMSClient, phone: str, keyword: str, timeout: int, interval: int) -> Dict[str, Any]:
formats = format_cn86_phone(phone)
started = time.monotonic()
attempts = 0
last_payload: Dict[str, Any] | None = None
while time.monotonic() - started < timeout:
attempts += 1
payload = client.get_keyword_sms(formats.raw, keyword)
last_payload = payload
if payload.get("code") == 0:
message = str(payload.get("msg", ""))
return {
"status": "success",
"keyword": keyword,
"phone": formats.raw,
"phone_e164": formats.e164,
"attempts": attempts,
"elapsed_seconds": round(time.monotonic() - started, 2),
"message": message,
"verification_code": extract_verification_code(message),
}
if not is_waiting_payload(payload):
raise LubanSMSError(f"unexpected sms response: code={payload.get('code')} msg={payload.get('msg')}")
time.sleep(interval)
return {
"status": "timeout",
"keyword": keyword,
"phone": formats.raw,
"phone_e164": formats.e164,
"attempts": attempts,
"elapsed_seconds": round(time.monotonic() - started, 2),
"last_payload": last_payload,
"verification_code": None,
}
def print_json(data: Dict[str, Any]) -> None:
json.dump(data, sys.stdout, ensure_ascii=False, indent=2)
sys.stdout.write("\n")
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="LubanSMS CN86 keyword SMS CLI")
parser.add_argument("--api-key", default=os.getenv("LUBAN_SMS_APIKEY", ""), help="LubanSMS API key; defaults to LUBAN_SMS_APIKEY")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="API base URL")
parser.add_argument("--http-timeout", type=int, default=DEFAULT_HTTP_TIMEOUT, help="HTTP request timeout in seconds")
subparsers = parser.add_subparsers(dest="command", required=True)
subparsers.add_parser("balance", help="Query account balance")
request_number = subparsers.add_parser("request-number", help="Request a keyword phone number")
request_number.add_argument("--phone", default="", help="Reuse a specific phone if supported")
request_number.add_argument("--card-type", default="全部", help="Card type; official docs mark this deprecated")
get_sms = subparsers.add_parser("get-sms", help="Fetch SMS once by keyword")
get_sms.add_argument("--phone", required=True, help="Raw CN phone number or +86 number")
get_sms.add_argument("--keyword", required=True, help="Keyword to match, e.g. 千问")
wait_code = subparsers.add_parser("wait-code", help="Poll until OTP arrives or times out")
wait_code.add_argument("--phone", required=True, help="Raw CN phone number or +86 number")
wait_code.add_argument("--keyword", required=True, help="Keyword to match, e.g. 千问")
wait_code.add_argument("--timeout", type=int, default=300, help="Polling timeout in seconds")
wait_code.add_argument("--interval", type=int, default=5, help="Polling interval in seconds")
wait_code.add_argument("--release-on-exit", action="store_true", help="Release the number after polling finishes")
release = subparsers.add_parser("release", help="Release a phone number")
release.add_argument("--phone", required=True, help="Raw CN phone number or +86 number")
history = subparsers.add_parser("history", help="Fetch keyword SMS history")
history.add_argument("--page", type=int, default=1, help="Page number")
demo = subparsers.add_parser("demo", help="Request number, wait for keyword SMS, then release")
demo.add_argument("--keyword", required=True, help="Keyword to match, e.g. 千问")
demo.add_argument("--phone", default="", help="Reuse a specific phone if supported")
demo.add_argument("--card-type", default="全部", help="Card type; official docs mark this deprecated")
demo.add_argument("--timeout", type=int, default=300, help="Polling timeout in seconds")
demo.add_argument("--interval", type=int, default=5, help="Polling interval in seconds")
demo.add_argument("--keep-number", action="store_true", help="Do not release the number at the end")
return parser
def main() -> int:
parser = build_parser()
args = parser.parse_args()
client = LubanSMSClient(api_key=args.api_key, base_url=args.base_url, http_timeout=args.http_timeout)
try:
if args.command == "balance":
print_json(require_success(client.get_balance()))
return 0
if args.command == "request-number":
payload = require_success(client.request_keyword_number(phone=args.phone, card_type=args.card_type))
formats = format_cn86_phone(payload.get("phone", ""))
payload["phone"] = formats.raw
payload["phone_e164"] = formats.e164
print_json(payload)
return 0
if args.command == "get-sms":
payload = require_success(client.get_keyword_sms(phone=args.phone, keyword=args.keyword), allow_wait=True)
if payload.get("code") == 0:
payload["verification_code"] = extract_verification_code(str(payload.get("msg", "")))
print_json(payload)
return 0
if args.command == "wait-code":
result = wait_for_code(client, phone=args.phone, keyword=args.keyword, timeout=args.timeout, interval=args.interval)
if args.release_on_exit:
try:
result["release"] = require_success(client.release_keyword_number(args.phone))
except Exception as exc:
result["release_error"] = str(exc)
print_json(result)
return 0 if result.get("status") == "success" else 2
if args.command == "release":
print_json(require_success(client.release_keyword_number(args.phone)))
return 0
if args.command == "history":
print_json(require_success(client.keyword_sms_history(page=args.page)))
return 0
if args.command == "demo":
allocation = require_success(client.request_keyword_number(phone=args.phone, card_type=args.card_type))
formats = format_cn86_phone(allocation.get("phone", ""))
result: Dict[str, Any] = {
"status": "allocated",
"keyword": args.keyword,
"phone": formats.raw,
"phone_e164": formats.e164,
"allocation": allocation,
"notes": [
"Trigger the target site to send SMS after allocation.",
"For sites like Qwen, pass phoneCode=86 and the raw phone digits.",
],
}
try:
poll_result = wait_for_code(client, phone=formats.raw, keyword=args.keyword, timeout=args.timeout, interval=args.interval)
result.update(poll_result)
except LubanSMSError as exc:
result["status"] = "error"
result["error"] = str(exc)
finally:
if not args.keep_number:
try:
result["release"] = require_success(client.release_keyword_number(formats.raw))
except Exception as exc:
result["release_error"] = str(exc)
print_json(result)
return 0 if result.get("status") == "success" else 2
parser.error(f"unsupported command: {args.command}")
return 1
except LubanSMSError as exc:
print_json({"error": str(exc), "command": args.command})
return 1
if __name__ == "__main__":
sys.exit(main())