修复 尝试解决多线程sql异常问题
This commit is contained in:
parent
1cb53fe405
commit
1151068185
|
|
@ -39,7 +39,7 @@ namespace Learn.VideoAnalysis.Components.Pages
|
|||
bool tableLoading = false;
|
||||
private VideoTaskDto selectData;
|
||||
private bool rowRestartLoading = false;
|
||||
private VideoTaskDto reStartTask ;
|
||||
private VideoTaskDto reStartTask;
|
||||
|
||||
static TextValue[] SelectDataSource =
|
||||
Enum.GetValues(typeof(RedisChannelEnum))
|
||||
|
|
@ -47,7 +47,7 @@ namespace Learn.VideoAnalysis.Components.Pages
|
|||
.Select(s => new TextValue(s.ToString(), (int)s))
|
||||
.ToArray();
|
||||
int selectEnum = 1;
|
||||
int selectDefaultValue =1;
|
||||
int selectDefaultValue = 1;
|
||||
|
||||
/// <summary>
|
||||
/// 点击重试
|
||||
|
|
@ -59,21 +59,21 @@ namespace Learn.VideoAnalysis.Components.Pages
|
|||
(await redisManager.Redis.HMGetAsync<int>(RedisExpandKey.Task(query.Id), "LastEnum")).FirstOrDefault();
|
||||
selectEnum = selectDefaultValue;
|
||||
reStartTask = query;
|
||||
modalShow = true;
|
||||
modalShow = true;
|
||||
}
|
||||
void PreviewTask(VideoTaskDto task)
|
||||
void PreviewTask(VideoTaskDto task)
|
||||
{
|
||||
NavigationManager.NavigateTo("/VideoTaskShow/"+task.Id);
|
||||
NavigationManager.NavigateTo("/VideoTaskShow/" + task.Id);
|
||||
}
|
||||
/// <summary>
|
||||
/// 重试
|
||||
/// </summary>
|
||||
/// <param name="query"></param>
|
||||
async void ReStart()
|
||||
async Task ReStart()
|
||||
{
|
||||
await redisManager.ClearTaskError(reStartTask.Id);
|
||||
_=Task.Run(() =>
|
||||
redisManager.InsertChannel((RedisChannelEnum)selectEnum, reStartTask.Id)
|
||||
await Task.Run(async () =>
|
||||
await redisManager.InsertChannel((RedisChannelEnum)selectEnum, reStartTask.Id)
|
||||
);
|
||||
modalShow = false;
|
||||
}
|
||||
|
|
@ -111,8 +111,8 @@ namespace Learn.VideoAnalysis.Components.Pages
|
|||
.Where(where)
|
||||
.Select<VideoTaskDto>()
|
||||
.OrderByDescending(s => s.Id)
|
||||
.ToPageListAsync(query.PageIndex , query.PageSize, _total);
|
||||
|
||||
.ToPageListAsync(query.PageIndex, query.PageSize, _total);
|
||||
|
||||
tableLoading = false;
|
||||
StateHasChanged();
|
||||
|
||||
|
|
@ -130,18 +130,18 @@ namespace Learn.VideoAnalysis.Components.Pages
|
|||
var data = redisManager.Redis.HMGet<string>(RedisExpandKey.Task(item.Id),
|
||||
"Progress", "LastEnum", "StartTime", "ErrorMessage");
|
||||
item.Progress = data[0];
|
||||
item.LastEnum = data[1] == null ?default:data[1].ToEnum<RedisChannelEnum>() ?? default;
|
||||
item.StartTimeDic = data[2]==null?null: System.Text.Json.JsonSerializer.Deserialize<Dictionary<RedisChannelEnum, DateTime>>(data[2]) ?? null;
|
||||
item.LastEnum = data[1] == null ? default : data[1].ToEnum<RedisChannelEnum>() ?? default;
|
||||
item.StartTimeDic = data[2] == null ? null : System.Text.Json.JsonSerializer.Deserialize<Dictionary<RedisChannelEnum, DateTime>>(data[2]) ?? null;
|
||||
item.ErrorMessage = data[3];
|
||||
rowRestartLoading = false;
|
||||
var statusStr = "wait";
|
||||
var dic = rowData.Data.StartTimeDic;
|
||||
if (dic is null)
|
||||
statusStr= "wait";
|
||||
statusStr = "wait";
|
||||
else if (!string.IsNullOrEmpty(rowData.Data.ErrorMessage))
|
||||
statusStr= "error";
|
||||
statusStr = "error";
|
||||
else if (dic.ContainsKey(RedisChannelEnum.结束任务))
|
||||
statusStr= "finish";
|
||||
statusStr = "finish";
|
||||
item.TaskStatus = statusStr;
|
||||
StateHasChanged();
|
||||
}
|
||||
|
|
@ -158,7 +158,7 @@ namespace Learn.VideoAnalysis.Components.Pages
|
|||
}
|
||||
private void OnExpand(RowData<VideoTaskDto> rowData)
|
||||
{
|
||||
if(rowData.Expanded)
|
||||
if (rowData.Expanded)
|
||||
RowRload(rowData);
|
||||
}
|
||||
/// <summary>
|
||||
|
|
|
|||
|
|
@ -76,7 +76,8 @@ namespace VideoAnalysisCore.Common.Expand
|
|||
if (ex.Parametres == null) return;
|
||||
//var originColor = Console.ForegroundColor;
|
||||
//Console.ForegroundColor = ConsoleColor.DarkRed;
|
||||
Console.WriteLine($"【{DateTime.Now}——错误SQL - [{config.ConfigId}]】\r\n"+ ex.Message + "\r\n" + UtilMethods.GetSqlString(config.DbType, ex.Sql, (SugarParameter[])ex.Parametres) + "\r\n");
|
||||
Console.WriteLine($"【{DateTime.Now}——错误SQL - [{config.ConfigId}]】\r\n"+ ex.Message + "\r\n"
|
||||
+ UtilMethods.GetSqlString(config.DbType, ex.Sql, (SugarParameter[])ex.Parametres) + "\r\n");
|
||||
Console.WriteLine();
|
||||
//Console.ForegroundColor = originColor;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ namespace VideoAnalysisCore.Common
|
|||
/// 初始化 redis
|
||||
/// <para>需要在初始化配置文件时候调用</para>
|
||||
/// </summary>
|
||||
public static void AddTaskSubscribe(this IServiceCollection service)
|
||||
public static void AddTaskSubscribe(this IServiceCollection service)
|
||||
{
|
||||
|
||||
Console.WriteLine($"{DateTime.Now}=>初始化 Redis任务队列");
|
||||
|
|
@ -156,10 +156,10 @@ namespace VideoAnalysisCore.Common
|
|||
/// <summary>
|
||||
/// redis拓展
|
||||
/// </summary>
|
||||
public class RedisManager
|
||||
public class RedisManager
|
||||
{
|
||||
public static bool StopTask { get; set; } = false;
|
||||
public static Dictionary<RedisChannelEnum, Func<string,Task>> SubscribeList = new Dictionary<RedisChannelEnum, Func<string, Task>>();
|
||||
public static Dictionary<RedisChannelEnum, Func<string, Task>> SubscribeList = new Dictionary<RedisChannelEnum, Func<string, Task>>();
|
||||
/// <summary>
|
||||
/// 队列池
|
||||
/// </summary>
|
||||
|
|
@ -169,7 +169,7 @@ namespace VideoAnalysisCore.Common
|
|||
|
||||
public RedisManager(RedisClient redis, Repository<VideoTask> videoTaskDB)
|
||||
{
|
||||
Redis = redis;
|
||||
Redis = redis;
|
||||
this.videoTaskDB = videoTaskDB;
|
||||
}
|
||||
|
||||
|
|
@ -178,7 +178,7 @@ namespace VideoAnalysisCore.Common
|
|||
/// 缓存GPT任务缓存
|
||||
/// </summary>
|
||||
/// <param name="taskId"></param>
|
||||
public void SetTaskGPTCached(object taskId,string time, object? data)
|
||||
public void SetTaskGPTCached(object taskId, string time, object? data)
|
||||
{
|
||||
Redis.Set(RedisExpandKey.TaskGPT(taskId) + ":" + time, data, 3600 * 24);
|
||||
}
|
||||
|
|
@ -186,7 +186,7 @@ namespace VideoAnalysisCore.Common
|
|||
/// 加入到消费队列
|
||||
/// </summary>
|
||||
/// <param name="taskIds"></param>
|
||||
public void JoinQueue(params long[] taskIds)
|
||||
public void JoinQueue(params long[] taskIds)
|
||||
{ //事务
|
||||
if (taskIds is null || taskIds.Length == 0)
|
||||
return;
|
||||
|
|
@ -201,7 +201,7 @@ namespace VideoAnalysisCore.Common
|
|||
/// 获取任务进度
|
||||
/// </summary>
|
||||
/// <param name="taskId"></param>
|
||||
public float GetTaskProgress(object taskId)
|
||||
public float GetTaskProgress(object taskId)
|
||||
{
|
||||
return Redis.HMGet<float>(RedisExpandKey.Task(taskId), "Progress")[0];
|
||||
}
|
||||
|
|
@ -210,7 +210,7 @@ namespace VideoAnalysisCore.Common
|
|||
/// </summary>
|
||||
/// <param name="p">进度百分比</param>
|
||||
/// <param name="taskId"></param>
|
||||
public void SetTaskProgress(object taskId, object p)
|
||||
public void SetTaskProgress(object taskId, object p)
|
||||
{
|
||||
Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", p.ToString());
|
||||
|
||||
|
|
@ -220,7 +220,7 @@ namespace VideoAnalysisCore.Common
|
|||
/// </summary>
|
||||
/// <param name="enum">枚举</param>
|
||||
/// <param name="taskId">任务id</param>
|
||||
public async Task InsertChannel(RedisChannelEnum @enum, object taskId)
|
||||
public async Task InsertChannel(RedisChannelEnum @enum, object taskId)
|
||||
{
|
||||
if (taskId is null) throw new Exception("taskId为空");
|
||||
if (Redis is null) throw new Exception("redis未初始化");
|
||||
|
|
@ -242,8 +242,8 @@ namespace VideoAnalysisCore.Common
|
|||
|
||||
Redis.HMSet(RedisExpandKey.Task(taskId), "StartTime", startTime);
|
||||
|
||||
await TouchChannel(@enum, tId, SubscribeList[@enum]);
|
||||
//await SubscribeList[@enum](tId);
|
||||
await TouchChannel(@enum, tId, SubscribeList[@enum]);
|
||||
//await SubscribeList[@enum](tId);
|
||||
var e = @enum.NextEnum();
|
||||
if (e is null)
|
||||
break;
|
||||
|
|
@ -256,7 +256,7 @@ namespace VideoAnalysisCore.Common
|
|||
}
|
||||
}
|
||||
|
||||
public async Task TaskEnd(string task)
|
||||
public async Task TaskEnd(string task)
|
||||
{
|
||||
var tId = long.Parse(task);
|
||||
//var gptRes = (await Redis
|
||||
|
|
@ -264,7 +264,7 @@ namespace VideoAnalysisCore.Common
|
|||
//if (gptRes is null)
|
||||
// throw new Exception("未能读取到GPT处理结果");
|
||||
//删除任务执行状态
|
||||
await Redis.LRemAsync(RedisExpandKey.IDTask,1,task);
|
||||
await Redis.LRemAsync(RedisExpandKey.IDTask, 1, task);
|
||||
var taskData = await videoTaskDB
|
||||
.GetFirstAsync(s => s.Id == tId);
|
||||
if (taskData.Captions == "[]")
|
||||
|
|
@ -275,7 +275,7 @@ namespace VideoAnalysisCore.Common
|
|||
|
||||
//未使用结果暂时屏蔽
|
||||
//taskData.ChatAnalysis = JsonSerializer.Serialize(gptRes);
|
||||
taskData.ChatAnalysisScore =0;
|
||||
taskData.ChatAnalysisScore = 0;
|
||||
taskData.ErrorMessage = string.Empty;
|
||||
taskData.LastEnum = RedisChannelEnum.结束任务;
|
||||
taskData.EndTime = DateTime.Now;
|
||||
|
|
@ -292,17 +292,15 @@ namespace VideoAnalysisCore.Common
|
|||
it.EndTime
|
||||
}).ExecuteCommandAsync();
|
||||
|
||||
//NewTask();
|
||||
//NewTask();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 初始化 队列 任务
|
||||
/// </summary>
|
||||
public async void InitChannel()
|
||||
public async void InitChannel()
|
||||
{
|
||||
if (Redis is null) throw new Exception("redis未初始化");
|
||||
|
||||
|
||||
//处理之前程序结束前未能执行完的情况
|
||||
var oldTaskCount = Redis.LLen(RedisExpandKey.IDTask);
|
||||
if (oldTaskCount > 0)
|
||||
|
|
@ -330,7 +328,7 @@ namespace VideoAnalysisCore.Common
|
|||
/// <summary>
|
||||
/// 停止接收新任务
|
||||
/// </summary>
|
||||
public void StopTaskAsync()
|
||||
public void StopTaskAsync()
|
||||
{
|
||||
StopTask = true;
|
||||
try
|
||||
|
|
@ -348,7 +346,7 @@ namespace VideoAnalysisCore.Common
|
|||
/// <summary>
|
||||
/// 开始接收新任务
|
||||
/// </summary>
|
||||
public void RestartTask()
|
||||
public void RestartTask()
|
||||
{
|
||||
StopTask = false;
|
||||
NewTask();
|
||||
|
|
@ -357,7 +355,7 @@ namespace VideoAnalysisCore.Common
|
|||
/// 重新执行新任务
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public void NewTask()
|
||||
public void NewTask()
|
||||
{
|
||||
// 取消 消费机的任务订阅
|
||||
if (StopTask)
|
||||
|
|
@ -371,14 +369,14 @@ namespace VideoAnalysisCore.Common
|
|||
/// <summary>
|
||||
/// 重新接收新任务
|
||||
/// </summary>
|
||||
public void ReceivingTaskAsync()
|
||||
public void ReceivingTaskAsync()
|
||||
{
|
||||
if (AppCommon.Config.TaskSetting.IS_Server)
|
||||
if (AppCommon.Config.TaskSetting.IS_Server)
|
||||
{
|
||||
Console.WriteLine($"{DateTime.Now} =>服务端不接收任务");
|
||||
return;
|
||||
}
|
||||
Task.Run(async () =>
|
||||
Task.Run(() =>
|
||||
{
|
||||
lock (Redis)
|
||||
{
|
||||
|
|
@ -394,9 +392,8 @@ namespace VideoAnalysisCore.Common
|
|||
await InsertChannel(RedisChannelEnum.下载文件, taskId);
|
||||
});
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
|
@ -405,7 +402,7 @@ namespace VideoAnalysisCore.Common
|
|||
/// <param name="taskID"></param>
|
||||
/// <param name="ex"></param>
|
||||
/// <returns></returns>
|
||||
public async Task<bool> SetTaskErrorMessage(long taskID, Exception? ex)
|
||||
public async Task<bool> SetTaskErrorMessage(long taskID, Exception? ex)
|
||||
{
|
||||
var error = string.Empty;
|
||||
if (ex != null)
|
||||
|
|
@ -428,17 +425,18 @@ namespace VideoAnalysisCore.Common
|
|||
/// </summary>
|
||||
/// <param name="taskID"></param>
|
||||
/// <returns></returns>
|
||||
public async Task<bool> ClearTaskError(long taskID) =>await SetTaskError(taskID, string.Empty);
|
||||
public async Task<bool> ClearTaskError(long taskID) => await SetTaskError(taskID, string.Empty);
|
||||
/// <summary>
|
||||
/// 修改任务的错误信息
|
||||
/// </summary>
|
||||
/// <param name="taskID"></param>
|
||||
/// <param name="error"></param>
|
||||
/// <returns></returns>
|
||||
public async Task<bool> SetTaskError(long taskID, string? error)
|
||||
public async Task<bool> SetTaskError(long taskID, string? error)
|
||||
{
|
||||
var vDB = AppCommon.Services.GetService<Repository<VideoTask>>();
|
||||
Redis.HMSet(RedisExpandKey.Task(taskID), "ErrorMessage", error);
|
||||
return await videoTaskDB.AsUpdateable()
|
||||
return await vDB.CopyNew().AsUpdateable()
|
||||
.SetColumns(it => it.ErrorMessage == error)//SetColumns是可以叠加的 写2个就2个字段赋值
|
||||
.Where(it => it.Id == taskID)
|
||||
.ExecuteCommandAsync() == 1;
|
||||
|
|
@ -450,7 +448,7 @@ namespace VideoAnalysisCore.Common
|
|||
/// <param name="key"></param>
|
||||
/// <param name="taskId"></param>
|
||||
/// <param name="action"></param>
|
||||
public async Task TouchChannel(RedisChannelEnum key, string taskId, Func<string, Task> action = null)
|
||||
public async Task TouchChannel(RedisChannelEnum key, string taskId, Func<string, Task> action = null)
|
||||
{
|
||||
if (taskId is null) return;
|
||||
var tID = long.Parse(taskId);
|
||||
|
|
@ -464,14 +462,11 @@ namespace VideoAnalysisCore.Common
|
|||
{
|
||||
Redis.HMSet(RedisExpandKey.Task(taskId), "LastEnum", key);
|
||||
Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", 0);
|
||||
lock (Redis)
|
||||
{
|
||||
videoTaskDB.AsUpdateable()
|
||||
.SetColumns(it => it.LastEnum == key)
|
||||
.Where(it => it.Id == tID)
|
||||
.ExecuteCommand();
|
||||
|
||||
}
|
||||
var vDB = AppCommon.Services.GetService<Repository<VideoTask>>();
|
||||
await vDB.CopyNew().AsUpdateable()
|
||||
.SetColumns(it => it.LastEnum == key)
|
||||
.Where(it => it.Id == tID)
|
||||
.ExecuteCommandAsync();
|
||||
await action(taskId);
|
||||
return;
|
||||
}
|
||||
|
|
@ -483,7 +478,7 @@ namespace VideoAnalysisCore.Common
|
|||
Console.WriteLine("==============================================");
|
||||
Thread.Sleep(1000);
|
||||
Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-> 稍后后重试." + key + " " + taskId);
|
||||
if (i+1== tryCount)
|
||||
if (i + 1 == tryCount)
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,27 +13,32 @@ namespace VideoAnalysisCore.Common
|
|||
{
|
||||
public class Repository<T> : SimpleClient<T> where T : class, new()
|
||||
{
|
||||
readonly Dictionary<Type, object?> CID= new Dictionary<Type, object?>();
|
||||
readonly Dictionary<Type, object?> CID = new Dictionary<Type, object?>();
|
||||
public Repository()
|
||||
{
|
||||
SwitchConnection();
|
||||
}
|
||||
public void SwitchConnection()
|
||||
public void SwitchConnection()
|
||||
{
|
||||
var t = typeof(T);
|
||||
if (CID.ContainsKey(t))
|
||||
{
|
||||
base.Context = CID[t]!=null
|
||||
base.Context = CID[t] != null
|
||||
? DbScoped.Sugar.GetConnectionScope(CID[t])
|
||||
: DbScoped.Sugar;
|
||||
return;
|
||||
}
|
||||
var c = t.GetCustomAttribute<TenantAttribute>();
|
||||
if (!CID.ContainsKey(t))
|
||||
CID.Add(t, c?.configId);
|
||||
base.Context = c != null
|
||||
? DbScoped.Sugar.GetConnectionScope(c.configId)
|
||||
: DbScoped.Sugar;
|
||||
else
|
||||
{
|
||||
var c = t.GetCustomAttribute<TenantAttribute>();
|
||||
if (!CID.ContainsKey(t))
|
||||
CID.Add(t, c?.configId);
|
||||
base.Context = c != null
|
||||
? DbScoped.Sugar.GetConnectionScope(c.configId)
|
||||
: DbScoped.Sugar;
|
||||
//这个变量也要保证是线程安全的,尽量CopyNew在方法中使用
|
||||
base.Context = base.Context.CopyNew();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@
|
|||
<PackageReference Include="org.k2fsa.sherpa.onnx" Version="1.10.32" />
|
||||
<PackageReference Include="SixLabors.ImageSharp" Version="3.1.7" />
|
||||
<PackageReference Include="SqlSugar.IOC" Version="2.0.0" />
|
||||
<PackageReference Include="SqlSugarCore" Version="5.1.4.170" />
|
||||
<PackageReference Include="SqlSugarCore" Version="5.1.4.205" />
|
||||
<PackageReference Include="UserCenter.Model" Version="1.3.5" />
|
||||
<PackageReference Include="Whisper.net" Version="1.5.0" />
|
||||
<PackageReference Include="Whisper.net.Runtime" Version="1.5.0" />
|
||||
|
|
|
|||
Loading…
Reference in New Issue