using AntDesign;
using FreeRedis;
using FreeRedis.Internal;
using Microsoft.Extensions.DependencyInjection;
using SqlSugar.IOC;
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Xml.Linq;
using VideoAnalysisCore.AICore.ChatGPT;
using VideoAnalysisCore.AICore.ChatGPT.Dto;
using VideoAnalysisCore.AICore.FFMPGE;
//using VideoAnalysisCore.AICore.FFMPGE;
using VideoAnalysisCore.AICore.SherpaOnnx;
using VideoAnalysisCore.AICore.Whisper;
using VideoAnalysisCore.Enum;
using VideoAnalysisCore.Model;
using VideoAnalysisCore.Model.Dto;

namespace VideoAnalysisCore.Common
{
    /// <summary>
    /// Redis key names used by the video-analysis task pipeline.
    /// </summary>
    public static class RedisExpandKey
    {
        /// <summary>
        /// Prefix shared by every key.
        /// </summary>
        public const string BaseKey = "VideoAnalysis:";

        /// <summary>
        /// Base key of the task channel; also the list that <see cref="RedisExpand.ReceivingTaskAsync"/> subscribes to.
        /// </summary>
        public const string ChannelKey = BaseKey + "TaskChannel";

        /// <summary>
        /// Download-file stage key (note: appended to <see cref="ChannelKey"/> without a separator).
        /// </summary>
        public const string DownloadFile = ChannelKey + "DownloadFile";

        /// <summary>
        /// Separate-audio stage key.
        /// </summary>
        public const string SeparateAudio = ChannelKey + "SeparateAudio";

        /// <summary>
        /// Parse-captions stage key.
        /// </summary>
        public const string ParsingCaptions = ChannelKey + "ParsingCaptions";

        /// <summary>
        /// Parse-speaker stage key.
        /// </summary>
        public const string ParsingSpeaker = ChannelKey + "ParsingSpeaker";

        /// <summary>
        /// Chat-model-analysis stage key.
        /// </summary>
        public const string ChatModelAnalysis = ChannelKey + "ChatModelAnalysis";

        /// <summary>
        /// Task array key.
        /// </summary>
        public const string TaskArr = BaseKey + "TaskArr";

        /// <summary>
        /// Key of the hash that holds the state of one task.
        /// </summary>
        /// <param name="taskId">Task id; concatenated into the key via string concatenation.</param>
        public static string Task(object taskId) => BaseKey + "Task:" + taskId;

        /// <summary>
        /// Key built from <c>AppCommon.Config.ID</c>; stores the id of the task this instance is currently working on.
        /// </summary>
        public static string IDTask => BaseKey + AppCommon.Config.ID;

        /// <summary>
        /// Key of the cached GPT data for one task.
        /// </summary>
        /// <param name="taskId">Task id.</param>
        public static string TaskGPT(object taskId) => Task(taskId) + ":GPTCached";
    }

    /// <summary>
    /// Redis helpers: shared client, per-task progress/error bookkeeping and the
    /// stage dispatch table that drives a task through the pipeline.
    /// </summary>
    public class RedisExpand
    {
        /// <summary>
        /// Shared Redis connection, created from <c>AppCommon.Config.Redis.ConnectionString</c>.
        /// </summary>
        public static RedisClient Redis = new RedisClient(AppCommon.Config.Redis.ConnectionString);

        /// <summary>
        /// Dispatch table: pipeline stage -> handler invoked with the task id.
        /// Filled by <see cref="InitChannel"/>.
        /// </summary>
        public static Dictionary<RedisChannelEnum, Action<string>> SubscribeList = new Dictionary<RedisChannelEnum, Action<string>>();

        /// <summary>
        /// Current subscription on the task list, if any.
        /// </summary>
        static SubscribeListObject? Subscribe;

        /// <summary>
        /// Initializes Redis (JSON serializer hooks) and schedules <see cref="InitChannel"/>.
        /// Must be called while the configuration is being initialized.
        /// </summary>
        public static void Init()
        {
            Console.WriteLine("初始化 redis");
            Redis.Serialize = obj => System.Text.Json.JsonSerializer.Serialize(obj);
            Redis.Deserialize = (json, type) => System.Text.Json.JsonSerializer.Deserialize(json, type);
            Task.Run(async () =>
            {
                // The delay must be awaited; a bare Task.Delay(...) call returns
                // immediately and the channel would be initialized with no delay at all.
                await Task.Delay(1000 * 10);
                InitChannel();
            });
        }

        /// <summary>
        /// Caches GPT data for a task under <see cref="RedisExpandKey.TaskGPT"/>.
        /// </summary>
        /// <param name="taskId">Task id.</param>
        /// <param name="data">Value to cache.</param>
        public static void SetTaskGPTCached(object taskId, object? data)
        {
            // 3600 is presumably the expiry in seconds — confirm against the FreeRedis Set overload.
            Redis.Set(RedisExpandKey.TaskGPT(taskId), data, 3600);
        }

        /// <summary>
        /// Reads the "Progress" field of the task hash.
        /// </summary>
        /// <param name="taskId">Task id.</param>
        /// <returns>The stored progress value.</returns>
        public static float GetTaskProgress(object taskId)
        {
            return Redis.HMGet<float>(RedisExpandKey.Task(taskId), "Progress")[0];
        }

        /// <summary>
        /// Writes the "Progress" field of the task hash, rounded to two decimals.
        /// </summary>
        /// <param name="taskId">Task id.</param>
        /// <param name="p">Progress percentage.</param>
        public static void SetTaskProgress(object taskId, double p)
        {
            Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", Math.Round(p, 2));
        }

        /// <summary>
        /// Records the start time of a stage for the task and invokes that stage's handler.
        /// </summary>
        /// <param name="enum">Pipeline stage.</param>
        /// <param name="taskId">Task id.</param>
        /// <exception cref="Exception">
        /// <paramref name="taskId"/> is null, Redis is not initialized, or no handler is registered for the stage.
        /// </exception>
        public static void InsertChannel(RedisChannelEnum @enum, object taskId)
        {
            if (taskId is null)
                throw new Exception("taskId为空");
            if (Redis is null)
                throw new Exception("redis未初始化");

            var taskKey = RedisExpandKey.Task(taskId);
            var startTime = Redis.HMGet<Dictionary<RedisChannelEnum, DateTime>>(taskKey, "StartTime").FirstOrDefault()
                            ?? new Dictionary<RedisChannelEnum, DateTime>();
            // Indexer assignment adds or overwrites in a single lookup.
            startTime[@enum] = DateTime.Now;
            Redis.HMSet(taskKey, "StartTime", startTime);

            if (!SubscribeList.TryGetValue(@enum, out var handler))
                throw new Exception(@enum + " 未实现");
            handler.Invoke(taskId.ToString());
        }

        /// <summary>
        /// Final stage: copies the GPT result from Redis into the task row, clears this
        /// instance's current-task marker and starts listening for the next task.
        /// </summary>
        /// <param name="task">Task id as a string; must parse as a <see cref="long"/>.</param>
        /// <exception cref="Exception">The GPT result or the task row could not be read.</exception>
        public static async Task TaskEnd(string task)
        {
            var tId = long.Parse(task);
            // NOTE(review): the generic type argument of HMGetAsync is missing in the reviewed text
            // (the ChatAnalysis DTO type); restore it from the original file.
            var gptRes = (await RedisExpand.Redis
                .HMGetAsync(RedisExpandKey.Task(task), "ChatAnalysis")).FirstOrDefault();
            if (gptRes is null)
                throw new Exception("未能读取到GPT处理结果");

            // NOTE(review): the generic type argument of Queryable is missing in the reviewed text
            // (the task entity type); restore it from the original file.
            var taskData = await DbScoped.SugarScope.Queryable()
                .FirstAsync(s => s.Id == tId);
            // Fail with a clear message instead of a NullReferenceException when the row is absent.
            if (taskData is null)
                throw new Exception("未能读取到任务数据 " + tId);

            taskData.ChatAnalysis = gptRes;
            taskData.ChatAnalysisScore = gptRes.Assessment.Merit?.Sum(s => s.Score) ?? 0;
            taskData.ErrorMessage = string.Empty;
            taskData.LastEnum = RedisChannelEnum.EndTask;
            // TotalTokens is part of the column list although this method does not assign it.
            await DbScoped.SugarScope.Updateable(taskData)
                .UpdateColumns(it => new
                {
                    it.ChatAnalysis,
                    it.ChatAnalysisScore,
                    it.ErrorMessage,
                    it.TotalTokens,
                    it.LastEnum,
                }).ExecuteCommandAsync();

            await Redis.DelAsync(RedisExpandKey.IDTask);
            await ReceivingTaskAsync();
        }

        /// <summary>
        /// Registers the handler of every pipeline stage and starts receiving tasks.
        /// </summary>
        /// <exception cref="Exception">Redis is not initialized.</exception>
        public static async void InitChannel()
        {
            if (Redis is null)
                throw new Exception("redis未初始化");

            // Indexer assignment keeps registration idempotent; Add would throw on a second call.
            SubscribeList[RedisChannelEnum.DownloadFile] = (msg) =>
            {
                TouchChannel(RedisChannelEnum.DownloadFile, msg, DownloadFile.RunTask);
            };
            SubscribeList[RedisChannelEnum.SeparateAudio] = (msg) =>
            {
                TouchChannel(RedisChannelEnum.SeparateAudio, msg, FFMPGEHandle.Audio2WAV16KAsync);
            };
            SubscribeList[RedisChannelEnum.ParsingCaptions] = (msg) =>
            {
                TouchChannel(RedisChannelEnum.ParsingCaptions, msg, SenseVoice.RunTask);
            };
            SubscribeList[RedisChannelEnum.ParsingSpeaker] = (msg) =>
            {
                TouchChannel(RedisChannelEnum.ParsingSpeaker, msg, Speaker.Run);
            };
            SubscribeList[RedisChannelEnum.ChatModelAnalysis] = (msg) =>
            {
                TouchChannel(RedisChannelEnum.ChatModelAnalysis, msg, async (task) =>
                {
                    // The lambda is async so the DI scope stays alive until CallGPT has finished;
                    // a non-async lambda would dispose the scope as soon as it returned the task.
                    using var scope = AppCommon.Services?.CreateScope();
                    // NOTE(review): the generic type argument is missing in the reviewed text;
                    // IBserGPT is taken from the error message below — confirm.
                    var gpt = scope?.ServiceProvider.GetService<IBserGPT>();
                    if (gpt is null)
                        throw new Exception("IBserGPT 未注入");
                    await gpt.CallGPT(task);
                });
            };
            SubscribeList[RedisChannelEnum.EndTask] = (msg) =>
            {
                TouchChannel(RedisChannelEnum.EndTask, msg, TaskEnd);
            };

            await ReceivingTaskAsync();
        }

        /// <summary>
        /// Resumes the task recorded for this instance if there is one; otherwise subscribes
        /// to the task list and waits for a new task.
        /// </summary>
        public static async Task ReceivingTaskAsync()
        {
            var oldTask = await Redis.GetAsync(RedisExpandKey.IDTask);
            if (!string.IsNullOrEmpty(oldTask))
            {
                // A task is still recorded for this instance: clear its error and re-run its last stage.
                var lastEnum = (await Redis.HMGetAsync<RedisChannelEnum>(RedisExpandKey.Task(oldTask), "LastEnum")).FirstOrDefault();
                await SetTaskErrorMessage(long.Parse(oldTask), null);
                InsertChannel(lastEnum, oldTask);
                return;
            }

            // Already subscribed: avoid stacking subscriptions (and leaking memory) when this
            // method is reached several times through the retry path.
            if (Subscribe?.IsUnsubscribed == false)
                return;

            Subscribe = Redis.SubscribeList(RedisExpandKey.ChannelKey, (taskId) =>
            {
                if (taskId is null)
                    return;
                // Stop listening while this task is being processed.
                Subscribe?.Dispose();
                // Remember which task this instance is working on.
                Redis.Set(RedisExpandKey.IDTask, taskId);
                Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收到任务 " + taskId);
                InsertChannel(RedisChannelEnum.DownloadFile, taskId);
            });
        }

        /// <summary>
        /// Writes the task's error message to Redis and to the database.
        /// When <paramref name="ex"/> is not null the failure is logged, the current-task
        /// marker is removed and task reception is restarted before the message is stored.
        /// </summary>
        /// <param name="taskID">Task id.</param>
        /// <param name="ex">Failure to record, or null to clear the stored message.</param>
        /// <returns>True when exactly one database row was updated.</returns>
        public static async Task<bool> SetTaskErrorMessage(long taskID, Exception? ex)
        {
            var error = string.Empty;
            if (ex != null)
            {
                // An exception occurred while running the task.
                error = ex.Message + ex.StackTrace;
                Console.WriteLine("====================[出现异常]====================");
                Console.WriteLine(ex.Message);
                Console.WriteLine(ex.StackTrace);
                Console.WriteLine("==============================================");
                // Drop the failed task and go back to receiving tasks.
                await Redis.DelAsync(RedisExpandKey.IDTask);
                await ReceivingTaskAsync();
            }

            Redis.HMSet(RedisExpandKey.Task(taskID), "ErrorMessage", error);
            // NOTE(review): the generic type argument of Updateable is missing in the reviewed text
            // (the task entity type); restore it from the original file.
            return await DbScoped.SugarScope.Updateable()
                .SetColumns(it => it.ErrorMessage == error) // SetColumns calls can be chained, one per column
                .Where(it => it.Id == taskID)
                .ExecuteCommandAsync() == 1;
        }

        /// <summary>
        /// Runs one pipeline stage for a task, with up to three attempts. Each attempt records
        /// the stage as "LastEnum" and resets "Progress" to 0. If every attempt fails, the first
        /// exception is stored through <see cref="SetTaskErrorMessage"/>.
        /// </summary>
        /// <param name="key">Pipeline stage being executed.</param>
        /// <param name="taskId">Task id as a string; nothing happens when it is null.</param>
        /// <param name="action">Stage implementation; when null only a message is printed.</param>
        public static async void TouchChannel(RedisChannelEnum key, string taskId, Func<string, Task> action = null)
        {
            if (taskId is null)
                return;
            var tID = long.Parse(taskId);

            if (action is null)
            {
                Console.WriteLine(key + " 任务函数 未实现");
                return;
            }

            const int maxAttempts = 3;
            var errArr = new Exception[maxAttempts];
            for (int i = 0; i < maxAttempts; i++)
            {
                Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-> 开始执行 " + key + " " + taskId);
                try
                {
                    Redis.HMSet(RedisExpandKey.Task(taskId), "LastEnum", key);
                    Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", 0);
                    // NOTE(review): the generic type argument of Updateable is missing in the reviewed text
                    // (the task entity type); restore it from the original file.
                    await DbScoped.SugarScope.Updateable()
                        .SetColumns(it => it.LastEnum == key)
                        .Where(it => it.Id == tID)
                        .ExecuteCommandAsync();
                    await action(taskId);
                    return;
                }
                catch (Exception ex)
                {
                    errArr[i] = ex;
                    Console.WriteLine("====================[出现异常]====================");
                    Console.WriteLine(ex.Message);
                    Console.WriteLine(ex.StackTrace);
                    Console.WriteLine("==============================================");
                    // Back off without blocking a thread-pool thread.
                    await Task.Delay(1000);
                    Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-> 稍后后重试." + key + " " + taskId);
                }
            }

            // Every attempt failed; the first failure is the one that gets recorded.
            await SetTaskErrorMessage(tID, errArr[0]);
        }
    }
}