using FreeRedis; using Microsoft.Extensions.DependencyInjection; using SqlSugar; using SqlSugar.IOC; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using VideoAnalysisCore.Model; using VideoAnalysisCore.Model.Enum; namespace VideoAnalysisCore.Common { public class WorkflowFlowSwitchException : Exception { } public abstract class WorkflowBase where TEnum : struct, Enum { public bool StopTask { get; set; } = false; public Dictionary> SubscribeList = new Dictionary>(); public readonly RedisClient Redis; public readonly RedisManager RedisManager; private CancellationTokenSource? _cts; private List _workerTasks = new List(); public static ConcurrentDictionary RunningTasks = new ConcurrentDictionary(); /// /// 工作流的Redis队列key /// protected abstract string ChannelKey { get; } /// /// 单工作流并发数量 /// protected abstract int Concurrency { get; } /// /// 工作流名称 (e.g. "VideoSliceWorkflow") /// 默认通过类名去除 "Manager" 后缀生成,子类可重写 /// protected virtual string WorkflowName => "未配置的工作流名称"; public WorkflowBase(RedisClient redis, RedisManager redisManager) { Redis = redis; RedisManager = redisManager; } public async void InitChannel() { if (AppCommon.Config.TaskSetting.IS_Server) return; //处理之前程序结束前未能执行完的情况 var oldTaskCount = Redis.LLen(RedisExpandKey.IDTask); //重试任务并发过多可能会导致程序崩溃 // 未能重新分析的中断任务 则单独开一个网页来处理 if (oldTaskCount > 0) { //获取所有未完成的任务 var oldTaskArr = Redis.LRange(RedisExpandKey.IDTask, 0, -1); Console.WriteLine($"{DateTime.Now:HH:mm:ss}-------------> 发现 {oldTaskArr.Length} 个未完成任务,准备恢复..."); //使用信号量限制并发数(5),防止崩溃 using var semaphore = new System.Threading.SemaphoreSlim(5); var retryTaskArr = new List(); foreach (var oldTask in oldTaskArr) { try { // 检查该任务是否属于当前工作流(根据 LastEnum 的类型) var lastEnumStr = (await Redis.HMGetAsync(RedisExpandKey.Task(oldTask), "LastEnum")).FirstOrDefault(); // 尝试解析为当前工作流的枚举 if (!string.IsNullOrEmpty(lastEnumStr) && Enum.TryParse(typeof(TEnum), lastEnumStr, true, out var result)) { await semaphore.WaitAsync(); var res = Task.Run(async () => { try { await AddTaskLog(oldTask, DateTime.Now.ToString("HH:mm:ss") + $"-------------> 接收上次未完成任务 [{typeof(TEnum).Name}] " + oldTask, WorkflowName); await ClearTaskError(long.Parse(oldTask)); var lastEnum = (TEnum)result; await InsertChannel(lastEnum, oldTask); } catch (Exception ex) { await SetTaskErrorMessage(long.Parse(oldTask), ex); Console.WriteLine($"恢复任务 {oldTask} 失败: {ex.Message}"); } finally { semaphore.Release(); } }); retryTaskArr.Add(res); } else { // 如果无法解析为当前工作流的枚举,说明该任务属于其他工作流,跳过 // Console.WriteLine($"任务 {oldTask} 不属于工作流 {typeof(TEnum).Name},跳过"); } } catch (Exception ex) { Console.WriteLine($"检查任务 {oldTask} 状态失败: {ex.Message}"); } } //等待所有 重试任务完成后接收新任务 await Task.WhenAll(retryTaskArr); Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 所有未完成任务处理完毕!"); ReceivingTaskAsync(); } else { ReceivingTaskAsync(); } } /// /// 重新执行任务 /// public void ReceivingTaskAsync() { int concurrency = Concurrency; if (concurrency <= 0) concurrency = 1; _cts = new CancellationTokenSource(); var token = _cts.Token; for (int i = 0; i < concurrency; i++) { var index = i; var task = Task.Run(async () => { Console.WriteLine($"{DateTime.Now} ==> 开始监听 [{typeof(TEnum).Name}] [{ChannelKey}] 队列 [{index}]..."); while (!token.IsCancellationRequested && !StopTask) { try { var taskId = Redis.BLPop(ChannelKey, 5); if (!string.IsNullOrEmpty(taskId)) { Redis.LPush(RedisExpandKey.IDTask, taskId); // 获取第一个枚举值作为起始步骤 var firstStep = Enum.GetValues(typeof(TEnum)).Cast().FirstOrDefault(); if (Convert.ToInt32(firstStep) == 0) // 跳过排队中 { var next = firstStep.NextEnum(); if (next.HasValue) firstStep = next.Value; } await AddTaskLog(taskId, $"==> 接收到任务 [{typeof(TEnum).Name}] ", WorkflowName); await InsertChannel(firstStep, taskId); } } catch (Exception ex) { Console.WriteLine($"任务监听异常: {ex.Message}"); await Task.Delay(2000); } } }, token); _workerTasks.Add(task); } } public async Task InsertChannel(TEnum @enum, string taskId) { await AddTaskLog(taskId, "==> 开始执行任务 ", WorkflowName); await ProcessTaskFlow(@enum, taskId, taskId); } /// /// 异步流程判定条件 /// /// /// /// /// protected virtual async Task HandleSpecialFlowAsync(TEnum currentStep, TEnum nextStep, string taskId) { await Task.CompletedTask; } /// /// 主处理任务流程 /// /// /// /// /// protected async Task ProcessTaskFlow(TEnum currentStep, string taskId, string tId) { try { if (!SubscribeList.ContainsKey(currentStep)) throw new Exception($"{currentStep} 未实现"); while (true) { if (StopTask) { await AddTaskLog(tId, "==> 手动停止任务 ", WorkflowName); return; } // 2. 执行当前步骤 await TouchChannel(currentStep, tId, SubscribeList[currentStep]); // 3. 准备下一步 var nextStepNullable = currentStep.NextEnum(); if (nextStepNullable == null) break; var nextStep = nextStepNullable.Value; // 4. 特殊分流处理 try { await HandleSpecialFlowAsync(currentStep, nextStep, taskId); } catch (WorkflowFlowSwitchException) { return; // 流程切换,退出当前循环 } currentStep = nextStep; } } catch (Exception ex) { await SetTaskErrorMessage(long.Parse(tId), ex); } finally { await Redis.ExpireAsync(RedisExpandKey.Task(taskId), 60 * 60 * 24 * 15); } } /// /// 更新任务状态 /// 默认实现:保存到 WorkflowState 表。子类可重写以保存到特定表(如 VideoTask) /// /// 任务ID /// 当前步骤枚举 protected virtual async Task UpdateTaskStateAsync(string taskId, TEnum step) { var tID = long.Parse(taskId); var stepName = step.ToString(); var stepValue = Convert.ToInt32(step); using var scope = AppCommon.Services.CreateScope(); var db = scope.ServiceProvider.GetService>(); if (db == null) return; try { // 使用 upsert 逻辑 var existing = await db.AsQueryable() .FirstAsync(it => it.VideoTaskId == tID && it.WorkflowName == WorkflowName); if (existing == null) { await db.AsInsertable(new VideoTaskWorkflow { Id = Yitter.IdGenerator.YitIdHelper.NextId(), VideoTaskId = tID, WorkflowName = WorkflowName, CurrentStep = stepName, CurrentStepValue = stepValue, UpdateTime = DateTime.Now }).ExecuteCommandAsync(); } else { existing.CurrentStep = stepName; existing.CurrentStepValue = stepValue; existing.UpdateTime = DateTime.Now; await db.AsUpdateable(existing).ExecuteCommandAsync(); } } catch (Exception ex) { Console.WriteLine($"更新工作流状态失败: {ex.Message}"); } } public async Task TouchChannel(TEnum key, string taskId, Func action) { var tID = long.Parse(taskId); await AddTaskLog(taskId, " 开始执行 " + key + " " + taskId, WorkflowName); try { // 更新 Redis 状态 (通用) Redis.HMSet(RedisExpandKey.Task(taskId), "LastEnum", key.ToString()); // 使用新的进度设置方法 SetTaskProgress(taskId, 0); // 调用状态更新逻辑 (由子类决定存储位置) await UpdateTaskStateAsync(taskId, key); await action(taskId); } catch { //await AddTaskLog(taskId, $""" 出现异常 {ex.Message} {ex.StackTrace} """); throw; } } /// /// 设置任务进度 /// /// /// 进度百分比 public void SetTaskProgress(object taskId, object p) { var fieldName = WorkflowName == "VideoSliceWorkflow" ? "Progress" : $"Progress:{WorkflowName}"; Redis.HMSet(RedisExpandKey.Task(taskId), fieldName, p.ToString()); } /// /// 任务结束处理 /// /// public virtual async Task TaskEnd(string taskId) { var tId = long.Parse(taskId); //删除任务执行状态 await Redis.LRemAsync(RedisExpandKey.IDTask, 1, taskId); // 更新 VideoTaskWorkflow 表状态为结束 // 注意:这里假设结束状态对应的枚举值为 100,或者子类重写此方法 // 由于泛型限制,无法直接获取 TEnum 的结束值,建议子类重写或在此处做通用处理 // 这里仅做基础清理,具体业务逻辑建议在子类重写 } /// /// 写入任务异常 /// /// /// /// public async Task SetTaskErrorMessage(long taskID, Exception? ex) { var error = string.Empty; if (ex != null) { await Redis.LRemAsync(RedisExpandKey.IDTask, 1, taskID.ToString()); //执行任务时出现异常 error = ex.Message + ex.StackTrace; await AddTaskLog(taskID, $""" 出现异常 {ex.Message} {ex.StackTrace} """); } return await SetTaskError(taskID, error); } /// /// 清除 任务的错误信息 /// /// /// public async Task ClearTaskError(long taskID) => await SetTaskError(taskID, string.Empty); /// /// 修改任务的错误信息 /// /// /// /// public async Task SetTaskError(long taskID, string? error) { using var scope = AppCommon.Services.CreateScope(); var vDB = scope.ServiceProvider.GetService>(); if (vDB == null) return false; Redis.HMSet(RedisExpandKey.Task(taskID), "ErrorMessage", error); // 同时更新 VideoTaskWorkflow 表的错误信息(如果需要) return await vDB.CopyNew().AsUpdateable() .SetColumns(it => it.ErrorMessage == error) .Where(it => it.Id == taskID) .ExecuteCommandAsync() == 1; } /// /// 添加日志 /// /// 任务id /// 内容 /// 工作流名称(可选,默认为当前工作流) public async Task AddTaskLog(object taskId, string msg, string? workflowName = null) { var wfName = workflowName ?? WorkflowName; #if DEBUG Console.WriteLine($"{DateTime.Now.ToString("MM-dd HH:mm:ss")} => {taskId} [{wfName}] \r\n{msg}\r\n"); #endif await Redis.RPushAsync(RedisExpandKey.TaskLog, new TaskLog() { VideoTaskId = long.Parse(taskId.ToString()), CreateTime = DateTime.Now, Message = msg, DeviceId = AppCommon.Config.ID, WorkflowName = wfName }); var count = 50; lock (RedisExpandKey.TaskLog) { var oldTaskCount = Redis.LLen(RedisExpandKey.TaskLog); if (oldTaskCount > count) { try { using var scope = AppCommon.Services.CreateScope(); var taskLogDB = scope.ServiceProvider.GetService>(); if (taskLogDB != null) { var insertData = Redis.LRange(RedisExpandKey.TaskLog, 0, count - 1); taskLogDB.CopyNew().AsInsertable(insertData).ExecuteCommand(); //同步删除redis Redis.LTrim(RedisExpandKey.TaskLog, count, 1000); } } catch (Exception ex) { Console.WriteLine("写入任务日志出错" + "\r\n" + ex.Message + "\r\n" + ex.StackTrace); } } } } /// /// 异步流程 /// /// /// /// /// protected async Task DispatchBackgroundFlow(TEnum startStep, string taskId, string tId) { var bgTask = Task.Run(async () => { try { await ProcessTaskFlow(startStep, taskId, tId); } finally { RunningTasks.TryRemove(tId, out _); } }); RunningTasks.TryAdd(tId, bgTask); await Task.CompletedTask; } /// /// 加入到消费队列 /// /// public void JoinQueue(params long[] taskIds) { if (taskIds is null || taskIds.Length == 0) return; // 直接批量推入,避免循环和事务开销 var d = taskIds.Select(s => (object)s).ToArray(); Redis.LPush(ChannelKey, d); } } }