using FreeRedis;
using Microsoft.Extensions.DependencyInjection;
using SqlSugar.IOC;
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Xml.Linq;
using VideoAnalysisCore.AICore.ChatGPT;
using VideoAnalysisCore.AICore.FFMPGE;
//using VideoAnalysisCore.AICore.FFMPGE;
using VideoAnalysisCore.AICore.SherpaOnnx;
using VideoAnalysisCore.AICore.Whisper;
using VideoAnalysisCore.Enum;
using VideoAnalysisCore.Model;

namespace VideoAnalysisCore.Common
{
    /// <summary>
    /// Redis key names used by the video-analysis pipeline.
    /// </summary>
    public static class RedisExpandKey
    {
        /// <summary>
        /// Prefix shared by every key in this class.
        /// </summary>
        public const string BaseKey = "VideoAnalysis:";

        /// <summary>
        /// Prefix shared by every channel (queue) key.
        /// </summary>
        public const string ChannelKey = BaseKey + "Channel:";

        /// <summary>
        /// Channel key: download file.
        /// </summary>
        public const string DownloadFile = ChannelKey + "DownloadFile";

        /// <summary>
        /// Channel key: separate audio.
        /// </summary>
        public const string SeparateAudio = ChannelKey + "SeparateAudio";

        /// <summary>
        /// Channel key: parse captions.
        /// </summary>
        public const string ParsingCaptions = ChannelKey + "ParsingCaptions";

        /// <summary>
        /// Channel key: parse speakers.
        /// </summary>
        public const string ParsingSpeaker = ChannelKey + "ParsingSpeaker";

        /// <summary>
        /// Channel key: chat-model analysis.
        /// </summary>
        public const string ChatModelAnalysis = ChannelKey + "ChatModelAnalysis";

        /// <summary>
        /// Key of the task array.
        /// </summary>
        public const string TaskArr = BaseKey + "TaskArr";

        /// <summary>
        /// Builds the channel key for a pipeline stage.
        /// </summary>
        /// <param name="e">Pipeline stage.</param>
        /// <returns><see cref="ChannelKey"/> followed by the enum member's name.</returns>
        public static string EnumKey(RedisChannelEnum e) => ChannelKey + e.ToString();

        /// <summary>
        /// Builds the key of the hash that stores a task's state.
        /// </summary>
        /// <param name="taskId">Task identifier; concatenated via its string form.</param>
        /// <returns><see cref="BaseKey"/> + "Task:" + <paramref name="taskId"/>.</returns>
        public static string Task(object taskId) => BaseKey + "Task:" + taskId;
    }

    /// <summary>
    /// Redis helpers: connection, per-task state stored in hashes, and the
    /// list-backed queues that drive each pipeline stage.
    /// </summary>
    public class RedisExpand
    {
        /// <summary>
        /// Shared Redis connection, created from <c>AppCommon.Config.Redis.ConnectionString</c>
        /// when this type is first touched.
        /// </summary>
        public static RedisClient Redis = new RedisClient(AppCommon.Config.Redis.ConnectionString);

        /// <summary>
        /// Configures JSON (de)serialization on the client and subscribes every stage queue.
        /// Must be called after the configuration file has been loaded.
        /// </summary>
        public static void Init()
        {
            Console.WriteLine("初始化 redis");
            Redis.Serialize = obj => System.Text.Json.JsonSerializer.Serialize(obj);
            Redis.Deserialize = (json, type) => System.Text.Json.JsonSerializer.Deserialize(json, type);
            InitChannel();
        }

        /// <summary>
        /// Reads the "Progress" field of the task hash.
        /// </summary>
        /// <param name="taskId">Task identifier.</param>
        /// <returns>The stored progress value.</returns>
        public static double GetTaskProgress(object taskId)
        {
            return Redis.HMGet<double>(RedisExpandKey.Task(taskId), "Progress")[0];
        }

        /// <summary>
        /// Reads the task progress. The name is historical (this overload is a getter);
        /// it is kept so existing callers compile. Prefer <see cref="GetTaskProgress"/>.
        /// </summary>
        /// <param name="taskId">Task identifier.</param>
        /// <returns>The stored progress value.</returns>
        public static double SetTaskProgress(object taskId) => GetTaskProgress(taskId);

        /// <summary>
        /// Writes the "Progress" field of the task hash, rounded to two decimals.
        /// </summary>
        /// <param name="taskId">Task identifier.</param>
        /// <param name="p">Progress percentage.</param>
        public static void SetTaskProgress(object taskId, double p)
        {
            Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", Math.Round(p, 2));
        }

        /// <summary>
        /// Records the stage start time on the task hash and pushes the task id
        /// onto the stage's list.
        /// </summary>
        /// <param name="enum">Pipeline stage to enqueue for.</param>
        /// <param name="taskId">Task identifier.</param>
        /// <exception cref="InvalidOperationException"><see cref="Redis"/> is null.</exception>
        public static void InsertChannel(RedisChannelEnum @enum, object taskId)
        {
            if (Redis is null)
            {
                throw new InvalidOperationException("redis未初始化");
            }

            var taskKey = RedisExpandKey.Task(taskId);

            // "StartTime" holds one timestamp per stage; start from an empty map when absent.
            var startTime = Redis.HMGet<Dictionary<RedisChannelEnum, DateTime>>(taskKey, "StartTime").FirstOrDefault()
                            ?? new Dictionary<RedisChannelEnum, DateTime>();

            // The indexer adds or overwrites, so no ContainsKey pre-check is needed.
            startTime[@enum] = DateTime.Now;

            Redis.HMSet(taskKey, "StartTime", startTime);
            Redis.LPush(RedisExpandKey.EnumKey(@enum), taskId);
        }

        /// <summary>
        /// Subscribes one handler per pipeline stage list. Each message is forwarded to
        /// <see cref="TouchChannel"/> together with the stage's worker function.
        /// </summary>
        /// <exception cref="InvalidOperationException"><see cref="Redis"/> is null.</exception>
        public static void InitChannel()
        {
            if (Redis is null)
            {
                throw new InvalidOperationException("redis未初始化");
            }

            Redis.SubscribeList(
                RedisExpandKey.EnumKey(RedisChannelEnum.DownloadFile),
                msg => TouchChannel(RedisChannelEnum.DownloadFile, msg, DownloadFile.RunTask));

            Redis.SubscribeList(
                RedisExpandKey.EnumKey(RedisChannelEnum.SeparateAudio),
                msg => TouchChannel(RedisChannelEnum.SeparateAudio, msg, FFMPGEHandle.Audio2WAV16KAsync));

            Redis.SubscribeList(
                RedisExpandKey.EnumKey(RedisChannelEnum.ParsingCaptions),
                msg => TouchChannel(RedisChannelEnum.ParsingCaptions, msg, SenseVoice.RunTask));

            Redis.SubscribeList(
                RedisExpandKey.EnumKey(RedisChannelEnum.ParsingSpeaker),
                msg => TouchChannel(RedisChannelEnum.ParsingSpeaker, msg, Speaker.Run));

            Redis.SubscribeList(
                RedisExpandKey.EnumKey(RedisChannelEnum.ChatModelAnalysis),
                msg => TouchChannel(RedisChannelEnum.ChatModelAnalysis, msg, RunChatModelAnalysisAsync));

            // No worker is wired for this stage yet; pass the stage's own enum so the
            // "not implemented" log line names CallBackSystem rather than ParsingSpeaker.
            Redis.SubscribeList(
                RedisExpandKey.EnumKey(RedisChannelEnum.CallBackSystem),
                msg => TouchChannel(RedisChannelEnum.CallBackSystem, msg));
        }

        /// <summary>
        /// Resolves the GPT service from a fresh DI scope and runs it for one task.
        /// The call is awaited inside the method so the scope is disposed only after
        /// the work has finished.
        /// </summary>
        /// <param name="taskId">Task identifier as received from the queue.</param>
        /// <exception cref="InvalidOperationException">The service could not be resolved.</exception>
        private static async Task RunChatModelAnalysisAsync(string taskId)
        {
            using var scope = AppCommon.Services?.CreateScope();

            // NOTE(review): the generic type argument of GetService was not visible in the
            // reviewed source (presumably the GPT service interface named in the message
            // below) - restore it here.
            var gpt = scope?.ServiceProvider.GetService();
            if (gpt is null)
            {
                throw new InvalidOperationException("IBserGPT 未注入");
            }

            await gpt.CallGPT(taskId);
        }

        /// <summary>
        /// Stores an error message on the task's database row.
        /// </summary>
        /// <param name="taskID">Task identifier (row id).</param>
        /// <param name="errorMessage">Text to store in the ErrorMessage column.</param>
        /// <returns><c>true</c> when exactly one row was updated.</returns>
        public static async Task<bool> SetTaskErrorMessage(long taskID, string errorMessage)
        {
            // NOTE(review): the entity type argument of Updateable was not visible in the
            // reviewed source - restore it here.
            // SetColumns calls can be chained; each call assigns one more column.
            var affected = await DbScoped.SugarScope.Updateable()
                .SetColumns(it => it.ErrorMessage == errorMessage)
                .Where(it => it.Id == taskID)
                .ExecuteCommandAsync();
            return affected == 1;
        }

        /// <summary>
        /// Runs one pipeline stage for a task: records the stage on the task hash and the
        /// database row, resets progress to 0, then awaits <paramref name="action"/>.
        /// Failures are logged and written to the task's error message.
        /// </summary>
        /// <remarks>
        /// The method is <c>async void</c> because it is invoked fire-and-forget from the
        /// queue callbacks; for that reason nothing is allowed to escape it - an exception
        /// leaving an <c>async void</c> method cannot be observed by the caller.
        /// </remarks>
        /// <param name="key">Pipeline stage being executed.</param>
        /// <param name="taskId">Task identifier as received from the queue; ignored when null.</param>
        /// <param name="action">Stage worker; when null only a "not implemented" line is logged.</param>
        public static async void TouchChannel(RedisChannelEnum key, string taskId, Func<string, Task> action = null)
        {
            if (taskId is null)
            {
                return;
            }

            Console.WriteLine($"{DateTime.Now:HH:mm:ss}-> {key} {taskId}");

            if (action is null)
            {
                Console.WriteLine(key + " 任务函数 未实现");
                return;
            }

            // Parse once, up front. A malformed id can neither be processed nor have an
            // error row written for it, so log and stop instead of throwing.
            if (!long.TryParse(
                    taskId,
                    System.Globalization.NumberStyles.Integer,
                    System.Globalization.CultureInfo.InvariantCulture,
                    out var tID))
            {
                Console.WriteLine($"{key} invalid task id: {taskId}");
                return;
            }

            try
            {
                var taskKey = RedisExpandKey.Task(taskId);
                Redis.HMSet(taskKey, "LastEnum", key);
                Redis.HMSet(taskKey, "Progress", 0);

                // NOTE(review): the entity type argument of Updateable was not visible in
                // the reviewed source - restore it here.
                await DbScoped.SugarScope.Updateable()
                    .SetColumns(it => it.LastEnum == key)
                    .Where(it => it.Id == tID)
                    .ExecuteCommandAsync();

                await action(taskId);
            }
            catch (Exception ex)
            {
                // Log first so the failure is visible even if persisting it fails too.
                Console.WriteLine("====================[出现异常]====================");
                Console.WriteLine(ex.Message);
                Console.WriteLine(ex.StackTrace);
                Console.WriteLine("==============================================");

                try
                {
                    await SetTaskErrorMessage(tID, ex.Message + ex.StackTrace);
                }
                catch (Exception persistEx)
                {
                    // Swallow deliberately: rethrowing from async void would take the process down.
                    Console.WriteLine($"{key} {taskId} failed to persist error message: {persistEx.Message}");
                }
            }
        }
    }
}