Learn.VideoAnalysis/VideoAnalysisCore/Common/RedisExpand.cs

228 lines
8.4 KiB
C#

using FreeRedis;
using Microsoft.Extensions.DependencyInjection;
using SqlSugar.IOC;
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Xml.Linq;
using VideoAnalysisCore.AICore.ChatGPT;
using VideoAnalysisCore.AICore.FFMPGE;
//using VideoAnalysisCore.AICore.FFMPGE;
using VideoAnalysisCore.AICore.SherpaOnnx;
using VideoAnalysisCore.AICore.Whisper;
using VideoAnalysisCore.Enum;
using VideoAnalysisCore.Model;
namespace VideoAnalysisCore.Common
{
/// <summary>
/// redis key
/// </summary>
public static class RedisExpandKey
{
/// <summary>
/// 基础key
/// </summary>
public const string BaseKey = "VideoAnalysis:";
/// <summary>
/// 基础Channel key
/// </summary>
public const string ChannelKey = BaseKey + "Channel:";
/// <summary>
/// 下载文件
/// </summary>
public const string DownloadFile = ChannelKey + "DownloadFile";
/// <summary>
/// 分离音频
/// </summary>
public const string SeparateAudio = ChannelKey + "SeparateAudio";
/// <summary>
/// 解析字幕
/// </summary>
public const string ParsingCaptions = ChannelKey + "ParsingCaptions";
/// <summary>
/// 解析说话人
/// </summary>
public const string ParsingSpeaker = ChannelKey + "ParsingSpeaker";
/// <summary>
/// Chat模型分析
/// </summary>
public const string ChatModelAnalysis = ChannelKey + "ChatModelAnalysis";
/// <summary>
/// 任务数组
/// </summary>
public const string TaskArr = BaseKey + "TaskArr";
/// <summary>
/// 获取枚举RedisKey
/// </summary>
/// <param name="e"></param>
/// <returns></returns>
public static string EnumKey(RedisChannelEnum e)
{
return ChannelKey + e.ToString();
}
/// <summary>
/// 任务对象地址
/// </summary>
public static string Task(object taskId) => BaseKey + "Task:" + taskId;
}
/// <summary>
/// redis拓展
/// </summary>
public class RedisExpand
{
/// <summary>
/// redis 连接
/// </summary>
public static RedisClient Redis = new RedisClient(AppCommon.Config.Redis.ConnectionString);
/// <summary>
/// 初始化 redis
/// <para>需要在初始化配置文件时候调用</para>
/// </summary>
public static void Init()
{
Console.WriteLine("初始化 redis");
Redis.Serialize = obj => System.Text.Json.JsonSerializer.Serialize(obj);
Redis.Deserialize = (json, type) => System.Text.Json.JsonSerializer.Deserialize(json, type);
InitChannel();
}
/// <summary>
/// 获取任务进度
/// </summary>
/// <param name="taskId"></param>
public static double SetTaskProgress(object taskId)
{
return Redis.HMGet<double>(RedisExpandKey.Task(taskId), "Progress")[0];
}
/// <summary>
/// 设置任务进度
/// </summary>
/// <param name="p">进度百分比</param>
/// <param name="taskId"></param>
public static void SetTaskProgress(object taskId,double p)
{
Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", Math.Round(p,2));
}
/// <summary>
/// 将任务 插入 队列
/// </summary>
/// <param name="enum">枚举</param>
/// <param name="taskId">任务id</param>
public static void InsertChannel(RedisChannelEnum @enum, object taskId)
{
if (Redis is null) throw new Exception("redis未初始化");
var startTime = Redis.HMGet<Dictionary<RedisChannelEnum, DateTime>>(RedisExpandKey.Task(taskId), "StartTime").FirstOrDefault();
if (startTime is null)
startTime = new Dictionary<RedisChannelEnum, DateTime>();
if (!startTime.ContainsKey(@enum))
startTime.Add(@enum, DateTime.Now);
else
startTime[@enum] = DateTime.Now;
Redis.HMSet(RedisExpandKey.Task(taskId), "StartTime", startTime);
Redis.LPush(RedisExpandKey.EnumKey(@enum), taskId);
}
/// <summary>
/// 初始化 队列 任务
/// </summary>
public static void InitChannel()
{
if (Redis is null) throw new Exception("redis未初始化");
Redis.SubscribeList(RedisExpandKey.EnumKey(RedisChannelEnum.DownloadFile),
(msg) => { TouchChannel(RedisChannelEnum.DownloadFile, msg, DownloadFile.RunTask); });
Redis.SubscribeList(RedisExpandKey.EnumKey(RedisChannelEnum.SeparateAudio),
(msg) => { TouchChannel(RedisChannelEnum.SeparateAudio, msg, FFMPGEHandle.Audio2WAV16KAsync); });
Redis.SubscribeList(RedisExpandKey.EnumKey(RedisChannelEnum.ParsingCaptions),
(msg) => { TouchChannel(RedisChannelEnum.ParsingCaptions, msg, SenseVoice.RunTask); });
Redis.SubscribeList(RedisExpandKey.EnumKey(RedisChannelEnum.ParsingSpeaker),
(msg) => { TouchChannel(RedisChannelEnum.ParsingSpeaker, msg, Speaker.Run); });
Redis.SubscribeList(RedisExpandKey.EnumKey(RedisChannelEnum.ChatModelAnalysis),
(msg) =>
{
TouchChannel(RedisChannelEnum.ChatModelAnalysis, msg,
(task) =>
{
using var scope = AppCommon.Services?.CreateScope();
if (scope is null || scope.ServiceProvider.GetService<IBserGPT>() is null)
throw new Exception("IBserGPT 未注入");
else
return scope.ServiceProvider.GetService<IBserGPT>()?.CallGPT(task)??Task.CompletedTask;
});
});
Redis.SubscribeList(RedisExpandKey.EnumKey(RedisChannelEnum.CallBackSystem),
(msg) => { TouchChannel(RedisChannelEnum.ParsingSpeaker, msg); });
}
/// <summary>
/// 写入任务异常
/// </summary>
/// <param name="taskID"></param>
/// <param name="errorMessage"></param>
/// <returns></returns>
public static async Task<bool> SetTaskErrorMessage(long taskID, string errorMessage)
{
return await DbScoped.SugarScope.Updateable<VideoTask>()
.SetColumns(it => it.ErrorMessage == errorMessage)//SetColumns是可以叠加的 写2个就2个字段赋值
.Where(it => it.Id == taskID)
.ExecuteCommandAsync() == 1;
}
/// <summary>
/// 触发
/// </summary>
/// <param name="key"></param>
/// <param name="taskId"></param>
/// <param name="action"></param>
public static async void TouchChannel(RedisChannelEnum key, string taskId, Func<string, Task> action = null)
{
if (taskId is null) return;
Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-> " + key + " " + taskId);
if (action is not null)
{
try
{
var tID = long.Parse(taskId);
Redis.HMSet(RedisExpandKey.Task(taskId), "LastEnum", key);
Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", 0);
await DbScoped.SugarScope.Updateable<VideoTask>()
.SetColumns(it => it.LastEnum == key)
.Where(it => it.Id == tID)
.ExecuteCommandAsync();
await action(taskId);
}
catch (Exception ex)
{
//执行任务时出现异常
var error = ex.Message + ex.StackTrace;
await SetTaskErrorMessage(long.Parse(taskId), error);
Console.WriteLine("====================[出现异常]====================");
Console.WriteLine(ex.Message);
Console.WriteLine(ex.StackTrace);
Console.WriteLine("==============================================");
}
}
else
{
Console.WriteLine(key + " 任务函数 未实现");
}
}
}
}