Learn.VideoAnalysis/VideoAnalysisCore/Common/RedisExpand.cs

207 lines
7.3 KiB
C#
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using FreeRedis;
using FreeRedis.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.IdentityModel.Tokens;
using NetTaste;
using Newtonsoft.Json.Schema;
using SqlSugar.IOC;
using System;
using System.Security.Cryptography;
using System.Text.Json;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Xml.Linq;
using UserCenter.Model.Enum;
using VideoAnalysisCore.AICore.FFMPGE;
using VideoAnalysisCore.AICore.GPT;
using VideoAnalysisCore.AICore.GPT.Dto;
//using VideoAnalysisCore.AICore.FFMPGE;
using VideoAnalysisCore.AICore.SherpaOnnx;
using VideoAnalysisCore.AICore.Whisper;
using VideoAnalysisCore.Model;
using VideoAnalysisCore.Model.Dto;
using VideoAnalysisCore.Model.Enum;
namespace VideoAnalysisCore.Common
{
/// <summary>
/// Catalogue of the Redis key names used by the video-analysis services, together with
/// the dependency-injection extension methods that register the Redis client and workflows.
/// </summary>
public static class RedisExpandKey
{
    /// <summary>
    /// Prefix shared by every key built in this class.
    /// </summary>
    public const string BaseKey = "VideoAnalysis:";

    /// <summary>
    /// Base task channel key.
    /// </summary>
    public const string ChannelKey = BaseKey + "TaskChannel";

    /// <summary>
    /// Channel key of the TidySlide workflow.
    /// </summary>
    public const string TidySlideChannelKey = BaseKey + "TidySlideTaskChannel";

    /// <summary>
    /// Download-file stage. Note the stage name is appended to <see cref="ChannelKey"/> without a separator.
    /// </summary>
    public const string DownloadFile = ChannelKey + "DownloadFile";

    /// <summary>
    /// Separate-audio stage.
    /// </summary>
    public const string SeparateAudio = ChannelKey + "SeparateAudio";

    /// <summary>
    /// Parse-captions stage.
    /// </summary>
    public const string ParsingCaptions = ChannelKey + "ParsingCaptions";

    /// <summary>
    /// Parse-speaker stage.
    /// </summary>
    public const string ParsingSpeaker = ChannelKey + "ParsingSpeaker";

    /// <summary>
    /// Chat-model analysis stage.
    /// </summary>
    public const string ChatModelAnalysis = ChannelKey + "ChatModelAnalysis";

    /// <summary>
    /// Task array.
    /// </summary>
    public const string TaskArr = BaseKey + "TaskArr";

    /// <summary>
    /// Task log cache key; suffixed with the ID read from <c>AppCommon.Config</c>.
    /// </summary>
    public static string TaskLog => $"{BaseKey}TaskLog:{AppCommon.Config.ID}";

    /// <summary>
    /// Key under which the task object for <paramref name="taskId"/> is stored.
    /// </summary>
    /// <param name="taskId">Task identifier; its string form becomes the key suffix.</param>
    /// <returns><c>VideoAnalysis:Info:{taskId}</c>.</returns>
    public static string Task(object taskId) => $"{BaseKey}Info:{taskId}";

    /// <summary>
    /// "Services" key suffixed with the ID read from <c>AppCommon.Config</c>.
    /// </summary>
    public static string IDTask => $"{BaseKey}Services:{AppCommon.Config.ID}";

    // The former "OnlineDevices" key (set of online device keys) is deprecated;
    // per the original note, the Heartbeat keys are scanned directly instead.

    /// <summary>
    /// Heartbeat key of a device (VideoAnalysis:Heartbeat:{DeviceId}).
    /// </summary>
    /// <param name="deviceId">Device identifier appended to the heartbeat prefix.</param>
    public static string DeviceHeartbeat(string deviceId) => $"{BaseKey}Heartbeat:{deviceId}";

    /// <summary>
    /// Prefix of the cached GPT entries of a task (VideoAnalysis:GPTCached:{taskId}).
    /// </summary>
    /// <param name="taskId">Task identifier; its string form becomes the key suffix.</param>
    public static string TaskGPT(object taskId) => $"{BaseKey}GPTCached:{taskId}";

    /// <summary>
    /// Registers <see cref="RedisInit"/> as a singleton and logs a start-up line to the console.
    /// <para>Per the original note, call this while the configuration is being initialised.</para>
    /// </summary>
    /// <param name="service">Service collection to register into.</param>
    public static void AddTaskSubscribe(this IServiceCollection service)
    {
        Console.WriteLine(DateTime.Now + "=>初始化 Redis任务队列");
        service.AddSingleton<RedisInit>();
    }

    /// <summary>
    /// Creates the Redis client from the configured connection string, switches its
    /// (de)serialisation to System.Text.Json, and registers the client, the
    /// <see cref="RedisManager"/> and both workflows.
    /// </summary>
    /// <param name="service">Service collection to register into.</param>
    public static void AddRedisExpand(this IServiceCollection service)
    {
        Console.WriteLine(DateTime.Now + "=>初始化 Redis");

        var client = new RedisClient(AppCommon.Config.Redis.ConnectionString)
        {
            Serialize = value => JsonSerializer.Serialize(value),
            Deserialize = (payload, targetType) => JsonSerializer.Deserialize(payload, targetType),
        };

        service.AddSingleton(client);
        service.AddSingleton<RedisManager>();
        service.AddVideoSliceWorkflow();
        service.AddTidySlideWorkflow();

        // Heartbeat job registration:
        // service.AddTransient<DeviceHeartbeatJob>(); // moved to CoravelExpand for centralised management
        // service.AddTransient<TaskFileClearJob>();
        // service.AddTransient<ClearAllCacheJob>();
    }
}
/// <summary>
/// Start-up helper: when constructed, it asks the container for both workflow initialisers.
/// </summary>
public class RedisInit
{
    /// <summary>
    /// Resolves <c>VideoSliceWorkflowInit</c> and <c>TidySlideWorkflowInit</c> from
    /// <paramref name="serviceProvider"/>. The resolved instances are discarded —
    /// presumably resolving them is what triggers their set-up (confirm against those types).
    /// </summary>
    /// <param name="serviceProvider">Container used to resolve the workflow initialisers.</param>
    public RedisInit(IServiceProvider serviceProvider)
    {
        _ = serviceProvider.GetService<VideoSliceWorkflowInit>();
        _ = serviceProvider.GetService<TidySlideWorkflowInit>();
        // serviceProvider.GetService<RedisManager>().InitChannel(); // obsolete: each workflow now initialises itself
    }
}
/// <summary>
/// Redis helper: holds the Redis client and the task repositories, and offers
/// queueing, GPT-result caching and progress lookup.
/// </summary>
public class RedisManager
{
    /// <summary>
    /// Process-wide stop flag; initially <c>false</c>.
    /// </summary>
    public static bool StopTask { get; set; } = false;

    /// <summary>
    /// Handlers keyed by Redis channel.
    /// </summary>
    public static Dictionary<RedisChannelEnum, Func<string, Task>> SubscribeList = new();

    /// <summary>
    /// Tasks currently running in the background.
    /// </summary>
    public static ConcurrentDictionary<string, Task> RunningTasks = new();

    private static CancellationTokenSource? _cts;
    private static Task? _workerTask;

    /// <summary>Redis client used by every operation of this manager.</summary>
    public RedisClient Redis { get; set; }

    /// <summary>Repository of <c>VideoTask</c> records.</summary>
    public Repository<VideoTask> videoTaskDB { get; set; }

    /// <summary>Repository of <c>TaskLog</c> records.</summary>
    public Repository<TaskLog> taskLogDB { get; set; }

    /// <summary>
    /// Stores the injected Redis client and repositories.
    /// </summary>
    /// <param name="redis">Redis client.</param>
    /// <param name="videoTaskDB">Video task repository.</param>
    /// <param name="taskLogDB">Task log repository.</param>
    public RedisManager(RedisClient redis, Repository<VideoTask> videoTaskDB, Repository<TaskLog> taskLogDB)
        => (Redis, this.videoTaskDB, this.taskLogDB) = (redis, videoTaskDB, taskLogDB);

    /// <summary>
    /// Caches a GPT result for a task under <c>{TaskGPT(taskId)}:{time}</c> with a 24-hour expiry.
    /// </summary>
    /// <param name="taskId">Task identifier used to build the key prefix.</param>
    /// <param name="time">Suffix appended to the key after a colon.</param>
    /// <param name="data">Value to store.</param>
    public void SetTaskGPTCached(object taskId, string time, object? data)
    {
        const int oneDayInSeconds = 24 * 3600;
        var cacheKey = $"{RedisExpandKey.TaskGPT(taskId)}:{time}";
        Redis.Set(cacheKey, data, timeoutSeconds: oneDayInSeconds);
    }

    /// <summary>
    /// Pushes the given task ids onto the head of the task channel list.
    /// All pushes are issued inside a single Redis transaction (MULTI/EXEC).
    /// </summary>
    /// <param name="taskIds">Ids to enqueue; a null or empty array is a no-op.</param>
    public void JoinQueue(params long[] taskIds)
    {
        if (taskIds == null || taskIds.Length < 1)
        {
            return;
        }

        using var transaction = Redis.Multi();
        for (var index = 0; index < taskIds.Length; index++)
        {
            transaction.LPush(RedisExpandKey.ChannelKey, taskIds[index]);
        }
        transaction.Exec();
    }

    // AddTaskLog has been moved to WorkflowBase.

    /// <summary>
    /// Reads the progress value of a task from its Redis hash.
    /// </summary>
    /// <param name="taskId">Task identifier used to build the hash key.</param>
    /// <param name="workflowName">
    /// Workflow name (optional). The default workflow reads the <c>Progress</c> field;
    /// any other name reads <c>Progress:{workflowName}</c>.
    /// </param>
    /// <returns>The stored value, or an empty string when the field value is null.</returns>
    public string GetTaskProgress(object taskId, string workflowName = "VideoSliceWorkflow")
    {
        string progressField;
        if (workflowName == "VideoSliceWorkflow")
        {
            progressField = "Progress";
        }
        else
        {
            progressField = $"Progress:{workflowName}";
        }

        var fieldValues = Redis.HMGet<string>(RedisExpandKey.Task(taskId), progressField);
        return fieldValues[0] ?? "";
    }

    // Note: SetTaskProgress, TaskEnd, SetTaskErrorMessage, ClearTaskError and SetTaskError
    // have been moved to WorkflowBase; they are removed here (or to be marked obsolete).
}
}