diff --git a/client/src/components/TaskCenter.tsx b/client/src/components/TaskCenter.tsx index 2c30507..bce25c9 100644 --- a/client/src/components/TaskCenter.tsx +++ b/client/src/components/TaskCenter.tsx @@ -3,6 +3,7 @@ import { trpc } from "@/lib/trpc"; import { Button } from "@/components/ui/button"; import { Badge } from "@/components/ui/badge"; import { Progress } from "@/components/ui/progress"; +import { Alert, AlertDescription } from "@/components/ui/alert"; import { Sheet, SheetContent, SheetHeader, SheetTitle, SheetTrigger } from "@/components/ui/sheet"; import { ScrollArea } from "@/components/ui/scroll-area"; import { toast } from "sonner"; @@ -21,6 +22,22 @@ function formatTaskStatus(status: string) { } } +function formatTaskTiming(task: { + createdAt: string | Date; + startedAt?: string | Date | null; + completedAt?: string | Date | null; +}) { + const createdAt = new Date(task.createdAt).getTime(); + const startedAt = task.startedAt ? new Date(task.startedAt).getTime() : null; + const completedAt = task.completedAt ? new Date(task.completedAt).getTime() : null; + const durationMs = (completedAt ?? Date.now()) - (startedAt ?? createdAt); + const seconds = Math.max(0, Math.round(durationMs / 1000)); + if (seconds < 60) return `${seconds}s`; + const minutes = Math.floor(seconds / 60); + const rest = seconds % 60; + return `${minutes}m ${rest.toString().padStart(2, "0")}s`; +} + export function TaskCenter({ compact = false }: { compact?: boolean }) { const utils = trpc.useUtils(); const retryMutation = trpc.task.retry.useMutation({ @@ -36,6 +53,9 @@ export function TaskCenter({ compact = false }: { compact?: boolean }) { const taskListQuery = trpc.task.list.useQuery( { limit: 20 }, { + retry: 3, + retryDelay: (attempt) => Math.min(1_000 * 2 ** attempt, 8_000), + placeholderData: (previous) => previous, refetchInterval: (query) => { const hasActiveTask = (query.state.data ?? []).some((task) => task.status === "queued" || task.status === "running"); return hasActiveTask ? 3_000 : 8_000; @@ -86,6 +106,15 @@ export function TaskCenter({ compact = false }: { compact?: boolean }) {
+ {taskListQuery.isError ? ( + + + + 任务列表刷新失败,当前显示最近一次成功结果。 + + + ) : null} + {(taskListQuery.data ?? []).length === 0 ? (
当前没有后台任务。 @@ -117,7 +146,9 @@ export function TaskCenter({ compact = false }: { compact?: boolean }) { ) : null}
- {new Date(task.createdAt).toLocaleString("zh-CN")} + + {new Date(task.createdAt).toLocaleString("zh-CN")} · 耗时 {formatTaskTiming(task)} + {task.status === "failed" ? ( + {actionStats.map((item) => ( + + ))} +
+ ) : null} + + {filteredVisibleSegments.length === 0 ? (
开始分析后,这里会按时间区间显示识别出的动作片段。
) : ( - visibleSegments.map((segment) => { + filteredVisibleSegments.map((segment) => { const meta = ACTION_META[segment.actionType]; return (
@@ -1151,6 +1276,7 @@ export default function LiveCamera() { {meta.label} {formatDuration(segment.startMs)} - {formatDuration(segment.endMs)} 时长 {formatDuration(segment.durationMs)} + 关键帧 {segment.keyFrames.length}
{segment.issueSummary.join(" · ") || "当前片段节奏稳定"}
@@ -1189,6 +1315,7 @@ export default function LiveCamera() {
{heroAction.label} 置信度 {liveScore.confidence}% + {sessionBand.label}
@@ -1207,6 +1334,39 @@ export default function LiveCamera() { + + + 动作分布 + 按识别出的非未知动作统计区间数量、时长和平均质量。 + + + {actionStats.length === 0 ? ( +
+ 累积到稳定动作区间后,这里会展示分布。 +
+ ) : ( + actionStats.map((item) => ( +
+
+
+ {ACTION_META[item.actionType].label} + {item.count} 段 +
+
+ 平均 {Math.round(item.averageScore)} 分 · {Math.round(item.averageConfidence * 100)}% +
+
+ +
+ 累计时长 {formatDuration(item.durationMs)} + 占有效片段 {item.sharePct}% +
+
+ )) + )} +
+
+ 实时反馈 @@ -1234,6 +1394,18 @@ export default function LiveCamera() { className="mt-3 h-2" />
+ +
+
+ 有效识别率 + {Math.round(knownRatio * 100)}% +
+ +
+
最佳片段 {bestSegment ? `${Math.round(bestSegment.score)} 分` : "暂无"}
+
主动作 {actionStats[0] ? ACTION_META[actionStats[0].actionType].label : "未知"}
+
+
@@ -1265,6 +1437,17 @@ export default function LiveCamera() {
有效片段 {session.effectiveSegments || 0}
时长 {formatDuration(session.durationMs || 0)}
+ {session.videoUrl ? ( +
+ +
+ ) : null} )) )} diff --git a/client/src/pages/Logs.tsx b/client/src/pages/Logs.tsx index b22d7a3..b39e148 100644 --- a/client/src/pages/Logs.tsx +++ b/client/src/pages/Logs.tsx @@ -37,11 +37,30 @@ function formatStructuredValue(value: unknown) { } } +function formatTaskTiming(task: { + createdAt: string | Date; + startedAt?: string | Date | null; + completedAt?: string | Date | null; +}) { + const createdAt = new Date(task.createdAt).getTime(); + const startedAt = task.startedAt ? new Date(task.startedAt).getTime() : null; + const completedAt = task.completedAt ? new Date(task.completedAt).getTime() : null; + const durationMs = (completedAt ?? Date.now()) - (startedAt ?? createdAt); + const seconds = Math.max(0, Math.round(durationMs / 1000)); + if (seconds < 60) return `${seconds}s`; + const minutes = Math.floor(seconds / 60); + const rest = seconds % 60; + return `${minutes}m ${rest.toString().padStart(2, "0")}s`; +} + export default function Logs() { const utils = trpc.useUtils(); const taskListQuery = trpc.task.list.useQuery( { limit: 50 }, { + retry: 3, + retryDelay: (attempt) => Math.min(1_000 * 2 ** attempt, 8_000), + placeholderData: (previous) => previous, refetchInterval: (query) => { const hasActiveTask = (query.state.data ?? []).some((task) => task.status === "queued" || task.status === "running"); return hasActiveTask ? 3_000 : 10_000; @@ -103,6 +122,16 @@ export default function Logs() { + {taskListQuery.isError ? ( + + + 任务列表刷新失败 + + 当前显示最近一次成功拉取的数据。服务恢复后页面会自动继续刷新。 + + + ) : null} + 后台任务 @@ -153,7 +182,9 @@ export default function Logs() { ) : null}
- 进度 {task.progress}% · 尝试 {task.attempts}/{task.maxAttempts} + + 进度 {task.progress}% · 尝试 {task.attempts}/{task.maxAttempts} · 耗时 {formatTaskTiming(task)} + {task.status === "failed" ? ( + +
@@ -469,7 +554,20 @@ export default function Videos() { if (previewRef.current) previewRef.current.currentTime = clip.startSec; }} > - 预览 + 载入区间 + +
@@ -497,18 +595,52 @@ export default function Videos() {
{clip.label} {clip.source === "manual" ? "手动" : "建议"} + {formatSeconds(Math.max(0, clip.endSec - clip.startSec))}
{formatSeconds(clip.startSec)} - {formatSeconds(clip.endSec)}
- +
+ + + +
{clip.notes ?
{clip.notes}
: null} @@ -525,19 +657,31 @@ export default function Videos() { variant="outline" onClick={() => { if (!selectedVideo) return; - downloadJson(`${selectedVideo.title}-clip-plan.json`, { + const payload = { videoId: selectedVideo.id, title: selectedVideo.title, url: selectedVideo.url, clipDrafts, exportedAt: new Date().toISOString(), - }); + }; + downloadJson(`${selectedVideo.title}-clip-plan.json`, payload); }} className="gap-2" > 导出草稿 + diff --git a/server/_core/env.ts b/server/_core/env.ts index 0640267..2276a5f 100644 --- a/server/_core/env.ts +++ b/server/_core/env.ts @@ -51,7 +51,13 @@ export const ENV = { llmMaxTokens: parseInteger(process.env.LLM_MAX_TOKENS, 32768), llmEnableThinking: parseBoolean(process.env.LLM_ENABLE_THINKING, false), llmThinkingBudget: parseInteger(process.env.LLM_THINKING_BUDGET, 128), + llmTimeoutMs: parseInteger(process.env.LLM_TIMEOUT_MS, 45000), + llmRetryCount: parseInteger(process.env.LLM_RETRY_COUNT, 1), mediaServiceUrl: process.env.MEDIA_SERVICE_URL ?? "", + mediaFetchTimeoutMs: parseInteger(process.env.MEDIA_FETCH_TIMEOUT_MS, 12000), + mediaFetchRetryCount: parseInteger(process.env.MEDIA_FETCH_RETRY_COUNT, 2), + youtubeApiKey: process.env.YOUTUBE_API_KEY ?? "", backgroundTaskPollMs: parseInteger(process.env.BACKGROUND_TASK_POLL_MS, 3000), backgroundTaskStaleMs: parseInteger(process.env.BACKGROUND_TASK_STALE_MS, 300000), + backgroundTaskHeartbeatMs: parseInteger(process.env.BACKGROUND_TASK_HEARTBEAT_MS, 5000), }; diff --git a/server/_core/fetch.ts b/server/_core/fetch.ts new file mode 100644 index 0000000..68750fc --- /dev/null +++ b/server/_core/fetch.ts @@ -0,0 +1,85 @@ +type FetchRetryOptions = { + timeoutMs: number; + retries?: number; + retryStatuses?: number[]; + retryMethods?: string[]; + baseDelayMs?: number; +}; + +const DEFAULT_RETRY_STATUSES = [408, 425, 429, 502, 503, 504]; + +function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function shouldRetryResponse(method: string, response: Response, options: FetchRetryOptions) { + const allowedMethods = options.retryMethods ?? ["GET", "HEAD"]; + const retryStatuses = options.retryStatuses ?? DEFAULT_RETRY_STATUSES; + return allowedMethods.includes(method) && retryStatuses.includes(response.status); +} + +function shouldRetryError(method: string, error: unknown, options: FetchRetryOptions) { + const allowedMethods = options.retryMethods ?? ["GET", "HEAD"]; + if (!allowedMethods.includes(method)) { + return false; + } + + if (error instanceof Error) { + return error.name === "AbortError" || error.name === "TimeoutError" || error.message.includes("fetch"); + } + + return false; +} + +export async function fetchWithTimeout(input: string | URL, init: RequestInit | undefined, options: FetchRetryOptions) { + const method = (init?.method ?? "GET").toUpperCase(); + const retries = Math.max(0, options.retries ?? 0); + const baseDelayMs = Math.max(150, options.baseDelayMs ?? 350); + let lastError: unknown; + + for (let attempt = 0; attempt <= retries; attempt += 1) { + const controller = new AbortController(); + const upstreamSignal = init?.signal; + let didTimeout = false; + + const timeout = setTimeout(() => { + didTimeout = true; + controller.abort(); + }, options.timeoutMs); + + const abortHandler = () => controller.abort(); + upstreamSignal?.addEventListener("abort", abortHandler, { once: true }); + + try { + const response = await fetch(input, { + ...init, + signal: controller.signal, + }); + + if (attempt < retries && shouldRetryResponse(method, response, options)) { + await response.text().catch(() => undefined); + await sleep(baseDelayMs * (attempt + 1)); + continue; + } + + return response; + } catch (error) { + if (didTimeout) { + lastError = new Error(`Request timed out after ${options.timeoutMs}ms`); + } else { + lastError = error; + } + + if (attempt >= retries || !shouldRetryError(method, lastError, options)) { + throw lastError instanceof Error ? lastError : new Error("Request failed"); + } + + await sleep(baseDelayMs * (attempt + 1)); + } finally { + clearTimeout(timeout); + upstreamSignal?.removeEventListener("abort", abortHandler); + } + } + + throw lastError instanceof Error ? lastError : new Error("Request failed"); +} diff --git a/server/_core/llm.ts b/server/_core/llm.ts index eb2cba7..ded2102 100644 --- a/server/_core/llm.ts +++ b/server/_core/llm.ts @@ -1,4 +1,5 @@ import { ENV } from "./env"; +import { fetchWithTimeout } from "./fetch"; export type Role = "system" | "user" | "assistant" | "tool" | "function"; @@ -323,13 +324,17 @@ export async function invokeLLM(params: InvokeParams): Promise { payload.response_format = normalizedResponseFormat; } - const response = await fetch(resolveApiUrl(apiUrl), { + const response = await fetchWithTimeout(resolveApiUrl(apiUrl), { method: "POST", headers: { "content-type": "application/json", authorization: `Bearer ${apiKey || ENV.llmApiKey}`, }, body: JSON.stringify(payload), + }, { + timeoutMs: ENV.llmTimeoutMs, + retries: ENV.llmRetryCount, + retryMethods: ["POST"], }); if (!response.ok) { diff --git a/server/db.ts b/server/db.ts index 7f42f5e..faa4627 100644 --- a/server/db.ts +++ b/server/db.ts @@ -1631,7 +1631,11 @@ export async function claimNextBackgroundTask(workerId: string) { const now = new Date(); const [nextTask] = await db.select().from(backgroundTasks) - .where(and(eq(backgroundTasks.status, "queued"), lte(backgroundTasks.runAfter, now))) + .where(and( + eq(backgroundTasks.status, "queued"), + lte(backgroundTasks.runAfter, now), + sql`${backgroundTasks.attempts} < ${backgroundTasks.maxAttempts}`, + )) .orderBy(asc(backgroundTasks.runAfter), asc(backgroundTasks.createdAt)) .limit(1); @@ -1733,6 +1737,24 @@ export async function retryBackgroundTask(userId: number, taskId: string) { return getBackgroundTaskById(taskId); } +export async function failExhaustedBackgroundTasks(now: Date = new Date()) { + const db = await getDb(); + if (!db) return; + await db.update(backgroundTasks).set({ + status: "failed", + progress: 100, + message: "任务达到最大重试次数,已停止自动重试", + error: sql`coalesce(${backgroundTasks.error}, '任务达到最大重试次数')`, + workerId: null, + lockedAt: null, + completedAt: now, + }).where(and( + eq(backgroundTasks.status, "queued"), + lte(backgroundTasks.runAfter, now), + sql`${backgroundTasks.attempts} >= ${backgroundTasks.maxAttempts}`, + )); +} + export async function requeueStaleBackgroundTasks(staleBefore: Date) { const db = await getDb(); if (!db) return; diff --git a/server/mediaService.test.ts b/server/mediaService.test.ts index 4095ddd..f2ff786 100644 --- a/server/mediaService.test.ts +++ b/server/mediaService.test.ts @@ -41,8 +41,16 @@ describe("getRemoteMediaSession", () => { const session = await getRemoteMediaSession("session-1"); expect(session.id).toBe("session-1"); - expect(fetchMock).toHaveBeenNthCalledWith(1, "http://127.0.0.1:8081/sessions/session-1"); - expect(fetchMock).toHaveBeenNthCalledWith(2, "http://127.0.0.1:8081/media/sessions/session-1"); + expect(fetchMock).toHaveBeenNthCalledWith( + 1, + "http://127.0.0.1:8081/sessions/session-1", + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); + expect(fetchMock).toHaveBeenNthCalledWith( + 2, + "http://127.0.0.1:8081/media/sessions/session-1", + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); }); it("uses the configured /media base URL directly when already present", async () => { @@ -68,6 +76,9 @@ describe("getRemoteMediaSession", () => { expect(session.id).toBe("session-2"); expect(fetchMock).toHaveBeenCalledTimes(1); - expect(fetchMock).toHaveBeenCalledWith("http://media:8081/media/sessions/session-2"); + expect(fetchMock).toHaveBeenCalledWith( + "http://media:8081/media/sessions/session-2", + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); }); }); diff --git a/server/mediaService.ts b/server/mediaService.ts index c52c4d1..706a9c8 100644 --- a/server/mediaService.ts +++ b/server/mediaService.ts @@ -1,4 +1,5 @@ import { ENV } from "./_core/env"; +import { fetchWithTimeout } from "./_core/fetch"; export type RemoteMediaSession = { id: string; @@ -35,7 +36,11 @@ export async function getRemoteMediaSession(sessionId: string) { let lastError: Error | null = null; for (const url of getMediaCandidateUrls(`/sessions/${encodeURIComponent(sessionId)}`)) { - const response = await fetch(url); + const response = await fetchWithTimeout(url, undefined, { + timeoutMs: ENV.mediaFetchTimeoutMs, + retries: ENV.mediaFetchRetryCount, + retryMethods: ["GET"], + }); if (response.ok) { const payload = await response.json() as { session: RemoteMediaSession }; return payload.session; diff --git a/server/routers.ts b/server/routers.ts index be67bcf..7b4314a 100644 --- a/server/routers.ts +++ b/server/routers.ts @@ -9,7 +9,6 @@ import { ENV } from "./_core/env"; import { storagePut } from "./storage"; import * as db from "./db"; import { nanoid } from "nanoid"; -import { getRemoteMediaSession } from "./mediaService"; import { prepareCorrectionImageUrls } from "./taskWorker"; import { toPublicUrl } from "./publicUrl"; import { ACTION_LABELS, refreshUserNtrp, syncAnalysisTrainingData, syncLiveTrainingData } from "./trainingAutomation"; @@ -602,11 +601,6 @@ export const appRouter = router({ durationMinutes: z.number().min(1).max(720).optional(), })) .mutation(async ({ ctx, input }) => { - const session = await getRemoteMediaSession(input.sessionId); - if (session.userId !== String(ctx.user.id)) { - throw new Error("Media session not found"); - } - return enqueueTask({ userId: ctx.user.id, type: "media_finalize", diff --git a/server/worker.ts b/server/worker.ts index 17d676b..2e6c890 100644 --- a/server/worker.ts +++ b/server/worker.ts @@ -10,6 +10,7 @@ function sleep(ms: number) { } async function workOnce() { + await db.failExhaustedBackgroundTasks(); await db.requeueStaleBackgroundTasks(new Date(Date.now() - ENV.backgroundTaskStaleMs)); const task = await db.claimNextBackgroundTask(workerId); @@ -17,6 +18,12 @@ async function workOnce() { return false; } + const heartbeatTimer = setInterval(() => { + void db.heartbeatBackgroundTask(task.id, workerId).catch((error) => { + console.error(`[worker] heartbeat failed for ${task.id}:`, error); + }); + }, ENV.backgroundTaskHeartbeatMs); + try { const result = await processBackgroundTask(task); if (result !== null) { @@ -27,6 +34,8 @@ async function workOnce() { await db.failBackgroundTask(task.id, message); await db.failVisionTestRun(task.id, message); console.error(`[worker] task ${task.id} failed:`, error); + } finally { + clearInterval(heartbeatTimer); } return true; diff --git a/tests/e2e/helpers/mockApp.ts b/tests/e2e/helpers/mockApp.ts index a1c2ba2..f72595a 100644 --- a/tests/e2e/helpers/mockApp.ts +++ b/tests/e2e/helpers/mockApp.ts @@ -232,6 +232,10 @@ function createTask(state: MockAppState, input: { progress: input.progress ?? 100, result: input.result ?? null, error: input.error ?? null, + attempts: input.status === "failed" ? 2 : 1, + maxAttempts: input.type === "media_finalize" ? 90 : 3, + startedAt: nowIso(), + completedAt: input.status === "queued" || input.status === "running" ? null : nowIso(), createdAt: nowIso(), updatedAt: nowIso(), };