Add market watch and match hub workflows

这个提交包含在:
cryptocommuniums-afk
2026-04-07 11:00:03 +08:00
父节点 495da60212
当前提交 32ffad1545
修改 39 个文件,包含 6974 行新增330 行删除

查看文件

@@ -2,7 +2,18 @@ import { nanoid } from "nanoid";
import { ENV } from "./_core/env";
import { invokeLLM, type Message } from "./_core/llm";
import * as db from "./db";
import * as matchStore from "./matchStore";
import { getRemoteMediaSession } from "./mediaService";
import {
applyComparablePriceBenchmark,
buildMarketSearchQuery,
enrichRacketListing,
formatMarketPushText,
listingMatchesWatchRule,
loadMarketConfig,
MARKET_SOURCES,
searchMarketSource,
} from "./market";
import {
buildAdjustedTrainingPlanPrompt,
buildMultimodalCorrectionPrompt,
@@ -562,6 +573,346 @@ async function runNtrpRefreshAllTask(task: NonNullable<TaskRow>) {
};
}
async function persistMarketListing(rawListing: Parameters<typeof enrichRacketListing>[0]) {
const enriched = enrichRacketListing(rawListing);
const saved = await db.upsertRacketListing(enriched);
if (!saved) {
throw new Error(`Failed to persist market listing: ${rawListing.source}:${rawListing.sourceListingId}`);
}
const comparable = await db.listRecentComparableRacketListings({
brand: saved.brand,
model: saved.model,
excludeId: saved.id,
limit: 8,
});
const benchmarked = applyComparablePriceBenchmark({
...enriched,
brand: saved.brand,
model: saved.model,
series: saved.series,
category: saved.category,
weightGram: saved.weightGram,
conditionLevel: saved.conditionLevel,
gradeLevel: saved.gradeLevel,
gradeReason: saved.gradeReason,
isLowPriceCandidate: saved.isLowPriceCandidate,
}, comparable.map((item) => item.price));
if (
benchmarked.isLowPriceCandidate !== saved.isLowPriceCandidate ||
benchmarked.gradeReason !== saved.gradeReason
) {
await db.updateRacketListing(saved.id, {
isLowPriceCandidate: benchmarked.isLowPriceCandidate,
gradeReason: benchmarked.gradeReason,
});
return (await db.getRacketListingById(saved.id)) ?? saved;
}
return saved;
}
async function queueMarketPushTask(userId: number, hitId: number, title: string) {
const taskId = nanoid();
await db.createBackgroundTask({
id: taskId,
userId,
type: "market_push_delivery",
title: `低价推送 · ${title}`.slice(0, 256),
message: "低价命中已加入飞书推送队列",
payload: { hitId },
progress: 0,
maxAttempts: 3,
});
return taskId;
}
async function recordMarketWatchHit(params: {
rule: Awaited<ReturnType<typeof db.listActiveRacketWatchRules>>[number];
listing: Awaited<ReturnType<typeof db.getRacketListingById>>;
repushDelta: number;
}) {
if (!params.listing) {
throw new Error("Listing is required to create a watch hit");
}
const now = new Date();
const existing = await db.getRacketWatchHitByRuleAndListing(params.rule.id, params.listing.id);
const pushEnabled = params.rule.pushEnabled === 1;
if (!existing) {
const created = await db.createRacketWatchHit({
watchRuleId: params.rule.id,
userId: params.rule.userId,
listingId: params.listing.id,
matchedPrice: params.listing.price,
status: pushEnabled ? "push_queued" : "suppressed",
firstMatchedAt: now,
lastMatchedAt: now,
lastPushPrice: null,
pushedAt: null,
pushCount: 0,
});
return {
hit: created,
shouldQueuePush: pushEnabled,
};
}
const lastPushPrice = existing.lastPushPrice ?? null;
const shouldQueuePush =
pushEnabled &&
existing.status !== "push_queued" &&
(
existing.pushCount === 0 ||
lastPushPrice == null ||
params.listing.price <= (lastPushPrice - params.repushDelta)
);
await db.updateRacketWatchHit(existing.id, {
matchedPrice: params.listing.price,
lastMatchedAt: now,
status: shouldQueuePush ? "push_queued" : (pushEnabled ? existing.status : "suppressed"),
});
const hit = await db.getRacketWatchHitByRuleAndListing(params.rule.id, params.listing.id);
return {
hit,
shouldQueuePush,
};
}
async function runMarketWatchRefreshTask(task: NonNullable<TaskRow>) {
const payload = task.payload as {
scope?: "user" | "all_users";
ruleIds?: number[];
sources?: Array<(typeof MARKET_SOURCES)[number]>;
trigger?: string;
};
const config = await loadMarketConfig();
const allowedSources = (payload.sources?.length
? payload.sources.filter((item): item is (typeof MARKET_SOURCES)[number] => MARKET_SOURCES.includes(item))
: [...MARKET_SOURCES]);
const rules = payload.scope === "all_users"
? await db.listActiveRacketWatchRules({ ruleIds: payload.ruleIds })
: await db.listActiveRacketWatchRules({ userId: task.userId, ruleIds: payload.ruleIds });
if (rules.length === 0) {
return {
kind: "market_watch_refresh" as const,
trigger: payload.trigger ?? "manual",
processedRules: 0,
listingsSaved: 0,
matchedHits: 0,
queuedPushes: 0,
sourceReports: [],
};
}
const sourceReports: Array<{
ruleId: number;
ruleTitle: string;
source: string;
ok: boolean;
blocked: boolean;
message: string;
listings: number;
}> = [];
let listingsSaved = 0;
let matchedHits = 0;
let queuedPushes = 0;
for (const rule of rules) {
const query = buildMarketSearchQuery(rule);
let latestMatchedAt: Date | undefined;
for (const source of allowedSources) {
const result = await searchMarketSource(source, query, config);
sourceReports.push({
ruleId: rule.id,
ruleTitle: rule.title,
source,
ok: result.ok,
blocked: result.blocked,
message: result.message,
listings: result.listings.length,
});
for (const rawListing of result.listings) {
const savedListing = await persistMarketListing(rawListing);
listingsSaved += 1;
if (!listingMatchesWatchRule(savedListing, rule)) {
continue;
}
latestMatchedAt = new Date();
matchedHits += 1;
const { hit, shouldQueuePush } = await recordMarketWatchHit({
rule,
listing: savedListing,
repushDelta: config.repushDelta,
});
if (hit && shouldQueuePush) {
await queueMarketPushTask(rule.userId, hit.id, rule.title);
queuedPushes += 1;
}
}
}
await db.updateRacketWatchRule(rule.userId, rule.id, {
lastCheckedAt: new Date(),
lastMatchedAt: latestMatchedAt,
});
}
return {
kind: "market_watch_refresh" as const,
trigger: payload.trigger ?? "manual",
processedRules: rules.length,
listingsSaved,
matchedHits,
queuedPushes,
sourceReports,
};
}
async function runMarketSourceSyncTask(task: NonNullable<TaskRow>) {
const payload = task.payload as {
source: (typeof MARKET_SOURCES)[number];
query: string;
};
const config = await loadMarketConfig();
const result = await searchMarketSource(payload.source, payload.query, config);
let savedCount = 0;
for (const rawListing of result.listings) {
await persistMarketListing(rawListing);
savedCount += 1;
}
return {
kind: "market_source_sync" as const,
source: payload.source,
query: payload.query,
ok: result.ok,
blocked: result.blocked,
message: result.message,
listingsSaved: savedCount,
};
}
async function runMarketPushDeliveryTask(task: NonNullable<TaskRow>) {
const payload = task.payload as { hitId: number };
const hit = await db.getRacketWatchHitDeliveryPayload(payload.hitId);
if (!hit) {
throw new Error("Market watch hit not found");
}
const config = await loadMarketConfig();
if (!config.defaultFeishuWebhook.trim()) {
throw new Error("Market Feishu webhook is not configured");
}
const text = formatMarketPushText({
ruleTitle: hit.ruleTitle,
source: hit.listingSource,
title: hit.listingTitle,
price: hit.matchedPrice,
targetPrice: hit.ruleTargetPrice,
brand: hit.listingBrand,
model: hit.listingModel,
category: hit.listingCategory,
weightGram: hit.listingWeightGram,
gradeLevel: hit.listingGradeLevel,
gradeReason: hit.listingGradeReason,
listingUrl: hit.listingUrl,
fetchedAt: hit.listingFetchedAt,
});
const response = await fetch(config.defaultFeishuWebhook, {
method: "POST",
headers: {
"content-type": "application/json",
},
body: JSON.stringify({
msg_type: "text",
content: {
text,
},
}),
});
if (!response.ok) {
const detail = await response.text().catch(() => "");
throw new Error(`Feishu webhook failed (${response.status} ${response.statusText})${detail ? `: ${detail}` : ""}`);
}
await db.updateRacketWatchHit(hit.id, {
status: "pushed",
lastPushPrice: hit.matchedPrice,
pushedAt: new Date(),
pushCount: (hit.pushCount ?? 0) + 1,
});
await db.createNotification({
userId: hit.userId,
notificationType: "racket_price_alert",
title: `低价命中 · ${hit.ruleTitle}`.slice(0, 256),
message: text,
isRead: 0,
});
return {
kind: "market_push_delivery" as const,
hitId: hit.id,
delivered: true,
destination: "feishu_webhook",
};
}
async function runMatchScoreSuggestTask(task: NonNullable<TaskRow>) {
const payload = task.payload as { matchId: number };
const session = await matchStore.getMatchSessionById(payload.matchId);
if (!session) {
throw new Error("Match session not found");
}
const suggestion = await matchStore.generateSuggestedMatchState(payload.matchId);
return {
kind: "match_score_suggest" as const,
matchId: payload.matchId,
workflowStatus: "review_pending",
score: suggestion.score,
metrics: suggestion.metrics,
eventCount: suggestion.eventCount,
sourceCount: suggestion.sourceCount,
};
}
async function runMatchFinalizeTask(task: NonNullable<TaskRow>) {
const payload = task.payload as {
matchId: number;
finalizedByUserId?: number;
};
const finalized = await matchStore.finalizeMatchSettlement(
payload.matchId,
payload.finalizedByUserId ?? task.userId,
);
return {
kind: "match_finalize" as const,
matchId: payload.matchId,
workflowStatus: finalized?.workflowStatus ?? "finalized",
finalizedAt: finalized?.finalizedAt ?? new Date(),
};
}
export async function processBackgroundTask(task: NonNullable<TaskRow>) {
switch (task.type) {
case "training_plan_generate":
@@ -578,6 +929,16 @@ export async function processBackgroundTask(task: NonNullable<TaskRow>) {
return runNtrpRefreshUserTask(task);
case "ntrp_refresh_all":
return runNtrpRefreshAllTask(task);
case "market_watch_refresh":
return runMarketWatchRefreshTask(task);
case "market_source_sync":
return runMarketSourceSyncTask(task);
case "market_push_delivery":
return runMarketPushDeliveryTask(task);
case "match_score_suggest":
return runMatchScoreSuggestTask(task);
case "match_finalize":
return runMatchFinalizeTask(task);
default:
throw new Error(`Unsupported task type: ${String(task.type)}`);
}