using FreeRedis; using FreeRedis.Internal; using Microsoft.Extensions.DependencyInjection; using Microsoft.IdentityModel.Tokens; using NetTaste; using Newtonsoft.Json.Schema; using SqlSugar.IOC; using System; using System.Security.Cryptography; using System.Text.Json; using System.Threading.Channels; using System.Threading.Tasks; using System.Collections.Concurrent; using System.Xml.Linq; using UserCenter.Model.Enum; using VideoAnalysisCore.AICore.FFMPGE; using VideoAnalysisCore.AICore.GPT; using VideoAnalysisCore.AICore.GPT.Dto; //using VideoAnalysisCore.AICore.FFMPGE; using VideoAnalysisCore.AICore.SherpaOnnx; using VideoAnalysisCore.AICore.Whisper; using VideoAnalysisCore.Model; using VideoAnalysisCore.Model.Dto; using VideoAnalysisCore.Model.Enum; namespace VideoAnalysisCore.Common { /// /// redis key /// public static class RedisExpandKey { /// /// 基础key /// public const string BaseKey = "VideoAnalysis:"; /// /// 基础Channel key /// public const string ChannelKey = BaseKey + "TaskChannel"; /// /// 下载文件 /// public const string DownloadFile = ChannelKey + "DownloadFile"; /// /// 分离音频 /// public const string SeparateAudio = ChannelKey + "SeparateAudio"; /// /// 解析字幕 /// public const string ParsingCaptions = ChannelKey + "ParsingCaptions"; /// /// 解析说话人 /// public const string ParsingSpeaker = ChannelKey + "ParsingSpeaker"; /// /// Chat模型分析 /// public const string ChatModelAnalysis = ChannelKey + "ChatModelAnalysis"; /// /// 任务数组 /// public const string TaskArr = BaseKey + "TaskArr"; /// /// 任务日志缓存 /// public static string TaskLog => BaseKey + "TaskLog:" + AppCommon.Config.ID; /// /// 任务对象地址 /// public static string Task(object taskId) => BaseKey + "Info:" + taskId; public static string IDTask => BaseKey + "Services:" + AppCommon.Config.ID; public static string TaskGPT(object taskId) => BaseKey + "GPTCached:" + taskId; /// /// 初始化 redis /// 需要在初始化配置文件时候调用 /// public static void AddTaskSubscribe(this IServiceCollection service) { Console.WriteLine($"{DateTime.Now}=>初始化 Redis任务队列"); service.AddSingleton(); } /// /// redis连接拓展(包含消息队列任务) /// /// public static void AddRedisExpand(this IServiceCollection service) { Console.WriteLine($"{DateTime.Now}=>初始化 Redis"); var redis = new RedisClient(AppCommon.Config.Redis.ConnectionString); redis.Serialize = obj => JsonSerializer.Serialize(obj); redis.Deserialize = (json, type) => JsonSerializer.Deserialize(json, type); service.AddSingleton(redis); service.AddSingleton(); } } public class RedisInit { public FFMPGEHandle FFMPGE { get; set; } public SenseVoice senseVoice { get; set; } public FunASRNano funASRNano { get; set; } public RedisManager redisManager { get; set; } public RedisInit(FFMPGEHandle fFMPGE, SenseVoice senseVoice, RedisManager redisManager, FunASRNano funASRNano) { FFMPGE = fFMPGE; this.senseVoice = senseVoice; this.funASRNano = funASRNano; this.redisManager = redisManager; Init(); redisManager.InitChannel(); } public void Init() { var SubscribeList = RedisManager.SubscribeList; SubscribeList.Add(RedisChannelEnum.排队中, async (task) => { await Task.CompletedTask; }); SubscribeList.Add(RedisChannelEnum.下载文件, async (task) => { using var scope = AppCommon.Services?.CreateScope(); if (scope is null || scope.ServiceProvider.GetService() is null) throw new Exception("DownloadFile 未注入"); else await scope.ServiceProvider.GetService()?.RunTask(task); }); SubscribeList.Add(RedisChannelEnum.分离音频, FFMPGE.RunAsync); SubscribeList.Add(RedisChannelEnum.解析字幕, senseVoice.RunTask); //SubscribeList.Add(RedisChannelEnum.解析字幕, funASRNano.RunTask); //SubscribeList.Add(RedisChannelEnum.解析说话人,Speaker.Run); SubscribeList.Add(RedisChannelEnum.AI课程类型, async (task) => { using var scope = AppCommon.Services?.CreateScope(); if (scope is null || scope.ServiceProvider.GetService() is null) throw new Exception("IBserGPT 未注入"); else await scope.ServiceProvider.GetService()?.GetVideoType(task); }); SubscribeList.Add(RedisChannelEnum.AI模型分析, async (task) => { using var scope = AppCommon.Services?.CreateScope(); if (scope is null || scope.ServiceProvider.GetService() is null) throw new Exception("IBserGPT 未注入"); else await scope.ServiceProvider?.GetService()?.GetKnow(task); }); SubscribeList.Add(RedisChannelEnum.AI分析试题, async (task) => { using var scope = AppCommon.Services?.CreateScope(); if (scope is null || scope.ServiceProvider.GetService() is null) throw new Exception("IBserGPT 未注入"); else await scope.ServiceProvider?.GetService()?.GetVideoQuestion(task); }); SubscribeList.Add(RedisChannelEnum.结束任务, redisManager.TaskEnd); } } /// /// redis拓展 /// public class RedisManager { public static bool StopTask { get; set; } = false; public static Dictionary> SubscribeList = new Dictionary>(); /// /// 正在后台运行的任务集合 /// public static ConcurrentDictionary RunningTasks = new ConcurrentDictionary(); private static CancellationTokenSource? _cts; private static Task? _workerTask; public RedisClient Redis { get; set; } public Repository videoTaskDB { get; set; } public Repository taskLogDB { get; set; } public RedisManager(RedisClient redis, Repository videoTaskDB, Repository taskLogDB) { Redis = redis; this.videoTaskDB = videoTaskDB; this.taskLogDB = taskLogDB; } /// /// 缓存GPT任务缓存 /// /// public void SetTaskGPTCached(object taskId, string time, object? data) { Redis.Set(RedisExpandKey.TaskGPT(taskId) + ":" + time, data, timeoutSeconds: 3600 * 24); } /// /// 加入到消费队列 /// /// public void JoinQueue(params long[] taskIds) { //事务 if (taskIds is null || taskIds.Length == 0) return; using (var tran = Redis.Multi()) { foreach (var item in taskIds) tran.LPush(RedisExpandKey.ChannelKey, item); tran.Exec(); } } /// /// 添加日志 /// /// 任务id /// 内容 public async Task AddTaskLog(object taskId, string msg) { #if DEBUG Console.WriteLine($"{DateTime.Now.ToString("MM-dd HH:mm:ss")} => {taskId} \r\n{msg}\r\n"); #endif await Redis.RPushAsync(RedisExpandKey.TaskLog, new TaskLog() { VideoTaskId = long.Parse(taskId.ToString()), CreateTime = DateTime.Now, Message = msg }); var count = 50; lock (RedisExpandKey.TaskLog) { var oldTaskCount = Redis.LLen(RedisExpandKey.TaskLog); if (oldTaskCount > count) { try { var insertData = Redis.LRange(RedisExpandKey.TaskLog, 0, count -1); taskLogDB.CopyNew().AsInsertable(insertData).ExecuteCommand(); //同步删除redis Redis.LTrim(RedisExpandKey.TaskLog, count, 1000); } catch (Exception ex) { Console.WriteLine("写入任务日志出错" + "\r\n" + ex.Message + "\r\n" + ex.StackTrace); } } } } /// /// 获取任务进度 /// /// public float GetTaskProgress(object taskId) { return Redis.HMGet(RedisExpandKey.Task(taskId), "Progress")[0]; } /// /// 设置任务进度 /// /// 进度百分比 /// public void SetTaskProgress(object taskId, object p) { Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", p.ToString()); } /// /// 将任务 插入 队列 /// /// 枚举 /// 任务id public async Task InsertChannel(RedisChannelEnum @enum, object taskId) { if (taskId is null) throw new Exception("taskId为空"); if (Redis is null) throw new Exception("redis未初始化"); var tId = taskId.ToString(); await AddTaskLog(tId, "==> 开始执行任务 "); await ProcessTaskFlow(@enum, taskId, tId); } private async Task ProcessTaskFlow(RedisChannelEnum currentStep, object taskId, string tId) { try { // 确保有初始校验 if (!SubscribeList.ContainsKey(currentStep)) throw new Exception($"{currentStep} 未实现"); while (true) { if (StopTask) { await AddTaskLog(tId, "==> 手动停止任务 "); return; } // 1. 记录步骤开始时间 await UpdateStepTimeAsync(taskId, currentStep); // 2. 执行当前步骤业务逻辑 await TouchChannel(currentStep, tId, SubscribeList[currentStep]); // 3. 准备下一步 var nextStepNullable = currentStep.NextEnum(); if (nextStepNullable == null) break; // 流程结束 var nextStep = nextStepNullable.Value; // 4. 特殊分流:解析字幕完成后,后续步骤转后台并行处理 if (currentStep == RedisChannelEnum.解析字幕) { DispatchBackgroundFlow(nextStep, taskId, tId); return; // 释放当前主控线程 } // 5. 继续循环 currentStep = nextStep; } } catch (Exception ex) { await SetTaskErrorMessage(long.Parse(tId), ex); } finally { // 每次流程结束(无论是正常结束、异常还是分流退出),都尝试延长过期时间 // 注意:如果是分流退出,这里也会执行,保证 key 活跃 await Redis.ExpireAsync(RedisExpandKey.Task(taskId), 60 * 60 * 24 * 15); } } /// /// 更新任务步骤时间 /// private async Task UpdateStepTimeAsync(object taskId, RedisChannelEnum step) { // 获取现有时间字典(如果不存在则新建) // 注意:HMGet 返回的是数组,取第一个元素 var result = await Redis.HMGetAsync>(RedisExpandKey.Task(taskId), "StartTime"); var startTime = result?.FirstOrDefault() ?? new Dictionary(); // 更新时间 startTime[step] = DateTime.Now; // 写回 Redis await Redis.HMSetAsync(RedisExpandKey.Task(taskId), "StartTime", startTime); } /// /// 分发后续任务到动态线程池 /// private void DispatchBackgroundFlow(RedisChannelEnum startStep, object taskId, string tId) { var bgTask = Task.Run(async () => { try { await ProcessTaskFlow(startStep, taskId, tId); } finally { RunningTasks.TryRemove(tId, out _); } }); RunningTasks.TryAdd(tId, bgTask); } public async Task TaskEnd(string task) { var tId = long.Parse(task); //var gptRes = (await Redis // .HMGetAsync(RedisExpandKey.Task(task), "ChatAnalysis")).FirstOrDefault(); //if (gptRes is null) // throw new Exception("未能读取到GPT处理结果"); //删除任务执行状态 await Redis.LRemAsync(RedisExpandKey.IDTask, 1, task); var taskData = await videoTaskDB .CopyNew() .GetFirstAsync(s => s.Id == tId); if (taskData.Captions == "[]") taskData.Captions = (await Redis.HMGetAsync(RedisExpandKey.Task(task), "Captions")).First(); //if (taskData.Speaker == "[]") // taskData.Speaker = (await Redis.HMGetAsync(RedisExpandKey.Task(task), "Speaker"))?.FirstOrDefault()??"[]"; //未使用结果暂时屏蔽 //taskData.ChatAnalysis = JsonSerializer.Serialize(gptRes); taskData.ChatAnalysisScore = 0; taskData.ErrorMessage = string.Empty; taskData.LastEnum = RedisChannelEnum.结束任务; taskData.EndTime = DateTime.Now; await videoTaskDB.CopyNew().AsUpdateable(taskData) .UpdateColumns(it => new { //it.ChatAnalysis, it.Captions, it.Speaker, it.ChatAnalysisScore, it.ErrorMessage, it.TotalTokens, it.LastEnum, it.EndTime }).ExecuteCommandAsync(); try { await ExpandFunction.DeleteTaskFileAsync(tId, this); } catch (Exception) { throw; } //NewTask(); } /// /// 初始化 队列 任务 /// public async Task InitChannel() { if (Redis is null) throw new Exception("redis未初始化"); //处理之前程序结束前未能执行完的情况 var oldTaskCount = Redis.LLen(RedisExpandKey.IDTask); //重试任务并发过多可能会导致程序崩溃 // 未能重新分析的中断任务 则单独开一个网页来处理 if (oldTaskCount > 0) { //获取所有未完成的任务 var oldTaskArr = Redis.LRange(RedisExpandKey.IDTask, 0, -1); Console.WriteLine($"{DateTime.Now:HH:mm:ss}-------------> 发现 {oldTaskArr.Length} 个未完成任务,准备恢复..."); //使用信号量限制并发数(5),防止崩溃 using var semaphore = new System.Threading.SemaphoreSlim(5); var retryTaskArr = new List(); foreach (var oldTask in oldTaskArr) { await semaphore.WaitAsync(); var res = Task.Run(async () => { try { await AddTaskLog(oldTask, DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收上次未完成任务 " + oldTask); await ClearTaskError(long.Parse(oldTask)); var lastEnum = (await Redis.HMGetAsync(RedisExpandKey.Task(oldTask), "LastEnum")).FirstOrDefault(); await InsertChannel(lastEnum, oldTask); } catch (Exception ex) { await SetTaskErrorMessage(long.Parse(oldTask), ex); Console.WriteLine($"恢复任务 {oldTask} 失败: {ex.Message}"); } finally { semaphore.Release(); } }); retryTaskArr.Add(res); } //等待所有 重试任务完成后接收新任务 await Task.WhenAll(retryTaskArr); Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 所有未完成任务处理完毕!"); ReceivingTaskAsync(); } else { Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收新任务!"); ReceivingTaskAsync(); } } /// /// 停止接收新任务 /// public void StopTaskAsync() { StopTask = true; try { _cts?.Cancel(); } catch (Exception) { throw; } } /// /// 开始接收新任务 /// public void RestartTask() { StopTask = false; NewTask(); } /// /// 重新执行新任务 /// /// public void NewTask() { // 取消 消费机的任务订阅 if (StopTask) { Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收任务已经暂停 "); return; } ReceivingTaskAsync(); } /// /// 重新接收新任务 /// public void ReceivingTaskAsync() { if (AppCommon.Config.TaskSetting.IS_Server) { Console.WriteLine($"{DateTime.Now} =>服务端不接收任务"); return; } lock (Redis) { // 如果任务正在运行且未完成,直接返回 if (_workerTask != null && !_workerTask.IsCompleted) return; _cts = new CancellationTokenSource(); var token = _cts.Token; _workerTask = Task.Run(async () => { Console.WriteLine($"{DateTime.Now} => 开始监听任务队列..."); while (!token.IsCancellationRequested && !StopTask) { try { // 使用 BLPop 阻塞式获取任务,超时5秒以便检查取消状态 var taskId = Redis.BLPop(RedisExpandKey.ChannelKey, 5); if (!string.IsNullOrEmpty(taskId)) { Redis.LPush(RedisExpandKey.IDTask, taskId); await AddTaskLog(taskId, "-------------> 接收到任务 "); // await等待任务处理完成,确保串行执行 await InsertChannel(RedisChannelEnum.下载文件, taskId); } } catch (Exception ex) { Console.WriteLine($"任务监听异常: {ex.Message}"); await Task.Delay(2000); } } Console.WriteLine($"{DateTime.Now} => 停止监听任务队列."); }, token); } } /// /// 写入任务异常 /// /// /// /// public async Task SetTaskErrorMessage(long taskID, Exception? ex) { var error = string.Empty; if (ex != null) { await Redis.LRemAsync(RedisExpandKey.IDTask, 1, taskID.ToString()); //执行任务时出现异常 error = ex.Message + ex.StackTrace; await AddTaskLog(taskID, $""" 出现异常 {ex.Message} {ex.StackTrace} """); //清除失败任务 重新接收任务 NewTask(); } return await SetTaskError(taskID, error); } /// /// 清除 任务的错误信息 /// /// /// public async Task ClearTaskError(long taskID) => await SetTaskError(taskID, string.Empty); /// /// 修改任务的错误信息 /// /// /// /// public async Task SetTaskError(long taskID, string? error) { var vDB = AppCommon.Services.GetService>(); Redis.HMSet(RedisExpandKey.Task(taskID), "ErrorMessage", error); return await vDB.CopyNew().AsUpdateable() .SetColumns(it => it.ErrorMessage == error)//SetColumns是可以叠加的 写2个就2个字段赋值 .Where(it => it.Id == taskID) .ExecuteCommandAsync() == 1; } /// /// 触发 /// /// /// /// public async Task TouchChannel(RedisChannelEnum key, string taskId, Func action = null) { if (taskId is null) return; var tID = long.Parse(taskId); if (action is not null) { var tryCount = 1; for (int i = 0; i < tryCount; i++) { await AddTaskLog(taskId, " 开始执行 " + key + " " + taskId); try { Redis.HMSet(RedisExpandKey.Task(taskId), "LastEnum", key); Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", 0); var vDB = AppCommon.Services.GetService>(); await vDB.CopyNew().AsUpdateable() .SetColumns(it => it.LastEnum == key) .Where(it => it.Id == tID) .ExecuteCommandAsync(); await action(taskId); return; } catch (Exception ex) { await AddTaskLog(taskId, $""" 出现异常 {ex.Message} {ex.StackTrace} """); Thread.Sleep(1000); await AddTaskLog(taskId, "稍后后重试." + key + " " + taskId); if (i + 1 == tryCount) throw; } } } else { await AddTaskLog(taskId, "任务函数 未实现." + key); } } } }