文件
websafe-kb/scripts/lab/task_queue.py

103 行
3.3 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
from typing import Any, Dict, List
from lab.config import ADVISORIES_DIR, QUEUE_PATH
from lab.utils import load_json_dir, read_json, write_json
UTC = timezone.utc
def load_queue() -> Dict[str, Any]:
    """Load the persisted queue document from ``QUEUE_PATH``.

    Returns whatever ``read_json`` yields for the queue file; when that
    value is falsy, a fresh ``{"items": []}`` document is returned instead.
    """
    stored = read_json(QUEUE_PATH, default={"items": []})
    if stored:
        return stored
    return {"items": []}
def save_queue(queue: Dict[str, Any]) -> None:
    """Persist the given queue document to ``QUEUE_PATH`` via ``write_json``."""
    write_json(QUEUE_PATH, queue)
def enqueue_items(items: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Append new items to the persisted queue, skipping duplicates.

    An item is a duplicate when its ``"advisory_id"`` is already present in
    the queue or appeared earlier in ``items``.

    Args:
        items: Queue entries to add; each must carry an ``"advisory_id"`` key.

    Returns:
        ``{"queued": <total items now in the queue>, "added": <newly added>}``.

    Raises:
        KeyError: If an incoming or already-queued entry has no
            ``"advisory_id"``.
    """
    queue = load_queue()

    # Resolve the item list once. A stored document that lacks "items" (or
    # holds null there) would otherwise break the duplicate scan or the final
    # count when nothing gets appended.
    pending = queue.get("items")
    if pending is None:
        pending = []
        queue["items"] = pending

    seen = {entry["advisory_id"] for entry in pending}
    added = 0
    for item in items:
        advisory_id = item["advisory_id"]
        if advisory_id in seen:
            continue
        pending.append(item)
        seen.add(advisory_id)
        added += 1

    save_queue(queue)
    return {"queued": len(pending), "added": added}
def _parse_iso(value: str | None) -> datetime:
if not value:
return datetime(1970, 1, 1, tzinfo=UTC)
try:
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(UTC)
except ValueError:
return datetime(1970, 1, 1, tzinfo=UTC)
def _priority_tuple(advisory: Dict[str, Any], only_hotlane: bool) -> tuple[int, float]:
    """Compute the sort key used to rank an advisory for queueing.

    The score is the sum of independent bonuses:

    * verification status: 500 for ``"triage-manual"`` (also used when the
      field is missing or null), 450 for ``"blocked-*"``, 300 for
      ``"verified-synthetic"``, 150 for anything else;
    * freshness: 350 when ``last_verified_at`` is null/missing, otherwise
      250 when the newer of ``updated_at`` / ``published_at`` is later than
      the last verification;
    * 1000 when ``exploit_status`` marks known or active exploitation;
    * 250 when ``severity`` is ``"critical"``;
    * ``cvss_score * 10`` truncated to an int (null/missing counts as 0);
    * 100 when ``only_hotlane`` is set.

    Args:
        advisory: Advisory record (a dict loaded from the registry).
        only_hotlane: Whether the caller is building a hotlane queue.

    Returns:
        ``(score, latest_upstream_timestamp)``; the POSIX timestamp of the
        newest upstream change is the second tuple element and therefore
        breaks ties between equal scores.
    """
    score = 0

    status = advisory.get("verification_status")
    if status is None:
        # An explicit JSON null is treated like a missing field; calling
        # .startswith() on None would raise AttributeError.
        status = "triage-manual"
    if status == "triage-manual":
        score += 500
    elif isinstance(status, str) and status.startswith("blocked-"):
        score += 450
    elif status == "verified-synthetic":
        score += 300
    else:
        score += 150

    last_verified = _parse_iso(advisory.get("last_verified_at"))
    latest_upstream = max(
        _parse_iso(advisory.get("updated_at")),
        _parse_iso(advisory.get("published_at")),
    )
    if advisory.get("last_verified_at") is None:
        score += 350
    elif latest_upstream > last_verified:
        score += 250

    exploit_status = advisory.get("exploit_status")
    if exploit_status in {"known_exploited", "active_exploitation", "in_the_wild"}:
        score += 1000

    if advisory.get("severity") == "critical":
        score += 250

    score += int((advisory.get("cvss_score") or 0) * 10)

    if only_hotlane:
        score += 100

    return score, latest_upstream.timestamp()
def enqueue_from_registry(only_hotlane: bool = False, limit: int = 50) -> Dict[str, Any]:
    """Rank the advisories under ``ADVISORIES_DIR`` and enqueue the top ones.

    Advisories are ordered by ``_priority_tuple`` (highest first). When
    ``only_hotlane`` is set, an advisory is kept only if it has a hot
    exploit status, a CVSS score of at least 8.8, or ``"critical"``
    severity. The first ``limit`` resulting entries (``entries[:limit]``)
    are handed to ``enqueue_items``.

    Args:
        only_hotlane: Restrict to hotlane advisories and tag entries
            ``"hotlane"`` instead of ``"default"``.
        limit: Slice bound applied to the ranked entries before enqueueing.

    Returns:
        The summary dict returned by ``enqueue_items``.
    """
    hot_statuses = {"known_exploited", "active_exploitation", "in_the_wild"}

    def rank(entry):
        return _priority_tuple(entry, only_hotlane)

    ranked = sorted(load_json_dir(ADVISORIES_DIR), key=rank, reverse=True)

    candidates = []
    for advisory in ranked:
        if only_hotlane:
            qualifies = (
                advisory.get("exploit_status") in hot_statuses
                or (advisory.get("cvss_score") or 0) >= 8.8
                or advisory.get("severity") == "critical"
            )
            if not qualifies:
                continue
        entry = {
            "advisory_id": advisory["canonical_id"],
            "system_id": advisory["system_id"],
            "priority": "hotlane" if only_hotlane else "default",
        }
        candidates.append(entry)

    return enqueue_items(candidates[:limit])
def dequeue(limit: int = 10) -> List[Dict[str, Any]]:
    """Take items off the front of the persisted queue.

    The stored item list is split at ``limit`` using ordinary slice
    semantics: ``items[:limit]`` is returned and ``items[limit:]`` is
    written back as the new queue contents.

    Args:
        limit: Slice bound; for non-negative values, the maximum number of
            items to remove.

    Returns:
        The removed items, in queue order.
    """
    queue = load_queue()
    pending = queue.get("items", [])
    head, tail = pending[:limit], pending[limit:]
    queue["items"] = tail
    save_queue(queue)
    return head