151 行
4.0 KiB
Python
151 行
4.0 KiB
Python
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import re
|
|
import subprocess
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Iterable, List, Optional
|
|
|
|
|
|
UTC = timezone.utc
|
|
|
|
|
|
def now_utc() -> datetime:
|
|
return datetime.now(tz=UTC)
|
|
|
|
|
|
def isoformat(dt: datetime) -> str:
|
|
return dt.astimezone(UTC).replace(microsecond=0).isoformat()
|
|
|
|
|
|
def parse_dt(value: Optional[str]) -> Optional[datetime]:
|
|
if not value:
|
|
return None
|
|
if not isinstance(value, str):
|
|
return None
|
|
value = value.strip()
|
|
if value.endswith("Z"):
|
|
value = value[:-1] + "+00:00"
|
|
for fmt in (
|
|
None,
|
|
"%a, %d %b %Y %H:%M:%S %z",
|
|
"%Y-%m-%d",
|
|
"%Y-%m-%d %H:%M:%S",
|
|
):
|
|
try:
|
|
if fmt is None:
|
|
parsed = datetime.fromisoformat(value)
|
|
return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=UTC)
|
|
parsed = datetime.strptime(value, fmt)
|
|
return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=UTC)
|
|
except ValueError:
|
|
continue
|
|
return None
|
|
|
|
|
|
def parse_since(value: str, default_days: int = 30) -> datetime:
|
|
value = (value or "").strip()
|
|
if not value:
|
|
return now_utc() - timedelta(days=default_days)
|
|
match = re.fullmatch(r"(\d+)d", value)
|
|
if match:
|
|
return now_utc() - timedelta(days=int(match.group(1)))
|
|
parsed = parse_dt(value)
|
|
if parsed:
|
|
if parsed.tzinfo is None:
|
|
return parsed.replace(tzinfo=UTC)
|
|
return parsed.astimezone(UTC)
|
|
raise ValueError(f"Unsupported --since value: {value}")
|
|
|
|
|
|
def slugify(value: str) -> str:
|
|
value = value.lower().strip()
|
|
value = re.sub(r"[^a-z0-9]+", "-", value)
|
|
value = re.sub(r"-+", "-", value).strip("-")
|
|
return value or "item"
|
|
|
|
|
|
def short_hash(*parts: str) -> str:
|
|
digest = hashlib.sha1("::".join(parts).encode("utf-8")).hexdigest()
|
|
return digest[:10]
|
|
|
|
|
|
def ensure_dir(path: Path) -> None:
|
|
path.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def read_json(path: Path, default: Any = None) -> Any:
|
|
if not path.exists():
|
|
return default
|
|
with path.open("r", encoding="utf-8") as handle:
|
|
return json.load(handle)
|
|
|
|
|
|
def write_json(path: Path, data: Any) -> None:
|
|
ensure_dir(path.parent)
|
|
with path.open("w", encoding="utf-8") as handle:
|
|
json.dump(data, handle, indent=2, ensure_ascii=True, sort_keys=False)
|
|
handle.write("\n")
|
|
|
|
|
|
def write_text(path: Path, content: str) -> None:
|
|
ensure_dir(path.parent)
|
|
with path.open("w", encoding="utf-8") as handle:
|
|
handle.write(content.rstrip() + "\n")
|
|
|
|
|
|
def run(cmd: List[str], cwd: Optional[Path] = None, check: bool = True) -> subprocess.CompletedProcess:
|
|
return subprocess.run(
|
|
cmd,
|
|
cwd=str(cwd) if cwd else None,
|
|
check=check,
|
|
text=True,
|
|
capture_output=True,
|
|
)
|
|
|
|
|
|
def load_all_json(path: Path) -> List[Dict[str, Any]]:
|
|
items: List[Dict[str, Any]] = []
|
|
if not path.exists():
|
|
return items
|
|
for file_path in sorted(path.glob("*.json")):
|
|
content = read_json(file_path, default=None)
|
|
if isinstance(content, dict):
|
|
items.append(content)
|
|
return items
|
|
|
|
|
|
def unique(values: Iterable[str]) -> List[str]:
|
|
seen = set()
|
|
result = []
|
|
for value in values:
|
|
if not value:
|
|
continue
|
|
if value in seen:
|
|
continue
|
|
seen.add(value)
|
|
result.append(value)
|
|
return result
|
|
|
|
|
|
def severity_rank(value: Optional[str]) -> int:
|
|
order = {
|
|
"critical": 5,
|
|
"high": 4,
|
|
"important": 4,
|
|
"medium": 3,
|
|
"moderate": 3,
|
|
"low": 2,
|
|
"info": 1,
|
|
"unknown": 0,
|
|
None: 0,
|
|
}
|
|
return order.get((value or "").lower(), 0)
|
|
|
|
|
|
def best_severity(values: Iterable[Optional[str]]) -> str:
|
|
ordered = sorted(values, key=severity_rank, reverse=True)
|
|
return next((value for value in ordered if value), "unknown")
|