247 lines
11 KiB
C#
247 lines
11 KiB
C#
using VideoAnalysisCore.Common;
|
|
using System.Net.Http.Headers;
|
|
using System.Text;
|
|
using Microsoft.Extensions.Logging;
|
|
using Newtonsoft.Json.Linq;
|
|
using System.Net.Http;
|
|
using Newtonsoft.Json;
|
|
using System.Net.Http.Json;
|
|
using System.Net;
|
|
using System.Threading;
|
|
using System;
|
|
using System.IO;
|
|
using VideoAnalysisCore.AICore.GPT.ChatGPT;
|
|
using System.Threading.Tasks;
|
|
using System.Text.Json;
|
|
|
|
namespace VideoAnalysisCore.AICore.GPT
|
|
{
|
|
/// <summary>
/// Base class for chat clients that POST a JSON <see cref="ChatRequest"/> to
/// <c>Config.Host + Config.Path</c> with a bearer token and read the answer back
/// as a stream of server-sent-event lines.
/// </summary>
public abstract class GPTClient
{
    private const string ThinkOpenTag = "<think>";
    private const string ThinkCloseTag = "</think>";
    private const string SseDataPrefix = "data:";

    /// <summary>
    /// Lenient options used when deserializing model output. Kept in a single shared
    /// instance because System.Text.Json caches type metadata per options object.
    /// </summary>
    private static readonly JsonSerializerOptions LenientJsonOptions = new JsonSerializerOptions
    {
        // Tolerate a trailing comma before a closing bracket/brace.
        AllowTrailingCommas = true,
        // Skip comments embedded in the payload instead of failing on them.
        ReadCommentHandling = JsonCommentHandling.Skip
    };

    /// <summary>Endpoint settings read by this client: <c>Host</c>, <c>Path</c> and <c>ApiKey</c>.</summary>
    public virtual GptConfig Config { get; set; }

    private readonly IHttpClientFactory _httpClientFactory;
    private readonly RedisManager _redisManager;

    /// <summary>Creates the client.</summary>
    /// <param name="httpClientFactory">Factory used to create one <see cref="HttpClient"/> per request attempt.</param>
    /// <param name="redisManager">Receives task logs, progress updates and cached request/response pairs.</param>
    public GPTClient(IHttpClientFactory httpClientFactory, RedisManager redisManager)
    {
        _httpClientFactory = httpClientFactory;
        _redisManager = redisManager;
    }

    /// <summary>
    /// Sends a chat request. Only streaming requests are supported; they are forwarded to <see cref="ChatSSE"/>.
    /// </summary>
    /// <param name="chatReq">The request to send.</param>
    /// <returns>The usage, answer text and reasoning text, or <c>null</c> when no usable answer was received.</returns>
    /// <exception cref="NotImplementedException"><c>chatReq.stream</c> is false.</exception>
    public async Task<(Usage u, string res, string reasoning)?> Chat(ChatRequest chatReq)
    {
        if (chatReq.stream) return await ChatSSE(chatReq);
        throw new NotImplementedException();
    }

    /// <summary>
    /// Separates a leading <c>&lt;think&gt;…&lt;/think&gt;</c> section from the answer text.
    /// </summary>
    /// <param name="m">Message holding the accumulated answer and reasoning text; may be null.</param>
    /// <returns>
    /// <c>m1</c> is the answer and <c>m2</c> the reasoning. When the content does not start with
    /// <c>&lt;think</c>, the trimmed content is returned together with the trimmed
    /// <c>reasoning_content</c>. When the think section is never closed, the answer is empty and
    /// everything after the opening tag is returned as reasoning.
    /// </returns>
    private (string m1, string m2) SplitThinkBlock(Message? m)
    {
        var content = m?.content?.Trim() ?? string.Empty;
        if (!content.StartsWith("<think", StringComparison.Ordinal))
            return (content, m?.reasoning_content?.Trim());

        var closeStart = content.IndexOf(ThinkCloseTag, StringComparison.Ordinal);
        if (closeStart < 0)
        {
            // Unterminated think section: there is no answer text to hand back.
            var unterminated = content.Length > ThinkOpenTag.Length
                ? content.Substring(ThinkOpenTag.Length)
                : string.Empty;
            return (string.Empty, unterminated);
        }

        // Clamp so a closing tag that directly follows a truncated opening tag cannot yield a negative length.
        var reasoningStart = Math.Min(ThinkOpenTag.Length, closeStart);
        var reasoning = content.Substring(reasoningStart, closeStart - reasoningStart);
        var answer = content.Substring(closeStart + ThinkCloseTag.Length);
        return (answer, reasoning);
    }

    /// <summary>
    /// Sends the request and reads the reply as server-sent events, concatenating every
    /// <c>delta.content</c> and <c>delta.reasoning_content</c> fragment until a line ending in
    /// <c>[DONE]</c> arrives.
    /// </summary>
    /// <param name="chatReq">The request to send; its <c>taskId</c> is used for logging and progress updates.</param>
    /// <returns>
    /// The usage of the last parsed chunk, the answer text and the reasoning text; <c>null</c> when
    /// the last chunk carried no usage, the answer is empty, or the stream ended without <c>[DONE]</c>.
    /// </returns>
    /// <exception cref="Exception">The server kept answering with a non-success status code.</exception>
    public async Task<(Usage u, string res, string reasoning)?> ChatSSE(ChatRequest chatReq)
    {
        var requestBody = chatReq.ToJson();
        using var chatResp = await SendUntilSuccessAsync(chatReq, requestBody);
        using var stream = await chatResp.Content.ReadAsStreamAsync();
        using var reader = new StreamReader(stream, Encoding.UTF8);

        var answerBuilder = new StringBuilder();
        var reasoningBuilder = new StringBuilder();
        var lastChat = new ChatResSSE();
        // Upper bound on ignorable (blank / keep-alive) lines so an endless keep-alive stream cannot loop forever.
        var idleBudget = 60 * 100000;
        var chunkCount = 0;

        while (idleBudget > 0)
        {
            var line = await reader.ReadLineAsync();
            if (line is null)
                break; // End of stream: nothing more will ever arrive, so stop instead of polling.

            if (line.Length == 0 || line.StartsWith(": keep-alive", StringComparison.Ordinal))
            {
                idleBudget--;
                continue;
            }

            if (line.EndsWith("[DONE]", StringComparison.Ordinal))
            {
                // Terminator of the whole reply: split reasoning from answer and return.
                var parts = SplitThinkBlock(new Message()
                {
                    content = answerBuilder.ToString(),
                    reasoning_content = reasoningBuilder.ToString()
                });
                var u = lastChat?.usage;
                if (u == null || string.IsNullOrEmpty(parts.m1))
                    return null;
                return (u, parts.m1, parts.m2);
            }

            if (!line.StartsWith(SseDataPrefix, StringComparison.Ordinal))
                continue;

            try
            {
                var data = System.Text.Json.JsonSerializer.Deserialize<ChatResSSE>(line.Substring(SseDataPrefix.Length).Trim());
                lastChat = data;
                var delta = data?.choices?.FirstOrDefault()?.delta;
                var str = delta?.content;
                var strReasoning = delta?.reasoning_content;
                if (!string.IsNullOrEmpty(str))
                    answerBuilder.Append(str);
                if (!string.IsNullOrEmpty(strReasoning))
                    reasoningBuilder.Append(strReasoning);

                // Publish the number of characters received so far on every 30th chunk.
                var steamCount = answerBuilder.Length + reasoningBuilder.Length;
                if (++chunkCount % 30 == 0)
                    _redisManager.SetTaskProgress(chatReq.taskId, "steam=>" + steamCount);
            }
            catch (Exception e)
            {
                // A malformed chunk is logged and skipped; the rest of the stream is still consumed.
                await _redisManager.AddTaskLog(chatReq.taskId, "异常 ChatSSE=>" + line + "\r\n" + e.Message + "\r\n" + e.StackTrace);
            }
        }

        // Reached when the stream ended, or the idle budget ran out, before a [DONE] line was seen.
        await _redisManager.AddTaskLog(chatReq.taskId, DateTime.Now + "=>AI请求超时 " + chatReq.taskId);
        return null;
    }

    /// <summary>
    /// Posts the request body and returns the first response with a success status code. Every
    /// non-success response is logged and disposed; the sixth one aborts with an exception.
    /// </summary>
    private async Task<HttpResponseMessage> SendUntilSuccessAsync(ChatRequest chatReq, string requestBody)
    {
        var retriesLeft = 5;
        while (true)
        {
            var response = await PostJsonStreamAsync(Config.Host + Config.Path, requestBody, Config.ApiKey);
            if (response.IsSuccessStatusCode)
                return response;

            try
            {
                await _redisManager.AddTaskLog(chatReq.taskId, "=>请求GPT服务器异常 " + response.StatusCode + " " + await response.Content.ReadAsStringAsync());
            }
            finally
            {
                response.Dispose();
            }

            if (--retriesLeft < 0)
            {
                throw new Exception("请求GPT服务器失败次数过多");
            }
        }
    }

    /// <summary>
    /// Sends the request and deserializes the JSON found in the answer into <typeparamref name="T"/>,
    /// retrying up to 10 times when the request fails or the answer cannot be parsed.
    /// </summary>
    /// <typeparam name="T">Type to deserialize into. Array types are expected as a JSON array, everything else as a JSON object.</typeparam>
    /// <param name="chatRep">The request to send; its <c>taskId</c> and <c>title</c> are used for logging and caching.</param>
    /// <returns>The deserialized answer.</returns>
    /// <exception cref="Exception">All attempts failed.</exception>
    public async Task<T> ChatAsync<T>(ChatRequest chatRep)
    {
        var tryCount = 10;
        while (tryCount-- > 0)
        {
            try
            {
                var time = chatRep.title + DateTime.Now.ToString("MMddHHmmss");
                // Cache the request first so it is recorded even when the call below fails.
                _redisManager.SetTaskGPTCached(chatRep.taskId, time, chatRep);

                var chatResp = await Chat(chatRep);
                if (chatResp is null || string.IsNullOrEmpty(chatResp.Value.res))
                    throw new Exception("GPT返回message无效结果");

                var reply = chatResp.Value;
                var redisCached = new object[]
                {
                    chatRep,
                    new object[] { reply.res, reply.u, reply.reasoning }
                };
                _redisManager.SetTaskGPTCached(chatRep.taskId, time, redisCached);

                // Take the first JSON fragment and strip markdown fences, line breaks and
                // missing separators between adjacent objects.
                var json = reply.res.ExtractJsonStrings()?.FirstOrDefault()
                    ?.Replace("\n", "")
                    .Replace("```json", "")
                    .Replace("```", "")
                    .Replace("}{", "},{")
                    .Replace("}|{", "},{")
                    .Trim();

                if (string.IsNullOrEmpty(json))
                    throw new Exception("GPT返回结果无有效JSON");

                // Add the outer bracket/brace when the fragment lacks it.
                var expectsArray = typeof(T).IsArray;
                var opening = expectsArray ? "[" : "{";
                var closing = expectsArray ? "]" : "}";
                if (!json.StartsWith(opening, StringComparison.Ordinal))
                    json = opening + json;
                if (!json.EndsWith(closing, StringComparison.Ordinal))
                    json = json + closing;

                var questionRes = System.Text.Json.JsonSerializer.Deserialize<T>(json, LenientJsonOptions);
                if (questionRes is null)
                    throw new Exception("GPT返回无效结果");
                return questionRes;
            }
            catch (Exception ex)
            {
                await _redisManager.AddTaskLog(chatRep.taskId, $"=>GPT结果解析错误 重试剩余{tryCount} {ex.Message}");
            }
        }

        await _redisManager.AddTaskLog(chatRep.taskId, "=>GPT请求失败次数过多!!!");
        throw new Exception(DateTime.Now + "=>GPT请求失败次数过多!!!");
    }

    /// <summary>
    /// POSTs a JSON body with a bearer token, making up to 4 attempts with a one-second pause
    /// between them when sending throws.
    /// </summary>
    /// <param name="path">Absolute URL to post to.</param>
    /// <param name="json">Request body, sent as <c>application/json</c> in UTF-8.</param>
    /// <param name="apiKey">Bearer token placed in the Authorization header.</param>
    /// <param name="readAll">
    /// When true the response body is buffered before returning; otherwise the method returns as
    /// soon as the headers are read so the caller can stream the body.
    /// </param>
    /// <returns>The response, whatever its status code. The caller owns it and must dispose it.</returns>
    /// <exception cref="Exception">The exception raised by the last attempt, rethrown with its original stack trace.</exception>
    public async Task<HttpResponseMessage> PostJsonStreamAsync(
        string path, string json, string apiKey, bool readAll = false)
    {
        const int maxAttempts = 4;
        var uri = new UriBuilder(path).Uri;
        var completion = readAll
            ? HttpCompletionOption.ResponseContentRead
            : HttpCompletionOption.ResponseHeadersRead;

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                var client = _httpClientFactory.CreateClient();
                client.DefaultRequestHeaders.Authorization =
                    new AuthenticationHeaderValue("Bearer", apiKey);
                client.Timeout = TimeSpan.FromSeconds(60 * 20); // 20-minute timeout
                // NOTE(review): the HttpClient documentation states that DefaultRequestVersion does not
                // apply to SendAsync with an explicit HttpRequestMessage (the message's own Version is
                // used) - confirm whether HTTP/2 was actually intended here.
                client.DefaultRequestVersion = HttpVersion.Version20;
                client.DefaultVersionPolicy = HttpVersionPolicy.RequestVersionOrLower;
                client.DefaultRequestHeaders.ConnectionClose = true;

                using var request = new HttpRequestMessage(HttpMethod.Post, uri)
                {
                    Content = new StringContent(json, Encoding.UTF8, "application/json")
                };
                return await client.SendAsync(request, completion);
            }
            catch (Exception e)
            {
                Console.WriteLine("====================[请求异常,重试]====================");
                Console.WriteLine(uri);
                Console.WriteLine(e.Message);
                Console.WriteLine(e.StackTrace);
                Console.WriteLine("==============================================");

                // Out of attempts: rethrow in place so the original stack trace is preserved.
                if (attempt >= maxAttempts)
                    throw;

                await Task.Delay(1000);
            }
        }
    }
}
|
|
} |