using FreeRedis;
using FreeRedis.Internal;
using Microsoft.Extensions.DependencyInjection;
using NetTaste;
using Newtonsoft.Json.Schema;
using SqlSugar.IOC;
using System;
using System.Security.Cryptography;
using System.Text.Json;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Xml.Linq;
using UserCenter.Model.Enum;
using VideoAnalysisCore.AICore.FFMPGE;
using VideoAnalysisCore.AICore.GPT;
using VideoAnalysisCore.AICore.GPT.Dto;
//using VideoAnalysisCore.AICore.FFMPGE;
using VideoAnalysisCore.AICore.SherpaOnnx;
using VideoAnalysisCore.AICore.Whisper;
using VideoAnalysisCore.Model;
using VideoAnalysisCore.Model.Dto;
using VideoAnalysisCore.Model.Enum;
using static System.Runtime.InteropServices.JavaScript.JSType;
namespace VideoAnalysisCore.Common
{
/// <summary>
/// Redis key names used by the video-analysis task pipeline.
/// </summary>
public static class RedisExpandKey
{
    /// <summary>
    /// Common prefix shared by every key defined in this class.
    /// </summary>
    public const string BaseKey = "VideoAnalysis:";

    /// <summary>
    /// Base channel key (<see cref="BaseKey"/> followed by "TaskChannel").
    /// </summary>
    public const string ChannelKey = BaseKey + "TaskChannel";

    /// <summary>
    /// Download-file key. The suffix is appended to <see cref="ChannelKey"/> without a separator.
    /// </summary>
    public const string DownloadFile = ChannelKey + "DownloadFile";

    /// <summary>
    /// Separate-audio key. The suffix is appended to <see cref="ChannelKey"/> without a separator.
    /// </summary>
    public const string SeparateAudio = ChannelKey + "SeparateAudio";

    /// <summary>
    /// Parse-captions key. The suffix is appended to <see cref="ChannelKey"/> without a separator.
    /// </summary>
    public const string ParsingCaptions = ChannelKey + "ParsingCaptions";

    /// <summary>
    /// Parse-speaker key. The suffix is appended to <see cref="ChannelKey"/> without a separator.
    /// </summary>
    public const string ParsingSpeaker = ChannelKey + "ParsingSpeaker";

    /// <summary>
    /// Chat-model-analysis key. The suffix is appended to <see cref="ChannelKey"/> without a separator.
    /// </summary>
    public const string ChatModelAnalysis = ChannelKey + "ChatModelAnalysis";

    /// <summary>
    /// Task array key.
    /// </summary>
    public const string TaskArr = BaseKey + "TaskArr";

    /// <summary>
    /// Key of the per-instance entry, built from <c>AppCommon.Config.ID</c>.
    /// </summary>
    public static string IDTask
    {
        get { return BaseKey + "Services:" + AppCommon.Config.ID; }
    }

    /// <summary>
    /// Builds the key of a task object.
    /// </summary>
    /// <param name="taskId">Task id; its string form is appended after "Task:".</param>
    /// <returns><see cref="BaseKey"/> + "Task:" + <paramref name="taskId"/>.</returns>
    public static string Task(object taskId)
    {
        return string.Concat(BaseKey, "Task:", taskId);
    }

    /// <summary>
    /// Builds the GPT cache key prefix of a task.
    /// </summary>
    /// <param name="taskId">Task id passed on to <see cref="Task(object)"/>.</param>
    /// <returns>The task key followed by ":GPTCached".</returns>
    public static string TaskGPT(object taskId)
    {
        return string.Concat(Task(taskId), ":GPTCached");
    }
}
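/// <summary>
/// Redis extensions: the shared Redis client plus helpers for queueing tasks and tracking their state.
/// </summary>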
public static class RedisExpand
{
/// <summary>
/// Shared Redis connection, built from AppCommon.Config.Redis.ConnectionString.
/// </summary>
public static RedisClient Redis = new RedisClient(AppCommon.Config.Redis.ConnectionString);
// Stage handlers keyed by RedisChannelEnum: populated in InitChannel, looked up in InsertChannel.
// NOTE(review): the generic type arguments on this declaration look stripped in this copy - confirm against the real file.
public static Dictionary> SubscribeList = new Dictionary>();
/// <summary>
/// Queue pool: the subscription object returned by Redis.SubscribeList in ReceivingTaskAsync.
/// </summary>
static SubscribeListObject? Subscribe;
/// <summary>
/// Initialises Redis: installs System.Text.Json based serialisation on the shared client and
/// schedules <see cref="InitChannel"/> to run on a background task after a 10 second delay.
/// Must be called while the configuration is being initialised.
/// </summary>
/// <param name="service">Service collection the extension is invoked on; it is not modified here.</param>
public static void AddRedisExpand(this IServiceCollection service)
{
    Console.WriteLine($"{DateTime.Now}=>初始化 Redis");
    Redis.Serialize = obj => JsonSerializer.Serialize(obj);
    Redis.Deserialize = (json, type) => JsonSerializer.Deserialize(json, type);

    // Wait asynchronously so no thread-pool thread is parked for the 10 seconds,
    // as Thread.Sleep would do. The task is intentionally fire-and-forget.
    _ = Task.Run(async () =>
    {
        await Task.Delay(TimeSpan.FromSeconds(10));
        InitChannel();
    });
}
/// <summary>
/// Caches a GPT result for a task under the key <c>TaskGPT(taskId) + ":" + time</c>.
/// </summary>
/// <param name="taskId">Task id used to build the cache key prefix.</param>
/// <param name="time">Suffix appended to the key after a colon.</param>
/// <param name="data">Value to store; may be null.</param>
public static void SetTaskGPTCached(object taskId, string time, object? data)
{
    // 3600 * 24 = one day when read as seconds (FreeRedis names this argument timeoutSeconds).
    const int oneDay = 3600 * 24;
    var cacheKey = string.Concat(RedisExpandKey.TaskGPT(taskId), ":", time);
    Redis.Set(cacheKey, data, oneDay);
}
/// <summary>
/// Adds task ids to the consumption queue.
/// </summary>
/// <param name="taskIds">Ids to enqueue; a null or empty array is a no-op.</param>
public static void JoinQueue(params long[] taskIds)
{
    if (taskIds is not { Length: > 0 })
    {
        return;
    }

    // All pushes are issued inside one transaction and sent with a single Exec.
    using var transaction = Redis.Multi();
    for (var index = 0; index < taskIds.Length; index++)
    {
        transaction.LPush(RedisExpandKey.ChannelKey, taskIds[index]);
    }

    transaction.Exec();
}
/// <summary>
/// Gets the progress of a task: reads the "Progress" field of the task hash and returns the first value.
/// </summary>
/// <param name="taskId">Task id used to build the hash key via RedisExpandKey.Task.</param>
/// <returns>The stored progress value.</returns>
public static float GetTaskProgress(object taskId)
{
// NOTE(review): HMGet presumably carried a generic type argument that is missing in this copy - confirm.
return Redis.HMGet(RedisExpandKey.Task(taskId), "Progress")[0];
}
/// <summary>
/// Sets the progress of a task by writing the "Progress" field of the task hash.
/// </summary>
/// <param name="taskId">Task id used to build the hash key.</param>
/// <param name="p">Progress percentage; stored as the result of its ToString().</param>
public static void SetTaskProgress(object taskId, object p)
{
    var taskKey = RedisExpandKey.Task(taskId);
    var progressText = p.ToString();
    Redis.HMSet(taskKey, "Progress", progressText);
}
/// <summary>
/// Runs a task through the pipeline starting at the given stage. For each stage it records the
/// start time in the task hash ("StartTime" field), runs the stage through TouchChannel, then
/// moves to the stage returned by NextEnum() until that returns null.
/// Any exception raised while running the stages is passed to SetTaskErrorMessage instead of propagating.
/// </summary>
/// <param name="enum">Stage to start from.</param>
/// <param name="taskId">Task id; its ToString() value is passed to the handlers and parsed with long.Parse on failure.</param>
public static async Task InsertChannel(RedisChannelEnum @enum, object taskId)
{
if (taskId is null) throw new Exception("taskId为空");
if (Redis is null) throw new Exception("redis未初始化");
// Load the per-stage start times stored on the task hash; fall back to an empty map when absent.
// NOTE(review): the generic type arguments on HMGet and on the Dictionary construction look stripped in this copy - confirm.
var startTime = Redis.HMGet>(RedisExpandKey.Task(taskId), "StartTime").FirstOrDefault();
if (startTime is null)
startTime = new Dictionary();
// Only the starting stage is validated here; later stages are looked up with SubscribeList[@enum] inside the loop.
if (!SubscribeList.ContainsKey(@enum))
throw new Exception(@enum + " 未实现");
var tId = taskId.ToString();
try
{
while (true)
{
// Record (or overwrite) the start time of the current stage and persist the whole map.
if (!startTime.ContainsKey(@enum))
startTime.Add(@enum, DateTime.Now);
else
startTime[@enum] = DateTime.Now;
Redis.HMSet(RedisExpandKey.Task(taskId), "StartTime", startTime);
await TouchChannel(@enum, tId, SubscribeList[@enum]);
//await SubscribeList[@enum](tId);
// Advance to the next stage; a null result ends the pipeline.
var e = @enum.NextEnum();
if (e is null)
break;
@enum = e.Value;
}
}
catch (Exception ex)
{
// Record the failure against the task; SetTaskErrorMessage also calls NewTaskAsync.
await SetTaskErrorMessage(long.Parse(tId), ex);
}
}
/// <summary>
/// Final pipeline stage: removes the task from this instance's entry in Redis, loads the task row,
/// fills Captions from the task hash when the row still holds "[]", resets the score and error
/// message, marks the task as finished and writes the listed columns back to the database.
/// </summary>
/// <param name="task">Task id as a string; parsed with long.Parse.</param>
public static async Task TaskEnd(string task)
{
var tId = long.Parse(task);
//var gptRes = (await Redis
// .HMGetAsync(RedisExpandKey.Task(task), "ChatAnalysis")).FirstOrDefault();
//if (gptRes is null)
// throw new Exception("未能读取到GPT处理结果");
// Remove the running-task state.
// NOTE(review): this uses a hash delete, while ReceivingTaskAsync writes and reads the same key with Set/GetAsync - confirm the intended key type.
await Redis.HDelAsync(RedisExpandKey.IDTask,task);
// NOTE(review): the generic type argument of Queryable looks stripped in this copy, and taskData is used without a null check - confirm FirstAsync cannot return null here.
var taskData = await DbScoped.Sugar.Queryable()
.FirstAsync(s => s.Id == tId);
// "[]" is treated as "no captions stored yet"; take the first value of the "Captions" hash field instead.
if (taskData.Captions == "[]")
taskData.Captions = (await Redis.HMGetAsync(RedisExpandKey.Task(task), "Captions")).First();
//if (taskData.Speaker == "[]")
// taskData.Speaker = (await Redis.HMGetAsync(RedisExpandKey.Task(task), "Speaker"))?.FirstOrDefault()??"[]";
// The result is not used for now, so this is disabled.
//taskData.ChatAnalysis = JsonSerializer.Serialize(gptRes);
taskData.ChatAnalysisScore =0;
taskData.ErrorMessage = string.Empty;
taskData.LastEnum = RedisChannelEnum.结束任务;
taskData.EndTime = DateTime.Now;
// Only the columns listed below are written; Speaker and TotalTokens are written back with the values that were loaded.
await DbScoped.Sugar.Updateable(taskData)
.UpdateColumns(it => new
{
//it.ChatAnalysis,
it.Captions,
it.Speaker,
it.ChatAnalysisScore,
it.ErrorMessage,
it.TotalTokens,
it.LastEnum,
it.EndTime
}).ExecuteCommandAsync();
//NewTask();
}
/// <summary>
/// Initialises the task queue: registers one handler per pipeline stage in SubscribeList and then
/// calls ReceivingTaskAsync to start receiving tasks.
/// </summary>
/// <remarks>
/// NOTE(review): the method is declared async void but contains no await, and SubscribeList.Add
/// throws on a duplicate key, so a second call would fail - confirm it is only invoked once.
/// NOTE(review): the generic type arguments of the GetService calls look stripped in this copy.
/// </remarks>
public static async void InitChannel()
{
if (Redis is null) throw new Exception("redis未初始化");
// Download stage: resolve the download service from a fresh DI scope and run it.
// NOTE(review): in each scoped handler below, "using var scope" is disposed when the lambda returns,
// i.e. before the returned task has necessarily completed - confirm the resolved services tolerate that.
SubscribeList.Add(RedisChannelEnum.下载文件, (task) =>
{
using var scope = AppCommon.Services?.CreateScope();
if (scope is null || scope.ServiceProvider.GetService() is null)
throw new Exception("DownloadFile 未注入");
else
return scope.ServiceProvider.GetService()?.RunTask(task) ?? Task.CompletedTask;
});
// Audio separation and caption parsing are handled by static methods.
SubscribeList.Add(RedisChannelEnum.分离音频, FFMPGEHandle.RunAsync);
SubscribeList.Add(RedisChannelEnum.解析字幕, SenseVoice.RunTask);
//SubscribeList.Add(RedisChannelEnum.解析说话人,Speaker.Run);
// The three AI stages resolve a service from a fresh DI scope and call GetVideoType / GetKnow / GetVideoQuestion.
SubscribeList.Add(RedisChannelEnum.AI课程类型,
(task) =>
{
using var scope = AppCommon.Services?.CreateScope();
if (scope is null || scope.ServiceProvider.GetService() is null)
throw new Exception("IBserGPT 未注入");
else
return scope.ServiceProvider.GetService()?.GetVideoType(task) ?? Task.CompletedTask;
});
SubscribeList.Add(RedisChannelEnum.AI模型分析, (task) =>
{
using var scope = AppCommon.Services?.CreateScope();
if (scope is null || scope.ServiceProvider.GetService() is null)
throw new Exception("IBserGPT 未注入");
else
return scope.ServiceProvider.GetService()?.GetKnow(task) ?? Task.CompletedTask;
});
SubscribeList.Add(RedisChannelEnum.AI分析试题, (task) =>
{
using var scope = AppCommon.Services?.CreateScope();
if (scope is null || scope.ServiceProvider.GetService() is null)
throw new Exception("IBserGPT 未注入");
else
return scope.ServiceProvider.GetService()?.GetVideoQuestion(task) ?? Task.CompletedTask;
});
// Final stage: persist the results and mark the task as finished.
SubscribeList.Add(RedisChannelEnum.结束任务, TaskEnd);
ReceivingTaskAsync();
}
/// <summary>
/// Starts over with a new task: deletes this instance's current-task key
/// (<c>RedisExpandKey.IDTask</c>) and resumes receiving tasks.
/// </summary>
/// <remarks>
/// The method is <c>async void</c>, so callers can neither await it nor observe a failure.
/// Exceptions are therefore caught and logged here instead of escaping as unhandled exceptions.
/// </remarks>
public static async void NewTaskAsync()
{
    try
    {
        await Redis.DelAsync(RedisExpandKey.IDTask);
        ReceivingTaskAsync();
    }
    catch (Exception ex)
    {
        // Same console format the rest of this class uses for task failures.
        Console.WriteLine("====================[出现异常]====================");
        Console.WriteLine(ex.Message);
        Console.WriteLine(ex.StackTrace);
        Console.WriteLine("==============================================");
    }
}
/// <summary>
/// Starts receiving tasks again. Does nothing on instances configured as server (IS_Server).
/// Otherwise, on a background task: if this instance still has a task recorded under
/// RedisExpandKey.IDTask, that task is resumed from its stored "LastEnum" stage; if not, the
/// instance subscribes to the task list and runs the next received task from the download stage.
/// </summary>
public static void ReceivingTaskAsync()
{
if (AppCommon.Config.TaskSetting.IS_Server)
{
Console.WriteLine($"{DateTime.Now} =>服务端不接收任务");
return;
}
Task.Run(async () =>
{
// A task id stored under this instance's key means a previous task should be retried.
var oldTask = await Redis.GetAsync(RedisExpandKey.IDTask);
if (!string.IsNullOrEmpty(oldTask))
{
Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收重试任务 " + oldTask);
await ClearTaskError(long.Parse(oldTask));
// NOTE(review): HMGetAsync presumably carried a generic type argument that is missing in this copy - confirm.
var lastEnum = (await Redis.HMGetAsync(RedisExpandKey.Task(oldTask), "LastEnum")).FirstOrDefault();
await InsertChannel(lastEnum, oldTask);
return;
}
// Skip when a subscription is still active, so that the retry mechanism does not register
// several listeners (which leaked memory).
if (Subscribe?.IsUnsubscribed == false)
return;
Subscribe = Redis.SubscribeList(RedisExpandKey.ChannelKey, async (taskId) =>
{
if (taskId is null) return;
// Stop listening for further tasks while this one is processed.
Subscribe?.Dispose();
// Store the task currently held by this machine.
Redis.Set(RedisExpandKey.IDTask, taskId);
Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收到任务 " + taskId);
await InsertChannel(RedisChannelEnum.下载文件, taskId);
});
});
}
/// <summary>
/// Writes a task exception. When <paramref name="ex"/> is not null, its message and stack trace are
/// printed, NewTaskAsync is called to drop the failed task and receive tasks again, and
/// message + stack trace is stored through SetTaskError. When it is null an empty error is stored.
/// </summary>
/// <param name="taskID">Id of the task the error belongs to.</param>
/// <param name="ex">Exception to record, or null.</param>
/// <returns>The result of SetTaskError.</returns>
/// <remarks>
/// NOTE(review): the method returns a value, so the declared return type presumably carried a
/// generic type argument that is missing in this copy - confirm.
/// </remarks>
public static async Task SetTaskErrorMessage(long taskID, Exception? ex)
{
var error = string.Empty;
if (ex != null)
{
// An exception occurred while executing the task.
error = ex.Message + ex.StackTrace;
Console.WriteLine("====================[出现异常]====================");
Console.WriteLine(ex.Message);
Console.WriteLine(ex.StackTrace);
Console.WriteLine("==============================================");
// Clear the failed task and receive tasks again (fire-and-forget; runs before the error is stored below).
NewTaskAsync();
}
return await SetTaskError(taskID, error);
}
/// <summary>
/// Clears the error message of a task by storing an empty string through SetTaskError.
/// </summary>
/// <param name="taskID">Id of the task whose error is cleared.</param>
public static async Task ClearTaskError(long taskID) =>await SetTaskError(taskID, string.Empty);
/// <summary>
/// Changes the error message of a task: writes it to the "ErrorMessage" field of the task hash in
/// Redis and to the ErrorMessage column of the database row whose Id equals <paramref name="taskID"/>.
/// </summary>
/// <param name="taskID">Id of the task to update.</param>
/// <param name="error">Error text to store; may be null or empty.</param>
/// <returns>Whether the database command result equals 1 (presumably the affected-row count - confirm).</returns>
/// <remarks>
/// NOTE(review): the generic type arguments of the return type and of Updateable look stripped in this copy.
/// </remarks>
public static async Task SetTaskError(long taskID, string? error)
{
Redis.HMSet(RedisExpandKey.Task(taskID), "ErrorMessage", error);
return await DbScoped.Sugar.Updateable()
.SetColumns(it => it.ErrorMessage == error)// SetColumns calls can be chained: two calls assign two columns.
.Where(it => it.Id == taskID)
.ExecuteCommandAsync() == 1;
}
/// <summary>
/// Triggers one pipeline stage for a task: stores the stage as "LastEnum" and resets "Progress" to 0
/// in the task hash, updates LastEnum on the database row, then awaits the stage handler.
/// </summary>
/// <param name="key">Stage being executed.</param>
/// <param name="taskId">Task id as a string, parsed with long.Parse; a null value returns immediately.</param>
/// <param name="action">Stage handler; when null only a "not implemented" line is printed.</param>
/// <remarks>
/// NOTE(review): the generic type arguments of the Func parameter and of Updateable look stripped in this copy.
/// </remarks>
public static async Task TouchChannel(RedisChannelEnum key, string taskId, Func action = null)
{
if (taskId is null) return;
var tID = long.Parse(taskId);
if (action is not null)
{
// Number of attempts. With the value 1 the loop body runs once and a failure is rethrown after the pause below.
var tryCount = 1;
for (int i = 0; i < tryCount; i++)
{
Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-> 开始执行 " + key + " " + taskId);
try
{
Redis.HMSet(RedisExpandKey.Task(taskId), "LastEnum", key);
Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", 0);
// The synchronous database update is serialised by locking on the shared Redis client object.
lock (Redis)
{
DbScoped.SugarScope.Updateable()
.SetColumns(it => it.LastEnum == key)
.Where(it => it.Id == tID)
.ExecuteCommand();
}
await action(taskId);
return;
}
catch (Exception ex)
{
Console.WriteLine("====================[出现异常]====================");
Console.WriteLine(ex.Message);
Console.WriteLine(ex.StackTrace);
Console.WriteLine("==============================================");
// NOTE(review): Thread.Sleep blocks the current thread inside an async method - confirm this is intended.
Thread.Sleep(1000);
Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-> 稍后后重试." + key + " " + taskId);
// Rethrow once the last attempt has failed so the caller can record the error.
if (i+1== tryCount)
throw;
}
}
}
else
{
Console.WriteLine(key + " 任务函数 未实现");
}
}
}
}