Learn.VideoAnalysis/VideoAnalysisCore/Job/NodeSubscriptionJob.cs

111 lines
3.8 KiB
C#

using Coravel.Invocable;
using Microsoft.AspNetCore.Http;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using UserCenter.Model.Enum;
using VideoAnalysisCore.Common;
using VideoAnalysisCore.Model;
using VideoAnalysisCore.Model.Dto;
using VideoAnalysisCore.Model.;
using Yitter.IdGenerator;
namespace VideoAnalysisCore.Job
{
/// <summary>
/// [蓝鲸智库] 查找未处理的视频
/// </summary>
public class NodeSubscriptionJob : IInvocable
{
/// <summary>
/// 每个扫描文件包每次取出{20}个
/// </summary>
private readonly int TopLength = 20;
private readonly Repository<NodeSubscription> nodesubscriptionDB;
private readonly Repository<Attachments> attachmentsDB;
private readonly Repository<VideoTask> videotaskDB;
public NodeSubscriptionJob(Repository<Attachments> videoTaskDB, Repository<NodeSubscription> nodesubscriptionDB, Repository<VideoTask> videotaskDB)
{
this.attachmentsDB = videoTaskDB;
this.nodesubscriptionDB = nodesubscriptionDB;
this.videotaskDB = videotaskDB;
}
public async Task Invoke()
{
Console.WriteLine($"{DateTime.Now} 执行=>文件节点订阅");
var videoIdArr = videotaskDB.AsQueryable().Select(v => v.TagId).Distinct().ToArray();
var tasks = await nodesubscriptionDB.GetListAsync(s => s.Enable && s.Subject ==SubjectEnum.);
foreach (var item in tasks)
{
var fileNodeId = item.NodeId;
var data = attachmentsDB.Context.Ado
.SqlQuery<Attachments>($"""
SELECT top {TopLength}
*
FROM
Attachments
WHERE
Id IN (
SELECT
AttachmentsId
FROM
Material
WHERE
id IN (
SELECT
MaterialId
FROM
FileContentMaterial
WHERE
FileContentId IN ( SELECT id FROM FileContent WHERE BagId IN ( SELECT Id FROM FileDirectory WHERE Id={fileNodeId} AND types = 1 AND DeleteState = 0 ) )
AND DeleteState = 0
AND ( MaterialName NOT LIKE '%PPT%' OR MaterialName NOT LIKE '%ppt%' )
)
)
AND Type = '录播视频'
AND (
NAME NOT LIKE '%PPT%'
OR NAME NOT LIKE '%ppt%'
) and id>{item.LastId}
""");
if (data is null || data.Count == 0)
continue;
Console.WriteLine($"{DateTime.Now} 视频订阅=>Node:{item.NodeId} 数量{data.Count}");
var videos = new List<VideoTask>(data.Count);
foreach (var s in data)
{
if (videoIdArr.Contains(s.VideoCode))
continue;
videos.Add(new VideoTask()
{
Id = YitIdHelper.NextId(),
ComeFrom = "127.0.0.1",
ApiToken = "",
//Type = item.TaskType,
Subject = item.Subject,
Tag = item.NodeId.ToString(),
TagId = s.VideoCode,
MediaUrl = string.Empty,
});
}
var maxId = data.Max(s => s.Id);
//入库 更新最后扫描记录
await nodesubscriptionDB.AsUpdateable()
.SetColumns(it => it.LastId == maxId)
.Where(it => it.Id == item.Id)
.ExecuteCommandAsync();
await videotaskDB.InsertRangeAsync(videos);
var ids = videos.Select(s => s.Id).ToArray();
RedisExpand.JoinQueue(ids);
}
}
}
}