617 行
28 KiB
Python
617 行
28 KiB
Python
from __future__ import annotations
|
|
|
|
from collections import defaultdict
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Iterable, List
|
|
|
|
from intel.config import FRAMEWORK_ROOT, GENERATED_DIR, REGISTRY_ROOT, ROOT, SECURE_CODE_ROOT, SYSTEMS_DIR, TRIAGE_DIR
|
|
from intel.models import AdvisoryRecord
|
|
from intel.utils import ensure_dir, isoformat, now_utc, write_json, write_text
|
|
from lab.render import render_dashboard as render_lab_dashboard
|
|
from lab.repro import annotate_with_latest_run, latest_runs_by_advisory
|
|
|
|
|
|
UTC = timezone.utc
|
|
LANGUAGES = [
|
|
"javascript-typescript",
|
|
"nodejs",
|
|
"java",
|
|
"php",
|
|
"python",
|
|
"ruby",
|
|
"csharp",
|
|
"go",
|
|
]
|
|
|
|
|
|
TOPIC_DESCRIPTIONS = {
|
|
"xss-output-encoding": "对不可信输入做上下文输出编码,避免 HTML、属性和脚本上下文执行。",
|
|
"dom-sink-hardening": "限制 `innerHTML`、模板拼接和 DOM sink 的直接写入。",
|
|
"csp-trusted-types": "用 CSP 和 Trusted Types 缩小前端执行面。",
|
|
"token-cookie-storage": "避免把敏感令牌长期暴露在可读浏览器存储中。",
|
|
"authz-server-side-recheck": "前置代理或 middleware 不能替代服务端最终鉴权。",
|
|
"ssrf-url-validation": "对 URL、协议、IP 和重定向做 allowlist 校验。",
|
|
"request-smuggling-boundary": "统一代理层和应用层的请求边界解释。",
|
|
"path-traversal-guard": "标准化路径并限制访问根目录。",
|
|
"file-upload-validation": "校验文件类型、名称、存储位置和执行权限。",
|
|
"plugin-extension-trust-policy": "限制第三方插件、模块和主题的信任边界。",
|
|
"dependency-upgrade-policy": "用自动化升级、锁版本和审查策略降低供应链风险。",
|
|
"proxy-trust-boundary": "只信任明确代理并限制头部透传。",
|
|
"deserialization-safety": "避免对不可信数据做危险反序列化。",
|
|
"template-injection-guard": "模板上下文中禁用危险表达式执行。",
|
|
}
|
|
|
|
TOPIC_SCENARIOS = {
|
|
"xss-output-encoding": "适用于模板输出、服务端渲染片段和后台管理界面回显场景。",
|
|
"dom-sink-hardening": "适用于前端模板拼接、Markdown 渲染器和富文本预览逻辑。",
|
|
"csp-trusted-types": "适用于高风险前端应用、管理端和需要限制脚本执行面的页面。",
|
|
"token-cookie-storage": "适用于浏览器端会话、管理接口令牌和单页应用认证态。",
|
|
"authz-server-side-recheck": "适用于代理层放行、路由守卫和后端最终授权重新确认。",
|
|
"ssrf-url-validation": "适用于 webhook、URL 导入、远程图片抓取和插件联动调用。",
|
|
"request-smuggling-boundary": "适用于代理链、WAF、CDN 和应用服务器之间的请求解析边界。",
|
|
"path-traversal-guard": "适用于下载、导入、附件预览和主题/模板读取路径。",
|
|
"file-upload-validation": "适用于媒体上传、插件安装、主题导入和日志附件接收。",
|
|
"plugin-extension-trust-policy": "适用于插件市场、主题仓库、第三方扩展和模块化系统。",
|
|
"dependency-upgrade-policy": "适用于 lockfile、SBOM、CI 审查和供应链更新节奏治理。",
|
|
"proxy-trust-boundary": "适用于真实 IP 透传、认证头转发和反向代理旁路风险。",
|
|
"deserialization-safety": "适用于缓存、任务队列、对象恢复和跨服务消息传递。",
|
|
"template-injection-guard": "适用于 SSR、模板引擎、邮件渲染和后台自定义视图。",
|
|
}
|
|
|
|
BAD_GOOD_SNIPPETS = {
|
|
"javascript-typescript": (
|
|
"const output = `<div>${userInput}</div>`;",
|
|
"const output = `<div>${escapeHtml(userInput)}</div>`;",
|
|
),
|
|
"nodejs": (
|
|
"res.send(`<div>${req.query.q}</div>`);",
|
|
"res.send(`<div>${escapeHtml(req.query.q)}</div>`);",
|
|
),
|
|
"java": (
|
|
"response.getWriter().write(\"<div>\" + value + \"</div>\");",
|
|
"response.getWriter().write(\"<div>\" + HtmlUtils.htmlEscape(value) + \"</div>\");",
|
|
),
|
|
"php": (
|
|
"echo \"<div>{$value}</div>\";",
|
|
"echo '<div>' . htmlspecialchars($value, ENT_QUOTES, 'UTF-8') . '</div>';",
|
|
),
|
|
"python": (
|
|
"return f\"<div>{value}</div>\"",
|
|
"return f\"<div>{escape(value)}</div>\"",
|
|
),
|
|
"ruby": (
|
|
"render inline: \"<div>#{value}</div>\"",
|
|
"render inline: \"<div>#{ERB::Util.html_escape(value)}</div>\"",
|
|
),
|
|
"csharp": (
|
|
"return Content($\"<div>{value}</div>\", \"text/html\");",
|
|
"return Content($\"<div>{HtmlEncoder.Default.Encode(value)}</div>\", \"text/html\");",
|
|
),
|
|
"go": (
|
|
"fmt.Fprintf(w, \"<div>%s</div>\", value)",
|
|
"template.HTMLEscape(w, []byte(value))",
|
|
),
|
|
}
|
|
|
|
|
|
def _failure_text(item: Any) -> str:
|
|
if isinstance(item, dict):
|
|
return item.get("summary") or f"{item.get('system_id')}::{item.get('source_name')}::{item.get('category')}::{item.get('message')}"
|
|
return str(item)
|
|
|
|
|
|
SOURCE_KIND_URLS = {
|
|
"ghsa-global": "https://github.com/advisories",
|
|
"osv-batch": "https://osv.dev/",
|
|
"nvd-search": "https://nvd.nist.gov/vuln/search",
|
|
"kev-json": "https://www.cisa.gov/known-exploited-vulnerabilities-catalog",
|
|
"rss-feed": "https://www.rssboard.org/rss-specification",
|
|
"atom-feed": "https://datatracker.ietf.org/doc/html/rfc4287",
|
|
"json-feed": "https://www.jsonfeed.org/version/1.1/",
|
|
"vendor-index": "https://example.com/vendor-index",
|
|
}
|
|
|
|
TARGET_TYPES = ["lab-local", "lab-public", "authorized-third-party"]
|
|
MINIMAL_VALIDATION_GUIDANCE = "最小化验证、只读探测、可审计回显、受控注入。"
|
|
FORBIDDEN_SCENARIOS = [
|
|
"无归属证明或无明确授权的公网目标",
|
|
"知名公共网站或与测试无关的第三方资产",
|
|
"会造成持久破坏、数据越权下载或不可回滚影响的动作",
|
|
]
|
|
|
|
|
|
def _merged_item(item: AdvisoryRecord, run_map: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
|
|
return annotate_with_latest_run(item.to_dict(), run_map.get(item.canonical_id))
|
|
|
|
|
|
def _status_counts(items: List[Dict[str, Any]]) -> Dict[str, int]:
|
|
counts = {"verified_real": 0, "verified_synthetic": 0, "blocked": 0, "manual": 0}
|
|
for item in items:
|
|
status = item.get("verification_status")
|
|
if status == "verified-real":
|
|
counts["verified_real"] += 1
|
|
elif status == "verified-synthetic":
|
|
counts["verified_synthetic"] += 1
|
|
elif status and status.startswith("blocked-"):
|
|
counts["blocked"] += 1
|
|
else:
|
|
counts["manual"] += 1
|
|
return counts
|
|
|
|
|
|
def _recent_count(items: Iterable[AdvisoryRecord], days: int = 30) -> int:
|
|
cutoff = now_utc() - timedelta(days=days)
|
|
total = 0
|
|
for item in items:
|
|
for stamp in (item.updated_at, item.published_at):
|
|
if not stamp:
|
|
continue
|
|
try:
|
|
dt = datetime.fromisoformat(stamp.replace("Z", "+00:00")).astimezone(UTC)
|
|
except ValueError:
|
|
continue
|
|
if dt >= cutoff:
|
|
total += 1
|
|
break
|
|
return total
|
|
|
|
|
|
def _group_name(output_dir: str) -> str:
|
|
return Path(output_dir).parts[1]
|
|
|
|
|
|
def _abs_repo_path(*parts: str) -> str:
|
|
cleaned: List[str] = []
|
|
for part in parts:
|
|
if not part:
|
|
continue
|
|
cleaned.extend(Path(part).parts)
|
|
return str(ROOT.joinpath(*cleaned))
|
|
|
|
|
|
def _source_reference(source: Dict[str, Any]) -> str:
|
|
url = source.get("url") or SOURCE_KIND_URLS.get(source.get("kind", ""))
|
|
qualifiers = []
|
|
if source.get("ecosystem"):
|
|
qualifiers.append(f"ecosystem={source['ecosystem']}")
|
|
if source.get("keyword"):
|
|
qualifiers.append(f"keyword={source['keyword']}")
|
|
if source.get("advisory_mode"):
|
|
qualifiers.append(f"mode={source['advisory_mode']}")
|
|
suffix = f" ({'; '.join(qualifiers)})" if qualifiers else ""
|
|
if url:
|
|
return f"`{source['confidence']}` [{source['name']}]({url}){suffix}"
|
|
return f"`{source['confidence']}` {source['name']}{suffix}"
|
|
|
|
|
|
def _clear_json_dir(path: Path) -> None:
|
|
ensure_dir(path)
|
|
for file_path in path.glob("*.json"):
|
|
file_path.unlink()
|
|
|
|
|
|
def render_system_scaffolding(source_map: Dict[str, Any], advisories: List[AdvisoryRecord]) -> None:
|
|
run_map = latest_runs_by_advisory()
|
|
grouped: Dict[str, List[AdvisoryRecord]] = defaultdict(list)
|
|
for advisory in advisories:
|
|
grouped[advisory.system_id].append(advisory)
|
|
|
|
groups: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
|
|
for system in source_map["systems"]:
|
|
groups[_group_name(system["output_dir"])].append(system)
|
|
system_dir = FRAMEWORK_ROOT / _group_name(system["output_dir"]) / system["system_id"]
|
|
ensure_dir(system_dir / "cases")
|
|
|
|
items = sorted(grouped.get(system["system_id"], []), key=lambda item: item.published_at or "", reverse=True)
|
|
merged_items = [_merged_item(item, run_map) for item in items]
|
|
markdown_count = len([item for item in items if item.render_markdown and item.case_path])
|
|
counts = _status_counts(merged_items)
|
|
index_lines = [
|
|
f"# {system['display_name']}",
|
|
"",
|
|
"> `LAB ONLY` | `AUTHORIZED TARGETS ONLY` | 自动生成索引",
|
|
"",
|
|
f"- 系统 ID: `{system['system_id']}`",
|
|
f"- 分类: `{system['category']}`",
|
|
f"- 覆盖策略: `{system['tier']}`",
|
|
f"- 总案例数: `{len(items)}`",
|
|
f"- 近 30 天新增/更新: `{_recent_count(items)}`",
|
|
f"- 重点 Markdown 案例数: `{markdown_count}`",
|
|
f"- 已实证(真实版本): `{counts['verified_real']}`",
|
|
f"- 已实证(synthetic): `{counts['verified_synthetic']}`",
|
|
f"- 阻塞数: `{counts['blocked']}`",
|
|
f"- 待人工/缺浏览器证据: `{counts['manual']}`",
|
|
f"- 最近渲染时间: `{isoformat(now_utc())}`",
|
|
"",
|
|
"## 目标约束",
|
|
"",
|
|
f"- 适用目标类型: `{', '.join(TARGET_TYPES)}`",
|
|
"- 是否允许公网验证: `yes, but ownership or authorization is required`",
|
|
"- 授权前提: 资产归属可证明,或已取得书面/明确授权。",
|
|
f"- 最小化验证方式: {MINIMAL_VALIDATION_GUIDANCE}",
|
|
f"- 禁止场景: {';'.join(FORBIDDEN_SCENARIOS)}",
|
|
"",
|
|
"## 来源",
|
|
"",
|
|
]
|
|
for bucket in ("official_sources", "ecosystem_sources", "research_sources"):
|
|
for source in system.get(bucket, []):
|
|
index_lines.append(f"- {_source_reference(source)}")
|
|
index_lines.extend(
|
|
[
|
|
"",
|
|
"## 案例列表",
|
|
"",
|
|
"| 标题 | 严重度 | 案例状态 | 实证状态 | 实证方式 | 来源置信度 | 更新时间 | 案例页 |",
|
|
"|------|--------|----------|----------|----------|------------|----------|--------|",
|
|
]
|
|
)
|
|
if merged_items:
|
|
for item in merged_items:
|
|
case_link = f"[link]({_abs_repo_path(item['case_path'])})" if item.get("case_path") else "-"
|
|
timestamp = item.get("updated_at") or item.get("published_at") or ""
|
|
index_lines.append(
|
|
f"| {item['title']} | `{item['severity']}` | `{item['status']}` | `{item.get('verification_status', 'triage-manual')}` | `{item.get('verification_mode', '-')}` | `{item['source_confidence']}` | `{timestamp}` | {case_link} |"
|
|
)
|
|
else:
|
|
index_lines.append("| No advisories yet | `n/a` | `empty` | `n/a` | `n/a` | `n/a` | `n/a` | - |")
|
|
write_text(system_dir / "INDEX.md", "\n".join(index_lines))
|
|
|
|
system_registry_path = _abs_repo_path("08-threat-intel", "registry", "systems", f"{system['system_id']}.json")
|
|
readme_lines = [
|
|
f"# {system['display_name']}",
|
|
"",
|
|
"> `LAB ONLY` | `AUTHORIZED TARGETS ONLY`",
|
|
"",
|
|
f"- 分类: `{system['category']}`",
|
|
f"- 覆盖层级: `{system['tier']}`",
|
|
f"- Advisory 模式: {', '.join(system.get('advisory_modes', []))}",
|
|
f"- 输出目录: `{system['output_dir']}`",
|
|
f"- 修复主题: {', '.join(system.get('secure_code_topics', []))}",
|
|
f"- 适用目标类型: `{', '.join(TARGET_TYPES)}`",
|
|
"- 是否允许公网验证: `yes, but only for owned or authorized targets`",
|
|
f"- 最小化验证方式: {MINIMAL_VALIDATION_GUIDANCE}",
|
|
f"- 禁止场景: {';'.join(FORBIDDEN_SCENARIOS)}",
|
|
"",
|
|
f"- 自动索引: [INDEX.md]({_abs_repo_path(system['output_dir'], 'INDEX.md')})",
|
|
f"- Registry 统计: [{system['system_id']}.json]({system_registry_path})",
|
|
]
|
|
write_text(system_dir / "README.md", "\n".join(readme_lines))
|
|
|
|
for group, systems in groups.items():
|
|
lines = [
|
|
f"# {group}",
|
|
"",
|
|
"> 自动生成系统分组索引",
|
|
"",
|
|
f"- 系统数量: `{len(systems)}`",
|
|
"- 允许范围: `lab-local`, `lab-public`, `authorized-third-party`",
|
|
"",
|
|
]
|
|
for system in sorted(systems, key=lambda item: item["display_name"].lower()):
|
|
lines.append(f"- [{system['display_name']}]({_abs_repo_path(system['output_dir'], 'README.md')})")
|
|
write_text(FRAMEWORK_ROOT / group / "README.md", "\n".join(lines))
|
|
|
|
root_lines = [
|
|
"# 主流开源 Web 系统安全",
|
|
"",
|
|
"> `LAB ONLY` | `AUTHORIZED TARGETS ONLY`",
|
|
"",
|
|
f"- 系统总数: `{len(source_map['systems'])}`",
|
|
"- 覆盖语境: 授权攻防实验、验证性注入、最小化验证、案例映射。",
|
|
"- 不适用: 未授权公网目标、泛互联网枚举、默认生产推荐基线。",
|
|
"",
|
|
f"- [cms]({_abs_repo_path('07-framework-security', 'cms', 'README.md')})",
|
|
f"- [ecommerce]({_abs_repo_path('07-framework-security', 'ecommerce', 'README.md')})",
|
|
f"- [frameworks]({_abs_repo_path('07-framework-security', 'frameworks', 'README.md')})",
|
|
f"- [servers]({_abs_repo_path('07-framework-security', 'servers', 'README.md')})",
|
|
f"- [platforms]({_abs_repo_path('07-framework-security', 'platforms', 'README.md')})",
|
|
]
|
|
write_text(FRAMEWORK_ROOT / "README.md", "\n".join(root_lines))
|
|
|
|
|
|
def render_case_pages(advisories: List[AdvisoryRecord]) -> None:
|
|
run_map = latest_runs_by_advisory()
|
|
for item in advisories:
|
|
if not item.render_markdown or not item.case_path:
|
|
continue
|
|
merged = _merged_item(item, run_map)
|
|
lines = [
|
|
"---",
|
|
f'title: "{item.title.replace(chr(34), chr(39))}"',
|
|
f'system_id: "{item.system_id}"',
|
|
f'category: "{item.category}"',
|
|
f'advisory_mode: "{item.advisory_mode}"',
|
|
f'published_date: "{item.published_at or ""}"',
|
|
f'updated_date: "{item.updated_at or item.published_at or ""}"',
|
|
f'severity: "{item.severity}"',
|
|
f'exploit_status: "{item.exploit_status}"',
|
|
f'source_confidence: "{item.source_confidence}"',
|
|
f'verification_status: "{merged.get("verification_status", "triage-manual")}"',
|
|
f'verification_mode: "{merged.get("verification_mode", "synthetic")}"',
|
|
f'artifact_mode: "{merged.get("artifact_mode") or ""}"',
|
|
f'last_run_id: "{merged.get("last_run_id") or ""}"',
|
|
'target_types:',
|
|
' - "lab-local"',
|
|
' - "lab-public"',
|
|
' - "authorized-third-party"',
|
|
'allow_public_validation: "yes, with ownership or explicit authorization"',
|
|
'authorization_prerequisite: "asset ownership proof or explicit written authorization"',
|
|
'minimal_validation: "read-only probe, controlled payload, reversible test"',
|
|
"aliases:",
|
|
]
|
|
for alias in item.aliases or []:
|
|
lines.append(f' - "{alias}"')
|
|
lines.append("affected_versions:")
|
|
for version in (item.affected_versions or [])[:20]:
|
|
lines.append(f' - "{version}"')
|
|
lines.append("fixed_versions:")
|
|
for version in (item.fixed_versions or [])[:20]:
|
|
lines.append(f' - "{version}"')
|
|
lines.append("secure_code_topics:")
|
|
for topic in item.secure_code_topics or []:
|
|
lines.append(f' - "{topic}"')
|
|
lines.extend(
|
|
[
|
|
f'primary_source: "{item.official_source_url or ""}"',
|
|
"---",
|
|
"",
|
|
f"# {item.title}",
|
|
"",
|
|
"## 本地实证状态",
|
|
"",
|
|
f"- 实证状态: `{merged.get('verification_status', 'triage-manual')}`",
|
|
f"- 实证方式: `{merged.get('verification_mode', 'synthetic')}`",
|
|
f"- Artifact 模式: `{merged.get('artifact_mode') or 'unknown'}`",
|
|
f"- 最近运行: `{merged.get('last_run_id') or '-'}`",
|
|
f"- 浏览器证据: `{'present' if merged.get('browser_evidence', {}).get('present') else 'missing'}`",
|
|
f"- Run Bundle: `{merged.get('evidence_bundle') or '-'}`",
|
|
"",
|
|
"## 事件层",
|
|
"",
|
|
f"- Canonical ID: `{item.canonical_id}`",
|
|
f"- 系统: `{item.system_id}`",
|
|
f"- 严重度: `{item.severity}`",
|
|
f"- 来源置信度: `{item.source_confidence}`",
|
|
f"- 官方主源: {item.official_source_url or '-'}",
|
|
f"- 影响版本: `{', '.join((item.affected_versions or [])[:10]) or 'unknown'}`",
|
|
f"- 修复版本: `{', '.join((item.fixed_versions or [])[:10]) or 'unknown'}`",
|
|
"",
|
|
"## 其他来源",
|
|
"",
|
|
]
|
|
)
|
|
if item.secondary_source_urls:
|
|
for ref in (item.secondary_source_urls or [])[:20]:
|
|
lines.append(f"- {ref}")
|
|
else:
|
|
lines.append("- 无额外来源")
|
|
lines.extend(
|
|
[
|
|
"",
|
|
"## 实验层",
|
|
"",
|
|
"- 仅用于自有资产、测试环境或已明确授权目标。",
|
|
"- 允许公网可达目标,但必须满足资产归属或明确授权前提。",
|
|
f"- 最小化验证方式: {MINIMAL_VALIDATION_GUIDANCE}",
|
|
"- 若该案例涉及插件、模块或扩展,应同时检查供应链与升级策略。",
|
|
f"- 禁止场景: {';'.join(FORBIDDEN_SCENARIOS)}",
|
|
"",
|
|
"## 修复示例",
|
|
"",
|
|
]
|
|
)
|
|
for topic in item.secure_code_topics or []:
|
|
for language in LANGUAGES:
|
|
path = SECURE_CODE_ROOT / language / f"{topic}.md"
|
|
if path.exists():
|
|
lines.append(f"- [{language}:{topic}]({_abs_repo_path('05-defense', 'secure-code', language, f'{topic}.md')})")
|
|
write_text(ROOT / item.case_path, "\n".join(lines))
|
|
|
|
|
|
def render_registry(source_map: Dict[str, Any], advisories: List[AdvisoryRecord], triage: List[Dict[str, Any]]) -> None:
|
|
_clear_json_dir(REGISTRY_ROOT / "advisories")
|
|
_clear_json_dir(REGISTRY_ROOT / "systems")
|
|
_clear_json_dir(TRIAGE_DIR)
|
|
|
|
run_map = latest_runs_by_advisory()
|
|
grouped: Dict[str, List[AdvisoryRecord]] = defaultdict(list)
|
|
for advisory in advisories:
|
|
write_json(REGISTRY_ROOT / "advisories" / f"{advisory.canonical_id}.json", _merged_item(advisory, run_map))
|
|
grouped[advisory.system_id].append(advisory)
|
|
|
|
triage_by_system: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
|
|
for item in triage:
|
|
triage_by_system[item["system_id"]].append(item)
|
|
write_json(TRIAGE_DIR / f"{item['canonical_id']}.json", item)
|
|
|
|
for system in source_map["systems"]:
|
|
system_id = system["system_id"]
|
|
items = grouped.get(system_id, [])
|
|
merged_items = [_merged_item(item, run_map) for item in items]
|
|
counts = _status_counts(merged_items)
|
|
payload = {
|
|
"system_id": system_id,
|
|
"display_name": system["display_name"],
|
|
"category": system["category"],
|
|
"tier": system["tier"],
|
|
"total": len(items),
|
|
"markdown_cases": len([item for item in items if item.case_path]),
|
|
"triage_count": len(triage_by_system.get(system_id, [])),
|
|
"latest_update": max((item.updated_at or item.published_at or "" for item in items), default=""),
|
|
"output_dir": system["output_dir"],
|
|
"secure_code_topics": system.get("secure_code_topics", []),
|
|
"verified_real": counts["verified_real"],
|
|
"verified_synthetic": counts["verified_synthetic"],
|
|
"blocked_count": counts["blocked"],
|
|
"manual_count": counts["manual"],
|
|
"items": [item.canonical_id for item in sorted(items, key=lambda item: item.published_at or "", reverse=True)],
|
|
}
|
|
write_json(SYSTEMS_DIR / f"{system_id}.json", payload)
|
|
|
|
|
|
def render_generated(
|
|
source_map: Dict[str, Any],
|
|
advisories: List[AdvisoryRecord],
|
|
triage: List[Dict[str, Any]],
|
|
failures: List[str],
|
|
change_summary: Dict[str, Any] | None = None,
|
|
) -> None:
|
|
ensure_dir(GENERATED_DIR)
|
|
systems = {item["system_id"]: item for item in source_map["systems"]}
|
|
run_map = latest_runs_by_advisory()
|
|
change_summary = change_summary or {}
|
|
triage_by_system: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
|
|
for item in triage:
|
|
triage_by_system[item["system_id"]].append(item)
|
|
|
|
coverage_lines = [
|
|
"# 覆盖矩阵",
|
|
"",
|
|
"| 系统 | 分类 | 覆盖策略 | 历史全量 | 近两年全量 | 全量 registry | 重点案例 Markdown | secure-code 关联 | 自动同步状态 | 本地实证状态 | 浏览器证据 | run bundle | triage | 最近更新 |",
|
|
"|------|------|----------|----------|------------|--------------|--------------------|------------------|--------------|--------------|------------|-----------|--------|----------|",
|
|
]
|
|
by_system: Dict[str, List[AdvisoryRecord]] = defaultdict(list)
|
|
for advisory in advisories:
|
|
by_system[advisory.system_id].append(advisory)
|
|
for system_id, system in sorted(systems.items()):
|
|
items = by_system.get(system_id, [])
|
|
merged_items = [_merged_item(item, run_map) for item in items]
|
|
counts = _status_counts(merged_items)
|
|
markdown_count = len([item for item in items if item.case_path])
|
|
sync_state = "seeded" if items else "scaffolded"
|
|
recent = max((item.updated_at or item.published_at or "" for item in items), default="")
|
|
browser_present = len([item for item in merged_items if item.get("browser_evidence", {}).get("present")])
|
|
run_bundle_count = len([item for item in merged_items if item.get("last_run_id")])
|
|
proof_state = f"real:{counts['verified_real']}/synthetic:{counts['verified_synthetic']}/blocked:{counts['blocked']}"
|
|
coverage_lines.append(
|
|
f"| {system['display_name']} | `{system['category']}` | `{system['tier']}` | `{'yes' if system['tier'] == 'history-full' else '-'}` | `yes` | `{len(items)}` | `{markdown_count}` | `{len(system.get('secure_code_topics', []))}` | `{sync_state}` | `{proof_state}` | `{browser_present}` | `{run_bundle_count}` | `{len(triage_by_system.get(system_id, []))}` | `{recent}` |"
|
|
)
|
|
write_text(GENERATED_DIR / "coverage-matrix.md", "\n".join(coverage_lines))
|
|
|
|
markdown_total = len([item for item in advisories if item.case_path])
|
|
latest_lines = [
|
|
"# 最新同步摘要",
|
|
"",
|
|
f"- 渲染时间: `{isoformat(now_utc())}`",
|
|
f"- 系统数量: `{len(source_map['systems'])}`",
|
|
f"- Advisory 数量: `{len(advisories)}`",
|
|
f"- 重点 Markdown 数量: `{markdown_total}`",
|
|
f"- Run Bundle 数量: `{len(run_map)}`",
|
|
f"- 新增记录: `{change_summary.get('new_count', 0)}`",
|
|
f"- 更新记录: `{change_summary.get('updated_count', 0)}`",
|
|
f"- Triage 数量: `{len(triage)}`",
|
|
f"- 失败的 source adapter: `{len(failures)}`",
|
|
"",
|
|
]
|
|
if failures:
|
|
latest_lines.extend(["## 失败列表", ""])
|
|
for failure in failures:
|
|
latest_lines.append(f"- {_failure_text(failure)}")
|
|
write_text(GENERATED_DIR / "latest-ingest.md", "\n".join(latest_lines))
|
|
write_json(
|
|
GENERATED_DIR / "run-summary.json",
|
|
{
|
|
"generated_at": isoformat(now_utc()),
|
|
"system_count": len(source_map["systems"]),
|
|
"advisory_count": len(advisories),
|
|
"markdown_count": markdown_total,
|
|
"new_count": change_summary.get("new_count", 0),
|
|
"updated_count": change_summary.get("updated_count", 0),
|
|
"systems_touched": change_summary.get("systems_touched", []),
|
|
"triage_count": len(triage),
|
|
"run_bundle_count": len(run_map),
|
|
"failures": failures,
|
|
},
|
|
)
|
|
render_lab_dashboard()
|
|
|
|
|
|
def render_secure_code(source_map: Dict[str, Any]) -> None:
|
|
systems = source_map["systems"]
|
|
related = defaultdict(set)
|
|
for system in systems:
|
|
for topic in system.get("secure_code_topics", []):
|
|
related[topic].add(system["display_name"])
|
|
|
|
root_lines = [
|
|
"# 安全编码修复库",
|
|
"",
|
|
"> `LAB ONLY` | 修复主题用于把实验发现映射回代码整改,不代表默认生产基线。",
|
|
"",
|
|
"- 语言范围: `javascript-typescript`, `nodejs`, `java`, `php`, `python`, `ruby`, `csharp`, `go`",
|
|
"- 主题范围: 输出编码、DOM sink、CSP / Trusted Types、令牌存储、鉴权复核、SSRF、走私边界、路径穿越、文件上传、插件信任、依赖升级、代理信任、反序列化、模板注入。",
|
|
"",
|
|
]
|
|
for language in LANGUAGES:
|
|
root_lines.append(f"- [{language}]({_abs_repo_path('05-defense', 'secure-code', language, 'README.md')})")
|
|
write_text(SECURE_CODE_ROOT / "README.md", "\n".join(root_lines))
|
|
write_text(SECURE_CODE_ROOT / "INDEX.md", "\n".join(root_lines))
|
|
|
|
for language in LANGUAGES:
|
|
language_dir = SECURE_CODE_ROOT / language
|
|
ensure_dir(language_dir)
|
|
index_lines = [
|
|
f"# {language}",
|
|
"",
|
|
"> 自动生成修复主题索引",
|
|
"",
|
|
"- 语境: 授权攻防实验后的修复映射,不作为生产默认推荐模版。",
|
|
"",
|
|
]
|
|
for topic, description in TOPIC_DESCRIPTIONS.items():
|
|
index_lines.append(f"- [{topic}]({_abs_repo_path('05-defense', 'secure-code', language, f'{topic}.md')}) - {description}")
|
|
|
|
bad, good = BAD_GOOD_SNIPPETS[language]
|
|
lines = [
|
|
f"# {topic}",
|
|
"",
|
|
"> `LAB ONLY` | 修复主题页",
|
|
"",
|
|
f"- 语言: `{language}`",
|
|
f"- 主题: `{topic}`",
|
|
f"- 说明: {description}",
|
|
f"- 典型场景: {TOPIC_SCENARIOS.get(topic, '把实验问题还原为可修复的代码模式。')}",
|
|
"",
|
|
"## 脆弱示例",
|
|
"",
|
|
f"```{_code_fence(language)}",
|
|
bad,
|
|
"```",
|
|
"",
|
|
"## 更安全的写法",
|
|
"",
|
|
f"```{_code_fence(language)}",
|
|
good,
|
|
"```",
|
|
"",
|
|
"## 检查清单",
|
|
"",
|
|
"- 明确输入边界与不可信来源",
|
|
"- 在服务端或可信封装层统一做校验/转义/约束",
|
|
"- 对关键路径补充自动化测试和依赖升级策略",
|
|
"",
|
|
"## 相关系统",
|
|
"",
|
|
]
|
|
for display_name in sorted(related.get(topic, [])):
|
|
lines.append(f"- {display_name}")
|
|
write_text(language_dir / f"{topic}.md", "\n".join(lines))
|
|
write_text(language_dir / "INDEX.md", "\n".join(index_lines))
|
|
write_text(language_dir / "README.md", "\n".join(index_lines))
|
|
|
|
|
|
def _code_fence(language: str) -> str:
|
|
mapping = {
|
|
"javascript-typescript": "ts",
|
|
"nodejs": "js",
|
|
"java": "java",
|
|
"php": "php",
|
|
"python": "py",
|
|
"ruby": "rb",
|
|
"csharp": "cs",
|
|
"go": "go",
|
|
}
|
|
return mapping.get(language, "")
|