更新: 21 个文件 - 2026-03-17 00:00:00
这个提交包含在:
@@ -1,11 +1,15 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from lab.config import ADVISORIES_DIR, QUEUE_PATH
|
||||
from lab.utils import load_json_dir, read_json, write_json
|
||||
|
||||
|
||||
UTC = timezone.utc
|
||||
|
||||
|
||||
def load_queue() -> Dict[str, Any]:
|
||||
return read_json(QUEUE_PATH, default={"items": []}) or {"items": []}
|
||||
|
||||
@@ -28,15 +32,64 @@ def enqueue_items(items: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||||
return {"queued": len(queue["items"]), "added": added}
|
||||
|
||||
|
||||
def _parse_iso(value: str | None) -> datetime:
|
||||
if not value:
|
||||
return datetime(1970, 1, 1, tzinfo=UTC)
|
||||
try:
|
||||
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(UTC)
|
||||
except ValueError:
|
||||
return datetime(1970, 1, 1, tzinfo=UTC)
|
||||
|
||||
|
||||
def _priority_tuple(advisory: Dict[str, Any], only_hotlane: bool) -> tuple[int, float]:
|
||||
score = 0
|
||||
verification_status = advisory.get("verification_status", "triage-manual")
|
||||
if verification_status == "triage-manual":
|
||||
score += 500
|
||||
elif verification_status.startswith("blocked-"):
|
||||
score += 450
|
||||
elif verification_status == "verified-synthetic":
|
||||
score += 300
|
||||
else:
|
||||
score += 150
|
||||
|
||||
last_verified = _parse_iso(advisory.get("last_verified_at"))
|
||||
latest_upstream = max(_parse_iso(advisory.get("updated_at")), _parse_iso(advisory.get("published_at")))
|
||||
if advisory.get("last_verified_at") is None:
|
||||
score += 350
|
||||
elif latest_upstream > last_verified:
|
||||
score += 250
|
||||
|
||||
exploit_status = advisory.get("exploit_status")
|
||||
if exploit_status in {"known_exploited", "active_exploitation", "in_the_wild"}:
|
||||
score += 1000
|
||||
severity = advisory.get("severity")
|
||||
if severity == "critical":
|
||||
score += 250
|
||||
score += int((advisory.get("cvss_score") or 0) * 10)
|
||||
|
||||
if only_hotlane:
|
||||
score += 100
|
||||
|
||||
return score, latest_upstream.timestamp()
|
||||
|
||||
|
||||
def enqueue_from_registry(only_hotlane: bool = False, limit: int = 50) -> Dict[str, Any]:
|
||||
advisories = load_json_dir(ADVISORIES_DIR)
|
||||
advisories = sorted(advisories, key=lambda item: _priority_tuple(item, only_hotlane), reverse=True)
|
||||
items = []
|
||||
for advisory in advisories:
|
||||
if only_hotlane:
|
||||
hot = advisory.get("exploit_status") in {"known_exploited", "active_exploitation", "in_the_wild"}
|
||||
if not hot and not (advisory.get("cvss_score") or 0) >= 8.8 and advisory.get("severity") != "critical":
|
||||
continue
|
||||
items.append({"advisory_id": advisory["canonical_id"], "system_id": advisory["system_id"], "priority": "hotlane" if only_hotlane else "default"})
|
||||
items.append(
|
||||
{
|
||||
"advisory_id": advisory["canonical_id"],
|
||||
"system_id": advisory["system_id"],
|
||||
"priority": "hotlane" if only_hotlane else "default",
|
||||
}
|
||||
)
|
||||
return enqueue_items(items[:limit])
|
||||
|
||||
|
||||
|
||||
在新工单中引用
屏蔽一个用户