新增 redis任务缓存过期时间

This commit is contained in:
小肥羊 2025-10-28 17:15:13 +08:00
parent d5a1da1040
commit 193289437b
4 changed files with 62 additions and 35 deletions

View File

@ -61,7 +61,7 @@ namespace VideoAnalysisCore.AICore.FFMPGE
var intervalSec = 5; var intervalSec = 5;
var threshold = 8.15; var threshold = 8.15;
var ssimThreshold = 0.9; var ssimThreshold = 0.9;
var taskInfo = await videoTaskDB.AsQueryable() var taskInfo = await videoTaskDB.CopyNew().AsQueryable()
.Where(s => s.Id == long.Parse(task)).FirstAsync(); .Where(s => s.Id == long.Parse(task)).FirstAsync();
if (string.IsNullOrEmpty(taskInfo.PPTVideoCode) || string.IsNullOrEmpty(taskInfo.PPTVideoUrl)) return; if (string.IsNullOrEmpty(taskInfo.PPTVideoCode) || string.IsNullOrEmpty(taskInfo.PPTVideoUrl)) return;
//视频切帧 //视频切帧
@ -77,7 +77,7 @@ namespace VideoAnalysisCore.AICore.FFMPGE
redisManager.SetTaskProgress(task, "Frame=>10%"); redisManager.SetTaskProgress(task, "Frame=>10%");
foreach (string jpgFile in Directory.GetFiles(localPath, "*.jpg")) foreach (string jpgFile in Directory.GetFiles(localPath, "*.jpg"))
FileSystem.DeleteFile(jpgFile, UIOption.OnlyErrorDialogs, RecycleOption.DeletePermanently); File.Delete(jpgFile);
redisManager.SetTaskProgress(task, "Frame=>20%"); redisManager.SetTaskProgress(task, "Frame=>20%");
await ffmpeg.ExecuteAsync($"-i {filePath} -vf \"fps=1/{intervalSec},scale=960:540\" {localPath}/{ExpandFunction.FrameName}%03d.jpg", cToken); await ffmpeg.ExecuteAsync($"-i {filePath} -vf \"fps=1/{intervalSec},scale=960:540\" {localPath}/{ExpandFunction.FrameName}%03d.jpg", cToken);
@ -122,7 +122,7 @@ namespace VideoAnalysisCore.AICore.FFMPGE
} }
//写入数据库 //写入数据库
var keyFramStr = keyFrames.Where(s => s != -1).ToJson(); var keyFramStr = keyFrames.Where(s => s != -1).ToJson();
await videoTaskDB.AsUpdateable() await videoTaskDB.CopyNew().AsUpdateable()
.SetColumns(it => it.PPTKeyFrame == keyFramStr) .SetColumns(it => it.PPTKeyFrame == keyFramStr)
.Where(it => it.Id == taskID) .Where(it => it.Id == taskID)
.ExecuteCommandAsync(); .ExecuteCommandAsync();
@ -176,7 +176,7 @@ namespace VideoAnalysisCore.AICore.FFMPGE
/// <returns></returns> /// <returns></returns>
public async Task Audio2WAV16KAsync(string task) public async Task Audio2WAV16KAsync(string task)
{ {
var filePath = await videoTaskDB.AsQueryable() var filePath = await videoTaskDB.CopyNew().AsQueryable()
.Where(s => s.Id == long.Parse(task)) .Where(s => s.Id == long.Parse(task))
.Select(s=>s.LocalMediaPath).FirstAsync(); .Select(s=>s.LocalMediaPath).FirstAsync();
if (string.IsNullOrEmpty(filePath)) if (string.IsNullOrEmpty(filePath))

View File

@ -84,14 +84,21 @@ namespace VideoAnalysisCore.AICore.GPT.DeepSeek
$"这是我的分段 {thems}。" + $"这是我的分段 {thems}。" +
$"课堂内容与{sections}章节相关" + $"课堂内容与{sections}章节相关" +
$"最后请确保分配的知识点是用户提供的,并且一定正确合理!" + $"最后请确保分配的知识点是用户提供的,并且一定正确合理!" +
$"返回的片段数量与传入片段数量一致!" +
$"输出内容只返回json格式({checkResFormat1})" + $"输出内容只返回json格式({checkResFormat1})" +
$" 格式 (方法点Id|方法点名称) " + $" 格式 (方法点Id|方法点名称) " +
$"提供的知识点名称({knows})。"; $"提供的知识点名称({knows})。";
Console.WriteLine(DateTime.Now + "=>2.开始分析视频内容知识点"); Console.WriteLine(DateTime.Now + "=>2.开始分析视频内容知识点");
var konwRes = await chatGPTClient.ChatAsync<VideoKnowRes[]>(taskInfo.Id.ToString(), knowMessages, "知识点"); VideoKnowRes[] konwRes;
for (int i = 0; i < 5; i++)
for (int i = 0; i < konwRes.Count(); i++) {
questionRes[i].KnowPoint = konwRes[i].KnowPoint; konwRes = await chatGPTClient.ChatAsync<VideoKnowRes[]>(taskInfo.Id.ToString(), knowMessages, "知识点");
// 分析结果的片段数量与预期不匹配
if (questionRes.Length != konwRes.Length) continue;
for (int xi = 0; xi < konwRes.Count(); xi++)
questionRes[i].KnowPoint = konwRes[i].KnowPoint;
break;
}

View File

@ -70,9 +70,9 @@ namespace VideoAnalysisCore.Common
/// <summary> /// <summary>
/// 任务对象地址 /// 任务对象地址
/// </summary> /// </summary>
public static string Task(object taskId) => BaseKey + "TaskInfo:" + taskId; public static string Task(object taskId) => BaseKey + "Info:" + taskId;
public static string IDTask => BaseKey + "Services:" + AppCommon.Config.ID; public static string IDTask => BaseKey + "Services:" + AppCommon.Config.ID;
public static string TaskGPT(object taskId) => Task(taskId) + ":GPTCached"; public static string TaskGPT(object taskId) => BaseKey + "GPTCached:" + taskId;
/// <summary> /// <summary>
/// 初始化 redis /// 初始化 redis
/// <para>需要在初始化配置文件时候调用</para> /// <para>需要在初始化配置文件时候调用</para>
@ -113,13 +113,17 @@ namespace VideoAnalysisCore.Common
public void Init() public void Init()
{ {
var SubscribeList = RedisManager.SubscribeList; var SubscribeList = RedisManager.SubscribeList;
SubscribeList.Add(RedisChannelEnum., async (task) =>
{
await Task.CompletedTask;
});
SubscribeList.Add(RedisChannelEnum., async (task) => SubscribeList.Add(RedisChannelEnum., async (task) =>
{ {
using var scope = AppCommon.Services?.CreateScope(); using var scope = AppCommon.Services?.CreateScope();
if (scope is null || scope.ServiceProvider.GetService<DownloadFile>() is null) if (scope is null || scope.ServiceProvider.GetService<DownloadFile>() is null)
throw new Exception("DownloadFile 未注入"); throw new Exception("DownloadFile 未注入");
else else
await scope.ServiceProvider.GetService<DownloadFile>()?.RunTask(task) ; await scope.ServiceProvider.GetService<DownloadFile>()?.RunTask(task);
}); });
SubscribeList.Add(RedisChannelEnum., FFMPGE.RunAsync); SubscribeList.Add(RedisChannelEnum., FFMPGE.RunAsync);
SubscribeList.Add(RedisChannelEnum., senseVoice.RunTask); SubscribeList.Add(RedisChannelEnum., senseVoice.RunTask);
@ -225,7 +229,6 @@ namespace VideoAnalysisCore.Common
if (Redis is null) throw new Exception("redis未初始化"); if (Redis is null) throw new Exception("redis未初始化");
//设置任务Redis缓存过期时间 //设置任务Redis缓存过期时间
Redis.Expire(RedisExpandKey.Task(taskId), 60 * 60 * 24 * 14);
var startTime = Redis.HMGet<Dictionary<RedisChannelEnum, DateTime>>(RedisExpandKey.Task(taskId), "StartTime").FirstOrDefault(); var startTime = Redis.HMGet<Dictionary<RedisChannelEnum, DateTime>>(RedisExpandKey.Task(taskId), "StartTime").FirstOrDefault();
if (startTime is null) if (startTime is null)
@ -233,6 +236,8 @@ namespace VideoAnalysisCore.Common
if (!SubscribeList.ContainsKey(@enum)) if (!SubscribeList.ContainsKey(@enum))
throw new Exception(@enum + " 未实现"); throw new Exception(@enum + " 未实现");
var tId = taskId.ToString(); var tId = taskId.ToString();
Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 开始执行任务 " + tId);
try try
{ {
while (true) while (true)
@ -256,6 +261,8 @@ namespace VideoAnalysisCore.Common
{ {
await SetTaskErrorMessage(long.Parse(tId), ex); await SetTaskErrorMessage(long.Parse(tId), ex);
} }
Redis.Expire(RedisExpandKey.Task(taskId), 60 * 60 * 24 * 15);
} }
public async Task TaskEnd(string task) public async Task TaskEnd(string task)
@ -318,15 +325,23 @@ namespace VideoAnalysisCore.Common
var oldTaskArr = Redis.LRange(RedisExpandKey.IDTask, 0, oldTaskCount); var oldTaskArr = Redis.LRange(RedisExpandKey.IDTask, 0, oldTaskCount);
//不自动清理未完成任务 等待执行完毕/失败后自动清理 //不自动清理未完成任务 等待执行完毕/失败后自动清理
//Redis.LTrim(RedisExpandKey.IDTask, 1, 0);//删除 redis 列表 //Redis.LTrim(RedisExpandKey.IDTask, 1, 0);//删除 redis 列表
foreach (var oldTask in oldTaskArr) foreach (var oldTask in oldTaskArr.Take(10))
{ {
_ = Task.Run(async () => _ = Task.Run(async () =>
{ {
Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收上次未完成任务 " + oldTask); try
await ClearTaskError(long.Parse(oldTask)); {
var lastEnum = (await Redis.HMGetAsync<RedisChannelEnum>(RedisExpandKey.Task(oldTask), "LastEnum")).FirstOrDefault(); Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收上次未完成任务 " + oldTask);
await InsertChannel(lastEnum, oldTask); await ClearTaskError(long.Parse(oldTask));
}); var lastEnum = (await Redis.HMGetAsync<RedisChannelEnum>(RedisExpandKey.Task(oldTask), "LastEnum")).FirstOrDefault();
await InsertChannel(lastEnum, oldTask);
}
catch (Exception ex)
{
await SetTaskErrorMessage(long.Parse(oldTask), ex);
throw;
}
});
} }
} }
else else
@ -387,24 +402,19 @@ namespace VideoAnalysisCore.Common
Console.WriteLine($"{DateTime.Now} =>服务端不接收任务"); Console.WriteLine($"{DateTime.Now} =>服务端不接收任务");
return; return;
} }
Task.Run(() => lock (Redis)
{ {
lock (Redis) if (Subscribe?.IsUnsubscribed == false)//排除重试机制后 多次接收任务导致内存泄露
return;
Subscribe = Redis.SubscribeList(RedisExpandKey.ChannelKey, async (taskId) =>
{ {
if (Subscribe?.IsUnsubscribed == false)//排除重试机制后 多次接收任务导致内存泄露 if (taskId is null) return;
return; Subscribe?.Dispose();//取消接收任务监听
Subscribe = Redis.SubscribeList(RedisExpandKey.ChannelKey, async (taskId) => Redis.LPush(RedisExpandKey.IDTask, taskId);
{ Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收到任务 " + taskId);
if (taskId is null) return; await InsertChannel(RedisChannelEnum., taskId);
Subscribe?.Dispose();//取消接收任务监听 });
//存储当前机器的任务 }
Redis.LPush(RedisExpandKey.IDTask, taskId);
Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收到任务 " + taskId);
await InsertChannel(RedisChannelEnum., taskId);
});
}
});
} }
/// <summary> /// <summary>

View File

@ -77,6 +77,12 @@ namespace VideoAnalysisCore.Controllers
{ {
SubjectEnum. SubjectEnum.
}; };
var courseTypeArr = new List<AttachmentsInfoType?>
{
AttachmentsInfoType.,
AttachmentsInfoType.,
AttachmentsInfoType.
};
foreach (var sGroup in reqArr.GroupBy(s => s.ContentId)) foreach (var sGroup in reqArr.GroupBy(s => s.ContentId))
{ {
var s = sGroup.FirstOrDefault(s => s.VideoType == VideoType.); var s = sGroup.FirstOrDefault(s => s.VideoType == VideoType.);
@ -84,6 +90,10 @@ namespace VideoAnalysisCore.Controllers
return BadRequest("无有效的老师授课视频"); return BadRequest("无有效的老师授课视频");
//校验学科有效 //校验学科有效
if (!subjectArr.Contains(s.SubjectId)) continue; if (!subjectArr.Contains(s.SubjectId)) continue;
//无效的课程类型
if (!courseTypeArr.Contains(s.CourseType)) continue;
var stageId = s.StageId.GetHashCode(); var stageId = s.StageId.GetHashCode();
var subjectId = s.SubjectId.GetHashCode(); var subjectId = s.SubjectId.GetHashCode();
var course = courseArr.FirstOrDefault(x => stageId == x.Stage_Id && subjectId == x.Subject_Id); var course = courseArr.FirstOrDefault(x => stageId == x.Stage_Id && subjectId == x.Subject_Id);