feat: async task pipeline for media and llm workflows
这个提交包含在:
168
server/db.ts
168
server/db.ts
@@ -1,4 +1,4 @@
|
||||
import { eq, desc, and, sql } from "drizzle-orm";
|
||||
import { eq, desc, and, asc, lte, sql } from "drizzle-orm";
|
||||
import { drizzle } from "drizzle-orm/mysql2";
|
||||
import {
|
||||
InsertUser, users,
|
||||
@@ -14,6 +14,7 @@ import {
|
||||
tutorialProgress, InsertTutorialProgress,
|
||||
trainingReminders, InsertTrainingReminder,
|
||||
notificationLog, InsertNotificationLog,
|
||||
backgroundTasks, InsertBackgroundTask,
|
||||
} from "../drizzle/schema";
|
||||
import { ENV } from './_core/env';
|
||||
|
||||
@@ -179,6 +180,15 @@ export async function getVideoById(videoId: number) {
|
||||
return result.length > 0 ? result[0] : undefined;
|
||||
}
|
||||
|
||||
export async function getVideoByFileKey(userId: number, fileKey: string) {
|
||||
const db = await getDb();
|
||||
if (!db) return undefined;
|
||||
const result = await db.select().from(trainingVideos)
|
||||
.where(and(eq(trainingVideos.userId, userId), eq(trainingVideos.fileKey, fileKey)))
|
||||
.limit(1);
|
||||
return result.length > 0 ? result[0] : undefined;
|
||||
}
|
||||
|
||||
export async function updateVideoStatus(videoId: number, status: "pending" | "analyzing" | "completed" | "failed") {
|
||||
const db = await getDb();
|
||||
if (!db) return;
|
||||
@@ -660,6 +670,162 @@ export async function getUnreadNotificationCount(userId: number) {
|
||||
return result[0]?.count || 0;
|
||||
}
|
||||
|
||||
// ===== BACKGROUND TASK OPERATIONS =====
|
||||
|
||||
export async function createBackgroundTask(task: InsertBackgroundTask) {
|
||||
const db = await getDb();
|
||||
if (!db) throw new Error("Database not available");
|
||||
await db.insert(backgroundTasks).values(task);
|
||||
return task.id;
|
||||
}
|
||||
|
||||
export async function listUserBackgroundTasks(userId: number, limit = 20) {
|
||||
const db = await getDb();
|
||||
if (!db) return [];
|
||||
return db.select().from(backgroundTasks)
|
||||
.where(eq(backgroundTasks.userId, userId))
|
||||
.orderBy(desc(backgroundTasks.createdAt))
|
||||
.limit(limit);
|
||||
}
|
||||
|
||||
export async function getBackgroundTaskById(taskId: string) {
|
||||
const db = await getDb();
|
||||
if (!db) return undefined;
|
||||
const result = await db.select().from(backgroundTasks)
|
||||
.where(eq(backgroundTasks.id, taskId))
|
||||
.limit(1);
|
||||
return result[0];
|
||||
}
|
||||
|
||||
export async function getUserBackgroundTaskById(userId: number, taskId: string) {
|
||||
const db = await getDb();
|
||||
if (!db) return undefined;
|
||||
const result = await db.select().from(backgroundTasks)
|
||||
.where(and(eq(backgroundTasks.id, taskId), eq(backgroundTasks.userId, userId)))
|
||||
.limit(1);
|
||||
return result[0];
|
||||
}
|
||||
|
||||
export async function claimNextBackgroundTask(workerId: string) {
|
||||
const db = await getDb();
|
||||
if (!db) return null;
|
||||
|
||||
const now = new Date();
|
||||
const [nextTask] = await db.select().from(backgroundTasks)
|
||||
.where(and(eq(backgroundTasks.status, "queued"), lte(backgroundTasks.runAfter, now)))
|
||||
.orderBy(asc(backgroundTasks.runAfter), asc(backgroundTasks.createdAt))
|
||||
.limit(1);
|
||||
|
||||
if (!nextTask) {
|
||||
return null;
|
||||
}
|
||||
|
||||
await db.update(backgroundTasks).set({
|
||||
status: "running",
|
||||
workerId,
|
||||
attempts: sql`${backgroundTasks.attempts} + 1`,
|
||||
lockedAt: now,
|
||||
startedAt: now,
|
||||
updatedAt: now,
|
||||
}).where(eq(backgroundTasks.id, nextTask.id));
|
||||
|
||||
return getBackgroundTaskById(nextTask.id);
|
||||
}
|
||||
|
||||
export async function heartbeatBackgroundTask(taskId: string, workerId: string) {
|
||||
const db = await getDb();
|
||||
if (!db) return;
|
||||
await db.update(backgroundTasks).set({
|
||||
workerId,
|
||||
lockedAt: new Date(),
|
||||
}).where(eq(backgroundTasks.id, taskId));
|
||||
}
|
||||
|
||||
export async function updateBackgroundTask(taskId: string, data: Partial<InsertBackgroundTask>) {
|
||||
const db = await getDb();
|
||||
if (!db) return;
|
||||
await db.update(backgroundTasks).set(data).where(eq(backgroundTasks.id, taskId));
|
||||
}
|
||||
|
||||
export async function completeBackgroundTask(taskId: string, result: unknown, message?: string) {
|
||||
const db = await getDb();
|
||||
if (!db) return;
|
||||
await db.update(backgroundTasks).set({
|
||||
status: "succeeded",
|
||||
progress: 100,
|
||||
message: message ?? "已完成",
|
||||
result,
|
||||
error: null,
|
||||
workerId: null,
|
||||
lockedAt: null,
|
||||
completedAt: new Date(),
|
||||
}).where(eq(backgroundTasks.id, taskId));
|
||||
}
|
||||
|
||||
export async function failBackgroundTask(taskId: string, error: string) {
|
||||
const db = await getDb();
|
||||
if (!db) return;
|
||||
await db.update(backgroundTasks).set({
|
||||
status: "failed",
|
||||
error,
|
||||
workerId: null,
|
||||
lockedAt: null,
|
||||
completedAt: new Date(),
|
||||
}).where(eq(backgroundTasks.id, taskId));
|
||||
}
|
||||
|
||||
export async function rescheduleBackgroundTask(taskId: string, params: {
|
||||
progress?: number;
|
||||
message?: string;
|
||||
error?: string | null;
|
||||
delayMs?: number;
|
||||
}) {
|
||||
const db = await getDb();
|
||||
if (!db) return;
|
||||
await db.update(backgroundTasks).set({
|
||||
status: "queued",
|
||||
progress: params.progress,
|
||||
message: params.message,
|
||||
error: params.error ?? null,
|
||||
workerId: null,
|
||||
lockedAt: null,
|
||||
runAfter: new Date(Date.now() + (params.delayMs ?? 0)),
|
||||
}).where(eq(backgroundTasks.id, taskId));
|
||||
}
|
||||
|
||||
export async function retryBackgroundTask(userId: number, taskId: string) {
|
||||
const db = await getDb();
|
||||
if (!db) throw new Error("Database not available");
|
||||
const task = await getUserBackgroundTaskById(userId, taskId);
|
||||
if (!task) {
|
||||
throw new Error("Task not found");
|
||||
}
|
||||
await db.update(backgroundTasks).set({
|
||||
status: "queued",
|
||||
progress: 0,
|
||||
message: "任务已重新排队",
|
||||
error: null,
|
||||
result: null,
|
||||
workerId: null,
|
||||
lockedAt: null,
|
||||
completedAt: null,
|
||||
runAfter: new Date(),
|
||||
}).where(eq(backgroundTasks.id, taskId));
|
||||
return getBackgroundTaskById(taskId);
|
||||
}
|
||||
|
||||
export async function requeueStaleBackgroundTasks(staleBefore: Date) {
|
||||
const db = await getDb();
|
||||
if (!db) return;
|
||||
await db.update(backgroundTasks).set({
|
||||
status: "queued",
|
||||
message: "检测到任务中断,已重新排队",
|
||||
workerId: null,
|
||||
lockedAt: null,
|
||||
runAfter: new Date(),
|
||||
}).where(and(eq(backgroundTasks.status, "running"), lte(backgroundTasks.lockedAt, staleBefore)));
|
||||
}
|
||||
|
||||
// ===== STATS HELPERS =====
|
||||
|
||||
export async function getUserStats(userId: number) {
|
||||
|
||||
在新工单中引用
屏蔽一个用户