修改并发限制逻辑

This commit is contained in:
zhangxiaohua 2026-01-19 16:33:27 +08:00
parent 7ec72d865a
commit 61599117dc
5 changed files with 277 additions and 123 deletions

View File

@ -12,19 +12,141 @@ import { QueryTaskDto } from './dto/query-task.dto';
import { AI3DProvider, AI3D_PROVIDER } from './providers/ai-3d-provider.interface';
// 配置常量
const MAX_CONCURRENT_TASKS = 3; // 每用户最大并行任务数
const MAX_USER_TASKS = 3; // 每用户最大任务数pending + processing
const API_MAX_CONCURRENT = 3; // 混元API全局最大并发数所有用户共享
const TASK_TIMEOUT_MS = 10 * 60 * 1000; // 10分钟超时
const MAX_RETRY_COUNT = 3; // 最大重试次数
const QUEUE_CHECK_INTERVAL = 3000; // 队列检查间隔(毫秒)
@Injectable()
export class AI3DService {
private readonly logger = new Logger(AI3DService.name);
private queueCheckTimer: NodeJS.Timeout | null = null;
private isProcessingQueue = false; // 防止并发处理队列
constructor(
private prisma: PrismaService,
private ossService: OssService,
@Inject(AI3D_PROVIDER) private ai3dProvider: AI3DProvider,
) {}
) {
// 启动队列检查定时器
this.startQueueChecker();
}
/**
*
*/
private startQueueChecker() {
if (this.queueCheckTimer) return;
this.queueCheckTimer = setInterval(async () => {
await this.processQueuedTasks();
}, QUEUE_CHECK_INTERVAL);
this.logger.log('队列检查器已启动');
}
/**
* API执行的任务数
*/
private async getGlobalProcessingCount(): Promise<number> {
return this.prisma.aI3DTask.count({
where: {
status: 'processing', // 只统计已提交到API的任务
},
});
}
/**
*
*/
private async processQueuedTasks() {
// 防止并发处理
if (this.isProcessingQueue) return;
this.isProcessingQueue = true;
try {
// 检查当前全局并发数
const processingCount = await this.getGlobalProcessingCount();
const availableSlots = API_MAX_CONCURRENT - processingCount;
if (availableSlots <= 0) {
return; // 没有可用槽位
}
// 获取等待中的任务(按创建时间排序,先进先出)
const pendingTasks = await this.prisma.aI3DTask.findMany({
where: { status: 'pending' },
orderBy: { createTime: 'asc' },
take: availableSlots,
});
if (pendingTasks.length === 0) return;
this.logger.log(
`队列处理: ${pendingTasks.length} 个任务待提交,可用槽位: ${availableSlots}`,
);
// 逐个提交任务
for (const task of pendingTasks) {
// 再次检查并发数(防止并发提交)
const currentProcessing = await this.getGlobalProcessingCount();
if (currentProcessing >= API_MAX_CONCURRENT) {
this.logger.log('全局并发已满,停止提交');
break;
}
await this.submitTaskToAPI(task);
}
} catch (error) {
this.logger.error(`处理队列任务出错: ${error.message}`);
} finally {
this.isProcessingQueue = false;
}
}
/**
* API
*/
private async submitTaskToAPI(task: any) {
try {
// 构建生成选项
const options: any = {
generateType: task.generateType,
};
const externalTaskId = await this.ai3dProvider.submitTask(
task.inputType as 'text' | 'image',
task.inputContent,
options,
);
// 更新状态为处理中
await this.prisma.aI3DTask.update({
where: { id: task.id },
data: {
status: 'processing',
externalTaskId,
},
});
// 启动轮询检查任务状态
this.pollTaskStatus(task.id, externalTaskId, Date.now());
this.logger.log(`任务 ${task.id} 已提交到API外部ID: ${externalTaskId}`);
} catch (error) {
// 提交失败,标记为失败
await this.prisma.aI3DTask.update({
where: { id: task.id },
data: {
status: 'failed',
errorMessage: error.message || 'AI服务提交失败',
completeTime: new Date(),
},
});
this.logger.error(`任务 ${task.id} 提交API失败: ${error.message}`);
}
}
/**
*
@ -34,21 +156,21 @@ export class AI3DService {
tenantId: number,
dto: CreateTaskDto,
) {
// 1. 检查用户当前进行中的任务数量
const activeTaskCount = await this.prisma.aI3DTask.count({
// 1. 检查用户当前任务数量pending + processing
const userTaskCount = await this.prisma.aI3DTask.count({
where: {
userId,
status: { in: ['pending', 'processing'] },
},
});
if (activeTaskCount >= MAX_CONCURRENT_TASKS) {
if (userTaskCount >= MAX_USER_TASKS) {
throw new BadRequestException(
`您当前有 ${activeTaskCount} 个任务正在处理中,最多同时处理 ${MAX_CONCURRENT_TASKS} 个任务,请等待完成后再提交`,
`您当前有 ${userTaskCount} 个任务正在排队或处理中,最多同时 ${MAX_USER_TASKS} 个任务,请等待完成后再提交`,
);
}
// 2. 创建数据库记录
// 2. 创建数据库记录(初始状态为 pending表示排队中
const task = await this.prisma.aI3DTask.create({
data: {
userId,
@ -60,7 +182,13 @@ export class AI3DService {
},
});
// 3. 提交到 AI 服务
this.logger.log(`任务 ${task.id} 已创建,进入队列`);
// 3. 检查全局并发数,决定是立即提交还是等待队列处理
const processingCount = await this.getGlobalProcessingCount();
if (processingCount < API_MAX_CONCURRENT) {
// 有空闲槽位,立即提交
try {
// 构建生成选项
const options: any = {
@ -102,7 +230,7 @@ export class AI3DService {
options,
);
// 4. 更新状态为处理中
// 更新状态为处理中
await this.prisma.aI3DTask.update({
where: { id: task.id },
data: {
@ -111,14 +239,12 @@ export class AI3DService {
},
});
// 5. 启动轮询检查任务状态
// 启动轮询检查任务状态
this.pollTaskStatus(task.id, externalTaskId, Date.now());
this.logger.log(`任务 ${task.id} 创建成功外部ID: ${externalTaskId}`);
return this.getTask(userId, task.id);
this.logger.log(`任务 ${task.id} 已提交到API外部ID: ${externalTaskId}`);
} catch (error) {
// 提交失败,更新状态
// 提交失败,标记为失败
await this.prisma.aI3DTask.update({
where: { id: task.id },
data: {
@ -127,10 +253,17 @@ export class AI3DService {
completeTime: new Date(),
},
});
this.logger.error(`任务 ${task.id} 提交失败: ${error.message}`);
throw error;
}
} else {
// 全局并发已满,任务保持 pending 状态,等待队列调度
this.logger.log(
`全局并发已满 (${processingCount}/${API_MAX_CONCURRENT}),任务 ${task.id} 进入排队`,
);
}
return this.getTask(userId, task.id);
}
/**
@ -174,9 +307,32 @@ export class AI3DService {
throw new NotFoundException('任务不存在');
}
// 如果任务在排队中,计算队列位置
if (task.status === 'pending') {
const queuePosition = await this.getQueuePosition(task.id, task.createTime);
return {
...task,
queuePosition,
};
}
return task;
}
/**
*
*/
private async getQueuePosition(taskId: number, createTime: Date): Promise<number> {
// 统计在当前任务之前创建的、仍在排队的任务数量
const position = await this.prisma.aI3DTask.count({
where: {
status: 'pending',
createTime: { lte: createTime },
},
});
return position;
}
/**
*
*/
@ -216,64 +372,47 @@ export class AI3DService {
);
}
// 检查并发限制
const activeTaskCount = await this.prisma.aI3DTask.count({
// 检查用户任务数限制
const userTaskCount = await this.prisma.aI3DTask.count({
where: {
userId,
status: { in: ['pending', 'processing'] },
},
});
if (activeTaskCount >= MAX_CONCURRENT_TASKS) {
if (userTaskCount >= MAX_USER_TASKS) {
throw new BadRequestException(
`您当前有 ${activeTaskCount} 个任务正在处理中,请等待完成后再重试`,
`您当前有 ${userTaskCount} 个任务正在排队或处理中,请等待完成后再重试`,
);
}
// 重置任务状态
// 重置任务状态为 pending进入队列
await this.prisma.aI3DTask.update({
where: { id },
data: {
status: 'pending',
errorMessage: null,
completeTime: null,
externalTaskId: null,
retryCount: { increment: 1 },
},
});
// 重新提交任务
try {
const externalTaskId = await this.ai3dProvider.submitTask(
task.inputType as 'text' | 'image',
task.inputContent,
);
this.logger.log(`任务 ${id} 已重新加入队列,等待处理`);
await this.prisma.aI3DTask.update({
// 检查是否可以立即提交
const processingCount = await this.getGlobalProcessingCount();
if (processingCount < API_MAX_CONCURRENT) {
// 有空闲槽位,立即提交
const updatedTask = await this.prisma.aI3DTask.findUnique({
where: { id },
data: {
status: 'processing',
externalTaskId,
},
});
this.pollTaskStatus(id, externalTaskId, Date.now());
this.logger.log(`任务 ${id} 重试成功外部ID: ${externalTaskId}`);
if (updatedTask) {
await this.submitTaskToAPI(updatedTask);
}
}
return this.getTask(userId, id);
} catch (error) {
await this.prisma.aI3DTask.update({
where: { id },
data: {
status: 'failed',
errorMessage: error.message || 'AI服务提交失败',
completeTime: new Date(),
},
});
this.logger.error(`任务 ${id} 重试失败: ${error.message}`);
throw error;
}
}
/**

View File

@ -38,6 +38,8 @@ export interface AI3DTask {
retryCount: number;
createTime: string;
completeTime?: string;
// 队列位置(仅 pending 状态时返回)
queuePosition?: number;
}
/**

View File

@ -73,8 +73,10 @@
</div>
</div>
<div class="loading-info">
<div class="loading-title">AI 生成中</div>
<div class="loading-text">
<div class="loading-title">
{{ task?.status === 'pending' ? '排队中' : 'AI 生成中' }}
</div>
<div v-if="task?.status === 'pending'" class="loading-text">
<p>
队列位置:
<span class="highlight">{{ queueInfo.position }}</span>
@ -82,10 +84,13 @@
<p>
预计时间:
<span class="highlight"
>{{ queueInfo.estimatedTime }}s</span
>{{ formatEstimatedTime(queueInfo.estimatedTime) }}</span
>
</p>
</div>
<div v-else class="loading-text">
<p>正在生成3D模型请耐心等待...</p>
</div>
<div class="progress-bar">
<div class="progress-fill"></div>
</div>
@ -171,12 +176,28 @@ const selectedIndex = ref<number | null>(null)
// Polling timer
let pollingTimer: number | null = null
// Queue info (simulated)
const queueInfo = ref({
position: 1,
estimatedTime: 190,
//
const ESTIMATED_TIME_PER_TASK = 180
// Queue info
const queueInfo = computed(() => {
const position = task.value?.queuePosition || 0
// = *
const estimatedTime = position * ESTIMATED_TIME_PER_TASK
return {
position,
estimatedTime,
}
})
//
const formatEstimatedTime = (seconds: number) => {
if (seconds <= 0) return "计算中..."
if (seconds < 60) return `${seconds}`
const minutes = Math.ceil(seconds / 60)
return `${minutes}分钟`
}
// Page title
const pageTitle = computed(() => {
if (task.value?.inputType === "text") {
@ -308,14 +329,6 @@ const fetchTask = async () => {
) {
stopPolling()
}
// Update queue info (simulated)
if (taskData.status === "pending" || taskData.status === "processing") {
queueInfo.value.estimatedTime = Math.max(
10,
queueInfo.value.estimatedTime - 10
)
}
} catch (error) {
console.error("获取任务详情失败:", error)
message.error("获取任务详情失败")

View File

@ -27,7 +27,7 @@
<a-select-option value="">全部</a-select-option>
<a-select-option value="completed">已完成</a-select-option>
<a-select-option value="processing">生成中</a-select-option>
<a-select-option value="pending">等待</a-select-option>
<a-select-option value="pending">排队</a-select-option>
<a-select-option value="failed">失败</a-select-option>
<a-select-option value="timeout">超时</a-select-option>
</a-select>
@ -85,7 +85,7 @@
<span></span>
<span></span>
</div>
<span class="loading-text">生成中</span>
<span class="loading-text">{{ task.status === 'pending' ? '排队中' : '生成中' }}</span>
</div>
<div
v-else-if="task.status === 'failed' || task.status === 'timeout'"
@ -270,7 +270,7 @@ const getPreviewUrl = (task: AI3DTask) => {
//
const getStatusText = (status: string) => {
const texts: Record<string, string> = {
pending: "等待中",
pending: "排队中",
processing: "生成中",
completed: "已完成",
failed: "失败",

View File

@ -565,7 +565,7 @@
<span></span>
<span></span>
</div>
<span class="loading-text">生成中</span>
<span class="loading-text">{{ task.status === 'pending' ? '排队中' : '生成中' }}</span>
</div>
<div
v-else-if="
@ -1121,7 +1121,7 @@ const getPreviewUrl = (task: AI3DTask) => {
//
const getStatusText = (status: string) => {
const texts: Record<string, string> = {
pending: "等待中",
pending: "排队中",
processing: "生成中",
completed: "已完成",
failed: "失败",