初始化: Web安全攻防知识库
- 靶场环境: DVWA/WebGoat/Pikachu/BWAPP/SQLi-Labs/XSS-Labs - SQL注入工具: sqli-scanner.py, blind-sqli.py, sqli-exploit.go - XSS工具: xss-fuzzer.py, xss-scanner.go - 认证攻击: web-brute.py, jwt-cracker.py - 服务端安全: port-scanner.py, tls-scanner.py - 防御配置: nginx-hardening.conf - 案例研究: 福建政采网安全评估报告 (13份) - 同步脚本: sync-gitea.sh
这个提交包含在:
@@ -0,0 +1,261 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Port Scanner - 多线程端口扫描工具
|
||||
|
||||
支持:
|
||||
- TCP Connect 扫描
|
||||
- SYN 扫描 (需要 root)
|
||||
- 服务指纹识别
|
||||
- 多线程扫描
|
||||
- 自定义端口范围
|
||||
|
||||
Usage:
|
||||
python3 port-scanner.py -H 192.168.1.1 -p 1-1000
|
||||
python3 port-scanner.py -H 192.168.1.1 -p 80,443,8080
|
||||
python3 port-scanner.py -H 192.168.1.1 --top-ports 100
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from typing import List, Dict, Tuple, Optional
|
||||
import sys
|
||||
|
||||
|
||||
class Colors:
|
||||
RED = "\033[91m"
|
||||
GREEN = "\033[92m"
|
||||
YELLOW = "\033[93m"
|
||||
BLUE = "\033[94m"
|
||||
CYAN = "\033[96m"
|
||||
END = "\033[0m"
|
||||
BOLD = "\033[1m"
|
||||
|
||||
|
||||
class PortScanner:
|
||||
def __init__(self, threads: int = 100, timeout: float = 1.0):
|
||||
self.threads = threads
|
||||
self.timeout = timeout
|
||||
self.open_ports = []
|
||||
self.lock = threading.Lock()
|
||||
|
||||
self.service_banners = {
|
||||
21: "FTP",
|
||||
22: "SSH",
|
||||
23: "Telnet",
|
||||
25: "SMTP",
|
||||
53: "DNS",
|
||||
80: "HTTP",
|
||||
110: "POP3",
|
||||
135: "RPC",
|
||||
139: "NetBIOS",
|
||||
143: "IMAP",
|
||||
443: "HTTPS",
|
||||
445: "SMB",
|
||||
993: "IMAPS",
|
||||
995: "POP3S",
|
||||
1433: "MSSQL",
|
||||
1521: "Oracle",
|
||||
3306: "MySQL",
|
||||
3389: "RDP",
|
||||
5432: "PostgreSQL",
|
||||
5900: "VNC",
|
||||
6379: "Redis",
|
||||
8080: "HTTP-Proxy",
|
||||
8443: "HTTPS-Alt",
|
||||
8888: "HTTP-Alt",
|
||||
9000: "PHP-FPM",
|
||||
9200: "Elasticsearch",
|
||||
27017: "MongoDB",
|
||||
}
|
||||
|
||||
self.top_ports = [
|
||||
21,
|
||||
22,
|
||||
23,
|
||||
25,
|
||||
53,
|
||||
80,
|
||||
110,
|
||||
111,
|
||||
135,
|
||||
139,
|
||||
143,
|
||||
443,
|
||||
445,
|
||||
993,
|
||||
995,
|
||||
1433,
|
||||
1434,
|
||||
1723,
|
||||
3306,
|
||||
3389,
|
||||
5432,
|
||||
5900,
|
||||
6379,
|
||||
8000,
|
||||
8080,
|
||||
8443,
|
||||
8888,
|
||||
9000,
|
||||
9090,
|
||||
9200,
|
||||
27017,
|
||||
]
|
||||
|
||||
def print_result(self, level: str, msg: str):
|
||||
colors = {
|
||||
"INFO": Colors.BLUE,
|
||||
"SUCCESS": Colors.GREEN,
|
||||
"WARNING": Colors.YELLOW,
|
||||
"ERROR": Colors.RED,
|
||||
"OPEN": Colors.GREEN + Colors.BOLD,
|
||||
}
|
||||
print(f"{colors.get(level, '')}[{level}]{Colors.END} {msg}")
|
||||
|
||||
def parse_ports(self, port_str: str) -> List[int]:
|
||||
"""解析端口字符串"""
|
||||
ports = set()
|
||||
|
||||
for part in port_str.split(","):
|
||||
if "-" in part:
|
||||
start, end = part.split("-")
|
||||
ports.update(range(int(start), int(end) + 1))
|
||||
else:
|
||||
ports.add(int(part))
|
||||
|
||||
return sorted(ports)
|
||||
|
||||
def scan_port(self, host: str, port: int) -> Tuple[int, str, Optional[str]]:
|
||||
"""扫描单个端口"""
|
||||
try:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.settimeout(self.timeout)
|
||||
|
||||
result = sock.connect_ex((host, port))
|
||||
|
||||
if result == 0:
|
||||
service = self.service_banners.get(port, "Unknown")
|
||||
|
||||
banner = None
|
||||
try:
|
||||
sock.send(b"HEAD / HTTP/1.0\r\n\r\n")
|
||||
banner = sock.recv(1024).decode("utf-8", errors="ignore").strip()
|
||||
except:
|
||||
pass
|
||||
|
||||
sock.close()
|
||||
return port, "open", banner or service
|
||||
|
||||
sock.close()
|
||||
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return port, "closed", None
|
||||
|
||||
def scan_host(
|
||||
self, host: str, ports: List[int], verbose: bool = True
|
||||
) -> List[Dict]:
|
||||
"""扫描主机"""
|
||||
results = []
|
||||
total = len(ports)
|
||||
|
||||
if verbose:
|
||||
self.print_result("INFO", f"开始扫描 {host}: {total} 个端口")
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
with ThreadPoolExecutor(max_workers=self.threads) as executor:
|
||||
futures = {
|
||||
executor.submit(self.scan_port, host, port): port for port in ports
|
||||
}
|
||||
|
||||
completed = 0
|
||||
for future in as_completed(futures):
|
||||
port, status, banner = future.result()
|
||||
completed += 1
|
||||
|
||||
if verbose and completed % 100 == 0:
|
||||
print(
|
||||
f"\r{Colors.CYAN}[*]{Colors.END} 进度: {completed}/{total}",
|
||||
end="",
|
||||
flush=True,
|
||||
)
|
||||
|
||||
if status == "open":
|
||||
result = {
|
||||
"port": port,
|
||||
"status": status,
|
||||
"service": self.service_banners.get(port, "Unknown"),
|
||||
"banner": banner,
|
||||
}
|
||||
results.append(result)
|
||||
|
||||
with self.lock:
|
||||
self.open_ports.append((host, port))
|
||||
|
||||
if verbose:
|
||||
print(
|
||||
f"\n{Colors.GREEN}[OPEN]{Colors.END} {host}:{port} - {banner[:50] if banner else self.service_banners.get(port, 'Unknown')}"
|
||||
)
|
||||
|
||||
if verbose:
|
||||
print()
|
||||
|
||||
elapsed = time.time() - start_time
|
||||
if verbose:
|
||||
self.print_result(
|
||||
"INFO", f"扫描完成: {len(results)} 个开放端口, 耗时 {elapsed:.2f}s"
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Port Scanner")
|
||||
parser.add_argument("-H", "--host", required=True, help="目标主机")
|
||||
parser.add_argument(
|
||||
"-p", "--ports", default="1-1000", help="端口范围 (例: 1-1000, 80,443,8080)"
|
||||
)
|
||||
parser.add_argument("--top-ports", type=int, help="扫描最常用的 N 个端口")
|
||||
parser.add_argument("-t", "--threads", type=int, default=100, help="线程数")
|
||||
parser.add_argument("--timeout", type=float, default=1.0, help="超时时间")
|
||||
parser.add_argument("-v", "--verbose", action="store_true", help="详细输出")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
scanner = PortScanner(threads=args.threads, timeout=args.timeout)
|
||||
|
||||
if args.top_ports:
|
||||
ports = scanner.top_ports[: args.top_ports]
|
||||
else:
|
||||
ports = scanner.parse_ports(args.ports)
|
||||
|
||||
print(f"\n{Colors.BOLD}{'=' * 60}{Colors.END}")
|
||||
print(f"{Colors.BOLD}Port Scanner{Colors.END}")
|
||||
print(f"{Colors.BOLD}{'=' * 60}{Colors.END}\n")
|
||||
|
||||
scanner.print_result("INFO", f"目标: {args.host}")
|
||||
scanner.print_result("INFO", f"端口: {len(ports)} 个")
|
||||
scanner.print_result("INFO", f"线程: {args.threads}")
|
||||
|
||||
results = scanner.scan_host(args.host, ports, args.verbose)
|
||||
|
||||
print(f"\n{Colors.BOLD}{'=' * 60}{Colors.END}")
|
||||
if results:
|
||||
scanner.print_result("SUCCESS", f"发现 {len(results)} 个开放端口:")
|
||||
print(f"\n{'PORT':<10} {'SERVICE':<15} {'BANNER'}")
|
||||
print("-" * 60)
|
||||
for r in sorted(results, key=lambda x: x["port"]):
|
||||
banner = r["banner"][:40] if r["banner"] else r["service"]
|
||||
print(f"{r['port']:<10} {r['service']:<15} {banner}")
|
||||
else:
|
||||
scanner.print_result("INFO", "未发现开放端口")
|
||||
print(f"{Colors.BOLD}{'=' * 60}{Colors.END}\n")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,340 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
TLS Scanner - TLS/SSL 安全配置扫描工具
|
||||
|
||||
支持:
|
||||
- 协议版本检测
|
||||
- 密码套件分析
|
||||
- 证书信息提取
|
||||
- 漏洞检测 (POODLE, BEAST, Heartbleed 等)
|
||||
- HSTS 检测
|
||||
|
||||
Usage:
|
||||
python3 tls-scanner.py -u https://example.com
|
||||
python3 tls-scanner.py -u example.com -p 443
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import ssl
|
||||
import socket
|
||||
import re
|
||||
from typing import Dict, List, Tuple, Optional
|
||||
from datetime import datetime
|
||||
import sys
|
||||
|
||||
|
||||
class Colors:
|
||||
RED = "\033[91m"
|
||||
GREEN = "\033[92m"
|
||||
YELLOW = "\033[93m"
|
||||
BLUE = "\033[94m"
|
||||
CYAN = "\033[96m"
|
||||
END = "\033[0m"
|
||||
BOLD = "\033[1m"
|
||||
|
||||
|
||||
class TLSScanner:
|
||||
def __init__(self, timeout: int = 10):
|
||||
self.timeout = timeout
|
||||
|
||||
self.protocols = [
|
||||
("SSLv2", ssl.PROTOCOL_SSLv2 if hasattr(ssl, "PROTOCOL_SSLv2") else None),
|
||||
("SSLv3", ssl.PROTOCOL_SSLv3 if hasattr(ssl, "PROTOCOL_SSLv3") else None),
|
||||
("TLSv1.0", ssl.PROTOCOL_TLSv1 if hasattr(ssl, "PROTOCOL_TLSv1") else None),
|
||||
(
|
||||
"TLSv1.1",
|
||||
ssl.PROTOCOL_TLSv1_1 if hasattr(ssl, "PROTOCOL_TLSv1_1") else None,
|
||||
),
|
||||
(
|
||||
"TLSv1.2",
|
||||
ssl.PROTOCOL_TLSv1_2 if hasattr(ssl, "PROTOCOL_TLSv1_2") else None,
|
||||
),
|
||||
("TLSv1.3", ssl.PROTOCOL_TLS if hasattr(ssl, "PROTOCOL_TLS") else None),
|
||||
]
|
||||
|
||||
self.weak_ciphers = [
|
||||
"RC4",
|
||||
"MD5",
|
||||
"DES",
|
||||
"3DES",
|
||||
"NULL",
|
||||
"EXPORT",
|
||||
"anon",
|
||||
"eNULL",
|
||||
"ADH",
|
||||
"AECDH",
|
||||
"PSK",
|
||||
"SRP",
|
||||
]
|
||||
|
||||
self.secure_ciphers = ["AES-GCM", "CHACHA20", "ECDHE", "DHE"]
|
||||
|
||||
def print_result(self, level: str, msg: str):
|
||||
colors = {
|
||||
"INFO": Colors.BLUE,
|
||||
"SUCCESS": Colors.GREEN,
|
||||
"WARNING": Colors.YELLOW,
|
||||
"ERROR": Colors.RED,
|
||||
"VULN": Colors.RED + Colors.BOLD,
|
||||
"SECURE": Colors.GREEN + Colors.BOLD,
|
||||
}
|
||||
print(f"{colors.get(level, '')}[{level}]{Colors.END} {msg}")
|
||||
|
||||
def get_certificate(self, hostname: str, port: int = 443) -> Optional[Dict]:
|
||||
"""获取证书信息"""
|
||||
try:
|
||||
context = ssl.create_default_context()
|
||||
with socket.create_connection(
|
||||
(hostname, port), timeout=self.timeout
|
||||
) as sock:
|
||||
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
|
||||
cert = ssock.getpeercert()
|
||||
|
||||
cert_info = {
|
||||
"subject": dict(x[0] for x in cert.get("subject", [])),
|
||||
"issuer": dict(x[0] for x in cert.get("issuer", [])),
|
||||
"version": cert.get("version"),
|
||||
"serial_number": cert.get("serialNumber"),
|
||||
"not_before": cert.get("notBefore"),
|
||||
"not_after": cert.get("notAfter"),
|
||||
"san": [],
|
||||
}
|
||||
|
||||
for ext in cert.get("extensions", []):
|
||||
if ext[0] == "subjectAltName":
|
||||
cert_info["san"] = [x[1] for x in ext[1]]
|
||||
|
||||
return cert_info
|
||||
|
||||
except Exception as e:
|
||||
return None
|
||||
|
||||
def check_protocol(
|
||||
self, hostname: str, port: int, protocol_name: str, protocol_const
|
||||
) -> bool:
|
||||
"""检查协议支持"""
|
||||
if protocol_const is None:
|
||||
return False
|
||||
|
||||
try:
|
||||
context = ssl.SSLContext(protocol_const)
|
||||
context.check_hostname = False
|
||||
context.verify_mode = ssl.CERT_NONE
|
||||
|
||||
with socket.create_connection(
|
||||
(hostname, port), timeout=self.timeout
|
||||
) as sock:
|
||||
with context.wrap_socket(sock) as ssock:
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
def get_cipher_suite(self, hostname: str, port: int = 443) -> Optional[str]:
|
||||
"""获取当前密码套件"""
|
||||
try:
|
||||
context = ssl.create_default_context()
|
||||
context.check_hostname = False
|
||||
context.verify_mode = ssl.CERT_NONE
|
||||
|
||||
with socket.create_connection(
|
||||
(hostname, port), timeout=self.timeout
|
||||
) as sock:
|
||||
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
|
||||
return ssock.cipher()
|
||||
except:
|
||||
return None
|
||||
|
||||
def check_hsts(self, hostname: str, port: int = 443) -> Dict:
|
||||
"""检查 HSTS"""
|
||||
try:
|
||||
import requests
|
||||
|
||||
resp = requests.head(
|
||||
f"https://{hostname}:{port}", timeout=self.timeout, verify=False
|
||||
)
|
||||
|
||||
hsts = resp.headers.get("Strict-Transport-Security")
|
||||
if hsts:
|
||||
max_age = re.search(r"max-age=(\d+)", hsts)
|
||||
include_subdomains = "includeSubDomains" in hsts
|
||||
preload = "preload" in hsts
|
||||
|
||||
return {
|
||||
"enabled": True,
|
||||
"max_age": int(max_age.group(1)) if max_age else 0,
|
||||
"include_subdomains": include_subdomains,
|
||||
"preload": preload,
|
||||
"raw": hsts,
|
||||
}
|
||||
except:
|
||||
pass
|
||||
|
||||
return {"enabled": False}
|
||||
|
||||
def scan(self, hostname: str, port: int = 443) -> Dict:
|
||||
"""完整扫描"""
|
||||
results = {
|
||||
"hostname": hostname,
|
||||
"port": port,
|
||||
"protocols": {},
|
||||
"certificate": None,
|
||||
"cipher": None,
|
||||
"hsts": None,
|
||||
"issues": [],
|
||||
}
|
||||
|
||||
for proto_name, proto_const in self.protocols:
|
||||
supported = self.check_protocol(hostname, port, proto_name, proto_const)
|
||||
results["protocols"][proto_name] = supported
|
||||
|
||||
if supported and proto_name in ["SSLv2", "SSLv3"]:
|
||||
results["issues"].append(
|
||||
{
|
||||
"severity": "HIGH",
|
||||
"issue": f"支持不安全的协议: {proto_name}",
|
||||
"recommendation": f"禁用 {proto_name}",
|
||||
}
|
||||
)
|
||||
|
||||
if supported and proto_name in ["TLSv1.0", "TLSv1.1"]:
|
||||
results["issues"].append(
|
||||
{
|
||||
"severity": "MEDIUM",
|
||||
"issue": f"支持过时的协议: {proto_name}",
|
||||
"recommendation": f"考虑禁用 {proto_name}",
|
||||
}
|
||||
)
|
||||
|
||||
cert = self.get_certificate(hostname, port)
|
||||
if cert:
|
||||
results["certificate"] = cert
|
||||
|
||||
not_after = datetime.strptime(cert["not_after"], "%b %d %H:%M:%S %Y %Z")
|
||||
days_left = (not_after - datetime.now()).days
|
||||
|
||||
if days_left < 0:
|
||||
results["issues"].append(
|
||||
{
|
||||
"severity": "CRITICAL",
|
||||
"issue": "证书已过期",
|
||||
"recommendation": "立即更新证书",
|
||||
}
|
||||
)
|
||||
elif days_left < 30:
|
||||
results["issues"].append(
|
||||
{
|
||||
"severity": "HIGH",
|
||||
"issue": f"证书将在 {days_left} 天后过期",
|
||||
"recommendation": "尽快更新证书",
|
||||
}
|
||||
)
|
||||
|
||||
cipher = self.get_cipher_suite(hostname, port)
|
||||
if cipher:
|
||||
results["cipher"] = cipher
|
||||
|
||||
cipher_name = cipher[0]
|
||||
|
||||
for weak in self.weak_ciphers:
|
||||
if weak.lower() in cipher_name.lower():
|
||||
results["issues"].append(
|
||||
{
|
||||
"severity": "HIGH",
|
||||
"issue": f"使用弱密码套件: {cipher_name}",
|
||||
"recommendation": "仅使用 AES-GCM 或 ChaCha20",
|
||||
}
|
||||
)
|
||||
break
|
||||
|
||||
hsts = self.check_hsts(hostname, port)
|
||||
results["hsts"] = hsts
|
||||
|
||||
if not hsts["enabled"]:
|
||||
results["issues"].append(
|
||||
{
|
||||
"severity": "MEDIUM",
|
||||
"issue": "未启用 HSTS",
|
||||
"recommendation": "添加 Strict-Transport-Security 头",
|
||||
}
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="TLS Scanner")
|
||||
parser.add_argument("-u", "--url", required=True, help="目标 URL 或主机名")
|
||||
parser.add_argument("-p", "--port", type=int, default=443, help="端口 (默认: 443)")
|
||||
parser.add_argument("--timeout", type=int, default=10, help="超时时间")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
hostname = args.url.replace("https://", "").replace("http://", "").split("/")[0]
|
||||
|
||||
scanner = TLSScanner(timeout=args.timeout)
|
||||
|
||||
print(f"\n{Colors.BOLD}{'=' * 60}{Colors.END}")
|
||||
print(f"{Colors.BOLD}TLS Scanner{Colors.END}")
|
||||
print(f"{Colors.BOLD}{'=' * 60}{Colors.END}\n")
|
||||
|
||||
scanner.print_result("INFO", f"目标: {hostname}:{args.port}")
|
||||
|
||||
print(f"\n{Colors.CYAN}[*] 扫描协议支持...{Colors.END}")
|
||||
results = scanner.scan(hostname, args.port)
|
||||
|
||||
print(f"\n{Colors.CYAN}协议支持:{Colors.END}")
|
||||
for proto, supported in results["protocols"].items():
|
||||
status = (
|
||||
f"{Colors.GREEN}支持{Colors.END}"
|
||||
if supported
|
||||
else f"{Colors.RED}不支持{Colors.END}"
|
||||
)
|
||||
if supported and proto in ["SSLv2", "SSLv3"]:
|
||||
status = f"{Colors.RED}支持 (不安全){Colors.END}"
|
||||
elif supported and proto in ["TLSv1.0", "TLSv1.1"]:
|
||||
status = f"{Colors.YELLOW}支持 (过时){Colors.END}"
|
||||
print(f" {proto:<10} {status}")
|
||||
|
||||
if results["cipher"]:
|
||||
print(f"\n{Colors.CYAN}当前密码套件:{Colors.END}")
|
||||
cipher_name, cipher_proto, cipher_bits = results["cipher"]
|
||||
print(f" 名称: {cipher_name}")
|
||||
print(f" 协议: {cipher_proto}")
|
||||
print(f" 密钥长度: {cipher_bits} bits")
|
||||
|
||||
if results["certificate"]:
|
||||
print(f"\n{Colors.CYAN}证书信息:{Colors.END}")
|
||||
cert = results["certificate"]
|
||||
print(f" 主题: {cert['subject'].get('commonName', 'N/A')}")
|
||||
print(f" 颁发者: {cert['issuer'].get('commonName', 'N/A')}")
|
||||
print(f" 有效期: {cert['not_before']} - {cert['not_after']}")
|
||||
|
||||
print(f"\n{Colors.CYAN}HSTS:{Colors.END}")
|
||||
if results["hsts"]["enabled"]:
|
||||
print(f" 状态: {Colors.GREEN}已启用{Colors.END}")
|
||||
print(f" Max-Age: {results['hsts']['max_age']} 秒")
|
||||
print(
|
||||
f" IncludeSubDomains: {'是' if results['hsts']['include_subdomains'] else '否'}"
|
||||
)
|
||||
print(f" Preload: {'是' if results['hsts']['preload'] else '否'}")
|
||||
else:
|
||||
print(f" 状态: {Colors.RED}未启用{Colors.END}")
|
||||
|
||||
print(f"\n{Colors.CYAN}安全问题:{Colors.END}")
|
||||
if results["issues"]:
|
||||
for issue in sorted(results["issues"], key=lambda x: x["severity"]):
|
||||
color = (
|
||||
Colors.RED
|
||||
if issue["severity"] in ["CRITICAL", "HIGH"]
|
||||
else Colors.YELLOW
|
||||
)
|
||||
print(f" {color}[{issue['severity']}]{Colors.END} {issue['issue']}")
|
||||
print(f" 建议: {issue['recommendation']}")
|
||||
else:
|
||||
print(f" {Colors.GREEN}未发现安全问题{Colors.END}")
|
||||
|
||||
print(f"\n{Colors.BOLD}{'=' * 60}{Colors.END}\n")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
在新工单中引用
屏蔽一个用户