Learn.VideoAnalysis/VideoAnalysisCore/Common/VideoSliceWorkflowManager.cs

176 lines
7.1 KiB
C#

using FreeRedis;
using Microsoft.Extensions.DependencyInjection;
using SqlSugar.IOC;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using VideoAnalysisCore.AICore.FFMPGE;
using VideoAnalysisCore.AICore.GPT;
using VideoAnalysisCore.AICore.SherpaOnnx;
using VideoAnalysisCore.AICore.Whisper;
using VideoAnalysisCore.Common.Expand;
using VideoAnalysisCore.Model;
using VideoAnalysisCore.Model.Enum;
namespace VideoAnalysisCore.Common
{
/// <summary>
/// AI视频切片工作流
/// </summary>
public static class VideoSliceWorkflowExpand
{
public static void AddVideoSliceWorkflow(this IServiceCollection services)
{
if (AppCommon.Config.Workflow.Default.Enabled)
{
Console.WriteLine($"{DateTime.Now}=>初始化 AI切片工作流");
services.AddSingleton<VideoSliceWorkflowManager>();
services.AddSingleton<VideoSliceWorkflowInit>();
}
}
}
public class VideoSliceWorkflowInit
{
private readonly VideoSliceWorkflowManager _manager;
private readonly IServiceProvider _serviceProvider;
private readonly FFMPGEHandle _ffmpeg;
private readonly SenseVoice _senseVoice;
public VideoSliceWorkflowInit(VideoSliceWorkflowManager manager, IServiceProvider serviceProvider, FFMPGEHandle ffmpeg, SenseVoice senseVoice)
{
_manager = manager;
_serviceProvider = serviceProvider;
_ffmpeg = ffmpeg;
_senseVoice = senseVoice;
Init();
_manager.InitChannel();
}
public void Init()
{
var SubscribeList = _manager.SubscribeList;
SubscribeList.Add(RedisChannelEnum., async (task) => await Task.CompletedTask);
SubscribeList.Add(RedisChannelEnum., async (task) =>
{
using var scope = _serviceProvider.CreateScope();
var downloadService = scope.ServiceProvider.GetService<DownloadFile>();
if (downloadService is null) throw new Exception("DownloadFile 未注入");
await downloadService.RunTask(task, "VideoSliceWorkflow");
});
SubscribeList.Add(RedisChannelEnum., _ffmpeg.RunAsync);
SubscribeList.Add(RedisChannelEnum., _senseVoice.RunTask);
SubscribeList.Add(RedisChannelEnum.AI课程类型, async (task) =>
{
using var scope = _serviceProvider.CreateScope();
var service = scope.ServiceProvider.GetService<IBserGPTWorkflow>();
if (service is null) throw new Exception("IBserGPT 未注入");
await service.GetVideoType(task);
});
SubscribeList.Add(RedisChannelEnum.AI模型分析, async (task) =>
{
using var scope = _serviceProvider.CreateScope();
var service = scope.ServiceProvider.GetService<IBserGPTWorkflow>();
if (service is null) throw new Exception("IBserGPT 未注入");
await service.GetKnow(task);
});
SubscribeList.Add(RedisChannelEnum.AI分析试题, async (task) =>
{
using var scope = _serviceProvider.CreateScope();
var service = scope.ServiceProvider.GetService<IBserGPTWorkflow>();
if (service is null) throw new Exception("IBserGPT 未注入");
await service.GetVideoQuestion(task);
});
SubscribeList.Add(RedisChannelEnum., _manager.TaskEnd);
}
}
public class VideoSliceWorkflowManager : WorkflowBase<RedisChannelEnum>
{
public VideoSliceWorkflowManager(RedisClient redis, RedisManager redisManager) : base(redis, redisManager)
{
}
protected override string ChannelKey => RedisExpandKey.ChannelKey;
protected override int Concurrency => AppCommon.Config.Workflow.Default.Concurrency;
protected override string WorkflowName => "VideoSliceWorkflow"; // 显式指定,避免重构改名风险
/// <summary>
/// 重写状态更新逻辑:保持兼容性,继续更新 VideoTask.LastEnum
/// </summary>
protected override async Task UpdateTaskStateAsync(string taskId, RedisChannelEnum step)
{
var tID = long.Parse(taskId);
// 1. 调用基类方法,更新 VideoTaskWorkflow 表 (可选,如果想双写)
await base.UpdateTaskStateAsync(taskId, step);
// 2. 更新旧的 VideoTask 表,保持前端兼容
using var scope = AppCommon.Services.CreateScope();
var vDB = scope.ServiceProvider.GetService<Repository<VideoTask>>();
if (vDB != null)
{
await vDB.CopyNew().AsUpdateable()
.SetColumns(it => it.LastEnum == step)
.Where(it => it.Id == tID)
.ExecuteCommandAsync();
}
}
protected override async Task HandleSpecialFlowAsync(RedisChannelEnum currentStep, RedisChannelEnum nextStep, string taskId)
{
// 4. 特殊分流:解析字幕完成后,后续步骤转后台并行处理
if (currentStep == RedisChannelEnum.)
{
await DispatchBackgroundFlow(nextStep, taskId, taskId);
throw new WorkflowFlowSwitchException(); // 抛出异常以中断当前流程(基类捕获)
}
await Task.CompletedTask;
}
public override async Task TaskEnd(string task)
{
var tId = long.Parse(task);
await base.TaskEnd(task);
// 原 RedisManager.TaskEnd 逻辑迁移至此
using var scope = AppCommon.Services.CreateScope();
var videoTaskDB = scope.ServiceProvider.GetService<Repository<VideoTask>>();
if (videoTaskDB == null) return;
var taskData = await videoTaskDB.CopyNew().GetFirstAsync(s => s.Id == tId);
if (taskData.Captions == "[]")
taskData.Captions = (await Redis.HMGetAsync(RedisExpandKey.Task(task), "Captions")).First();
taskData.ChatAnalysisScore = 0;
taskData.ErrorMessage = string.Empty;
taskData.LastEnum = RedisChannelEnum.;
taskData.EndTime = DateTime.Now;
await videoTaskDB.CopyNew().AsUpdateable(taskData)
.UpdateColumns(it => new
{
it.Captions,
it.Speaker,
it.ChatAnalysisScore,
it.ErrorMessage,
it.TotalTokens,
it.LastEnum,
it.EndTime
}).ExecuteCommandAsync();
try
{
await ExpandFunction.DeleteTaskAllFileAsync(tId, this);
}
catch (Exception)
{
throw;
}
}
}
}