using LearningOfficer.OA.Common.Attributes;
using LearningOfficer.OA.Common.Configs;
using LearningOfficer.OA.Common.Dtos.RabbitMQ;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading.Tasks;
using Yitter.IdGenerator;

namespace LearningOfficer.OA.Infrastructure.RabbitMQ
{
    /// <summary>
    /// RabbitMQ producer service. Wraps a payload in a <c>RabbitMQMessageDto</c> envelope,
    /// serializes it to UTF-8 JSON and publishes it either straight to a queue or to a
    /// topic exchange.
    /// </summary>
    public class RabbitMQProducerService
    {
        private readonly RabbitMQConfig _config;
        private readonly RabbitMQConnectionService _connectionService;

        /// <summary>
        /// Creates the producer.
        /// </summary>
        /// <param name="config">Options monitor; its current value is captured once, at construction time.</param>
        /// <param name="connectionService">Service used to open a channel for every send.</param>
        public RabbitMQProducerService(
            IOptionsMonitor<RabbitMQConfig> config,
            RabbitMQConnectionService connectionService)
        {
            _config = config.CurrentValue;
            _connectionService = connectionService;
        }

        /// <summary>
        /// Sends a message to the queue described by <paramref name="busConfig"/>.
        /// The queue is declared with the config's Durable / AutoDelete flags before publishing.
        /// </summary>
        /// <typeparam name="T">Payload type.</typeparam>
        /// <param name="message">Payload to send.</param>
        /// <param name="busConfig">Queue configuration (QueueName, Durable, AutoDelete are read).</param>
        /// <param name="MessageType">Message type written into the envelope.</param>
        /// <exception cref="ArgumentNullException"><paramref name="busConfig"/> is null.</exception>
        public async Task SendMessage<T>(T message, BaseMqBusinessConfig busConfig, RabbitMQMessageTypeEnum MessageType)
        {
            // Fail fast: previously a null config surfaced as a NullReferenceException
            // raised from inside the catch block while formatting the failure log.
            if (busConfig is null)
            {
                throw new ArgumentNullException(nameof(busConfig));
            }

            await PublishToQueueAsync(message, busConfig.QueueName, busConfig.Durable, busConfig.AutoDelete, MessageType);
        }

        /// <summary>
        /// Sends a message to the queue named <paramref name="queueName"/>.
        /// The queue is declared with the Durable / AutoDelete flags of the <c>imMq</c> configuration section.
        /// </summary>
        /// <typeparam name="T">Payload type.</typeparam>
        /// <param name="message">Payload to send.</param>
        /// <param name="queueName">Queue name; also used as the routing key.</param>
        /// <param name="MessageType">Message type written into the envelope.</param>
        public async Task SendMessage<T>(T message, string queueName, RabbitMQMessageTypeEnum MessageType)
        {
            await PublishToQueueAsync(message, queueName, _config.imMq.Durable, _config.imMq.AutoDelete, MessageType);
        }

        /// <summary>
        /// Sends a message to the exchange named <paramref name="exchangeName"/>.
        /// The exchange is declared as a topic exchange with the Durable / AutoDelete flags
        /// of the <c>imMq</c> configuration section.
        /// </summary>
        /// <typeparam name="T">Payload type.</typeparam>
        /// <param name="message">Payload to send.</param>
        /// <param name="exchangeName">Exchange name.</param>
        /// <param name="routingKey">Routing key.</param>
        /// <param name="MessageType">Message type written into the envelope.</param>
        public async Task SendMessageToExchange<T>(T message, string exchangeName, string routingKey, RabbitMQMessageTypeEnum MessageType)
        {
            await PublishToExchangeAsync(message, exchangeName, routingKey, _config.imMq.Durable, _config.imMq.AutoDelete, MessageType);
        }

        /// <summary>
        /// Sends a message to the exchange described by <paramref name="busConfig"/>.
        /// The exchange is declared as a topic exchange with the config's Durable / AutoDelete flags.
        /// </summary>
        /// <typeparam name="T">Payload type.</typeparam>
        /// <param name="message">Payload to send.</param>
        /// <param name="busConfig">Exchange configuration (ExchangeName, RoutingKey, Durable, AutoDelete are read).</param>
        /// <param name="MessageType">Message type written into the envelope.</param>
        /// <exception cref="ArgumentNullException"><paramref name="busConfig"/> is null.</exception>
        public async Task SendMessageToExchange<T>(T message, BaseMqBusinessConfig busConfig, RabbitMQMessageTypeEnum MessageType)
        {
            // Same fail-fast guard as the queue overload.
            if (busConfig is null)
            {
                throw new ArgumentNullException(nameof(busConfig));
            }

            await PublishToExchangeAsync(message, busConfig.ExchangeName, busConfig.RoutingKey, busConfig.Durable, busConfig.AutoDelete, MessageType);
        }

        /// <summary>
        /// Declares the queue and publishes the enveloped message to the empty-named exchange ("")
        /// using the queue name as routing key. Logs the outcome to the console and rethrows failures.
        /// </summary>
        private async Task PublishToQueueAsync<T>(T message, string queueName, bool durable, bool autoDelete, RabbitMQMessageTypeEnum messageType)
        {
            try
            {
                // Dispose asynchronously so channel teardown does not block inside an async method.
                // NOTE(review): assumes CreateChannel() yields RabbitMQ.Client 7.x IChannel (IAsyncDisposable) - confirm.
                await using var channel = await _connectionService.CreateChannel();

                // Declare the queue
                await channel.QueueDeclareAsync(
                    queue: queueName,
                    durable: durable,
                    exclusive: false,
                    autoDelete: autoDelete,
                    arguments: null);

                var body = BuildMessageBody(message, "", queueName, messageType);

                // Publish the message
                await channel.BasicPublishAsync(exchange: "", routingKey: queueName, body: body);

                Console.WriteLine($"消息发送成功 - 队列: {queueName}, 消息类型: {typeof(T).Name}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"消息发送失败 - 队列: {queueName}, 消息类型: {typeof(T).Name}, 错误: {ex.Message}");
                throw;
            }
        }

        /// <summary>
        /// Declares the topic exchange and publishes the enveloped message to it with the given routing key.
        /// Logs the outcome to the console and rethrows failures.
        /// </summary>
        private async Task PublishToExchangeAsync<T>(T message, string exchangeName, string routingKey, bool durable, bool autoDelete, RabbitMQMessageTypeEnum messageType)
        {
            try
            {
                // Dispose asynchronously so channel teardown does not block inside an async method.
                // NOTE(review): assumes CreateChannel() yields RabbitMQ.Client 7.x IChannel (IAsyncDisposable) - confirm.
                await using var channel = await _connectionService.CreateChannel();

                // Declare the exchange (topic exchange)
                await channel.ExchangeDeclareAsync(
                    exchange: exchangeName,
                    type: ExchangeType.Topic,
                    durable: durable,
                    autoDelete: autoDelete);

                var body = BuildMessageBody(message, exchangeName, routingKey, messageType);

                // Publish the message
                await channel.BasicPublishAsync(exchange: exchangeName, routingKey: routingKey, body: body);

                Console.WriteLine($"消息发送成功 - 交换机: {exchangeName}, 路由键: {routingKey}, 消息类型: {typeof(T).Name}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"消息发送失败 - 交换机: {exchangeName}, 路由键: {routingKey}, 消息类型: {typeof(T).Name}, 错误: {ex.Message}");
                throw;
            }
        }

        /// <summary>
        /// Wraps the payload in a message envelope (new GUID correlation id, UTC creation time,
        /// generated id) and returns the envelope serialized as UTF-8 JSON bytes.
        /// </summary>
        private static byte[] BuildMessageBody<T>(T message, string exchangeName, string routingKey, RabbitMQMessageTypeEnum messageType)
        {
            // NOTE(review): if RabbitMQMessageDto is declared generic, this must be
            // RabbitMQMessageDto<T> - confirm against the DTO declaration.
            var envelope = new RabbitMQMessageDto()
            {
                CorrelationId = Guid.NewGuid().ToString(),
                CreatedAt = DateTime.UtcNow,
                Data = message,
                ExchangeName = exchangeName,
                Id = YitIdHelper.NextId(),
                MessageType = messageType,
                RoutingKey = routingKey
            };

            // Serialize the message
            var json = JsonConvert.SerializeObject(envelope);
            return Encoding.UTF8.GetBytes(json);
        }
    }
}