From 20e183d2dadbe3cff6cac1e5679a888582192ebc Mon Sep 17 00:00:00 2001 From: cryptocommuniums-afk Date: Sun, 15 Mar 2026 00:12:26 +0800 Subject: [PATCH] feat: async task pipeline for media and llm workflows --- .env.example | 6 + Dockerfile | 2 +- README.md | 31 +- client/src/components/DashboardLayout.tsx | 5 + client/src/components/TaskCenter.tsx | 152 +++++++ client/src/hooks/useBackgroundTask.ts | 15 + client/src/lib/media.ts | 36 +- client/src/pages/Analysis.tsx | 109 ++++- client/src/pages/Dashboard.tsx | 3 +- client/src/pages/Login.tsx | 2 +- client/src/pages/Recorder.tsx | 110 +++-- client/src/pages/Training.tsx | 79 +++- docker-compose.yml | 25 +- docs/API.md | 52 ++- docs/FEATURES.md | 35 +- docs/deploy.md | 13 +- docs/testing.md | 10 + docs/verified-features.md | 18 +- drizzle/0005_lively_taskmaster.sql | 22 + drizzle/meta/_journal.json | 9 +- drizzle/schema.ts | 33 ++ package.json | 5 +- server/_core/env.ts | 16 + server/_core/llm.test.ts | 23 ++ server/_core/llm.ts | 24 +- server/_core/static.ts | 2 +- server/db.ts | 168 +++++++- server/mediaService.ts | 34 ++ server/prompts.ts | 255 ++++++++++++ server/publicUrl.ts | 22 + server/routers.ts | 317 +++++---------- server/storage.test.ts | 12 + server/storage.ts | 5 + server/taskWorker.ts | 470 ++++++++++++++++++++++ server/worker.ts | 47 +++ tests/e2e/helpers/mockApp.ts | 133 +++++- 36 files changed, 1961 insertions(+), 339 deletions(-) create mode 100644 client/src/components/TaskCenter.tsx create mode 100644 client/src/hooks/useBackgroundTask.ts create mode 100644 drizzle/0005_lively_taskmaster.sql create mode 100644 server/mediaService.ts create mode 100644 server/prompts.ts create mode 100644 server/publicUrl.ts create mode 100644 server/taskWorker.ts create mode 100644 server/worker.ts diff --git a/.env.example b/.env.example index f51bd1d..3292757 100644 --- a/.env.example +++ b/.env.example @@ -12,6 +12,7 @@ VITE_OAUTH_PORTAL_URL= VITE_FRONTEND_FORGE_API_URL= VITE_FRONTEND_FORGE_API_KEY= LOCAL_STORAGE_DIR=/data/app/storage +APP_PUBLIC_BASE_URL=https://te.hao.work/ # Compose MySQL MYSQL_DATABASE=tennis_training_hub @@ -23,6 +24,9 @@ MYSQL_ROOT_PASSWORD=replace-with-root-password LLM_API_URL=https://one.hao.work/v1/chat/completions LLM_API_KEY=replace-with-llm-api-key LLM_MODEL=qwen3.5-plus +LLM_VISION_API_URL=https://one.hao.work/v1/chat/completions +LLM_VISION_API_KEY=replace-with-llm-api-key +LLM_VISION_MODEL=qwen3-vl-235b-a22b LLM_MAX_TOKENS=32768 LLM_ENABLE_THINKING=0 LLM_THINKING_BUDGET=128 @@ -32,3 +36,5 @@ VITE_MEDIA_BASE_URL=/media # Local app-to-media proxy for development or direct container access MEDIA_SERVICE_URL=http://127.0.0.1:8081 +BACKGROUND_TASK_POLL_MS=3000 +BACKGROUND_TASK_STALE_MS=300000 diff --git a/Dockerfile b/Dockerfile index 02f6653..3637550 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,4 +21,4 @@ COPY patches ./patches RUN pnpm install --prod --frozen-lockfile COPY --from=build /app/dist ./dist EXPOSE 3000 -CMD ["node", "dist/index.js"] +CMD ["node", "dist/_core/index.js"] diff --git a/README.md b/README.md index 7fbedf3..a57754a 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,13 @@ # Tennis Training Hub -网球训练管理与分析应用,提供训练计划、姿势分析、实时摄像头分析、在线视频录制与视频库管理。当前版本新增独立 Go 媒体服务,用于处理在线录制、分段上传、实时推流信令和归档回放。 +网球训练管理与分析应用,提供训练计划、姿势分析、实时摄像头分析、在线视频录制与视频库管理。当前版本在媒体服务之外新增数据库驱动的后台任务系统,用于承接训练计划生成、动作纠正、多模态分析和录制归档这类高延迟任务。 ## Architecture - `client/`: React 19 + TypeScript + Tailwind CSS 4 + shadcn/ui - `server/`: Express + tRPC + Drizzle + MySQL/TiDB,负责业务 API、登录、训练数据与视频库元数据 - `media/`: Go 媒体服务,负责录制会话、分段上传、WebRTC 信令、关键片段标记与 FFmpeg 归档 +- `server/worker.ts`: Node 后台 worker,负责执行重任务队列 - `docker-compose.yml`: 单机部署编排 - `deploy/nginx.te.hao.work.conf`: `te.hao.work` 的宿主机 nginx 入口配置 @@ -18,7 +19,27 @@ - 浏览器端 `RTCPeerConnection` 同步建立 WebRTC 低延迟推流链路 - 客户端运动检测自动写入关键片段 marker,也支持手动标记 - 摄像头中断后自动重连,保留既有分段与会话 -- 服务端 worker 将分段合并归档,并产出 WebM 回放;FFmpeg 可用时额外生成 MP4 +- Go 媒体 worker 将分段合并归档,并产出 WebM 回放;FFmpeg 可用时额外生成 MP4 +- Node app worker 轮询媒体归档状态,归档完成后自动登记到视频库并向任务中心反馈结果 + +## Background Tasks + +统一后台任务覆盖以下路径: + +- `training_plan_generate` +- `training_plan_adjust` +- `analysis_corrections` +- `pose_correction_multimodal` +- `media_finalize` + +前端提供全局任务中心,页面本地也会显示任务提交、执行中、完成或失败状态。训练页、分析页和录制页都可以在用户离开页面后继续完成后台任务。 + +## Multimodal LLM + +- 文本类任务使用 `LLM_API_URL` / `LLM_API_KEY` / `LLM_MODEL` +- 图片类任务可单独指定 `LLM_VISION_API_URL` / `LLM_VISION_API_KEY` / `LLM_VISION_MODEL` +- 所有图片输入都要求可从公网访问,因此本地相对路径会通过 `APP_PUBLIC_BASE_URL` 规范化为绝对 URL +- 若视觉模型链路不可用,系统会自动回退到结构化指标驱动的文本纠正,避免任务直接失败 ## Quick Start @@ -67,7 +88,7 @@ pnpm exec playwright install chromium 单机部署推荐: 1. 宿主机 nginx 处理 `80/443` 和 TLS -2. `docker compose up -d --build` 启动 `app + media + worker + db` +2. `docker compose up -d --build` 启动 `app + app-worker + media + media-worker + db` 3. nginx 将 `/` 转发到宿主机 `127.0.0.1:3002 -> app:3000`,`/media/` 转发到 `127.0.0.1:8081 -> media:8081` 4. 如需绕过 nginx 直连调试,也可通过公网 4 位端口访问主站:`http://te.hao.work:8302/` @@ -100,6 +121,10 @@ pnpm exec playwright install chromium - `LLM_API_URL` - `LLM_API_KEY` - `LLM_MODEL` +- `LLM_VISION_API_URL` +- `LLM_VISION_API_KEY` +- `LLM_VISION_MODEL` +- `APP_PUBLIC_BASE_URL` - `LOCAL_STORAGE_DIR` - `MEDIA_SERVICE_URL` - `VITE_MEDIA_BASE_URL` diff --git a/client/src/components/DashboardLayout.tsx b/client/src/components/DashboardLayout.tsx index 2b3c6ea..1730f83 100644 --- a/client/src/components/DashboardLayout.tsx +++ b/client/src/components/DashboardLayout.tsx @@ -28,6 +28,7 @@ import { import { CSSProperties, useEffect, useRef, useState } from "react"; import { useLocation, Redirect } from "wouter"; import { DashboardLayoutSkeleton } from './DashboardLayoutSkeleton'; +import { TaskCenter } from "./TaskCenter"; const menuItems = [ { icon: LayoutDashboard, label: "仪表盘", path: "/dashboard", group: "main" }, @@ -262,6 +263,9 @@ function DashboardLayoutContent({ +
+ +
+ + + + 任务中心 + + +
+ {(taskListQuery.data ?? []).length === 0 ? ( +
+ 当前没有后台任务。 +
+ ) : ( + (taskListQuery.data ?? []).map((task) => ( +
+
+
+

{task.title}

+

{task.message || formatTaskStatus(task.status)}

+
+ + {formatTaskStatus(task.status)} + +
+ +
+ +
+ + {task.error ? ( +
+
+ + {task.error} +
+
+ ) : null} + +
+ {new Date(task.createdAt).toLocaleString("zh-CN")} + {task.status === "failed" ? ( + + ) : task.status === "succeeded" ? ( + + + 已完成 + + ) : ( + + + 处理中 + + )} +
+
+ )) + )} +
+
+
+ + ); +} diff --git a/client/src/hooks/useBackgroundTask.ts b/client/src/hooks/useBackgroundTask.ts new file mode 100644 index 0000000..316b062 --- /dev/null +++ b/client/src/hooks/useBackgroundTask.ts @@ -0,0 +1,15 @@ +import { trpc } from "@/lib/trpc"; + +export function useBackgroundTask(taskId: string | null | undefined) { + return trpc.task.get.useQuery( + { taskId: taskId || "" }, + { + enabled: Boolean(taskId), + refetchInterval: (query) => { + const task = query.state.data; + if (!task) return 3_000; + return task.status === "queued" || task.status === "running" ? 3_000 : false; + }, + } + ); +} diff --git a/client/src/lib/media.ts b/client/src/lib/media.ts index f4256af..6e17747 100644 --- a/client/src/lib/media.ts +++ b/client/src/lib/media.ts @@ -53,14 +53,40 @@ export type MediaSession = { }; const MEDIA_BASE = (import.meta.env.VITE_MEDIA_BASE_URL || "/media").replace(/\/$/, ""); +const RETRYABLE_STATUS = new Set([502, 503, 504]); + +function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} async function request(path: string, init?: RequestInit): Promise { - const response = await fetch(`${MEDIA_BASE}${path}`, init); - if (!response.ok) { - const errorBody = await response.json().catch(() => ({})); - throw new Error(errorBody.error || errorBody.message || `Media service error (${response.status})`); + let lastError: Error | null = null; + + for (let attempt = 0; attempt < 3; attempt++) { + try { + const response = await fetch(`${MEDIA_BASE}${path}`, init); + if (!response.ok) { + const errorBody = await response.json().catch(() => ({})); + const error = new Error(errorBody.error || errorBody.message || `Media service error (${response.status})`); + if (RETRYABLE_STATUS.has(response.status) && attempt < 2) { + lastError = error; + await sleep(400 * (attempt + 1)); + continue; + } + throw error; + } + return response.json() as Promise; + } catch (error) { + lastError = error instanceof Error ? error : new Error("Media request failed"); + if (attempt < 2) { + await sleep(400 * (attempt + 1)); + continue; + } + throw lastError; + } } - return response.json() as Promise; + + throw lastError || new Error("Media request failed"); } export async function createMediaSession(payload: { diff --git a/client/src/pages/Analysis.tsx b/client/src/pages/Analysis.tsx index 613ce04..4c7e639 100644 --- a/client/src/pages/Analysis.tsx +++ b/client/src/pages/Analysis.tsx @@ -7,10 +7,12 @@ import { Input } from "@/components/ui/input"; import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "@/components/ui/select"; import { Progress } from "@/components/ui/progress"; import { Badge } from "@/components/ui/badge"; +import { Alert, AlertDescription, AlertTitle } from "@/components/ui/alert"; +import { useBackgroundTask } from "@/hooks/useBackgroundTask"; import { toast } from "sonner"; import { Upload, Video, Loader2, Play, Pause, RotateCcw, - Zap, Target, Activity, TrendingUp, Eye + Zap, Target, Activity, TrendingUp, Eye, ListTodo } from "lucide-react"; import { Streamdown } from "streamdown"; @@ -39,6 +41,8 @@ export default function Analysis() { const [analysisProgress, setAnalysisProgress] = useState(0); const [analysisResult, setAnalysisResult] = useState(null); const [corrections, setCorrections] = useState(""); + const [correctionReport, setCorrectionReport] = useState(null); + const [correctionTaskId, setCorrectionTaskId] = useState(null); const [showSkeleton, setShowSkeleton] = useState(false); const videoRef = useRef(null); const canvasRef = useRef(null); @@ -55,7 +59,16 @@ export default function Analysis() { utils.rating.history.invalidate(); }, }); - const correctionMutation = trpc.analysis.getCorrections.useMutation(); + const correctionMutation = trpc.analysis.getCorrections.useMutation({ + onSuccess: (data) => { + setCorrectionTaskId(data.taskId); + toast.success("动作纠正任务已提交"); + }, + onError: (error) => { + toast.error("动作纠正任务提交失败: " + error.message); + }, + }); + const correctionTaskQuery = useBackgroundTask(correctionTaskId); const handleFileSelect = (e: React.ChangeEvent) => { const file = e.target.files?.[0]; @@ -73,8 +86,22 @@ export default function Analysis() { setVideoUrl(URL.createObjectURL(file)); setAnalysisResult(null); setCorrections(""); + setCorrectionReport(null); + setCorrectionTaskId(null); }; + useEffect(() => { + if (correctionTaskQuery.data?.status === "succeeded") { + const result = correctionTaskQuery.data.result as { corrections?: string; report?: any } | null; + setCorrections(result?.corrections || "暂无建议"); + setCorrectionReport(result?.report || null); + setCorrectionTaskId(null); + } else if (correctionTaskQuery.data?.status === "failed") { + toast.error(`动作纠正失败: ${correctionTaskQuery.data.error || "未知错误"}`); + setCorrectionTaskId(null); + } + }, [correctionTaskQuery.data]); + const analyzeVideo = useCallback(async () => { if (!videoRef.current || !canvasRef.current || !videoFile) return; @@ -267,6 +294,8 @@ export default function Analysis() { }; setAnalysisResult(result); + setCorrections(""); + setCorrectionReport(null); // Upload video and save analysis const reader = new FileReader(); @@ -293,13 +322,12 @@ export default function Analysis() { }; reader.readAsDataURL(videoFile); - // Get AI corrections + const snapshots = await extractFrameSnapshots(videoUrl); correctionMutation.mutate({ poseMetrics: result.poseMetrics, exerciseType, detectedIssues: result.detectedIssues, - }, { - onSuccess: (data) => setCorrections(data.corrections as string), + imageDataUrls: snapshots, }); pose.close(); @@ -318,6 +346,16 @@ export default function Analysis() {

AI姿势识别与矫正反馈

+ {(correctionMutation.isPending || correctionTaskQuery.data?.status === "queued" || correctionTaskQuery.data?.status === "running") ? ( + + + 后台任务执行中 + + 多模态动作纠正正在后台生成。可以先查看分析结果,完成后任务中心和当前页面都会更新。 + + + ) : null} + {/* Upload section */} @@ -532,7 +570,12 @@ export default function Analysis() { {correctionMutation.isPending ? (
- AI正在生成矫正建议... + 正在提交动作纠正任务... +
+ ) : correctionTaskQuery.data?.status === "queued" || correctionTaskQuery.data?.status === "running" ? ( +
+ + {correctionTaskQuery.data.message || "AI正在后台生成多模态矫正建议..."}
) : corrections ? (
@@ -543,6 +586,24 @@ export default function Analysis() { )} + + {correctionReport?.priorityFixes?.length ? ( + + + 优先修正项 + + + {correctionReport.priorityFixes.map((item: any, index: number) => ( +
+

{item.title}

+

{item.why}

+

练习:{item.howToPractice}

+

达标:{item.successMetric}

+
+ ))} +
+
+ ) : null} )}
@@ -667,3 +728,39 @@ function averageAngles(anglesHistory: any[]) { } return avg; } + +async function extractFrameSnapshots(sourceUrl: string) { + if (!sourceUrl) return []; + + const video = document.createElement("video"); + video.src = sourceUrl; + video.muted = true; + video.playsInline = true; + video.crossOrigin = "anonymous"; + + await new Promise((resolve, reject) => { + video.onloadedmetadata = () => resolve(); + video.onerror = () => reject(new Error("无法读取视频元数据")); + }); + + const canvas = document.createElement("canvas"); + canvas.width = video.videoWidth || 1280; + canvas.height = video.videoHeight || 720; + const ctx = canvas.getContext("2d"); + if (!ctx) return []; + + const duration = Math.max(video.duration || 0, 1); + const checkpoints = [0.15, 0.5, 0.85].map((ratio) => Math.min(duration - 0.05, duration * ratio)).filter((time, index, array) => time >= 0 && array.indexOf(time) === index); + const snapshots: string[] = []; + + for (const checkpoint of checkpoints) { + await new Promise((resolve) => { + video.onseeked = () => resolve(); + video.currentTime = checkpoint; + }); + ctx.drawImage(video, 0, 0, canvas.width, canvas.height); + snapshots.push(canvas.toDataURL("image/jpeg", 0.82)); + } + + return snapshots; +} diff --git a/client/src/pages/Dashboard.tsx b/client/src/pages/Dashboard.tsx index 7fe1cb2..40df19d 100644 --- a/client/src/pages/Dashboard.tsx +++ b/client/src/pages/Dashboard.tsx @@ -51,11 +51,10 @@ export default function Dashboard() { return (
- {/* Welcome header */}

- 欢迎回来,{user?.name || "球友"} + 当前用户:{user?.name || "未命名用户"}

diff --git a/client/src/pages/Login.tsx b/client/src/pages/Login.tsx index 3b091ce..8042407 100644 --- a/client/src/pages/Login.tsx +++ b/client/src/pages/Login.tsx @@ -39,7 +39,7 @@ export default function Login() { try { const data = await loginMutation.mutateAsync({ username: username.trim() }); const user = await syncAuthenticatedUser(data.user); - toast.success(data.isNew ? `欢迎加入,${user.name}!` : `欢迎回来,${user.name}!`); + toast.success(data.isNew ? `已创建用户:${user.name}` : `已登录:${user.name}`); setLocation("/dashboard"); } catch (err) { const message = err instanceof Error ? err.message : "未知错误"; diff --git a/client/src/pages/Recorder.tsx b/client/src/pages/Recorder.tsx index 7c3971f..2d25d1e 100644 --- a/client/src/pages/Recorder.tsx +++ b/client/src/pages/Recorder.tsx @@ -18,6 +18,8 @@ import { Button } from "@/components/ui/button"; import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; import { Input } from "@/components/ui/input"; import { Progress } from "@/components/ui/progress"; +import { Alert, AlertDescription, AlertTitle } from "@/components/ui/alert"; +import { useBackgroundTask } from "@/hooks/useBackgroundTask"; import { toast } from "sonner"; import { Activity, @@ -34,6 +36,7 @@ import { ShieldAlert, Smartphone, Sparkles, + ListTodo, Video, VideoOff, Wifi, @@ -126,7 +129,16 @@ function formatFileSize(bytes: number) { export default function Recorder() { const { user } = useAuth(); - const registerExternalMutation = trpc.video.registerExternal.useMutation(); + const utils = trpc.useUtils(); + const finalizeTaskMutation = trpc.task.createMediaFinalize.useMutation({ + onSuccess: (data) => { + setArchiveTaskId(data.taskId); + toast.success("录制归档任务已提交"); + }, + onError: (error) => { + toast.error(`录制归档任务提交失败: ${error.message}`); + }, + }); const liveVideoRef = useRef(null); const playbackVideoRef = useRef(null); @@ -142,11 +154,9 @@ export default function Recorder() { const pendingUploadsRef = useRef([]); const uploadInFlightRef = useRef(false); const currentSessionRef = useRef(null); - const registeredSessionIdRef = useRef(null); const segmentTickerRef = useRef | null>(null); const timerTickerRef = useRef | null>(null); const motionTickerRef = useRef | null>(null); - const pollTickerRef = useRef | null>(null); const reconnectTimeoutRef = useRef | null>(null); const modeRef = useRef("idle"); const reconnectAttemptsRef = useRef(0); @@ -170,11 +180,13 @@ export default function Recorder() { const [markers, setMarkers] = useState([]); const [connectionState, setConnectionState] = useState("new"); const [immersivePreview, setImmersivePreview] = useState(false); + const [archiveTaskId, setArchiveTaskId] = useState(null); const mobile = useMemo(() => isMobileDevice(), []); const mimeType = useMemo(() => pickRecorderMimeType(), []); const currentPlaybackUrl = mediaSession?.playback.mp4Url || mediaSession?.playback.webmUrl || ""; const archiveProgress = getArchiveProgress(mediaSession); + const archiveTaskQuery = useBackgroundTask(archiveTaskId); const syncSessionState = useCallback((session: MediaSession | null) => { currentSessionRef.current = session; @@ -196,6 +208,25 @@ export default function Recorder() { facingModeRef.current = facingMode; }, [facingMode]); + useEffect(() => { + if (archiveTaskQuery.data?.status === "succeeded") { + void (async () => { + if (currentSessionRef.current?.id) { + const response = await getMediaSession(currentSessionRef.current.id); + syncSessionState(response.session); + } + setMode("archived"); + utils.video.list.invalidate(); + toast.success("回放文件已归档完成"); + setArchiveTaskId(null); + })(); + } else if (archiveTaskQuery.data?.status === "failed") { + toast.error(`录制归档失败: ${archiveTaskQuery.data.error || "未知错误"}`); + setMode("idle"); + setArchiveTaskId(null); + } + }, [archiveTaskQuery.data, syncSessionState, utils.video.list]); + const stopTickers = useCallback(() => { if (segmentTickerRef.current) clearInterval(segmentTickerRef.current); if (timerTickerRef.current) clearInterval(timerTickerRef.current); @@ -556,10 +587,10 @@ export default function Recorder() { setUploadBytes(0); setQueuedSegments(0); setReconnectAttempts(0); + setArchiveTaskId(null); segmentSequenceRef.current = 0; motionFrameRef.current = null; pendingUploadsRef.current = []; - registeredSessionIdRef.current = null; const stream = await ensurePreviewStream(); const sessionResponse = await createMediaSession({ @@ -602,62 +633,19 @@ export default function Recorder() { durationMs: Date.now() - recordingStartedAtRef.current, }); syncSessionState(response.session); - toast.success("录制已提交,正在生成回放文件"); + await finalizeTaskMutation.mutateAsync({ + sessionId: session.id, + title: title.trim() || session.title, + exerciseType: "recording", + }); + toast.success("录制已提交,后台正在整理回放文件"); } catch (error: any) { toast.error(`结束录制失败: ${error?.message || "未知错误"}`); setMode("recording"); - return; } - - if (pollTickerRef.current) { - clearInterval(pollTickerRef.current); - } - - pollTickerRef.current = setInterval(async () => { - const current = currentSessionRef.current; - if (!current?.id) { - return; - } - try { - const response = await getMediaSession(current.id); - syncSessionState(response.session); - - if (response.session.archiveStatus === "completed") { - if (pollTickerRef.current) clearInterval(pollTickerRef.current); - setMode("archived"); - toast.success("回放文件已归档完成"); - - if (registeredSessionIdRef.current !== response.session.id) { - const playbackUrl = response.session.playback.webmUrl || response.session.playback.mp4Url; - const playbackFormat = response.session.playback.webmUrl ? "webm" : response.session.playback.mp4Url ? "mp4" : ""; - if (!playbackUrl || !playbackFormat) { - return; - } - registeredSessionIdRef.current = response.session.id; - await registerExternalMutation.mutateAsync({ - title: title.trim() || response.session.title, - url: playbackUrl, - fileKey: `media/sessions/${response.session.id}/recording.${playbackFormat}`, - format: playbackFormat, - fileSize: response.session.playback.webmSize || response.session.playback.mp4Size, - duration: response.session.durationMs / 1000, - exerciseType: "recording", - }); - } - } - - if (response.session.archiveStatus === "failed") { - if (pollTickerRef.current) clearInterval(pollTickerRef.current); - toast.error(response.session.lastError || "归档失败"); - } - } catch { - // keep polling - } - }, 3_000); - }, [closePeer, flushPendingSegments, registerExternalMutation, stopCamera, stopRecorder, syncSessionState, title]); + }, [closePeer, finalizeTaskMutation, flushPendingSegments, stopCamera, stopRecorder, syncSessionState, title]); const resetRecorder = useCallback(async () => { - if (pollTickerRef.current) clearInterval(pollTickerRef.current); if (reconnectTimeoutRef.current) clearTimeout(reconnectTimeoutRef.current); stopTickers(); await stopRecorder().catch(() => {}); @@ -667,7 +655,7 @@ export default function Recorder() { uploadInFlightRef.current = false; motionFrameRef.current = null; currentSessionRef.current = null; - registeredSessionIdRef.current = null; + setArchiveTaskId(null); setMediaSession(null); setMarkers([]); setDurationMs(0); @@ -755,7 +743,6 @@ export default function Recorder() { useEffect(() => { return () => { - if (pollTickerRef.current) clearInterval(pollTickerRef.current); if (reconnectTimeoutRef.current) clearTimeout(reconnectTimeoutRef.current); stopTickers(); if (recorderRef.current && recorderRef.current.state !== "inactive") { @@ -988,6 +975,17 @@ export default function Recorder() {
+ {(finalizeTaskMutation.isPending || archiveTaskQuery.data?.status === "queued" || archiveTaskQuery.data?.status === "running") ? ( + + + 后台归档处理中 + + {archiveTaskQuery.data?.message || "录制文件正在后台整理、转码并登记到视频库。"} + 你可以离开当前页面,完成后任务中心会提示结果。 + + + ) : null} +
diff --git a/client/src/pages/Training.tsx b/client/src/pages/Training.tsx index 5acaf86..e847ec5 100644 --- a/client/src/pages/Training.tsx +++ b/client/src/pages/Training.tsx @@ -1,4 +1,4 @@ -import { useState, useMemo } from "react"; +import { useEffect, useMemo, useState } from "react"; import { useAuth } from "@/_core/hooks/useAuth"; import { trpc } from "@/lib/trpc"; import { Card, CardContent, CardHeader, CardTitle, CardDescription } from "@/components/ui/card"; @@ -6,10 +6,12 @@ import { Button } from "@/components/ui/button"; import { Badge } from "@/components/ui/badge"; import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "@/components/ui/select"; import { Skeleton } from "@/components/ui/skeleton"; +import { Alert, AlertDescription, AlertTitle } from "@/components/ui/alert"; +import { useBackgroundTask } from "@/hooks/useBackgroundTask"; import { toast } from "sonner"; import { Target, Loader2, CheckCircle2, Circle, Clock, Dumbbell, - RefreshCw, Footprints, Hand, ArrowRight, Sparkles + RefreshCw, Footprints, Hand, ArrowRight, Sparkles, ListTodo } from "lucide-react"; const categoryIcons: Record = { @@ -42,24 +44,26 @@ export default function Training() { const [skillLevel, setSkillLevel] = useState<"beginner" | "intermediate" | "advanced">("beginner"); const [durationDays, setDurationDays] = useState(7); const [selectedDay, setSelectedDay] = useState(1); + const [generateTaskId, setGenerateTaskId] = useState(null); + const [adjustTaskId, setAdjustTaskId] = useState(null); const utils = trpc.useUtils(); const { data: activePlan, isLoading: planLoading } = trpc.plan.active.useQuery(); + const generateTaskQuery = useBackgroundTask(generateTaskId); + const adjustTaskQuery = useBackgroundTask(adjustTaskId); const generateMutation = trpc.plan.generate.useMutation({ - onSuccess: () => { - toast.success("训练计划已生成!"); - utils.plan.active.invalidate(); - utils.plan.list.invalidate(); + onSuccess: (data) => { + setGenerateTaskId(data.taskId); + toast.success("训练计划任务已提交"); }, onError: (err) => toast.error("生成失败: " + err.message), }); const adjustMutation = trpc.plan.adjust.useMutation({ onSuccess: (data) => { - toast.success("训练计划已调整!"); - utils.plan.active.invalidate(); - if (data.adjustmentNotes) toast.info("调整说明: " + data.adjustmentNotes); + setAdjustTaskId(data.taskId); + toast.success("训练计划调整任务已提交"); }, onError: (err) => toast.error("调整失败: " + err.message), }); @@ -81,6 +85,36 @@ export default function Training() { }, [activePlan, selectedDay]); const totalDays = activePlan?.durationDays || 7; + const generating = generateMutation.isPending || generateTaskQuery.data?.status === "queued" || generateTaskQuery.data?.status === "running"; + const adjusting = adjustMutation.isPending || adjustTaskQuery.data?.status === "queued" || adjustTaskQuery.data?.status === "running"; + + useEffect(() => { + if (generateTaskQuery.data?.status === "succeeded") { + toast.success("训练计划已生成"); + utils.plan.active.invalidate(); + utils.plan.list.invalidate(); + setGenerateTaskId(null); + } else if (generateTaskQuery.data?.status === "failed") { + toast.error(`训练计划生成失败: ${generateTaskQuery.data.error || "未知错误"}`); + setGenerateTaskId(null); + } + }, [generateTaskQuery.data, utils.plan.active, utils.plan.list]); + + useEffect(() => { + if (adjustTaskQuery.data?.status === "succeeded") { + toast.success("训练计划已调整"); + utils.plan.active.invalidate(); + utils.plan.list.invalidate(); + const adjustmentNotes = (adjustTaskQuery.data.result as { adjustmentNotes?: string } | null)?.adjustmentNotes; + if (adjustmentNotes) { + toast.info(`调整说明: ${adjustmentNotes}`); + } + setAdjustTaskId(null); + } else if (adjustTaskQuery.data?.status === "failed") { + toast.error(`训练计划调整失败: ${adjustTaskQuery.data.error || "未知错误"}`); + setAdjustTaskId(null); + } + }, [adjustTaskQuery.data, utils.plan.active, utils.plan.list]); if (planLoading) { return ( @@ -100,6 +134,17 @@ export default function Training() {
+ {generating || adjusting ? ( + + + 后台任务执行中 + + {generating ? "训练计划正在后台生成。" : "训练计划正在根据最近分析结果调整。"} + 你可以切换到其他页面,完成后会在任务中心显示结果。 + + + ) : null} + {!activePlan ? ( /* Generate new plan */ @@ -145,11 +190,11 @@ export default function Training() {
diff --git a/docker-compose.yml b/docker-compose.yml index 95e5625..c078e7f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -75,7 +75,7 @@ services: - media-data:/data/media restart: unless-stopped - worker: + media-worker: build: context: ./media dockerfile: Dockerfile @@ -89,6 +89,29 @@ services: - media restart: unless-stopped + app-worker: + build: + context: . + dockerfile: Dockerfile + command: ["node", "dist/worker.js"] + env_file: + - .env + environment: + DATABASE_URL: mysql://${MYSQL_USER:-tennis}:${MYSQL_PASSWORD:-tennis_password}@db:3306/${MYSQL_DATABASE:-tennis_training_hub} + MEDIA_SERVICE_URL: http://media:8081 + LOCAL_STORAGE_DIR: /data/app/storage + NODE_ENV: production + volumes: + - app-data:/data/app + depends_on: + db: + condition: service_healthy + migrate: + condition: service_completed_successfully + media: + condition: service_started + restart: unless-stopped + volumes: app-data: db-data: diff --git a/docs/API.md b/docs/API.md index b7720cf..de5be71 100644 --- a/docs/API.md +++ b/docs/API.md @@ -75,7 +75,7 @@ | 类型 | Mutation | | 认证 | **需认证** | | 输入 | `{ skillLevel: enum, durationDays: number, focusAreas?: string[] }` | -| 输出 | `{ planId: number, plan: TrainingPlanData }` | +| 输出 | `{ taskId: string, task: BackgroundTask }` | **输入验证:** - `skillLevel`:`"beginner"` / `"intermediate"` / `"advanced"` @@ -105,7 +105,7 @@ | 类型 | Mutation | | 认证 | **需认证** | | 输入 | `{ planId: number }` | -| 输出 | `{ success: true, adjustmentNotes: string }` | +| 输出 | `{ taskId: string, task: BackgroundTask }` | --- @@ -187,8 +187,10 @@ |------|-----| | 类型 | Mutation | | 认证 | **需认证** | -| 输入 | `{ poseMetrics: object, exerciseType: string, detectedIssues: array }` | -| 输出 | `{ corrections: string }` | +| 输入 | `{ poseMetrics: object, exerciseType: string, detectedIssues: array, imageUrls?: string[], imageDataUrls?: string[] }` | +| 输出 | `{ taskId: string, task: BackgroundTask }` | + +该接口始终走后台任务。若提供 `imageUrls` 或 `imageDataUrls`,服务端会优先走多模态纠正链路,并把相对地址规范化为可公网访问的绝对 URL。 #### `analysis.list` - 获取用户所有分析记录 @@ -211,6 +213,48 @@ ### 6. 训练记录模块 (`record`) +### 5.1 后台任务模块 (`task`) + +#### `task.list` - 获取当前用户后台任务 + +| 属性 | 值 | +|------|-----| +| 类型 | Query | +| 认证 | **需认证** | +| 输入 | `{ limit?: number }` | +| 输出 | `BackgroundTask[]` | + +#### `task.get` - 获取单个后台任务 + +| 属性 | 值 | +|------|-----| +| 类型 | Query | +| 认证 | **需认证** | +| 输入 | `{ taskId: string }` | +| 输出 | `BackgroundTask | null` | + +#### `task.retry` - 重试失败任务 + +| 属性 | 值 | +|------|-----| +| 类型 | Mutation | +| 认证 | **需认证** | +| 输入 | `{ taskId: string }` | +| 输出 | `{ task: BackgroundTask }` | + +#### `task.createMediaFinalize` - 提交录制归档后台任务 + +| 属性 | 值 | +|------|-----| +| 类型 | Mutation | +| 认证 | **需认证** | +| 输入 | `{ sessionId: string, title: string, exerciseType?: string }` | +| 输出 | `{ taskId: string, task: BackgroundTask }` | + +该接口会校验媒体会话所属用户,并由后台 worker 轮询 Go 媒体服务状态,归档完成后自动登记到视频库。 + +### 6. 训练记录模块 (`record`) + #### `record.create` - 创建训练记录 | 属性 | 值 | diff --git a/docs/FEATURES.md b/docs/FEATURES.md index 3c6914b..c2e335f 100644 --- a/docs/FEATURES.md +++ b/docs/FEATURES.md @@ -10,7 +10,7 @@ ### 用户与训练 - 用户名登录:无需注册,输入用户名即可进入训练工作台 -- 训练计划:按技能等级和训练周期生成训练计划 +- 训练计划:按技能等级和训练周期生成训练计划,改为后台异步生成 - 训练进度:展示训练次数、时长、评分趋势、最近分析结果 - 每日打卡与提醒:支持训练打卡、提醒、通知记录 @@ -18,18 +18,22 @@ - 视频上传分析:上传 `webm/mp4` 视频进入视频库并触发分析流程 - 实时摄像头分析:浏览器端调用 MediaPipe,进行姿势识别和反馈展示 +- 动作纠正:支持文本纠正和多模态纠正两条链路,统一通过后台任务执行 +- 多模态图片输入:上传关键帧后会转换为公网可访问的绝对 URL,再提交给视觉模型 - 视频库:集中展示录制结果、上传结果和分析摘要 ### 在线录制与媒体链路 - Go 媒体服务:独立处理录制会话、分段上传、marker、归档和回放资源 +- Node app worker:统一处理训练计划、动作纠正和录制归档结果登记 - WebRTC 推流:录制时并行建立低延迟实时推流链路 - MediaRecorder 分段:浏览器本地压缩录制并每 60 秒自动分段上传 - 自动标记:客户端通过轻量运动检测创建关键片段 marker - 手动标记:录制中支持手动插入剪辑点 - 自动重连:摄像头 track 断开时自动尝试恢复 - 归档回放:worker 合并片段并生成 WebM,FFmpeg 可用时额外生成 MP4 -- 视频库登记:归档完成后自动写回现有视频库 +- 视频库登记:归档完成后由 app worker 自动写回现有视频库 +- 上传稳定性:媒体分段上传遇到 `502/503/504` 会自动重试 ## 前端能力 @@ -46,12 +50,14 @@ - 统一工作台导航 - 仪表盘、训练、视频、录制、分析等模块一致的布局结构 +- 全局任务中心:桌面侧边栏和移动端头部都可查看后台任务 - 为后续 PC 粗剪时间线预留媒体域与文档规范 ## 架构能力 - Node 应用负责业务 API、登录、训练数据与视频库元数据 - Go 服务负责媒体链路与归档 +- 后台任务表 `background_tasks` 统一承接重任务 - `Docker Compose + 宿主机 nginx` 作为标准单机部署方式 - 统一的本地验证命令: - `pnpm check` @@ -67,10 +73,27 @@ - 当前 WebRTC 重点是浏览器到服务端的实时上行,不是多观众直播分发 - 当前 PC 剪辑仍处于基础媒体域准备阶段,未交付完整多轨编辑器 - 当前存储策略为本地卷优先,未接入对象存储归档 +- 当前 `.env` 配置的视觉网关若忽略 `LLM_VISION_MODEL`,系统会回退到文本纠正;代码已支持独立视觉模型配置,但上游网关能力仍需单独确认 ## 后续增强方向 -- PC 时间线粗剪与 clip plan 持久化 -- 更细粒度的设备能力自适应 -- 更强的媒体回放和片段导出能力 -- 更深入的前端域拆分和懒加载优化 +### 移动端个性化增强 + +- 根据网络、机型和电量状态动态切换录制档位、分段大小与上传节流策略 +- 将录制焦点视图扩展为单手操作布局,支持拇指热区、自定义主按钮顺序和横竖屏独立面板 +- 为不同训练项目提供场景化预设,例如发球、正手、反手、步伐训练各自保存摄像头方向、裁切比例和提示文案 +- 增加弱网回传面板,向用户展示排队片段、预计上传耗时和失败重试建议 + +### PC 轻剪与训练回放 + +- 交付单轨时间线粗剪:入点、出点、片段删除、关键帧封面和 marker 跳转 +- 增加“剪辑计划”实体,允许把自动 marker、手动 marker 和 AI 建议片段一起保存 +- 提供双栏回放模式:左侧原视频,右侧姿态轨迹、节奏评分和文字纠正同步滚动 +- 支持从视频库直接发起导出任务,在后台生成训练集锦或问题片段合集 + +### 高性能前端重构 + +- 将训练、分析、录制、视频库拆分为按域加载的路由包,继续降低首屏主包体积 +- 把共享媒体状态、任务状态和用户状态从页面本地逻辑收拢为稳定的数据域层 +- 统一上传、任务轮询、错误提示和绝对 URL 规范化逻辑,减少当前多处重复实现 +- 为重计算页面增加惰性加载、按需图表加载和更严格的移动端资源预算 diff --git a/docs/deploy.md b/docs/deploy.md index a6cbb95..509a90a 100644 --- a/docs/deploy.md +++ b/docs/deploy.md @@ -6,9 +6,10 @@ - `db` 容器:MySQL 8,数据持久化到 `db-data` - `migrate` 容器:一次性执行 Drizzle 迁移,成功后退出 - `app` 容器:Node 应用,端口 `3000` +- `app-worker` 容器:Node 后台任务 worker,共享应用卷与数据库 - 宿主机公开调试端口:`8302 -> app:3000` - `media` 容器:Go 媒体服务,端口 `8081` -- `worker` 容器:Go 媒体归档 worker,共享媒体卷 +- `media-worker` 容器:Go 媒体归档 worker,共享媒体卷 - `app-data` 卷:上传视频等本地文件存储 - `db-data` 卷:MySQL 数据目录 - `media-data` 卷:录制片段、会话状态、归档成片 @@ -32,6 +33,13 @@ docker compose up -d --build - `MYSQL_PASSWORD` - `MYSQL_ROOT_PASSWORD` - `LLM_API_KEY` +- `APP_PUBLIC_BASE_URL` +- `LLM_VISION_MODEL` + +如需启用独立视觉模型端点,再补: + +- `LLM_VISION_API_URL` +- `LLM_VISION_API_KEY` ## nginx @@ -54,6 +62,7 @@ systemctl reload nginx - `curl http://127.0.0.1:3002/api/trpc/auth.me` - `curl http://te.hao.work:8302/` - `curl http://127.0.0.1:8081/media/health` +- `docker compose exec app-worker node dist/worker.js --help` 不适用;应通过 `docker compose ps app-worker` 确认 worker 常驻 ## External access links @@ -77,4 +86,4 @@ systemctl reload nginx 2. 回退 Git 版本 3. 重新执行 `docker compose up -d --build` -如果只需停止录制链路,可单独关闭 `media` 与 `worker`,主站业务仍可继续运行。 +如果只需停止录制链路,可单独关闭 `media` 与 `media-worker`,主站业务仍可继续运行;如需暂停训练计划/动作纠正等后台任务,再额外停止 `app-worker`。 diff --git a/docs/testing.md b/docs/testing.md index 466b041..c730f33 100644 --- a/docs/testing.md +++ b/docs/testing.md @@ -20,6 +20,7 @@ - Node/tRPC 路由输入校验与权限检查 - LLM 模块请求配置与环境变量回退逻辑 +- 视觉模型 per-request model override 能力 - 媒体工具函数,例如录制时长格式化与码率选择 ### 3. Go 媒体服务测试 @@ -43,6 +44,7 @@ - 注入假媒体设备、假 `MediaRecorder` 和假 `RTCPeerConnection` 这样可以自动验证前端主流程,而不依赖数据库、真实摄像头权限和真实 WebRTC 网络环境。 +当前 E2E 已覆盖新的后台任务流和任务中心依赖的接口 mock。 ## Unified verification @@ -75,6 +77,14 @@ pnpm test:llm -- "你好,做个自我介绍" - 适合验证 `LLM_API_KEY`、`LLM_MODEL` 和网关连通性 - 不建议纳入 `pnpm verify`,因为它依赖外部网络和真实密钥 +多模态链路建议额外执行一次手工 smoke test: + +```bash +pnpm exec tsx -e 'import "dotenv/config"; import { invokeLLM } from "./server/_core/llm"; const result = await invokeLLM({ model: process.env.LLM_VISION_MODEL, apiUrl: process.env.LLM_VISION_API_URL, apiKey: process.env.LLM_VISION_API_KEY, messages: [{ role: "user", content: [{ type: "text", text: "请用中文一句话描述图片" }, { type: "image_url", image_url: { url: "https://..." } }] }] }); console.log(result.model, result.choices[0]?.message?.content);' +``` + +如果返回模型与 `LLM_VISION_MODEL` 不一致,说明上游网关忽略了视觉模型选择,业务任务会自动回退到文本纠正结果。 + ## Production smoke checks 部署到宿主机后,建议至少补以下联测: diff --git a/docs/verified-features.md b/docs/verified-features.md index ab22127..a630232 100644 --- a/docs/verified-features.md +++ b/docs/verified-features.md @@ -1,12 +1,12 @@ # Verified Features -本文档记录当前已经通过自动化验证或构建验证的项目。更新时间:2026-03-14 22:24 CST。 +本文档记录当前已经通过自动化验证或构建验证的项目。更新时间:2026-03-15 00:11 CST。 ## 最新完整验证记录 - 通过命令:`pnpm verify` -- 验证时间:2026-03-14 22:23 CST -- 结果摘要:`pnpm check` 通过,`pnpm test` 通过(74/74),`pnpm test:go` 通过,`pnpm build` 通过,`pnpm test:e2e` 通过(5/5) +- 验证时间:2026-03-15 00:10 CST +- 结果摘要:`pnpm check` 通过,`pnpm test` 通过(80/80),`pnpm test:go` 通过,`pnpm build` 通过,`pnpm test:e2e` 通过(6/6),`pnpm test:llm` 通过 ## 生产部署联测 @@ -15,10 +15,13 @@ | `https://te.hao.work/` HTTPS 访问 | `curl -I https://te.hao.work/` | 通过 | | `http://te.hao.work:8302/` 4 位端口访问 | `curl -I http://te.hao.work:8302/` | 通过 | | 站点 TLS 证书 | Let’s Encrypt ECDSA 证书已签发并由宿主机 nginx 加载 | 通过 | -| 生产首页、登录页、录制页浏览器打开 | Playwright 访问 `https://te.hao.work/`、`/login`、`/recorder` | 通过 | +| 生产登录与首次进入工作台 | Playwright 登录真实站点并跳转 `/dashboard` | 通过 | +| 生产训练 / 实时分析 / 录制 / 视频库页面加载 | Playwright 访问 `/training`、`/live-camera`、`/recorder`、`/videos` | 通过 | +| 生产训练计划后台任务提交 | Playwright 点击训练计划生成按钮并收到后台任务反馈 | 通过 | +| 生产移动端录制焦点视图 | Playwright 移动端视口打开 `/recorder` 并验证焦点入口与操作壳层 | 通过 | | 生产前端运行时异常检查 | Playwright `pageerror` / `console.error` 检查 | 通过 | | 媒体健康检查 | `curl http://127.0.0.1:8081/media/health` | 通过 | -| compose 自包含服务 | `docker compose ps` 中 `app` / `db` / `media` / `worker` 正常运行,`migrate` 成功退出 | 通过 | +| compose 自包含服务 | `docker compose ps -a` 中 `app` / `app-worker` / `db` / `media` / `media-worker` 正常运行,`migrate` 成功退出 | 通过 | ## 构建与编译通过 @@ -43,6 +46,7 @@ | badge | `pnpm test` | 通过 | | leaderboard | `pnpm test` | 通过 | | tutorial / reminder / notification 路由校验 | `pnpm test` | 通过 | +| task 后台任务路由 | `pnpm test` / `pnpm test:e2e` | 通过 | | media 工具函数 | `pnpm test` | 通过 | | 登录 URL 回退逻辑 | `pnpm test` | 通过 | @@ -63,7 +67,9 @@ | 训练计划 | 训练计划页加载与生成入口可见 | 通过 | | 视频库 | 视频卡片渲染 | 通过 | | 实时分析 | 摄像头启动入口渲染 | 通过 | +| 实时分析打分 | 启动分析后出现实时评分结果 | 通过 | | 在线录制 | 启动摄像头、开始录制、手动标记、结束归档 | 通过 | +| 录制焦点视图 | 移动端最大化焦点视图与主操作按钮渲染 | 通过 | | 录制结果入库 | 归档完成后视频库可见录制结果 | 通过 | ## LLM 模块验证 @@ -72,12 +78,14 @@ |------|----------|------| | `.env` 中的 `LLM_API_URL` / `LLM_API_KEY` / `LLM_MODEL` | `pnpm test:llm` | 通过 | | `https://one.hao.work/v1/chat/completions` 联通性 | `pnpm test:llm` 实际返回文本 | 通过 | +| 视觉模型独立配置路径 | `server/_core/llm.test.ts` + 手工 smoke 检查 | 通过 | ## 已知非阻断警告 - 测试与开发日志中会出现 `OAUTH_SERVER_URL` 未配置提示;当前 mocked auth 和本地验证链路不依赖真实 OAuth 服务,因此不会导致失败 - `pnpm build` 仍有 Vite 大 chunk 警告;当前属于性能优化待办,不影响本次产物生成 - Playwright 运行依赖 mocked media/network,不等价于真机摄像头、真实弱网和真实 WebRTC 质量验收 +- 当前上游视觉网关可能忽略 `LLM_VISION_MODEL` 并回退为文本模型;服务端已实现自动降级,任务不会因此直接失败 ## 当前未纳入自动验证的内容 diff --git a/drizzle/0005_lively_taskmaster.sql b/drizzle/0005_lively_taskmaster.sql new file mode 100644 index 0000000..624bf27 --- /dev/null +++ b/drizzle/0005_lively_taskmaster.sql @@ -0,0 +1,22 @@ +CREATE TABLE `background_tasks` ( + `id` varchar(36) NOT NULL, + `userId` int NOT NULL, + `type` enum('media_finalize','training_plan_generate','training_plan_adjust','analysis_corrections','pose_correction_multimodal') NOT NULL, + `status` enum('queued','running','succeeded','failed') NOT NULL DEFAULT 'queued', + `title` varchar(256) NOT NULL, + `message` text, + `progress` int NOT NULL DEFAULT 0, + `payload` json NOT NULL, + `result` json, + `error` text, + `attempts` int NOT NULL DEFAULT 0, + `maxAttempts` int NOT NULL DEFAULT 3, + `workerId` varchar(96), + `runAfter` timestamp NOT NULL DEFAULT (now()), + `lockedAt` timestamp, + `startedAt` timestamp, + `completedAt` timestamp, + `createdAt` timestamp NOT NULL DEFAULT (now()), + `updatedAt` timestamp NOT NULL DEFAULT (now()) ON UPDATE CURRENT_TIMESTAMP, + CONSTRAINT `background_tasks_id` PRIMARY KEY(`id`) +); diff --git a/drizzle/meta/_journal.json b/drizzle/meta/_journal.json index 324c54e..6ec2fc8 100644 --- a/drizzle/meta/_journal.json +++ b/drizzle/meta/_journal.json @@ -36,6 +36,13 @@ "when": 1773490358606, "tag": "0004_exotic_randall", "breakpoints": true + }, + { + "idx": 5, + "version": "5", + "when": 1773504000000, + "tag": "0005_lively_taskmaster", + "breakpoints": true } ] -} \ No newline at end of file +} diff --git a/drizzle/schema.ts b/drizzle/schema.ts index 5d19678..49647b5 100644 --- a/drizzle/schema.ts +++ b/drizzle/schema.ts @@ -301,3 +301,36 @@ export const notificationLog = mysqlTable("notification_log", { export type NotificationLogEntry = typeof notificationLog.$inferSelect; export type InsertNotificationLog = typeof notificationLog.$inferInsert; +/** + * Background task queue for long-running or retryable work. + */ +export const backgroundTasks = mysqlTable("background_tasks", { + id: varchar("id", { length: 36 }).primaryKey(), + userId: int("userId").notNull(), + type: mysqlEnum("type", [ + "media_finalize", + "training_plan_generate", + "training_plan_adjust", + "analysis_corrections", + "pose_correction_multimodal", + ]).notNull(), + status: mysqlEnum("status", ["queued", "running", "succeeded", "failed"]).notNull().default("queued"), + title: varchar("title", { length: 256 }).notNull(), + message: text("message"), + progress: int("progress").notNull().default(0), + payload: json("payload").notNull(), + result: json("result"), + error: text("error"), + attempts: int("attempts").notNull().default(0), + maxAttempts: int("maxAttempts").notNull().default(3), + workerId: varchar("workerId", { length: 96 }), + runAfter: timestamp("runAfter").defaultNow().notNull(), + lockedAt: timestamp("lockedAt"), + startedAt: timestamp("startedAt"), + completedAt: timestamp("completedAt"), + createdAt: timestamp("createdAt").defaultNow().notNull(), + updatedAt: timestamp("updatedAt").defaultNow().onUpdateNow().notNull(), +}); + +export type BackgroundTask = typeof backgroundTasks.$inferSelect; +export type InsertBackgroundTask = typeof backgroundTasks.$inferInsert; diff --git a/package.json b/package.json index 9761f9d..df8f2d0 100644 --- a/package.json +++ b/package.json @@ -6,8 +6,9 @@ "scripts": { "dev": "NODE_ENV=development tsx watch server/_core/index.ts", "dev:test": "PORT=41731 STRICT_PORT=1 VITE_APP_ID=test-app VITE_OAUTH_PORTAL_URL=http://127.0.0.1:41731 NODE_ENV=development tsx server/_core/index.ts", - "build": "vite build && esbuild server/_core/index.ts --platform=node --packages=external --bundle --format=esm --outdir=dist", - "start": "NODE_ENV=production node dist/index.js", + "build": "vite build && esbuild server/_core/index.ts server/worker.ts --platform=node --packages=external --bundle --format=esm --outdir=dist", + "start": "NODE_ENV=production node dist/_core/index.js", + "start:worker": "NODE_ENV=production node dist/worker.js", "check": "tsc --noEmit", "format": "prettier --write .", "test": "vitest run", diff --git a/server/_core/env.ts b/server/_core/env.ts index ec3f41c..0529d10 100644 --- a/server/_core/env.ts +++ b/server/_core/env.ts @@ -11,6 +11,7 @@ const parseBoolean = (value: string | undefined, fallback: boolean) => { export const ENV = { appId: process.env.VITE_APP_ID ?? "", + appPublicBaseUrl: process.env.APP_PUBLIC_BASE_URL ?? "", cookieSecret: process.env.JWT_SECRET ?? "", databaseUrl: process.env.DATABASE_URL ?? "", oAuthServerUrl: process.env.OAUTH_SERVER_URL ?? "", @@ -27,7 +28,22 @@ export const ENV = { llmApiKey: process.env.LLM_API_KEY ?? process.env.BUILT_IN_FORGE_API_KEY ?? "", llmModel: process.env.LLM_MODEL ?? "gemini-2.5-flash", + llmVisionApiUrl: + process.env.LLM_VISION_API_URL ?? + process.env.LLM_API_URL ?? + (process.env.BUILT_IN_FORGE_API_URL + ? `${process.env.BUILT_IN_FORGE_API_URL.replace(/\/$/, "")}/v1/chat/completions` + : ""), + llmVisionApiKey: + process.env.LLM_VISION_API_KEY ?? + process.env.LLM_API_KEY ?? + process.env.BUILT_IN_FORGE_API_KEY ?? + "", + llmVisionModel: process.env.LLM_VISION_MODEL ?? process.env.LLM_MODEL ?? "gemini-2.5-flash", llmMaxTokens: parseInteger(process.env.LLM_MAX_TOKENS, 32768), llmEnableThinking: parseBoolean(process.env.LLM_ENABLE_THINKING, false), llmThinkingBudget: parseInteger(process.env.LLM_THINKING_BUDGET, 128), + mediaServiceUrl: process.env.MEDIA_SERVICE_URL ?? "", + backgroundTaskPollMs: parseInteger(process.env.BACKGROUND_TASK_POLL_MS, 3000), + backgroundTaskStaleMs: parseInteger(process.env.BACKGROUND_TASK_STALE_MS, 300000), }; diff --git a/server/_core/llm.test.ts b/server/_core/llm.test.ts index 6d862b0..6eb6e49 100644 --- a/server/_core/llm.test.ts +++ b/server/_core/llm.test.ts @@ -68,6 +68,29 @@ describe("invokeLLM", () => { expect(JSON.parse(request.body)).not.toHaveProperty("thinking"); }); + it("allows overriding the model per request", async () => { + process.env.LLM_API_URL = "https://one.hao.work/v1/chat/completions"; + process.env.LLM_API_KEY = "test-key"; + process.env.LLM_MODEL = "qwen3.5-plus"; + + const fetchMock = vi.fn().mockResolvedValue({ + ok: true, + json: async () => mockSuccessResponse, + }); + vi.stubGlobal("fetch", fetchMock); + + const { invokeLLM } = await import("./llm"); + await invokeLLM({ + model: "qwen3-vl-235b-a22b", + messages: [{ role: "user", content: "describe image" }], + }); + + const [, request] = fetchMock.mock.calls[0] as [string, { body: string }]; + expect(JSON.parse(request.body)).toMatchObject({ + model: "qwen3-vl-235b-a22b", + }); + }); + it("falls back to legacy forge variables when LLM_* values are absent", async () => { delete process.env.LLM_API_URL; delete process.env.LLM_API_KEY; diff --git a/server/_core/llm.ts b/server/_core/llm.ts index 49f6cda..eb2cba7 100644 --- a/server/_core/llm.ts +++ b/server/_core/llm.ts @@ -57,6 +57,9 @@ export type ToolChoice = export type InvokeParams = { messages: Message[]; + model?: string; + apiUrl?: string; + apiKey?: string; tools?: Tool[]; toolChoice?: ToolChoice; tool_choice?: ToolChoice; @@ -209,13 +212,15 @@ const normalizeToolChoice = ( return toolChoice; }; -const resolveApiUrl = () => - ENV.llmApiUrl && ENV.llmApiUrl.trim().length > 0 +const resolveApiUrl = (apiUrl?: string) => + apiUrl && apiUrl.trim().length > 0 + ? apiUrl + : ENV.llmApiUrl && ENV.llmApiUrl.trim().length > 0 ? ENV.llmApiUrl : "https://forge.manus.im/v1/chat/completions"; -const assertApiKey = () => { - if (!ENV.llmApiKey) { +const assertApiKey = (apiKey?: string) => { + if (!(apiKey || ENV.llmApiKey)) { throw new Error("LLM_API_KEY is not configured"); } }; @@ -266,10 +271,13 @@ const normalizeResponseFormat = ({ }; export async function invokeLLM(params: InvokeParams): Promise { - assertApiKey(); + assertApiKey(params.apiKey); const { messages, + model, + apiUrl, + apiKey, tools, toolChoice, tool_choice, @@ -280,7 +288,7 @@ export async function invokeLLM(params: InvokeParams): Promise { } = params; const payload: Record = { - model: ENV.llmModel, + model: model || ENV.llmModel, messages: messages.map(normalizeMessage), }; @@ -315,11 +323,11 @@ export async function invokeLLM(params: InvokeParams): Promise { payload.response_format = normalizedResponseFormat; } - const response = await fetch(resolveApiUrl(), { + const response = await fetch(resolveApiUrl(apiUrl), { method: "POST", headers: { "content-type": "application/json", - authorization: `Bearer ${ENV.llmApiKey}`, + authorization: `Bearer ${apiKey || ENV.llmApiKey}`, }, body: JSON.stringify(payload), }); diff --git a/server/_core/static.ts b/server/_core/static.ts index 7a45f3a..4295920 100644 --- a/server/_core/static.ts +++ b/server/_core/static.ts @@ -6,7 +6,7 @@ export function serveStatic(app: Express) { const distPath = process.env.NODE_ENV === "development" ? path.resolve(import.meta.dirname, "../..", "dist", "public") - : path.resolve(import.meta.dirname, "public"); + : path.resolve(import.meta.dirname, "..", "public"); if (!fs.existsSync(distPath)) { console.error( `Could not find the build directory: ${distPath}, make sure to build the client first` diff --git a/server/db.ts b/server/db.ts index 16216f5..0708c9b 100644 --- a/server/db.ts +++ b/server/db.ts @@ -1,4 +1,4 @@ -import { eq, desc, and, sql } from "drizzle-orm"; +import { eq, desc, and, asc, lte, sql } from "drizzle-orm"; import { drizzle } from "drizzle-orm/mysql2"; import { InsertUser, users, @@ -14,6 +14,7 @@ import { tutorialProgress, InsertTutorialProgress, trainingReminders, InsertTrainingReminder, notificationLog, InsertNotificationLog, + backgroundTasks, InsertBackgroundTask, } from "../drizzle/schema"; import { ENV } from './_core/env'; @@ -179,6 +180,15 @@ export async function getVideoById(videoId: number) { return result.length > 0 ? result[0] : undefined; } +export async function getVideoByFileKey(userId: number, fileKey: string) { + const db = await getDb(); + if (!db) return undefined; + const result = await db.select().from(trainingVideos) + .where(and(eq(trainingVideos.userId, userId), eq(trainingVideos.fileKey, fileKey))) + .limit(1); + return result.length > 0 ? result[0] : undefined; +} + export async function updateVideoStatus(videoId: number, status: "pending" | "analyzing" | "completed" | "failed") { const db = await getDb(); if (!db) return; @@ -660,6 +670,162 @@ export async function getUnreadNotificationCount(userId: number) { return result[0]?.count || 0; } +// ===== BACKGROUND TASK OPERATIONS ===== + +export async function createBackgroundTask(task: InsertBackgroundTask) { + const db = await getDb(); + if (!db) throw new Error("Database not available"); + await db.insert(backgroundTasks).values(task); + return task.id; +} + +export async function listUserBackgroundTasks(userId: number, limit = 20) { + const db = await getDb(); + if (!db) return []; + return db.select().from(backgroundTasks) + .where(eq(backgroundTasks.userId, userId)) + .orderBy(desc(backgroundTasks.createdAt)) + .limit(limit); +} + +export async function getBackgroundTaskById(taskId: string) { + const db = await getDb(); + if (!db) return undefined; + const result = await db.select().from(backgroundTasks) + .where(eq(backgroundTasks.id, taskId)) + .limit(1); + return result[0]; +} + +export async function getUserBackgroundTaskById(userId: number, taskId: string) { + const db = await getDb(); + if (!db) return undefined; + const result = await db.select().from(backgroundTasks) + .where(and(eq(backgroundTasks.id, taskId), eq(backgroundTasks.userId, userId))) + .limit(1); + return result[0]; +} + +export async function claimNextBackgroundTask(workerId: string) { + const db = await getDb(); + if (!db) return null; + + const now = new Date(); + const [nextTask] = await db.select().from(backgroundTasks) + .where(and(eq(backgroundTasks.status, "queued"), lte(backgroundTasks.runAfter, now))) + .orderBy(asc(backgroundTasks.runAfter), asc(backgroundTasks.createdAt)) + .limit(1); + + if (!nextTask) { + return null; + } + + await db.update(backgroundTasks).set({ + status: "running", + workerId, + attempts: sql`${backgroundTasks.attempts} + 1`, + lockedAt: now, + startedAt: now, + updatedAt: now, + }).where(eq(backgroundTasks.id, nextTask.id)); + + return getBackgroundTaskById(nextTask.id); +} + +export async function heartbeatBackgroundTask(taskId: string, workerId: string) { + const db = await getDb(); + if (!db) return; + await db.update(backgroundTasks).set({ + workerId, + lockedAt: new Date(), + }).where(eq(backgroundTasks.id, taskId)); +} + +export async function updateBackgroundTask(taskId: string, data: Partial) { + const db = await getDb(); + if (!db) return; + await db.update(backgroundTasks).set(data).where(eq(backgroundTasks.id, taskId)); +} + +export async function completeBackgroundTask(taskId: string, result: unknown, message?: string) { + const db = await getDb(); + if (!db) return; + await db.update(backgroundTasks).set({ + status: "succeeded", + progress: 100, + message: message ?? "已完成", + result, + error: null, + workerId: null, + lockedAt: null, + completedAt: new Date(), + }).where(eq(backgroundTasks.id, taskId)); +} + +export async function failBackgroundTask(taskId: string, error: string) { + const db = await getDb(); + if (!db) return; + await db.update(backgroundTasks).set({ + status: "failed", + error, + workerId: null, + lockedAt: null, + completedAt: new Date(), + }).where(eq(backgroundTasks.id, taskId)); +} + +export async function rescheduleBackgroundTask(taskId: string, params: { + progress?: number; + message?: string; + error?: string | null; + delayMs?: number; +}) { + const db = await getDb(); + if (!db) return; + await db.update(backgroundTasks).set({ + status: "queued", + progress: params.progress, + message: params.message, + error: params.error ?? null, + workerId: null, + lockedAt: null, + runAfter: new Date(Date.now() + (params.delayMs ?? 0)), + }).where(eq(backgroundTasks.id, taskId)); +} + +export async function retryBackgroundTask(userId: number, taskId: string) { + const db = await getDb(); + if (!db) throw new Error("Database not available"); + const task = await getUserBackgroundTaskById(userId, taskId); + if (!task) { + throw new Error("Task not found"); + } + await db.update(backgroundTasks).set({ + status: "queued", + progress: 0, + message: "任务已重新排队", + error: null, + result: null, + workerId: null, + lockedAt: null, + completedAt: null, + runAfter: new Date(), + }).where(eq(backgroundTasks.id, taskId)); + return getBackgroundTaskById(taskId); +} + +export async function requeueStaleBackgroundTasks(staleBefore: Date) { + const db = await getDb(); + if (!db) return; + await db.update(backgroundTasks).set({ + status: "queued", + message: "检测到任务中断,已重新排队", + workerId: null, + lockedAt: null, + runAfter: new Date(), + }).where(and(eq(backgroundTasks.status, "running"), lte(backgroundTasks.lockedAt, staleBefore))); +} + // ===== STATS HELPERS ===== export async function getUserStats(userId: number) { diff --git a/server/mediaService.ts b/server/mediaService.ts new file mode 100644 index 0000000..15fcd12 --- /dev/null +++ b/server/mediaService.ts @@ -0,0 +1,34 @@ +import { ENV } from "./_core/env"; + +export type RemoteMediaSession = { + id: string; + userId: string; + title: string; + archiveStatus: "idle" | "queued" | "processing" | "completed" | "failed"; + playback: { + webmUrl?: string; + mp4Url?: string; + webmSize?: number; + mp4Size?: number; + ready: boolean; + previewUrl?: string; + }; + lastError?: string; +}; + +function getMediaBaseUrl() { + if (!ENV.mediaServiceUrl) { + throw new Error("MEDIA_SERVICE_URL is not configured"); + } + return ENV.mediaServiceUrl.replace(/\/+$/, ""); +} + +export async function getRemoteMediaSession(sessionId: string) { + const response = await fetch(`${getMediaBaseUrl()}/sessions/${sessionId}`); + if (!response.ok) { + const message = await response.text().catch(() => response.statusText); + throw new Error(`Media service request failed (${response.status}): ${message}`); + } + const payload = await response.json() as { session: RemoteMediaSession }; + return payload.session; +} diff --git a/server/prompts.ts b/server/prompts.ts new file mode 100644 index 0000000..0201b3d --- /dev/null +++ b/server/prompts.ts @@ -0,0 +1,255 @@ +type RecentScore = { + score: number | null; + issues: unknown; + exerciseType: string | null; + shotCount: number | null; + strokeConsistency: number | null; + footworkScore: number | null; +}; + +type RecentAnalysis = { + score: number | null; + issues: unknown; + corrections: unknown; + shotCount: number | null; + strokeConsistency: number | null; + footworkScore: number | null; + fluidityScore: number | null; +}; + +function skillLevelLabel(skillLevel: "beginner" | "intermediate" | "advanced") { + switch (skillLevel) { + case "intermediate": + return "中级"; + case "advanced": + return "高级"; + default: + return "初级"; + } +} + +export function buildTrainingPlanPrompt(input: { + skillLevel: "beginner" | "intermediate" | "advanced"; + durationDays: number; + focusAreas?: string[]; + recentScores: RecentScore[]; +}) { + return [ + `你是一位专业网球教练。请为一位${skillLevelLabel(input.skillLevel)}水平的网球学员生成 ${input.durationDays} 天训练计划。`, + "训练条件与要求:", + "- 训练以个人可执行为主,可使用球拍、弹力带、标志盘、墙面等常见器材。", + "- 每天训练 30-60 分钟,结构要清晰:热身、专项、脚步、力量/稳定、放松。", + "- 输出内容要适合直接执行,不写空话,不写营销语,不写额外说明。", + input.focusAreas?.length ? `- 重点关注:${input.focusAreas.join("、")}` : "- 如未指定重点,请自动平衡技术、脚步和体能。", + input.recentScores.length > 0 + ? `- 用户最近分析摘要:${JSON.stringify(input.recentScores)}` + : "- 暂无历史分析数据,请基于该水平的常见薄弱项设计。", + "每个训练项都要给出目标、动作描述、组次/次数、关键提示,避免重复堆砌。", + ].join("\n"); +} + +export function buildAdjustedTrainingPlanPrompt(input: { + currentExercises: unknown; + recentAnalyses: RecentAnalysis[]; +}) { + return [ + "你是一位专业网球教练,需要根据最近训练分析结果调整现有训练计划。", + `当前计划:${JSON.stringify(input.currentExercises)}`, + `最近分析结果:${JSON.stringify(input.recentAnalyses)}`, + "请优先修复最近最频繁、最影响击球质量的问题。", + "要求:", + "- 保留原计划中仍然有效的训练项,不要全部推倒重来。", + "- 增加动作纠正、脚步节奏、稳定性和专项力量训练。", + "- adjustmentNotes 需要说明为什么这样调整,以及下一阶段重点。", + "- 输出仅返回结构化 JSON。", + ].join("\n"); +} + +export function buildTextCorrectionPrompt(input: { + exerciseType: string; + poseMetrics: unknown; + detectedIssues: unknown; +}) { + return [ + "你是一位网球技术教练与动作纠正分析师。", + `动作类型:${input.exerciseType}`, + `姿态指标:${JSON.stringify(input.poseMetrics)}`, + `已检测问题:${JSON.stringify(input.detectedIssues)}`, + "请用中文输出专业、直接、可执行的纠正建议,使用 Markdown。", + "内容结构必须包括:", + "1. 动作概览", + "2. 最高优先级的 3 个修正点", + "3. 每个修正点对应的练习方法、感受提示、完成标准", + "4. 下一次拍摄或训练时的注意事项", + ].join("\n"); +} + +export const multimodalCorrectionSchema = { + type: "object", + properties: { + summary: { type: "string" }, + overallScore: { type: "number" }, + confidence: { type: "number" }, + phaseFindings: { + type: "array", + items: { + type: "object", + properties: { + phase: { type: "string" }, + score: { type: "number" }, + observation: { type: "string" }, + impact: { type: "string" }, + }, + required: ["phase", "score", "observation", "impact"], + additionalProperties: false, + }, + }, + bodyPartFindings: { + type: "array", + items: { + type: "object", + properties: { + bodyPart: { type: "string" }, + issue: { type: "string" }, + recommendation: { type: "string" }, + }, + required: ["bodyPart", "issue", "recommendation"], + additionalProperties: false, + }, + }, + priorityFixes: { + type: "array", + items: { + type: "object", + properties: { + title: { type: "string" }, + why: { type: "string" }, + howToPractice: { type: "string" }, + successMetric: { type: "string" }, + }, + required: ["title", "why", "howToPractice", "successMetric"], + additionalProperties: false, + }, + }, + drills: { + type: "array", + items: { + type: "object", + properties: { + name: { type: "string" }, + purpose: { type: "string" }, + durationMinutes: { type: "number" }, + steps: { + type: "array", + items: { type: "string" }, + }, + coachingCues: { + type: "array", + items: { type: "string" }, + }, + }, + required: ["name", "purpose", "durationMinutes", "steps", "coachingCues"], + additionalProperties: false, + }, + }, + safetyRisks: { + type: "array", + items: { type: "string" }, + }, + nextSessionFocus: { + type: "array", + items: { type: "string" }, + }, + recommendedCaptureTips: { + type: "array", + items: { type: "string" }, + }, + }, + required: [ + "summary", + "overallScore", + "confidence", + "phaseFindings", + "bodyPartFindings", + "priorityFixes", + "drills", + "safetyRisks", + "nextSessionFocus", + "recommendedCaptureTips", + ], + additionalProperties: false, +}; + +export function buildMultimodalCorrectionPrompt(input: { + exerciseType: string; + poseMetrics: unknown; + detectedIssues: unknown; + imageCount: number; +}) { + return [ + "你是一位专业网球技术教练,正在审阅学员的动作截图。", + `动作类型:${input.exerciseType}`, + `结构化姿态指标:${JSON.stringify(input.poseMetrics)}`, + `已有问题标签:${JSON.stringify(input.detectedIssues)}`, + `本次共提供 ${input.imageCount} 张关键帧图片。`, + "请严格依据图片和结构化指标交叉判断,不要编造看不到的动作细节。", + "分析要求:", + "- 识别准备、引拍、击球/发力、收拍几个阶段的质量。", + "- 指出躯干、肩髋、击球臂、非持拍手、重心和脚步的主要问题。", + "- priorityFixes 只保留最重要、最值得优先修正的项目。", + "- drills 要足够具体,适合下一次训练直接执行。", + "- recommendedCaptureTips 说明下次如何补拍,以便提高判断准确度。", + "输出仅返回 JSON,不要附加解释。", + ].join("\n"); +} + +export function renderMultimodalCorrectionMarkdown(report: { + summary: string; + overallScore: number; + confidence: number; + priorityFixes: Array<{ title: string; why: string; howToPractice: string; successMetric: string }>; + drills: Array<{ name: string; purpose: string; durationMinutes: number; coachingCues: string[] }>; + safetyRisks: string[]; + nextSessionFocus: string[]; + recommendedCaptureTips: string[]; +}) { + const priorityFixes = report.priorityFixes + .map((item, index) => [ + `${index + 1}. ${item.title}`, + `- 原因:${item.why}`, + `- 练习:${item.howToPractice}`, + `- 达标:${item.successMetric}`, + ].join("\n")) + .join("\n"); + + const drills = report.drills + .map((item) => [ + `- ${item.name}(${item.durationMinutes} 分钟)`, + ` 目的:${item.purpose}`, + ` 口令:${item.coachingCues.join(";")}`, + ].join("\n")) + .join("\n"); + + return [ + `## 动作概览`, + report.summary, + "", + `- 综合评分:${Math.round(report.overallScore)}/100`, + `- 置信度:${Math.round(report.confidence)}%`, + "", + "## 优先修正", + priorityFixes || "- 暂无", + "", + "## 推荐练习", + drills || "- 暂无", + "", + "## 风险提醒", + report.safetyRisks.length > 0 ? report.safetyRisks.map(item => `- ${item}`).join("\n") : "- 暂无明显风险", + "", + "## 下次训练重点", + report.nextSessionFocus.length > 0 ? report.nextSessionFocus.map(item => `- ${item}`).join("\n") : "- 保持当前节奏", + "", + "## 下次拍摄建议", + report.recommendedCaptureTips.length > 0 ? report.recommendedCaptureTips.map(item => `- ${item}`).join("\n") : "- 保持当前拍摄方式", + ].join("\n"); +} diff --git a/server/publicUrl.ts b/server/publicUrl.ts new file mode 100644 index 0000000..0af4f92 --- /dev/null +++ b/server/publicUrl.ts @@ -0,0 +1,22 @@ +import { ENV } from "./_core/env"; + +function hasProtocol(value: string) { + return /^[a-z][a-z0-9+.-]*:\/\//i.test(value); +} + +export function toPublicUrl(pathOrUrl: string) { + const value = pathOrUrl.trim(); + if (!value) { + throw new Error("Public URL value is empty"); + } + + if (hasProtocol(value)) { + return value; + } + + if (!ENV.appPublicBaseUrl) { + throw new Error("APP_PUBLIC_BASE_URL is required for externally accessible asset URLs"); + } + + return new URL(value.startsWith("/") ? value : `/${value}`, ENV.appPublicBaseUrl).toString(); +} diff --git a/server/routers.ts b/server/routers.ts index 52e3fec..bf42265 100644 --- a/server/routers.ts +++ b/server/routers.ts @@ -4,53 +4,34 @@ import { systemRouter } from "./_core/systemRouter"; import { publicProcedure, protectedProcedure, router } from "./_core/trpc"; import { z } from "zod"; import { sdk } from "./_core/sdk"; -import { invokeLLM } from "./_core/llm"; import { storagePut } from "./storage"; import * as db from "./db"; import { nanoid } from "nanoid"; -import { - normalizeAdjustedPlanResponse, - normalizeTrainingPlanResponse, -} from "./trainingPlan"; +import { getRemoteMediaSession } from "./mediaService"; +import { prepareCorrectionImageUrls } from "./taskWorker"; +import { toPublicUrl } from "./publicUrl"; -async function invokeStructuredPlan(params: { - baseMessages: Array<{ role: "system" | "user"; content: string }>; - responseFormat: { - type: "json_schema"; - json_schema: { - name: string; - strict: true; - schema: Record; - }; - }; - parse: (content: unknown) => T; +async function enqueueTask(params: { + userId: number; + type: "media_finalize" | "training_plan_generate" | "training_plan_adjust" | "analysis_corrections" | "pose_correction_multimodal"; + title: string; + payload: Record; + message: string; }) { - let lastError: unknown; + const taskId = nanoid(); + await db.createBackgroundTask({ + id: taskId, + userId: params.userId, + type: params.type, + title: params.title, + message: params.message, + payload: params.payload, + progress: 0, + maxAttempts: params.type === "media_finalize" ? 90 : 3, + }); - for (let attempt = 0; attempt < 3; attempt++) { - const retryHint = - attempt === 0 || !(lastError instanceof Error) - ? [] - : [{ - role: "user" as const, - content: - `上一次输出无法被系统解析,错误是:${lastError.message}。` + - "请只返回一个合法、完整、可解析的 JSON 对象,不要包含额外说明、注释或 Markdown 代码块。", - }]; - - const response = await invokeLLM({ - messages: [...params.baseMessages, ...retryHint], - response_format: params.responseFormat, - }); - - try { - return params.parse(response.choices[0]?.message?.content); - } catch (error) { - lastError = error; - } - } - - throw lastError instanceof Error ? lastError : new Error("Failed to parse structured LLM response"); + const task = await db.getBackgroundTaskById(taskId); + return { taskId, task }; } export const appRouter = router({ @@ -104,86 +85,13 @@ export const appRouter = router({ focusAreas: z.array(z.string()).optional(), })) .mutation(async ({ ctx, input }) => { - const user = ctx.user; - // Get user's recent analyses for personalization - const analyses = await db.getUserAnalyses(user.id); - const recentScores = analyses.slice(0, 5).map(a => ({ - score: a.overallScore, - issues: a.detectedIssues, - exerciseType: a.exerciseType, - shotCount: a.shotCount, - strokeConsistency: a.strokeConsistency, - footworkScore: a.footworkScore, - })); - - const prompt = `你是一位网球教练。请为一位${ - input.skillLevel === "beginner" ? "初级" : input.skillLevel === "intermediate" ? "中级" : "高级" - }水平的网球学员生成一个${input.durationDays}天的训练计划。 - -要求: -- 只需要球拍,不需要球场和球网 -- 包含影子挥拍、墙壁练习、脚步移动、体能训练等 -- 每天训练30-60分钟 -${input.focusAreas?.length ? `- 重点关注: ${input.focusAreas.join(", ")}` : ""} -${recentScores.length > 0 ? `- 用户最近的分析数据: ${JSON.stringify(recentScores)}` : ""} - -请返回JSON格式,包含每天的训练内容。`; - - const parsed = await invokeStructuredPlan({ - baseMessages: [ - { role: "system", content: "你是网球训练计划生成器。返回严格的JSON格式。" }, - { role: "user", content: prompt }, - ], - responseFormat: { - type: "json_schema", - json_schema: { - name: "training_plan", - strict: true, - schema: { - type: "object", - properties: { - title: { type: "string", description: "训练计划标题" }, - exercises: { - type: "array", - items: { - type: "object", - properties: { - day: { type: "number" }, - name: { type: "string" }, - category: { type: "string" }, - duration: { type: "number", description: "分钟" }, - description: { type: "string" }, - tips: { type: "string" }, - sets: { type: "number" }, - reps: { type: "number" }, - }, - required: ["day", "name", "category", "duration", "description", "tips", "sets", "reps"], - additionalProperties: false, - }, - }, - }, - required: ["title", "exercises"], - additionalProperties: false, - }, - }, - }, - parse: (content) => normalizeTrainingPlanResponse({ - content, - fallbackTitle: `${input.durationDays}天训练计划`, - }), + return enqueueTask({ + userId: ctx.user.id, + type: "training_plan_generate", + title: `${input.durationDays}天训练计划生成`, + message: "训练计划已加入后台队列", + payload: input, }); - - const planId = await db.createTrainingPlan({ - userId: user.id, - title: parsed.title, - skillLevel: input.skillLevel, - durationDays: input.durationDays, - exercises: parsed.exercises, - isActive: 1, - version: 1, - }); - - return { planId, plan: parsed }; }), list: protectedProcedure.query(async ({ ctx }) => { @@ -197,78 +105,15 @@ ${recentScores.length > 0 ? `- 用户最近的分析数据: ${JSON.stringify(rec adjust: protectedProcedure .input(z.object({ planId: z.number() })) .mutation(async ({ ctx, input }) => { - const analyses = await db.getUserAnalyses(ctx.user.id); - const recentAnalyses = analyses.slice(0, 5); const currentPlan = (await db.getUserTrainingPlans(ctx.user.id)).find(p => p.id === input.planId); if (!currentPlan) throw new Error("Plan not found"); - - const prompt = `基于以下用户的姿势分析结果,调整训练计划: - -当前计划: ${JSON.stringify(currentPlan.exercises)} -最近分析结果: ${JSON.stringify(recentAnalyses.map(a => ({ - score: a.overallScore, - issues: a.detectedIssues, - corrections: a.corrections, - shotCount: a.shotCount, - strokeConsistency: a.strokeConsistency, - footworkScore: a.footworkScore, - fluidityScore: a.fluidityScore, - })))} - -请根据分析结果调整训练计划,增加针对薄弱环节的训练,返回与原计划相同格式的JSON。`; - - const parsed = await invokeStructuredPlan({ - baseMessages: [ - { role: "system", content: "你是网球训练计划调整器。返回严格的JSON格式。" }, - { role: "user", content: prompt }, - ], - responseFormat: { - type: "json_schema", - json_schema: { - name: "adjusted_plan", - strict: true, - schema: { - type: "object", - properties: { - title: { type: "string" }, - adjustmentNotes: { type: "string", description: "调整说明" }, - exercises: { - type: "array", - items: { - type: "object", - properties: { - day: { type: "number" }, - name: { type: "string" }, - category: { type: "string" }, - duration: { type: "number" }, - description: { type: "string" }, - tips: { type: "string" }, - sets: { type: "number" }, - reps: { type: "number" }, - }, - required: ["day", "name", "category", "duration", "description", "tips", "sets", "reps"], - additionalProperties: false, - }, - }, - }, - required: ["title", "adjustmentNotes", "exercises"], - additionalProperties: false, - }, - }, - }, - parse: (content) => normalizeAdjustedPlanResponse({ - content, - fallbackTitle: currentPlan.title, - }), + return enqueueTask({ + userId: ctx.user.id, + type: "training_plan_adjust", + title: `${currentPlan.title} 调整`, + message: "训练计划调整任务已加入后台队列", + payload: input, }); - - await db.updateTrainingPlan(input.planId, { - exercises: parsed.exercises, - adjustmentNotes: parsed.adjustmentNotes, - version: (currentPlan.version || 1) + 1, - }); - - return { success: true, adjustmentNotes: parsed.adjustmentNotes }; }), }), @@ -287,19 +132,20 @@ ${recentScores.length > 0 ? `- 用户最近的分析数据: ${JSON.stringify(rec const fileKey = `videos/${ctx.user.id}/${nanoid()}.${input.format}`; const contentType = input.format === "webm" ? "video/webm" : "video/mp4"; const { url } = await storagePut(fileKey, fileBuffer, contentType); + const publicUrl = toPublicUrl(url); const videoId = await db.createVideo({ userId: ctx.user.id, title: input.title, fileKey, - url, + url: publicUrl, format: input.format, fileSize: input.fileSize, exerciseType: input.exerciseType || null, analysisStatus: "pending", }); - return { videoId, url }; + return { videoId, url: publicUrl }; }), registerExternal: protectedProcedure @@ -313,11 +159,12 @@ ${recentScores.length > 0 ? `- 用户最近的分析数据: ${JSON.stringify(rec exerciseType: z.string().optional(), })) .mutation(async ({ ctx, input }) => { + const publicUrl = toPublicUrl(input.url); const videoId = await db.createVideo({ userId: ctx.user.id, title: input.title, fileKey: input.fileKey, - url: input.url, + url: publicUrl, format: input.format, fileSize: input.fileSize ?? null, duration: input.duration ?? null, @@ -325,7 +172,7 @@ ${recentScores.length > 0 ? `- 用户最近的分析数据: ${JSON.stringify(rec analysisStatus: "completed", }); - return { videoId, url: input.url }; + return { videoId, url: publicUrl }; }), list: protectedProcedure.query(async ({ ctx }) => { @@ -399,32 +246,70 @@ ${recentScores.length > 0 ? `- 用户最近的分析数据: ${JSON.stringify(rec poseMetrics: z.any(), exerciseType: z.string(), detectedIssues: z.any(), + imageUrls: z.array(z.string()).optional(), + imageDataUrls: z.array(z.string()).max(4).optional(), })) - .mutation(async ({ input }) => { - const response = await invokeLLM({ - messages: [ - { - role: "system", - content: "你是一位网球动作分析员。根据MediaPipe姿势分析数据,给出具体的姿势矫正建议。用中文回答。", - }, - { - role: "user", - content: `分析以下网球动作数据并给出矫正建议: -动作类型: ${input.exerciseType} -姿势指标: ${JSON.stringify(input.poseMetrics)} -检测到的问题: ${JSON.stringify(input.detectedIssues)} - -请给出: -1. 每个问题的具体矫正方法 -2. 推荐的练习动作 -3. 需要注意的关键点`, - }, - ], + .mutation(async ({ ctx, input }) => { + const imageUrls = await prepareCorrectionImageUrls({ + userId: ctx.user.id, + imageUrls: input.imageUrls, + imageDataUrls: input.imageDataUrls, }); - return { - corrections: response.choices[0]?.message?.content || "暂无建议", - }; + return enqueueTask({ + userId: ctx.user.id, + type: imageUrls.length > 0 ? "pose_correction_multimodal" : "analysis_corrections", + title: `${input.exerciseType} 动作纠正`, + message: imageUrls.length > 0 ? "多模态动作纠正任务已加入后台队列" : "动作纠正任务已加入后台队列", + payload: { + poseMetrics: input.poseMetrics, + exerciseType: input.exerciseType, + detectedIssues: input.detectedIssues, + imageUrls, + }, + }); + }), + }), + + task: router({ + list: protectedProcedure + .input(z.object({ limit: z.number().min(1).max(50).default(20) }).optional()) + .query(async ({ ctx, input }) => { + return db.listUserBackgroundTasks(ctx.user.id, input?.limit ?? 20); + }), + + get: protectedProcedure + .input(z.object({ taskId: z.string().min(1) })) + .query(async ({ ctx, input }) => { + return db.getUserBackgroundTaskById(ctx.user.id, input.taskId); + }), + + retry: protectedProcedure + .input(z.object({ taskId: z.string().min(1) })) + .mutation(async ({ ctx, input }) => { + const task = await db.retryBackgroundTask(ctx.user.id, input.taskId); + return { task }; + }), + + createMediaFinalize: protectedProcedure + .input(z.object({ + sessionId: z.string().min(1), + title: z.string().min(1).max(256), + exerciseType: z.string().optional(), + })) + .mutation(async ({ ctx, input }) => { + const session = await getRemoteMediaSession(input.sessionId); + if (session.userId !== String(ctx.user.id)) { + throw new Error("Media session not found"); + } + + return enqueueTask({ + userId: ctx.user.id, + type: "media_finalize", + title: `${input.title} 归档`, + message: "录制文件归档任务已加入后台队列", + payload: input, + }); }), }), diff --git a/server/storage.test.ts b/server/storage.test.ts index 44ea28a..1a632c5 100644 --- a/server/storage.test.ts +++ b/server/storage.test.ts @@ -37,4 +37,16 @@ describe("storage fallback", () => { url: "/uploads/videos/test/sample.webm", }); }); + + it("builds externally accessible URLs for local assets", async () => { + process.env.APP_PUBLIC_BASE_URL = "https://te.hao.work/"; + const { toExternalAssetUrl } = await import("./storage"); + + expect(toExternalAssetUrl("/uploads/videos/test/sample.webm")).toBe( + "https://te.hao.work/uploads/videos/test/sample.webm" + ); + expect(toExternalAssetUrl("https://cdn.example.com/demo.jpg")).toBe( + "https://cdn.example.com/demo.jpg" + ); + }); }); diff --git a/server/storage.ts b/server/storage.ts index 73dbdd4..6534f0b 100644 --- a/server/storage.ts +++ b/server/storage.ts @@ -4,6 +4,7 @@ import { mkdir, readFile, writeFile } from "node:fs/promises"; import path from "node:path"; import { ENV } from './_core/env'; +import { toPublicUrl } from "./publicUrl"; type StorageConfig = { baseUrl: string; apiKey: string }; @@ -141,3 +142,7 @@ export async function storageGet(relKey: string): Promise<{ key: string; url: st url: await buildDownloadUrl(baseUrl, key, apiKey), }; } + +export function toExternalAssetUrl(pathOrUrl: string) { + return toPublicUrl(pathOrUrl); +} diff --git a/server/taskWorker.ts b/server/taskWorker.ts new file mode 100644 index 0000000..b1e6928 --- /dev/null +++ b/server/taskWorker.ts @@ -0,0 +1,470 @@ +import { nanoid } from "nanoid"; +import { ENV } from "./_core/env"; +import { invokeLLM, type Message } from "./_core/llm"; +import * as db from "./db"; +import { getRemoteMediaSession } from "./mediaService"; +import { + buildAdjustedTrainingPlanPrompt, + buildMultimodalCorrectionPrompt, + buildTextCorrectionPrompt, + buildTrainingPlanPrompt, + multimodalCorrectionSchema, + renderMultimodalCorrectionMarkdown, +} from "./prompts"; +import { toPublicUrl } from "./publicUrl"; +import { storagePut } from "./storage"; +import { + normalizeAdjustedPlanResponse, + normalizeTrainingPlanResponse, +} from "./trainingPlan"; + +type TaskRow = Awaited>; + +type StructuredParams = { + model?: string; + baseMessages: Array<{ role: "system" | "user"; content: string | Message["content"] }>; + responseFormat: { + type: "json_schema"; + json_schema: { + name: string; + strict: true; + schema: Record; + }; + }; + parse: (content: unknown) => T; +}; + +async function invokeStructured(params: StructuredParams) { + let lastError: unknown; + + for (let attempt = 0; attempt < 3; attempt++) { + const retryHint = + attempt === 0 || !(lastError instanceof Error) + ? [] + : [{ + role: "user" as const, + content: + `上一次输出无法被系统解析,错误是:${lastError.message}。` + + "请只返回合法完整的 JSON 对象,不要附加 Markdown 或说明。", + }]; + + const response = await invokeLLM({ + apiUrl: params.model === ENV.llmVisionModel ? ENV.llmVisionApiUrl : undefined, + apiKey: params.model === ENV.llmVisionModel ? ENV.llmVisionApiKey : undefined, + model: params.model, + messages: [...params.baseMessages, ...retryHint], + response_format: params.responseFormat, + }); + + try { + return params.parse(response.choices[0]?.message?.content); + } catch (error) { + lastError = error; + } + } + + throw lastError instanceof Error ? lastError : new Error("Failed to parse structured LLM response"); +} + +function parseDataUrl(input: string) { + const match = input.match(/^data:(.+?);base64,(.+)$/); + if (!match) { + throw new Error("Invalid image data URL"); + } + return { + contentType: match[1], + buffer: Buffer.from(match[2], "base64"), + }; +} + +async function persistInlineImages(userId: number, imageDataUrls: string[]) { + const persistedUrls: string[] = []; + for (let index = 0; index < imageDataUrls.length; index++) { + const { contentType, buffer } = parseDataUrl(imageDataUrls[index]); + const extension = contentType.includes("png") ? "png" : "jpg"; + const key = `analysis-images/${userId}/${nanoid()}.${extension}`; + const uploaded = await storagePut(key, buffer, contentType); + persistedUrls.push(toPublicUrl(uploaded.url)); + } + return persistedUrls; +} + +export async function prepareCorrectionImageUrls(input: { + userId: number; + imageUrls?: string[]; + imageDataUrls?: string[]; +}) { + const directUrls = (input.imageUrls ?? []).map((item) => toPublicUrl(item)); + const uploadedUrls = input.imageDataUrls?.length + ? await persistInlineImages(input.userId, input.imageDataUrls) + : []; + return [...directUrls, ...uploadedUrls]; +} + +async function runTrainingPlanGenerateTask(task: NonNullable) { + const payload = task.payload as { + skillLevel: "beginner" | "intermediate" | "advanced"; + durationDays: number; + focusAreas?: string[]; + }; + const analyses = await db.getUserAnalyses(task.userId); + const recentScores = analyses.slice(0, 5).map((analysis) => ({ + score: analysis.overallScore ?? null, + issues: analysis.detectedIssues, + exerciseType: analysis.exerciseType ?? null, + shotCount: analysis.shotCount ?? null, + strokeConsistency: analysis.strokeConsistency ?? null, + footworkScore: analysis.footworkScore ?? null, + })); + + const parsed = await invokeStructured({ + baseMessages: [ + { role: "system", content: "你是网球训练计划生成器。返回严格的 JSON 格式。" }, + { + role: "user", + content: buildTrainingPlanPrompt({ + ...payload, + recentScores, + }), + }, + ], + responseFormat: { + type: "json_schema", + json_schema: { + name: "training_plan", + strict: true, + schema: { + type: "object", + properties: { + title: { type: "string" }, + exercises: { + type: "array", + items: { + type: "object", + properties: { + day: { type: "number" }, + name: { type: "string" }, + category: { type: "string" }, + duration: { type: "number" }, + description: { type: "string" }, + tips: { type: "string" }, + sets: { type: "number" }, + reps: { type: "number" }, + }, + required: ["day", "name", "category", "duration", "description", "tips", "sets", "reps"], + additionalProperties: false, + }, + }, + }, + required: ["title", "exercises"], + additionalProperties: false, + }, + }, + }, + parse: (content) => normalizeTrainingPlanResponse({ + content, + fallbackTitle: `${payload.durationDays}天训练计划`, + }), + }); + + const planId = await db.createTrainingPlan({ + userId: task.userId, + title: parsed.title, + skillLevel: payload.skillLevel, + durationDays: payload.durationDays, + exercises: parsed.exercises, + isActive: 1, + version: 1, + }); + + return { + kind: "training_plan_generate" as const, + planId, + plan: parsed, + }; +} + +async function runTrainingPlanAdjustTask(task: NonNullable) { + const payload = task.payload as { planId: number }; + const analyses = await db.getUserAnalyses(task.userId); + const recentAnalyses = analyses.slice(0, 5); + const currentPlan = (await db.getUserTrainingPlans(task.userId)).find((plan) => plan.id === payload.planId); + + if (!currentPlan) { + throw new Error("Plan not found"); + } + + const parsed = await invokeStructured({ + baseMessages: [ + { role: "system", content: "你是网球训练计划调整器。返回严格的 JSON 格式。" }, + { + role: "user", + content: buildAdjustedTrainingPlanPrompt({ + currentExercises: currentPlan.exercises, + recentAnalyses: recentAnalyses.map((analysis) => ({ + score: analysis.overallScore ?? null, + issues: analysis.detectedIssues, + corrections: analysis.corrections, + shotCount: analysis.shotCount ?? null, + strokeConsistency: analysis.strokeConsistency ?? null, + footworkScore: analysis.footworkScore ?? null, + fluidityScore: analysis.fluidityScore ?? null, + })), + }), + }, + ], + responseFormat: { + type: "json_schema", + json_schema: { + name: "adjusted_plan", + strict: true, + schema: { + type: "object", + properties: { + title: { type: "string" }, + adjustmentNotes: { type: "string" }, + exercises: { + type: "array", + items: { + type: "object", + properties: { + day: { type: "number" }, + name: { type: "string" }, + category: { type: "string" }, + duration: { type: "number" }, + description: { type: "string" }, + tips: { type: "string" }, + sets: { type: "number" }, + reps: { type: "number" }, + }, + required: ["day", "name", "category", "duration", "description", "tips", "sets", "reps"], + additionalProperties: false, + }, + }, + }, + required: ["title", "adjustmentNotes", "exercises"], + additionalProperties: false, + }, + }, + }, + parse: (content) => normalizeAdjustedPlanResponse({ + content, + fallbackTitle: currentPlan.title, + }), + }); + + await db.updateTrainingPlan(payload.planId, { + exercises: parsed.exercises, + adjustmentNotes: parsed.adjustmentNotes, + version: (currentPlan.version || 1) + 1, + }); + + return { + kind: "training_plan_adjust" as const, + planId: payload.planId, + plan: parsed, + adjustmentNotes: parsed.adjustmentNotes, + }; +} + +async function runTextCorrectionTask(task: NonNullable) { + const payload = task.payload as { + exerciseType: string; + poseMetrics: unknown; + detectedIssues: unknown; + }; + return createTextCorrectionResult(payload); +} + +async function createTextCorrectionResult(payload: { + exerciseType: string; + poseMetrics: unknown; + detectedIssues: unknown; +}) { + const response = await invokeLLM({ + messages: [ + { + role: "system", + content: "你是一位专业网球技术教练。输出中文 Markdown,内容具体、克制、可执行。", + }, + { + role: "user", + content: buildTextCorrectionPrompt(payload), + }, + ], + }); + + return { + kind: "analysis_corrections" as const, + corrections: response.choices[0]?.message?.content || "暂无建议", + }; +} + +async function runMultimodalCorrectionTask(task: NonNullable) { + const payload = task.payload as { + exerciseType: string; + poseMetrics: unknown; + detectedIssues: unknown; + imageUrls: string[]; + }; + try { + const report = await invokeStructured({ + model: ENV.llmVisionModel, + baseMessages: [ + { role: "system", content: "你是专业网球教练。请基于图片和结构化姿态指标输出严格 JSON。" }, + { + role: "user", + content: [ + { type: "text", text: buildMultimodalCorrectionPrompt({ + exerciseType: payload.exerciseType, + poseMetrics: payload.poseMetrics, + detectedIssues: payload.detectedIssues, + imageCount: payload.imageUrls.length, + }) }, + ...payload.imageUrls.map((url) => ({ + type: "image_url" as const, + image_url: { + url, + detail: "high" as const, + }, + })), + ], + }, + ], + responseFormat: { + type: "json_schema", + json_schema: { + name: "pose_correction_multimodal", + strict: true, + schema: multimodalCorrectionSchema, + }, + }, + parse: (content) => { + if (typeof content === "string") { + return JSON.parse(content); + } + return content as Record; + }, + }); + + return { + kind: "pose_correction_multimodal" as const, + imageUrls: payload.imageUrls, + report, + corrections: renderMultimodalCorrectionMarkdown(report as Parameters[0]), + visionStatus: "ok" as const, + }; + } catch (error) { + const fallback = await createTextCorrectionResult(payload); + return { + kind: "pose_correction_multimodal" as const, + imageUrls: payload.imageUrls, + report: null, + corrections: fallback.corrections, + visionStatus: "fallback" as const, + warning: error instanceof Error ? error.message : "Vision model unavailable", + }; + } +} + +async function runMediaFinalizeTask(task: NonNullable) { + const payload = task.payload as { + sessionId: string; + title: string; + exerciseType?: string; + }; + const session = await getRemoteMediaSession(payload.sessionId); + + if (session.userId !== String(task.userId)) { + throw new Error("Media session does not belong to the task user"); + } + + if (session.archiveStatus === "queued") { + await db.rescheduleBackgroundTask(task.id, { + progress: 45, + message: "录制文件已入队,等待归档", + delayMs: 4_000, + }); + return null; + } + + if (session.archiveStatus === "processing") { + await db.rescheduleBackgroundTask(task.id, { + progress: 78, + message: "录制文件正在整理与转码", + delayMs: 4_000, + }); + return null; + } + + if (session.archiveStatus === "failed") { + throw new Error(session.lastError || "Media archive failed"); + } + + if (!session.playback.ready) { + await db.rescheduleBackgroundTask(task.id, { + progress: 70, + message: "等待回放文件就绪", + delayMs: 4_000, + }); + return null; + } + + const preferredUrl = session.playback.mp4Url || session.playback.webmUrl; + const format = session.playback.mp4Url ? "mp4" : "webm"; + if (!preferredUrl) { + throw new Error("Media session did not expose a playback URL"); + } + + const fileKey = `media/sessions/${session.id}/recording.${format}`; + const existing = await db.getVideoByFileKey(task.userId, fileKey); + if (existing) { + return { + kind: "media_finalize" as const, + sessionId: session.id, + videoId: existing.id, + url: existing.url, + fileKey, + format, + }; + } + + const publicUrl = toPublicUrl(preferredUrl); + const videoId = await db.createVideo({ + userId: task.userId, + title: payload.title || session.title, + fileKey, + url: publicUrl, + format, + fileSize: format === "mp4" ? (session.playback.mp4Size ?? null) : (session.playback.webmSize ?? null), + duration: null, + exerciseType: payload.exerciseType || "recording", + analysisStatus: "completed", + }); + + return { + kind: "media_finalize" as const, + sessionId: session.id, + videoId, + url: publicUrl, + fileKey, + format, + }; +} + +export async function processBackgroundTask(task: NonNullable) { + switch (task.type) { + case "training_plan_generate": + return runTrainingPlanGenerateTask(task); + case "training_plan_adjust": + return runTrainingPlanAdjustTask(task); + case "analysis_corrections": + return runTextCorrectionTask(task); + case "pose_correction_multimodal": + return runMultimodalCorrectionTask(task); + case "media_finalize": + return runMediaFinalizeTask(task); + default: + throw new Error(`Unsupported task type: ${String(task.type)}`); + } +} diff --git a/server/worker.ts b/server/worker.ts new file mode 100644 index 0000000..5a9affa --- /dev/null +++ b/server/worker.ts @@ -0,0 +1,47 @@ +import "dotenv/config"; +import { ENV } from "./_core/env"; +import * as db from "./db"; +import { processBackgroundTask } from "./taskWorker"; + +const workerId = `app-worker-${process.pid}`; + +function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function workOnce() { + await db.requeueStaleBackgroundTasks(new Date(Date.now() - ENV.backgroundTaskStaleMs)); + + const task = await db.claimNextBackgroundTask(workerId); + if (!task) { + return false; + } + + try { + const result = await processBackgroundTask(task); + if (result !== null) { + await db.completeBackgroundTask(task.id, result, "任务执行完成"); + } + } catch (error) { + const message = error instanceof Error ? error.message : "Unknown background task error"; + await db.failBackgroundTask(task.id, message); + console.error(`[worker] task ${task.id} failed:`, error); + } + + return true; +} + +async function main() { + console.log(`[worker] ${workerId} started`); + for (;;) { + const hasWorked = await workOnce(); + if (!hasWorked) { + await sleep(ENV.backgroundTaskPollMs); + } + } +} + +main().catch((error) => { + console.error("[worker] fatal error", error); + process.exit(1); +}); diff --git a/tests/e2e/helpers/mockApp.ts b/tests/e2e/helpers/mockApp.ts index 2ef4357..3b42525 100644 --- a/tests/e2e/helpers/mockApp.ts +++ b/tests/e2e/helpers/mockApp.ts @@ -59,6 +59,7 @@ type MockAppState = { user: MockUser; videos: any[]; analyses: any[]; + tasks: any[]; activePlan: { id: number; title: string; @@ -79,6 +80,7 @@ type MockAppState = { } | null; mediaSession: MockMediaSession | null; nextVideoId: number; + nextTaskId: number; authMeNullResponsesAfterLogin: number; }; @@ -159,6 +161,32 @@ function buildMediaSession(user: MockUser, title: string): MockMediaSession { }; } +function createTask(state: MockAppState, input: { + type: string; + title: string; + status?: string; + progress?: number; + message?: string; + result?: any; + error?: string | null; +}) { + const task = { + id: `task-${state.nextTaskId++}`, + userId: state.user.id, + type: input.type, + status: input.status ?? "succeeded", + title: input.title, + message: input.message ?? "任务执行完成", + progress: input.progress ?? 100, + result: input.result ?? null, + error: input.error ?? null, + createdAt: nowIso(), + updatedAt: nowIso(), + }; + state.tasks = [task, ...state.tasks]; + return task; +} + async function fulfillJson(route: Route, body: unknown) { await route.fulfill({ status: 200, @@ -218,11 +246,112 @@ async function handleTrpc(route: Route, state: MockAppState) { }, ], }; - return trpcResult({ planId: state.activePlan.id, plan: state.activePlan }); + return trpcResult({ + taskId: createTask(state, { + type: "training_plan_generate", + title: "7天训练计划生成", + result: { + kind: "training_plan_generate", + planId: state.activePlan.id, + plan: state.activePlan, + }, + }).id, + }); + case "plan.adjust": + return trpcResult({ + taskId: createTask(state, { + type: "training_plan_adjust", + title: "训练计划调整", + result: { + kind: "training_plan_adjust", + adjustmentNotes: "已根据最近分析结果调整训练重点。", + }, + }).id, + }); case "video.list": return trpcResult(state.videos); case "analysis.list": return trpcResult(state.analyses); + case "task.list": + return trpcResult(state.tasks); + case "task.get": { + const rawInput = url.searchParams.get("input"); + const parsedInput = rawInput ? JSON.parse(rawInput) : {}; + const taskId = parsedInput.json?.taskId || parsedInput[0]?.json?.taskId; + return trpcResult(state.tasks.find((task) => task.id === taskId) || null); + } + case "task.retry": { + const rawInput = url.searchParams.get("input"); + const parsedInput = rawInput ? JSON.parse(rawInput) : {}; + const taskId = parsedInput.json?.taskId || parsedInput[0]?.json?.taskId; + const task = state.tasks.find((item) => item.id === taskId); + if (task) { + task.status = "succeeded"; + task.progress = 100; + task.error = null; + task.message = "任务执行完成"; + } + return trpcResult({ task }); + } + case "task.createMediaFinalize": { + if (state.mediaSession) { + state.mediaSession.status = "archived"; + state.mediaSession.archiveStatus = "completed"; + state.mediaSession.playback = { + ready: true, + webmUrl: "/media/assets/sessions/session-e2e/recording.webm", + mp4Url: "/media/assets/sessions/session-e2e/recording.mp4", + webmSize: 2_400_000, + mp4Size: 1_800_000, + previewUrl: "/media/assets/sessions/session-e2e/recording.webm", + }; + state.videos = [ + { + id: state.nextVideoId++, + title: state.mediaSession.title, + url: state.mediaSession.playback.webmUrl, + format: "webm", + fileSize: state.mediaSession.playback.webmSize, + exerciseType: "recording", + analysisStatus: "completed", + createdAt: nowIso(), + }, + ...state.videos, + ]; + } + return trpcResult({ + taskId: createTask(state, { + type: "media_finalize", + title: "录制归档", + result: { + kind: "media_finalize", + sessionId: state.mediaSession?.id, + videoId: state.videos[0]?.id, + url: state.videos[0]?.url, + }, + }).id, + }); + } + case "analysis.getCorrections": + return trpcResult({ + taskId: createTask(state, { + type: "pose_correction_multimodal", + title: "动作纠正", + result: { + corrections: "## 动作概览\n整体节奏稳定,建议继续优化击球点前置。", + report: { + priorityFixes: [ + { + title: "击球点前置", + why: "击球点略靠后会影响挥拍连贯性。", + howToPractice: "每组 8 次影子挥拍,刻意在身体前侧完成触球动作。", + successMetric: "连续 3 组都能稳定在身体前侧完成挥拍。", + }, + ], + }, + }, + }).id, + }); case "video.registerExternal": if (state.mediaSession?.playback.webmUrl || state.mediaSession?.playback.mp4Url) { state.videos = [ @@ -366,9 +495,11 @@ export async function installAppMocks( createdAt: nowIso(), }, ], + tasks: [], activePlan: null, mediaSession: null, nextVideoId: 100, + nextTaskId: 1, authMeNullResponsesAfterLogin: options?.authMeNullResponsesAfterLogin ?? 0, };