using LearningOfficer.OA.Common.Attributes;
using LearningOfficer.OA.Common.Configs;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using Serilog;

namespace LearningOfficer.OA.Infrastructure.RabbitMQ
{
    /// <summary>
    /// RabbitMQ connection service: lazily creates a single <see cref="IConnection"/>,
    /// caches it while it is open, and hands out channels created from it.
    /// </summary>
    /// <remarks>
    /// Connection creation is serialized through an async lock so concurrent callers
    /// share one connection instead of each opening their own.
    /// </remarks>
    public class RabbitMQConnectionService : IAsyncDisposable
    {
        private readonly RabbitMQConfig _config;
        private readonly SemaphoreSlim _lock = new(1, 1);
        private IConnection? _connection;

        // volatile: read on the lock-free fast path of GetConnection.
        private volatile bool _disposed;

        /// <summary>
        /// Creates the service from the configured RabbitMQ options.
        /// </summary>
        /// <param name="config">
        /// Options monitor for <see cref="RabbitMQConfig"/>. The current value is read once
        /// here, so later configuration changes are not picked up by this instance.
        /// </param>
        public RabbitMQConnectionService(IOptionsMonitor<RabbitMQConfig> config)
        {
            _config = config.CurrentValue;
        }

        /// <summary>
        /// Gets the shared RabbitMQ connection, creating it when there is none or the
        /// cached one is no longer open.
        /// </summary>
        /// <param name="cancellationToken">Cancels waiting for the lock and the connection attempt.</param>
        /// <returns>An open RabbitMQ connection.</returns>
        /// <exception cref="ObjectDisposedException">The service has already been disposed.</exception>
        public async Task<IConnection> GetConnection(CancellationToken cancellationToken = default)
        {
            ThrowIfDisposed();

            // Fast path: reuse the cached connection without taking the lock.
            if (_connection is { IsOpen: true } cached)
            {
                return cached;
            }

            await _lock.WaitAsync(cancellationToken);
            try
            {
                // Re-check under the lock: disposal or another caller may have won the race.
                ThrowIfDisposed();
                if (_connection is { IsOpen: true } existing)
                {
                    return existing;
                }

                // The cached connection (if any) is not open. Release it before replacing it,
                // otherwise it would be orphaned with its handler still attached.
                await ReleaseStaleConnectionAsync();

                try
                {
                    var factory = new ConnectionFactory
                    {
                        HostName = _config.HostName,
                        Port = _config.Port,
                        UserName = _config.UserName,
                        Password = _config.Password,
                        VirtualHost = _config.VirtualHost,
                        AutomaticRecoveryEnabled = true,
                        NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
                    };

                    var connection = await factory.CreateConnectionAsync(cancellationToken);
                    connection.ConnectionShutdownAsync += OnConnectionShutdownAsync;
                    _connection = connection;

                    Log.Information("RabbitMQ 连接创建成功: {HostName}:{Port}", _config.HostName, _config.Port);
                    return connection;
                }
                catch (Exception ex)
                {
                    Log.Error(ex, "RabbitMQ 连接创建失败: {HostName}:{Port}", _config.HostName, _config.Port);
                    throw;
                }
            }
            finally
            {
                _lock.Release();
            }
        }

        /// <summary>
        /// Creates a new channel on the shared connection.
        /// </summary>
        /// <param name="cancellationToken">Cancels obtaining the connection and creating the channel.</param>
        /// <returns>A newly created channel; the caller is responsible for disposing it.</returns>
        /// <exception cref="ObjectDisposedException">The service has already been disposed.</exception>
        public async Task<IChannel> CreateChannel(CancellationToken cancellationToken = default)
        {
            var connection = await GetConnection(cancellationToken);
            return await connection.CreateChannelAsync(cancellationToken: cancellationToken);
        }

        /// <summary>
        /// Unsubscribes from and disposes the cached connection, if any. Safe to call more than once.
        /// </summary>
        public async ValueTask DisposeAsync()
        {
            if (!_disposed)
            {
                // Take the lock so disposal cannot interleave with a connection being created.
                await _lock.WaitAsync();
                try
                {
                    if (!_disposed)
                    {
                        // Flag first so concurrent callers fail fast instead of reconnecting.
                        _disposed = true;

                        var connection = _connection;
                        _connection = null;
                        if (connection is not null)
                        {
                            connection.ConnectionShutdownAsync -= OnConnectionShutdownAsync;
                            await connection.DisposeAsync();
                            Log.Information("RabbitMQ 连接已释放");
                        }
                    }
                }
                finally
                {
                    _lock.Release();
                }
            }

            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Logs a warning when the connection reports a shutdown.
        /// </summary>
        private Task OnConnectionShutdownAsync(object sender, global::RabbitMQ.Client.Events.ShutdownEventArgs @event)
        {
            Log.Warning("RabbitMQ 连接断开: {ReplyText}", @event.ReplyText);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Detaches and disposes a cached connection that is no longer open.
        /// Must be called while holding <see cref="_lock"/>.
        /// </summary>
        private async Task ReleaseStaleConnectionAsync()
        {
            var stale = _connection;
            if (stale is null)
            {
                return;
            }

            _connection = null;
            stale.ConnectionShutdownAsync -= OnConnectionShutdownAsync;
            try
            {
                await stale.DisposeAsync();
            }
            catch (Exception ex)
            {
                // Best effort: a failure to tear down the dead connection must not block reconnecting.
                Log.Warning(ex, "释放失效的 RabbitMQ 连接时出错");
            }
        }

        /// <summary>
        /// Throws <see cref="ObjectDisposedException"/> once the service has been disposed.
        /// </summary>
        private void ThrowIfDisposed()
        {
            if (_disposed)
            {
                throw new ObjectDisposedException(nameof(RabbitMQConnectionService));
            }
        }
    }
}