From ba00c75a364ab011ea21fcc9c49b08cef5a8a80a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E8=82=A5=E7=BE=8A?= <1048382248@qq.com> Date: Wed, 7 Jan 2026 17:27:58 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=20=E8=A7=86=E9=A2=91?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AICore/GPT/GTP_Analysis_1.cs | 6 +- .../AICore/SherpaOnnx/SenseVoice.cs | 2 +- VideoAnalysisCore/Common/RedisExpand.cs | 146 +++++++++++++----- .../Controllers/VideoTaskController.cs | 12 ++ 4 files changed, 128 insertions(+), 38 deletions(-) diff --git a/VideoAnalysisCore/AICore/GPT/GTP_Analysis_1.cs b/VideoAnalysisCore/AICore/GPT/GTP_Analysis_1.cs index bd2e285..8445c74 100644 --- a/VideoAnalysisCore/AICore/GPT/GTP_Analysis_1.cs +++ b/VideoAnalysisCore/AICore/GPT/GTP_Analysis_1.cs @@ -303,7 +303,8 @@ namespace VideoAnalysisCore.AICore.GPT.DeepSeek //分段超长问题,评分优化如何处理 var keyFrameStr = string.IsNullOrEmpty(taskInfo?.PPTVideoCode) || string.IsNullOrEmpty(taskInfo?.PPTKeyFrame) ? $"请分析授课中字幕描述的知识内容,然后基于视频整体知识点讲解提炼出不同的阶段以便对老师上课内容切片提取为知识库,所以请确保阶段的内容准确性" - : $"授课中老师的PPT在这些时间段内进行了切换{taskInfo.PPTKeyFrame},理应这些时间段内的讲述内容也发生了变化,请你基于PPT变化时间点结合字幕描述的知识内容提炼出不同的切片。每个阶段的起始和结束应接近这些时间点(例如,以时间点为中心,扩展至内容自然过渡处)。"; + : $"授课中老师的PPT在这些时间段内进行了切换{taskInfo.PPTKeyFrame},理应这些时间段内的讲述内容也发生了变化,请你基于PPT变化时间点结合字幕描述的知识内容提炼出不同的切片。" + + $"每个阶段的起始和结束应接近这些时间点(例如,以时间点为中心,扩展至内容自然过渡处)。"; var resFormat = """[{"StartTime":开始秒(number),"EndTime":结束秒(number),"Stage":阶段(string),"Theme":主题(string),"Content":内容总结(string)}]"""; var reviewStr = taskInfo?.VideoType == AttachmentsInfoType.复习 ? $"但本堂课是习题课,所以大部分阶段是不同的例题讲解内容。\n" @@ -316,11 +317,12 @@ namespace VideoAnalysisCore.AICore.GPT.DeepSeek $"完整的课堂标准流程包含以下5个阶段:课程引入/新知讲解/例题精讲/课堂练习/知识总结。\n" + reviewStr + $"初步划分阶段:{keyFrameStr}\n" + + $"\n" + $"内容分析:对每个时间段,提取主要讲解内容:识别关键词(如“例题”“证明”“练习”“总结”)和内容结构。\n" + $"判断阶段类型:如果内容以解题为主,归类为“例题精讲”;如果涉及新知识讲解,归类为“新知讲解”;以此类推。\n" + $"内容总结:简述该阶段的核心讲解内容70~200字,确保内容与阶段时间内授课内容符合。\n" + $"阶段主题:基于内容总结,提炼一个恰当的主题(例如,“柯西不等式的基本应用”)。\n" + - $"输出要求:确保阶段划分合理、无` 重叠,且时长符合要求\n" + + $"输出要求:确保阶段划分合理、无` 重叠,且时长符合要求,并且每个阶段的时长需要超过60秒如果时长不够去考虑合并到相邻的阶段\n" + $"输出格式要求:内容只返回json格式({resFormat})\n" + $"字幕格式(开始秒:内容|下一段字幕).以下是包含时间的视频字幕文本。\n" + $"字幕列表 {captions.Captions} 字幕结束!"; diff --git a/VideoAnalysisCore/AICore/SherpaOnnx/SenseVoice.cs b/VideoAnalysisCore/AICore/SherpaOnnx/SenseVoice.cs index 5eaa927..2f177a1 100644 --- a/VideoAnalysisCore/AICore/SherpaOnnx/SenseVoice.cs +++ b/VideoAnalysisCore/AICore/SherpaOnnx/SenseVoice.cs @@ -204,7 +204,7 @@ namespace VideoAnalysisCore.AICore.SherpaOnnx .ExecuteCommandAsync(); await redisManager.Redis.HMSetAsync(RedisExpandKey.Task(task), "Captions", res); //分析完成视频字幕后继续接收任务 - redisManager.NewTask(); + //redisManager.NewTask(); } return res; } diff --git a/VideoAnalysisCore/Common/RedisExpand.cs b/VideoAnalysisCore/Common/RedisExpand.cs index 028b295..15effea 100644 --- a/VideoAnalysisCore/Common/RedisExpand.cs +++ b/VideoAnalysisCore/Common/RedisExpand.cs @@ -10,6 +10,7 @@ using System.Security.Cryptography; using System.Text.Json; using System.Threading.Channels; using System.Threading.Tasks; +using System.Collections.Concurrent; using System.Xml.Linq; using UserCenter.Model.Enum; using VideoAnalysisCore.AICore.FFMPGE; @@ -169,10 +170,15 @@ namespace VideoAnalysisCore.Common { public static bool StopTask { get; set; } = false; public static Dictionary> SubscribeList = new Dictionary>(); + /// - /// 队列池 + /// 正在后台运行的任务集合 /// - static SubscribeListObject? Subscribe; + public static ConcurrentDictionary RunningTasks = new ConcurrentDictionary(); + + private static CancellationTokenSource? _cts; + private static Task? _workerTask; + public RedisClient Redis { get; set; } public Repository videoTaskDB { get; set; } public Repository taskLogDB { get; set; } @@ -273,41 +279,94 @@ namespace VideoAnalysisCore.Common if (taskId is null) throw new Exception("taskId为空"); if (Redis is null) throw new Exception("redis未初始化"); - //设置任务Redis缓存过期时间 - - var startTime = Redis.HMGet>(RedisExpandKey.Task(taskId), "StartTime").FirstOrDefault(); - if (startTime is null) - startTime = new Dictionary(); - if (!SubscribeList.ContainsKey(@enum)) - throw new Exception(@enum + " 未实现"); var tId = taskId.ToString(); await AddTaskLog(tId, "==> 开始执行任务 "); + + await ProcessTaskFlow(@enum, taskId, tId); + } + + private async Task ProcessTaskFlow(RedisChannelEnum currentStep, object taskId, string tId) + { try { + // 确保有初始校验 + if (!SubscribeList.ContainsKey(currentStep)) + throw new Exception($"{currentStep} 未实现"); + while (true) { - if (!startTime.ContainsKey(@enum)) - startTime.Add(@enum, DateTime.Now); - else - startTime[@enum] = DateTime.Now; + if (StopTask) + { + await AddTaskLog(tId, "==> 手动停止任务 "); + return; + } + // 1. 记录步骤开始时间 + await UpdateStepTimeAsync(taskId, currentStep); + // 2. 执行当前步骤业务逻辑 + await TouchChannel(currentStep, tId, SubscribeList[currentStep]); + // 3. 准备下一步 + var nextStepNullable = currentStep.NextEnum(); + if (nextStepNullable == null) break; // 流程结束 - Redis.HMSet(RedisExpandKey.Task(taskId), "StartTime", startTime); - - await TouchChannel(@enum, tId, SubscribeList[@enum]); - //await SubscribeList[@enum](tId); - var e = @enum.NextEnum(); - if (e is null) - break; - @enum = e.Value; + var nextStep = nextStepNullable.Value; + // 4. 特殊分流:解析字幕完成后,后续步骤转后台并行处理 + if (currentStep == RedisChannelEnum.解析字幕) + { + DispatchBackgroundFlow(nextStep, taskId, tId); + return; // 释放当前主控线程 + } + // 5. 继续循环 + currentStep = nextStep; } } catch (Exception ex) { await SetTaskErrorMessage(long.Parse(tId), ex); } + finally + { + // 每次流程结束(无论是正常结束、异常还是分流退出),都尝试延长过期时间 + // 注意:如果是分流退出,这里也会执行,保证 key 活跃 + await Redis.ExpireAsync(RedisExpandKey.Task(taskId), 60 * 60 * 24 * 15); + } + } - Redis.Expire(RedisExpandKey.Task(taskId), 60 * 60 * 24 * 15); + /// + /// 更新任务步骤时间 + /// + private async Task UpdateStepTimeAsync(object taskId, RedisChannelEnum step) + { + // 获取现有时间字典(如果不存在则新建) + // 注意:HMGet 返回的是数组,取第一个元素 + var result = await Redis.HMGetAsync>(RedisExpandKey.Task(taskId), "StartTime"); + var startTime = result?.FirstOrDefault() ?? new Dictionary(); + + // 更新时间 + startTime[step] = DateTime.Now; + + // 写回 Redis + await Redis.HMSetAsync(RedisExpandKey.Task(taskId), "StartTime", startTime); + } + + /// + /// 分发后续任务到动态线程池 + /// + private void DispatchBackgroundFlow(RedisChannelEnum startStep, object taskId, string tId) + { + var bgTask = Task.Run(async () => + { + try + { + await ProcessTaskFlow(startStep, taskId, tId); + } + finally + { + RunningTasks.TryRemove(tId, out _); + } + }); + + RunningTasks.TryAdd(tId, bgTask); } public async Task TaskEnd(string task) @@ -407,12 +466,10 @@ namespace VideoAnalysisCore.Common StopTask = true; try { - //取消接收任务监听 - Subscribe?.Dispose(); + _cts?.Cancel(); } catch (Exception) { - throw; } } @@ -452,19 +509,38 @@ namespace VideoAnalysisCore.Common } lock (Redis) { - if (Subscribe?.IsUnsubscribed == false)//排除重试机制后 多次接收任务导致内存泄露 + // 如果任务正在运行且未完成,直接返回 + if (_workerTask != null && !_workerTask.IsCompleted) return; - Subscribe = Redis.SubscribeList(RedisExpandKey.ChannelKey, async (taskId) => + + _cts = new CancellationTokenSource(); + var token = _cts.Token; + + _workerTask = Task.Run(async () => { - if (taskId is null) return; - lock (Redis) + Console.WriteLine($"{DateTime.Now} => 开始监听任务队列..."); + while (!token.IsCancellationRequested && !StopTask) { - Subscribe?.Dispose();//取消接收任务监听 - } - Redis.LPush(RedisExpandKey.IDTask, taskId); - await AddTaskLog(taskId, "-------------> 接收到任务 "); - await InsertChannel(RedisChannelEnum.下载文件, taskId); - }); + try + { + // 使用 BLPop 阻塞式获取任务,超时5秒以便检查取消状态 + var taskId = Redis.BLPop(RedisExpandKey.ChannelKey, 5); + if (!string.IsNullOrEmpty(taskId)) + { + Redis.LPush(RedisExpandKey.IDTask, taskId); + await AddTaskLog(taskId, "-------------> 接收到任务 "); + // await等待任务处理完成,确保串行执行 + await InsertChannel(RedisChannelEnum.下载文件, taskId); + } + } + catch (Exception ex) + { + Console.WriteLine($"任务监听异常: {ex.Message}"); + await Task.Delay(2000); + } + } + Console.WriteLine($"{DateTime.Now} => 停止监听任务队列."); + }, token); } } diff --git a/VideoAnalysisCore/Controllers/VideoTaskController.cs b/VideoAnalysisCore/Controllers/VideoTaskController.cs index e18d0d5..473e3dc 100644 --- a/VideoAnalysisCore/Controllers/VideoTaskController.cs +++ b/VideoAnalysisCore/Controllers/VideoTaskController.cs @@ -200,6 +200,18 @@ namespace VideoAnalysisCore.Controllers } + /// + /// 视频处理[批量] + /// + /// 请求体 + /// + [HttpPost(Name = "VideoAnalysis_Batch")] + public async Task VideoAnalysis_Batch(VideoAnalysisReq[] req) + { + foreach (var item in req) + await VideoAnalysis(item); + return Ok(); + } /// /// 视频处理