Learn.VideoAnalysis/VideoAnalysisCore/Common/WorkflowBase.cs

480 lines
19 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using FreeRedis;
using Microsoft.Extensions.DependencyInjection;
using SqlSugar;
using SqlSugar.IOC;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using VideoAnalysisCore.Model;
using VideoAnalysisCore.Model.Enum;
namespace VideoAnalysisCore.Common
{
/// <summary>
/// Marker exception used to signal that a workflow is switching to a different flow.
/// The step loop of <c>WorkflowBase</c> catches it around <c>HandleSpecialFlowAsync</c>
/// and stops the current flow without recording an error.
/// </summary>
public class WorkflowFlowSwitchException : Exception
{
}
public abstract class WorkflowBase<TEnum> where TEnum : struct, Enum
{
/// <summary>
/// When true, the queue listeners and the step loop stop at their next check.
/// </summary>
public bool StopTask { get; set; } = false;
/// <summary>
/// Maps each workflow step to the handler that executes it; the handler receives the task id.
/// </summary>
public Dictionary<TEnum, Func<string, Task>> SubscribeList = new Dictionary<TEnum, Func<string, Task>>();
/// <summary>
/// Redis client used for the task queue, the in-progress list, the task hashes and the log list.
/// </summary>
public readonly RedisClient Redis;
/// <summary>
/// Redis manager supplied at construction; not called by the code visible in this class.
/// </summary>
public readonly RedisManager RedisManager;
// Created by ReceivingTaskAsync; its token is observed by the listener loops.
private CancellationTokenSource? _cts;
// Listener tasks started by ReceivingTaskAsync.
private List<Task> _workerTasks = new List<Task>();
/// <summary>
/// Background flows started by DispatchBackgroundFlow, keyed by the tId passed to it.
/// Being a static member of a generic class, there is one dictionary per closed TEnum type.
/// </summary>
public static ConcurrentDictionary<string, Task> RunningTasks = new ConcurrentDictionary<string, Task>();
/// <summary>
/// Redis queue key of this workflow.
/// </summary>
protected abstract string ChannelKey { get; }
/// <summary>
/// Number of concurrent listeners for a single workflow.
/// </summary>
protected abstract int Concurrency { get; }
/// <summary>
/// Workflow name (e.g. "VideoSliceWorkflow").
/// <para>Defaults to the runtime class name with every occurrence of "Manager" removed; subclasses may override.</para>
/// </summary>
protected virtual string WorkflowName => this.GetType().Name.Replace("Manager", "");
/// <summary>
/// Stores the Redis dependencies used by the workflow.
/// </summary>
/// <param name="redis">Redis client used for queue, list and hash operations.</param>
/// <param name="redisManager">Redis manager kept for use by the workflow.</param>
public WorkflowBase(RedisClient redis, RedisManager redisManager)
{
    this.RedisManager = redisManager;
    this.Redis = redis;
}
/// <summary>
/// Resumes tasks that a previous run left unfinished, then starts the queue listeners.
/// <para>
/// Does nothing when the process is configured as a server (<c>TaskSetting.IS_Server</c>).
/// Every id in the in-progress list whose stored <c>LastEnum</c> parses as a
/// <typeparamref name="TEnum"/> member is re-run from that step, at most five at a time.
/// Ids whose step does not parse are treated as belonging to another workflow and skipped.
/// </para>
/// </summary>
/// <remarks>
/// The method is <c>async void</c>: callers cannot await it or observe exceptions that escape it.
/// For that reason a single bad task must never fault the recovery as a whole, otherwise the
/// listeners would never be started.
/// </remarks>
public async void InitChannel()
{
    if (AppCommon.Config.TaskSetting.IS_Server) return;

    // Anything still in the in-progress list was interrupted before it could finish.
    var oldTaskCount = Redis.LLen(RedisExpandKey.IDTask);
    if (oldTaskCount > 0)
    {
        var oldTaskArr = Redis.LRange(RedisExpandKey.IDTask, 0, -1);
        Console.WriteLine($"{DateTime.Now:HH:mm:ss}-------------> 发现 {oldTaskArr.Length} 个未完成任务,准备恢复...");

        // Cap the number of simultaneously recovering tasks at 5.
        using var semaphore = new SemaphoreSlim(5);
        var retryTaskArr = new List<Task>();
        foreach (var oldTask in oldTaskArr)
        {
            try
            {
                // A task belongs to this workflow when its last recorded step is one of our enum members.
                var lastEnumStr = (await Redis.HMGetAsync<string>(RedisExpandKey.Task(oldTask), "LastEnum")).FirstOrDefault();
                if (string.IsNullOrEmpty(lastEnumStr) || !Enum.TryParse<TEnum>(lastEnumStr, true, out var lastEnum))
                {
                    continue;
                }

                // Validate the id once, up front. Parsing it later inside the error handler would
                // throw from the handler itself and fault the whole recovery.
                if (!long.TryParse(oldTask, out var oldTaskId))
                {
                    Console.WriteLine($"恢复任务 {oldTask} 失败: 任务ID不是有效的数字");
                    continue;
                }

                await semaphore.WaitAsync();
                var res = Task.Run(async () =>
                {
                    try
                    {
                        await AddTaskLog(oldTask, DateTime.Now.ToString("HH:mm:ss") + $"-------------> 接收上次未完成任务 [{typeof(TEnum).Name}] " + oldTask, WorkflowName);
                        await ClearTaskError(oldTaskId);
                        await InsertChannel(lastEnum, oldTask);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"恢复任务 {oldTask} 失败: {ex.Message}");
                        try
                        {
                            await SetTaskErrorMessage(oldTaskId, ex);
                        }
                        catch (Exception reportEx)
                        {
                            // Recording the error is best effort; it must not fault the recovery task.
                            Console.WriteLine($"恢复任务 {oldTask} 失败: {reportEx.Message}");
                        }
                    }
                    finally
                    {
                        semaphore.Release();
                    }
                });
                retryTaskArr.Add(res);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"检查任务 {oldTask} 状态失败: {ex.Message}");
            }
        }

        // New tasks are only accepted once every recovered task has finished.
        await Task.WhenAll(retryTaskArr);
        Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 所有未完成任务处理完毕!");
    }

    ReceivingTaskAsync();
}
/// <summary>
/// Starts the queue listeners for this workflow.
/// <para>
/// Creates a new cancellation source and launches <c>Concurrency</c> listeners (at least one).
/// Each listener pops task ids from <c>ChannelKey</c>, records them in the in-progress list and
/// runs the workflow from its starting step until cancellation or <c>StopTask</c>.
/// </para>
/// </summary>
public void ReceivingTaskAsync()
{
    var listenerCount = Math.Max(Concurrency, 1);
    _cts = new CancellationTokenSource();
    var token = _cts.Token;

    // The starting step is the first enum member, or its successor when that member has value 0.
    TEnum ResolveStartStep()
    {
        var first = Enum.GetValues(typeof(TEnum)).Cast<TEnum>().FirstOrDefault();
        if (Convert.ToInt32(first) != 0)
        {
            return first;
        }
        return first.NextEnum() ?? first;
    }

    async Task ListenAsync(int index)
    {
        Console.WriteLine($"{DateTime.Now} ==> 开始监听 [{typeof(TEnum).Name}] 队列 [{index}]...");
        while (!(token.IsCancellationRequested || StopTask))
        {
            try
            {
                // Blocking pop with a 5 second timeout so the loop condition is re-checked regularly.
                var taskId = Redis.BLPop(ChannelKey, 5);
                if (string.IsNullOrEmpty(taskId))
                {
                    continue;
                }

                Redis.LPush(RedisExpandKey.IDTask, taskId);
                var startStep = ResolveStartStep();
                await AddTaskLog(taskId, $"==> 接收到任务 [{typeof(TEnum).Name}] ", WorkflowName);
                await InsertChannel(startStep, taskId);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"任务监听异常: {ex.Message}");
                await Task.Delay(2000);
            }
        }
    }

    for (var slot = 0; slot < listenerCount; slot++)
    {
        var index = slot;
        _workerTasks.Add(Task.Run(() => ListenAsync(index), token));
    }
}
/// <summary>
/// Logs the start of execution and runs the workflow for a task, beginning at the given step.
/// </summary>
/// <param name="enum">Step to start from.</param>
/// <param name="taskId">Task id; it is passed on as both the task id and the tId of the flow.</param>
public async Task InsertChannel(TEnum @enum, string taskId)
{
    const string startMessage = "==> 开始执行任务 ";
    await AddTaskLog(taskId, startMessage, WorkflowName);
    await ProcessTaskFlow(currentStep: @enum, taskId: taskId, tId: taskId);
}
/// <summary>
/// Hook invoked by the step loop after a step finishes and before the next one starts.
/// The default implementation does nothing; subclasses may override it to branch the flow.
/// </summary>
/// <param name="currentStep">Step that has just completed.</param>
/// <param name="nextStep">Step that is about to run.</param>
/// <param name="taskId">Id of the task being processed.</param>
/// <returns>A completed task.</returns>
protected virtual Task HandleSpecialFlowAsync(TEnum currentStep, TEnum nextStep, string taskId)
    => Task.CompletedTask;
/// <summary>
/// Main step loop: runs the handlers registered in <c>SubscribeList</c> one after another,
/// starting at <paramref name="currentStep"/> and advancing with <c>NextEnum()</c> until there
/// is no further step.
/// <para>
/// Any exception raised by a step (including a step with no registered handler) is recorded
/// through <c>SetTaskErrorMessage</c>. The task hash is given a 15 day expiry in all cases.
/// </para>
/// </summary>
/// <param name="currentStep">Step to start from.</param>
/// <param name="taskId">Id used for the task hash key and passed to <c>HandleSpecialFlowAsync</c>.</param>
/// <param name="tId">Id passed to the step handlers, the task log and the error record; must be numeric.</param>
/// <returns>A task that completes when the flow ends, stops or fails.</returns>
protected async Task ProcessTaskFlow(TEnum currentStep, string taskId, string tId)
{
    const int taskKeyTtlSeconds = 60 * 60 * 24 * 15; // 15 days

    try
    {
        while (true)
        {
            // Look the handler up on every iteration, not only for the first step, so a missing
            // handler for a later step is reported with the same explicit message.
            if (!SubscribeList.TryGetValue(currentStep, out var handler))
                throw new InvalidOperationException($"{currentStep} 未实现");

            if (StopTask)
            {
                await AddTaskLog(tId, "==> 手动停止任务 ", WorkflowName);
                return;
            }

            // Step start times are not recorded here (RedisManager has no generic support for that yet).

            // Run the current step.
            await TouchChannel(currentStep, tId, handler);

            // Work out the next step; the flow ends when there is none.
            var nextStepNullable = currentStep.NextEnum();
            if (nextStepNullable == null) break;
            var nextStep = nextStepNullable.Value;

            // Give subclasses the chance to branch; a flow switch ends this loop silently.
            try
            {
                await HandleSpecialFlowAsync(currentStep, nextStep, taskId);
            }
            catch (WorkflowFlowSwitchException)
            {
                return;
            }

            currentStep = nextStep;
        }
    }
    catch (Exception ex)
    {
        await SetTaskErrorMessage(long.Parse(tId), ex);
    }
    finally
    {
        await Redis.ExpireAsync(RedisExpandKey.Task(taskId), taskKeyTtlSeconds);
    }
}
/// <summary>
/// Persists the current step of a task.
/// <para>
/// Default implementation: inserts or updates the <c>VideoTaskWorkflow</c> row identified by
/// the task id and <c>WorkflowName</c>. Subclasses may override to store the state elsewhere.
/// Database failures are written to the console and not rethrown.
/// </para>
/// </summary>
/// <param name="taskId">Task id; must be numeric.</param>
/// <param name="step">Current step.</param>
protected virtual async Task UpdateTaskStateAsync(string taskId, TEnum step)
{
    long videoTaskId = long.Parse(taskId);
    string stepLabel = step.ToString();
    int stepNumber = Convert.ToInt32(step);

    using var scope = AppCommon.Services.CreateScope();
    var db = scope.ServiceProvider.GetService<ISqlSugarClient>();
    if (db == null)
    {
        return;
    }

    try
    {
        // Query-then-write upsert: update the row when it exists, otherwise insert a new one.
        var row = await db.Queryable<VideoTaskWorkflow>()
            .FirstAsync(it => it.VideoTaskId == videoTaskId && it.WorkflowName == WorkflowName);

        if (row != null)
        {
            row.CurrentStep = stepLabel;
            row.CurrentStepValue = stepNumber;
            row.UpdateTime = DateTime.Now;
            await db.Updateable(row).ExecuteCommandAsync();
            return;
        }

        var created = new VideoTaskWorkflow
        {
            Id = Yitter.IdGenerator.YitIdHelper.NextId(),
            VideoTaskId = videoTaskId,
            WorkflowName = WorkflowName,
            CurrentStep = stepLabel,
            CurrentStepValue = stepNumber,
            UpdateTime = DateTime.Now
        };
        await db.Insertable(created).ExecuteCommandAsync();
    }
    catch (Exception ex)
    {
        Console.WriteLine($"更新工作流状态失败: {ex.Message}");
    }
}
/// <summary>
/// Executes a single workflow step: records it as the task's last step in Redis, resets the
/// progress to 0, persists the step via <c>UpdateTaskStateAsync</c> and invokes the handler.
/// A failure is written to the task log and rethrown.
/// </summary>
/// <param name="key">Step being executed.</param>
/// <param name="taskId">Task id; must be numeric.</param>
/// <param name="action">Handler for the step; receives the task id.</param>
public async Task TouchChannel(TEnum key, string taskId, Func<string, Task> action)
{
    // The parsed value is not used; the call only rejects a non-numeric id before any work starts.
    _ = long.Parse(taskId);

    var startMessage = " 开始执行 " + key + " " + taskId;
    await AddTaskLog(taskId, startMessage, WorkflowName);

    try
    {
        // Remember the step in the task hash.
        var taskKey = RedisExpandKey.Task(taskId);
        Redis.HMSet(taskKey, "LastEnum", key.ToString());

        // Every step starts at progress 0.
        SetTaskProgress(taskId, 0);

        // Where the state is stored is decided by UpdateTaskStateAsync (overridable).
        await UpdateTaskStateAsync(taskId, key);

        await action(taskId);
    }
    catch (Exception ex)
    {
        await AddTaskLog(taskId, $""" 出现异常 {ex.Message} {ex.StackTrace} """);
        throw;
    }
}
/// <summary>
/// Writes the progress of a task into its Redis hash.
/// The field is "Progress" for the workflow named "VideoSliceWorkflow" and
/// "Progress:{WorkflowName}" for every other workflow.
/// </summary>
/// <param name="taskId">Task id used to build the hash key.</param>
/// <param name="p">Progress value; stored as its <c>ToString()</c> text.</param>
public void SetTaskProgress(object taskId, object p)
{
    string fieldName;
    if (WorkflowName == "VideoSliceWorkflow")
    {
        fieldName = "Progress";
    }
    else
    {
        fieldName = $"Progress:{WorkflowName}";
    }

    var taskKey = RedisExpandKey.Task(taskId);
    Redis.HMSet(taskKey, fieldName, p.ToString());
}
/// <summary>
/// Finishes a task by removing its id from the in-progress list.
/// <para>
/// This base implementation only performs that clean-up. It does not mark any
/// <c>VideoTaskWorkflow</c> row as finished, because the final step of <typeparamref name="TEnum"/>
/// is not known here; subclasses should override this method for such business logic.
/// </para>
/// </summary>
/// <param name="taskId">Task id; must be numeric.</param>
public virtual async Task TaskEnd(string taskId)
{
    // The parsed value is not used; the call only rejects a non-numeric id.
    _ = long.Parse(taskId);

    // Drop one occurrence of the id from the in-progress list.
    await Redis.LRemAsync(RedisExpandKey.IDTask, 1, taskId);
}
/// <summary>
/// Records a task failure.
/// <para>
/// With an exception: removes the task from the in-progress list, writes the exception to the
/// task log and stores message plus stack trace as the task's error. Without one: stores an
/// empty error.
/// </para>
/// </summary>
/// <param name="taskID">Task id.</param>
/// <param name="ex">Exception to record, or null.</param>
/// <returns>The result of <c>SetTaskError</c>.</returns>
public async Task<bool> SetTaskErrorMessage(long taskID, Exception? ex)
{
    if (ex == null)
    {
        return await SetTaskError(taskID, string.Empty);
    }

    // The task failed, so it is no longer in progress.
    await Redis.LRemAsync(RedisExpandKey.IDTask, 1, taskID.ToString());

    var details = ex.Message + ex.StackTrace;
    await AddTaskLog(taskID, $""" 出现异常 {ex.Message} {ex.StackTrace} """);
    return await SetTaskError(taskID, details);
}
/// <summary>
/// Clears the error message of a task by storing an empty string.
/// </summary>
/// <param name="taskID">Task id.</param>
/// <returns>The result of <c>SetTaskError</c>.</returns>
public async Task<bool> ClearTaskError(long taskID)
{
    var cleared = await SetTaskError(taskID, string.Empty);
    return cleared;
}
/// <summary>
/// Stores the error message of a task in its Redis hash and in the <c>VideoTask</c> table.
/// </summary>
/// <param name="taskID">Task id.</param>
/// <param name="error">Error text to store.</param>
/// <returns>
/// True when exactly one database row was updated; false when the repository service is not
/// available (nothing is written in that case) or the update touched a different number of rows.
/// </returns>
public async Task<bool> SetTaskError(long taskID, string? error)
{
    using var scope = AppCommon.Services.CreateScope();
    var repository = scope.ServiceProvider.GetService<Repository<VideoTask>>();
    if (repository == null)
    {
        return false;
    }

    Redis.HMSet(RedisExpandKey.Task(taskID), "ErrorMessage", error);

    // The VideoTaskWorkflow table is not touched here.
    var affectedRows = await repository.CopyNew().AsUpdateable()
        .SetColumns(it => it.ErrorMessage == error)
        .Where(it => it.Id == taskID)
        .ExecuteCommandAsync();
    return affectedRows == 1;
}
/// <summary>
/// Appends a log entry for a task to the Redis log list and, once the list holds more than
/// 50 entries, moves the oldest batch into the database.
/// </summary>
/// <param name="taskId">Task id; its text form must be numeric.</param>
/// <param name="msg">Log message.</param>
/// <param name="workflowName">Workflow name; defaults to the current workflow when null.</param>
public async Task AddTaskLog(object taskId, string msg, string? workflowName = null)
{
    const int flushBatchSize = 50;

    var wfName = workflowName ?? WorkflowName;
#if DEBUG
    Console.WriteLine($"{DateTime.Now.ToString("MM-dd HH:mm:ss")} => {taskId} [{wfName}] \r\n{msg}\r\n");
#endif
    await Redis.RPushAsync(RedisExpandKey.TaskLog,
        new TaskLog()
        {
            VideoTaskId = long.Parse(taskId.ToString()),
            CreateTime = DateTime.Now,
            Message = msg,
            DeviceId = AppCommon.Config.ID,
            WorkflowName = wfName
        });

    // NOTE(review): this locks on a string instance and only serializes flushes inside this
    // process; other processes sharing the same Redis list are not coordinated by it.
    lock (RedisExpandKey.TaskLog)
    {
        var pendingCount = Redis.LLen(RedisExpandKey.TaskLog);
        if (pendingCount <= flushBatchSize)
        {
            return;
        }

        try
        {
            using var scope = AppCommon.Services.CreateScope();
            var taskLogDB = scope.ServiceProvider.GetService<Repository<TaskLog>>();
            if (taskLogDB == null)
            {
                return;
            }

            var insertData = Redis.LRange<TaskLog>(RedisExpandKey.TaskLog, 0, flushBatchSize - 1);
            if (insertData is { Length: > 0 })
            {
                taskLogDB.CopyNew().AsInsertable(insertData).ExecuteCommand();

                // Remove exactly the entries that were persisted and keep everything after them.
                // A fixed upper bound here would silently drop any backlog beyond that index.
                Redis.LTrim(RedisExpandKey.TaskLog, insertData.Length, -1);
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("写入任务日志出错" + "\r\n" + ex.Message + "\r\n" + ex.StackTrace);
        }
    }
}
/// <summary>
/// Starts a workflow on a background task without waiting for it to finish.
/// The background task is registered in <c>RunningTasks</c> under <paramref name="tId"/> and
/// removes that entry when the flow ends.
/// </summary>
/// <param name="startStep">Step to start from.</param>
/// <param name="taskId">Task id passed to the flow.</param>
/// <param name="tId">Id passed to the flow and used as the <c>RunningTasks</c> key.</param>
/// <returns>A task that completes as soon as the background flow has been scheduled.</returns>
protected async Task DispatchBackgroundFlow(TEnum startStep, string taskId, string tId)
{
    async Task RunFlowAsync()
    {
        try
        {
            await ProcessTaskFlow(startStep, taskId, tId);
        }
        finally
        {
            RunningTasks.TryRemove(tId, out _);
        }
    }

    var backgroundFlow = Task.Run(() => RunFlowAsync());
    RunningTasks.TryAdd(tId, backgroundFlow);
    await Task.CompletedTask;
}
/// <summary>
/// Adds task ids to this workflow's consumer queue.
/// </summary>
/// <param name="taskIds">Ids to enqueue; null or empty does nothing.</param>
public void JoinQueue(params long[] taskIds)
{
    if (taskIds is not { Length: > 0 })
    {
        return;
    }

    // Push all ids with a single command instead of looping.
    object[] boxedIds = Array.ConvertAll(taskIds, id => (object)id);
    Redis.LPush(ChannelKey, boxedIds);
}
}
}