新增 视频批量处理

This commit is contained in:
小肥羊 2026-01-07 17:27:58 +08:00
parent 1f5ecaa604
commit ba00c75a36
4 changed files with 128 additions and 38 deletions

View File

@ -303,7 +303,8 @@ namespace VideoAnalysisCore.AICore.GPT.DeepSeek
//分段超长问题,评分优化如何处理 //分段超长问题,评分优化如何处理
var keyFrameStr = string.IsNullOrEmpty(taskInfo?.PPTVideoCode) || string.IsNullOrEmpty(taskInfo?.PPTKeyFrame) var keyFrameStr = string.IsNullOrEmpty(taskInfo?.PPTVideoCode) || string.IsNullOrEmpty(taskInfo?.PPTKeyFrame)
? $"请分析授课中字幕描述的知识内容,然后基于视频整体知识点讲解提炼出不同的阶段以便对老师上课内容切片提取为知识库,所以请确保阶段的内容准确性" ? $"请分析授课中字幕描述的知识内容,然后基于视频整体知识点讲解提炼出不同的阶段以便对老师上课内容切片提取为知识库,所以请确保阶段的内容准确性"
: $"授课中老师的PPT在这些时间段内进行了切换{taskInfo.PPTKeyFrame},理应这些时间段内的讲述内容也发生了变化,请你基于PPT变化时间点结合字幕描述的知识内容提炼出不同的切片。每个阶段的起始和结束应接近这些时间点例如以时间点为中心扩展至内容自然过渡处。"; : $"授课中老师的PPT在这些时间段内进行了切换{taskInfo.PPTKeyFrame},理应这些时间段内的讲述内容也发生了变化,请你基于PPT变化时间点结合字幕描述的知识内容提炼出不同的切片。" +
$"每个阶段的起始和结束应接近这些时间点(例如,以时间点为中心,扩展至内容自然过渡处)。";
var resFormat = """[{"StartTime":开始秒(number),"EndTime":结束秒(number),"Stage":阶段(string),"Theme":主题(string),"Content":内容总结(string)}]"""; var resFormat = """[{"StartTime":开始秒(number),"EndTime":结束秒(number),"Stage":阶段(string),"Theme":主题(string),"Content":内容总结(string)}]""";
var reviewStr = taskInfo?.VideoType == AttachmentsInfoType. var reviewStr = taskInfo?.VideoType == AttachmentsInfoType.
? $"但本堂课是习题课,所以大部分阶段是不同的例题讲解内容。\n" ? $"但本堂课是习题课,所以大部分阶段是不同的例题讲解内容。\n"
@ -316,11 +317,12 @@ namespace VideoAnalysisCore.AICore.GPT.DeepSeek
$"完整的课堂标准流程包含以下5个阶段课程引入/新知讲解/例题精讲/课堂练习/知识总结。\n" + $"完整的课堂标准流程包含以下5个阶段课程引入/新知讲解/例题精讲/课堂练习/知识总结。\n" +
reviewStr + reviewStr +
$"初步划分阶段:{keyFrameStr}\n" + $"初步划分阶段:{keyFrameStr}\n" +
$"\n" +
$"内容分析:对每个时间段,提取主要讲解内容:识别关键词(如“例题”“证明”“练习”“总结”)和内容结构。\n" + $"内容分析:对每个时间段,提取主要讲解内容:识别关键词(如“例题”“证明”“练习”“总结”)和内容结构。\n" +
$"判断阶段类型:如果内容以解题为主,归类为“例题精讲”;如果涉及新知识讲解,归类为“新知讲解”;以此类推。\n" + $"判断阶段类型:如果内容以解题为主,归类为“例题精讲”;如果涉及新知识讲解,归类为“新知讲解”;以此类推。\n" +
$"内容总结简述该阶段的核心讲解内容70~200字,确保内容与阶段时间内授课内容符合。\n" + $"内容总结简述该阶段的核心讲解内容70~200字,确保内容与阶段时间内授课内容符合。\n" +
$"阶段主题:基于内容总结,提炼一个恰当的主题(例如,“柯西不等式的基本应用”)。\n" + $"阶段主题:基于内容总结,提炼一个恰当的主题(例如,“柯西不等式的基本应用”)。\n" +
$"输出要求:确保阶段划分合理、无` 重叠,且时长符合要求\n" + $"输出要求:确保阶段划分合理、无` 重叠,且时长符合要求,并且每个阶段的时长需要超过60秒如果时长不够去考虑合并到相邻的阶段\n" +
$"输出格式要求内容只返回json格式({resFormat})\n" + $"输出格式要求内容只返回json格式({resFormat})\n" +
$"字幕格式(开始秒:内容|下一段字幕).以下是包含时间的视频字幕文本。\n" + $"字幕格式(开始秒:内容|下一段字幕).以下是包含时间的视频字幕文本。\n" +
$"字幕列表 {captions.Captions} 字幕结束!"; $"字幕列表 {captions.Captions} 字幕结束!";

View File

@ -204,7 +204,7 @@ namespace VideoAnalysisCore.AICore.SherpaOnnx
.ExecuteCommandAsync(); .ExecuteCommandAsync();
await redisManager.Redis.HMSetAsync(RedisExpandKey.Task(task), "Captions", res); await redisManager.Redis.HMSetAsync(RedisExpandKey.Task(task), "Captions", res);
//分析完成视频字幕后继续接收任务 //分析完成视频字幕后继续接收任务
redisManager.NewTask(); //redisManager.NewTask();
} }
return res; return res;
} }

View File

@ -10,6 +10,7 @@ using System.Security.Cryptography;
using System.Text.Json; using System.Text.Json;
using System.Threading.Channels; using System.Threading.Channels;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Xml.Linq; using System.Xml.Linq;
using UserCenter.Model.Enum; using UserCenter.Model.Enum;
using VideoAnalysisCore.AICore.FFMPGE; using VideoAnalysisCore.AICore.FFMPGE;
@ -169,10 +170,15 @@ namespace VideoAnalysisCore.Common
{ {
public static bool StopTask { get; set; } = false; public static bool StopTask { get; set; } = false;
public static Dictionary<RedisChannelEnum, Func<string, Task>> SubscribeList = new Dictionary<RedisChannelEnum, Func<string, Task>>(); public static Dictionary<RedisChannelEnum, Func<string, Task>> SubscribeList = new Dictionary<RedisChannelEnum, Func<string, Task>>();
/// <summary> /// <summary>
/// 队列池 /// 正在后台运行的任务集合
/// </summary> /// </summary>
static SubscribeListObject? Subscribe; public static ConcurrentDictionary<string, Task> RunningTasks = new ConcurrentDictionary<string, Task>();
private static CancellationTokenSource? _cts;
private static Task? _workerTask;
public RedisClient Redis { get; set; } public RedisClient Redis { get; set; }
public Repository<VideoTask> videoTaskDB { get; set; } public Repository<VideoTask> videoTaskDB { get; set; }
public Repository<TaskLog> taskLogDB { get; set; } public Repository<TaskLog> taskLogDB { get; set; }
@ -273,41 +279,94 @@ namespace VideoAnalysisCore.Common
if (taskId is null) throw new Exception("taskId为空"); if (taskId is null) throw new Exception("taskId为空");
if (Redis is null) throw new Exception("redis未初始化"); if (Redis is null) throw new Exception("redis未初始化");
//设置任务Redis缓存过期时间
var startTime = Redis.HMGet<Dictionary<RedisChannelEnum, DateTime>>(RedisExpandKey.Task(taskId), "StartTime").FirstOrDefault();
if (startTime is null)
startTime = new Dictionary<RedisChannelEnum, DateTime>();
if (!SubscribeList.ContainsKey(@enum))
throw new Exception(@enum + " 未实现");
var tId = taskId.ToString(); var tId = taskId.ToString();
await AddTaskLog(tId, "==> 开始执行任务 "); await AddTaskLog(tId, "==> 开始执行任务 ");
await ProcessTaskFlow(@enum, taskId, tId);
}
private async Task ProcessTaskFlow(RedisChannelEnum currentStep, object taskId, string tId)
{
try try
{ {
// 确保有初始校验
if (!SubscribeList.ContainsKey(currentStep))
throw new Exception($"{currentStep} 未实现");
while (true) while (true)
{ {
if (!startTime.ContainsKey(@enum)) if (StopTask)
startTime.Add(@enum, DateTime.Now); {
else await AddTaskLog(tId, "==> 手动停止任务 ");
startTime[@enum] = DateTime.Now; return;
}
// 1. 记录步骤开始时间
await UpdateStepTimeAsync(taskId, currentStep);
// 2. 执行当前步骤业务逻辑
await TouchChannel(currentStep, tId, SubscribeList[currentStep]);
// 3. 准备下一步
var nextStepNullable = currentStep.NextEnum();
if (nextStepNullable == null) break; // 流程结束
Redis.HMSet(RedisExpandKey.Task(taskId), "StartTime", startTime); var nextStep = nextStepNullable.Value;
// 4. 特殊分流:解析字幕完成后,后续步骤转后台并行处理
await TouchChannel(@enum, tId, SubscribeList[@enum]); if (currentStep == RedisChannelEnum.)
//await SubscribeList[@enum](tId); {
var e = @enum.NextEnum(); DispatchBackgroundFlow(nextStep, taskId, tId);
if (e is null) return; // 释放当前主控线程
break; }
@enum = e.Value; // 5. 继续循环
currentStep = nextStep;
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
await SetTaskErrorMessage(long.Parse(tId), ex); await SetTaskErrorMessage(long.Parse(tId), ex);
} }
finally
{
// 每次流程结束(无论是正常结束、异常还是分流退出),都尝试延长过期时间
// 注意:如果是分流退出,这里也会执行,保证 key 活跃
await Redis.ExpireAsync(RedisExpandKey.Task(taskId), 60 * 60 * 24 * 15);
}
}
Redis.Expire(RedisExpandKey.Task(taskId), 60 * 60 * 24 * 15); /// <summary>
/// 更新任务步骤时间
/// </summary>
private async Task UpdateStepTimeAsync(object taskId, RedisChannelEnum step)
{
// 获取现有时间字典(如果不存在则新建)
// 注意HMGet 返回的是数组,取第一个元素
var result = await Redis.HMGetAsync<Dictionary<RedisChannelEnum, DateTime>>(RedisExpandKey.Task(taskId), "StartTime");
var startTime = result?.FirstOrDefault() ?? new Dictionary<RedisChannelEnum, DateTime>();
// 更新时间
startTime[step] = DateTime.Now;
// 写回 Redis
await Redis.HMSetAsync(RedisExpandKey.Task(taskId), "StartTime", startTime);
}
/// <summary>
/// 分发后续任务到动态线程池
/// </summary>
private void DispatchBackgroundFlow(RedisChannelEnum startStep, object taskId, string tId)
{
var bgTask = Task.Run(async () =>
{
try
{
await ProcessTaskFlow(startStep, taskId, tId);
}
finally
{
RunningTasks.TryRemove(tId, out _);
}
});
RunningTasks.TryAdd(tId, bgTask);
} }
public async Task TaskEnd(string task) public async Task TaskEnd(string task)
@ -407,12 +466,10 @@ namespace VideoAnalysisCore.Common
StopTask = true; StopTask = true;
try try
{ {
//取消接收任务监听 _cts?.Cancel();
Subscribe?.Dispose();
} }
catch (Exception) catch (Exception)
{ {
throw; throw;
} }
} }
@ -452,19 +509,38 @@ namespace VideoAnalysisCore.Common
} }
lock (Redis) lock (Redis)
{ {
if (Subscribe?.IsUnsubscribed == false)//排除重试机制后 多次接收任务导致内存泄露 // 如果任务正在运行且未完成,直接返回
if (_workerTask != null && !_workerTask.IsCompleted)
return; return;
Subscribe = Redis.SubscribeList(RedisExpandKey.ChannelKey, async (taskId) =>
_cts = new CancellationTokenSource();
var token = _cts.Token;
_workerTask = Task.Run(async () =>
{ {
if (taskId is null) return; Console.WriteLine($"{DateTime.Now} => 开始监听任务队列...");
lock (Redis) while (!token.IsCancellationRequested && !StopTask)
{ {
Subscribe?.Dispose();//取消接收任务监听 try
} {
Redis.LPush(RedisExpandKey.IDTask, taskId); // 使用 BLPop 阻塞式获取任务超时5秒以便检查取消状态
await AddTaskLog(taskId, "-------------> 接收到任务 "); var taskId = Redis.BLPop(RedisExpandKey.ChannelKey, 5);
await InsertChannel(RedisChannelEnum., taskId); if (!string.IsNullOrEmpty(taskId))
}); {
Redis.LPush(RedisExpandKey.IDTask, taskId);
await AddTaskLog(taskId, "-------------> 接收到任务 ");
// await等待任务处理完成确保串行执行
await InsertChannel(RedisChannelEnum., taskId);
}
}
catch (Exception ex)
{
Console.WriteLine($"任务监听异常: {ex.Message}");
await Task.Delay(2000);
}
}
Console.WriteLine($"{DateTime.Now} => 停止监听任务队列.");
}, token);
} }
} }

View File

@ -200,6 +200,18 @@ namespace VideoAnalysisCore.Controllers
} }
/// <summary>
/// 视频处理[批量]
/// </summary>
/// <param name="req">请求体</param>
/// <returns></returns>
[HttpPost(Name = "VideoAnalysis_Batch")]
public async Task<IActionResult> VideoAnalysis_Batch(VideoAnalysisReq[] req)
{
foreach (var item in req)
await VideoAnalysis(item);
return Ok();
}
/// <summary> /// <summary>
/// 视频处理 /// 视频处理