"""Render generated Markdown and JSON artefacts from advisory records.

The public entry points write per-system index/README pages, advisory case
pages, registry JSON files, coverage summaries and the secure-code topic
library.  All user-visible strings are emitted exactly as defined here.
"""

from __future__ import annotations

from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List

from intel.config import (
    FRAMEWORK_ROOT,
    GENERATED_DIR,
    REGISTRY_ROOT,
    ROOT,
    SECURE_CODE_ROOT,
    SYSTEMS_DIR,
    TRIAGE_DIR,
)
from intel.models import AdvisoryRecord
from intel.utils import ensure_dir, isoformat, now_utc, write_json, write_text
from lab.render import render_dashboard as render_lab_dashboard
from lab.repro import annotate_with_latest_run, latest_runs_by_advisory

UTC = timezone.utc

# Languages for which secure-code topic pages are generated (order is
# significant: it is the order used in every generated listing).
LANGUAGES = [
    "javascript-typescript",
    "nodejs",
    "java",
    "php",
    "python",
    "ruby",
    "csharp",
    "go",
]

# One-line description per secure-code topic; the keys also define the set of
# topic pages written by ``render_secure_code``.
TOPIC_DESCRIPTIONS = {
    "xss-output-encoding": "对不可信输入做上下文输出编码,避免 HTML、属性和脚本上下文执行。",
    "dom-sink-hardening": "限制 `innerHTML`、模板拼接和 DOM sink 的直接写入。",
    "csp-trusted-types": "用 CSP 和 Trusted Types 缩小前端执行面。",
    "token-cookie-storage": "避免把敏感令牌长期暴露在可读浏览器存储中。",
    "authz-server-side-recheck": "前置代理或 middleware 不能替代服务端最终鉴权。",
    "ssrf-url-validation": "对 URL、协议、IP 和重定向做 allowlist 校验。",
    "request-smuggling-boundary": "统一代理层和应用层的请求边界解释。",
    "path-traversal-guard": "标准化路径并限制访问根目录。",
    "file-upload-validation": "校验文件类型、名称、存储位置和执行权限。",
    "plugin-extension-trust-policy": "限制第三方插件、模块和主题的信任边界。",
    "dependency-upgrade-policy": "用自动化升级、锁版本和审查策略降低供应链风险。",
    "proxy-trust-boundary": "只信任明确代理并限制头部透传。",
    "deserialization-safety": "避免对不可信数据做危险反序列化。",
    "template-injection-guard": "模板上下文中禁用危险表达式执行。",
}

# Typical usage scenario per topic, shown on each topic page.
TOPIC_SCENARIOS = {
    "xss-output-encoding": "适用于模板输出、服务端渲染片段和后台管理界面回显场景。",
    "dom-sink-hardening": "适用于前端模板拼接、Markdown 渲染器和富文本预览逻辑。",
    "csp-trusted-types": "适用于高风险前端应用、管理端和需要限制脚本执行面的页面。",
    "token-cookie-storage": "适用于浏览器端会话、管理接口令牌和单页应用认证态。",
    "authz-server-side-recheck": "适用于代理层放行、路由守卫和后端最终授权重新确认。",
    "ssrf-url-validation": "适用于 webhook、URL 导入、远程图片抓取和插件联动调用。",
    "request-smuggling-boundary": "适用于代理链、WAF、CDN 和应用服务器之间的请求解析边界。",
    "path-traversal-guard": "适用于下载、导入、附件预览和主题/模板读取路径。",
    "file-upload-validation": "适用于媒体上传、插件安装、主题导入和日志附件接收。",
    "plugin-extension-trust-policy": "适用于插件市场、主题仓库、第三方扩展和模块化系统。",
    "dependency-upgrade-policy": "适用于 lockfile、SBOM、CI 审查和供应链更新节奏治理。",
    "proxy-trust-boundary": "适用于真实 IP 透传、认证头转发和反向代理旁路风险。",
    "deserialization-safety": "适用于缓存、任务队列、对象恢复和跨服务消息传递。",
    "template-injection-guard": "适用于 SSR、模板引擎、邮件渲染和后台自定义视图。",
}

# (vulnerable, safer) code sample per language.
# NOTE(review): each sample contains bare newlines around the interpolated
# value; this looks like wrapper markup that was stripped at some point --
# confirm against the intended snippets before changing them.
BAD_GOOD_SNIPPETS = {
    "javascript-typescript": (
        "const output = `\n${userInput}\n`;",
        "const output = `\n${escapeHtml(userInput)}\n`;",
    ),
    "nodejs": (
        "res.send(`\n${req.query.q}\n`);",
        "res.send(`\n${escapeHtml(req.query.q)}\n`);",
    ),
    "java": (
        "response.getWriter().write(\"\n\" + value + \"\n\");",
        "response.getWriter().write(\"\n\" + HtmlUtils.htmlEscape(value) + \"\n\");",
    ),
    "php": (
        "echo \"\n{$value}\n\";",
        "echo '\n' . htmlspecialchars($value, ENT_QUOTES, 'UTF-8') . '\n';",
    ),
    "python": (
        "return f\"\n{value}\n\"",
        "return f\"\n{escape(value)}\n\"",
    ),
    "ruby": (
        "render inline: \"\n#{value}\n\"",
        "render inline: \"\n#{ERB::Util.html_escape(value)}\n\"",
    ),
    "csharp": (
        "return Content($\"\n{value}\n\", \"text/html\");",
        "return Content($\"\n{HtmlEncoder.Default.Encode(value)}\n\", \"text/html\");",
    ),
    "go": (
        "fmt.Fprintf(w, \"\n%s\n\", value)",
        "template.HTMLEscape(w, []byte(value))",
    ),
}

# Fallback landing page per source ``kind`` when a source entry has no ``url``.
SOURCE_KIND_URLS = {
    "ghsa-global": "https://github.com/advisories",
    "osv-batch": "https://osv.dev/",
    "nvd-search": "https://nvd.nist.gov/vuln/search",
    "kev-json": "https://www.cisa.gov/known-exploited-vulnerabilities-catalog",
    "rss-feed": "https://www.rssboard.org/rss-specification",
}

TARGET_TYPES = ["lab-local", "lab-public", "authorized-third-party"]
MINIMAL_VALIDATION_GUIDANCE = "最小化验证、只读探测、可审计回显、受控注入。"
FORBIDDEN_SCENARIOS = [
    "无归属证明或无明确授权的公网目标",
    "知名公共网站或与测试无关的第三方资产",
    "会造成持久破坏、数据越权下载或不可回滚影响的动作",
]


def _backticked(values: Iterable[str]) -> str:
    """Return *values* as a comma-separated list of Markdown code spans."""
    return ", ".join(f"`{value}`" for value in values)


def _table_cell(value: Any) -> str:
    """Make *value* safe to embed in a Markdown table cell.

    A literal ``|`` would end the cell and a line break would end the row, so
    pipes are backslash-escaped and line breaks are replaced by spaces.
    """
    text = str(value)
    for line_break in ("\r\n", "\n", "\r"):
        text = text.replace(line_break, " ")
    return text.replace("|", "\\|")


def _yaml_quoted(value: Any) -> str:
    """Return *value* as a YAML double-quoted scalar.

    Backslashes, double quotes and line breaks are escaped so that arbitrary
    advisory text cannot terminate the scalar or form an invalid escape.
    """
    text = str(value).replace("\\", "\\\\").replace('"', '\\"')
    text = text.replace("\r", "\\r").replace("\n", "\\n")
    return f'"{text}"'


def _merged_item(item: AdvisoryRecord, run_map: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Return the advisory as a dict annotated with its latest run, if any."""
    return annotate_with_latest_run(item.to_dict(), run_map.get(item.canonical_id))


def _status_counts(items: List[Dict[str, Any]]) -> Dict[str, int]:
    """Tally merged advisory dicts by ``verification_status``.

    Returns a dict with the keys ``verified_real``, ``verified_synthetic``,
    ``blocked`` (any status starting with ``blocked-``) and ``manual``
    (everything else, including a missing status).
    """
    counts = {"verified_real": 0, "verified_synthetic": 0, "blocked": 0, "manual": 0}
    for item in items:
        status = item.get("verification_status")
        if status == "verified-real":
            counts["verified_real"] += 1
        elif status == "verified-synthetic":
            counts["verified_synthetic"] += 1
        elif status and status.startswith("blocked-"):
            counts["blocked"] += 1
        else:
            counts["manual"] += 1
    return counts


def _recent_count(items: Iterable[AdvisoryRecord], days: int = 30) -> int:
    """Count records updated or published within the last *days* days.

    ``updated_at`` is checked before ``published_at``; a record is counted at
    most once.  Timestamps that cannot be parsed are ignored.
    """
    cutoff = now_utc() - timedelta(days=days)
    total = 0
    for item in items:
        for stamp in (item.updated_at, item.published_at):
            if not stamp:
                continue
            try:
                parsed = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
            except ValueError:
                continue
            if parsed.tzinfo is None:
                # ``astimezone`` would interpret a naive value in the host's
                # local zone, making the count machine-dependent; treat
                # offset-less stamps as UTC instead.
                parsed = parsed.replace(tzinfo=UTC)
            if parsed.astimezone(UTC) >= cutoff:
                total += 1
                break
    return total


def _group_name(output_dir: str) -> str:
    """Return the second path component of *output_dir* (the group folder).

    Raises ``IndexError`` when *output_dir* has fewer than two components.
    """
    return Path(output_dir).parts[1]


def _abs_repo_path(*parts: str) -> str:
    """Join *parts* under ``ROOT`` and return the result as a string.

    Empty parts are skipped and multi-component parts are split first.
    """
    cleaned: List[str] = []
    for part in parts:
        if not part:
            continue
        cleaned.extend(Path(part).parts)
    return str(ROOT.joinpath(*cleaned))


def _source_reference(source: Dict[str, Any]) -> str:
    """Format one source entry as a Markdown bullet body.

    ``source`` must contain ``name`` and ``confidence``.  The link target is
    ``source['url']`` or, failing that, the default for ``source['kind']``;
    without either the name is emitted as plain text.
    """
    url = source.get("url") or SOURCE_KIND_URLS.get(source.get("kind", ""))
    qualifiers = []
    if source.get("ecosystem"):
        qualifiers.append(f"ecosystem={source['ecosystem']}")
    if source.get("keyword"):
        qualifiers.append(f"keyword={source['keyword']}")
    if source.get("advisory_mode"):
        qualifiers.append(f"mode={source['advisory_mode']}")
    suffix = f" ({'; '.join(qualifiers)})" if qualifiers else ""
    if url:
        return f"`{source['confidence']}` [{source['name']}]({url}){suffix}"
    return f"`{source['confidence']}` {source['name']}{suffix}"


def _clear_json_dir(path: Path) -> None:
    """Ensure *path* exists and delete every ``*.json`` file directly in it."""
    ensure_dir(path)
    for file_path in path.glob("*.json"):
        file_path.unlink()


def render_system_scaffolding(source_map: Dict[str, Any], advisories: List[AdvisoryRecord]) -> None:
    """Write INDEX.md / README.md for every system, group and the root.

    Args:
        source_map: mapping with a ``"systems"`` list of system definitions.
        advisories: advisory records, grouped here by ``system_id``.
    """
    run_map = latest_runs_by_advisory()
    grouped: Dict[str, List[AdvisoryRecord]] = defaultdict(list)
    for advisory in advisories:
        grouped[advisory.system_id].append(advisory)

    target_types_joined = ", ".join(TARGET_TYPES)
    forbidden_joined = ";".join(FORBIDDEN_SCENARIOS)

    groups: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for system in source_map["systems"]:
        group_name = _group_name(system["output_dir"])
        groups[group_name].append(system)
        system_dir = FRAMEWORK_ROOT / group_name / system["system_id"]
        ensure_dir(system_dir / "cases")

        items = sorted(
            grouped.get(system["system_id"], []),
            key=lambda item: item.published_at or "",
            reverse=True,
        )
        merged_items = [_merged_item(item, run_map) for item in items]
        markdown_count = len([item for item in items if item.render_markdown and item.case_path])
        counts = _status_counts(merged_items)

        index_lines = [
            f"# {system['display_name']}",
            "",
            "> `LAB ONLY` | `AUTHORIZED TARGETS ONLY` | 自动生成索引",
            "",
            f"- 系统 ID: `{system['system_id']}`",
            f"- 分类: `{system['category']}`",
            f"- 覆盖策略: `{system['tier']}`",
            f"- 总案例数: `{len(items)}`",
            f"- 近 30 天新增/更新: `{_recent_count(items)}`",
            f"- 重点 Markdown 案例数: `{markdown_count}`",
            f"- 已实证(真实版本): `{counts['verified_real']}`",
            f"- 已实证(synthetic): `{counts['verified_synthetic']}`",
            f"- 阻塞数: `{counts['blocked']}`",
            f"- 待人工/缺浏览器证据: `{counts['manual']}`",
            f"- 最近渲染时间: `{isoformat(now_utc())}`",
            "",
            "## 目标约束",
            "",
            f"- 适用目标类型: `{target_types_joined}`",
            "- 是否允许公网验证: `yes, but ownership or authorization is required`",
            "- 授权前提: 资产归属可证明,或已取得书面/明确授权。",
            f"- 最小化验证方式: {MINIMAL_VALIDATION_GUIDANCE}",
            f"- 禁止场景: {forbidden_joined}",
            "",
            "## 来源",
            "",
        ]
        for bucket in ("official_sources", "ecosystem_sources", "research_sources"):
            for source in system.get(bucket, []):
                index_lines.append(f"- {_source_reference(source)}")
        index_lines.extend(
            [
                "",
                "## 案例列表",
                "",
                "| 标题 | 严重度 | 案例状态 | 实证状态 | 实证方式 | 来源置信度 | 更新时间 | 案例页 |",
                "|------|--------|----------|----------|----------|------------|----------|--------|",
            ]
        )
        if merged_items:
            for item in merged_items:
                case_link = f"[link]({_abs_repo_path(item['case_path'])})" if item.get("case_path") else "-"
                timestamp = item.get("updated_at") or item.get("published_at") or ""
                verification_status = item.get("verification_status", "triage-manual")
                verification_mode = item.get("verification_mode", "-")
                # The title is free text from the advisory feed; escape it so
                # a stray pipe cannot break the table row.
                index_lines.append(
                    f"| {_table_cell(item['title'])} | `{item['severity']}` | `{item['status']}` "
                    f"| `{verification_status}` | `{verification_mode}` "
                    f"| `{item['source_confidence']}` | `{timestamp}` | {case_link} |"
                )
        else:
            index_lines.append("| No advisories yet | `n/a` | `empty` | `n/a` | `n/a` | `n/a` | `n/a` | - |")
        write_text(system_dir / "INDEX.md", "\n".join(index_lines))

        system_registry_path = _abs_repo_path(
            "08-threat-intel", "registry", "systems", f"{system['system_id']}.json"
        )
        index_link = _abs_repo_path(system["output_dir"], "INDEX.md")
        readme_lines = [
            f"# {system['display_name']}",
            "",
            "> `LAB ONLY` | `AUTHORIZED TARGETS ONLY`",
            "",
            f"- 分类: `{system['category']}`",
            f"- 覆盖层级: `{system['tier']}`",
            f"- Advisory 模式: {', '.join(system.get('advisory_modes', []))}",
            f"- 输出目录: `{system['output_dir']}`",
            f"- 修复主题: {', '.join(system.get('secure_code_topics', []))}",
            f"- 适用目标类型: `{target_types_joined}`",
            "- 是否允许公网验证: `yes, but only for owned or authorized targets`",
            f"- 最小化验证方式: {MINIMAL_VALIDATION_GUIDANCE}",
            f"- 禁止场景: {forbidden_joined}",
            "",
            f"- 自动索引: [INDEX.md]({index_link})",
            f"- Registry 统计: [{system['system_id']}.json]({system_registry_path})",
        ]
        write_text(system_dir / "README.md", "\n".join(readme_lines))

    for group, systems in groups.items():
        lines = [
            f"# {group}",
            "",
            "> 自动生成系统分组索引",
            "",
            f"- 系统数量: `{len(systems)}`",
            f"- 允许范围: {_backticked(TARGET_TYPES)}",
            "",
        ]
        for system in sorted(systems, key=lambda item: item["display_name"].lower()):
            readme_link = _abs_repo_path(system["output_dir"], "README.md")
            lines.append(f"- [{system['display_name']}]({readme_link})")
        write_text(FRAMEWORK_ROOT / group / "README.md", "\n".join(lines))

    root_lines = [
        "# 主流开源 Web 系统安全",
        "",
        "> `LAB ONLY` | `AUTHORIZED TARGETS ONLY`",
        "",
        f"- 系统总数: `{len(source_map['systems'])}`",
        "- 覆盖语境: 授权攻防实验、验证性注入、最小化验证、案例映射。",
        "- 不适用: 未授权公网目标、泛互联网枚举、默认生产推荐基线。",
        "",
        f"- [cms]({_abs_repo_path('07-framework-security', 'cms', 'README.md')})",
        f"- [ecommerce]({_abs_repo_path('07-framework-security', 'ecommerce', 'README.md')})",
        f"- [frameworks]({_abs_repo_path('07-framework-security', 'frameworks', 'README.md')})",
        f"- [servers]({_abs_repo_path('07-framework-security', 'servers', 'README.md')})",
        f"- [platforms]({_abs_repo_path('07-framework-security', 'platforms', 'README.md')})",
    ]
    write_text(FRAMEWORK_ROOT / "README.md", "\n".join(root_lines))


def render_case_pages(advisories: List[AdvisoryRecord]) -> None:
    """Write one Markdown case page per advisory that requests one.

    Records without ``render_markdown`` or without ``case_path`` are skipped.
    Each page starts with YAML front matter followed by the status, event,
    source, lab and remediation sections.
    """
    run_map = latest_runs_by_advisory()
    forbidden_joined = ";".join(FORBIDDEN_SCENARIOS)

    for item in advisories:
        if not item.render_markdown or not item.case_path:
            continue
        merged = _merged_item(item, run_map)
        verification_status = merged.get("verification_status", "triage-manual")
        verification_mode = merged.get("verification_mode", "synthetic")
        artifact_mode = merged.get("artifact_mode")
        last_run_id = merged.get("last_run_id")
        browser_state = "present" if merged.get("browser_evidence", {}).get("present") else "missing"
        affected_versions = item.affected_versions or []
        fixed_versions = item.fixed_versions or []

        # Double quotes in the title are still swapped for apostrophes (the
        # established output); _yaml_quoted additionally escapes backslashes
        # and line breaks, which previously produced invalid YAML.
        lines = [
            "---",
            f"title: {_yaml_quoted(item.title.replace(chr(34), chr(39)))}",
            f"system_id: {_yaml_quoted(item.system_id)}",
            f"category: {_yaml_quoted(item.category)}",
            f"advisory_mode: {_yaml_quoted(item.advisory_mode)}",
            f"published_date: {_yaml_quoted(item.published_at or '')}",
            f"updated_date: {_yaml_quoted(item.updated_at or item.published_at or '')}",
            f"severity: {_yaml_quoted(item.severity)}",
            f"exploit_status: {_yaml_quoted(item.exploit_status)}",
            f"source_confidence: {_yaml_quoted(item.source_confidence)}",
            f"verification_status: {_yaml_quoted(verification_status)}",
            f"verification_mode: {_yaml_quoted(verification_mode)}",
            f"artifact_mode: {_yaml_quoted(artifact_mode or '')}",
            f"last_run_id: {_yaml_quoted(last_run_id or '')}",
            "target_types:",
        ]
        lines.extend(f" - {_yaml_quoted(target)}" for target in TARGET_TYPES)
        lines.extend(
            [
                'allow_public_validation: "yes, with ownership or explicit authorization"',
                'authorization_prerequisite: "asset ownership proof or explicit written authorization"',
                'minimal_validation: "read-only probe, controlled payload, reversible test"',
                "aliases:",
            ]
        )
        lines.extend(f" - {_yaml_quoted(alias)}" for alias in item.aliases or [])
        lines.append("affected_versions:")
        lines.extend(f" - {_yaml_quoted(version)}" for version in affected_versions[:20])
        lines.append("fixed_versions:")
        lines.extend(f" - {_yaml_quoted(version)}" for version in fixed_versions[:20])
        lines.append("secure_code_topics:")
        lines.extend(f" - {_yaml_quoted(topic)}" for topic in item.secure_code_topics or [])
        lines.extend(
            [
                f"primary_source: {_yaml_quoted(item.official_source_url or '')}",
                "---",
                "",
                f"# {item.title}",
                "",
                "## 本地实证状态",
                "",
                f"- 实证状态: `{verification_status}`",
                f"- 实证方式: `{verification_mode}`",
                f"- Artifact 模式: `{artifact_mode or 'unknown'}`",
                f"- 最近运行: `{last_run_id or '-'}`",
                f"- 浏览器证据: `{browser_state}`",
                f"- Run Bundle: `{merged.get('evidence_bundle') or '-'}`",
                "",
                "## 事件层",
                "",
                f"- Canonical ID: `{item.canonical_id}`",
                f"- 系统: `{item.system_id}`",
                f"- 严重度: `{item.severity}`",
                f"- 来源置信度: `{item.source_confidence}`",
                f"- 官方主源: {item.official_source_url or '-'}",
                f"- 影响版本: `{', '.join(affected_versions[:10]) or 'unknown'}`",
                f"- 修复版本: `{', '.join(fixed_versions[:10]) or 'unknown'}`",
                "",
                "## 其他来源",
                "",
            ]
        )
        if item.secondary_source_urls:
            lines.extend(f"- {ref}" for ref in item.secondary_source_urls[:20])
        else:
            lines.append("- 无额外来源")
        lines.extend(
            [
                "",
                "## 实验层",
                "",
                "- 仅用于自有资产、测试环境或已明确授权目标。",
                "- 允许公网可达目标,但必须满足资产归属或明确授权前提。",
                f"- 最小化验证方式: {MINIMAL_VALIDATION_GUIDANCE}",
                "- 若该案例涉及插件、模块或扩展,应同时检查供应链与升级策略。",
                f"- 禁止场景: {forbidden_joined}",
                "",
                "## 修复示例",
                "",
            ]
        )
        # Only link topic pages that already exist on disk.
        for topic in item.secure_code_topics or []:
            for language in LANGUAGES:
                topic_file = f"{topic}.md"
                if (SECURE_CODE_ROOT / language / topic_file).exists():
                    target = _abs_repo_path("05-defense", "secure-code", language, topic_file)
                    lines.append(f"- [{language}:{topic}]({target})")
        write_text(ROOT / item.case_path, "\n".join(lines))


def render_registry(
    source_map: Dict[str, Any],
    advisories: List[AdvisoryRecord],
    triage: List[Dict[str, Any]],
) -> None:
    """Rewrite the JSON registry: advisories, triage items and system stats.

    Existing ``*.json`` files in the advisory, system and triage directories
    are removed first so stale entries do not survive a re-render.
    """
    _clear_json_dir(REGISTRY_ROOT / "advisories")
    # NOTE(review): this clears REGISTRY_ROOT / "systems" while the payloads
    # below are written to SYSTEMS_DIR -- presumably the same directory;
    # confirm in intel.config.
    _clear_json_dir(REGISTRY_ROOT / "systems")
    _clear_json_dir(TRIAGE_DIR)
    run_map = latest_runs_by_advisory()

    grouped: Dict[str, List[AdvisoryRecord]] = defaultdict(list)
    for advisory in advisories:
        write_json(
            REGISTRY_ROOT / "advisories" / f"{advisory.canonical_id}.json",
            _merged_item(advisory, run_map),
        )
        grouped[advisory.system_id].append(advisory)

    triage_by_system: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for item in triage:
        triage_by_system[item["system_id"]].append(item)
        write_json(TRIAGE_DIR / f"{item['canonical_id']}.json", item)

    for system in source_map["systems"]:
        system_id = system["system_id"]
        items = grouped.get(system_id, [])
        merged_items = [_merged_item(item, run_map) for item in items]
        counts = _status_counts(merged_items)
        newest_first = sorted(items, key=lambda item: item.published_at or "", reverse=True)
        payload = {
            "system_id": system_id,
            "display_name": system["display_name"],
            "category": system["category"],
            "tier": system["tier"],
            "total": len(items),
            "markdown_cases": len([item for item in items if item.case_path]),
            "triage_count": len(triage_by_system.get(system_id, [])),
            "latest_update": max(
                (item.updated_at or item.published_at or "" for item in items),
                default="",
            ),
            "output_dir": system["output_dir"],
            "secure_code_topics": system.get("secure_code_topics", []),
            "verified_real": counts["verified_real"],
            "verified_synthetic": counts["verified_synthetic"],
            "blocked_count": counts["blocked"],
            "manual_count": counts["manual"],
            "items": [item.canonical_id for item in newest_first],
        }
        write_json(SYSTEMS_DIR / f"{system_id}.json", payload)


def render_generated(
    source_map: Dict[str, Any],
    advisories: List[AdvisoryRecord],
    triage: List[Dict[str, Any]],
    failures: List[str],
    change_summary: Dict[str, Any] | None = None,
) -> None:
    """Write the coverage matrix, ingest summary and run summary.

    Produces ``coverage-matrix.md``, ``latest-ingest.md`` and
    ``run-summary.json`` in ``GENERATED_DIR`` and then renders the lab
    dashboard.  ``change_summary`` may supply ``new_count``,
    ``updated_count`` and ``systems_touched``; missing keys default to
    ``0`` / ``[]``.
    """
    ensure_dir(GENERATED_DIR)
    systems = {item["system_id"]: item for item in source_map["systems"]}
    run_map = latest_runs_by_advisory()
    change_summary = change_summary or {}

    triage_by_system: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for item in triage:
        triage_by_system[item["system_id"]].append(item)

    coverage_lines = [
        "# 覆盖矩阵",
        "",
        "| 系统 | 分类 | 覆盖策略 | 历史全量 | 近两年全量 | 全量 registry | 重点案例 Markdown | secure-code 关联 | 自动同步状态 | 本地实证状态 | 浏览器证据 | run bundle | triage | 最近更新 |",
        "|------|------|----------|----------|------------|--------------|--------------------|------------------|--------------|--------------|------------|-----------|--------|----------|",
    ]
    by_system: Dict[str, List[AdvisoryRecord]] = defaultdict(list)
    for advisory in advisories:
        by_system[advisory.system_id].append(advisory)

    for system_id, system in sorted(systems.items()):
        items = by_system.get(system_id, [])
        merged_items = [_merged_item(item, run_map) for item in items]
        counts = _status_counts(merged_items)
        markdown_count = len([item for item in items if item.case_path])
        sync_state = "seeded" if items else "scaffolded"
        recent = max((item.updated_at or item.published_at or "" for item in items), default="")
        browser_present = len(
            [item for item in merged_items if item.get("browser_evidence", {}).get("present")]
        )
        run_bundle_count = len([item for item in merged_items if item.get("last_run_id")])
        proof_state = (
            f"real:{counts['verified_real']}"
            f"/synthetic:{counts['verified_synthetic']}"
            f"/blocked:{counts['blocked']}"
        )
        history_full = "yes" if system["tier"] == "history-full" else "-"
        cells = [
            _table_cell(system["display_name"]),
            f"`{system['category']}`",
            f"`{system['tier']}`",
            f"`{history_full}`",
            "`yes`",
            f"`{len(items)}`",
            f"`{markdown_count}`",
            f"`{len(system.get('secure_code_topics', []))}`",
            f"`{sync_state}`",
            f"`{proof_state}`",
            f"`{browser_present}`",
            f"`{run_bundle_count}`",
            f"`{len(triage_by_system.get(system_id, []))}`",
            f"`{recent}`",
        ]
        coverage_lines.append("| " + " | ".join(cells) + " |")
    write_text(GENERATED_DIR / "coverage-matrix.md", "\n".join(coverage_lines))

    markdown_total = len([item for item in advisories if item.case_path])
    latest_lines = [
        "# 最新同步摘要",
        "",
        f"- 渲染时间: `{isoformat(now_utc())}`",
        f"- 系统数量: `{len(source_map['systems'])}`",
        f"- Advisory 数量: `{len(advisories)}`",
        f"- 重点 Markdown 数量: `{markdown_total}`",
        f"- Run Bundle 数量: `{len(run_map)}`",
        f"- 新增记录: `{change_summary.get('new_count', 0)}`",
        f"- 更新记录: `{change_summary.get('updated_count', 0)}`",
        f"- Triage 数量: `{len(triage)}`",
        f"- 失败的 source adapter: `{len(failures)}`",
        "",
    ]
    if failures:
        latest_lines.extend(["## 失败列表", ""])
        latest_lines.extend(f"- {failure}" for failure in failures)
    write_text(GENERATED_DIR / "latest-ingest.md", "\n".join(latest_lines))

    write_json(
        GENERATED_DIR / "run-summary.json",
        {
            "generated_at": isoformat(now_utc()),
            "system_count": len(source_map["systems"]),
            "advisory_count": len(advisories),
            "markdown_count": markdown_total,
            "new_count": change_summary.get("new_count", 0),
            "updated_count": change_summary.get("updated_count", 0),
            "systems_touched": change_summary.get("systems_touched", []),
            "triage_count": len(triage),
            "run_bundle_count": len(run_map),
            "failures": failures,
        },
    )
    render_lab_dashboard()


def render_secure_code(source_map: Dict[str, Any]) -> None:
    """Write the secure-code library: root index, language indexes, topic pages.

    Every language in ``LANGUAGES`` gets one page per key of
    ``TOPIC_DESCRIPTIONS``; each page lists the display names of the systems
    whose ``secure_code_topics`` include that topic.
    """
    systems = source_map["systems"]
    related = defaultdict(set)
    for system in systems:
        for topic in system.get("secure_code_topics", []):
            related[topic].add(system["display_name"])

    root_lines = [
        "# 安全编码修复库",
        "",
        "> `LAB ONLY` | 修复主题用于把实验发现映射回代码整改,不代表默认生产基线。",
        "",
        f"- 语言范围: {_backticked(LANGUAGES)}",
        "- 主题范围: 输出编码、DOM sink、CSP / Trusted Types、令牌存储、鉴权复核、SSRF、走私边界、路径穿越、文件上传、插件信任、依赖升级、代理信任、反序列化、模板注入。",
        "",
    ]
    for language in LANGUAGES:
        language_readme = _abs_repo_path("05-defense", "secure-code", language, "README.md")
        root_lines.append(f"- [{language}]({language_readme})")
    root_text = "\n".join(root_lines)
    write_text(SECURE_CODE_ROOT / "README.md", root_text)
    write_text(SECURE_CODE_ROOT / "INDEX.md", root_text)

    for language in LANGUAGES:
        language_dir = SECURE_CODE_ROOT / language
        ensure_dir(language_dir)
        # The sample pair and fence tag depend only on the language, so look
        # them up once rather than once per topic.
        bad, good = BAD_GOOD_SNIPPETS[language]
        fence_open = f"```{_code_fence(language)}"
        index_lines = [
            f"# {language}",
            "",
            "> 自动生成修复主题索引",
            "",
            "- 语境: 授权攻防实验后的修复映射,不作为生产默认推荐模版。",
            "",
        ]
        for topic, description in TOPIC_DESCRIPTIONS.items():
            topic_file = f"{topic}.md"
            topic_link = _abs_repo_path("05-defense", "secure-code", language, topic_file)
            index_lines.append(f"- [{topic}]({topic_link}) - {description}")
            scenario = TOPIC_SCENARIOS.get(topic, "把实验问题还原为可修复的代码模式。")
            lines = [
                f"# {topic}",
                "",
                "> `LAB ONLY` | 修复主题页",
                "",
                f"- 语言: `{language}`",
                f"- 主题: `{topic}`",
                f"- 说明: {description}",
                f"- 典型场景: {scenario}",
                "",
                "## 脆弱示例",
                "",
                fence_open,
                bad,
                "```",
                "",
                "## 更安全的写法",
                "",
                fence_open,
                good,
                "```",
                "",
                "## 检查清单",
                "",
                "- 明确输入边界与不可信来源",
                "- 在服务端或可信封装层统一做校验/转义/约束",
                "- 对关键路径补充自动化测试和依赖升级策略",
                "",
                "## 相关系统",
                "",
            ]
            lines.extend(f"- {display_name}" for display_name in sorted(related.get(topic, [])))
            write_text(language_dir / topic_file, "\n".join(lines))
        index_text = "\n".join(index_lines)
        write_text(language_dir / "INDEX.md", index_text)
        write_text(language_dir / "README.md", index_text)


def _code_fence(language: str) -> str:
    """Return the Markdown fence info string for *language* ('' if unknown)."""
    mapping = {
        "javascript-typescript": "ts",
        "nodejs": "js",
        "java": "java",
        "php": "php",
        "python": "py",
        "ruby": "rb",
        "csharp": "cs",
        "go": "go",
    }
    return mapping.get(language, "")