WGShare.API/WGShare.API/BackgroudServices/AgoraCallbackComsuerService.cs

91 lines
2.8 KiB
C#

using Hangfire;
using Hangfire.Server;
using Mapster;
using Masuit.Tools;
using SqlSugar;
using WGShare.API.Helpers;
using WGShare.Domain.Constant;
using WGShare.Domain.DTOs.AgoraCallback;
using WGShare.Domain.Entities;
namespace WGShare.API.BackgroudServices
{
/// <summary>
/// Hosted background service that subscribes to the Redis list identified by
/// <c>RedisKeyConstant.PubSub.MeetingRecord</c> and stores every received Agora
/// callback event as a <c>MeetingRecord</c> row.
/// </summary>
public class AgoraCallbackComsuerService : BackgroundService
{
    private readonly ILogger<AgoraCallbackComsuerService> _logger;
    private readonly ISqlSugarClient _sugarClient;

    // Allows at most one caller at a time into the database write in MeetingRecord.
    private readonly SemaphoreSlim _semaphore;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="logger">Logger used for lifecycle and per-message diagnostics.</param>
    /// <param name="sugarClient">SqlSugar client used to persist meeting records.</param>
    public AgoraCallbackComsuerService(ILogger<AgoraCallbackComsuerService> logger, ISqlSugarClient sugarClient)
    {
        _logger = logger;
        _sugarClient = sugarClient;
        _semaphore = new SemaphoreSlim(1);
    }

    /// <summary>
    /// Subscribes to the meeting-record list and keeps the subscription alive
    /// until the host requests shutdown.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("参会记录监听服务已启动");

        // Dispose the registration so the callback is not kept alive once this method returns.
        using var stoppingRegistration = stoppingToken.Register(
            () => _logger.LogInformation("RedisSubscriberService is stopping."));

        // Subscribe to the leave-channel message list.
        using (RedisHelper.Instance.SubscribeList(RedisKeyConstant.PubSub.MeetingRecord, async (message) =>
        {
            // NOTE(review): if SubscribeList accepts an Action<string>, this lambda is
            // compiled as async void, where an escaping exception cannot be observed
            // by the caller and may crash the process. Nothing is allowed to escape.
            try
            {
                _logger.LogDebug("接受消息: {Message}", message);
                if (message == null)
                {
                    return;
                }

                var body = message.ToString().FromJson<EventBody>();
                if (body?.payload == null)
                {
                    _logger.LogError("消息体为空");
                    return;
                }

                // A 9-character uid is treated as a screen-share stream (per the log
                // message below); such events are not stored as attendance.
                if (body.payload.uid.ToString().Length == 9)
                {
                    _logger.LogInformation("共享屏幕不记录参会记录");
                    return;
                }

                await MeetingRecord(body);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "处理参会记录消息失败: {Message}", message);
            }
        }))
        {
            try
            {
                // Park here until shutdown; the subscription delivers messages on its own.
                await Task.Delay(Timeout.Infinite, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Expected when the host stops the service; fall through so the
                // subscription is disposed and the "stopped" message is logged.
            }
        }

        _logger.LogInformation("参会记录监听服务已停止");
    }

    /// <summary>
    /// Maps the event payload to a <c>MeetingRecord</c> entity and stores it.
    /// Calls are serialized through an internal semaphore.
    /// </summary>
    /// <param name="body">Callback event whose payload is persisted.</param>
    /// <returns>A task that completes when the database command has finished.</returns>
    public async Task MeetingRecord(EventBody body)
    {
        await _semaphore.WaitAsync();
        try
        {
            var entity = body.payload.Adapt<MeetingRecord>();
            entity.EventType = body.eventType;

            // Storageable matches on (uid, clientSeq) and only the insert branch is
            // executed — presumably so a re-delivered event is not inserted twice;
            // confirm against SqlSugar's Storageable semantics.
            await _sugarClient.Storageable(entity)
                .WhereColumns(x => new { x.uid, x.clientSeq })
                .ToStorage()
                .AsInsertable
                .ExecuteCommandAsync();
        }
        finally
        {
            _semaphore.Release();
        }
    }
}
}