Expand intel coverage and refresh monitoring
这个提交包含在:
@@ -173,10 +173,11 @@ def _write_outputs(
|
||||
triage: List[Dict[str, Any]],
|
||||
failures: List[str],
|
||||
change_summary: Dict[str, Any],
|
||||
selected_system_ids: set[str] | None = None,
|
||||
) -> None:
|
||||
render_registry(source_map, advisories, triage)
|
||||
render_system_scaffolding(source_map, advisories)
|
||||
render_case_pages(advisories)
|
||||
render_registry(source_map, advisories, triage, selected_system_ids=selected_system_ids)
|
||||
render_system_scaffolding(source_map, advisories, selected_system_ids=selected_system_ids)
|
||||
render_case_pages(advisories, selected_system_ids=selected_system_ids)
|
||||
render_secure_code(source_map)
|
||||
render_generated(source_map, advisories, triage, failures, change_summary)
|
||||
|
||||
@@ -187,7 +188,10 @@ def _refresh_render_state(
|
||||
) -> None:
|
||||
render_map, advisories, triage = _load_existing_selection(full_source_map, source_map)
|
||||
summary = read_json(GENERATED_DIR / "run-summary.json", default={}) or {}
|
||||
_write_outputs(render_map, advisories, triage, summary.get("failures", []), summary)
|
||||
selected_system_ids = None
|
||||
if len(source_map["systems"]) != len(full_source_map["systems"]):
|
||||
selected_system_ids = {system["system_id"] for system in source_map["systems"]}
|
||||
_write_outputs(render_map, advisories, triage, summary.get("failures", []), summary, selected_system_ids=selected_system_ids)
|
||||
|
||||
|
||||
def _retry_degraded_sources(
|
||||
@@ -235,7 +239,12 @@ def pipeline(
|
||||
include_undated: bool,
|
||||
hotlane_only: bool = False,
|
||||
) -> tuple[list[AdvisoryRecord], list[Dict[str, Any]], list[str], Dict[str, Any]]:
|
||||
since_dt = None if tier == "history-full" else parse_since(since_arg, default_days=30)
|
||||
if tier == "history-full":
|
||||
since_dt = None
|
||||
elif tier == "rolling-24m":
|
||||
since_dt = parse_since("730d")
|
||||
else:
|
||||
since_dt = parse_since(since_arg, default_days=30)
|
||||
candidates, failures = collect_candidates(source_map, since_dt=since_dt, tier=tier, include_undated=include_undated)
|
||||
advisories, triage = normalize_candidates(candidates)
|
||||
advisories = route_advisories(source_map, advisories)
|
||||
@@ -244,9 +253,11 @@ def pipeline(
|
||||
advisories, triage = _merge_existing_registry(advisories, triage)
|
||||
change_summary = _summarize_changes(advisories)
|
||||
render_map = source_map
|
||||
selected_system_ids = None
|
||||
if len(source_map["systems"]) != len(full_source_map["systems"]):
|
||||
render_map = full_source_map
|
||||
_write_outputs(render_map, advisories, triage, failures, change_summary)
|
||||
selected_system_ids = {system["system_id"] for system in source_map["systems"]}
|
||||
_write_outputs(render_map, advisories, triage, failures, change_summary, selected_system_ids=selected_system_ids)
|
||||
return advisories, triage, failures, change_summary
|
||||
|
||||
|
||||
@@ -256,7 +267,10 @@ def cmd_render(args) -> int:
|
||||
render_map, advisories, triage = _load_existing_selection(full_source_map, source_map)
|
||||
summary = read_json(GENERATED_DIR / "run-summary.json", default={}) or {}
|
||||
failures = summary.get("failures", [])
|
||||
_write_outputs(render_map, advisories, triage, failures, summary)
|
||||
selected_system_ids = None
|
||||
if len(source_map["systems"]) != len(full_source_map["systems"]):
|
||||
selected_system_ids = {system["system_id"] for system in source_map["systems"]}
|
||||
_write_outputs(render_map, advisories, triage, failures, summary, selected_system_ids=selected_system_ids)
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
@@ -69,6 +69,8 @@ class AdvisoryRecord:
|
||||
last_verified_at: Optional[str] = None
|
||||
last_run_id: Optional[str] = None
|
||||
evidence_bundle: Optional[str] = None
|
||||
historical_status: Optional[str] = None
|
||||
latest_status: Optional[str] = None
|
||||
browser_evidence: Dict[str, Any] = field(
|
||||
default_factory=lambda: {
|
||||
"required": False,
|
||||
|
||||
@@ -191,7 +191,38 @@ def _clear_json_dir(path: Path) -> None:
|
||||
file_path.unlink()
|
||||
|
||||
|
||||
def render_system_scaffolding(source_map: Dict[str, Any], advisories: List[AdvisoryRecord]) -> None:
|
||||
def _sync_json_dir(path: Path, payloads: Dict[str, Any]) -> None:
|
||||
ensure_dir(path)
|
||||
desired = set(payloads.keys())
|
||||
for file_path in path.glob("*.json"):
|
||||
if file_path.name not in desired:
|
||||
file_path.unlink()
|
||||
for filename, payload in payloads.items():
|
||||
write_json(path / filename, payload)
|
||||
|
||||
|
||||
def _sync_selected_json_dir(path: Path, payloads: Dict[str, Any], selected_system_ids: set[str], *, systems_dir: bool = False) -> None:
|
||||
ensure_dir(path)
|
||||
desired = set(payloads.keys())
|
||||
for file_path in path.glob("*.json"):
|
||||
if systems_dir:
|
||||
system_id = file_path.stem
|
||||
if system_id not in selected_system_ids:
|
||||
continue
|
||||
else:
|
||||
if not any(file_path.name.startswith(f"{system_id}--") for system_id in selected_system_ids):
|
||||
continue
|
||||
if file_path.name not in desired:
|
||||
file_path.unlink()
|
||||
for filename, payload in payloads.items():
|
||||
write_json(path / filename, payload)
|
||||
|
||||
|
||||
def render_system_scaffolding(
|
||||
source_map: Dict[str, Any],
|
||||
advisories: List[AdvisoryRecord],
|
||||
selected_system_ids: set[str] | None = None,
|
||||
) -> None:
|
||||
run_map = latest_runs_by_advisory()
|
||||
grouped: Dict[str, List[AdvisoryRecord]] = defaultdict(list)
|
||||
for advisory in advisories:
|
||||
@@ -200,6 +231,10 @@ def render_system_scaffolding(source_map: Dict[str, Any], advisories: List[Advis
|
||||
groups: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
|
||||
for system in source_map["systems"]:
|
||||
groups[_group_name(system["output_dir"])].append(system)
|
||||
target_systems = source_map["systems"]
|
||||
if selected_system_ids:
|
||||
target_systems = [system for system in source_map["systems"] if system["system_id"] in selected_system_ids]
|
||||
for system in target_systems:
|
||||
system_dir = FRAMEWORK_ROOT / _group_name(system["output_dir"]) / system["system_id"]
|
||||
ensure_dir(system_dir / "cases")
|
||||
|
||||
@@ -311,9 +346,11 @@ def render_system_scaffolding(source_map: Dict[str, Any], advisories: List[Advis
|
||||
write_text(FRAMEWORK_ROOT / "README.md", "\n".join(root_lines))
|
||||
|
||||
|
||||
def render_case_pages(advisories: List[AdvisoryRecord]) -> None:
|
||||
def render_case_pages(advisories: List[AdvisoryRecord], selected_system_ids: set[str] | None = None) -> None:
|
||||
run_map = latest_runs_by_advisory()
|
||||
for item in advisories:
|
||||
if selected_system_ids and item.system_id not in selected_system_ids:
|
||||
continue
|
||||
if not item.render_markdown or not item.case_path:
|
||||
continue
|
||||
merged = _merged_item(item, run_map)
|
||||
@@ -410,28 +447,36 @@ def render_case_pages(advisories: List[AdvisoryRecord]) -> None:
|
||||
write_text(ROOT / item.case_path, "\n".join(lines))
|
||||
|
||||
|
||||
def render_registry(source_map: Dict[str, Any], advisories: List[AdvisoryRecord], triage: List[Dict[str, Any]]) -> None:
|
||||
_clear_json_dir(REGISTRY_ROOT / "advisories")
|
||||
_clear_json_dir(REGISTRY_ROOT / "systems")
|
||||
_clear_json_dir(TRIAGE_DIR)
|
||||
|
||||
def render_registry(
|
||||
source_map: Dict[str, Any],
|
||||
advisories: List[AdvisoryRecord],
|
||||
triage: List[Dict[str, Any]],
|
||||
selected_system_ids: set[str] | None = None,
|
||||
) -> None:
|
||||
run_map = latest_runs_by_advisory()
|
||||
grouped: Dict[str, List[AdvisoryRecord]] = defaultdict(list)
|
||||
advisory_payloads: Dict[str, Dict[str, Any]] = {}
|
||||
for advisory in advisories:
|
||||
write_json(REGISTRY_ROOT / "advisories" / f"{advisory.canonical_id}.json", _merged_item(advisory, run_map))
|
||||
if selected_system_ids and advisory.system_id not in selected_system_ids:
|
||||
continue
|
||||
advisory_payloads[f"{advisory.canonical_id}.json"] = _merged_item(advisory, run_map)
|
||||
grouped[advisory.system_id].append(advisory)
|
||||
|
||||
triage_by_system: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
|
||||
triage_payloads: Dict[str, Dict[str, Any]] = {}
|
||||
for item in triage:
|
||||
if selected_system_ids and item["system_id"] not in selected_system_ids:
|
||||
continue
|
||||
triage_by_system[item["system_id"]].append(item)
|
||||
write_json(TRIAGE_DIR / f"{item['canonical_id']}.json", item)
|
||||
triage_payloads[f"{item['canonical_id']}.json"] = item
|
||||
|
||||
system_payloads: Dict[str, Dict[str, Any]] = {}
|
||||
for system in source_map["systems"]:
|
||||
system_id = system["system_id"]
|
||||
items = grouped.get(system_id, [])
|
||||
merged_items = [_merged_item(item, run_map) for item in items]
|
||||
counts = _status_counts(merged_items)
|
||||
payload = {
|
||||
system_payloads[f"{system_id}.json"] = {
|
||||
"system_id": system_id,
|
||||
"display_name": system["display_name"],
|
||||
"category": system["category"],
|
||||
@@ -448,7 +493,14 @@ def render_registry(source_map: Dict[str, Any], advisories: List[AdvisoryRecord]
|
||||
"manual_count": counts["manual"],
|
||||
"items": [item.canonical_id for item in sorted(items, key=lambda item: item.published_at or "", reverse=True)],
|
||||
}
|
||||
write_json(SYSTEMS_DIR / f"{system_id}.json", payload)
|
||||
if selected_system_ids:
|
||||
_sync_selected_json_dir(REGISTRY_ROOT / "advisories", advisory_payloads, selected_system_ids)
|
||||
_sync_selected_json_dir(TRIAGE_DIR, triage_payloads, selected_system_ids)
|
||||
_sync_selected_json_dir(REGISTRY_ROOT / "systems", system_payloads, selected_system_ids, systems_dir=True)
|
||||
return
|
||||
_sync_json_dir(REGISTRY_ROOT / "advisories", advisory_payloads)
|
||||
_sync_json_dir(TRIAGE_DIR, triage_payloads)
|
||||
_sync_json_dir(REGISTRY_ROOT / "systems", system_payloads)
|
||||
|
||||
|
||||
def render_generated(
|
||||
@@ -525,7 +577,10 @@ def render_generated(
|
||||
"failures": failures,
|
||||
},
|
||||
)
|
||||
render_lab_dashboard()
|
||||
render_lab_dashboard(
|
||||
advisory_records=[item.to_dict() for item in advisories],
|
||||
source_map_data=source_map,
|
||||
)
|
||||
|
||||
|
||||
def render_secure_code(source_map: Dict[str, Any]) -> None:
|
||||
|
||||
@@ -14,6 +14,16 @@ from intel.utils import unique
|
||||
|
||||
ANCHOR_RE = re.compile(r"<a[^>]+href=[\"']([^\"']+)[\"'][^>]*>(.*?)</a>", re.IGNORECASE | re.DOTALL)
|
||||
TAG_RE = re.compile(r"<[^>]+>")
|
||||
GENERIC_TITLES = {
|
||||
"permalink",
|
||||
"discuss this topic",
|
||||
"read full topic",
|
||||
"read more",
|
||||
}
|
||||
|
||||
|
||||
def _is_generic_title(title: str) -> bool:
|
||||
return title.strip().lower() in GENERIC_TITLES
|
||||
|
||||
|
||||
def canonicalize_url(url: str) -> str:
|
||||
@@ -37,7 +47,8 @@ def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
|
||||
exclude_patterns = parser_hints.get("exclude_url_patterns") or []
|
||||
|
||||
candidates: List[Candidate] = []
|
||||
seen = set()
|
||||
by_url: Dict[str, Candidate] = {}
|
||||
ordered_urls: List[str] = []
|
||||
for href, text in ANCHOR_RE.findall(html):
|
||||
title = unescape(TAG_RE.sub(" ", text)).strip()
|
||||
if not title:
|
||||
@@ -50,11 +61,10 @@ def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
|
||||
continue
|
||||
if exclude_patterns and _matches_patterns(absolute, exclude_patterns):
|
||||
continue
|
||||
if absolute in seen:
|
||||
continue
|
||||
seen.add(absolute)
|
||||
candidates.append(
|
||||
Candidate(
|
||||
existing = by_url.get(absolute)
|
||||
if existing is None:
|
||||
ordered_urls.append(absolute)
|
||||
by_url[absolute] = Candidate(
|
||||
system_id=system["system_id"],
|
||||
display_name=system["display_name"],
|
||||
category=system["category"],
|
||||
@@ -69,7 +79,17 @@ def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
|
||||
references=unique([absolute]),
|
||||
raw={"href": absolute, "title": title},
|
||||
)
|
||||
)
|
||||
if len(candidates) >= source.get("max_items", 50):
|
||||
break
|
||||
continue
|
||||
if _is_generic_title(existing.title) and not _is_generic_title(title):
|
||||
existing.title = title
|
||||
existing.raw = {"href": absolute, "title": title}
|
||||
continue
|
||||
if _is_generic_title(title) and not _is_generic_title(existing.title):
|
||||
continue
|
||||
if len(title) > len(existing.title):
|
||||
existing.title = title
|
||||
existing.raw = {"href": absolute, "title": title}
|
||||
|
||||
for absolute in ordered_urls[: source.get("max_items", 50)]:
|
||||
candidates.append(by_url[absolute])
|
||||
return candidates
|
||||
|
||||
@@ -17,11 +17,33 @@ def _refs(item: Dict[str, Any]) -> List[str]:
|
||||
return unique(values)
|
||||
|
||||
|
||||
def _list_value(item: Dict[str, Any], *keys: str) -> List[str]:
|
||||
values: List[str] = []
|
||||
for key in keys:
|
||||
raw = item.get(key)
|
||||
if isinstance(raw, str) and raw:
|
||||
values.append(raw)
|
||||
elif isinstance(raw, list):
|
||||
values.extend(str(entry) for entry in raw if entry)
|
||||
return unique(values)
|
||||
|
||||
|
||||
def _title(item: Dict[str, Any], system: Dict[str, Any]) -> str:
|
||||
for key in ("title", "name", "summary", "issue_id", "cve_id", "id"):
|
||||
value = item.get(key)
|
||||
if isinstance(value, str) and value.strip():
|
||||
return value.strip()
|
||||
return f"JSON entry for {system['display_name']}"
|
||||
|
||||
|
||||
def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
|
||||
response = request("GET", source["url"], source=source)
|
||||
response.raise_for_status()
|
||||
payload = response.json()
|
||||
items = payload.get("items") or payload.get("entries") or payload.get("advisories") or []
|
||||
if isinstance(payload, list):
|
||||
items = payload
|
||||
else:
|
||||
items = payload.get("items") or payload.get("entries") or payload.get("advisories") or []
|
||||
if not isinstance(items, list):
|
||||
return []
|
||||
|
||||
@@ -31,9 +53,15 @@ def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
|
||||
for item in items[: source.get("max_items", 50)]:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
title = item.get("title") or item.get("name") or item.get("summary") or f"JSON entry for {system['display_name']}"
|
||||
title = _title(item, system)
|
||||
link = item.get("url") or item.get("external_url") or item.get("html_url") or source["url"]
|
||||
summary = item.get("summary") or item.get("content_text") or item.get("description") or ""
|
||||
summary = (
|
||||
item.get("summary")
|
||||
or item.get("content_text")
|
||||
or item.get("description")
|
||||
or item.get("details")
|
||||
or ""
|
||||
)
|
||||
if keywords:
|
||||
haystack = " ".join(filter(None, [title, summary, link])).lower()
|
||||
if not any(keyword in haystack for keyword in keywords):
|
||||
@@ -41,6 +69,10 @@ def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
|
||||
refs = _refs(item)
|
||||
if link and link not in refs:
|
||||
refs.insert(0, link)
|
||||
aliases = _list_value(item, "aliases", "id", "issue_id", "cve_id", "ghsa_id", "osv_id")
|
||||
cve_ids = [value for value in aliases if value.startswith("CVE-")]
|
||||
ghsa_ids = [value for value in aliases if value.startswith("GHSA-")]
|
||||
osv_ids = [value for value in aliases if value.startswith("OSV-")]
|
||||
candidates.append(
|
||||
Candidate(
|
||||
system_id=system["system_id"],
|
||||
@@ -52,11 +84,29 @@ def fetch(system: Dict[str, Any], source: Dict[str, Any]) -> List[Candidate]:
|
||||
source_confidence=source["confidence"],
|
||||
source_url=link,
|
||||
title=title,
|
||||
published_at=item.get("date_published") or item.get("published_at") or item.get("published") or item.get("created_at"),
|
||||
updated_at=item.get("date_modified") or item.get("updated_at") or item.get("modified") or item.get("updated"),
|
||||
published_at=(
|
||||
item.get("date_published")
|
||||
or item.get("published_at")
|
||||
or item.get("published")
|
||||
or item.get("created_at")
|
||||
or item.get("fix_release_date")
|
||||
),
|
||||
updated_at=(
|
||||
item.get("date_modified")
|
||||
or item.get("updated_at")
|
||||
or item.get("modified")
|
||||
or item.get("updated")
|
||||
or item.get("fix_release_date")
|
||||
),
|
||||
summary=summary,
|
||||
severity=str(item.get("severity") or "unknown").lower(),
|
||||
aliases=unique(item.get("aliases", []) or [item.get("id")]),
|
||||
aliases=aliases,
|
||||
cve_ids=cve_ids,
|
||||
ghsa_ids=ghsa_ids,
|
||||
osv_ids=osv_ids,
|
||||
affected_versions=_list_value(item, "affected_versions"),
|
||||
fixed_versions=_list_value(item, "fixed_versions", "fix_versions"),
|
||||
package_name=item.get("package_name") or item.get("platform"),
|
||||
references=refs,
|
||||
raw=item,
|
||||
)
|
||||
|
||||
@@ -147,7 +147,12 @@ def probe_source(system: Dict[str, Any], source: Dict[str, Any]) -> Dict[str, An
|
||||
response = request("GET", source["url"], source=source)
|
||||
response.raise_for_status()
|
||||
payload = response.json()
|
||||
items = payload.get("items") or payload.get("entries") or payload.get("advisories") or []
|
||||
if isinstance(payload, list):
|
||||
items = payload
|
||||
elif isinstance(payload, dict):
|
||||
items = payload.get("items") or payload.get("entries") or payload.get("advisories") or []
|
||||
else:
|
||||
raise ValueError("JSON feed probe returned unsupported payload type")
|
||||
if not isinstance(items, list):
|
||||
raise ValueError("JSON feed probe returned non-list items")
|
||||
return {"kind": kind, "items_seen": len(items)}
|
||||
|
||||
@@ -85,15 +85,20 @@ def read_json(path: Path, default: Any = None) -> Any:
|
||||
|
||||
def write_json(path: Path, data: Any) -> None:
|
||||
ensure_dir(path.parent)
|
||||
content = json.dumps(data, indent=2, ensure_ascii=True, sort_keys=False) + "\n"
|
||||
if path.exists() and path.read_text(encoding="utf-8") == content:
|
||||
return
|
||||
with path.open("w", encoding="utf-8") as handle:
|
||||
json.dump(data, handle, indent=2, ensure_ascii=True, sort_keys=False)
|
||||
handle.write("\n")
|
||||
handle.write(content)
|
||||
|
||||
|
||||
def write_text(path: Path, content: str) -> None:
|
||||
ensure_dir(path.parent)
|
||||
rendered = content.rstrip() + "\n"
|
||||
if path.exists() and path.read_text(encoding="utf-8") == rendered:
|
||||
return
|
||||
with path.open("w", encoding="utf-8") as handle:
|
||||
handle.write(content.rstrip() + "\n")
|
||||
handle.write(rendered)
|
||||
|
||||
|
||||
def run(cmd: List[str], cwd: Optional[Path] = None, check: bool = True) -> subprocess.CompletedProcess:
|
||||
|
||||
@@ -1263,17 +1263,23 @@ def render_run(run: Dict[str, Any]) -> Dict[str, str]:
|
||||
return {"bundle_dir": str(run_dir), "report_md": str(report_md), "report_html": str(report_html), "timeline": str(timeline_path)}
|
||||
|
||||
|
||||
def render_dashboard() -> Dict[str, str]:
|
||||
def render_dashboard(
|
||||
*,
|
||||
advisory_records: List[Dict[str, Any]] | None = None,
|
||||
runs: List[Dict[str, Any]] | None = None,
|
||||
source_map_data: Dict[str, Any] | None = None,
|
||||
repro_map_data: Dict[str, Any] | None = None,
|
||||
) -> Dict[str, str]:
|
||||
ensure_dir(DASHBOARD_DIR)
|
||||
advisory_records = load_json_dir(ADVISORIES_DIR)
|
||||
runs = load_json_dir(RUNS_DIR)
|
||||
advisory_records = advisory_records if advisory_records is not None else load_json_dir(ADVISORIES_DIR)
|
||||
runs = runs if runs is not None else load_json_dir(RUNS_DIR)
|
||||
run_summary = read_json(ROOT / "08-threat-intel" / "generated" / "run-summary.json", default={}) or {}
|
||||
source_health = read_json(ROOT / "08-threat-intel" / "generated" / "source-health.json", default={}) or {}
|
||||
alerts = read_json(ROOT / "08-threat-intel" / "generated" / "alerts.json", default=[]) or []
|
||||
monitor_summary = read_json(ROOT / "08-threat-intel" / "generated" / "monitor-summary.json", default={}) or {}
|
||||
source_catalog_audit = read_json(ROOT / "08-threat-intel" / "generated" / "source-catalog-audit.json", default={}) or {}
|
||||
source_map = read_yaml(SOURCE_MAP_PATH, default={}) or {}
|
||||
repro_map = read_yaml(REPRO_MAP_PATH, default={}) or {}
|
||||
source_map = source_map_data if source_map_data is not None else (read_yaml(SOURCE_MAP_PATH, default={}) or {})
|
||||
repro_map = repro_map_data if repro_map_data is not None else (read_yaml(REPRO_MAP_PATH, default={}) or {})
|
||||
source_system_map = {item["system_id"]: item for item in source_map.get("systems", []) if item.get("system_id")}
|
||||
merged_advisories = _merge_latest_advisories(advisory_records, runs, source_system_map)
|
||||
advisory_map = {item["canonical_id"]: item for item in merged_advisories if item.get("canonical_id")}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from functools import lru_cache
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
@@ -25,10 +26,12 @@ FAMILY_KEYWORDS = {
|
||||
}
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def load_repro_map() -> Dict[str, Any]:
|
||||
return read_yaml(REPRO_MAP_PATH, default={"systems": []}) or {"systems": []}
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def load_profiles() -> Dict[str, Dict[str, Any]]:
|
||||
profiles: Dict[str, Dict[str, Any]] = {}
|
||||
if not REPRO_PROFILES_DIR.exists():
|
||||
|
||||
@@ -33,7 +33,10 @@ def read_json(path: Path, default: Any = None) -> Any:
|
||||
|
||||
def write_json(path: Path, data: Any) -> None:
|
||||
ensure_dir(path.parent)
|
||||
path.write_text(json.dumps(data, indent=2, ensure_ascii=True, sort_keys=False) + "\n", encoding="utf-8")
|
||||
content = json.dumps(data, indent=2, ensure_ascii=True, sort_keys=False) + "\n"
|
||||
if path.exists() and path.read_text(encoding="utf-8") == content:
|
||||
return
|
||||
path.write_text(content, encoding="utf-8")
|
||||
|
||||
|
||||
def read_yaml(path: Path, default: Any = None) -> Any:
|
||||
@@ -51,7 +54,10 @@ def write_yaml(path: Path, data: Any) -> None:
|
||||
|
||||
def write_text(path: Path, content: str) -> None:
|
||||
ensure_dir(path.parent)
|
||||
path.write_text(content.rstrip() + "\n", encoding="utf-8")
|
||||
rendered = content.rstrip() + "\n"
|
||||
if path.exists() and path.read_text(encoding="utf-8") == rendered:
|
||||
return
|
||||
path.write_text(rendered, encoding="utf-8")
|
||||
|
||||
|
||||
def load_json_dir(path: Path) -> List[Dict[str, Any]]:
|
||||
|
||||
在新工单中引用
屏蔽一个用户