Learn.VideoAnalysis/VideoAnalysisCore/Common/RedisExpand.cs

660 lines
25 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using FreeRedis;
using FreeRedis.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.IdentityModel.Tokens;
using NetTaste;
using Newtonsoft.Json.Schema;
using SqlSugar.IOC;
using System;
using System.Security.Cryptography;
using System.Text.Json;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Xml.Linq;
using UserCenter.Model.Enum;
using VideoAnalysisCore.AICore.FFMPGE;
using VideoAnalysisCore.AICore.GPT;
using VideoAnalysisCore.AICore.GPT.Dto;
//using VideoAnalysisCore.AICore.FFMPGE;
using VideoAnalysisCore.AICore.SherpaOnnx;
using VideoAnalysisCore.AICore.Whisper;
using VideoAnalysisCore.Model;
using VideoAnalysisCore.Model.Dto;
using VideoAnalysisCore.Model.Enum;
namespace VideoAnalysisCore.Common
{
/// <summary>
/// Redis key names used by the video-analysis task pipeline, together with the
/// dependency-injection helpers that register the Redis client and the task machinery.
/// </summary>
public static class RedisExpandKey
{
    /// <summary>
    /// Prefix shared by every key defined in this class.
    /// </summary>
    public const string BaseKey = "VideoAnalysis:";

    /// <summary>
    /// Base channel key. It is the list that task ids are pushed to and popped from,
    /// and it is also the prefix of the per-step keys below.
    /// </summary>
    public const string ChannelKey = $"{BaseKey}TaskChannel";

    /// <summary>
    /// Key for the "download file" step.
    /// </summary>
    public const string DownloadFile = $"{ChannelKey}DownloadFile";

    /// <summary>
    /// Key for the "separate audio" step.
    /// </summary>
    public const string SeparateAudio = $"{ChannelKey}SeparateAudio";

    /// <summary>
    /// Key for the "parse captions" step.
    /// </summary>
    public const string ParsingCaptions = $"{ChannelKey}ParsingCaptions";

    /// <summary>
    /// Key for the "parse speaker" step.
    /// </summary>
    public const string ParsingSpeaker = $"{ChannelKey}ParsingSpeaker";

    /// <summary>
    /// Key for the "chat model analysis" step.
    /// </summary>
    public const string ChatModelAnalysis = $"{ChannelKey}ChatModelAnalysis";

    /// <summary>
    /// Key of the task array.
    /// </summary>
    public const string TaskArr = $"{BaseKey}TaskArr";

    /// <summary>
    /// Key of the task-log buffer; suffixed with <c>AppCommon.Config.ID</c>.
    /// </summary>
    public static string TaskLog
    {
        get { return $"{BaseKey}TaskLog:{AppCommon.Config.ID}"; }
    }

    /// <summary>
    /// Key under which the data of a single task is stored.
    /// </summary>
    /// <param name="taskId">Task id; its string form becomes the key suffix.</param>
    public static string Task(object taskId)
    {
        return $"{BaseKey}Info:{taskId}";
    }

    /// <summary>
    /// Key of the list of task ids tracked for this service; suffixed with <c>AppCommon.Config.ID</c>.
    /// </summary>
    public static string IDTask
    {
        get { return $"{BaseKey}Services:{AppCommon.Config.ID}"; }
    }

    /// <summary>
    /// Key prefix for cached GPT results of a task.
    /// </summary>
    /// <param name="taskId">Task id; its string form becomes the key suffix.</param>
    public static string TaskGPT(object taskId)
    {
        return $"{BaseKey}GPTCached:{taskId}";
    }

    /// <summary>
    /// Registers <see cref="RedisInit"/> as a singleton.
    /// <para>Per the original note, this must be called while the configuration is being initialized.</para>
    /// </summary>
    /// <param name="service">Service collection to register into.</param>
    public static void AddTaskSubscribe(this IServiceCollection service)
    {
        Console.WriteLine($"{DateTime.Now}=>初始化 Redis任务队列");
        service.AddSingleton<RedisInit>();
    }

    /// <summary>
    /// Creates the Redis client from <c>AppCommon.Config.Redis.ConnectionString</c>, wires
    /// System.Text.Json as its serializer, and registers it together with <see cref="RedisManager"/>.
    /// </summary>
    /// <param name="service">Service collection to register into.</param>
    public static void AddRedisExpand(this IServiceCollection service)
    {
        Console.WriteLine($"{DateTime.Now}=>初始化 Redis");

        var client = new RedisClient(AppCommon.Config.Redis.ConnectionString);
        client.Serialize = value =>
        {
            return JsonSerializer.Serialize(value);
        };
        client.Deserialize = (text, targetType) =>
        {
            return JsonSerializer.Deserialize(text, targetType);
        };

        service.AddSingleton(client);
        service.AddSingleton<RedisManager>();
    }
}
/// <summary>
/// Registers one handler per pipeline step in <see cref="RedisManager.SubscribeList"/> and then
/// calls <see cref="RedisManager.InitChannel"/>. Both happen inside the constructor.
/// </summary>
/// <remarks>
/// NOTE(review): in this copy of the file the member name after <c>RedisChannelEnum.</c> is missing on
/// several lines of <see cref="Init"/>; the intended members must be restored before this compiles.
/// </remarks>
public class RedisInit
{
// Handler whose RunAsync is registered as a pipeline step in Init().
public FFMPGEHandle FFMPGE { get; set; }
// Handler whose RunTask is registered as a pipeline step in Init().
public SenseVoice senseVoice { get; set; }
// Stored but only referenced from a commented-out registration in Init().
public FunASRNano funASRNano { get; set; }
// Supplies the TaskEnd handler and is used to start the queue via InitChannel().
public RedisManager redisManager { get; set; }
/// <summary>
/// Stores the injected dependencies, registers the step handlers and starts queue initialisation.
/// </summary>
public RedisInit(FFMPGEHandle fFMPGE, SenseVoice senseVoice, RedisManager redisManager, FunASRNano funASRNano)
{
FFMPGE = fFMPGE;
this.senseVoice = senseVoice;
this.funASRNano = funASRNano;
this.redisManager = redisManager;
Init();
// InitChannel() returns a Task that is not awaited here, so it runs fire-and-forget
// and any exception it throws is not observed by this constructor.
redisManager.InitChannel();
}
/// <summary>
/// Adds the handler for each pipeline step to the static <see cref="RedisManager.SubscribeList"/>.
/// </summary>
/// <remarks>
/// The list is a static <c>Dictionary</c> and entries are added with <c>Add</c>, so running this a
/// second time with the same keys throws.
/// </remarks>
public void Init()
{
var SubscribeList = RedisManager.SubscribeList;
// Step with no work: the handler completes immediately.
SubscribeList.Add(RedisChannelEnum., async (task) =>
{
await Task.CompletedTask;
});
// Resolves DownloadFile from a fresh DI scope and runs it for the task.
SubscribeList.Add(RedisChannelEnum., async (task) =>
{
using var scope = AppCommon.Services?.CreateScope();
if (scope is null || scope.ServiceProvider.GetService<DownloadFile>() is null)
throw new Exception("DownloadFile 未注入");
else
await scope.ServiceProvider.GetService<DownloadFile>()?.RunTask(task);
});
SubscribeList.Add(RedisChannelEnum., FFMPGE.RunAsync);
SubscribeList.Add(RedisChannelEnum., senseVoice.RunTask);
//SubscribeList.Add(RedisChannelEnum.解析字幕, funASRNano.RunTask);
//SubscribeList.Add(RedisChannelEnum.解析说话人,Speaker.Run);
// The three AI steps below each resolve IBserGPTWorkflow from a fresh DI scope
// and throw when the scope or the service is unavailable.
SubscribeList.Add(RedisChannelEnum.AI课程类型, async (task) =>
{
using var scope = AppCommon.Services?.CreateScope();
if (scope is null || scope.ServiceProvider.GetService<IBserGPTWorkflow>() is null)
throw new Exception("IBserGPT 未注入");
else
await scope.ServiceProvider.GetService<IBserGPTWorkflow>()?.GetVideoType(task);
});
SubscribeList.Add(RedisChannelEnum.AI模型分析, async (task) =>
{
using var scope = AppCommon.Services?.CreateScope();
if (scope is null || scope.ServiceProvider.GetService<IBserGPTWorkflow>() is null)
throw new Exception("IBserGPT 未注入");
else
await scope.ServiceProvider?.GetService<IBserGPTWorkflow>()?.GetKnow(task);
});
SubscribeList.Add(RedisChannelEnum.AI分析试题, async (task) =>
{
using var scope = AppCommon.Services?.CreateScope();
if (scope is null || scope.ServiceProvider.GetService<IBserGPTWorkflow>() is null)
throw new Exception("IBserGPT 未注入");
else
await scope.ServiceProvider?.GetService<IBserGPTWorkflow>()?.GetVideoQuestion(task);
});
// Final step: RedisManager.TaskEnd persists the result and cleans up.
SubscribeList.Add(RedisChannelEnum., redisManager.TaskEnd);
}
}
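/// <summary>
/// Redis-backed task manager: queues task ids, runs the registered pipeline steps for each task,
/// and records progress, logs and errors.
/// </summary>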
public class RedisManager
{
/// <summary>
/// Global stop switch. While true, task flows stop before their next step and no new tasks are received.
/// </summary>
public static bool StopTask { get; set; }

/// <summary>
/// Handler for each pipeline step, keyed by step.
/// </summary>
public static Dictionary<RedisChannelEnum, Func<string, Task>> SubscribeList = new();

/// <summary>
/// Task flows currently running in the background, keyed by task id.
/// </summary>
public static ConcurrentDictionary<string, Task> RunningTasks = new();

// Cancellation source and task of the queue-listening worker.
private static CancellationTokenSource? _cts;
private static Task? _workerTask;

/// <summary>Redis client used for all queue, hash and log operations.</summary>
public RedisClient Redis { get; set; }

/// <summary>Repository for <see cref="VideoTask"/> rows.</summary>
public Repository<VideoTask> videoTaskDB { get; set; }

/// <summary>Repository for <see cref="TaskLog"/> rows.</summary>
public Repository<TaskLog> taskLogDB { get; set; }

/// <summary>
/// Creates the manager from its injected Redis client and repositories.
/// </summary>
public RedisManager(RedisClient redis, Repository<VideoTask> videoTaskDB, Repository<TaskLog> taskLogDB)
{
    (Redis, this.videoTaskDB, this.taskLogDB) = (redis, videoTaskDB, taskLogDB);
}
/// <summary>
/// Caches a GPT result for a task under <c>TaskGPT(taskId):time</c> with a 24-hour expiry.
/// </summary>
/// <param name="taskId">Task id used to build the cache key.</param>
/// <param name="time">Suffix appended to the key after a colon.</param>
/// <param name="data">Value to cache.</param>
public void SetTaskGPTCached(object taskId, string time, object? data)
{
    const int oneDayInSeconds = 3600 * 24;
    var cacheKey = $"{RedisExpandKey.TaskGPT(taskId)}:{time}";
    Redis.Set(cacheKey, data, timeoutSeconds: oneDayInSeconds);
}
/// <summary>
/// Pushes the given task ids onto the head of the task channel list inside one Redis transaction.
/// </summary>
/// <param name="taskIds">Task ids to enqueue; nothing happens when null or empty.</param>
public void JoinQueue(params long[] taskIds)
{
    if (taskIds is not { Length: > 0 })
    {
        return;
    }

    // MULTI/EXEC so all ids are queued together.
    using var transaction = Redis.Multi();
    foreach (var id in taskIds)
    {
        transaction.LPush(RedisExpandKey.ChannelKey, id);
    }
    transaction.Exec();
}
/// <summary>
/// Guards the "flush buffered logs to the database" section of <see cref="AddTaskLog"/>.
/// A dedicated object is required: <c>RedisExpandKey.TaskLog</c> builds a new string on every
/// access, so locking on it never excluded anyone.
/// </summary>
private static readonly object _taskLogFlushGate = new object();

/// <summary>
/// Appends a log entry for a task to the Redis log buffer and, once the buffer holds more than
/// 50 entries, moves the oldest 50 into the database.
/// </summary>
/// <param name="taskId">Task id; its string form must parse as a <see cref="long"/>.</param>
/// <param name="msg">Log message.</param>
/// <remarks>
/// A failure while flushing is written to the console and swallowed, so logging never fails the caller.
/// </remarks>
public async Task AddTaskLog(object taskId, string msg)
{
#if DEBUG
    Console.WriteLine($"{DateTime.Now.ToString("MM-dd HH:mm:ss")} => {taskId} \r\n{msg}\r\n");
#endif
    var logKey = RedisExpandKey.TaskLog;
    await Redis.RPushAsync(logKey,
        new TaskLog()
        {
            VideoTaskId = long.Parse(taskId.ToString()),
            CreateTime = DateTime.Now,
            Message = msg
        });

    const int batchSize = 50;
    lock (_taskLogFlushGate)
    {
        if (Redis.LLen(logKey) <= batchSize)
        {
            return;
        }

        try
        {
            var batch = Redis.LRange<TaskLog>(logKey, 0, batchSize - 1);
            taskLogDB.AsInsertable(batch).ExecuteCommand();
            // Drop only the entries just persisted. A stop index of -1 keeps the whole tail;
            // the previous fixed stop of 1000 discarded anything beyond that index.
            Redis.LTrim(logKey, batchSize, -1);
        }
        catch (Exception ex)
        {
            Console.WriteLine("写入任务日志出错" + "\r\n" + ex.Message + "\r\n" + ex.StackTrace);
        }
    }
}
/// <summary>
/// Reads the "Progress" field of the task's Redis hash.
/// </summary>
/// <param name="taskId">Task id.</param>
/// <returns>The first value returned for the "Progress" field.</returns>
public float GetTaskProgress(object taskId)
{
    var taskKey = RedisExpandKey.Task(taskId);
    var values = Redis.HMGet<float>(taskKey, "Progress");
    return values[0];
}
/// <summary>
/// Writes the "Progress" field of the task's Redis hash.
/// </summary>
/// <param name="taskId">Task id.</param>
/// <param name="p">Progress value; stored as its <c>ToString()</c> text.</param>
public void SetTaskProgress(object taskId, object p)
{
    var taskKey = RedisExpandKey.Task(taskId);
    var progressText = p.ToString();
    Redis.HMSet(taskKey, "Progress", progressText);
}
/// <summary>
/// Starts the task flow for a task at the given step.
/// </summary>
/// <param name="enum">Step to start from.</param>
/// <param name="taskId">Task id.</param>
/// <exception cref="Exception">Thrown when <paramref name="taskId"/> is null or the Redis client is missing.</exception>
public async Task InsertChannel(RedisChannelEnum @enum, object taskId)
{
    if (taskId is null)
    {
        throw new Exception("taskId为空");
    }

    if (Redis is null)
    {
        throw new Exception("redis未初始化");
    }

    var taskIdText = taskId.ToString();
    await AddTaskLog(taskIdText, "==> 开始执行任务 ");
    await ProcessTaskFlow(@enum, taskId, taskIdText);
}
/// <summary>
/// Runs the pipeline for one task, starting at <paramref name="currentStep"/> and advancing with
/// <c>NextEnum()</c> until there is no next step, the global stop flag is set, or the remainder is
/// handed to a background task.
/// </summary>
/// <param name="currentStep">Step to start from; must have a handler in <c>SubscribeList</c>.</param>
/// <param name="taskId">Task id used to build Redis keys.</param>
/// <param name="tId">String form of the task id; must parse as a long for error reporting.</param>
/// <remarks>
/// Exceptions are not propagated: they are recorded through <c>SetTaskErrorMessage</c>.
/// Only the starting step is checked against <c>SubscribeList</c>; later steps are looked up with the indexer.
/// </remarks>
private async Task ProcessTaskFlow(RedisChannelEnum currentStep, object taskId, string tId)
{
try
{
// Make sure the starting step has a registered handler.
if (!SubscribeList.ContainsKey(currentStep))
throw new Exception($"{currentStep} 未实现");
while (true)
{
// Manual stop: log it and leave without running further steps.
if (StopTask)
{
await AddTaskLog(tId, "==> 手动停止任务 ");
return;
}
// 1. Record the start time of this step.
await UpdateStepTimeAsync(taskId, currentStep);
// 2. Run the handler for the current step.
await TouchChannel(currentStep, tId, SubscribeList[currentStep]);
// 3. Work out the next step.
var nextStepNullable = currentStep.NextEnum();
if (nextStepNullable == null) break; // End of the flow.
var nextStep = nextStepNullable.Value;
// 4. Special split: per the original comment, once caption parsing has finished the remaining
//    steps continue on a background task.
// NOTE(review): the enum member in the comparison below is missing in this copy of the file.
if (currentStep == RedisChannelEnum.)
{
DispatchBackgroundFlow(nextStep, taskId, tId);
return; // Frees the calling flow; the background task carries on.
}
// 5. Continue with the next step.
currentStep = nextStep;
}
}
catch (Exception ex)
{
await SetTaskErrorMessage(long.Parse(tId), ex);
}
finally
{
// Whenever a flow ends (normal completion, exception or hand-off to the background),
// extend the expiry of the task hash (60 * 60 * 24 * 15, presumably seconds, i.e. 15 days).
// This also runs on the hand-off path, which keeps the key alive for the background steps.
await Redis.ExpireAsync(RedisExpandKey.Task(taskId), 60 * 60 * 24 * 15);
}
}
/// <summary>
/// Records the current time as the start time of <paramref name="step"/> in the task's
/// "StartTime" hash field.
/// </summary>
/// <param name="taskId">Task id.</param>
/// <param name="step">Step that is about to run.</param>
private async Task UpdateStepTimeAsync(object taskId, RedisChannelEnum step)
{
    var taskKey = RedisExpandKey.Task(taskId);

    // HMGet returns an array; the single requested field is its first element.
    var stored = await Redis.HMGetAsync<Dictionary<RedisChannelEnum, DateTime>>(taskKey, "StartTime");
    var stepTimes = stored?.FirstOrDefault();
    if (stepTimes is null)
    {
        // Nothing stored yet: start a new map.
        stepTimes = new Dictionary<RedisChannelEnum, DateTime>();
    }

    stepTimes[step] = DateTime.Now;

    await Redis.HMSetAsync(taskKey, "StartTime", stepTimes);
}
/// <summary>
/// Continues a task's flow from <paramref name="startStep"/> on a background task and tracks that
/// task in <see cref="RunningTasks"/> until it finishes.
/// </summary>
/// <param name="startStep">Step the background flow starts from.</param>
/// <param name="taskId">Task id.</param>
/// <param name="tId">String form of the task id; used as the tracking key.</param>
private void DispatchBackgroundFlow(RedisChannelEnum startStep, object taskId, string tId)
{
    // The background task must not reach its cleanup before it has been registered; otherwise
    // TryRemove runs first and the later TryAdd leaves a finished task in RunningTasks forever.
    var registered = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

    var bgTask = Task.Run(async () =>
    {
        await registered.Task;
        try
        {
            await ProcessTaskFlow(startStep, taskId, tId);
        }
        finally
        {
            RunningTasks.TryRemove(tId, out _);
        }
    });

    RunningTasks.TryAdd(tId, bgTask);
    registered.SetResult();
}
/// <summary>
/// Final pipeline step: removes the task from the in-progress list, writes the final state of the
/// task to the database and deletes the task's files.
/// </summary>
/// <param name="task">Task id as a string; must parse as a long.</param>
/// <remarks>
/// NOTE(review): the result of <c>GetFirstAsync</c> is used without a null check; confirm a row
/// always exists for the id.
/// </remarks>
public async Task TaskEnd(string task)
{
var tId = long.Parse(task);
//var gptRes = (await Redis
// .HMGetAsync<TaskRes>(RedisExpandKey.Task(task), "ChatAnalysis")).FirstOrDefault();
//if (gptRes is null)
// throw new Exception("未能读取到GPT处理结果");
// Remove the task from this service's in-progress list.
await Redis.LRemAsync(RedisExpandKey.IDTask, 1, task);
var taskData = await videoTaskDB
.GetFirstAsync(s => s.Id == tId);
// When the row still has the empty-array placeholder, take the captions from the Redis hash.
if (taskData.Captions == "[]")
taskData.Captions = (await Redis.HMGetAsync(RedisExpandKey.Task(task), "Captions")).First();
//if (taskData.Speaker == "[]")
// taskData.Speaker = (await Redis.HMGetAsync(RedisExpandKey.Task(task), "Speaker"))?.FirstOrDefault()??"[]";
// Result is currently unused, so it is disabled for now.
//taskData.ChatAnalysis = JsonSerializer.Serialize(gptRes);
taskData.ChatAnalysisScore = 0;
taskData.ErrorMessage = string.Empty;
// NOTE(review): the enum member assigned below is missing in this copy of the file.
taskData.LastEnum = RedisChannelEnum.;
taskData.EndTime = DateTime.Now;
// Speaker and TotalTokens are in the column list although this method does not assign them,
// so they are written back with the values just loaded.
await videoTaskDB.AsUpdateable(taskData)
.UpdateColumns(it => new
{
//it.ChatAnalysis,
it.Captions,
it.Speaker,
it.ChatAnalysisScore,
it.ErrorMessage,
it.TotalTokens,
it.LastEnum,
it.EndTime
}).ExecuteCommandAsync();
// The catch below only rethrows, so a failure to delete the files propagates to the caller.
try
{
await ExpandFunction.DeleteTaskFileAsync(tId, this);
}
catch (Exception)
{
throw;
}
//NewTask();
}
/// <summary>
/// Initialises queue processing: resumes tasks left in the in-progress list by a previous run
/// (at most five at a time), then starts receiving new tasks.
/// </summary>
/// <exception cref="Exception">Thrown when the Redis client is missing.</exception>
public async Task InitChannel()
{
    if (Redis is null) throw new Exception("redis未初始化");

    // Nothing left over from a previous run: go straight to receiving.
    if (Redis.LLen(RedisExpandKey.IDTask) <= 0)
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss}-------------> 接收新任务!");
        ReceivingTaskAsync();
        return;
    }

    // Too many concurrent retries may crash the process, hence the limit of five.
    var pendingIds = Redis.LRange(RedisExpandKey.IDTask, 0, -1);
    Console.WriteLine($"{DateTime.Now:HH:mm:ss}-------------> 发现 {pendingIds.Length} 个未完成任务,准备恢复...");

    using var throttle = new System.Threading.SemaphoreSlim(5);
    var recoveries = new List<Task>();
    foreach (var pendingId in pendingIds)
    {
        await throttle.WaitAsync();
        recoveries.Add(Task.Run(() => ResumeAsync(pendingId)));
    }

    // Wait for every resumed task before accepting new ones.
    await Task.WhenAll(recoveries);
    Console.WriteLine($"{DateTime.Now:HH:mm:ss}-------------> 所有未完成任务处理完毕!");
    ReceivingTaskAsync();

    // Resumes one task from the step stored in its "LastEnum" field and always frees its throttle slot.
    async Task ResumeAsync(string id)
    {
        try
        {
            await AddTaskLog(id, $"{DateTime.Now:HH:mm:ss}-------------> 接收上次未完成任务 {id}");
            await ClearTaskError(long.Parse(id));
            var storedSteps = await Redis.HMGetAsync<RedisChannelEnum>(RedisExpandKey.Task(id), "LastEnum");
            await InsertChannel(storedSteps.FirstOrDefault(), id);
        }
        catch (Exception ex)
        {
            await SetTaskErrorMessage(long.Parse(id), ex);
            Console.WriteLine($"恢复任务 {id} 失败: {ex.Message}");
        }
        finally
        {
            throttle.Release();
        }
    }
}
/// <summary>
/// Stops receiving new tasks: sets the global stop flag and cancels the listening worker, if any.
/// </summary>
public void StopTaskAsync()
{
    StopTask = true;

    // An exception from Cancel() propagates to the caller.
    if (_cts is { } source)
    {
        source.Cancel();
    }
}
/// <summary>
/// Resumes receiving new tasks: clears the global stop flag, then calls <see cref="NewTask"/>.
/// </summary>
public void RestartTask()
{
    StopTask = false;

    NewTask();
}
/// <summary>
/// Starts receiving tasks again unless the global stop flag is set, in which case it only logs
/// that receiving is paused.
/// </summary>
public void NewTask()
{
    if (!StopTask)
    {
        ReceivingTaskAsync();
        return;
    }

    // Receiving is paused: do not start the worker.
    Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收任务已经暂停 ");
}
/// <summary>
/// Starts the background worker that pops task ids from the task channel and runs their flow.
/// Does nothing when this instance is configured as a server or when a worker is already running.
/// </summary>
/// <remarks>
/// The method itself returns immediately; the loop runs on a task created with <c>Task.Run</c>.
/// </remarks>
public void ReceivingTaskAsync()
{
// Instances configured as server do not consume tasks.
if (AppCommon.Config.TaskSetting.IS_Server)
{
Console.WriteLine($"{DateTime.Now} =>服务端不接收任务");
return;
}
// The shared Redis client instance doubles as the lock object for starting the worker.
lock (Redis)
{
// A worker is already running and has not finished: nothing to do.
if (_workerTask != null && !_workerTask.IsCompleted)
return;
_cts = new CancellationTokenSource();
var token = _cts.Token;
_workerTask = Task.Run(async () =>
{
Console.WriteLine($"{DateTime.Now} => 开始监听任务队列...");
while (!token.IsCancellationRequested && !StopTask)
{
try
{
// Blocking pop with a timeout of 5 (presumably seconds) so the loop can re-check
// cancellation and the stop flag.
var taskId = Redis.BLPop(RedisExpandKey.ChannelKey, 5);
if (!string.IsNullOrEmpty(taskId))
{
// Track the task as in progress so it can be resumed after a restart.
Redis.LPush(RedisExpandKey.IDTask, taskId);
await AddTaskLog(taskId, "-------------> 接收到任务 ");
// Awaited so the worker takes one task at a time; InsertChannel may return early
// when the flow hands its remaining steps to a background task.
// NOTE(review): the enum member passed below is missing in this copy of the file.
await InsertChannel(RedisChannelEnum., taskId);
}
}
catch (Exception ex)
{
// Log, pause for two seconds, then keep listening.
Console.WriteLine($"任务监听异常: {ex.Message}");
await Task.Delay(2000);
}
}
Console.WriteLine($"{DateTime.Now} => 停止监听任务队列.");
}, token);
}
}
/// <summary>
/// Records a task failure. With an exception, the task is removed from the in-progress list, the
/// error is logged, receiving is restarted and the message plus stack trace is stored; without one,
/// the stored error is cleared.
/// </summary>
/// <param name="taskID">Task id.</param>
/// <param name="ex">Exception to record, or null to clear the error.</param>
/// <returns>The result of <see cref="SetTaskError"/>.</returns>
public async Task<bool> SetTaskErrorMessage(long taskID, Exception? ex)
{
    if (ex is null)
    {
        return await SetTaskError(taskID, string.Empty);
    }

    // The task failed while executing: stop tracking it as in progress.
    await Redis.LRemAsync(RedisExpandKey.IDTask, 1, taskID.ToString());
    await AddTaskLog(taskID, $""" 出现异常 {ex.Message} {ex.StackTrace} """);

    // The failed task is cleared; go back to receiving tasks.
    NewTask();

    return await SetTaskError(taskID, ex.Message + ex.StackTrace);
}
/// <summary>
/// Clears the stored error message of a task.
/// </summary>
/// <param name="taskID">Task id.</param>
/// <returns>The result of <see cref="SetTaskError"/> called with an empty message.</returns>
public async Task<bool> ClearTaskError(long taskID)
{
    var cleared = await SetTaskError(taskID, string.Empty);
    return cleared;
}
/// <summary>
/// Stores the error message of a task in its Redis hash and in the database row.
/// </summary>
/// <param name="taskID">Task id.</param>
/// <param name="error">Error text to store.</param>
/// <returns>True when the database update affected exactly one row.</returns>
public async Task<bool> SetTaskError(long taskID, string? error)
{
    var repository = AppCommon.Services.GetService<Repository<VideoTask>>();
    Redis.HMSet(RedisExpandKey.Task(taskID), "ErrorMessage", error);

    // SetColumns calls can be chained; each call assigns one more column.
    var affectedRows = await repository.CopyNew().AsUpdateable()
        .SetColumns(it => it.ErrorMessage == error)
        .Where(it => it.Id == taskID)
        .ExecuteCommandAsync();

    return affectedRows == 1;
}
/// <summary>
/// Runs the handler of one pipeline step for a task. Before the handler runs, the step is recorded
/// as the task's "LastEnum" (Redis hash and database row) and "Progress" is reset to 0.
/// </summary>
/// <param name="key">Step being executed.</param>
/// <param name="taskId">Task id as a string; returns immediately when null, otherwise must parse as a long.</param>
/// <param name="action">Handler to run; when null, only a "not implemented" log entry is written.</param>
/// <remarks>
/// The handler is attempted up to <c>tryCount</c> times (currently 1). The exception of the final
/// attempt is logged and rethrown.
/// </remarks>
public async Task TouchChannel(RedisChannelEnum key, string taskId, Func<string, Task> action = null)
{
    if (taskId is null) return;
    var tID = long.Parse(taskId);

    if (action is null)
    {
        await AddTaskLog(taskId, "任务函数 未实现." + key);
        return;
    }

    var tryCount = 1;
    for (int i = 0; i < tryCount; i++)
    {
        await AddTaskLog(taskId, " 开始执行 " + key + " " + taskId);
        try
        {
            Redis.HMSet(RedisExpandKey.Task(taskId), "LastEnum", key);
            Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", 0);
            var vDB = AppCommon.Services.GetService<Repository<VideoTask>>();
            await vDB.CopyNew().AsUpdateable()
                .SetColumns(it => it.LastEnum == key)
                .Where(it => it.Id == tID)
                .ExecuteCommandAsync();
            await action(taskId);
            return;
        }
        catch (Exception ex)
        {
            await AddTaskLog(taskId, $""" 出现异常 {ex.Message} {ex.StackTrace} """);

            // Last attempt: rethrow right away. Announcing a retry or waiting here would be
            // misleading because nothing is retried.
            if (i + 1 == tryCount)
                throw;

            // Back off without blocking a thread-pool thread.
            await Task.Delay(1000);
            await AddTaskLog(taskId, "稍后后重试." + key + " " + taskId);
        }
    }
}
}
}