Learn.VideoAnalysis/VideoAnalysisCore/AICore/GPT/GPTClient.cs

299 lines
13 KiB
C#

using VideoAnalysisCore.Common;
using System.Net.Http.Headers;
using System.Text;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using System.Net.Http;
using Newtonsoft.Json;
using System.Net.Http.Json;
using System.Net;
using System.Threading;
using System;
using System.IO;
using VideoAnalysisCore.AICore.GPT.ChatGPT;
using System.Threading.Tasks;
using System.Text.Json;
using FFmpeg.NET.Services;
using NetTaste;
namespace VideoAnalysisCore.AICore.GPT
{
    /// <summary>
    /// Base class for chat-completion clients. Sends streaming (SSE) chat requests to
    /// <c>Config.Host + Config.Path</c>, reassembles the streamed answer/reasoning text, and
    /// deserializes JSON answers into typed results.
    /// </summary>
    public abstract class GPTClient
    {
        /// <summary>Endpoint settings (<c>Host</c>, <c>Path</c>, <c>ApiKey</c>) used for every request.</summary>
        public virtual GptConfig Config { get; set; }

        /// <summary>
        /// Lenient System.Text.Json settings for model output: trailing commas are accepted and comments
        /// are skipped. Cached once because options instances hold per-type metadata caches that are
        /// expensive to rebuild on every call.
        /// </summary>
        private static readonly JsonSerializerOptions s_lenientJsonOptions = new()
        {
            AllowTrailingCommas = true,
            ReadCommentHandling = JsonCommentHandling.Skip,
        };

        private readonly IHttpClientFactory _httpClientFactory;
        private readonly RedisManager _redisManager;
        private readonly VideoSliceWorkflowManager _workflowManager;

        public GPTClient(IHttpClientFactory httpClientFactory, RedisManager redisManager, VideoSliceWorkflowManager workflowManager)
        {
            _httpClientFactory = httpClientFactory;
            _redisManager = redisManager;
            _workflowManager = workflowManager;
        }

        /// <summary>
        /// Sends a chat request. Only streaming requests (<c>chatReq.stream == true</c>) are supported.
        /// </summary>
        /// <param name="chatReq">The chat request to send.</param>
        /// <returns>
        /// The usage, answer text and reasoning text; or <c>null</c> when the stream finished without
        /// usage information or without a non-empty answer.
        /// </returns>
        /// <exception cref="NotImplementedException">The request is not a streaming request.</exception>
        public async Task<(Usage u, string res, string reasoning)?> Chat(ChatRequest chatReq)
        {
            if (chatReq.stream)
                return await ChatSSE(chatReq);
            throw new NotImplementedException();
        }

        /// <summary>
        /// Separates an inline <c>&lt;think&gt;...&lt;/think&gt;</c> block from the answer text.
        /// </summary>
        /// <param name="m">Message whose <c>content</c> may start with a think block; may be null.</param>
        /// <returns>
        /// <c>m1</c>: the answer text (trimmed). <c>m2</c>: the reasoning text, taken from the think block
        /// when present, otherwise from <c>m.reasoning_content</c> (trimmed, possibly null).
        /// An unterminated think block yields an empty answer and the remainder as reasoning.
        /// </returns>
        private (string m1, string m2) thinkMSG(Message? m)
        {
            var content = m?.content?.Trim() ?? string.Empty;
            if (!content.StartsWith("<think", StringComparison.Ordinal))
                return (content, m?.reasoning_content?.Trim());

            // Reasoning starts right after the opening tag's '>'.
            var openEnd = content.IndexOf('>');
            var bodyStart = openEnd < 0 ? content.Length : openEnd + 1;
            var closeStart = content.IndexOf("</think", bodyStart, StringComparison.Ordinal);
            if (closeStart < 0)
            {
                // No closing tag: the model never got past its reasoning, so there is no answer yet.
                return (string.Empty, content.Substring(bodyStart).Trim());
            }

            var reasoning = content.Substring(bodyStart, closeStart - bodyStart);
            var closeEnd = content.IndexOf('>', closeStart);
            var answerStart = closeEnd < 0 ? content.Length : closeEnd + 1;
            return (content.Substring(answerStart).Trim(), reasoning);
        }

        /// <summary>
        /// Sends a streaming chat request and reassembles the SSE stream (more stable than a single
        /// blocking response).
        /// </summary>
        /// <param name="chatReq">The chat request; <c>taskId</c> is used for logging/progress, <c>title</c> for progress gating.</param>
        /// <returns>
        /// The usage, answer text and reasoning text once <c>[DONE]</c> (or end of stream) is reached;
        /// <c>null</c> when no usage was reported or the answer is empty.
        /// </returns>
        /// <exception cref="HttpRequestException">The server kept answering with non-success status codes.</exception>
        /// <exception cref="TimeoutException">The stream exceeded 1.5 hours or the idle-read budget.</exception>
        public async Task<(Usage u, string res, string reasoning)?> ChatSSE(ChatRequest chatReq)
        {
            const string dataPrefix = "data:";

            using var chatResp = await PostChatStreamWithStatusRetryAsync(chatReq);
            using var stream = await chatResp.Content.ReadAsStreamAsync();
            using var reader = new StreamReader(stream, Encoding.UTF8);

            var answer = new StringBuilder();
            var reasoning = new StringBuilder();
            ChatResSSE? usageChunk = null; // last chunk that carried usage information
            var chunkCount = 0;

            // Budget of idle reads (3-minute read timeouts and SSE keep-alive comments).
            var idleReadsLeft = 50 * 200;
            // Hard cap of 1.5 hours of streaming; UtcNow so a DST change cannot move the deadline.
            var deadline = DateTime.UtcNow.AddHours(1.5);

            while (idleReadsLeft > 0 && DateTime.UtcNow < deadline)
            {
                string? line;
                try
                {
                    using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
                    line = await reader.ReadLineAsync(cts.Token);
                }
                catch (OperationCanceledException)
                {
                    // NOTE(review): continuing to read after a cancelled read depends on the underlying
                    // response stream tolerating it; a torn-down connection surfaces as an IOException.
                    await _workflowManager.AddTaskLog(chatReq.taskId, "==>流式响应超时(3分钟),尝试重新读取...");
                    idleReadsLeft--;
                    continue;
                }

                if (line is null)
                {
                    // End of stream without a [DONE] marker: finish with what was received instead of
                    // polling a closed stream until the budget runs out.
                    await _workflowManager.AddTaskLog(chatReq.taskId, "==>流式响应已结束,未收到[DONE]");
                    return BuildStreamResult(answer, reasoning, usageChunk);
                }
                if (string.IsNullOrWhiteSpace(line))
                    continue; // blank lines separate SSE events; they are not idle reads
                if (line.StartsWith(':'))
                {
                    idleReadsLeft--; // SSE comment line, e.g. ": keep-alive"
                    continue;
                }
                if (line.EndsWith("[DONE]", StringComparison.Ordinal))
                    return BuildStreamResult(answer, reasoning, usageChunk);
                if (!line.StartsWith(dataPrefix, StringComparison.Ordinal))
                    continue;

                try
                {
                    var chunk = System.Text.Json.JsonSerializer.Deserialize<ChatResSSE>(line.Substring(dataPrefix.Length).Trim());
                    if (chunk?.usage != null)
                        usageChunk = chunk;

                    // Usage-only chunks may carry null/empty choices.
                    var delta = chunk?.choices?.FirstOrDefault()?.delta;
                    var piece = delta?.content;
                    var reasoningPiece = delta?.reasoning_content;
                    if (!string.IsNullOrEmpty(piece))
                        answer.Append(piece);
                    if (!string.IsNullOrEmpty(reasoningPiece))
                        reasoning.Append(reasoningPiece);

                    if (chatReq.title != "优化字幕")
                    {
                        var steamCount = answer.Length + reasoning.Length;
                        if (++chunkCount % 30 == 0)
                            _workflowManager.SetTaskProgress(chatReq.taskId, "steam=>" + steamCount);
                    }
                }
                catch (Exception e)
                {
                    await _workflowManager.AddTaskLog(chatReq.taskId, "异常 ChatSSE=>" + line + "\r\n" + e.Message + "\r\n" + e.StackTrace);
                }
            }
            throw new TimeoutException(DateTime.Now + "==>AI请求超时 " + chatReq.taskId);
        }

        /// <summary>
        /// Posts <paramref name="chatReq"/> via <see cref="PostJsonStreamAsync"/> and retries while the server
        /// answers with a non-success status code (one initial attempt plus five retries, one second apart).
        /// </summary>
        /// <returns>A successful response; the caller owns and must dispose it.</returns>
        /// <exception cref="HttpRequestException">Every attempt returned a non-success status code.</exception>
        private async Task<HttpResponseMessage> PostChatStreamWithStatusRetryAsync(ChatRequest chatReq)
        {
            const int maxRetries = 5;
            for (var retry = 0; ; retry++)
            {
                var chatResp = await PostJsonStreamAsync(Config.Host + Config.Path, chatReq, Config.ApiKey);
                if (chatResp.IsSuccessStatusCode)
                    return chatResp;

                using (chatResp)
                {
                    await _workflowManager.AddTaskLog(chatReq.taskId, $"==>请求GPT服务器异常 {chatResp.StatusCode} 模型 {chatReq.model} {await chatResp.Content.ReadAsStringAsync()}");
                }
                if (retry >= maxRetries)
                    throw new HttpRequestException("请求GPT服务器失败次数过多");
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
        }

        /// <summary>
        /// Splits any inline think block out of the accumulated stream text and packages the final result.
        /// </summary>
        /// <returns><c>null</c> when no usage was reported or the answer is empty.</returns>
        private (Usage u, string res, string reasoning)? BuildStreamResult(StringBuilder answer, StringBuilder reasoning, ChatResSSE? usageChunk)
        {
            var (res, thoughts) = thinkMSG(new Message() { content = answer.ToString(), reasoning_content = reasoning.ToString() });
            var u = usageChunk?.usage;
            if (u == null || string.IsNullOrEmpty(res))
                return null;
            return (u, res, thoughts);
        }

        /// <summary>
        /// Sends <paramref name="chatRep"/> to the model and deserializes the JSON found in its answer
        /// into <typeparamref name="T"/>, retrying up to 10 times on any failure.
        /// </summary>
        /// <typeparam name="T">Result type; list types are parsed from a JSON array, others from a JSON object.</typeparam>
        /// <param name="chatRep">
        /// The chat request. <c>taskId</c> is used for logging and for the Redis request/response cache;
        /// <c>title</c> prefixes the cache key.
        /// </param>
        /// <returns>The deserialized result.</returns>
        /// <exception cref="Exception">All attempts failed.</exception>
        public async Task<T> ChatAsync<T>(ChatRequest chatRep) where T : class, new()
        {
            // T is constructible, so it is never an array; list types are wrapped in [...], others in {...}.
            var isList = typeof(System.Collections.IList).IsAssignableFrom(typeof(T));
            var open = isList ? '[' : '{';
            var close = isList ? ']' : '}';

            var tryCount = 10;
            while (tryCount-- > 0)
            {
                try
                {
                    var cacheKey = chatRep.title + DateTime.Now.ToString("MMddHHmmss");
                    _redisManager.SetTaskGPTCached(chatRep.taskId, cacheKey, chatRep);

                    var chatResp = await Chat(chatRep);
                    if (chatResp is null || string.IsNullOrEmpty(chatResp.Value.res))
                        throw new Exception($"GPT返回message无效结果 => res:{chatResp?.res} reasoning:{chatResp?.reasoning}");

                    var (usage, res, reasoning) = chatResp.Value;
                    // Cache the request together with [answer, usage, reasoning] under the same key.
                    _redisManager.SetTaskGPTCached(chatRep.taskId, cacheKey, new object[] { chatRep, new object[] { res, usage, reasoning } });

                    var json = ExtractJsonPayload(res);
                    if (string.IsNullOrEmpty(json))
                        throw new Exception($"GPT返回结果无有效JSON =>{res}");

                    // Models sometimes drop the outermost bracket; restore it.
                    if (!json.StartsWith(open))
                        json = open + json;
                    if (!json.EndsWith(close))
                        json = json + close;

                    var questionRes = System.Text.Json.JsonSerializer.Deserialize<T>(json, s_lenientJsonOptions);
                    if (questionRes is null)
                        throw new Exception("GPT返回无效结果");
                    return questionRes;
                }
                catch (Exception ex)
                {
                    await _workflowManager.AddTaskLog(chatRep.taskId, $"==>GPT结果解析错误 重试剩余{tryCount} {ex.Message} {ex.StackTrace}");
                }
            }
            await _workflowManager.AddTaskLog(chatRep.taskId, $"==>GPT请求失败次数过多!!!");
            throw new Exception("==>GPT请求失败次数过多!!!");
        }

        /// <summary>
        /// Cleans model output and returns the first JSON fragment found in it. Newlines and Markdown code
        /// fences are removed, and adjacent objects (<c>}{</c> or <c>}|{</c>) are joined with commas.
        /// </summary>
        /// <returns>The first extracted JSON string, or null/empty when none is found.</returns>
        private static string? ExtractJsonPayload(string raw)
        {
            var cleaned = raw
                .Replace("\n", "")
                .Replace("```json", "")
                .Replace("```", "")
                .Replace("}{", "},{")
                .Replace("}|{", "},{")
                .Trim();
            return cleaned.ExtractJsonStrings()?.FirstOrDefault();
        }

        /// <summary>
        /// POSTs <paramref name="data"/> as JSON with a Bearer token, retrying transport failures up to
        /// 4 times with a one-second pause between attempts.
        /// </summary>
        /// <param name="path">Absolute request URL.</param>
        /// <param name="data">Request body; also supplies <c>taskId</c> for logging.</param>
        /// <param name="apiKey">Bearer token.</param>
        /// <param name="readAll">
        /// <c>true</c> to buffer the whole body before returning; <c>false</c> (default) to return once
        /// headers arrive so the body can be streamed.
        /// </param>
        /// <returns>The HTTP response (any status code); the caller owns and must dispose it.</returns>
        /// <exception cref="Exception">The last transport exception when every attempt failed.</exception>
        public async Task<HttpResponseMessage> PostJsonStreamAsync(
            string path, ChatRequest data, string apiKey, bool readAll = false)
        {
            const int maxAttempts = 4;
            var json = data.ToJson();
            var uri = new UriBuilder(path).Uri;
            var completion = readAll ? HttpCompletionOption.ResponseContentRead : HttpCompletionOption.ResponseHeadersRead;

            Exception? lastError = null;
            for (var attempt = 0; attempt < maxAttempts; attempt++)
            {
                try
                {
                    var client = _httpClientFactory.CreateClient();
                    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", apiKey);
                    client.Timeout = TimeSpan.FromMinutes(20); // long generations can take up to 20 minutes
                    // NOTE(review): per the HttpClient docs, DefaultRequestVersion/DefaultVersionPolicy apply to the
                    // convenience methods, not to SendAsync(HttpRequestMessage); the request below keeps its own
                    // default Version. Confirm whether HTTP/2 is wanted before setting request.Version.
                    client.DefaultRequestVersion = HttpVersion.Version20;
                    client.DefaultVersionPolicy = HttpVersionPolicy.RequestVersionOrLower;
                    client.DefaultRequestHeaders.ConnectionClose = true;

                    var request = new HttpRequestMessage(HttpMethod.Post, uri)
                    {
                        Content = new StringContent(json, Encoding.UTF8, "application/json"),
                    };
                    return await client.SendAsync(request, completion);
                }
                catch (Exception e)
                {
                    lastError = e;
                    var msg = $"""
                        ====================[{DateTime.Now.ToString("MM-dd HH:mm:ss")} 请求异常,重试]====================
                        {uri}
                        {e.Message}
                        {e.StackTrace}
                        ==============================================
                        """;
                    await _workflowManager.AddTaskLog(data.taskId, $"==>GPT Http请求失败 {msg} 1秒后重试");
                    // Non-blocking pause; skipped after the final attempt since nothing follows it.
                    if (attempt < maxAttempts - 1)
                        await Task.Delay(TimeSpan.FromSeconds(1));
                }
            }
            throw lastError!;
        }

        /// <summary>
        /// Sends a prompt to the model and deserializes the JSON answer. Must be implemented by subclasses.
        /// </summary>
        /// <typeparam name="T">Result JSON type.</typeparam>
        /// <param name="task">Task id.</param>
        /// <param name="postMessages">Prompt text.</param>
        /// <param name="title">Task type.</param>
        /// <param name="model">Model name <see cref="ChatGPTType"/>.</param>
        /// <param name="max_tokens">Maximum tokens <para>defaults to 16000 when not set</para></param>
        /// <returns>The deserialized result.</returns>
        /// <exception cref="NotImplementedException">Always, unless overridden.</exception>
        public virtual Task<T> ChatAsync<T>(string task, string postMessages, string title,
            string model = null, int max_tokens = 16000) where T : class, new()
        {
            throw new NotImplementedException("需要实现");
        }
    }
}