优化任务调度流程和异常捕获流程

This commit is contained in:
小肥羊 2025-06-17 11:35:06 +08:00
parent f7c787cdf7
commit 19855abb6f
16 changed files with 144 additions and 108 deletions

View File

@ -69,7 +69,7 @@ namespace Learn.VideoAnalysis.Components.Pages
/// <param name="query"></param>
async void ReStart()
{
await RedisExpand.SetTaskErrorMessage(reStartTask.Id, null);
await RedisExpand.ClearTaskError(reStartTask.Id);
_=Task.Run(() =>
RedisExpand.InsertChannel((RedisChannelEnum)selectEnum, reStartTask.Id)
);
@ -138,7 +138,7 @@ namespace Learn.VideoAnalysis.Components.Pages
statusStr= "wait";
else if (!string.IsNullOrEmpty(rowData.Data.ErrorMessage))
statusStr= "error";
else if (dic.ContainsKey(RedisChannelEnum.EndTask))
else if (dic.ContainsKey(RedisChannelEnum.))
statusStr= "finish";
item.TaskStatus = statusStr;
StateHasChanged();

View File

@ -171,22 +171,22 @@ namespace VideoAnalysisCore.AICore.FFMPGE
ffmpeg.Error += (sender, e) =>
{
var ee = new Exception($"音频转码出现异常 \r\n[{e.Input.Name} => {e.Output.Name}]: 错误: {e.Exception.Message}");
RedisExpand.SetTaskErrorMessage(long.Parse(task), ee);
throw ee;
};
var conversionOptions = new ConversionOptions
{
ExtraArguments = "-ar 16000 -ac 1"
//+ (AppCommon.AppSetting.FFmpeg.TimeSlice == 0
//?string.Empty
//: $"-f segment -reset_timestamps 1 -segment_time {AppCommon.AppSetting.FFmpeg.TimeSlice}")
};
var res = await ffmpeg.ConvertAsync(inputFile, outputFile, conversionOptions);
try
{
await ffmpeg.ConvertAsync(inputFile, outputFile, conversionOptions);
}
catch
{
throw;
}
Console.WriteLine($"{DateTime.Now}=>音频转码完成");
//加入下一队列
RedisExpand.InsertChannel(RedisChannelEnum.ParsingCaptions, task);
}
}

View File

@ -183,7 +183,6 @@ namespace VideoAnalysisCore.AICore.GPT.ChatGPT
var gptRes = new TaskRes(captions);
await RedisExpand.Redis
.HMSetAsync(RedisExpandKey.Task(task), "ChatAnalysis", gptRes);
RedisExpand.InsertChannel(RedisChannelEnum.EndTask, task);
return gptRes;
}
public async Task<T> ChatAsync<T>(string task, string postMessages, string postMessages1, string resFormat)
@ -416,7 +415,6 @@ namespace VideoAnalysisCore.AICore.GPT.ChatGPT
await RedisExpand.Redis
.HMSetAsync(RedisExpandKey.Task(task), "ChatAnalysis", gptRes);
RedisExpand.InsertChannel(RedisChannelEnum.EndTask, task);
return gptRes;
}

View File

@ -164,7 +164,7 @@ namespace VideoAnalysisCore.AICore.GPT.DeepSeek
{
//校验结果质量
var thems = JsonSerializer.Serialize(questionRes.Adapt<VideoKnowQueryDto[]>());
var pptFormat = taskInfo.VideoType==AttachmentsInfoType.PPT
var pptFormat = taskInfo.VideoType==AttachmentsInfoType.
? "这堂课是习题课,所讲解内容都是试题。"
: string.Empty;
var checkResFormat = """{"Score":打分(number),"Evaluation":评价(string)""";//,"Data":优化后的分段(array)}""";
@ -262,7 +262,7 @@ namespace VideoAnalysisCore.AICore.GPT.DeepSeek
: $"图像视频中授课内容PPT发生了变化的时间节点是{taskInfo.PPTKeyFrame},授课阶段结果可以参考这些时间节点。";
var resFormat = """[{"StartTime":开始秒(number),"EndTime":结束秒(number),"Stage":阶段(string),"Theme":主题(string),"Content":内容总结(string)}]""";
var exerciseClass = taskInfo?.VideoType == AttachmentsInfoType.Review
var exerciseClass = taskInfo?.VideoType == AttachmentsInfoType.
? $"但是本堂课是习题课,所以每个阶段是不同的例题讲解内容。"
: string.Empty;
//$"请注意 本次分析的视频字幕只是其中一部分 不需要分析出所有类型的授课阶段。";
@ -281,7 +281,7 @@ namespace VideoAnalysisCore.AICore.GPT.DeepSeek
Console.WriteLine(DateTime.Now + $"=>{taskInfo.Id.ToString()}.开始分析视频内容 {tryCount}");
var resData = await ChatAsync<VideoKnowRes[]>(taskInfo.Id.ToString(), postMessages, "分析字幕");
if (taskInfo?.VideoType == AttachmentsInfoType.Review)
if (taskInfo?.VideoType == AttachmentsInfoType.)
foreach (var item in resData)
item.Stage = StageEnum..ToString();
questionRes.AddRange(resData);
@ -556,10 +556,9 @@ namespace VideoAnalysisCore.AICore.GPT.DeepSeek
await RedisExpand.Redis
.HMSetAsync(RedisExpandKey.Task(task), "VideoKnows", questionRes);
if (taskInfo.VideoType == AttachmentsInfoType.Review)
if (taskInfo.VideoType == AttachmentsInfoType.)
await AnalysisVideoQuestions(taskInfo, knowledgeInfos);
RedisExpand.InsertChannel(RedisChannelEnum.EndTask, task);
return null;
}
}

View File

@ -222,7 +222,6 @@ namespace VideoAnalysisCore.AICore.GPT.KIMI
await RedisExpand.Redis
.HMSetAsync(RedisExpandKey.Task(task), "ChatAnalysis", gptRes);
RedisExpand.InsertChannel(RedisChannelEnum.EndTask, task);
return gptRes;
}

View File

@ -179,13 +179,8 @@ namespace VideoAnalysisCore.AICore.SherpaOnnx
.Where(it => it.Id == long.Parse(task))
.ExecuteCommandAsync();
await RedisExpand.Redis.HMSetAsync(RedisExpandKey.Task(task), "Captions", res);
//RedisExpand.InsertChannel(Enum.RedisChannelEnum.ParsingSpeaker, task);
//分析完成视频字幕后继续接收任务
await RedisExpand.NewTaskAsync();
RedisExpand.InsertChannel(RedisChannelEnum.ChatModelAnalysis, task);
RedisExpand.NewTaskAsync();
}
return res;
}

View File

@ -84,8 +84,6 @@ namespace VideoAnalysisCore.AICore.SherpaOnnx
.SetColumns(it => it.Speaker == speakerStr)
.Where(it => it.Id == long.Parse(task))
.ExecuteCommandAsync();
//加入下一队列
RedisExpand.InsertChannel(RedisChannelEnum.ChatModelAnalysis, task);
}
}

View File

@ -46,7 +46,6 @@ namespace VideoAnalysisCore.AICore.Whisper
res.Add(new WhisperResDto(segment));
}
RedisExpand.Redis.HMSet(RedisExpandKey.Task(task), "Captions", res);
RedisExpand.InsertChannel(RedisChannelEnum.ParsingSpeaker, task);
}
/// <summary>
/// 检测语言的方法

View File

@ -224,6 +224,24 @@ namespace VideoAnalysisCore.Common
};
}
/// <summary>
/// 获取下一个枚举值
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="current"></param>
/// <returns></returns>
/// <exception cref="ArgumentException"></exception>
public static T? NextEnum<T>(this T current) where T : struct, Enum
{
if (!typeof(T).IsEnum)
throw new ArgumentException("传入类型不是枚举");
T[] values = (T[])Enum.GetValues(typeof(T));
int currentIndex = Array.IndexOf(values, current);
if (currentIndex == values.Length - 1)
return null;
int nextIndex = (currentIndex + 1) % values.Length;
return values[nextIndex];
}
/// <summary>
/// 转化枚举
/// </summary>
/// <param name="value"></param>

View File

@ -200,9 +200,9 @@ namespace VideoAnalysisCore.Common
(s, e) => RedisExpand.SetTaskProgress(task, "PPT->" + Math.Round(e.ProgressPercentage, 1)
));
}
catch (Exception e)
catch
{
await RedisExpand.SetTaskErrorMessage(taskId, e);
throw;
}
}
try
@ -210,13 +210,10 @@ namespace VideoAnalysisCore.Common
await Download(fileUrl, localPath, task + fileExtension,
(s, e) => RedisExpand.SetTaskProgress(task, Math.Round(e.ProgressPercentage,1)
));
//加入下一队列
RedisExpand.InsertChannel(RedisChannelEnum.SeparateAudio, task);
}
catch (Exception e)
catch
{
await RedisExpand.SetTaskErrorMessage(taskId, e);
throw;
}

View File

@ -1,9 +1,11 @@
using FreeRedis;
using FreeRedis.Internal;
using Microsoft.Extensions.DependencyInjection;
using NetTaste;
using Newtonsoft.Json.Schema;
using SqlSugar.IOC;
using System;
using System.Security.Cryptography;
using System.Text.Json;
using System.Threading.Channels;
using System.Threading.Tasks;
@ -20,6 +22,7 @@ using VideoAnalysisCore.AICore.Whisper;
using VideoAnalysisCore.Model;
using VideoAnalysisCore.Model.Dto;
using VideoAnalysisCore.Model.Enum;
using static System.Runtime.InteropServices.JavaScript.JSType;
namespace VideoAnalysisCore.Common
{
@ -148,7 +151,7 @@ namespace VideoAnalysisCore.Common
/// </summary>
/// <param name="enum">枚举</param>
/// <param name="taskId">任务id</param>
public static void InsertChannel(RedisChannelEnum @enum, object taskId)
public static async Task InsertChannel(RedisChannelEnum @enum, object taskId)
{
if (taskId is null) throw new Exception("taskId为空");
if (Redis is null) throw new Exception("redis未初始化");
@ -165,8 +168,20 @@ namespace VideoAnalysisCore.Common
if (!SubscribeList.ContainsKey(@enum))
throw new Exception(@enum + " 未实现");
var tId = taskId.ToString();
try
{
SubscribeList[@enum].Invoke(taskId.ToString());
while (@enum.NextEnum() != null)
{
SubscribeList[@enum].Invoke(tId);
@enum = @enum.NextEnum().Value;
}
}
catch (Exception ex)
{
await SetTaskErrorMessage((long)taskId, ex);
}
}
public static async Task TaskEnd(string task)
@ -190,7 +205,7 @@ namespace VideoAnalysisCore.Common
//taskData.ChatAnalysis = JsonSerializer.Serialize(gptRes);
taskData.ChatAnalysisScore =0;
taskData.ErrorMessage = string.Empty;
taskData.LastEnum = RedisChannelEnum.EndTask;
taskData.LastEnum = RedisChannelEnum.;
taskData.EndTime = DateTime.Now;
await DbScoped.Sugar.Updateable(taskData)
.UpdateColumns(it => new
@ -215,8 +230,8 @@ namespace VideoAnalysisCore.Common
{
if (Redis is null) throw new Exception("redis未初始化");
SubscribeList.Add(RedisChannelEnum.DownloadFile,
(msg) => TouchChannel(RedisChannelEnum.DownloadFile, msg,
SubscribeList.Add(RedisChannelEnum.,
(msg) => TouchChannel(RedisChannelEnum., msg,
(task) =>
{
using var scope = AppCommon.Services?.CreateScope();
@ -225,14 +240,14 @@ namespace VideoAnalysisCore.Common
else
return scope.ServiceProvider.GetService<DownloadFile>()?.RunTask(task) ?? Task.CompletedTask;
}));
SubscribeList.Add(RedisChannelEnum.SeparateAudio,
(msg) => TouchChannel(RedisChannelEnum.SeparateAudio, msg, FFMPGEHandle.RunAsync));
SubscribeList.Add(RedisChannelEnum.ParsingCaptions,
(msg) => TouchChannel(RedisChannelEnum.ParsingCaptions, msg, SenseVoice.RunTask));
SubscribeList.Add(RedisChannelEnum.ParsingSpeaker,
(msg) => TouchChannel(RedisChannelEnum.ParsingSpeaker, msg, Speaker.Run));
SubscribeList.Add(RedisChannelEnum.ChatModelAnalysis,
(msg) => TouchChannel(RedisChannelEnum.ChatModelAnalysis, msg,
SubscribeList.Add(RedisChannelEnum.,
(msg) => TouchChannel(RedisChannelEnum., msg, FFMPGEHandle.RunAsync));
SubscribeList.Add(RedisChannelEnum.,
(msg) => TouchChannel(RedisChannelEnum., msg, SenseVoice.RunTask));
SubscribeList.Add(RedisChannelEnum.,
(msg) => TouchChannel(RedisChannelEnum., msg, Speaker.Run));
SubscribeList.Add(RedisChannelEnum.AI模型分析,
(msg) => TouchChannel(RedisChannelEnum.AI模型分析, msg,
(task) =>
{
using var scope = AppCommon.Services?.CreateScope();
@ -241,8 +256,8 @@ namespace VideoAnalysisCore.Common
else
return scope.ServiceProvider.GetService<IBserGPT>()?.GetKnow(task) ?? Task.CompletedTask;
}));
SubscribeList.Add(RedisChannelEnum.EndTask,
(msg) => TouchChannel(RedisChannelEnum.EndTask, msg, TaskEnd));
SubscribeList.Add(RedisChannelEnum.,
(msg) => TouchChannel(RedisChannelEnum., msg, TaskEnd));
ReceivingTaskAsync();
@ -252,8 +267,9 @@ namespace VideoAnalysisCore.Common
/// 重新执行新任务
/// </summary>
/// <returns></returns>
public static async Task NewTaskAsync()
public static async void NewTaskAsync()
{
await Redis.DelAsync(RedisExpandKey.IDTask);
ReceivingTaskAsync();
}
@ -279,21 +295,21 @@ namespace VideoAnalysisCore.Common
if (!string.IsNullOrEmpty(oldTask))
{
Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收重试任务 " + oldTask);
await ClearTaskError(long.Parse(oldTask));
var lastEnum = (await Redis.HMGetAsync<RedisChannelEnum>(RedisExpandKey.Task(oldTask), "LastEnum")).FirstOrDefault();
await SetTaskErrorMessage(long.Parse(oldTask), null);
InsertChannel(lastEnum, oldTask);
await InsertChannel(lastEnum, oldTask);
return;
}
if (Subscribe?.IsUnsubscribed == false)//排除重试机制后 多次接收任务导致内存泄露
return;
Subscribe = Redis.SubscribeList(RedisExpandKey.ChannelKey, (taskId) =>
Subscribe = Redis.SubscribeList(RedisExpandKey.ChannelKey, async (taskId) =>
{
if (taskId is null) return;
Subscribe?.Dispose();//取消接收任务监听
//存储当前机器的任务
Redis.HSet(RedisExpandKey.IDTask, taskId,true);
Redis.Set(RedisExpandKey.IDTask, taskId);
Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收到任务 " + taskId);
InsertChannel(RedisChannelEnum.DownloadFile, taskId);
await InsertChannel(RedisChannelEnum., taskId);
});
});
@ -317,15 +333,24 @@ namespace VideoAnalysisCore.Common
Console.WriteLine(ex.StackTrace);
Console.WriteLine("==============================================");
//清除失败任务 重新接收任务
await NewTaskAsync();
NewTaskAsync();
}
return await SetTaskError(taskID, error);
}
/// <summary>
/// 清楚 任务的错误信息
/// </summary>
/// <param name="taskID"></param>
/// <param name="error"></param>
/// <returns></returns>
public static async Task<bool> ClearTaskError(long taskID) =>await SetTaskError(taskID, string.Empty);
public static async Task<bool> SetTaskError(long taskID, string? error)
{
Redis.HMSet(RedisExpandKey.Task(taskID), "ErrorMessage", error);
return await DbScoped.Sugar.Updateable<VideoTask>()
.SetColumns(it => it.ErrorMessage == error)//SetColumns是可以叠加的 写2个就2个字段赋值
.Where(it => it.Id == taskID)
.ExecuteCommandAsync() == 1;
.SetColumns(it => it.ErrorMessage == error)//SetColumns是可以叠加的 写2个就2个字段赋值
.Where(it => it.Id == taskID)
.ExecuteCommandAsync() == 1;
}
/// <summary>
@ -370,7 +395,7 @@ namespace VideoAnalysisCore.Common
Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-> 稍后后重试." + key + " " + taskId);
}
}
await SetTaskErrorMessage(tID, errArr.First());
throw errArr.Last();
}
else
{

View File

@ -146,26 +146,12 @@ namespace VideoAnalysisCore.Controllers
task.Subject = subject;
await videoTaskDB.UpdateAsync(task);
}
//重新开始执行GPT分析
RedisExpand.InsertChannel(RedisChannelEnum.ChatModelAnalysis
, task.Id);
//todo重新开始执行GPT分析
return Ok();
}
/// <summary>
/// 插入队列
/// </summary>
/// <param name="enum"></param>
/// <param name="msg"></param>
/// <returns></returns>
[HttpPost(Name = "TestInsertChannel")]
public IActionResult TestInsertChannel(int @enum = 1, string msg = "1")
{
RedisExpand.InsertChannel(@enum.ToEnum<RedisChannelEnum>().Value
, msg);
return Ok();
}
/// <summary>
/// 视频处理

View File

@ -78,7 +78,7 @@ namespace VideoAnalysisCore.Controllers.Dto
/// <summary>
/// 视频类型 PPT课件 = 1, 摄像头 = 2
/// </summary>
public int VideoType { get; set; }
public VideoType VideoType { get; set; }
/// <summary>
/// 分析完成后的回调地址
@ -98,7 +98,13 @@ namespace VideoAnalysisCore.Controllers.Dto
/// 科目类型
/// </summary>
[Required(ErrorMessage = "科目类型是必填项")]
public SubjectEnum SubjectType { get; set; }
public SubjectEnum SubjectId { get; set; }
/// <summary>
/// 内容的Id 当内容id相同的时候 则表示是一组数据
/// <para>用于查找视频下对应的PPT资源</para>
/// </summary>
public long ContentId { get; set; }
}
/// <summary>

View File

@ -60,15 +60,17 @@ namespace VideoAnalysisCore.Controllers
var videos = new List<VideoTask>(reqArr.Count());
var nodePackages = new List<NodePackageInfo>(reqArr.Count());
var videoIdArr = videoTaskDB.AsQueryable().Select(v => v.TagId).Distinct().ToArray();
foreach (var s in reqArr)
foreach (var sGroup in reqArr.GroupBy(s=>s.ContentId))
{
var s= sGroup.First(s=>s.VideoType==VideoType.);
var sPPT= sGroup.FirstOrDefault(s=>s.VideoType==VideoType.PPT课件);
var np = new NodePackageInfo()
{
VideoCode = s.VideoCode,
MaterialId = s.MaterialId,
AttachmentId = s.AttachmentId,
TaskType = s.TaskType,
SubjectType = s.SubjectType,
SubjectType = s.SubjectId,
VideoUrl =s.VideoUrl,
CourseType = s.CourseType,
CallBackUrl=s.CallBackUrl,
@ -78,15 +80,14 @@ namespace VideoAnalysisCore.Controllers
nodePackages.Add(np);
if (videoIdArr.Contains(s.VideoCode))
continue;
//todo»ñÈ¡ppt videoCode
var pptCode = "todo»ñÈ¡ppt videoCode";
var pptCode = sPPT!=null ? sPPT.VideoCode : string.Empty;
videos.Add(new VideoTask()
{
Id = YitIdHelper.NextId(),
ComeFrom = "127.0.0.1",
ComeFrom = GetClientIpAddress(),
ApiToken = "",
Type = s.TaskType,
Subject = s.SubjectType,
Subject = s.SubjectId,
TagId = s.VideoCode,
MediaUrl =s.VideoUrl,
PPTVideoCode = pptCode,
@ -102,6 +103,16 @@ namespace VideoAnalysisCore.Controllers
return Ok();
}
private string GetClientIpAddress()
{
// 检查 X-Forwarded-For 请求头
if (HttpContext.Request.Headers.ContainsKey("X-Forwarded-For")
&& !string.IsNullOrEmpty(HttpContext.Request.Headers["X-Forwarded-For"]))
return HttpContext.Request.Headers["X-Forwarded-For"].ToString();
if (HttpContext.Connection.RemoteIpAddress != null)
return HttpContext.Connection.RemoteIpAddress.ToString();
throw new Exception("未能获取到客户端ip地址");
}
/// <summary>
/// »ñÈ¡ÈÎÎñÀàÐÍ
/// </summary>

View File

@ -7,21 +7,22 @@ using System.Threading.Tasks;
namespace VideoAnalysisCore.Model.Enum
{
public enum VideoType
{
PPT课件 = 1,
= 2
}
public enum AttachmentsInfoType
{
[Description("常规课程")]
None = 0,
[Description("教研")]
TeachingResearch = 1,
[Description("PPT")]
PPT = 2,
[Description("复习")]
Review = 3,
= 0,
[Description("复习/习题课")]
= 3,
[Description("活动")]
Activities = 4,
= 4,
[Description("班会")]
Meeting = 5,
[Description("行为分析")]
Behavior = 6,
= 5,
[Description("其他资料")]
= 7
}
}

View File

@ -5,33 +5,37 @@
/// </summary>
public enum RedisChannelEnum
{
/// <summary>
/// 等待中
/// </summary>
= 0,
/// <summary>
/// 下载文件
/// </summary>
DownloadFile,
= 5,
/// <summary>
/// 分离音频
/// </summary>
SeparateAudio,
= 10,
/// <summary>
/// 解析字幕
/// </summary>
ParsingCaptions,
= 20,
/// <summary>
/// 解析说话人
/// </summary>
ParsingSpeaker,
= 30,
/// <summary>
/// Chat模型分析
/// </summary>
ChatModelAnalysis,
///// <summary>
///// 回调三方系统
///// </summary>
//CallBackSystem,
AI模型分析 = 40,
/// <summary>
/// 分析试题
/// </summary>
AI分析试题 = 50,
/// <summary>
/// 结束任务
/// </summary>
EndTask,
= 60,
}
}