diff --git a/VideoAnalysisCore/AICore/FFMPGE/FFMPGEHandle.cs b/VideoAnalysisCore/AICore/FFMPGE/FFMPGEHandle.cs
index f79d884..67a56f8 100644
--- a/VideoAnalysisCore/AICore/FFMPGE/FFMPGEHandle.cs
+++ b/VideoAnalysisCore/AICore/FFMPGE/FFMPGEHandle.cs
@@ -1,4 +1,4 @@
-using FFmpeg.NET.Events;
+using FFmpeg.NET.Events;
using FFmpeg.NET;
using VideoAnalysisCore.AICore.SherpaOnnx;
using VideoAnalysisCore.Common;
@@ -208,5 +208,55 @@ namespace VideoAnalysisCore.AICore.FFMPGE
}
+ ///
+ /// 合并音频和视频并切片
+ ///
+ ///
+ ///
+ public async Task MergeAndSliceAsync(string task)
+ {
+ var taskID = long.Parse(task);
+ var localPath = task.LocalPath();
+ var pptPath = Path.Combine(localPath, "ppt.mp4");
+ var taskPath = Path.Combine(localPath, "task.mp4");
+ var mergedPath = Path.Combine(localPath, "merged.mp4");
+ var m3u8Path = Path.Combine(localPath, "out.m3u8");
+
+ if (!File.Exists(pptPath)) throw new FileNotFoundException("PPT视频文件未找到", pptPath);
+ if (!File.Exists(taskPath)) throw new FileNotFoundException("任务视频文件未找到", taskPath);
+
+ var ffmpeg = new Engine(FFmpegPath);
+ var cToken = new CancellationToken();
+
+ // 1. 合并 PPT视频(画面) + 任务视频(音频) -> merged.mp4
+ // -map 0:v 取第一个输入(ppt)的视频流
+ // -map 1:a 取第二个输入(task)的音频流
+ // -c:v copy 复制视频流不转码
+ // -c:a aac 音频转码为aac (兼容性好)
+ // -strict experimental 允许使用aac
+ // -shortest 以最短的流为准
+ var mergeArgs = $"-i \"{pptPath}\" -i \"{taskPath}\" -map 0:v -map 1:a -c:v copy -c:a aac -strict experimental -shortest \"{mergedPath}\" -y";
+
+ await redisManager.AddTaskLog(task, "开始合并视频与音频...");
+ await ffmpeg.ExecuteAsync(mergeArgs, cToken);
+
+ if (!File.Exists(mergedPath)) throw new Exception("视频合并失败");
+
+ // 2. 切片 merged.mp4 -> out.m3u8
+ // -c copy 直接复制流 (因为上一步已经是 mp4/aac)
+ // -f hls HLS格式
+ // -hls_time 10 切片时长10秒
+ // -hls_list_size 0 包含所有切片
+ // -hls_segment_filename out%03d.ts 切片文件名
+ var sliceArgs = $"-i \"{mergedPath}\" -c copy -f hls -hls_time 10 -hls_list_size 0 -hls_segment_filename \"{Path.Combine(localPath, "out%03d.ts")}\" \"{m3u8Path}\" -y";
+
+ await redisManager.AddTaskLog(task, "开始视频切片...");
+ await ffmpeg.ExecuteAsync(sliceArgs, cToken);
+
+ if (!File.Exists(m3u8Path)) throw new Exception("视频切片失败");
+
+ // 更新任务状态或路径? 目前只需要生成文件
+ await redisManager.AddTaskLog(task, "视频处理完成");
+ }
}
}
diff --git a/VideoAnalysisCore/Common/AppConfig.cs b/VideoAnalysisCore/Common/AppConfig.cs
index 89f9d27..043fbd3 100644
--- a/VideoAnalysisCore/Common/AppConfig.cs
+++ b/VideoAnalysisCore/Common/AppConfig.cs
@@ -1,4 +1,4 @@
-using SqlSugar.IOC;
+using SqlSugar.IOC;
using System;
using System.Collections.Generic;
using System.Linq;
@@ -70,6 +70,21 @@ namespace VideoAnalysisCore.Common
/// 授权配置
///
public AuthKeyConfig AuthKey { get; set; } = new AuthKeyConfig();
+ ///
+ /// 工作流配置
+ ///
+ public WorkflowConfig Workflow { get; set; } = new WorkflowConfig();
+ }
+
+ public class WorkflowConfig
+ {
+ public WorkflowItemConfig Default { get; set; } = new WorkflowItemConfig();
+ public WorkflowItemConfig Upload { get; set; } = new WorkflowItemConfig { Enabled = true };
+ }
+ public class WorkflowItemConfig
+ {
+ public bool Enabled { get; set; } = true;
+ public int Concurrency { get; set; } = 1;
}
public class AuthKeyConfig
diff --git a/VideoAnalysisCore/Common/Expand/UploadExpand.cs b/VideoAnalysisCore/Common/Expand/UploadExpand.cs
new file mode 100644
index 0000000..0b8b3a9
--- /dev/null
+++ b/VideoAnalysisCore/Common/Expand/UploadExpand.cs
@@ -0,0 +1,159 @@
+using AlibabaCloud.SDK.Vod20170321;
+using AlibabaCloud.SDK.Vod20170321.Models;
+using Aliyun.OSS;
+using Microsoft.Extensions.DependencyInjection;
+using SqlSugar.IOC;
+using System;
+using System.IO;
+using System.Text;
+using System.Threading.Tasks;
+using VideoAnalysisCore.Model;
+using Newtonsoft.Json;
+using Newtonsoft.Json.Linq;
+using System.Collections.Generic;
+
+namespace VideoAnalysisCore.Common.Expand
+{
+ public static class UploadExpand
+ {
+ public static void AddUploadExpand(this IServiceCollection services)
+ {
+ services.AddSingleton();
+ }
+ }
+
+ public class UploadHandle
+ {
+ private readonly Client _vodClient;
+ private readonly Repository _videoTaskDB;
+ private readonly RedisManager _redisManager;
+ private readonly OssClient _ossClient; // 使用系统统一注入的 OSS Client
+
+ public UploadHandle(Client vodClient, Repository videoTaskDB, RedisManager redisManager, OssClient ossClient)
+ {
+ _vodClient = vodClient;
+ _videoTaskDB = videoTaskDB;
+ _redisManager = redisManager;
+ _ossClient = ossClient;
+ }
+
+ public async Task RunAsync(string task)
+ {
+ var taskId = long.Parse(task);
+ var localPath = task.LocalPath();
+ var m3u8Path = Path.Combine(localPath, "out.m3u8");
+
+ if (!File.Exists(m3u8Path))
+ {
+ await _redisManager.AddTaskLog(task, "未找到 m3u8 文件,无法进行切片上传");
+ throw new FileNotFoundException("M3U8文件未找到", m3u8Path);
+ }
+
+ // 获取所有切片文件 (out*.ts)
+ var tsFiles = Directory.GetFiles(localPath, "out*.ts");
+ if (tsFiles.Length == 0)
+ {
+ await _redisManager.AddTaskLog(task, "未找到 ts 切片文件");
+ throw new FileNotFoundException("TS切片文件未找到");
+ }
+
+ var title = $"Task_{taskId}_{DateTime.Now:yyyyMMddHHmmss}";
+
+ await _redisManager.AddTaskLog(task, "正在获取VOD上传凭证...");
+
+ // 1. 获取上传凭证和地址
+ // 注意:VOD上传m3u8时,FileName必须以 .m3u8 结尾
+ var request = new CreateUploadVideoRequest
+ {
+ Title = title,
+ FileName = "out.m3u8", // 必须是 m3u8 文件名
+ Description = "Video Analysis HLS Upload",
+ // CoverURL = "...", // 可选:设置封面
+ // Tags = "...", // 可选:设置标签
+ };
+
+ var response = await _vodClient.CreateUploadVideoAsync(request);
+ if (response.Body == null || string.IsNullOrEmpty(response.Body.UploadAddress) || string.IsNullOrEmpty(response.Body.UploadAuth))
+ {
+ throw new Exception($"获取上传凭证失败: RequestId={response.Body?.RequestId}");
+ }
+
+ var videoId = response.Body.VideoId;
+ var uploadAddressStr = response.Body.UploadAddress;
+ var uploadAuthStr = response.Body.UploadAuth;
+
+ await _redisManager.AddTaskLog(task, $"获取凭证成功,VideoId: {videoId}");
+
+ // 2. 解析凭证 (Base64 -> JSON)
+ var addressJson = JObject.Parse(Encoding.UTF8.GetString(Convert.FromBase64String(uploadAddressStr)));
+ var authJson = JObject.Parse(Encoding.UTF8.GetString(Convert.FromBase64String(uploadAuthStr)));
+
+ var endpoint = addressJson["Endpoint"]?.ToString();
+ var bucket = addressJson["Bucket"]?.ToString();
+ var objectName = addressJson["FileName"]?.ToString(); // 这是 VOD 分配的 m3u8 存储路径,例如 "sv/243d.../out.m3u8"
+
+ var accessKeyId = authJson["AccessKeyId"]?.ToString();
+ var accessKeySecret = authJson["AccessKeySecret"]?.ToString();
+ var securityToken = authJson["SecurityToken"]?.ToString();
+
+ if (string.IsNullOrEmpty(endpoint) || string.IsNullOrEmpty(bucket) || string.IsNullOrEmpty(objectName))
+ {
+ throw new Exception("解析上传地址失败");
+ }
+
+ // 修正 Endpoint 格式 (如果缺少协议头)
+ if (!endpoint.StartsWith("http"))
+ {
+ endpoint = "https://" + endpoint;
+ }
+
+ // 3. 构造 OSS 客户端 (使用临时凭证)
+ var ossClient = new OssClient(endpoint, accessKeyId, accessKeySecret, securityToken);
+
+ // 4. 确定 OSS 目录前缀
+ // VOD 返回的 objectName 是完整的文件路径,我们需要提取目录部分来存放 .ts 文件
+ // 例如: objectName = "sv/5903240e-19544975a64/out.m3u8"
+ // 则 prefix = "sv/5903240e-19544975a64/"
+ var ossPrefix = objectName.Substring(0, objectName.LastIndexOf('/') + 1);
+
+ await _redisManager.AddTaskLog(task, $"开始上传文件到 VOD OSS (Bucket: {bucket}, Prefix: {ossPrefix})...");
+
+ try
+ {
+ // A. 上传所有 TS 切片
+ await _redisManager.AddTaskLog(task, $"开始上传 TS 切片 (共 {tsFiles.Length} 个)...");
+ foreach (var tsFile in tsFiles)
+ {
+ var fileName = Path.GetFileName(tsFile);
+ var tsObjectKey = ossPrefix + fileName;
+
+ using var fs = File.OpenRead(tsFile);
+ ossClient.PutObject(bucket, tsObjectKey, fs);
+ }
+
+ // B. 上传 m3u8 索引文件
+ // 必须使用 VOD 指定的 objectName
+ await _redisManager.AddTaskLog(task, "开始上传 m3u8 索引文件...");
+ using (var fs = File.OpenRead(m3u8Path))
+ {
+ ossClient.PutObject(bucket, objectName, fs);
+ }
+
+ await _redisManager.AddTaskLog(task, "上传成功");
+
+ // 5. 更新数据库
+ // 对于 VOD 托管视频,我们主要存储 VideoId (TagId),播放地址通常由前端调用 VOD 接口获取
+ // 或者我们可以尝试获取播放地址存入 MediaUrl
+ await _videoTaskDB.CopyNew().AsUpdateable()
+ .SetColumns(it => it.TagId == videoId)
+ .Where(it => it.Id == taskId)
+ .ExecuteCommandAsync();
+ }
+ catch (Exception ex)
+ {
+ await _redisManager.AddTaskLog(task, $"上传 VOD OSS 异常: {ex.Message}");
+ throw;
+ }
+ }
+ }
+}
diff --git a/VideoAnalysisCore/Common/RedisExpand.cs b/VideoAnalysisCore/Common/RedisExpand.cs
index 165be0d..c22254e 100644
--- a/VideoAnalysisCore/Common/RedisExpand.cs
+++ b/VideoAnalysisCore/Common/RedisExpand.cs
@@ -1,4 +1,4 @@
-using FreeRedis;
+using FreeRedis;
using FreeRedis.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.IdentityModel.Tokens;
@@ -42,6 +42,10 @@ namespace VideoAnalysisCore.Common
///
public const string ChannelKey = BaseKey + "TaskChannel";
///
+ /// 上传工作流 Channel key
+ ///
+ public const string UploadChannelKey = BaseKey + "UploadTaskChannel";
+ ///
/// 下载文件
///
public const string DownloadFile = ChannelKey + "DownloadFile";
@@ -102,72 +106,19 @@ namespace VideoAnalysisCore.Common
JsonSerializer.Deserialize(json, type);
service.AddSingleton(redis);
service.AddSingleton();
+ service.AddVideoSliceWorkflow();
+ service.AddUploadWorkflow();
}
}
public class RedisInit
{
- public FFMPGEHandle FFMPGE { get; set; }
- public SenseVoice senseVoice { get; set; }
- public FunASRNano funASRNano { get; set; }
- public RedisManager redisManager { get; set; }
-
- public RedisInit(FFMPGEHandle fFMPGE, SenseVoice senseVoice, RedisManager redisManager, FunASRNano funASRNano)
+ public RedisInit(IServiceProvider serviceProvider)
{
- FFMPGE = fFMPGE;
- this.senseVoice = senseVoice;
- this.funASRNano = funASRNano;
- this.redisManager = redisManager;
- Init();
- redisManager.InitChannel();
- }
-
- public void Init()
- {
- var SubscribeList = RedisManager.SubscribeList;
- SubscribeList.Add(RedisChannelEnum.排队中, async (task) =>
- {
- await Task.CompletedTask;
- });
- SubscribeList.Add(RedisChannelEnum.下载文件, async (task) =>
- {
- using var scope = AppCommon.Services?.CreateScope();
- if (scope is null || scope.ServiceProvider.GetService() is null)
- throw new Exception("DownloadFile 未注入");
- else
- await scope.ServiceProvider.GetService()?.RunTask(task);
- });
- SubscribeList.Add(RedisChannelEnum.分离音频, FFMPGE.RunAsync);
- SubscribeList.Add(RedisChannelEnum.解析字幕, senseVoice.RunTask);
- //SubscribeList.Add(RedisChannelEnum.解析字幕, funASRNano.RunTask);
- //SubscribeList.Add(RedisChannelEnum.解析说话人,Speaker.Run);
- SubscribeList.Add(RedisChannelEnum.AI课程类型, async (task) =>
- {
- using var scope = AppCommon.Services?.CreateScope();
- if (scope is null || scope.ServiceProvider.GetService() is null)
- throw new Exception("IBserGPT 未注入");
- else
- await scope.ServiceProvider.GetService()?.GetVideoType(task);
- });
- SubscribeList.Add(RedisChannelEnum.AI模型分析, async (task) =>
- {
- using var scope = AppCommon.Services?.CreateScope();
- if (scope is null || scope.ServiceProvider.GetService() is null)
- throw new Exception("IBserGPT 未注入");
- else
- await scope.ServiceProvider?.GetService()?.GetKnow(task);
- });
- SubscribeList.Add(RedisChannelEnum.AI分析试题, async (task) =>
- {
- using var scope = AppCommon.Services?.CreateScope();
- if (scope is null || scope.ServiceProvider.GetService() is null)
- throw new Exception("IBserGPT 未注入");
- else
- await scope.ServiceProvider?.GetService()?.GetVideoQuestion(task);
- });
- SubscribeList.Add(RedisChannelEnum.结束任务, redisManager.TaskEnd);
-
+ serviceProvider.GetService();
+ serviceProvider.GetService();
+ // serviceProvider.GetService().InitChannel(); // 已废弃,由各工作流自行初始化
}
}
///
@@ -277,105 +228,6 @@ namespace VideoAnalysisCore.Common
Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", p.ToString());
}
- ///
- /// 将任务 插入 队列
- ///
- /// 枚举
- /// 任务id
- public async Task InsertChannel(RedisChannelEnum @enum, object taskId)
- {
- if (taskId is null) throw new Exception("taskId为空");
- if (Redis is null) throw new Exception("redis未初始化");
-
- var tId = taskId.ToString();
-
- await AddTaskLog(tId, "==> 开始执行任务 ");
-
- await ProcessTaskFlow(@enum, taskId, tId);
- }
-
- private async Task ProcessTaskFlow(RedisChannelEnum currentStep, object taskId, string tId)
- {
- try
- {
- // 确保有初始校验
- if (!SubscribeList.ContainsKey(currentStep))
- throw new Exception($"{currentStep} 未实现");
-
- while (true)
- {
- if (StopTask)
- {
- await AddTaskLog(tId, "==> 手动停止任务 ");
- return;
- }
- // 1. 记录步骤开始时间
- await UpdateStepTimeAsync(taskId, currentStep);
- // 2. 执行当前步骤业务逻辑
- await TouchChannel(currentStep, tId, SubscribeList[currentStep]);
- // 3. 准备下一步
- var nextStepNullable = currentStep.NextEnum();
- if (nextStepNullable == null) break; // 流程结束
-
- var nextStep = nextStepNullable.Value;
- // 4. 特殊分流:解析字幕完成后,后续步骤转后台并行处理
- if (currentStep == RedisChannelEnum.解析字幕)
- {
- DispatchBackgroundFlow(nextStep, taskId, tId);
- return; // 释放当前主控线程
- }
- // 5. 继续循环
- currentStep = nextStep;
- }
- }
- catch (Exception ex)
- {
- await SetTaskErrorMessage(long.Parse(tId), ex);
- }
- finally
- {
- // 每次流程结束(无论是正常结束、异常还是分流退出),都尝试延长过期时间
- // 注意:如果是分流退出,这里也会执行,保证 key 活跃
- await Redis.ExpireAsync(RedisExpandKey.Task(taskId), 60 * 60 * 24 * 15);
- }
- }
-
- ///
- /// 更新任务步骤时间
- ///
- private async Task UpdateStepTimeAsync(object taskId, RedisChannelEnum step)
- {
- // 获取现有时间字典(如果不存在则新建)
- // 注意:HMGet 返回的是数组,取第一个元素
- var result = await Redis.HMGetAsync>(RedisExpandKey.Task(taskId), "StartTime");
- var startTime = result?.FirstOrDefault() ?? new Dictionary();
-
- // 更新时间
- startTime[step] = DateTime.Now;
-
- // 写回 Redis
- await Redis.HMSetAsync(RedisExpandKey.Task(taskId), "StartTime", startTime);
- }
-
- ///
- /// 分发后续任务到动态线程池
- ///
- private void DispatchBackgroundFlow(RedisChannelEnum startStep, object taskId, string tId)
- {
- var bgTask = Task.Run(async () =>
- {
- try
- {
- await ProcessTaskFlow(startStep, taskId, tId);
- }
- finally
- {
- RunningTasks.TryRemove(tId, out _);
- }
- });
-
- RunningTasks.TryAdd(tId, bgTask);
- }
public async Task TaskEnd(string task)
{
@@ -427,151 +279,6 @@ namespace VideoAnalysisCore.Common
//NewTask();
}
- ///
- /// 初始化 队列 任务
- ///
- public async Task InitChannel()
- {
- Thread.Sleep(1000);
- if (Redis is null) throw new Exception("redis未初始化");
- //处理之前程序结束前未能执行完的情况
- var oldTaskCount = Redis.LLen(RedisExpandKey.IDTask);
- //重试任务并发过多可能会导致程序崩溃
- // 未能重新分析的中断任务 则单独开一个网页来处理
- if (oldTaskCount > 0)
- {
- //获取所有未完成的任务
- var oldTaskArr = Redis.LRange(RedisExpandKey.IDTask, 0, -1);
- Console.WriteLine($"{DateTime.Now:HH:mm:ss}-------------> 发现 {oldTaskArr.Length} 个未完成任务,准备恢复...");
-
- //使用信号量限制并发数(5),防止崩溃
- using var semaphore = new System.Threading.SemaphoreSlim(5);
- var retryTaskArr = new List();
-
- foreach (var oldTask in oldTaskArr)
- {
- await semaphore.WaitAsync();
- var res = Task.Run(async () =>
- {
- try
- {
- await AddTaskLog(oldTask, DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收上次未完成任务 " + oldTask);
- await ClearTaskError(long.Parse(oldTask));
- var lastEnum = (await Redis.HMGetAsync(RedisExpandKey.Task(oldTask), "LastEnum")).FirstOrDefault();
- await InsertChannel(lastEnum, oldTask);
- }
- catch (Exception ex)
- {
- await SetTaskErrorMessage(long.Parse(oldTask), ex);
- Console.WriteLine($"恢复任务 {oldTask} 失败: {ex.Message}");
- }
- finally
- {
- semaphore.Release();
- }
- });
- retryTaskArr.Add(res);
- }
- //等待所有 重试任务完成后接收新任务
- await Task.WhenAll(retryTaskArr);
-
- Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 所有未完成任务处理完毕!");
- ReceivingTaskAsync();
-
- }
- else
- {
- Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收新任务!");
- ReceivingTaskAsync();
- }
-
- }
- ///
- /// 停止接收新任务
- ///
- public void StopTaskAsync()
- {
- StopTask = true;
- try
- {
- _cts?.Cancel();
- }
- catch (Exception)
- {
- throw;
- }
- }
-
- ///
- /// 开始接收新任务
- ///
- public void RestartTask()
- {
- StopTask = false;
- NewTask();
- }
- ///
- /// 重新执行新任务
- ///
- ///
- public void NewTask()
- {
- // 取消 消费机的任务订阅
- if (StopTask)
- {
- Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收任务已经暂停 ");
- return;
- }
- ReceivingTaskAsync();
- }
-
- ///
- /// 重新接收新任务
- ///
- public void ReceivingTaskAsync()
- {
- if (AppCommon.Config.TaskSetting.IS_Server)
- {
- Console.WriteLine($"{DateTime.Now} =>服务端不接收任务");
- return;
- }
- lock (Redis)
- {
- // 如果任务正在运行且未完成,直接返回
- if (_workerTask != null && !_workerTask.IsCompleted)
- return;
-
- _cts = new CancellationTokenSource();
- var token = _cts.Token;
-
- _workerTask = Task.Run(async () =>
- {
- Console.WriteLine($"{DateTime.Now} => 开始监听任务队列...");
- while (!token.IsCancellationRequested && !StopTask)
- {
- try
- {
- // 使用 BLPop 阻塞式获取任务,超时5秒以便检查取消状态
- var taskId = Redis.BLPop(RedisExpandKey.ChannelKey, 5);
- if (!string.IsNullOrEmpty(taskId))
- {
- Redis.LPush(RedisExpandKey.IDTask, taskId);
- await AddTaskLog(taskId, "-------------> 接收到任务 ");
- // await等待任务处理完成,确保串行执行
- await InsertChannel(RedisChannelEnum.下载文件, taskId);
- }
- }
- catch (Exception ex)
- {
- Console.WriteLine($"任务监听异常: {ex.Message}");
- await Task.Delay(2000);
- }
- }
- Console.WriteLine($"{DateTime.Now} => 停止监听任务队列.");
- }, token);
- }
- }
-
///
/// 写入任务异常
///
@@ -589,7 +296,7 @@ namespace VideoAnalysisCore.Common
error = ex.Message + ex.StackTrace;
await AddTaskLog(taskID, $""" 出现异常 {ex.Message} {ex.StackTrace} """);
//清除失败任务 重新接收任务
- NewTask();
+ // NewTask(); // 已废弃,工作流会自动处理
}
return await SetTaskError(taskID, error);
}
@@ -615,50 +322,5 @@ namespace VideoAnalysisCore.Common
.ExecuteCommandAsync() == 1;
}
- ///
- /// 触发
- ///
- ///
- ///
- ///
- public async Task TouchChannel(RedisChannelEnum key, string taskId, Func action = null)
- {
- if (taskId is null) return;
- var tID = long.Parse(taskId);
- if (action is not null)
- {
- var tryCount = 1;
- for (int i = 0; i < tryCount; i++)
- {
- await AddTaskLog(taskId, " 开始执行 " + key + " " + taskId);
- try
- {
- Redis.HMSet(RedisExpandKey.Task(taskId), "LastEnum", key);
- Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", 0);
- var vDB = AppCommon.Services.GetService>();
- await vDB.CopyNew().AsUpdateable()
- .SetColumns(it => it.LastEnum == key)
- .Where(it => it.Id == tID)
- .ExecuteCommandAsync();
- await action(taskId);
- return;
- }
- catch (Exception ex)
- {
-
- await AddTaskLog(taskId, $""" 出现异常 {ex.Message} {ex.StackTrace} """);
- Thread.Sleep(1000);
- await AddTaskLog(taskId, "稍后后重试." + key + " " + taskId);
- if (i + 1 == tryCount)
- throw;
- }
- }
- }
- else
- {
- await AddTaskLog(taskId, "任务函数 未实现." + key);
- }
- }
-
}
}
diff --git a/VideoAnalysisCore/Common/UploadWorkflowManager.cs b/VideoAnalysisCore/Common/UploadWorkflowManager.cs
new file mode 100644
index 0000000..8979152
--- /dev/null
+++ b/VideoAnalysisCore/Common/UploadWorkflowManager.cs
@@ -0,0 +1,80 @@
+using FreeRedis;
+using Microsoft.Extensions.DependencyInjection;
+using SqlSugar.IOC;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using VideoAnalysisCore.AICore.FFMPGE;
+using VideoAnalysisCore.Common.Expand;
+using VideoAnalysisCore.Model;
+using VideoAnalysisCore.Model.Enum;
+
+namespace VideoAnalysisCore.Common
+{
+ public static class UploadWorkflowExpand
+ {
+ public static void AddUploadWorkflow(this IServiceCollection services)
+ {
+ // 只有在配置启用时才注册
+ if (AppCommon.Config.Workflow.Upload.Enabled)
+ {
+ Console.WriteLine($"{DateTime.Now}=>初始化 视频合并工作流");
+ services.AddUploadExpand(); // Register UploadHandle
+ services.AddSingleton();
+ services.AddSingleton();
+ }
+ }
+ }
+
+ public class UploadWorkflowInit
+ {
+ private readonly UploadWorkflowManager _manager;
+ private readonly IServiceProvider _serviceProvider;
+
+ public UploadWorkflowInit(UploadWorkflowManager manager, IServiceProvider serviceProvider)
+ {
+ _manager = manager;
+ _serviceProvider = serviceProvider;
+ Init();
+ _manager.InitChannel();
+ }
+
+ public void Init()
+ {
+ var SubscribeList = _manager.SubscribeList;
+ SubscribeList.Add(RedisUploadChannelEnum.排队中, async (task) => await Task.CompletedTask);
+ SubscribeList.Add(RedisUploadChannelEnum.下载文件, async (task) =>
+ {
+ using var scope = _serviceProvider.CreateScope();
+ var downloadService = scope.ServiceProvider.GetRequiredService();
+ await downloadService.RunTask(task);
+ });
+ SubscribeList.Add(RedisUploadChannelEnum.合并切片, async (task) =>
+ {
+ using var scope = _serviceProvider.CreateScope();
+ var ffmpegService = scope.ServiceProvider.GetRequiredService();
+ await ffmpegService.MergeAndSliceAsync(task);
+ });
+ SubscribeList.Add(RedisUploadChannelEnum.上传视频, async (task) =>
+ {
+ using var scope = _serviceProvider.CreateScope();
+ var uploadService = scope.ServiceProvider.GetRequiredService();
+ await uploadService.RunAsync(task);
+ });
+ SubscribeList.Add(RedisUploadChannelEnum.结束任务, _manager.RedisManager.TaskEnd);
+ }
+ }
+
+ public class UploadWorkflowManager : WorkflowBase
+ {
+ public UploadWorkflowManager(RedisClient redis, RedisManager redisManager) : base(redis, redisManager)
+ {
+ }
+
+ protected override string ChannelKey => RedisExpandKey.UploadChannelKey;
+ protected override int Concurrency => AppCommon.Config.Workflow.Upload.Concurrency;
+ }
+}
diff --git a/VideoAnalysisCore/Common/VideoSliceWorkflowManager.cs b/VideoAnalysisCore/Common/VideoSliceWorkflowManager.cs
new file mode 100644
index 0000000..a1dd228
--- /dev/null
+++ b/VideoAnalysisCore/Common/VideoSliceWorkflowManager.cs
@@ -0,0 +1,114 @@
+using FreeRedis;
+using Microsoft.Extensions.DependencyInjection;
+using SqlSugar.IOC;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using VideoAnalysisCore.AICore.FFMPGE;
+using VideoAnalysisCore.AICore.GPT;
+using VideoAnalysisCore.AICore.SherpaOnnx;
+using VideoAnalysisCore.AICore.Whisper;
+using VideoAnalysisCore.Common.Expand;
+using VideoAnalysisCore.Model;
+using VideoAnalysisCore.Model.Enum;
+
+namespace VideoAnalysisCore.Common
+{
+ ///
+ /// AI视频切片工作流
+ ///
+ public static class VideoSliceWorkflowExpand
+ {
+ public static void AddVideoSliceWorkflow(this IServiceCollection services)
+ {
+ if (AppCommon.Config.Workflow.Default.Enabled)
+ {
+ Console.WriteLine($"{DateTime.Now}=>初始化 AI切片工作流");
+ services.AddSingleton();
+ services.AddSingleton();
+ }
+ }
+ }
+
+ public class VideoSliceWorkflowInit
+ {
+ private readonly VideoSliceWorkflowManager _manager;
+ private readonly IServiceProvider _serviceProvider;
+ private readonly FFMPGEHandle _ffmpeg;
+ private readonly SenseVoice _senseVoice;
+ private readonly RedisManager _redisManager;
+
+ public VideoSliceWorkflowInit(VideoSliceWorkflowManager manager, IServiceProvider serviceProvider, FFMPGEHandle ffmpeg, SenseVoice senseVoice, RedisManager redisManager)
+ {
+ _manager = manager;
+ _serviceProvider = serviceProvider;
+ _ffmpeg = ffmpeg;
+ _senseVoice = senseVoice;
+ _redisManager = redisManager;
+ Init();
+ _manager.InitChannel();
+ }
+
+ public void Init()
+ {
+ var SubscribeList = _manager.SubscribeList;
+ SubscribeList.Add(RedisChannelEnum.排队中, async (task) => await Task.CompletedTask);
+ SubscribeList.Add(RedisChannelEnum.下载文件, async (task) =>
+ {
+ using var scope = _serviceProvider.CreateScope();
+ var downloadService = scope.ServiceProvider.GetService();
+ if (downloadService is null) throw new Exception("DownloadFile 未注入");
+ await downloadService.RunTask(task);
+ });
+ SubscribeList.Add(RedisChannelEnum.分离音频, _ffmpeg.RunAsync);
+ SubscribeList.Add(RedisChannelEnum.解析字幕, _senseVoice.RunTask);
+
+ SubscribeList.Add(RedisChannelEnum.AI课程类型, async (task) =>
+ {
+ using var scope = _serviceProvider.CreateScope();
+ var service = scope.ServiceProvider.GetService();
+ if (service is null) throw new Exception("IBserGPT 未注入");
+ await service.GetVideoType(task);
+ });
+ SubscribeList.Add(RedisChannelEnum.AI模型分析, async (task) =>
+ {
+ using var scope = _serviceProvider.CreateScope();
+ var service = scope.ServiceProvider.GetService();
+ if (service is null) throw new Exception("IBserGPT 未注入");
+ await service.GetKnow(task);
+ });
+ SubscribeList.Add(RedisChannelEnum.AI分析试题, async (task) =>
+ {
+ using var scope = _serviceProvider.CreateScope();
+ var service = scope.ServiceProvider.GetService();
+ if (service is null) throw new Exception("IBserGPT 未注入");
+ await service.GetVideoQuestion(task);
+ });
+ SubscribeList.Add(RedisChannelEnum.结束任务, _redisManager.TaskEnd);
+ }
+ }
+
+ public class VideoSliceWorkflowManager : WorkflowBase
+ {
+ public VideoSliceWorkflowManager(RedisClient redis, RedisManager redisManager) : base(redis, redisManager)
+ {
+ }
+
+ protected override string ChannelKey => RedisExpandKey.ChannelKey;
+ protected override int Concurrency => AppCommon.Config.Workflow.Default.Concurrency;
+
+ protected override async Task HandleSpecialFlowAsync(RedisChannelEnum currentStep, RedisChannelEnum nextStep, string taskId)
+ {
+ // 4. 特殊分流:解析字幕完成后,后续步骤转后台并行处理
+ if (currentStep == RedisChannelEnum.解析字幕)
+ {
+ await DispatchBackgroundFlow(nextStep, taskId, taskId);
+ throw new WorkflowFlowSwitchException(); // 抛出异常以中断当前流程(基类捕获)
+ }
+ await Task.CompletedTask;
+ }
+ }
+}
diff --git a/VideoAnalysisCore/Common/WorkflowBase.cs b/VideoAnalysisCore/Common/WorkflowBase.cs
new file mode 100644
index 0000000..707c774
--- /dev/null
+++ b/VideoAnalysisCore/Common/WorkflowBase.cs
@@ -0,0 +1,298 @@
+using FreeRedis;
+using Microsoft.Extensions.DependencyInjection;
+using SqlSugar.IOC;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using VideoAnalysisCore.Model;
+using VideoAnalysisCore.Model.Enum;
+
+namespace VideoAnalysisCore.Common
+{
+ public class WorkflowFlowSwitchException : Exception { }
+
+ public abstract class WorkflowBase where TEnum : struct, Enum
+ {
+ public bool StopTask { get; set; } = false;
+ public Dictionary> SubscribeList = new Dictionary>();
+ public readonly RedisClient Redis;
+ public readonly RedisManager RedisManager;
+ private CancellationTokenSource? _cts;
+ private List _workerTasks = new List();
+ public static ConcurrentDictionary RunningTasks = new ConcurrentDictionary();
+
+ ///
+ /// 工作流的Redis队列key
+ ///
+ protected abstract string ChannelKey { get; }
+ ///
+ /// 单工作流并发数量
+ ///
+ protected abstract int Concurrency { get; }
+
+ public WorkflowBase(RedisClient redis, RedisManager redisManager)
+ {
+ Redis = redis;
+ RedisManager = redisManager;
+ }
+
+ public async void InitChannel()
+ {
+ if (AppCommon.Config.TaskSetting.IS_Server) return;
+ //处理之前程序结束前未能执行完的情况
+ var oldTaskCount = Redis.LLen(RedisExpandKey.IDTask);
+ //重试任务并发过多可能会导致程序崩溃
+ // 未能重新分析的中断任务 则单独开一个网页来处理
+ if (oldTaskCount > 0)
+ {
+ //获取所有未完成的任务
+ var oldTaskArr = Redis.LRange(RedisExpandKey.IDTask, 0, -1);
+ Console.WriteLine($"{DateTime.Now:HH:mm:ss}-------------> 发现 {oldTaskArr.Length} 个未完成任务,准备恢复...");
+
+ //使用信号量限制并发数(5),防止崩溃
+ using var semaphore = new System.Threading.SemaphoreSlim(5);
+ var retryTaskArr = new List();
+
+ foreach (var oldTask in oldTaskArr)
+ {
+ try
+ {
+ // 检查该任务是否属于当前工作流(根据 LastEnum 的类型)
+ var lastEnumStr = (await Redis.HMGetAsync(RedisExpandKey.Task(oldTask), "LastEnum")).FirstOrDefault();
+
+ // 尝试解析为当前工作流的枚举
+ if (!string.IsNullOrEmpty(lastEnumStr) && Enum.TryParse(typeof(TEnum), lastEnumStr, true, out var result))
+ {
+ await semaphore.WaitAsync();
+ var res = Task.Run(async () =>
+ {
+ try
+ {
+ await RedisManager.AddTaskLog(oldTask, DateTime.Now.ToString("HH:mm:ss") + $"-------------> 接收上次未完成任务 [{typeof(TEnum).Name}] " + oldTask);
+ await RedisManager.ClearTaskError(long.Parse(oldTask));
+
+ var lastEnum = (TEnum)result;
+ await InsertChannel(lastEnum, oldTask);
+ }
+ catch (Exception ex)
+ {
+ await RedisManager.SetTaskErrorMessage(long.Parse(oldTask), ex);
+ Console.WriteLine($"恢复任务 {oldTask} 失败: {ex.Message}");
+ }
+ finally
+ {
+ semaphore.Release();
+ }
+ });
+ retryTaskArr.Add(res);
+ }
+ else
+ {
+ // 如果无法解析为当前工作流的枚举,说明该任务属于其他工作流,跳过
+ // Console.WriteLine($"任务 {oldTask} 不属于工作流 {typeof(TEnum).Name},跳过");
+ }
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"检查任务 {oldTask} 状态失败: {ex.Message}");
+ }
+ }
+
+ //等待所有 重试任务完成后接收新任务
+ await Task.WhenAll(retryTaskArr);
+
+ Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 所有未完成任务处理完毕!");
+ ReceivingTaskAsync();
+
+ }
+ else
+ {
+ Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收新任务!");
+ ReceivingTaskAsync();
+ }
+ }
+ ///
+ /// 重新执行任务
+ ///
+ public void ReceivingTaskAsync()
+ {
+ int concurrency = Concurrency;
+ if (concurrency <= 0) concurrency = 1;
+
+ _cts = new CancellationTokenSource();
+ var token = _cts.Token;
+
+ for (int i = 0; i < concurrency; i++)
+ {
+ var index = i;
+ var task = Task.Run(async () =>
+ {
+ Console.WriteLine($"{DateTime.Now} ==> 开始监听 [{typeof(TEnum).Name}] 队列 [{index}]...");
+ while (!token.IsCancellationRequested && !StopTask)
+ {
+ try
+ {
+ var taskId = Redis.BLPop(ChannelKey, 5);
+ if (!string.IsNullOrEmpty(taskId))
+ {
+ Redis.LPush(RedisExpandKey.IDTask, taskId);
+ await RedisManager.AddTaskLog(taskId, $"==> 接收到任务 [{typeof(TEnum).Name}] ");
+
+ // 获取第一个枚举值作为起始步骤
+ var firstStep = Enum.GetValues(typeof(TEnum)).Cast().FirstOrDefault();
+ if (Convert.ToInt32(firstStep) == 0) // 跳过排队中
+ {
+ var next = firstStep.NextEnum();
+ if (next.HasValue) firstStep = next.Value;
+ }
+
+ await InsertChannel(firstStep, taskId);
+ }
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"任务监听异常: {ex.Message}");
+ await Task.Delay(2000);
+ }
+ }
+ }, token);
+ _workerTasks.Add(task);
+ }
+ }
+
+ public async Task InsertChannel(TEnum @enum, string taskId)
+ {
+ await RedisManager.AddTaskLog(taskId, "==> 开始执行任务 ");
+ await ProcessTaskFlow(@enum, taskId, taskId);
+ }
+ ///
+ /// 异步流程判定条件
+ ///
+ ///
+ ///
+ ///
+ ///
+ protected virtual async Task HandleSpecialFlowAsync(TEnum currentStep, TEnum nextStep, string taskId)
+ {
+ await Task.CompletedTask;
+ }
+
+ ///
+ /// 主处理任务流程
+ ///
+ ///
+ ///
+ ///
+ ///
+ protected async Task ProcessTaskFlow(TEnum currentStep, string taskId, string tId)
+ {
+ try
+ {
+ if (!SubscribeList.ContainsKey(currentStep))
+ throw new Exception($"{currentStep} 未实现");
+
+ while (true)
+ {
+ if (StopTask)
+ {
+ await RedisManager.AddTaskLog(tId, "==> 手动停止任务 ");
+ return;
+ }
+
+ // 1. 记录步骤开始时间 (需要转换 RedisChannelEnum 才能调用 UpdateStepTimeAsync,如果类型不匹配则需要适配)
+ // 这里简化,暂不记录非主流程的时间,或者需要在 RedisManager 增加泛型支持
+ // await RedisManager.UpdateStepTimeAsync(taskId, currentStep);
+
+ // 2. 执行当前步骤
+ await TouchChannel(currentStep, tId, SubscribeList[currentStep]);
+
+ // 3. 准备下一步
+ var nextStepNullable = currentStep.NextEnum();
+ if (nextStepNullable == null) break;
+
+ var nextStep = nextStepNullable.Value;
+
+ // 4. 特殊分流处理
+ try
+ {
+ await HandleSpecialFlowAsync(currentStep, nextStep, taskId);
+ }
+ catch (WorkflowFlowSwitchException)
+ {
+ return; // 流程切换,退出当前循环
+ }
+
+ currentStep = nextStep;
+ }
+ }
+ catch (Exception ex)
+ {
+ await RedisManager.SetTaskErrorMessage(long.Parse(tId), ex);
+ }
+ finally
+ {
+ await Redis.ExpireAsync(RedisExpandKey.Task(taskId), 60 * 60 * 24 * 15);
+ }
+ }
+
+ public async Task TouchChannel(TEnum key, string taskId, Func action)
+ {
+ var tID = long.Parse(taskId);
+ await RedisManager.AddTaskLog(taskId, " 开始执行 " + key + " " + taskId);
+ try
+ {
+ // 尝试将当前枚举转为 RedisChannelEnum 存储状态,如果无法转换则强转 int
+ RedisChannelEnum dbEnum;
+ if (key is RedisChannelEnum rc) dbEnum = rc;
+ else
+ {
+ // 使用 Convert.ToInt32 处理各种枚举基础类型 (int, byte, long 等)
+ var intValue = Convert.ToInt32(key);
+ dbEnum = (RedisChannelEnum)intValue;
+ }
+
+ Redis.HMSet(RedisExpandKey.Task(taskId), "LastEnum", key.ToString());
+ Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", 0);
+
+ var vDB = AppCommon.Services.GetService>();
+ await vDB.CopyNew().AsUpdateable()
+ .SetColumns(it => it.LastEnum == dbEnum)
+ .Where(it => it.Id == tID)
+ .ExecuteCommandAsync();
+
+ await action(taskId);
+ }
+ catch (Exception ex)
+ {
+ await RedisManager.AddTaskLog(taskId, $""" 出现异常 {ex.Message} {ex.StackTrace} """);
+ throw;
+ }
+ }
+ ///
+ /// 异步流程
+ ///
+ ///
+ ///
+ ///
+ ///
+ protected async Task DispatchBackgroundFlow(TEnum startStep, string taskId, string tId)
+ {
+ var bgTask = Task.Run(async () =>
+ {
+ try
+ {
+ await ProcessTaskFlow(startStep, taskId, tId);
+ }
+ finally
+ {
+ RunningTasks.TryRemove(tId, out _);
+ }
+ });
+ RunningTasks.TryAdd(tId, bgTask);
+ await Task.CompletedTask;
+ }
+ }
+}
diff --git a/VideoAnalysisCore/Controllers/VideoTaskController.cs b/VideoAnalysisCore/Controllers/VideoTaskController.cs
index 8c04b7f..3f9bb35 100644
--- a/VideoAnalysisCore/Controllers/VideoTaskController.cs
+++ b/VideoAnalysisCore/Controllers/VideoTaskController.cs
@@ -39,13 +39,16 @@ namespace VideoAnalysisCore.Controllers
readonly RedisManager redisManager;
+ readonly UploadWorkflowManager uploadWorkflowManager;
+ readonly VideoSliceWorkflowManager videoSliceWorkflowManager;
+
public readonly SenseVoice senseVoice;
public readonly FunASRNano funASRNano;
private readonly IMapper mp;
public VideoTaskController(Repository baseService, RedisManager redisManager,
Repository videoQuestionDB,
- Repository videoQuestionKonwDB, Repository videoKonwPointDB, SenseVoice senseVoice, IMapper mp, Repository taskLogDB, FunASRNano funASRNano, Repository videoTaskStageDB) : base(baseService)
+ Repository videoQuestionKonwDB, Repository videoKonwPointDB, SenseVoice senseVoice, IMapper mp, Repository taskLogDB, FunASRNano funASRNano, Repository videoTaskStageDB, UploadWorkflowManager uploadWorkflowManager, VideoSliceWorkflowManager videoSliceWorkflowManager) : base(baseService)
{
this.baseService = baseService;
this.redisManager = redisManager;
@@ -57,6 +60,8 @@ namespace VideoAnalysisCore.Controllers
this.taskLogDB = taskLogDB;
this.funASRNano = funASRNano;
this.videoTaskStageDB = videoTaskStageDB;
+ this.uploadWorkflowManager = uploadWorkflowManager;
+ this.videoSliceWorkflowManager = videoSliceWorkflowManager;
}
@@ -118,9 +123,9 @@ namespace VideoAnalysisCore.Controllers
public IActionResult StartTask(bool task)
{
if (task)
- redisManager.RestartTask();
+ videoSliceWorkflowManager.StopTask=false;
else
- redisManager.StopTaskAsync();
+ videoSliceWorkflowManager.StopTask=true;
return Ok();
}
///
@@ -316,7 +321,7 @@ namespace VideoAnalysisCore.Controllers
await redisManager.AddTaskLog(id,"手动重试任务");
await redisManager.ClearTaskError(id);
_ = Task.Run(async () =>
- await redisManager.InsertChannel(selectEnum, id)
+ await videoSliceWorkflowManager.InsertChannel(selectEnum, id.ToString())
);
}
diff --git a/VideoAnalysisCore/Model/Enum/RedisUploadChannelEnum.cs b/VideoAnalysisCore/Model/Enum/RedisUploadChannelEnum.cs
new file mode 100644
index 0000000..adced84
--- /dev/null
+++ b/VideoAnalysisCore/Model/Enum/RedisUploadChannelEnum.cs
@@ -0,0 +1,35 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace VideoAnalysisCore.Model.Enum
+{
+ ///
+ /// 上传工作流 Redis 频道枚举
+ ///
+ public enum RedisUploadChannelEnum
+ {
+ ///
+ /// 排队中
+ ///
+ 排队中 = 0,
+ ///
+ /// 下载文件
+ ///
+ 下载文件 = 10,
+ ///
+ /// 合并切片
+ ///
+ 合并切片 = 20,
+ ///
+ /// 上传视频
+ ///
+ 上传视频 = 30,
+ ///
+ /// 结束任务
+ ///
+ 结束任务 = 100,
+ }
+}