Harden async task flows and enhance analysis tooling
这个提交包含在:
@@ -51,7 +51,13 @@ export const ENV = {
|
||||
llmMaxTokens: parseInteger(process.env.LLM_MAX_TOKENS, 32768),
|
||||
llmEnableThinking: parseBoolean(process.env.LLM_ENABLE_THINKING, false),
|
||||
llmThinkingBudget: parseInteger(process.env.LLM_THINKING_BUDGET, 128),
|
||||
llmTimeoutMs: parseInteger(process.env.LLM_TIMEOUT_MS, 45000),
|
||||
llmRetryCount: parseInteger(process.env.LLM_RETRY_COUNT, 1),
|
||||
mediaServiceUrl: process.env.MEDIA_SERVICE_URL ?? "",
|
||||
mediaFetchTimeoutMs: parseInteger(process.env.MEDIA_FETCH_TIMEOUT_MS, 12000),
|
||||
mediaFetchRetryCount: parseInteger(process.env.MEDIA_FETCH_RETRY_COUNT, 2),
|
||||
youtubeApiKey: process.env.YOUTUBE_API_KEY ?? "",
|
||||
backgroundTaskPollMs: parseInteger(process.env.BACKGROUND_TASK_POLL_MS, 3000),
|
||||
backgroundTaskStaleMs: parseInteger(process.env.BACKGROUND_TASK_STALE_MS, 300000),
|
||||
backgroundTaskHeartbeatMs: parseInteger(process.env.BACKGROUND_TASK_HEARTBEAT_MS, 5000),
|
||||
};
|
||||
|
||||
85
server/_core/fetch.ts
普通文件
85
server/_core/fetch.ts
普通文件
@@ -0,0 +1,85 @@
|
||||
type FetchRetryOptions = {
|
||||
timeoutMs: number;
|
||||
retries?: number;
|
||||
retryStatuses?: number[];
|
||||
retryMethods?: string[];
|
||||
baseDelayMs?: number;
|
||||
};
|
||||
|
||||
const DEFAULT_RETRY_STATUSES = [408, 425, 429, 502, 503, 504];
|
||||
|
||||
function sleep(ms: number) {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
function shouldRetryResponse(method: string, response: Response, options: FetchRetryOptions) {
|
||||
const allowedMethods = options.retryMethods ?? ["GET", "HEAD"];
|
||||
const retryStatuses = options.retryStatuses ?? DEFAULT_RETRY_STATUSES;
|
||||
return allowedMethods.includes(method) && retryStatuses.includes(response.status);
|
||||
}
|
||||
|
||||
function shouldRetryError(method: string, error: unknown, options: FetchRetryOptions) {
|
||||
const allowedMethods = options.retryMethods ?? ["GET", "HEAD"];
|
||||
if (!allowedMethods.includes(method)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (error instanceof Error) {
|
||||
return error.name === "AbortError" || error.name === "TimeoutError" || error.message.includes("fetch");
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
export async function fetchWithTimeout(input: string | URL, init: RequestInit | undefined, options: FetchRetryOptions) {
|
||||
const method = (init?.method ?? "GET").toUpperCase();
|
||||
const retries = Math.max(0, options.retries ?? 0);
|
||||
const baseDelayMs = Math.max(150, options.baseDelayMs ?? 350);
|
||||
let lastError: unknown;
|
||||
|
||||
for (let attempt = 0; attempt <= retries; attempt += 1) {
|
||||
const controller = new AbortController();
|
||||
const upstreamSignal = init?.signal;
|
||||
let didTimeout = false;
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
didTimeout = true;
|
||||
controller.abort();
|
||||
}, options.timeoutMs);
|
||||
|
||||
const abortHandler = () => controller.abort();
|
||||
upstreamSignal?.addEventListener("abort", abortHandler, { once: true });
|
||||
|
||||
try {
|
||||
const response = await fetch(input, {
|
||||
...init,
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
if (attempt < retries && shouldRetryResponse(method, response, options)) {
|
||||
await response.text().catch(() => undefined);
|
||||
await sleep(baseDelayMs * (attempt + 1));
|
||||
continue;
|
||||
}
|
||||
|
||||
return response;
|
||||
} catch (error) {
|
||||
if (didTimeout) {
|
||||
lastError = new Error(`Request timed out after ${options.timeoutMs}ms`);
|
||||
} else {
|
||||
lastError = error;
|
||||
}
|
||||
|
||||
if (attempt >= retries || !shouldRetryError(method, lastError, options)) {
|
||||
throw lastError instanceof Error ? lastError : new Error("Request failed");
|
||||
}
|
||||
|
||||
await sleep(baseDelayMs * (attempt + 1));
|
||||
} finally {
|
||||
clearTimeout(timeout);
|
||||
upstreamSignal?.removeEventListener("abort", abortHandler);
|
||||
}
|
||||
}
|
||||
|
||||
throw lastError instanceof Error ? lastError : new Error("Request failed");
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
import { ENV } from "./env";
|
||||
import { fetchWithTimeout } from "./fetch";
|
||||
|
||||
export type Role = "system" | "user" | "assistant" | "tool" | "function";
|
||||
|
||||
@@ -323,13 +324,17 @@ export async function invokeLLM(params: InvokeParams): Promise<InvokeResult> {
|
||||
payload.response_format = normalizedResponseFormat;
|
||||
}
|
||||
|
||||
const response = await fetch(resolveApiUrl(apiUrl), {
|
||||
const response = await fetchWithTimeout(resolveApiUrl(apiUrl), {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
authorization: `Bearer ${apiKey || ENV.llmApiKey}`,
|
||||
},
|
||||
body: JSON.stringify(payload),
|
||||
}, {
|
||||
timeoutMs: ENV.llmTimeoutMs,
|
||||
retries: ENV.llmRetryCount,
|
||||
retryMethods: ["POST"],
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
|
||||
24
server/db.ts
24
server/db.ts
@@ -1631,7 +1631,11 @@ export async function claimNextBackgroundTask(workerId: string) {
|
||||
|
||||
const now = new Date();
|
||||
const [nextTask] = await db.select().from(backgroundTasks)
|
||||
.where(and(eq(backgroundTasks.status, "queued"), lte(backgroundTasks.runAfter, now)))
|
||||
.where(and(
|
||||
eq(backgroundTasks.status, "queued"),
|
||||
lte(backgroundTasks.runAfter, now),
|
||||
sql`${backgroundTasks.attempts} < ${backgroundTasks.maxAttempts}`,
|
||||
))
|
||||
.orderBy(asc(backgroundTasks.runAfter), asc(backgroundTasks.createdAt))
|
||||
.limit(1);
|
||||
|
||||
@@ -1733,6 +1737,24 @@ export async function retryBackgroundTask(userId: number, taskId: string) {
|
||||
return getBackgroundTaskById(taskId);
|
||||
}
|
||||
|
||||
export async function failExhaustedBackgroundTasks(now: Date = new Date()) {
|
||||
const db = await getDb();
|
||||
if (!db) return;
|
||||
await db.update(backgroundTasks).set({
|
||||
status: "failed",
|
||||
progress: 100,
|
||||
message: "任务达到最大重试次数,已停止自动重试",
|
||||
error: sql`coalesce(${backgroundTasks.error}, '任务达到最大重试次数')`,
|
||||
workerId: null,
|
||||
lockedAt: null,
|
||||
completedAt: now,
|
||||
}).where(and(
|
||||
eq(backgroundTasks.status, "queued"),
|
||||
lte(backgroundTasks.runAfter, now),
|
||||
sql`${backgroundTasks.attempts} >= ${backgroundTasks.maxAttempts}`,
|
||||
));
|
||||
}
|
||||
|
||||
export async function requeueStaleBackgroundTasks(staleBefore: Date) {
|
||||
const db = await getDb();
|
||||
if (!db) return;
|
||||
|
||||
@@ -41,8 +41,16 @@ describe("getRemoteMediaSession", () => {
|
||||
const session = await getRemoteMediaSession("session-1");
|
||||
|
||||
expect(session.id).toBe("session-1");
|
||||
expect(fetchMock).toHaveBeenNthCalledWith(1, "http://127.0.0.1:8081/sessions/session-1");
|
||||
expect(fetchMock).toHaveBeenNthCalledWith(2, "http://127.0.0.1:8081/media/sessions/session-1");
|
||||
expect(fetchMock).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
"http://127.0.0.1:8081/sessions/session-1",
|
||||
expect.objectContaining({ signal: expect.any(AbortSignal) }),
|
||||
);
|
||||
expect(fetchMock).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
"http://127.0.0.1:8081/media/sessions/session-1",
|
||||
expect.objectContaining({ signal: expect.any(AbortSignal) }),
|
||||
);
|
||||
});
|
||||
|
||||
it("uses the configured /media base URL directly when already present", async () => {
|
||||
@@ -68,6 +76,9 @@ describe("getRemoteMediaSession", () => {
|
||||
|
||||
expect(session.id).toBe("session-2");
|
||||
expect(fetchMock).toHaveBeenCalledTimes(1);
|
||||
expect(fetchMock).toHaveBeenCalledWith("http://media:8081/media/sessions/session-2");
|
||||
expect(fetchMock).toHaveBeenCalledWith(
|
||||
"http://media:8081/media/sessions/session-2",
|
||||
expect.objectContaining({ signal: expect.any(AbortSignal) }),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { ENV } from "./_core/env";
|
||||
import { fetchWithTimeout } from "./_core/fetch";
|
||||
|
||||
export type RemoteMediaSession = {
|
||||
id: string;
|
||||
@@ -35,7 +36,11 @@ export async function getRemoteMediaSession(sessionId: string) {
|
||||
let lastError: Error | null = null;
|
||||
|
||||
for (const url of getMediaCandidateUrls(`/sessions/${encodeURIComponent(sessionId)}`)) {
|
||||
const response = await fetch(url);
|
||||
const response = await fetchWithTimeout(url, undefined, {
|
||||
timeoutMs: ENV.mediaFetchTimeoutMs,
|
||||
retries: ENV.mediaFetchRetryCount,
|
||||
retryMethods: ["GET"],
|
||||
});
|
||||
if (response.ok) {
|
||||
const payload = await response.json() as { session: RemoteMediaSession };
|
||||
return payload.session;
|
||||
|
||||
@@ -9,7 +9,6 @@ import { ENV } from "./_core/env";
|
||||
import { storagePut } from "./storage";
|
||||
import * as db from "./db";
|
||||
import { nanoid } from "nanoid";
|
||||
import { getRemoteMediaSession } from "./mediaService";
|
||||
import { prepareCorrectionImageUrls } from "./taskWorker";
|
||||
import { toPublicUrl } from "./publicUrl";
|
||||
import { ACTION_LABELS, refreshUserNtrp, syncAnalysisTrainingData, syncLiveTrainingData } from "./trainingAutomation";
|
||||
@@ -602,11 +601,6 @@ export const appRouter = router({
|
||||
durationMinutes: z.number().min(1).max(720).optional(),
|
||||
}))
|
||||
.mutation(async ({ ctx, input }) => {
|
||||
const session = await getRemoteMediaSession(input.sessionId);
|
||||
if (session.userId !== String(ctx.user.id)) {
|
||||
throw new Error("Media session not found");
|
||||
}
|
||||
|
||||
return enqueueTask({
|
||||
userId: ctx.user.id,
|
||||
type: "media_finalize",
|
||||
|
||||
@@ -10,6 +10,7 @@ function sleep(ms: number) {
|
||||
}
|
||||
|
||||
async function workOnce() {
|
||||
await db.failExhaustedBackgroundTasks();
|
||||
await db.requeueStaleBackgroundTasks(new Date(Date.now() - ENV.backgroundTaskStaleMs));
|
||||
|
||||
const task = await db.claimNextBackgroundTask(workerId);
|
||||
@@ -17,6 +18,12 @@ async function workOnce() {
|
||||
return false;
|
||||
}
|
||||
|
||||
const heartbeatTimer = setInterval(() => {
|
||||
void db.heartbeatBackgroundTask(task.id, workerId).catch((error) => {
|
||||
console.error(`[worker] heartbeat failed for ${task.id}:`, error);
|
||||
});
|
||||
}, ENV.backgroundTaskHeartbeatMs);
|
||||
|
||||
try {
|
||||
const result = await processBackgroundTask(task);
|
||||
if (result !== null) {
|
||||
@@ -27,6 +34,8 @@ async function workOnce() {
|
||||
await db.failBackgroundTask(task.id, message);
|
||||
await db.failVisionTestRun(task.id, message);
|
||||
console.error(`[worker] task ${task.id} failed:`, error);
|
||||
} finally {
|
||||
clearInterval(heartbeatTimer);
|
||||
}
|
||||
|
||||
return true;
|
||||
|
||||
在新工单中引用
屏蔽一个用户