#!/usr/bin/env python3 from __future__ import annotations import argparse import json import os import re import sys import time import urllib.parse import urllib.request from dataclasses import dataclass from typing import Any, Dict, Optional DEFAULT_BASE_URL = os.getenv("LUBAN_SMS_API_BASE", "https://lubansms.com/v2/api") DEFAULT_HTTP_TIMEOUT = 30 WAITING_MESSAGES = ( "尚未收到短信,请稍后重试", "wait", ) class LubanSMSError(RuntimeError): pass @dataclass class PhoneFormats: raw: str e164: str class LubanSMSClient: def __init__(self, api_key: str, base_url: str = DEFAULT_BASE_URL, http_timeout: int = DEFAULT_HTTP_TIMEOUT) -> None: if not api_key: raise LubanSMSError("missing api key: set LUBAN_SMS_APIKEY or pass --api-key") self.api_key = api_key self.base_url = base_url.rstrip("/") self.http_timeout = http_timeout def _request(self, path: str, **params: Any) -> Dict[str, Any]: query = {"apikey": self.api_key} for key, value in params.items(): if value is None: continue query[key] = value url = f"{self.base_url}/{path}?{urllib.parse.urlencode(query)}" request = urllib.request.Request( url, headers={ "Accept": "application/json", "User-Agent": "codex-cn86-sms-keyword-verification/1.0", }, ) try: with urllib.request.urlopen(request, timeout=self.http_timeout) as response: raw = response.read().decode("utf-8", errors="replace") except Exception as exc: raise LubanSMSError(f"request failed for {path}: {exc}") from exc try: payload = json.loads(raw) except json.JSONDecodeError as exc: raise LubanSMSError(f"invalid json from {path}: {raw}") from exc return payload def get_balance(self) -> Dict[str, Any]: return self._request("getBalance") def request_keyword_number(self, phone: str = "", card_type: str = "全部") -> Dict[str, Any]: return self._request("getKeywordNumber", phone=normalize_phone_input(phone), cardType=card_type) def get_keyword_sms(self, phone: str, keyword: str) -> Dict[str, Any]: return self._request("getKeywordSms", phone=normalize_phone_input(phone), keyword=keyword) def release_keyword_number(self, phone: str) -> Dict[str, Any]: return self._request("delKeywordNumber", phone=normalize_phone_input(phone)) def keyword_sms_history(self, page: int = 1) -> Dict[str, Any]: return self._request("keywordSmsHistory", page=page) def normalize_phone_input(phone: str) -> str: digits = re.sub(r"\D", "", phone or "") if digits.startswith("86") and len(digits) > 11: digits = digits[2:] return digits def format_cn86_phone(phone: str) -> PhoneFormats: raw = normalize_phone_input(phone) if not raw: raise LubanSMSError("phone is required") return PhoneFormats(raw=raw, e164=f"+86{raw}") def extract_verification_code(message: str) -> Optional[str]: patterns = [ r"验证码(?:为|是|[::])?\s*([A-Za-z0-9]{4,8})", r"code(?: is|[::])?\s*([A-Za-z0-9]{4,8})", r"\b(\d{6})\b", r"\b(\d{4,8})\b", ] for pattern in patterns: match = re.search(pattern, message, flags=re.IGNORECASE) if match: return match.group(1) return None def is_waiting_payload(payload: Dict[str, Any]) -> bool: if payload.get("code") == 0: return False message = str(payload.get("msg", "")) return any(token in message for token in WAITING_MESSAGES) def require_success(payload: Dict[str, Any], *, allow_wait: bool = False) -> Dict[str, Any]: if payload.get("code") == 0: return payload if allow_wait and is_waiting_payload(payload): return payload raise LubanSMSError(f"api error: code={payload.get('code')} msg={payload.get('msg')}") def wait_for_code(client: LubanSMSClient, phone: str, keyword: str, timeout: int, interval: int) -> Dict[str, Any]: formats = format_cn86_phone(phone) started = time.monotonic() attempts = 0 last_payload: Dict[str, Any] | None = None while time.monotonic() - started < timeout: attempts += 1 payload = client.get_keyword_sms(formats.raw, keyword) last_payload = payload if payload.get("code") == 0: message = str(payload.get("msg", "")) return { "status": "success", "keyword": keyword, "phone": formats.raw, "phone_e164": formats.e164, "attempts": attempts, "elapsed_seconds": round(time.monotonic() - started, 2), "message": message, "verification_code": extract_verification_code(message), } if not is_waiting_payload(payload): raise LubanSMSError(f"unexpected sms response: code={payload.get('code')} msg={payload.get('msg')}") time.sleep(interval) return { "status": "timeout", "keyword": keyword, "phone": formats.raw, "phone_e164": formats.e164, "attempts": attempts, "elapsed_seconds": round(time.monotonic() - started, 2), "last_payload": last_payload, "verification_code": None, } def print_json(data: Dict[str, Any]) -> None: json.dump(data, sys.stdout, ensure_ascii=False, indent=2) sys.stdout.write("\n") def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="LubanSMS CN86 keyword SMS CLI") parser.add_argument("--api-key", default=os.getenv("LUBAN_SMS_APIKEY", ""), help="LubanSMS API key; defaults to LUBAN_SMS_APIKEY") parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="API base URL") parser.add_argument("--http-timeout", type=int, default=DEFAULT_HTTP_TIMEOUT, help="HTTP request timeout in seconds") subparsers = parser.add_subparsers(dest="command", required=True) subparsers.add_parser("balance", help="Query account balance") request_number = subparsers.add_parser("request-number", help="Request a keyword phone number") request_number.add_argument("--phone", default="", help="Reuse a specific phone if supported") request_number.add_argument("--card-type", default="全部", help="Card type; official docs mark this deprecated") get_sms = subparsers.add_parser("get-sms", help="Fetch SMS once by keyword") get_sms.add_argument("--phone", required=True, help="Raw CN phone number or +86 number") get_sms.add_argument("--keyword", required=True, help="Keyword to match, e.g. 千问") wait_code = subparsers.add_parser("wait-code", help="Poll until OTP arrives or times out") wait_code.add_argument("--phone", required=True, help="Raw CN phone number or +86 number") wait_code.add_argument("--keyword", required=True, help="Keyword to match, e.g. 千问") wait_code.add_argument("--timeout", type=int, default=300, help="Polling timeout in seconds") wait_code.add_argument("--interval", type=int, default=5, help="Polling interval in seconds") wait_code.add_argument("--release-on-exit", action="store_true", help="Release the number after polling finishes") release = subparsers.add_parser("release", help="Release a phone number") release.add_argument("--phone", required=True, help="Raw CN phone number or +86 number") history = subparsers.add_parser("history", help="Fetch keyword SMS history") history.add_argument("--page", type=int, default=1, help="Page number") demo = subparsers.add_parser("demo", help="Request number, wait for keyword SMS, then release") demo.add_argument("--keyword", required=True, help="Keyword to match, e.g. 千问") demo.add_argument("--phone", default="", help="Reuse a specific phone if supported") demo.add_argument("--card-type", default="全部", help="Card type; official docs mark this deprecated") demo.add_argument("--timeout", type=int, default=300, help="Polling timeout in seconds") demo.add_argument("--interval", type=int, default=5, help="Polling interval in seconds") demo.add_argument("--keep-number", action="store_true", help="Do not release the number at the end") return parser def main() -> int: parser = build_parser() args = parser.parse_args() client = LubanSMSClient(api_key=args.api_key, base_url=args.base_url, http_timeout=args.http_timeout) try: if args.command == "balance": print_json(require_success(client.get_balance())) return 0 if args.command == "request-number": payload = require_success(client.request_keyword_number(phone=args.phone, card_type=args.card_type)) formats = format_cn86_phone(payload.get("phone", "")) payload["phone"] = formats.raw payload["phone_e164"] = formats.e164 print_json(payload) return 0 if args.command == "get-sms": payload = require_success(client.get_keyword_sms(phone=args.phone, keyword=args.keyword), allow_wait=True) if payload.get("code") == 0: payload["verification_code"] = extract_verification_code(str(payload.get("msg", ""))) print_json(payload) return 0 if args.command == "wait-code": result = wait_for_code(client, phone=args.phone, keyword=args.keyword, timeout=args.timeout, interval=args.interval) if args.release_on_exit: try: result["release"] = require_success(client.release_keyword_number(args.phone)) except Exception as exc: result["release_error"] = str(exc) print_json(result) return 0 if result.get("status") == "success" else 2 if args.command == "release": print_json(require_success(client.release_keyword_number(args.phone))) return 0 if args.command == "history": print_json(require_success(client.keyword_sms_history(page=args.page))) return 0 if args.command == "demo": allocation = require_success(client.request_keyword_number(phone=args.phone, card_type=args.card_type)) formats = format_cn86_phone(allocation.get("phone", "")) result: Dict[str, Any] = { "status": "allocated", "keyword": args.keyword, "phone": formats.raw, "phone_e164": formats.e164, "allocation": allocation, "notes": [ "Trigger the target site to send SMS after allocation.", "For sites like Qwen, pass phoneCode=86 and the raw phone digits.", ], } try: poll_result = wait_for_code(client, phone=formats.raw, keyword=args.keyword, timeout=args.timeout, interval=args.interval) result.update(poll_result) except LubanSMSError as exc: result["status"] = "error" result["error"] = str(exc) finally: if not args.keep_number: try: result["release"] = require_success(client.release_keyword_number(formats.raw)) except Exception as exc: result["release_error"] = str(exc) print_json(result) return 0 if result.get("status") == "success" else 2 parser.error(f"unsupported command: {args.command}") return 1 except LubanSMSError as exc: print_json({"error": str(exc), "command": args.command}) return 1 if __name__ == "__main__": sys.exit(main())