Learn.VideoAnalysis/VideoAnalysisCore/Common/WorkflowBase.cs

299 lines
12 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using FreeRedis;
using Microsoft.Extensions.DependencyInjection;
using SqlSugar.IOC;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using VideoAnalysisCore.Model;
using VideoAnalysisCore.Model.Enum;
namespace VideoAnalysisCore.Common
{
/// <summary>
/// Thrown to signal that a task has been handed over to a different flow, so the
/// loop that is currently processing it should stop without recording an error.
/// In this file it is caught around the <c>HandleSpecialFlowAsync</c> call inside
/// <c>ProcessTaskFlow</c>.
/// </summary>
public class WorkflowFlowSwitchException : Exception
{
    /// <summary>Creates the exception without a message.</summary>
    public WorkflowFlowSwitchException() { }

    /// <summary>Creates the exception with a descriptive message.</summary>
    /// <param name="message">Text describing why the flow was switched.</param>
    public WorkflowFlowSwitchException(string message) : base(message) { }

    /// <summary>Creates the exception with a message and the exception that caused it.</summary>
    /// <param name="message">Text describing why the flow was switched.</param>
    /// <param name="innerException">The underlying exception.</param>
    public WorkflowFlowSwitchException(string message, Exception innerException) : base(message, innerException) { }
}
public abstract class WorkflowBase<TEnum> where TEnum : struct, Enum
{
/// <summary>
/// Stop flag. The listener loops and the step loop in this class check it and
/// stop picking up / advancing work once it is true.
/// </summary>
public bool StopTask { get; set; }

/// <summary>
/// Handlers per workflow step. Each handler receives the task id as a string.
/// </summary>
public Dictionary<TEnum, Func<string, Task>> SubscribeList = new();

/// <summary>Redis client used for queue and task-hash operations.</summary>
public readonly RedisClient Redis;

/// <summary>Helper used for task logs and task error bookkeeping.</summary>
public readonly RedisManager RedisManager;

// Cancellation source for the listener workers; created when listening starts.
private CancellationTokenSource? _cts;

// Listener worker tasks started by this instance.
private List<Task> _workerTasks = new();

/// <summary>
/// Background flows that are currently registered, keyed by the id passed as <c>tId</c>.
/// Being a static member of a generic class, there is one dictionary per <typeparamref name="TEnum"/>.
/// </summary>
public static ConcurrentDictionary<string, Task> RunningTasks = new();
/// <summary>
/// Redis key of the list that serves as this workflow's task queue
/// (the listener workers pop task ids from it).
/// </summary>
protected abstract string ChannelKey { get; }
/// <summary>
/// Number of concurrent listener workers for this workflow.
/// Values of zero or less are treated as 1 when the listeners are started.
/// </summary>
protected abstract int Concurrency { get; }
/// <summary>
/// Stores the Redis client and the Redis manager that the workflow uses.
/// </summary>
/// <param name="redis">Redis client for queue and hash operations.</param>
/// <param name="redisManager">Helper for task logs and error state.</param>
public WorkflowBase(RedisClient redis, RedisManager redisManager)
{
    this.Redis = redis;
    this.RedisManager = redisManager;
}
/// <summary>
/// Starts the workflow: first re-runs the tasks that were still listed as in progress
/// when the program last stopped, then starts the queue listeners.
/// Does nothing when <c>AppCommon.Config.TaskSetting.IS_Server</c> is true.
/// </summary>
/// <remarks>
/// The method is <c>async void</c>, so no caller can observe an exception thrown here;
/// an escaping exception would be raised on the thread pool and can terminate the process.
/// Recovery is therefore wrapped in a catch-all, and the listeners are started even if
/// recovery fails.
/// </remarks>
public async void InitChannel()
{
    if (AppCommon.Config.TaskSetting.IS_Server) return;

    try
    {
        await RecoverUnfinishedTasksAsync();
    }
    catch (Exception ex)
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss}-------------> 恢复未完成任务时出现异常: {ex.Message}");
    }

    ReceivingTaskAsync();
}

/// <summary>
/// Re-runs every task id found in the in-progress list whose stored "LastEnum" value
/// parses as a <typeparamref name="TEnum"/>, at most five at a time, and waits for all of them.
/// </summary>
private async Task RecoverUnfinishedTasksAsync()
{
    // Tasks left over from a previous run that did not finish.
    var oldTaskCount = Redis.LLen(RedisExpandKey.IDTask);
    if (oldTaskCount <= 0)
    {
        Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收新任务!");
        return;
    }

    var oldTaskArr = Redis.LRange(RedisExpandKey.IDTask, 0, -1);
    Console.WriteLine($"{DateTime.Now:HH:mm:ss}-------------> 发现 {oldTaskArr.Length} 个未完成任务,准备恢复...");

    // Too many simultaneous retries may crash the program, so cap the concurrency at 5.
    using var semaphore = new SemaphoreSlim(5);
    var retryTaskArr = new List<Task>();
    foreach (var oldTask in oldTaskArr)
    {
        try
        {
            // A task belongs to this workflow only if its last recorded step parses as TEnum;
            // anything else is assumed to belong to another workflow and is skipped.
            var lastEnumStr = (await Redis.HMGetAsync<string>(RedisExpandKey.Task(oldTask), "LastEnum")).FirstOrDefault();
            if (string.IsNullOrEmpty(lastEnumStr) || !Enum.TryParse<TEnum>(lastEnumStr, true, out var lastEnum))
                continue;

            // Acquire the slot before scheduling so the loop itself is throttled as well.
            await semaphore.WaitAsync();
            retryTaskArr.Add(Task.Run(() => RecoverTaskAsync(oldTask, lastEnum, semaphore)));
        }
        catch (Exception ex)
        {
            Console.WriteLine($"检查任务 {oldTask} 状态失败: {ex.Message}");
        }
    }

    // New tasks are only accepted after every retry has finished.
    await Task.WhenAll(retryTaskArr);
    Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 所有未完成任务处理完毕!");
}

/// <summary>
/// Re-runs one unfinished task from its last recorded step and always releases the
/// semaphore slot that was acquired for it. Never throws: failures are logged and,
/// when the id is numeric, recorded on the task.
/// </summary>
private async Task RecoverTaskAsync(string oldTask, TEnum lastEnum, SemaphoreSlim semaphore)
{
    try
    {
        await RedisManager.AddTaskLog(oldTask, DateTime.Now.ToString("HH:mm:ss") + $"-------------> 接收上次未完成任务 [{typeof(TEnum).Name}] " + oldTask);
        await RedisManager.ClearTaskError(long.Parse(oldTask));
        await InsertChannel(lastEnum, oldTask);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"恢复任务 {oldTask} 失败: {ex.Message}");
        try
        {
            // The id may be the very thing that failed to parse, so do not parse it blindly again.
            if (long.TryParse(oldTask, out var id))
                await RedisManager.SetTaskErrorMessage(id, ex);
        }
        catch (Exception recordEx)
        {
            Console.WriteLine($"记录任务 {oldTask} 错误信息失败: {recordEx.Message}");
        }
    }
    finally
    {
        semaphore.Release();
    }
}
/// <summary>
/// Starts the background workers that listen on this workflow's Redis queue.
/// One worker is started per unit of <c>Concurrency</c> (at least one); each worker
/// pops task ids from <c>ChannelKey</c> and runs the workflow for them until the
/// cancellation token is signalled or <c>StopTask</c> is set.
/// </summary>
/// <remarks>
/// NOTE(review): every call creates a new <c>CancellationTokenSource</c> and does not
/// cancel workers started by an earlier call — confirm this method is only called once.
/// </remarks>
public void ReceivingTaskAsync()
{
    var workerCount = Math.Max(1, Concurrency);
    _cts = new CancellationTokenSource();
    var token = _cts.Token;
    for (var i = 0; i < workerCount; i++)
    {
        var index = i;
        _workerTasks.Add(Task.Run(() => ListenQueueAsync(index, token), token));
    }
}

/// <summary>
/// Worker loop: blocks on the queue for up to five seconds at a time, records each received
/// task id in the in-progress list and runs the workflow from its first step.
/// </summary>
private async Task ListenQueueAsync(int index, CancellationToken token)
{
    Console.WriteLine($"{DateTime.Now} ==> 开始监听 [{typeof(TEnum).Name}] 队列 [{index}]...");
    while (!token.IsCancellationRequested && !StopTask)
    {
        try
        {
            var taskId = Redis.BLPop(ChannelKey, 5);
            if (string.IsNullOrEmpty(taskId)) continue; // timed out with nothing queued

            Redis.LPush(RedisExpandKey.IDTask, taskId);
            await RedisManager.AddTaskLog(taskId, $"==> 接收到任务 [{typeof(TEnum).Name}] ");
            await InsertChannel(ResolveFirstStep(), taskId);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"任务监听异常: {ex.Message}");
            try
            {
                // Back off before retrying, but wake up immediately when the workers are cancelled.
                await Task.Delay(2000, token);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }
}

/// <summary>
/// Returns the step a newly received task starts at: the first enum value, or the one
/// after it when the first value is 0 (the "queued" placeholder) and a next value exists.
/// </summary>
private static TEnum ResolveFirstStep()
{
    var firstStep = Enum.GetValues(typeof(TEnum)).Cast<TEnum>().FirstOrDefault();
    if (Convert.ToInt32(firstStep) == 0)
    {
        var next = firstStep.NextEnum();
        if (next.HasValue) firstStep = next.Value;
    }
    return firstStep;
}
/// <summary>
/// Writes the "execution started" entry to the task log and then runs the workflow
/// for the task, beginning at the given step.
/// </summary>
/// <param name="enum">Step at which processing starts.</param>
/// <param name="taskId">Task id; it is used as both ids expected by <c>ProcessTaskFlow</c>.</param>
public async Task InsertChannel(TEnum @enum, string taskId)
{
    const string startMessage = "==> 开始执行任务 ";
    await RedisManager.AddTaskLog(taskId, startMessage);
    await ProcessTaskFlow(currentStep: @enum, taskId: taskId, tId: taskId);
}
/// <summary>
/// Extension point called by <c>ProcessTaskFlow</c> after a step has run and before it
/// moves on to the next one. The base implementation does nothing. An override may throw
/// <see cref="WorkflowFlowSwitchException"/> to make <c>ProcessTaskFlow</c> stop the current loop.
/// </summary>
/// <param name="currentStep">The step that has just finished.</param>
/// <param name="nextStep">The step that would run next.</param>
/// <param name="taskId">Id of the task being processed.</param>
/// <returns>A task that is already completed in the base implementation.</returns>
protected virtual Task HandleSpecialFlowAsync(TEnum currentStep, TEnum nextStep, string taskId)
    => Task.CompletedTask;
/// <summary>
/// Main processing loop: runs the handler registered for <paramref name="currentStep"/>,
/// then each following step in turn, until there is no next step, <c>StopTask</c> is set,
/// or <c>HandleSpecialFlowAsync</c> signals a flow switch.
/// </summary>
/// <remarks>
/// Any exception raised by a step is recorded on the task via
/// <c>RedisManager.SetTaskErrorMessage</c> instead of being rethrown. On every exit path the
/// task's Redis key is given a 15-day expiry.
/// </remarks>
/// <param name="currentStep">Step to start from.</param>
/// <param name="taskId">Id used for the task's Redis key and passed to <c>HandleSpecialFlowAsync</c>.</param>
/// <param name="tId">Id used for logging, step execution and error recording; must be numeric.</param>
/// <returns>A task that completes when the flow has ended.</returns>
protected async Task ProcessTaskFlow(TEnum currentStep, string taskId, string tId)
{
    const int taskKeyTtlSeconds = 60 * 60 * 24 * 15; // 15 days

    try
    {
        // Fail fast when the starting step has no handler, even if a stop was requested.
        if (!SubscribeList.ContainsKey(currentStep))
            throw new InvalidOperationException($"{currentStep} 未实现");

        while (true)
        {
            if (StopTask)
            {
                await RedisManager.AddTaskLog(tId, "==> 手动停止任务 ");
                return;
            }

            // Validate every step, not just the first one, so a missing handler is reported
            // with the same explicit message instead of a generic KeyNotFoundException.
            if (!SubscribeList.TryGetValue(currentStep, out var handler))
                throw new InvalidOperationException($"{currentStep} 未实现");

            // Step start times are not recorded here: RedisManager.UpdateStepTimeAsync would need
            // generic support for enums other than RedisChannelEnum.

            // Run the current step.
            await TouchChannel(currentStep, tId, handler);

            // Work out the next step; stop when the current one was the last.
            var nextStepNullable = currentStep.NextEnum();
            if (nextStepNullable == null) break;
            var nextStep = nextStepNullable.Value;

            // Give subclasses the chance to divert the task into another flow.
            try
            {
                await HandleSpecialFlowAsync(currentStep, nextStep, taskId);
            }
            catch (WorkflowFlowSwitchException)
            {
                return; // flow switched: leave this loop without recording an error
            }

            currentStep = nextStep;
        }
    }
    catch (Exception ex)
    {
        await RedisManager.SetTaskErrorMessage(long.Parse(tId), ex);
    }
    finally
    {
        await Redis.ExpireAsync(RedisExpandKey.Task(taskId), taskKeyTtlSeconds);
    }
}
/// <summary>
/// Executes a single workflow step: logs the start, stores the step as the task's
/// "LastEnum" (and resets "Progress" to 0) in Redis, updates <c>LastEnum</c> on the
/// <c>VideoTask</c> row, and then invokes the step handler.
/// </summary>
/// <param name="key">The step being executed.</param>
/// <param name="taskId">Task id as a string; must be parseable as a <see cref="long"/>.</param>
/// <param name="action">Handler for the step; it receives <paramref name="taskId"/>.</param>
/// <returns>A task that completes when the handler has finished.</returns>
/// <exception cref="FormatException"><paramref name="taskId"/> is not a valid number.</exception>
/// <exception cref="InvalidOperationException">The <c>VideoTask</c> repository is not registered.</exception>
public async Task TouchChannel(TEnum key, string taskId, Func<string, Task> action)
{
    var tID = long.Parse(taskId);
    await RedisManager.AddTaskLog(taskId, " 开始执行 " + key + " " + taskId);
    try
    {
        // The database column is typed as RedisChannelEnum. Other workflow enums are mapped
        // by numeric value; Convert.ToInt32 copes with any underlying enum type.
        var dbEnum = key is RedisChannelEnum rc ? rc : (RedisChannelEnum)Convert.ToInt32(key);

        var taskKey = RedisExpandKey.Task(taskId);
        Redis.HMSet(taskKey, "LastEnum", key.ToString());
        Redis.HMSet(taskKey, "Progress", 0);

        // GetService returns null for an unregistered service; fail with a clear message
        // rather than a NullReferenceException on the next line.
        var vDB = AppCommon.Services.GetService<Repository<VideoTask>>()
            ?? throw new InvalidOperationException("Repository<VideoTask> 服务未注册");
        await vDB.CopyNew().AsUpdateable()
            .SetColumns(it => it.LastEnum == dbEnum)
            .Where(it => it.Id == tID)
            .ExecuteCommandAsync();

        await action(taskId);
    }
    catch (Exception ex)
    {
        // Record the failure on the task log, then let the caller decide how to handle it.
        await RedisManager.AddTaskLog(taskId, $""" 出现异常 {ex.Message} {ex.StackTrace} """);
        throw;
    }
}
/// <summary>
/// Starts <c>ProcessTaskFlow</c> on a background task without waiting for it, and tracks
/// the task in <c>RunningTasks</c> under <paramref name="tId"/> while it runs.
/// </summary>
/// <remarks>
/// The background flow does not begin until the task has been registered, so a flow that
/// finishes quickly cannot try to remove its entry before that entry has been added (which
/// would leave a completed task in <c>RunningTasks</c> indefinitely). If an entry for
/// <paramref name="tId"/> already exists, the new flow still runs but is not tracked and
/// does not remove the existing entry when it ends.
/// </remarks>
/// <param name="startStep">Step at which the background flow starts.</param>
/// <param name="taskId">Task id forwarded to <c>ProcessTaskFlow</c>.</param>
/// <param name="tId">Id forwarded to <c>ProcessTaskFlow</c> and used as the tracking key.</param>
/// <returns>A task that completes as soon as the background flow has been scheduled.</returns>
protected async Task DispatchBackgroundFlow(TEnum startStep, string taskId, string tId)
{
    var registration = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    var bgTask = Task.Run(async () =>
    {
        // Wait until the caller has (tried to) register this task.
        var registered = await registration.Task;
        try
        {
            await ProcessTaskFlow(startStep, taskId, tId);
        }
        finally
        {
            // Only remove the entry this flow owns.
            if (registered) RunningTasks.TryRemove(tId, out _);
        }
    });

    var added = false;
    try
    {
        added = RunningTasks.TryAdd(tId, bgTask);
    }
    finally
    {
        // Always release the background flow, even if registration threw.
        registration.SetResult(added);
    }

    await Task.CompletedTask;
}
}
}