104 lines
3.4 KiB
C#
104 lines
3.4 KiB
C#
using LearningOfficer.OA.Common.Attributes;
|
|
using LearningOfficer.OA.Common.Configs;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Options;
|
|
using RabbitMQ.Client;
|
|
using Serilog;
|
|
|
|
namespace LearningOfficer.OA.Infrastructure.RabbitMQ
|
|
{
|
|
/// <summary>
|
|
/// RabbitMQ 连接服务
|
|
/// </summary>
|
|
public class RabbitMQConnectionService : IAsyncDisposable
|
|
{
|
|
private readonly RabbitMQConfig _config;
|
|
private IConnection? _connection;
|
|
private bool _disposed = false;
|
|
private readonly SemaphoreSlim _lock = new(1, 1);
|
|
|
|
public RabbitMQConnectionService(IOptionsMonitor<RabbitMQConfig> config)
|
|
{
|
|
_config = config.CurrentValue;
|
|
}
|
|
|
|
/// <summary>
|
|
/// 获取 RabbitMQ 连接
|
|
/// </summary>
|
|
/// <returns></returns>
|
|
public async Task<IConnection> GetConnection()
|
|
{
|
|
if (_connection?.IsOpen == true)
|
|
{
|
|
return _connection;
|
|
}
|
|
await _lock.WaitAsync();
|
|
try
|
|
{
|
|
if (_connection?.IsOpen == true)
|
|
{
|
|
return _connection;
|
|
}
|
|
|
|
try
|
|
{
|
|
var factory = new ConnectionFactory
|
|
{
|
|
HostName = _config.HostName,
|
|
Port = _config.Port,
|
|
UserName = _config.UserName,
|
|
Password = _config.Password,
|
|
VirtualHost = _config.VirtualHost,
|
|
AutomaticRecoveryEnabled = true,
|
|
NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
|
|
};
|
|
|
|
_connection = await factory.CreateConnectionAsync();
|
|
_connection.ConnectionShutdownAsync += _connection_ConnectionShutdownAsync;
|
|
|
|
Log.Information("RabbitMQ 连接创建成功: {HostName}:{Port}", _config.HostName, _config.Port);
|
|
|
|
return _connection;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.Error(ex, "RabbitMQ 连接创建失败: {HostName}:{Port}", _config.HostName, _config.Port);
|
|
throw;
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
_lock.Release();
|
|
}
|
|
}
|
|
|
|
private async Task _connection_ConnectionShutdownAsync(object sender, global::RabbitMQ.Client.Events.ShutdownEventArgs @event)
|
|
{
|
|
Log.Warning("RabbitMQ 连接断开: {ReplyText}", @event.ReplyText);
|
|
await Task.CompletedTask;
|
|
}
|
|
/// <summary>
|
|
/// 创建通道
|
|
/// </summary>
|
|
public async Task<IChannel> CreateChannel()
|
|
{
|
|
var connection = await GetConnection();
|
|
return await connection.CreateChannelAsync();
|
|
}
|
|
|
|
public async ValueTask DisposeAsync()
|
|
{
|
|
if (!_disposed)
|
|
{
|
|
if (_connection != null)
|
|
{
|
|
_connection.ConnectionShutdownAsync -= _connection_ConnectionShutdownAsync;
|
|
await _connection.DisposeAsync();
|
|
Log.Information("RabbitMQ 连接已释放");
|
|
}
|
|
_disposed = true;
|
|
}
|
|
GC.SuppressFinalize(this);
|
|
}
|
|
}
|
|
} |