From a2200c0296f2d7610d6760c465c2693d4fea2ce8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E8=82=A5=E7=BE=8A?= <1048382248@qq.com> Date: Sat, 28 Feb 2026 14:15:05 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=20=E7=8B=AC=E7=AB=8B?= =?UTF-8?q?=E5=B7=A5=E4=BD=9C=E6=B5=81=E4=B8=BA=E5=8D=95=E7=8B=AC=E9=85=8D?= =?UTF-8?q?=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AICore/FFMPGE/FFMPGEHandle.cs | 52 ++- VideoAnalysisCore/Common/AppConfig.cs | 17 +- .../Common/Expand/UploadExpand.cs | 159 ++++++++ VideoAnalysisCore/Common/RedisExpand.cs | 362 +----------------- .../Common/UploadWorkflowManager.cs | 80 ++++ .../Common/VideoSliceWorkflowManager.cs | 114 ++++++ VideoAnalysisCore/Common/WorkflowBase.cs | 298 ++++++++++++++ .../Controllers/VideoTaskController.cs | 13 +- .../Model/Enum/RedisUploadChannelEnum.cs | 35 ++ 9 files changed, 774 insertions(+), 356 deletions(-) create mode 100644 VideoAnalysisCore/Common/Expand/UploadExpand.cs create mode 100644 VideoAnalysisCore/Common/UploadWorkflowManager.cs create mode 100644 VideoAnalysisCore/Common/VideoSliceWorkflowManager.cs create mode 100644 VideoAnalysisCore/Common/WorkflowBase.cs create mode 100644 VideoAnalysisCore/Model/Enum/RedisUploadChannelEnum.cs diff --git a/VideoAnalysisCore/AICore/FFMPGE/FFMPGEHandle.cs b/VideoAnalysisCore/AICore/FFMPGE/FFMPGEHandle.cs index f79d884..67a56f8 100644 --- a/VideoAnalysisCore/AICore/FFMPGE/FFMPGEHandle.cs +++ b/VideoAnalysisCore/AICore/FFMPGE/FFMPGEHandle.cs @@ -1,4 +1,4 @@ -using FFmpeg.NET.Events; +using FFmpeg.NET.Events; using FFmpeg.NET; using VideoAnalysisCore.AICore.SherpaOnnx; using VideoAnalysisCore.Common; @@ -208,5 +208,55 @@ namespace VideoAnalysisCore.AICore.FFMPGE } + /// + /// 合并音频和视频并切片 + /// + /// + /// + public async Task MergeAndSliceAsync(string task) + { + var taskID = long.Parse(task); + var localPath = task.LocalPath(); + var pptPath = Path.Combine(localPath, "ppt.mp4"); + var taskPath = Path.Combine(localPath, "task.mp4"); + var mergedPath = Path.Combine(localPath, "merged.mp4"); + var m3u8Path = Path.Combine(localPath, "out.m3u8"); + + if (!File.Exists(pptPath)) throw new FileNotFoundException("PPT视频文件未找到", pptPath); + if (!File.Exists(taskPath)) throw new FileNotFoundException("任务视频文件未找到", taskPath); + + var ffmpeg = new Engine(FFmpegPath); + var cToken = new CancellationToken(); + + // 1. 合并 PPT视频(画面) + 任务视频(音频) -> merged.mp4 + // -map 0:v 取第一个输入(ppt)的视频流 + // -map 1:a 取第二个输入(task)的音频流 + // -c:v copy 复制视频流不转码 + // -c:a aac 音频转码为aac (兼容性好) + // -strict experimental 允许使用aac + // -shortest 以最短的流为准 + var mergeArgs = $"-i \"{pptPath}\" -i \"{taskPath}\" -map 0:v -map 1:a -c:v copy -c:a aac -strict experimental -shortest \"{mergedPath}\" -y"; + + await redisManager.AddTaskLog(task, "开始合并视频与音频..."); + await ffmpeg.ExecuteAsync(mergeArgs, cToken); + + if (!File.Exists(mergedPath)) throw new Exception("视频合并失败"); + + // 2. 切片 merged.mp4 -> out.m3u8 + // -c copy 直接复制流 (因为上一步已经是 mp4/aac) + // -f hls HLS格式 + // -hls_time 10 切片时长10秒 + // -hls_list_size 0 包含所有切片 + // -hls_segment_filename out%03d.ts 切片文件名 + var sliceArgs = $"-i \"{mergedPath}\" -c copy -f hls -hls_time 10 -hls_list_size 0 -hls_segment_filename \"{Path.Combine(localPath, "out%03d.ts")}\" \"{m3u8Path}\" -y"; + + await redisManager.AddTaskLog(task, "开始视频切片..."); + await ffmpeg.ExecuteAsync(sliceArgs, cToken); + + if (!File.Exists(m3u8Path)) throw new Exception("视频切片失败"); + + // 更新任务状态或路径? 目前只需要生成文件 + await redisManager.AddTaskLog(task, "视频处理完成"); + } } } diff --git a/VideoAnalysisCore/Common/AppConfig.cs b/VideoAnalysisCore/Common/AppConfig.cs index 89f9d27..043fbd3 100644 --- a/VideoAnalysisCore/Common/AppConfig.cs +++ b/VideoAnalysisCore/Common/AppConfig.cs @@ -1,4 +1,4 @@ -using SqlSugar.IOC; +using SqlSugar.IOC; using System; using System.Collections.Generic; using System.Linq; @@ -70,6 +70,21 @@ namespace VideoAnalysisCore.Common /// 授权配置 /// public AuthKeyConfig AuthKey { get; set; } = new AuthKeyConfig(); + /// + /// 工作流配置 + /// + public WorkflowConfig Workflow { get; set; } = new WorkflowConfig(); + } + + public class WorkflowConfig + { + public WorkflowItemConfig Default { get; set; } = new WorkflowItemConfig(); + public WorkflowItemConfig Upload { get; set; } = new WorkflowItemConfig { Enabled = true }; + } + public class WorkflowItemConfig + { + public bool Enabled { get; set; } = true; + public int Concurrency { get; set; } = 1; } public class AuthKeyConfig diff --git a/VideoAnalysisCore/Common/Expand/UploadExpand.cs b/VideoAnalysisCore/Common/Expand/UploadExpand.cs new file mode 100644 index 0000000..0b8b3a9 --- /dev/null +++ b/VideoAnalysisCore/Common/Expand/UploadExpand.cs @@ -0,0 +1,159 @@ +using AlibabaCloud.SDK.Vod20170321; +using AlibabaCloud.SDK.Vod20170321.Models; +using Aliyun.OSS; +using Microsoft.Extensions.DependencyInjection; +using SqlSugar.IOC; +using System; +using System.IO; +using System.Text; +using System.Threading.Tasks; +using VideoAnalysisCore.Model; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using System.Collections.Generic; + +namespace VideoAnalysisCore.Common.Expand +{ + public static class UploadExpand + { + public static void AddUploadExpand(this IServiceCollection services) + { + services.AddSingleton(); + } + } + + public class UploadHandle + { + private readonly Client _vodClient; + private readonly Repository _videoTaskDB; + private readonly RedisManager _redisManager; + private readonly OssClient _ossClient; // 使用系统统一注入的 OSS Client + + public UploadHandle(Client vodClient, Repository videoTaskDB, RedisManager redisManager, OssClient ossClient) + { + _vodClient = vodClient; + _videoTaskDB = videoTaskDB; + _redisManager = redisManager; + _ossClient = ossClient; + } + + public async Task RunAsync(string task) + { + var taskId = long.Parse(task); + var localPath = task.LocalPath(); + var m3u8Path = Path.Combine(localPath, "out.m3u8"); + + if (!File.Exists(m3u8Path)) + { + await _redisManager.AddTaskLog(task, "未找到 m3u8 文件,无法进行切片上传"); + throw new FileNotFoundException("M3U8文件未找到", m3u8Path); + } + + // 获取所有切片文件 (out*.ts) + var tsFiles = Directory.GetFiles(localPath, "out*.ts"); + if (tsFiles.Length == 0) + { + await _redisManager.AddTaskLog(task, "未找到 ts 切片文件"); + throw new FileNotFoundException("TS切片文件未找到"); + } + + var title = $"Task_{taskId}_{DateTime.Now:yyyyMMddHHmmss}"; + + await _redisManager.AddTaskLog(task, "正在获取VOD上传凭证..."); + + // 1. 获取上传凭证和地址 + // 注意:VOD上传m3u8时,FileName必须以 .m3u8 结尾 + var request = new CreateUploadVideoRequest + { + Title = title, + FileName = "out.m3u8", // 必须是 m3u8 文件名 + Description = "Video Analysis HLS Upload", + // CoverURL = "...", // 可选:设置封面 + // Tags = "...", // 可选:设置标签 + }; + + var response = await _vodClient.CreateUploadVideoAsync(request); + if (response.Body == null || string.IsNullOrEmpty(response.Body.UploadAddress) || string.IsNullOrEmpty(response.Body.UploadAuth)) + { + throw new Exception($"获取上传凭证失败: RequestId={response.Body?.RequestId}"); + } + + var videoId = response.Body.VideoId; + var uploadAddressStr = response.Body.UploadAddress; + var uploadAuthStr = response.Body.UploadAuth; + + await _redisManager.AddTaskLog(task, $"获取凭证成功,VideoId: {videoId}"); + + // 2. 解析凭证 (Base64 -> JSON) + var addressJson = JObject.Parse(Encoding.UTF8.GetString(Convert.FromBase64String(uploadAddressStr))); + var authJson = JObject.Parse(Encoding.UTF8.GetString(Convert.FromBase64String(uploadAuthStr))); + + var endpoint = addressJson["Endpoint"]?.ToString(); + var bucket = addressJson["Bucket"]?.ToString(); + var objectName = addressJson["FileName"]?.ToString(); // 这是 VOD 分配的 m3u8 存储路径,例如 "sv/243d.../out.m3u8" + + var accessKeyId = authJson["AccessKeyId"]?.ToString(); + var accessKeySecret = authJson["AccessKeySecret"]?.ToString(); + var securityToken = authJson["SecurityToken"]?.ToString(); + + if (string.IsNullOrEmpty(endpoint) || string.IsNullOrEmpty(bucket) || string.IsNullOrEmpty(objectName)) + { + throw new Exception("解析上传地址失败"); + } + + // 修正 Endpoint 格式 (如果缺少协议头) + if (!endpoint.StartsWith("http")) + { + endpoint = "https://" + endpoint; + } + + // 3. 构造 OSS 客户端 (使用临时凭证) + var ossClient = new OssClient(endpoint, accessKeyId, accessKeySecret, securityToken); + + // 4. 确定 OSS 目录前缀 + // VOD 返回的 objectName 是完整的文件路径,我们需要提取目录部分来存放 .ts 文件 + // 例如: objectName = "sv/5903240e-19544975a64/out.m3u8" + // 则 prefix = "sv/5903240e-19544975a64/" + var ossPrefix = objectName.Substring(0, objectName.LastIndexOf('/') + 1); + + await _redisManager.AddTaskLog(task, $"开始上传文件到 VOD OSS (Bucket: {bucket}, Prefix: {ossPrefix})..."); + + try + { + // A. 上传所有 TS 切片 + await _redisManager.AddTaskLog(task, $"开始上传 TS 切片 (共 {tsFiles.Length} 个)..."); + foreach (var tsFile in tsFiles) + { + var fileName = Path.GetFileName(tsFile); + var tsObjectKey = ossPrefix + fileName; + + using var fs = File.OpenRead(tsFile); + ossClient.PutObject(bucket, tsObjectKey, fs); + } + + // B. 上传 m3u8 索引文件 + // 必须使用 VOD 指定的 objectName + await _redisManager.AddTaskLog(task, "开始上传 m3u8 索引文件..."); + using (var fs = File.OpenRead(m3u8Path)) + { + ossClient.PutObject(bucket, objectName, fs); + } + + await _redisManager.AddTaskLog(task, "上传成功"); + + // 5. 更新数据库 + // 对于 VOD 托管视频,我们主要存储 VideoId (TagId),播放地址通常由前端调用 VOD 接口获取 + // 或者我们可以尝试获取播放地址存入 MediaUrl + await _videoTaskDB.CopyNew().AsUpdateable() + .SetColumns(it => it.TagId == videoId) + .Where(it => it.Id == taskId) + .ExecuteCommandAsync(); + } + catch (Exception ex) + { + await _redisManager.AddTaskLog(task, $"上传 VOD OSS 异常: {ex.Message}"); + throw; + } + } + } +} diff --git a/VideoAnalysisCore/Common/RedisExpand.cs b/VideoAnalysisCore/Common/RedisExpand.cs index 165be0d..c22254e 100644 --- a/VideoAnalysisCore/Common/RedisExpand.cs +++ b/VideoAnalysisCore/Common/RedisExpand.cs @@ -1,4 +1,4 @@ -using FreeRedis; +using FreeRedis; using FreeRedis.Internal; using Microsoft.Extensions.DependencyInjection; using Microsoft.IdentityModel.Tokens; @@ -42,6 +42,10 @@ namespace VideoAnalysisCore.Common /// public const string ChannelKey = BaseKey + "TaskChannel"; /// + /// 上传工作流 Channel key + /// + public const string UploadChannelKey = BaseKey + "UploadTaskChannel"; + /// /// 下载文件 /// public const string DownloadFile = ChannelKey + "DownloadFile"; @@ -102,72 +106,19 @@ namespace VideoAnalysisCore.Common JsonSerializer.Deserialize(json, type); service.AddSingleton(redis); service.AddSingleton(); + service.AddVideoSliceWorkflow(); + service.AddUploadWorkflow(); } } public class RedisInit { - public FFMPGEHandle FFMPGE { get; set; } - public SenseVoice senseVoice { get; set; } - public FunASRNano funASRNano { get; set; } - public RedisManager redisManager { get; set; } - - public RedisInit(FFMPGEHandle fFMPGE, SenseVoice senseVoice, RedisManager redisManager, FunASRNano funASRNano) + public RedisInit(IServiceProvider serviceProvider) { - FFMPGE = fFMPGE; - this.senseVoice = senseVoice; - this.funASRNano = funASRNano; - this.redisManager = redisManager; - Init(); - redisManager.InitChannel(); - } - - public void Init() - { - var SubscribeList = RedisManager.SubscribeList; - SubscribeList.Add(RedisChannelEnum.排队中, async (task) => - { - await Task.CompletedTask; - }); - SubscribeList.Add(RedisChannelEnum.下载文件, async (task) => - { - using var scope = AppCommon.Services?.CreateScope(); - if (scope is null || scope.ServiceProvider.GetService() is null) - throw new Exception("DownloadFile 未注入"); - else - await scope.ServiceProvider.GetService()?.RunTask(task); - }); - SubscribeList.Add(RedisChannelEnum.分离音频, FFMPGE.RunAsync); - SubscribeList.Add(RedisChannelEnum.解析字幕, senseVoice.RunTask); - //SubscribeList.Add(RedisChannelEnum.解析字幕, funASRNano.RunTask); - //SubscribeList.Add(RedisChannelEnum.解析说话人,Speaker.Run); - SubscribeList.Add(RedisChannelEnum.AI课程类型, async (task) => - { - using var scope = AppCommon.Services?.CreateScope(); - if (scope is null || scope.ServiceProvider.GetService() is null) - throw new Exception("IBserGPT 未注入"); - else - await scope.ServiceProvider.GetService()?.GetVideoType(task); - }); - SubscribeList.Add(RedisChannelEnum.AI模型分析, async (task) => - { - using var scope = AppCommon.Services?.CreateScope(); - if (scope is null || scope.ServiceProvider.GetService() is null) - throw new Exception("IBserGPT 未注入"); - else - await scope.ServiceProvider?.GetService()?.GetKnow(task); - }); - SubscribeList.Add(RedisChannelEnum.AI分析试题, async (task) => - { - using var scope = AppCommon.Services?.CreateScope(); - if (scope is null || scope.ServiceProvider.GetService() is null) - throw new Exception("IBserGPT 未注入"); - else - await scope.ServiceProvider?.GetService()?.GetVideoQuestion(task); - }); - SubscribeList.Add(RedisChannelEnum.结束任务, redisManager.TaskEnd); - + serviceProvider.GetService(); + serviceProvider.GetService(); + // serviceProvider.GetService().InitChannel(); // 已废弃,由各工作流自行初始化 } } /// @@ -277,105 +228,6 @@ namespace VideoAnalysisCore.Common Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", p.ToString()); } - /// - /// 将任务 插入 队列 - /// - /// 枚举 - /// 任务id - public async Task InsertChannel(RedisChannelEnum @enum, object taskId) - { - if (taskId is null) throw new Exception("taskId为空"); - if (Redis is null) throw new Exception("redis未初始化"); - - var tId = taskId.ToString(); - - await AddTaskLog(tId, "==> 开始执行任务 "); - - await ProcessTaskFlow(@enum, taskId, tId); - } - - private async Task ProcessTaskFlow(RedisChannelEnum currentStep, object taskId, string tId) - { - try - { - // 确保有初始校验 - if (!SubscribeList.ContainsKey(currentStep)) - throw new Exception($"{currentStep} 未实现"); - - while (true) - { - if (StopTask) - { - await AddTaskLog(tId, "==> 手动停止任务 "); - return; - } - // 1. 记录步骤开始时间 - await UpdateStepTimeAsync(taskId, currentStep); - // 2. 执行当前步骤业务逻辑 - await TouchChannel(currentStep, tId, SubscribeList[currentStep]); - // 3. 准备下一步 - var nextStepNullable = currentStep.NextEnum(); - if (nextStepNullable == null) break; // 流程结束 - - var nextStep = nextStepNullable.Value; - // 4. 特殊分流:解析字幕完成后,后续步骤转后台并行处理 - if (currentStep == RedisChannelEnum.解析字幕) - { - DispatchBackgroundFlow(nextStep, taskId, tId); - return; // 释放当前主控线程 - } - // 5. 继续循环 - currentStep = nextStep; - } - } - catch (Exception ex) - { - await SetTaskErrorMessage(long.Parse(tId), ex); - } - finally - { - // 每次流程结束(无论是正常结束、异常还是分流退出),都尝试延长过期时间 - // 注意:如果是分流退出,这里也会执行,保证 key 活跃 - await Redis.ExpireAsync(RedisExpandKey.Task(taskId), 60 * 60 * 24 * 15); - } - } - - /// - /// 更新任务步骤时间 - /// - private async Task UpdateStepTimeAsync(object taskId, RedisChannelEnum step) - { - // 获取现有时间字典(如果不存在则新建) - // 注意:HMGet 返回的是数组,取第一个元素 - var result = await Redis.HMGetAsync>(RedisExpandKey.Task(taskId), "StartTime"); - var startTime = result?.FirstOrDefault() ?? new Dictionary(); - - // 更新时间 - startTime[step] = DateTime.Now; - - // 写回 Redis - await Redis.HMSetAsync(RedisExpandKey.Task(taskId), "StartTime", startTime); - } - - /// - /// 分发后续任务到动态线程池 - /// - private void DispatchBackgroundFlow(RedisChannelEnum startStep, object taskId, string tId) - { - var bgTask = Task.Run(async () => - { - try - { - await ProcessTaskFlow(startStep, taskId, tId); - } - finally - { - RunningTasks.TryRemove(tId, out _); - } - }); - - RunningTasks.TryAdd(tId, bgTask); - } public async Task TaskEnd(string task) { @@ -427,151 +279,6 @@ namespace VideoAnalysisCore.Common //NewTask(); } - /// - /// 初始化 队列 任务 - /// - public async Task InitChannel() - { - Thread.Sleep(1000); - if (Redis is null) throw new Exception("redis未初始化"); - //处理之前程序结束前未能执行完的情况 - var oldTaskCount = Redis.LLen(RedisExpandKey.IDTask); - //重试任务并发过多可能会导致程序崩溃 - // 未能重新分析的中断任务 则单独开一个网页来处理 - if (oldTaskCount > 0) - { - //获取所有未完成的任务 - var oldTaskArr = Redis.LRange(RedisExpandKey.IDTask, 0, -1); - Console.WriteLine($"{DateTime.Now:HH:mm:ss}-------------> 发现 {oldTaskArr.Length} 个未完成任务,准备恢复..."); - - //使用信号量限制并发数(5),防止崩溃 - using var semaphore = new System.Threading.SemaphoreSlim(5); - var retryTaskArr = new List(); - - foreach (var oldTask in oldTaskArr) - { - await semaphore.WaitAsync(); - var res = Task.Run(async () => - { - try - { - await AddTaskLog(oldTask, DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收上次未完成任务 " + oldTask); - await ClearTaskError(long.Parse(oldTask)); - var lastEnum = (await Redis.HMGetAsync(RedisExpandKey.Task(oldTask), "LastEnum")).FirstOrDefault(); - await InsertChannel(lastEnum, oldTask); - } - catch (Exception ex) - { - await SetTaskErrorMessage(long.Parse(oldTask), ex); - Console.WriteLine($"恢复任务 {oldTask} 失败: {ex.Message}"); - } - finally - { - semaphore.Release(); - } - }); - retryTaskArr.Add(res); - } - //等待所有 重试任务完成后接收新任务 - await Task.WhenAll(retryTaskArr); - - Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 所有未完成任务处理完毕!"); - ReceivingTaskAsync(); - - } - else - { - Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收新任务!"); - ReceivingTaskAsync(); - } - - } - /// - /// 停止接收新任务 - /// - public void StopTaskAsync() - { - StopTask = true; - try - { - _cts?.Cancel(); - } - catch (Exception) - { - throw; - } - } - - /// - /// 开始接收新任务 - /// - public void RestartTask() - { - StopTask = false; - NewTask(); - } - /// - /// 重新执行新任务 - /// - /// - public void NewTask() - { - // 取消 消费机的任务订阅 - if (StopTask) - { - Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收任务已经暂停 "); - return; - } - ReceivingTaskAsync(); - } - - /// - /// 重新接收新任务 - /// - public void ReceivingTaskAsync() - { - if (AppCommon.Config.TaskSetting.IS_Server) - { - Console.WriteLine($"{DateTime.Now} =>服务端不接收任务"); - return; - } - lock (Redis) - { - // 如果任务正在运行且未完成,直接返回 - if (_workerTask != null && !_workerTask.IsCompleted) - return; - - _cts = new CancellationTokenSource(); - var token = _cts.Token; - - _workerTask = Task.Run(async () => - { - Console.WriteLine($"{DateTime.Now} => 开始监听任务队列..."); - while (!token.IsCancellationRequested && !StopTask) - { - try - { - // 使用 BLPop 阻塞式获取任务,超时5秒以便检查取消状态 - var taskId = Redis.BLPop(RedisExpandKey.ChannelKey, 5); - if (!string.IsNullOrEmpty(taskId)) - { - Redis.LPush(RedisExpandKey.IDTask, taskId); - await AddTaskLog(taskId, "-------------> 接收到任务 "); - // await等待任务处理完成,确保串行执行 - await InsertChannel(RedisChannelEnum.下载文件, taskId); - } - } - catch (Exception ex) - { - Console.WriteLine($"任务监听异常: {ex.Message}"); - await Task.Delay(2000); - } - } - Console.WriteLine($"{DateTime.Now} => 停止监听任务队列."); - }, token); - } - } - /// /// 写入任务异常 /// @@ -589,7 +296,7 @@ namespace VideoAnalysisCore.Common error = ex.Message + ex.StackTrace; await AddTaskLog(taskID, $""" 出现异常 {ex.Message} {ex.StackTrace} """); //清除失败任务 重新接收任务 - NewTask(); + // NewTask(); // 已废弃,工作流会自动处理 } return await SetTaskError(taskID, error); } @@ -615,50 +322,5 @@ namespace VideoAnalysisCore.Common .ExecuteCommandAsync() == 1; } - /// - /// 触发 - /// - /// - /// - /// - public async Task TouchChannel(RedisChannelEnum key, string taskId, Func action = null) - { - if (taskId is null) return; - var tID = long.Parse(taskId); - if (action is not null) - { - var tryCount = 1; - for (int i = 0; i < tryCount; i++) - { - await AddTaskLog(taskId, " 开始执行 " + key + " " + taskId); - try - { - Redis.HMSet(RedisExpandKey.Task(taskId), "LastEnum", key); - Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", 0); - var vDB = AppCommon.Services.GetService>(); - await vDB.CopyNew().AsUpdateable() - .SetColumns(it => it.LastEnum == key) - .Where(it => it.Id == tID) - .ExecuteCommandAsync(); - await action(taskId); - return; - } - catch (Exception ex) - { - - await AddTaskLog(taskId, $""" 出现异常 {ex.Message} {ex.StackTrace} """); - Thread.Sleep(1000); - await AddTaskLog(taskId, "稍后后重试." + key + " " + taskId); - if (i + 1 == tryCount) - throw; - } - } - } - else - { - await AddTaskLog(taskId, "任务函数 未实现." + key); - } - } - } } diff --git a/VideoAnalysisCore/Common/UploadWorkflowManager.cs b/VideoAnalysisCore/Common/UploadWorkflowManager.cs new file mode 100644 index 0000000..8979152 --- /dev/null +++ b/VideoAnalysisCore/Common/UploadWorkflowManager.cs @@ -0,0 +1,80 @@ +using FreeRedis; +using Microsoft.Extensions.DependencyInjection; +using SqlSugar.IOC; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using VideoAnalysisCore.AICore.FFMPGE; +using VideoAnalysisCore.Common.Expand; +using VideoAnalysisCore.Model; +using VideoAnalysisCore.Model.Enum; + +namespace VideoAnalysisCore.Common +{ + public static class UploadWorkflowExpand + { + public static void AddUploadWorkflow(this IServiceCollection services) + { + // 只有在配置启用时才注册 + if (AppCommon.Config.Workflow.Upload.Enabled) + { + Console.WriteLine($"{DateTime.Now}=>初始化 视频合并工作流"); + services.AddUploadExpand(); // Register UploadHandle + services.AddSingleton(); + services.AddSingleton(); + } + } + } + + public class UploadWorkflowInit + { + private readonly UploadWorkflowManager _manager; + private readonly IServiceProvider _serviceProvider; + + public UploadWorkflowInit(UploadWorkflowManager manager, IServiceProvider serviceProvider) + { + _manager = manager; + _serviceProvider = serviceProvider; + Init(); + _manager.InitChannel(); + } + + public void Init() + { + var SubscribeList = _manager.SubscribeList; + SubscribeList.Add(RedisUploadChannelEnum.排队中, async (task) => await Task.CompletedTask); + SubscribeList.Add(RedisUploadChannelEnum.下载文件, async (task) => + { + using var scope = _serviceProvider.CreateScope(); + var downloadService = scope.ServiceProvider.GetRequiredService(); + await downloadService.RunTask(task); + }); + SubscribeList.Add(RedisUploadChannelEnum.合并切片, async (task) => + { + using var scope = _serviceProvider.CreateScope(); + var ffmpegService = scope.ServiceProvider.GetRequiredService(); + await ffmpegService.MergeAndSliceAsync(task); + }); + SubscribeList.Add(RedisUploadChannelEnum.上传视频, async (task) => + { + using var scope = _serviceProvider.CreateScope(); + var uploadService = scope.ServiceProvider.GetRequiredService(); + await uploadService.RunAsync(task); + }); + SubscribeList.Add(RedisUploadChannelEnum.结束任务, _manager.RedisManager.TaskEnd); + } + } + + public class UploadWorkflowManager : WorkflowBase + { + public UploadWorkflowManager(RedisClient redis, RedisManager redisManager) : base(redis, redisManager) + { + } + + protected override string ChannelKey => RedisExpandKey.UploadChannelKey; + protected override int Concurrency => AppCommon.Config.Workflow.Upload.Concurrency; + } +} diff --git a/VideoAnalysisCore/Common/VideoSliceWorkflowManager.cs b/VideoAnalysisCore/Common/VideoSliceWorkflowManager.cs new file mode 100644 index 0000000..a1dd228 --- /dev/null +++ b/VideoAnalysisCore/Common/VideoSliceWorkflowManager.cs @@ -0,0 +1,114 @@ +using FreeRedis; +using Microsoft.Extensions.DependencyInjection; +using SqlSugar.IOC; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using VideoAnalysisCore.AICore.FFMPGE; +using VideoAnalysisCore.AICore.GPT; +using VideoAnalysisCore.AICore.SherpaOnnx; +using VideoAnalysisCore.AICore.Whisper; +using VideoAnalysisCore.Common.Expand; +using VideoAnalysisCore.Model; +using VideoAnalysisCore.Model.Enum; + +namespace VideoAnalysisCore.Common +{ + /// + /// AI视频切片工作流 + /// + public static class VideoSliceWorkflowExpand + { + public static void AddVideoSliceWorkflow(this IServiceCollection services) + { + if (AppCommon.Config.Workflow.Default.Enabled) + { + Console.WriteLine($"{DateTime.Now}=>初始化 AI切片工作流"); + services.AddSingleton(); + services.AddSingleton(); + } + } + } + + public class VideoSliceWorkflowInit + { + private readonly VideoSliceWorkflowManager _manager; + private readonly IServiceProvider _serviceProvider; + private readonly FFMPGEHandle _ffmpeg; + private readonly SenseVoice _senseVoice; + private readonly RedisManager _redisManager; + + public VideoSliceWorkflowInit(VideoSliceWorkflowManager manager, IServiceProvider serviceProvider, FFMPGEHandle ffmpeg, SenseVoice senseVoice, RedisManager redisManager) + { + _manager = manager; + _serviceProvider = serviceProvider; + _ffmpeg = ffmpeg; + _senseVoice = senseVoice; + _redisManager = redisManager; + Init(); + _manager.InitChannel(); + } + + public void Init() + { + var SubscribeList = _manager.SubscribeList; + SubscribeList.Add(RedisChannelEnum.排队中, async (task) => await Task.CompletedTask); + SubscribeList.Add(RedisChannelEnum.下载文件, async (task) => + { + using var scope = _serviceProvider.CreateScope(); + var downloadService = scope.ServiceProvider.GetService(); + if (downloadService is null) throw new Exception("DownloadFile 未注入"); + await downloadService.RunTask(task); + }); + SubscribeList.Add(RedisChannelEnum.分离音频, _ffmpeg.RunAsync); + SubscribeList.Add(RedisChannelEnum.解析字幕, _senseVoice.RunTask); + + SubscribeList.Add(RedisChannelEnum.AI课程类型, async (task) => + { + using var scope = _serviceProvider.CreateScope(); + var service = scope.ServiceProvider.GetService(); + if (service is null) throw new Exception("IBserGPT 未注入"); + await service.GetVideoType(task); + }); + SubscribeList.Add(RedisChannelEnum.AI模型分析, async (task) => + { + using var scope = _serviceProvider.CreateScope(); + var service = scope.ServiceProvider.GetService(); + if (service is null) throw new Exception("IBserGPT 未注入"); + await service.GetKnow(task); + }); + SubscribeList.Add(RedisChannelEnum.AI分析试题, async (task) => + { + using var scope = _serviceProvider.CreateScope(); + var service = scope.ServiceProvider.GetService(); + if (service is null) throw new Exception("IBserGPT 未注入"); + await service.GetVideoQuestion(task); + }); + SubscribeList.Add(RedisChannelEnum.结束任务, _redisManager.TaskEnd); + } + } + + public class VideoSliceWorkflowManager : WorkflowBase + { + public VideoSliceWorkflowManager(RedisClient redis, RedisManager redisManager) : base(redis, redisManager) + { + } + + protected override string ChannelKey => RedisExpandKey.ChannelKey; + protected override int Concurrency => AppCommon.Config.Workflow.Default.Concurrency; + + protected override async Task HandleSpecialFlowAsync(RedisChannelEnum currentStep, RedisChannelEnum nextStep, string taskId) + { + // 4. 特殊分流:解析字幕完成后,后续步骤转后台并行处理 + if (currentStep == RedisChannelEnum.解析字幕) + { + await DispatchBackgroundFlow(nextStep, taskId, taskId); + throw new WorkflowFlowSwitchException(); // 抛出异常以中断当前流程(基类捕获) + } + await Task.CompletedTask; + } + } +} diff --git a/VideoAnalysisCore/Common/WorkflowBase.cs b/VideoAnalysisCore/Common/WorkflowBase.cs new file mode 100644 index 0000000..707c774 --- /dev/null +++ b/VideoAnalysisCore/Common/WorkflowBase.cs @@ -0,0 +1,298 @@ +using FreeRedis; +using Microsoft.Extensions.DependencyInjection; +using SqlSugar.IOC; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using VideoAnalysisCore.Model; +using VideoAnalysisCore.Model.Enum; + +namespace VideoAnalysisCore.Common +{ + public class WorkflowFlowSwitchException : Exception { } + + public abstract class WorkflowBase where TEnum : struct, Enum + { + public bool StopTask { get; set; } = false; + public Dictionary> SubscribeList = new Dictionary>(); + public readonly RedisClient Redis; + public readonly RedisManager RedisManager; + private CancellationTokenSource? _cts; + private List _workerTasks = new List(); + public static ConcurrentDictionary RunningTasks = new ConcurrentDictionary(); + + /// + /// 工作流的Redis队列key + /// + protected abstract string ChannelKey { get; } + /// + /// 单工作流并发数量 + /// + protected abstract int Concurrency { get; } + + public WorkflowBase(RedisClient redis, RedisManager redisManager) + { + Redis = redis; + RedisManager = redisManager; + } + + public async void InitChannel() + { + if (AppCommon.Config.TaskSetting.IS_Server) return; + //处理之前程序结束前未能执行完的情况 + var oldTaskCount = Redis.LLen(RedisExpandKey.IDTask); + //重试任务并发过多可能会导致程序崩溃 + // 未能重新分析的中断任务 则单独开一个网页来处理 + if (oldTaskCount > 0) + { + //获取所有未完成的任务 + var oldTaskArr = Redis.LRange(RedisExpandKey.IDTask, 0, -1); + Console.WriteLine($"{DateTime.Now:HH:mm:ss}-------------> 发现 {oldTaskArr.Length} 个未完成任务,准备恢复..."); + + //使用信号量限制并发数(5),防止崩溃 + using var semaphore = new System.Threading.SemaphoreSlim(5); + var retryTaskArr = new List(); + + foreach (var oldTask in oldTaskArr) + { + try + { + // 检查该任务是否属于当前工作流(根据 LastEnum 的类型) + var lastEnumStr = (await Redis.HMGetAsync(RedisExpandKey.Task(oldTask), "LastEnum")).FirstOrDefault(); + + // 尝试解析为当前工作流的枚举 + if (!string.IsNullOrEmpty(lastEnumStr) && Enum.TryParse(typeof(TEnum), lastEnumStr, true, out var result)) + { + await semaphore.WaitAsync(); + var res = Task.Run(async () => + { + try + { + await RedisManager.AddTaskLog(oldTask, DateTime.Now.ToString("HH:mm:ss") + $"-------------> 接收上次未完成任务 [{typeof(TEnum).Name}] " + oldTask); + await RedisManager.ClearTaskError(long.Parse(oldTask)); + + var lastEnum = (TEnum)result; + await InsertChannel(lastEnum, oldTask); + } + catch (Exception ex) + { + await RedisManager.SetTaskErrorMessage(long.Parse(oldTask), ex); + Console.WriteLine($"恢复任务 {oldTask} 失败: {ex.Message}"); + } + finally + { + semaphore.Release(); + } + }); + retryTaskArr.Add(res); + } + else + { + // 如果无法解析为当前工作流的枚举,说明该任务属于其他工作流,跳过 + // Console.WriteLine($"任务 {oldTask} 不属于工作流 {typeof(TEnum).Name},跳过"); + } + } + catch (Exception ex) + { + Console.WriteLine($"检查任务 {oldTask} 状态失败: {ex.Message}"); + } + } + + //等待所有 重试任务完成后接收新任务 + await Task.WhenAll(retryTaskArr); + + Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 所有未完成任务处理完毕!"); + ReceivingTaskAsync(); + + } + else + { + Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收新任务!"); + ReceivingTaskAsync(); + } + } + /// + /// 重新执行任务 + /// + public void ReceivingTaskAsync() + { + int concurrency = Concurrency; + if (concurrency <= 0) concurrency = 1; + + _cts = new CancellationTokenSource(); + var token = _cts.Token; + + for (int i = 0; i < concurrency; i++) + { + var index = i; + var task = Task.Run(async () => + { + Console.WriteLine($"{DateTime.Now} ==> 开始监听 [{typeof(TEnum).Name}] 队列 [{index}]..."); + while (!token.IsCancellationRequested && !StopTask) + { + try + { + var taskId = Redis.BLPop(ChannelKey, 5); + if (!string.IsNullOrEmpty(taskId)) + { + Redis.LPush(RedisExpandKey.IDTask, taskId); + await RedisManager.AddTaskLog(taskId, $"==> 接收到任务 [{typeof(TEnum).Name}] "); + + // 获取第一个枚举值作为起始步骤 + var firstStep = Enum.GetValues(typeof(TEnum)).Cast().FirstOrDefault(); + if (Convert.ToInt32(firstStep) == 0) // 跳过排队中 + { + var next = firstStep.NextEnum(); + if (next.HasValue) firstStep = next.Value; + } + + await InsertChannel(firstStep, taskId); + } + } + catch (Exception ex) + { + Console.WriteLine($"任务监听异常: {ex.Message}"); + await Task.Delay(2000); + } + } + }, token); + _workerTasks.Add(task); + } + } + + public async Task InsertChannel(TEnum @enum, string taskId) + { + await RedisManager.AddTaskLog(taskId, "==> 开始执行任务 "); + await ProcessTaskFlow(@enum, taskId, taskId); + } + /// + /// 异步流程判定条件 + /// + /// + /// + /// + /// + protected virtual async Task HandleSpecialFlowAsync(TEnum currentStep, TEnum nextStep, string taskId) + { + await Task.CompletedTask; + } + + /// + /// 主处理任务流程 + /// + /// + /// + /// + /// + protected async Task ProcessTaskFlow(TEnum currentStep, string taskId, string tId) + { + try + { + if (!SubscribeList.ContainsKey(currentStep)) + throw new Exception($"{currentStep} 未实现"); + + while (true) + { + if (StopTask) + { + await RedisManager.AddTaskLog(tId, "==> 手动停止任务 "); + return; + } + + // 1. 记录步骤开始时间 (需要转换 RedisChannelEnum 才能调用 UpdateStepTimeAsync,如果类型不匹配则需要适配) + // 这里简化,暂不记录非主流程的时间,或者需要在 RedisManager 增加泛型支持 + // await RedisManager.UpdateStepTimeAsync(taskId, currentStep); + + // 2. 执行当前步骤 + await TouchChannel(currentStep, tId, SubscribeList[currentStep]); + + // 3. 准备下一步 + var nextStepNullable = currentStep.NextEnum(); + if (nextStepNullable == null) break; + + var nextStep = nextStepNullable.Value; + + // 4. 特殊分流处理 + try + { + await HandleSpecialFlowAsync(currentStep, nextStep, taskId); + } + catch (WorkflowFlowSwitchException) + { + return; // 流程切换,退出当前循环 + } + + currentStep = nextStep; + } + } + catch (Exception ex) + { + await RedisManager.SetTaskErrorMessage(long.Parse(tId), ex); + } + finally + { + await Redis.ExpireAsync(RedisExpandKey.Task(taskId), 60 * 60 * 24 * 15); + } + } + + public async Task TouchChannel(TEnum key, string taskId, Func action) + { + var tID = long.Parse(taskId); + await RedisManager.AddTaskLog(taskId, " 开始执行 " + key + " " + taskId); + try + { + // 尝试将当前枚举转为 RedisChannelEnum 存储状态,如果无法转换则强转 int + RedisChannelEnum dbEnum; + if (key is RedisChannelEnum rc) dbEnum = rc; + else + { + // 使用 Convert.ToInt32 处理各种枚举基础类型 (int, byte, long 等) + var intValue = Convert.ToInt32(key); + dbEnum = (RedisChannelEnum)intValue; + } + + Redis.HMSet(RedisExpandKey.Task(taskId), "LastEnum", key.ToString()); + Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", 0); + + var vDB = AppCommon.Services.GetService>(); + await vDB.CopyNew().AsUpdateable() + .SetColumns(it => it.LastEnum == dbEnum) + .Where(it => it.Id == tID) + .ExecuteCommandAsync(); + + await action(taskId); + } + catch (Exception ex) + { + await RedisManager.AddTaskLog(taskId, $""" 出现异常 {ex.Message} {ex.StackTrace} """); + throw; + } + } + /// + /// 异步流程 + /// + /// + /// + /// + /// + protected async Task DispatchBackgroundFlow(TEnum startStep, string taskId, string tId) + { + var bgTask = Task.Run(async () => + { + try + { + await ProcessTaskFlow(startStep, taskId, tId); + } + finally + { + RunningTasks.TryRemove(tId, out _); + } + }); + RunningTasks.TryAdd(tId, bgTask); + await Task.CompletedTask; + } + } +} diff --git a/VideoAnalysisCore/Controllers/VideoTaskController.cs b/VideoAnalysisCore/Controllers/VideoTaskController.cs index 8c04b7f..3f9bb35 100644 --- a/VideoAnalysisCore/Controllers/VideoTaskController.cs +++ b/VideoAnalysisCore/Controllers/VideoTaskController.cs @@ -39,13 +39,16 @@ namespace VideoAnalysisCore.Controllers readonly RedisManager redisManager; + readonly UploadWorkflowManager uploadWorkflowManager; + readonly VideoSliceWorkflowManager videoSliceWorkflowManager; + public readonly SenseVoice senseVoice; public readonly FunASRNano funASRNano; private readonly IMapper mp; public VideoTaskController(Repository baseService, RedisManager redisManager, Repository videoQuestionDB, - Repository videoQuestionKonwDB, Repository videoKonwPointDB, SenseVoice senseVoice, IMapper mp, Repository taskLogDB, FunASRNano funASRNano, Repository videoTaskStageDB) : base(baseService) + Repository videoQuestionKonwDB, Repository videoKonwPointDB, SenseVoice senseVoice, IMapper mp, Repository taskLogDB, FunASRNano funASRNano, Repository videoTaskStageDB, UploadWorkflowManager uploadWorkflowManager, VideoSliceWorkflowManager videoSliceWorkflowManager) : base(baseService) { this.baseService = baseService; this.redisManager = redisManager; @@ -57,6 +60,8 @@ namespace VideoAnalysisCore.Controllers this.taskLogDB = taskLogDB; this.funASRNano = funASRNano; this.videoTaskStageDB = videoTaskStageDB; + this.uploadWorkflowManager = uploadWorkflowManager; + this.videoSliceWorkflowManager = videoSliceWorkflowManager; } @@ -118,9 +123,9 @@ namespace VideoAnalysisCore.Controllers public IActionResult StartTask(bool task) { if (task) - redisManager.RestartTask(); + videoSliceWorkflowManager.StopTask=false; else - redisManager.StopTaskAsync(); + videoSliceWorkflowManager.StopTask=true; return Ok(); } /// @@ -316,7 +321,7 @@ namespace VideoAnalysisCore.Controllers await redisManager.AddTaskLog(id,"手动重试任务"); await redisManager.ClearTaskError(id); _ = Task.Run(async () => - await redisManager.InsertChannel(selectEnum, id) + await videoSliceWorkflowManager.InsertChannel(selectEnum, id.ToString()) ); } diff --git a/VideoAnalysisCore/Model/Enum/RedisUploadChannelEnum.cs b/VideoAnalysisCore/Model/Enum/RedisUploadChannelEnum.cs new file mode 100644 index 0000000..adced84 --- /dev/null +++ b/VideoAnalysisCore/Model/Enum/RedisUploadChannelEnum.cs @@ -0,0 +1,35 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace VideoAnalysisCore.Model.Enum +{ + /// + /// 上传工作流 Redis 频道枚举 + /// + public enum RedisUploadChannelEnum + { + /// + /// 排队中 + /// + 排队中 = 0, + /// + /// 下载文件 + /// + 下载文件 = 10, + /// + /// 合并切片 + /// + 合并切片 = 20, + /// + /// 上传视频 + /// + 上传视频 = 30, + /// + /// 结束任务 + /// + 结束任务 = 100, + } +}