Quanxue.Zhanghao.Daochu/LearningOfficer.OA.Infrastr.../RabbitMQ/RabbitMQConnectionService.cs

104 lines
3.4 KiB
C#

using LearningOfficer.OA.Common.Attributes;
using LearningOfficer.OA.Common.Configs;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using Serilog;
namespace LearningOfficer.OA.Infrastructure.RabbitMQ
{
/// <summary>
/// RabbitMQ 连接服务
/// </summary>
public class RabbitMQConnectionService : IAsyncDisposable
{
private readonly RabbitMQConfig _config;
private IConnection? _connection;
private bool _disposed = false;
private readonly SemaphoreSlim _lock = new(1, 1);
public RabbitMQConnectionService(IOptionsMonitor<RabbitMQConfig> config)
{
_config = config.CurrentValue;
}
/// <summary>
/// 获取 RabbitMQ 连接
/// </summary>
/// <returns></returns>
public async Task<IConnection> GetConnection()
{
if (_connection?.IsOpen == true)
{
return _connection;
}
await _lock.WaitAsync();
try
{
if (_connection?.IsOpen == true)
{
return _connection;
}
try
{
var factory = new ConnectionFactory
{
HostName = _config.HostName,
Port = _config.Port,
UserName = _config.UserName,
Password = _config.Password,
VirtualHost = _config.VirtualHost,
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
};
_connection = await factory.CreateConnectionAsync();
_connection.ConnectionShutdownAsync += _connection_ConnectionShutdownAsync;
Log.Information("RabbitMQ 连接创建成功: {HostName}:{Port}", _config.HostName, _config.Port);
return _connection;
}
catch (Exception ex)
{
Log.Error(ex, "RabbitMQ 连接创建失败: {HostName}:{Port}", _config.HostName, _config.Port);
throw;
}
}
finally
{
_lock.Release();
}
}
private async Task _connection_ConnectionShutdownAsync(object sender, global::RabbitMQ.Client.Events.ShutdownEventArgs @event)
{
Log.Warning("RabbitMQ 连接断开: {ReplyText}", @event.ReplyText);
await Task.CompletedTask;
}
/// <summary>
/// 创建通道
/// </summary>
public async Task<IChannel> CreateChannel()
{
var connection = await GetConnection();
return await connection.CreateChannelAsync();
}
public async ValueTask DisposeAsync()
{
if (!_disposed)
{
if (_connection != null)
{
_connection.ConnectionShutdownAsync -= _connection_ConnectionShutdownAsync;
await _connection.DisposeAsync();
Log.Information("RabbitMQ 连接已释放");
}
_disposed = true;
}
GC.SuppressFinalize(this);
}
}
}