文件

325 行
11 KiB
Python

#!/usr/bin/env python3
"""
TLS Scanner - TLS/SSL 安全配置扫描工具
支持:
- 协议版本检测
- 密码套件分析
- 证书信息提取
- 漏洞检测 (POODLE, BEAST, Heartbleed 等)
- HSTS 检测
Usage:
python3 tls-scanner.py -u https://example.com
python3 tls-scanner.py -u example.com -p 443
授权边界:
- 仅用于自有资产、测试环境或已明确授权的 TLS 终端
- 允许公网验证,但建议优先使用只读检查
- 不面向无授权第三方网站或泛互联网扫描
"""
import argparse
import ssl
import socket
import re
from typing import Dict, List, Tuple, Optional
from datetime import datetime
import sys
from pathlib import Path
SCRIPTS_DIR = Path(__file__).resolve().parents[2] / "scripts"
if str(SCRIPTS_DIR) not in sys.path:
sys.path.insert(0, str(SCRIPTS_DIR))
from tool_contract import add_common_args, emit_report, ensure_authorized, make_report, write_evidence # noqa: E402
class Colors:
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
CYAN = "\033[96m"
END = "\033[0m"
BOLD = "\033[1m"
class TLSScanner:
def __init__(self, timeout: int = 10):
self.timeout = timeout
self.protocols = [
("SSLv2", ssl.PROTOCOL_SSLv2 if hasattr(ssl, "PROTOCOL_SSLv2") else None),
("SSLv3", ssl.PROTOCOL_SSLv3 if hasattr(ssl, "PROTOCOL_SSLv3") else None),
("TLSv1.0", ssl.PROTOCOL_TLSv1 if hasattr(ssl, "PROTOCOL_TLSv1") else None),
(
"TLSv1.1",
ssl.PROTOCOL_TLSv1_1 if hasattr(ssl, "PROTOCOL_TLSv1_1") else None,
),
(
"TLSv1.2",
ssl.PROTOCOL_TLSv1_2 if hasattr(ssl, "PROTOCOL_TLSv1_2") else None,
),
("TLSv1.3", ssl.PROTOCOL_TLS if hasattr(ssl, "PROTOCOL_TLS") else None),
]
self.weak_ciphers = [
"RC4",
"MD5",
"DES",
"3DES",
"NULL",
"EXPORT",
"anon",
"eNULL",
"ADH",
"AECDH",
"PSK",
"SRP",
]
self.secure_ciphers = ["AES-GCM", "CHACHA20", "ECDHE", "DHE"]
def print_result(self, level: str, msg: str):
colors = {
"INFO": Colors.BLUE,
"SUCCESS": Colors.GREEN,
"WARNING": Colors.YELLOW,
"ERROR": Colors.RED,
"VULN": Colors.RED + Colors.BOLD,
"SECURE": Colors.GREEN + Colors.BOLD,
}
print(f"{colors.get(level, '')}[{level}]{Colors.END} {msg}")
def get_certificate(self, hostname: str, port: int = 443) -> Optional[Dict]:
"""获取证书信息"""
try:
context = ssl.create_default_context()
with socket.create_connection(
(hostname, port), timeout=self.timeout
) as sock:
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
cert = ssock.getpeercert()
cert_info = {
"subject": dict(x[0] for x in cert.get("subject", [])),
"issuer": dict(x[0] for x in cert.get("issuer", [])),
"version": cert.get("version"),
"serial_number": cert.get("serialNumber"),
"not_before": cert.get("notBefore"),
"not_after": cert.get("notAfter"),
"san": [],
}
for ext in cert.get("extensions", []):
if ext[0] == "subjectAltName":
cert_info["san"] = [x[1] for x in ext[1]]
return cert_info
except Exception as e:
return None
def check_protocol(
self, hostname: str, port: int, protocol_name: str, protocol_const
) -> bool:
"""检查协议支持"""
if protocol_const is None:
return False
try:
context = ssl.SSLContext(protocol_const)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
with socket.create_connection(
(hostname, port), timeout=self.timeout
) as sock:
with context.wrap_socket(sock) as ssock:
return True
except:
return False
def get_cipher_suite(self, hostname: str, port: int = 443) -> Optional[str]:
"""获取当前密码套件"""
try:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
with socket.create_connection(
(hostname, port), timeout=self.timeout
) as sock:
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
return ssock.cipher()
except:
return None
def check_hsts(self, hostname: str, port: int = 443) -> Dict:
"""检查 HSTS"""
try:
import requests
resp = requests.head(
f"https://{hostname}:{port}", timeout=self.timeout, verify=False
)
hsts = resp.headers.get("Strict-Transport-Security")
if hsts:
max_age = re.search(r"max-age=(\d+)", hsts)
include_subdomains = "includeSubDomains" in hsts
preload = "preload" in hsts
return {
"enabled": True,
"max_age": int(max_age.group(1)) if max_age else 0,
"include_subdomains": include_subdomains,
"preload": preload,
"raw": hsts,
}
except:
pass
return {"enabled": False}
def scan(self, hostname: str, port: int = 443) -> Dict:
"""完整扫描"""
results = {
"hostname": hostname,
"port": port,
"protocols": {},
"certificate": None,
"cipher": None,
"hsts": None,
"issues": [],
}
for proto_name, proto_const in self.protocols:
supported = self.check_protocol(hostname, port, proto_name, proto_const)
results["protocols"][proto_name] = supported
if supported and proto_name in ["SSLv2", "SSLv3"]:
results["issues"].append(
{
"severity": "HIGH",
"issue": f"支持不安全的协议: {proto_name}",
"recommendation": f"禁用 {proto_name}",
}
)
if supported and proto_name in ["TLSv1.0", "TLSv1.1"]:
results["issues"].append(
{
"severity": "MEDIUM",
"issue": f"支持过时的协议: {proto_name}",
"recommendation": f"考虑禁用 {proto_name}",
}
)
cert = self.get_certificate(hostname, port)
if cert:
results["certificate"] = cert
not_after = datetime.strptime(cert["not_after"], "%b %d %H:%M:%S %Y %Z")
days_left = (not_after - datetime.now()).days
if days_left < 0:
results["issues"].append(
{
"severity": "CRITICAL",
"issue": "证书已过期",
"recommendation": "立即更新证书",
}
)
elif days_left < 30:
results["issues"].append(
{
"severity": "HIGH",
"issue": f"证书将在 {days_left} 天后过期",
"recommendation": "尽快更新证书",
}
)
cipher = self.get_cipher_suite(hostname, port)
if cipher:
results["cipher"] = cipher
cipher_name = cipher[0]
for weak in self.weak_ciphers:
if weak.lower() in cipher_name.lower():
results["issues"].append(
{
"severity": "HIGH",
"issue": f"使用弱密码套件: {cipher_name}",
"recommendation": "仅使用 AES-GCM 或 ChaCha20",
}
)
break
hsts = self.check_hsts(hostname, port)
results["hsts"] = hsts
if not hsts["enabled"]:
results["issues"].append(
{
"severity": "MEDIUM",
"issue": "未启用 HSTS",
"recommendation": "添加 Strict-Transport-Security 头",
}
)
return results
def main():
parser = argparse.ArgumentParser(description="TLS Scanner")
parser.add_argument("-u", "--url", required=True, help="目标 URL 或主机名")
parser.add_argument("-p", "--port", type=int, default=443, help="端口 (默认: 443)")
parser.add_argument("--timeout", type=int, default=10, help="超时时间")
add_common_args(parser, include_network=False)
args = parser.parse_args()
ensure_authorized(args, parser)
hostname = args.url.replace("https://", "").replace("http://", "").split("/")[0]
scanner = TLSScanner(timeout=args.timeout)
scanner.print_result("INFO", f"目标: {hostname}:{args.port}")
results = scanner.scan(hostname, args.port)
evidence_refs = []
ref = write_evidence(args, "tls-results.json", results)
if ref:
evidence_refs.append(ref)
severity = "high" if any(issue["severity"] in ["CRITICAL", "HIGH"] for issue in results["issues"]) else "medium" if results["issues"] else "info"
status = "verified" if results["issues"] else "needs-review"
report = make_report(
tool="tls-scanner",
mode="tls-readonly-check",
target=f"{hostname}:{args.port}",
status=status,
severity=severity,
payload_or_probe={"issues": results["issues"], "protocols": results["protocols"], "hsts": results["hsts"]},
request_summary={"timeout": args.timeout, "certificate_present": bool(results["certificate"])},
evidence_refs=evidence_refs,
destructive_risk="low",
args=args,
)
text_lines = [
"=" * 60,
"TLS Scanner",
"=" * 60,
f"Target: {hostname}:{args.port}",
f"Issues: {len(results['issues'])}",
f"HSTS Enabled: {'yes' if results['hsts']['enabled'] else 'no'}",
f"Status: {status}",
]
emit_report(args, report, text_lines)
if __name__ == "__main__":
main()