import { TaskListService } from '@/define/db/service/book/taskListService' import { BookBackTaskStatus, BookBackTaskType, TaskExecuteType } from '@/define/enum/bookEnum' import { TaskModal } from '@/define/model/task' import { SettingModal } from '@/define/model/setting' import { AsyncQueue } from '@/define/quene' import { errorMessage, successMessage } from '@/public/generalTools' import { MJHandle } from '../mj' import { SDHandle } from '../sd' import { OptionRealmService } from '@/define/db/service/optionService' import { OptionKeyName } from '@/define/enum/option' import { optionSerialization } from '../option/optionSerialization' export class TaskManager { isExecuting: boolean = false currentTaskList: TaskModal.Task[] = [] taskListService!: TaskListService eventListeners: Record = {} mjSimpleSetting!: SettingModal.MJGeneralSettings spaceTime: number = 5000 count = 0 maxWaitTask: number = 5 // 最大等待任务数 isListening = false intervalId: any // 用于存储 setInterval 的 ID mjHandle: MJHandle sdHandle: SDHandle // reverseBook: ReverseBook = new ReverseBook() // basicReverse: BasicReverse = new BasicReverse() // bookVideo: BookVideo // bookServiceBasic: BookServiceBasic // videoGlobal: VideoGlobal // sdOpt: SDOpt // comfyUIOpt: ComfyUIOpt // d3Opt: D3Opt // fluxOpt: FluxOpt constructor() { this.isExecuting = false this.currentTaskList = [] // this.basicReverse = new BasicReverse() // this.reverseBook = new ReverseBook() this.mjHandle = new MJHandle() this.sdHandle = new SDHandle() // this.sdOpt = new SDOpt() // this.comfyUIOpt = new ComfyUIOpt() // this.d3Opt = new D3Opt() // this.fluxOpt = new FluxOpt() } async InitService() { if (!this.taskListService) { this.taskListService = await TaskListService.getInstance() } await this.mjHandle.GetMJGeneralSetting() this.mjSimpleSetting = this.mjHandle.mjGeneralSetting as SettingModal.MJGeneralSettings } // 初始化事件监听方法 async InitListeners() { if (this.isListening) return // 如果已经在监听,直接返回 this.isListening = true // 标记为已开始监听 // 判断是不是有任务调度 没有的话初始化 if (!global.taskQueue) { const optionRealmService = await OptionRealmService.getInstance() const generalSettingOption = optionRealmService.GetOptionByKey( OptionKeyName.Software.GeneralSetting ) const generalSetting = optionSerialization(generalSettingOption) global.taskQueue = new AsyncQueue(global, global.am.isPro ? generalSetting.concurrency : 1) } const executeWithDynamicInterval = async () => { await this.ExecuteAutoTask() this.count++ console.log('等待时间--' + this.spaceTime, this.count) // 动态调整等待时间 clearInterval(this.intervalId) this.intervalId = setInterval(executeWithDynamicInterval, this.spaceTime) } this.intervalId = setInterval(executeWithDynamicInterval, this.spaceTime) } // 停止监听的方法 StopListeners() { this.isListening = false // 标记为停止监听 clearInterval(this.intervalId) // 清除定时器 } async ExecuteAutoTask() { await this.InitService() // 加之前先判断是不是还能执行任务 let waitTask = global.taskQueue.getWaitingQueue() if (waitTask > this.maxWaitTask) { // 最懂同时等待十个 console.log('等待中的任务太多,等待中的任务数量:', waitTask) this.spaceTime = 20000 return } // 判断MJ队列是不是存在 if (!global.mjQueue) { // MJ 队列不存在,创建 global.mjQueue = new AsyncQueue(global, global.am.isPro ? this.mjSimpleSetting.taskCount : 3) } // 开始添加 // 查任务 const tasks = this.taskListService.GetWaitTaskAndSlice( TaskExecuteType.AUTO, this.maxWaitTask - waitTask ) if (!tasks || tasks.length <= 0) { console.log('没有等待中的任务') this.spaceTime = 20000 return } this.spaceTime = 5000 //循环添加任务 for (let index = 0; index < tasks.length; index++) { const element = tasks[index] if ( element.type == BookBackTaskType.MJ_IMAGE || element.type == BookBackTaskType.MJ_REVERSE ) { // 判断任务数量是不是又修改 let taskNumber = global.mjQueue.getConcurrencyLimit() if (taskNumber != this.mjSimpleSetting.taskCount && global.am.isPro) { global.mjQueue.concurrencyLimit = this.mjSimpleSetting.taskCount // 重置并发执行的数量 } if (global.mjQueue.getWaitingQueue() > this.maxWaitTask) { console.log('MJ等待中的任务太多,等待中的任务数量:', global.mjQueue.getWaitingQueue()) this.spaceTime = 20000 return } // MJ任务 await this.AddQueue(element) continue } else { // 其他任务 // 设置并发执行的数量 await this.AddQueue(element) } // 添加完成,修改一下提交时间 // 要判断是否超时 } } //#region 添加任务到内存任务中 /** * 添加分镜计算任务 * @param task 任务信息 */ // AddGetFrameDataTask(task: TaskModal.Task): void { // let batch = DEFINE_STRING.BOOK.GET_FRAME_DATA // global.taskQueue.enqueue( // async () => { // await this.basicReverse.GetFrameData(task) // }, // `${batch}_${task.id}`, // batch // ) // } /** * 添加视频分镜任务 * @param task 任务信息 */ // AddCutVideoData(task: TaskModal.Task): void { // let batch = DEFINE_STRING.BOOK.FRAMING // global.taskQueue.enqueue( // async () => { // await this.basicReverse.CutVideoData(task) // }, // `${batch}_${task.id}`, // batch // ) // } /** * 添加切割视频任务 * @param task 任务信息 */ // AddSplitAudioData(task: TaskModal.Task): void { // let batch = DEFINE_STRING.BOOK.SPLI_TAUDIO // global.taskQueue.enqueue( // async () => { // await this.basicReverse.SplitAudioData(task) // }, // `${batch}_${task.id}`, // batch // ) // } /** * 添加抽帧的任务 * @param task 任务信息 */ // AddGetFrame(task: TaskModal.Task): void { // let batch = DEFINE_STRING.BOOK.GET_FRAME // global.taskQueue.enqueue( // async () => { // await this.basicReverse.GetFrame(task) // }, // `${batch}_${task.id}`, // batch // ) // } /** * 添加识别字幕任务 * @param task 任务信息 */ // AddExtractSubtitlesData(task: TaskModal.Task): void { // let batch = DEFINE_STRING.BOOK.GET_COPYWRITING // global.taskQueue.enqueue( // async () => { // await this.basicReverse.ExtractSubtitlesData(task) // }, // `${batch}_${task.id}`, // batch // ) // } /** * 添加单独反推任务 * @param task */ // AddSingleReversePrompt(task: TaskModal.Task): void { // let batch = task.messageName // global.taskQueue.enqueue( // async () => { // await this.reverseBook.SingleReversePrompt(task) // }, // `${batch}_${task.id}`, // batch, // `${batch}_${task.id}_${new Date().getTime()}`, // this.bookServiceBasic.SetMessageNameTaskToFail // ) // } /** * 将MJ生图生成任务添加内存任务中 * @param task */ async AddImageMJImage(task: TaskModal.Task) { // 判断是不是MJ的任务 let batch = task.messageName global.mjQueue.enqueue( async () => { await this.mjHandle.MJImagine(task) }, `${batch}_${task.id}`, batch, `${batch}_${task.id}_${new Date().getTime()}`, this.taskListService.SetMessageNameTaskToFail.bind(this.taskListService) ) } // /** // * 将SD生图任务添加到内存任务中 // * @param task // */ async AddSDImage(task: TaskModal.Task) { let batch = task.messageName global.taskQueue.enqueue( async () => { await this.sdHandle.SDImageGenerate(task) }, `${batch}_${task.id}`, batch, `${batch}_${task.id}_${new Date().getTime()}`, this.taskListService.SetMessageNameTaskToFail.bind(this.taskListService) ) } // /** // * 添加 flux forge 任务到内存队列中 // * @param task // */ async AddFluxForgeImage(task: TaskModal.Task) { let batch = task.messageName global.taskQueue.enqueue( async () => { await this.sdHandle.FluxForgeImageGenerate(task) }, `${batch}_${task.id}`, batch, `${batch}_${task.id}_${new Date().getTime()}`, this.taskListService.SetMessageNameTaskToFail ) } // /** // * 将Comfy UI生图任务添加到内存任务中 // * @param task // */ async AddComfyUIImage(task: TaskModal.Task) { let batch = task.messageName global.taskQueue.enqueue( async () => { await this.sdHandle.ComfyUIImageGenerate(task) }, `${batch}_${task.id}`, batch, `${batch}_${task.id}_${new Date().getTime()}`, this.taskListService.SetMessageNameTaskToFail ) } // /** // * 异步添加D3图像生成任务 // * // * 此方法接受一个任务对象,将基于该任务生成D3图像 // * 它使用全局请求队列来管理任务,确保并发处理的效率和稳定性 // * // * @param task 任务对象,包含任务的具体信息和标识 // */ // async AddD3Image(task: TaskModal.Task) { // let batch = task.messageName // global.taskQueue.enqueue( // async () => { // await this.d3Opt.D3ImageGenerate(task) // }, // `${batch}_${task.id}`, // batch, // `${batch}_${task.id}_${new Date().getTime()}`, // this.taskListService.SetMessageNameTaskToFail // ) // } /** 添加生成视频的后台任务 */ // async AddGenerateVideo(task: TaskModal.Task) { // let batch = task.messageName // global.taskQueue.enqueue( // async () => { // this.bookVideo = new BookVideo() // await this.bookVideo.GenerateVideo(task) // }, // `${batch}_${task.id}`, // batch, // `${batch}_${task.id}_${new Date().getTime()}`, // this.taskListService.SetMessageNameTaskToFail // ) // } /** 添加图片转视频后台人物 */ // async AddImageToVideo(task: TaskModal.Task) { // let batch = task.messageName // global.taskQueue.enqueue( // async () => { // this.videoGlobal = new VideoGlobal() // await this.videoGlobal.ImageToVideo(task) // }, // `${batch}_${task.id}`, // batch, // `${batch}_${task.id}_${new Date().getTime()}`, // this.taskListService.SetMessageNameTaskToFail // ) // } // /** // * 添加 FLUX api 到内存队列中 // * @param task // */ // async AddFluxAPIImage(task: TaskModal.Task) { // let batch = task.messageName // global.taskQueue.enqueue( // async () => { // await this.fluxOpt.FluxAPIImage(task) // }, // `${batch}_${task.id}`, // batch, // `${batch}_${task.id}_${new Date().getTime()}`, // this.taskListService.SetMessageNameTaskToFail // ) // } /** * 添加任务到内存队列中,分流 * @param task 任务相关 * @returns */ async AddQueue(task: TaskModal.Task) { try { switch (task.type) { // case BookBackTaskType.STORYBOARD: // this.AddGetFrameDataTask(task) // break // case BookBackTaskType.SPLIT: // this.AddCutVideoData(task) // break // case BookBackTaskType.AUDIO: // this.AddSplitAudioData(task) // break // case BookBackTaskType.FRAME: // this.AddGetFrame(task) // break // case BookBackTaskType.RECOGNIZE: // this.AddExtractSubtitlesData(task) // break // case BookBackTaskType.MJ_REVERSE: // case BookBackTaskType.SD_REVERSE: // this.AddSingleReversePrompt(task) // break case BookBackTaskType.FLUX_FORGE_IMAGE: this.AddFluxForgeImage(task) break // case BookBackTaskType.FLUX_API_IMAGE: // this.AddFluxAPIImage(task) // break case BookBackTaskType.MJ_IMAGE: this.AddImageMJImage(task) break case BookBackTaskType.SD_IMAGE: this.AddSDImage(task) break case BookBackTaskType.ComfyUI_IMAGE: this.AddComfyUIImage(task) break // case BookBackTaskType.D3_IMAGE: // this.AddD3Image(task) // break // case BookBackTaskType.COMPOSING: // 合成视频 // this.AddGenerateVideo(task) // break // case BookBackTaskType.RUNWAY_VIDEO: // case BookBackTaskType.LUMA_VIDEO: // case BookBackTaskType.KLING_VIDEO: // this.AddImageToVideo(task) // break default: throw new Error('未知的任务类型') } // 是不是要添加自动任务 // await this.AddTaskHandle(task, true); // 添加成功后,更新任务状态 let updateRes = this.taskListService.UpdateTaskStatus({ id: task.id as string, status: BookBackTaskStatus.RUNNING }) return successMessage( updateRes, `${task.name}_${task.id} 任务添加调度完成`, 'TaskManager_AddQueue' ) } catch (error: any) { // 修改任务状态为失败 this.taskListService.UpdateTaskStatus({ id: task.id as string, status: BookBackTaskStatus.FAIL, errorMessage: '任务调度失败,请手动重试' }) return errorMessage( `处理 ${task.type} 类型任务 ${task.name} 失败,失败信息如下:${error.message}`, 'TaskManager_handleTask' ) } } //#endregion }