优化任务调度流程

This commit is contained in:
小肥羊 2025-06-18 10:33:36 +08:00
parent 19855abb6f
commit 4ab527c388
16 changed files with 122 additions and 1009 deletions

View File

@ -8,6 +8,7 @@
@using VideoAnalysisCore.Model.Dto
@using VideoAnalysisCore.Model.Enum;
<Table @ref="_table" Loading="tableLoading" TItem="VideoTaskDto" ScrollY="600px" PageSize="10" Total="_total" DataSource="_dataSource"
OnRowClick="(r)=>r.Expanded = !r.Expanded"
@bind-SelectedRows="_selectedRows" OnChange="OnChange"
@ -25,9 +26,9 @@
<PropertyColumn Property="c=>c.Subject" Width="100px" />
<PropertyColumn Property="c=>c.ComeFrom" Width="100px" />
<PropertyColumn Property="c=>c.MediaUrl" Width="320px" />
<PropertyColumn Property="c=>c.TotalTokens" Width="100px" />
<PropertyColumn Property="c=>c.CreateTime" />
</ColumnDefinitions>
<ExpandTemplate Context="rowData">
<Descriptions Title="任务详情" Bordered>
@ -50,25 +51,25 @@
</Button>
</DescriptionsItem>
<DescriptionsItem Title="任务时间轴" Span="5">
<DescriptionsItem Title="任务时间轴" Span="6">
<Steps Current="@((int)rowData.Data.LastEnum)" Status="@rowData.Data.TaskStatus">
<Step Title="下载文件"
Description="@RowST(rowData,RedisChannelEnum.DownloadFile)" />
Description="@RowST(rowData,RedisChannelEnum.下载文件)" />
<Step Title="分离音频"
Description="@RowST(rowData,RedisChannelEnum.SeparateAudio)" />
Description="@RowST(rowData,RedisChannelEnum.分离音频)" />
<Step Title="解析字幕"
Description="@RowST(rowData,RedisChannelEnum.ParsingCaptions)" />
Description="@RowST(rowData,RedisChannelEnum.解析字幕)" />
<Step Title="解析说话人"
Description="@RowST(rowData,RedisChannelEnum.ParsingSpeaker)" />
<Step Title="Chat模型分析"
Description="@RowST(rowData,RedisChannelEnum.ChatModelAnalysis)" />
<Step Title="AI模型分析"
Description="@RowST(rowData,RedisChannelEnum.AI模型分析)" />
<Step Title="AI分析试题"
Description="@RowST(rowData,RedisChannelEnum.AI分析试题)" />
<Step Title="结束任务"
Description="@RowST(rowData,RedisChannelEnum.EndTask)" />
Description="@RowST(rowData,RedisChannelEnum.结束任务)" />
</Steps>
</DescriptionsItem>
@ -101,4 +102,4 @@
</Select>
<br />
<br />
</Modal>
</Modal>

View File

@ -128,8 +128,8 @@ namespace Learn.VideoAnalysis.Components.Pages
var data = RedisExpand.Redis.HMGet<string>(RedisExpandKey.Task(item.Id),
"Progress", "LastEnum", "StartTime", "ErrorMessage");
item.Progress = data[0];
item.LastEnum = data[1].ToEnum<RedisChannelEnum>() ?? default;
item.StartTimeDic = System.Text.Json.JsonSerializer.Deserialize<Dictionary<RedisChannelEnum, DateTime>>(data[2]) ?? null;
item.LastEnum = data[1] == null ?default:data[1].ToEnum<RedisChannelEnum>() ?? default;
item.StartTimeDic = data[2]==null?null: System.Text.Json.JsonSerializer.Deserialize<Dictionary<RedisChannelEnum, DateTime>>(data[2]) ?? null;
item.ErrorMessage = data[3];
rowRestartLoading = false;
var statusStr = "wait";

View File

@ -95,7 +95,7 @@ namespace Learn.VideoAnalysis.Components.Pages
}).ToArray();
videoPath = AppCommon.GetVideoPath(nowTask.Id.ToString());
if (nowTask.VideoType == AttachmentsInfoType.Review)
if (nowTask.VideoType == AttachmentsInfoType.)
{
var questionArr = await videoQuestionDB
.AsQueryable().Where(s => s.VideoTaskId == nowTask.Id)

View File

@ -4,7 +4,6 @@ using Microsoft.OpenApi.Models;
using VideoAnalysisCore.AICore.SherpaOnnx;
using Mapster;
using VideoAnalysisCore.AICore.GPT;
using VideoAnalysisCore.AICore.GPT.KIMI;
using VideoAnalysisCore.AICore.GPT.ChatGPT;
using Microsoft.Extensions.FileProviders;
using VideoAnalysisCore.AICore.GPT.DeepSeek;

View File

@ -16,5 +16,11 @@ namespace VideoAnalysisCore.AICore.GPT
/// <param name="task">任务id</param>
/// <returns></returns>
public Task<TaskRes> GetKnow(string task);
/// <summary>
/// 获取 视频分段内的 试题
/// </summary>
/// <param name="task">任务id</param>
/// <returns></returns>
public Task GetVideoQuestion(string task);
}
}

View File

@ -418,5 +418,10 @@ namespace VideoAnalysisCore.AICore.GPT.ChatGPT
return gptRes;
}
public Task GetVideoQuestion(string task)
{
throw new NotImplementedException();
}
}
}

View File

@ -7,7 +7,6 @@ using System.Net.Http;
using Newtonsoft.Json;
using System.Net.Http.Json;
using System.Net;
using VideoAnalysisCore.AICore.GPT.KIMI;
using System.Threading;
using System;
using System.IO;
@ -121,8 +120,7 @@ namespace VideoAnalysisCore.AICore.GPT.DeepSeek
Console.WriteLine(e.Message);
Console.WriteLine(e.StackTrace);
Console.WriteLine("==============================================");
Thread.Sleep(1000);
Thread.Sleep(1000);
}
}
throw errorMSG.Last(s => s != null);

View File

@ -3,7 +3,6 @@ using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using VideoAnalysisCore.AICore.GPT.KIMI;
namespace VideoAnalysisCore.AICore.GPT.DeepSeek
{

View File

@ -556,10 +556,50 @@ namespace VideoAnalysisCore.AICore.GPT.DeepSeek
await RedisExpand.Redis
.HMSetAsync(RedisExpandKey.Task(task), "VideoKnows", questionRes);
if (taskInfo.VideoType == AttachmentsInfoType.)
await AnalysisVideoQuestions(taskInfo, knowledgeInfos);
return null;
}
/// <summary>
/// 获取 视频分段内的 试题
/// </summary>
/// <param name="task">任务id</param>
/// <returns></returns>
public async Task GetVideoQuestion(string task)
{
var taskId = long.Parse(task);
var taskInfo = await videoTaskDB.AsQueryable()
.Where(s => s.Id == taskId)
.FirstAsync();
if (taskInfo.VideoType != AttachmentsInfoType.)
return;
var subject = taskInfo.Subject.ToString();
var Course_Id = 27;
switch (taskInfo.Type)//处理不同任务类型的知识点树
{
case TaskTypeEnum._中职视频分段:
Course_Id = 51;
break;
case TaskTypeEnum._视频分段:
default:
Course_Id = 27;
break;
}
var captionsArr = JsonSerializer.Deserialize<SenseVoiceRes[]>(taskInfo.Captions);
//处理视频授课章节
var sections = await GetSections(taskInfo, Course_Id);
var know = await knowledgeInfoDB.GetFirstAsync(s => s.Course_Id == Course_Id && s.Name == sections);
if (know is null)
throw new Exception("未能找到对应知识点=>" + sections);
var kInfo = await knowledgeInfoDB.GetByIdAsync(know.Parent_Id);
var knowledgeInfos = await knowledgeInfoDB.AsQueryable()
.ToChildListAsync(s => s.Parent_Id, kInfo.Parent_Id == 0 ? kInfo.Id : kInfo.Parent_Id);
//开始分析复习课 试题
await AnalysisVideoQuestions(taskInfo, knowledgeInfos);
return;
}
}
}

File diff suppressed because one or more lines are too long

View File

@ -1,356 +0,0 @@
using VideoAnalysisCore.Common;
using System.Net.Http.Headers;
using System.Text;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using System.Net.Http;
using Newtonsoft.Json;
using System.Net.Http.Json;
using System.Net;
using Azure;
using System.Reflection.PortableExecutable;
using static System.Runtime.InteropServices.JavaScript.JSType;
/// <summary>
/// https://platform.moonshot.cn/docs/api-reference
/// </summary>
namespace VideoAnalysisCore.AICore.GPT.KIMI
{
public class MoonshotClient
{
private readonly ILogger<MoonshotClient> _logger;
private readonly IHttpClientFactory _httpClientFactory;
public MoonshotClient(ILogger<MoonshotClient> logger, IHttpClientFactory httpClientFactory)
{
_logger = logger;
_httpClientFactory = httpClientFactory;
}
/// <summary>
/// list models
/// </summary>
/// <returns></returns>
public async Task<ModelListResp> ListModels()
{
var response = await GetAsync("/v1/models");
return await ParseResp<ModelListResp>(response);
}
/// <summary>
/// Chat
/// </summary>
/// <param name="requestBody"></param>
/// <returns>Return HttpResponseMessage for SSE</returns>
public async Task<ChatRes?> Chat(string requestBody)
{
var chatResp = await PostJsonStreamAsync("/v1/chat/completions", requestBody);
return await chatResp.Content.ReadFromJsonAsync<ChatRes>();
}
/// <summary>
/// ChatSSE[流式传输 更稳定]
/// </summary>
/// <param name="chatReq"></param>
/// <returns>Return HttpResponseMessage for SSE</returns>
public async Task<(Usage u, string res)?> ChatSSE(ChatReq chatReq)
{
chatReq.stream = true;
var requestBody = System.Text.Json.JsonSerializer.Serialize(chatReq);
var chatResp = await PostJsonStreamAsync("/v1/chat/completions", requestBody);
using var stream = await chatResp.Content.ReadAsStreamAsync();
using var reader = new StreamReader(stream, Encoding.UTF8);
string line;
StringBuilder messageBuilder = new StringBuilder();
ChatResSSE lastChat = new ChatResSSE();
while ((line = await reader.ReadLineAsync()) != null)
{
if (line.EndsWith("[DONE]"))
{
// 表示一条消息结束
string message = messageBuilder.ToString();
messageBuilder.Clear();
var u = lastChat?.choices?.FirstOrDefault()?.usage;
if (u == null || string.IsNullOrEmpty(message))
return null;
return (u, message);
}
else if (line.StartsWith("data:"))
{
try
{
var data = System.Text.Json.JsonSerializer.Deserialize<ChatResSSE>(line.Substring("data:".Length).Trim());
lastChat = data;
var str = data?.choices.FirstOrDefault()?.delta.content;
if (!string.IsNullOrEmpty(str))
messageBuilder.Append(str);
}
catch (Exception e)
{
Console.WriteLine("异常 ChatSSE=>");
Console.WriteLine(line);
Console.WriteLine(e.Message);
Console.WriteLine(e.StackTrace);
}
}
}
return null;
}
/// <summary>
/// Chat
/// </summary>
/// <param name="chatReq"></param>
/// <returns>Return HttpResponseMessage for SSE</returns>
public async Task<(Usage u, string res)?> Chat(ChatReq chatReq)
{
var requestBody = System.Text.Json.JsonSerializer.Serialize(chatReq);
var chatResp = await PostJsonStreamAsync("/v1/chat/completions", requestBody);
var res = await chatResp.Content.ReadFromJsonAsync<ChatRes>();
var chatResContent = res?.choices.FirstOrDefault()?.message.content.Trim();
if (res is null || res.error != null)
throw new Exception($"KIMI模型返回异常 Chat 返回参数: " +
$" {System.Text.Json.JsonSerializer.Serialize(res)}");
if (string.IsNullOrEmpty(chatResContent))
return null;
return (res.usage, chatResContent);
}
/// <summary>
/// 计算token长度
/// </summary>
/// <param name="chatReqText">文本</param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
public async Task<int?> GetAsTiMateTokenCount(string chatReqText)
{
var reqObject = new
{
model = "moonshot-v1-128k",
messages = new List<MessagesItem>()
{
new MessagesItem(chatReqText,"system"),
}
};
var response = await PostJsonStreamAsync("/v1/tokenizers/estimate-token-count", JsonConvert.SerializeObject(reqObject));
var responseText = await response.Content.ReadAsStringAsync();
if (response.IsSuccessStatusCode)
{
var responseObj = JToken.Parse(responseText);
return responseObj?["data"]?["total_tokens"]?.ToObject<int>();
}
var error = JsonConvert.DeserializeObject<ErrorResponse>(responseText);
_logger.LogError($"{error?.error?.type}: {error?.error?.message}");
throw new Exception($"{error?.error.type}: {error?.error.message}");
}
/// <summary>
/// Get as timate token count
/// </summary>
/// <param name="chatReq"></param>
/// <returns></returns>
public async Task<int?> GetAsTiMateTokenCount(ChatReq chatReq)
{
var chatReqText = JsonConvert.SerializeObject(chatReq);
return await GetAsTiMateTokenCount(chatReqText);
}
/// <summary>
/// List files
/// </summary>
public virtual async Task<FileListResp> ListFiles()
{
var response = await GetAsync("/v1/files");
return await ParseResp<FileListResp>(response);
}
/// <summary>
/// Upload file
/// </summary>
public virtual async Task<FileItem> UploadFile(string filePath)
{
if (!File.Exists(filePath))
{
throw new FileNotFoundException($"{filePath} not found");
}
var client = _httpClientFactory.CreateClient();
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", ApiKey);
var request = new HttpRequestMessage(HttpMethod.Post, $"{Host}/v1/files");
var content = new MultipartFormDataContent
{
{ new StreamContent(File.OpenRead(filePath)), "file", filePath }
};
request.Content = content;
var response = await client.SendAsync(request);
return await ParseResp<FileItem>(response);
}
/// <summary>
/// Upload file stream
/// </summary>
public virtual async Task<FileItem> UploadFileStream(Stream stream, string fileName)
{
var client = _httpClientFactory.CreateClient();
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", ApiKey);
var request = new HttpRequestMessage(HttpMethod.Post, $"{Host}/v1/files");
var content = new MultipartFormDataContent
{
{ new StreamContent(stream), "file", fileName }
};
request.Content = content;
var response = await client.SendAsync(request);
return await ParseResp<FileItem>(response);
}
/// <summary>
/// Get file content
/// </summary>
public virtual async Task<FileContent> GetFileContent(string fileId)
{
var response = await GetAsync($"/v1/files/{fileId}/content");
return await ParseResp<FileContent>(response);
}
private async Task<HttpResponseMessage> GetAsync(string path)
{
var client = _httpClientFactory.CreateClient();
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", ApiKey);
return await client.GetAsync(Host + path);
}
private async Task<HttpResponseMessage> PostJsonAsync(string path, string json)
{
var client = _httpClientFactory.CreateClient();
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", ApiKey);
return await client.PostAsync(Host + path, new StringContent(json, Encoding.UTF8, "application/json"));
}
private async Task<HttpResponseMessage> PostJsonStreamAsync(string path, string json)
{
var uriBuilder = new UriBuilder(Host + path);
var maxRestart = 4;
var errorMSG = new Exception[maxRestart];
for (int i = 0; i < maxRestart; i++)
{
try
{
var client = _httpClientFactory.CreateClient();
client.Timeout = TimeSpan.FromSeconds(Timeout.Infinite);//超时时间20分钟
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", ApiKey);
client.DefaultRequestVersion = HttpVersion.Version20;
client.DefaultVersionPolicy = HttpVersionPolicy.RequestVersionOrLower;
client.DefaultRequestHeaders.ConnectionClose = true;
//var request = ToHttpRequest(path);
//request.Version = HttpVersion.Version20;
//request.VersionPolicy = HttpVersionPolicy.RequestVersionOrLower;
//request.Content = new StringContent(json, Encoding.UTF8, "application/json");
//return await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
var content = new StringContent(json, Encoding.UTF8, "application/json");
return await client.PostAsync(uriBuilder.Uri, content);
}
catch (Exception e)
{
errorMSG[i] = e;
Console.WriteLine("====================[请求异常,重试]====================");
Console.WriteLine(uriBuilder.Uri);
Console.WriteLine(e.Message);
Console.WriteLine(e.StackTrace);
Console.WriteLine("==============================================");
}
Thread.Sleep(1000);
}
throw errorMSG.Last(s => s != null);
}
private HttpRequestMessage ToHttpRequest(string path)
{
var request = new HttpRequestMessage();
var uriBuilder = new UriBuilder(Host + path);
request.RequestUri = uriBuilder.Uri;
request.Method = new HttpMethod("POST");
request.Headers.Host = new Uri(Host).Host;
return request;
}
/// <summary>
/// Parse response
/// </summary>
private async Task<T> ParseResp<T>(HttpResponseMessage response)
{
var responseText = await response.Content.ReadAsStringAsync();
if (response.IsSuccessStatusCode)
{
return JsonConvert.DeserializeObject<T>(responseText) ?? default;
}
var error = JsonConvert.DeserializeObject<ErrorResponse>(responseText);
_logger.LogError($"{error?.error.type}: {error?.error.message}");
throw new Exception($"{error?.error.type}: {error?.error.message}");
}
private static string _host = "https://api.moonshot.cn";
public static string Host
{
get
{
if (string.IsNullOrEmpty(_host) && !string.IsNullOrEmpty(AppCommon.Config.ChatGpt.KIMI.Host))
{
_host = AppCommon.Config.ChatGpt.KIMI.Host ?? "";
}
return _host;
}
set
{
_host = value;
}
}
private static string _apiKey = "sk_";
public static string ApiKey
{
get
{
if (string.IsNullOrEmpty(_apiKey) && !string.IsNullOrEmpty(AppCommon.Config.ChatGpt.KIMI.ApiKey))
{
_apiKey = AppCommon.Config.ChatGpt.KIMI.ApiKey ?? "";
}
return _apiKey;
}
set
{
_apiKey = value;
}
}
}
}

View File

@ -1,365 +0,0 @@
namespace VideoAnalysisCore.AICore.GPT.KIMI
{
public class MessagesItem
{
public MessagesItem()
{
}
public MessagesItem(string content, string role = "user", bool partial = false)
{
this.content = content;
this.role = role;
this.partial = partial;
}
/// <summary>
///
/// </summary>
public string role { get; set; }
public bool partial { get; set; } = false;
/// <summary>
///
/// </summary>
public string content { get; set; }
}
/// <summary>
/// chat请求参数
/// </summary>
public class ChatReq
{
/// <summary>
/// 使用的模型
/// <para>例如[ moonshot-v1-8k ]</para>
/// </summary>
public string model { get; set; } = "moonshot-v1-8k";
/// <summary>
/// 消息主体
/// </summary>
public List<MessagesItem> messages { get; set; }
/// <summary>
/// 使用什么采样温度,介于 0 和 1 之间。较高的值(如 0.7)将使输出更加随机,而较低的值(如 0.2)将使其更加集中和确定性
/// <para>默认为 0如果设置值域须为 [0, 1] 我们推荐 0.3,以达到较合适的效果</para>
/// </summary>
public float temperature { get; set; }
/// <summary>
/// 聊天完成时生成的最大 token 数。如果到生成了最大 token 数个结果仍然没有结束finish reason 会是 "length", 否则会是 "stop"
/// <para>这个值建议按需给个合理的值,如果不给的话,我们会给一个不错的整数比如 1024。特别要注意的是这个 max_tokens 是指您期待我们返回的 token 长度,而不是输入 + 输出的总长度。比如对一个 moonshot-v1-8k 模型,它的最大输入 + 输出总长度是 8192当输入 messages 总长度为 4096 的时候,您最多只能设置为 4096否则我们服务会返回不合法的输入参数 invalid_request_error ),并拒绝回答。如果您希望获得“输入的精确 token 数”,可以使用下面的“计算 Token” API 使用我们的计算器获得计数</para>
/// </summary>
public int? max_tokens { get; set; }
/// <summary>
/// 另一种采样方法,即模型考虑概率质量为 top_p 的标记的结果。因此0.1 意味着只考虑概率质量最高的 10% 的标记。一般情况下,我们建议改变这一点或温度,但不建议 同时改变
/// </summary>
public float? top_p { get; set; } = 1.0f;
/// <summary>
/// 为每条输入消息生成多少个结果
/// <para>默认为 1不得大于 5。特别的当 temperature 非常小靠近 0 的时候,我们只能返回 1 个结果,如果这个时候 n 已经设置并且 > 1我们的服务会返回不合法的输入参数(invalid_request_error)</para>
/// </summary>
public int? n { get; set; } = 1;
/// <summary>
/// 存在惩罚,介于-2.0到2.0之间的数字。正值会根据新生成的词汇是否出现在文本中来进行惩罚,增加模型讨论新话题的可能性
/// <para>默认为 0</para>
/// </summary>
public float? presence_penalty { get; set; } = 0;
/// <summary>
/// 频率惩罚,介于-2.0到2.0之间的数字。正值会根据新生成的词汇在文本中现有的频率来进行惩罚,减少模型一字不差重复同样话语的可能性
/// <para>默认为 0</para>
/// </summary>
public float? frequency_penalty { get; set; } = 0;
/// <summary>
/// 停止词,当全匹配这个(组)词后会停止输出,这个(组)词本身不会输出。最多不能超过 5 个字符串,每个字符串不得超过 32 字节
/// <para>默认 null</para>
/// </summary>
public List<string>? stop { get; set; }
/// <summary>
/// 是否流式返回
/// <para>false</para>
/// </summary>
public bool stream { get; set; } = false;
}
public class PermissionItem
{
/// <summary>
///
/// </summary>
public int created { get; set; }
/// <summary>
///
/// </summary>
public string id { get; set; }
/// <summary>
///
/// </summary>
public string @object { get; set; }
/// <summary>
///
/// </summary>
public string allow_create_engine { get; set; }
/// <summary>
///
/// </summary>
public string allow_sampling { get; set; }
/// <summary>
///
/// </summary>
public string allow_logprobs { get; set; }
/// <summary>
///
/// </summary>
public string allow_search_indices { get; set; }
/// <summary>
///
/// </summary>
public string allow_view { get; set; }
/// <summary>
///
/// </summary>
public string allow_fine_tuning { get; set; }
/// <summary>
///
/// </summary>
public string organization { get; set; }
/// <summary>
///
/// </summary>
public string @group { get; set; }
/// <summary>
///
/// </summary>
public string is_blocking { get; set; }
}
public class ChatResError
{
/// <summary>
/// 错误信息
/// </summary>
public string message { get; set; } = string.Empty;
/// <summary>
/// 错误类型
/// </summary>
public string type { get; set; } = string.Empty;
}
public class ChatResSSE
{
public string id { get; set; }
public int created { get; set; }
/// <summary>
/// 模型id
/// </summary>
public string model { get; set; }
/// <summary>
/// 对话
/// </summary>
public ChoiceSSE[] choices { get; set; }
}
public class ChatRes
{
public ChatResError? error { get; set; }
public string id { get; set; }
public int created { get; set; }
/// <summary>
/// 模型id
/// </summary>
public string model { get; set; }
/// <summary>
/// 对话
/// </summary>
public Choice[] choices { get; set; }
/// <summary>
/// token使用情况
/// </summary>
public Usage usage { get; set; }
}
/// <summary>
/// token使用情况
/// </summary>
public class Usage
{
/// <summary>
/// 输入token数量
/// </summary>
public int prompt_tokens { get; set; }
/// <summary>
/// 返回token数量
/// </summary>
public int completion_tokens { get; set; }
/// <summary>
/// 总计token数量
/// </summary>
public int total_tokens { get; set; }
}
public class ChoiceSSE
{
public int index { get; set; }
public Message delta { get; set; }
public string finish_reason { get; set; }
/// <summary>
/// token使用情况
/// </summary>
public Usage usage { get; set; }
}
public class Choice
{
public int index { get; set; }
public Message message { get; set; }
public string finish_reason { get; set; }
}
public class Message
{
public string role { get; set; }
public string content { get; set; }
}
public class ModelInfo
{
/// <summary>
///
/// </summary>
public int created { get; set; }
/// <summary>
///
/// </summary>
public string id { get; set; }
/// <summary>
///
/// </summary>
public string @object { get; set; }
/// <summary>
///
/// </summary>
public string owned_by { get; set; }
/// <summary>
///
/// </summary>
public List<PermissionItem> permission { get; set; }
/// <summary>
///
/// </summary>
public string root { get; set; }
/// <summary>
///
/// </summary>
public string parent { get; set; }
}
public class ModelListResp
{
/// <summary>
///
/// </summary>
public string @object { get; set; }
/// <summary>
///
/// </summary>
public List<ModelInfo> data { get; set; }
}
public class FileListResp
{
/// <summary>
///
/// </summary>
public string @object { get; set; }
/// <summary>
///
/// </summary>
public List<FileItem> data { get; set; }
}
public class FileContent
{
/// <summary>
///
/// </summary>
public string content { get; set; }
/// <summary>
///
/// </summary>
public string file_type { get; set; }
/// <summary>
///
/// </summary>
public string filename { get; set; }
/// <summary>
///
/// </summary>
public string title { get; set; }
/// <summary>
///
/// </summary>
public string type { get; set; }
}
public class FileItem
{
/// <summary>
///
/// </summary>
public string id { get; set; }
/// <summary>
///
/// </summary>
public string @object { get; set; }
/// <summary>
///
/// </summary>
public int bytes { get; set; }
/// <summary>
///
/// </summary>
public int created_at { get; set; }
/// <summary>
///
/// </summary>
public string filename { get; set; }
/// <summary>
///
/// </summary>
public string purpose { get; set; }
/// <summary>
///
/// </summary>
public string status { get; set; }
/// <summary>
///
/// </summary>
public string status_details { get; set; }
}
public class ErrorMsg
{
/// <summary>
///
/// </summary>
public string message { get; set; }
/// <summary>
///
/// </summary>
public string type { get; set; }
}
public class ErrorResponse
{
/// <summary>
///
/// </summary>
public ErrorMsg error { get; set; }
}
}

View File

@ -1,5 +1,6 @@
using FreeRedis;
using Microsoft.Extensions.DependencyModel;
using Microsoft.IdentityModel.Tokens;
using SqlSugar;
using SqlSugar.IOC;
using System.Collections;
@ -250,6 +251,8 @@ namespace VideoAnalysisCore.Common
{
try
{
if(value is null || string.IsNullOrEmpty(value.ToString()))
return null;
if (Enum.TryParse<T>(value.ToString(), true, out var result) && Enum.IsDefined(typeof(T), result))
return result;
return null;

View File

@ -79,11 +79,12 @@ namespace VideoAnalysisCore.Common
/// </summary>
public static class RedisExpand
{
/// <summary>
/// redis 连接
/// </summary>
public static RedisClient Redis = new RedisClient(AppCommon.Config.Redis.ConnectionString);
public static Dictionary<RedisChannelEnum, Action<string>> SubscribeList = new Dictionary<RedisChannelEnum, Action<string>>();
public static Dictionary<RedisChannelEnum, Func<string,Task>> SubscribeList = new Dictionary<RedisChannelEnum, Func<string, Task>>();
/// <summary>
/// 队列池
/// </summary>
@ -159,23 +160,25 @@ namespace VideoAnalysisCore.Common
var startTime = Redis.HMGet<Dictionary<RedisChannelEnum, DateTime>>(RedisExpandKey.Task(taskId), "StartTime").FirstOrDefault();
if (startTime is null)
startTime = new Dictionary<RedisChannelEnum, DateTime>();
if (!startTime.ContainsKey(@enum))
startTime.Add(@enum, DateTime.Now);
else
startTime[@enum] = DateTime.Now;
Redis.HMSet(RedisExpandKey.Task(taskId), "StartTime", startTime);
if (!SubscribeList.ContainsKey(@enum))
throw new Exception(@enum + " 未实现");
var tId = taskId.ToString();
try
{
while (@enum.NextEnum() != null)
while (true)
{
SubscribeList[@enum].Invoke(tId);
@enum = @enum.NextEnum().Value;
if (!startTime.ContainsKey(@enum))
startTime.Add(@enum, DateTime.Now);
else
startTime[@enum] = DateTime.Now;
Redis.HMSet(RedisExpandKey.Task(taskId), "StartTime", startTime);
await SubscribeList[@enum](tId);
var e = @enum.NextEnum();
if (e is null)
break;
@enum = e.Value;
}
}
catch (Exception ex)
@ -231,7 +234,7 @@ namespace VideoAnalysisCore.Common
if (Redis is null) throw new Exception("redis未初始化");
SubscribeList.Add(RedisChannelEnum.,
(msg) => TouchChannel(RedisChannelEnum., msg,
async (msg) => await TouchChannel(RedisChannelEnum., msg,
(task) =>
{
using var scope = AppCommon.Services?.CreateScope();
@ -241,13 +244,13 @@ namespace VideoAnalysisCore.Common
return scope.ServiceProvider.GetService<DownloadFile>()?.RunTask(task) ?? Task.CompletedTask;
}));
SubscribeList.Add(RedisChannelEnum.,
(msg) => TouchChannel(RedisChannelEnum., msg, FFMPGEHandle.RunAsync));
async (msg) => await TouchChannel(RedisChannelEnum., msg, FFMPGEHandle.RunAsync));
SubscribeList.Add(RedisChannelEnum.,
(msg) => TouchChannel(RedisChannelEnum., msg, SenseVoice.RunTask));
SubscribeList.Add(RedisChannelEnum.,
(msg) => TouchChannel(RedisChannelEnum., msg, Speaker.Run));
async (msg) => await TouchChannel(RedisChannelEnum., msg, SenseVoice.RunTask));
//SubscribeList.Add(RedisChannelEnum.解析说话人,
// async (msg) => await TouchChannel(RedisChannelEnum.解析说话人, msg, Speaker.Run));
SubscribeList.Add(RedisChannelEnum.AI模型分析,
(msg) => TouchChannel(RedisChannelEnum.AI模型分析, msg,
async (msg) => await TouchChannel(RedisChannelEnum.AI模型分析, msg,
(task) =>
{
using var scope = AppCommon.Services?.CreateScope();
@ -256,8 +259,18 @@ namespace VideoAnalysisCore.Common
else
return scope.ServiceProvider.GetService<IBserGPT>()?.GetKnow(task) ?? Task.CompletedTask;
}));
SubscribeList.Add(RedisChannelEnum.AI分析试题,
async (msg) => await TouchChannel(RedisChannelEnum.AI分析试题, msg,
(task) =>
{
using var scope = AppCommon.Services?.CreateScope();
if (scope is null || scope.ServiceProvider.GetService<IBserGPT>() is null)
throw new Exception("IBserGPT 未注入");
else
return scope.ServiceProvider.GetService<IBserGPT>()?.GetVideoQuestion(task) ?? Task.CompletedTask;
}));
SubscribeList.Add(RedisChannelEnum.,
(msg) => TouchChannel(RedisChannelEnum., msg, TaskEnd));
async (msg) => await TouchChannel(RedisChannelEnum., msg, TaskEnd));
ReceivingTaskAsync();
@ -285,12 +298,6 @@ namespace VideoAnalysisCore.Common
}
Task.Run(async () =>
{
//todo 项目接收任务进程池
//接收任务加入池
//重试任务加入池
//失败任务删除池
//停止任务删除池
//重启项目运行池内所有可用任务
var oldTask = await Redis.GetAsync(RedisExpandKey.IDTask);
if (!string.IsNullOrEmpty(oldTask))
{
@ -338,12 +345,17 @@ namespace VideoAnalysisCore.Common
return await SetTaskError(taskID, error);
}
/// <summary>
/// 清楚 任务的错误信息
/// 清除 任务的错误信息
/// </summary>
/// <param name="taskID"></param>
/// <returns></returns>
public static async Task<bool> ClearTaskError(long taskID) =>await SetTaskError(taskID, string.Empty);
/// <summary>
/// 修改任务的错误信息
/// </summary>
/// <param name="taskID"></param>
/// <param name="error"></param>
/// <returns></returns>
public static async Task<bool> ClearTaskError(long taskID) =>await SetTaskError(taskID, string.Empty);
public static async Task<bool> SetTaskError(long taskID, string? error)
{
Redis.HMSet(RedisExpandKey.Task(taskID), "ErrorMessage", error);
@ -359,14 +371,14 @@ namespace VideoAnalysisCore.Common
/// <param name="key"></param>
/// <param name="taskId"></param>
/// <param name="action"></param>
public static async void TouchChannel(RedisChannelEnum key, string taskId, Func<string, Task> action = null)
public static async Task TouchChannel(RedisChannelEnum key, string taskId, Func<string, Task> action = null)
{
if (taskId is null) return;
var tID = long.Parse(taskId);
if (action is not null)
{
var errArr = new Exception[3];
for (int i = 0; i < 3; i++)
var tryCount = 1;
for (int i = 0; i < tryCount; i++)
{
Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-> 开始执行 " + key + " " + taskId);
try
@ -386,16 +398,16 @@ namespace VideoAnalysisCore.Common
}
catch (Exception ex)
{
errArr[i] = ex;
Console.WriteLine("====================[出现异常]====================");
Console.WriteLine(ex.Message);
Console.WriteLine(ex.StackTrace);
Console.WriteLine("==============================================");
Thread.Sleep(1000);
Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-> 稍后后重试." + key + " " + taskId);
if (i+1== tryCount)
throw;
}
}
throw errArr.Last();
}
else
{

View File

@ -41,7 +41,7 @@ namespace VideoAnalysisCore.Job
// 查询 2 天前任务执行完成的记录
var completedTasks = videotaskDB.AsQueryable()
.Where(t =>
t.LastEnum == Model.Enum.RedisChannelEnum.EndTask
t.LastEnum == Model.Enum.RedisChannelEnum.
&& t.EndTime < twoDaysAgo
&& t.EndTime > endDaysAgo)
.ToList();

View File

@ -21,10 +21,10 @@
/// 解析字幕
/// </summary>
= 20,
/// <summary>
/// 解析说话人
/// </summary>
= 30,
///// <summary>
///// 解析说话人
///// </summary>
//解析说话人 = 30,
/// <summary>
/// Chat模型分析
/// </summary>