using LearningOfficer.OA.Common.Attributes;
using LearningOfficer.OA.Common.Configs;
using LearningOfficer.OA.Common.Dtos.RabbitMQ;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using RabbitMQ.Client;
using System.Text;
using System.Threading.Tasks;
using Yitter.IdGenerator;
namespace LearningOfficer.OA.Infrastructure.RabbitMQ
{
/// <summary>
/// RabbitMQ producer service. Wraps a payload in a <c>RabbitMQMessageDto</c> envelope,
/// serializes the envelope to UTF-8 JSON and publishes it either directly to a queue
/// (through the default exchange) or to a topic exchange.
/// </summary>
/// <remarks>
/// Every send obtains its own channel from <see cref="RabbitMQConnectionService"/> and
/// disposes it when the send finishes. Outcomes are written to the console; failures are
/// logged and then rethrown to the caller.
/// </remarks>
public class RabbitMQProducerService
{
    private readonly RabbitMQConfig _config;
    private readonly RabbitMQConnectionService _connectionService;

    /// <summary>
    /// Creates the producer.
    /// </summary>
    /// <param name="config">
    /// Options monitor for the RabbitMQ configuration. Its <c>CurrentValue</c> is read once
    /// here, so configuration changes made later are not observed by this instance.
    /// </param>
    /// <param name="connectionService">Service used to create a channel for each send.</param>
    public RabbitMQProducerService(
        IOptionsMonitor<RabbitMQConfig> config,
        RabbitMQConnectionService connectionService)
    {
        _config = config.CurrentValue;
        _connectionService = connectionService;
    }

    /// <summary>
    /// Sends a message to the queue described by a business configuration.
    /// </summary>
    /// <typeparam name="T">Payload type.</typeparam>
    /// <param name="message">Payload to send.</param>
    /// <param name="busConfig">Supplies the queue name and its durable / auto-delete flags.</param>
    /// <param name="MessageType">Message type stored in the envelope.</param>
    /// <exception cref="ArgumentNullException"><paramref name="busConfig"/> is null.</exception>
    /// <exception cref="ArgumentException">The configured queue name is null, empty or whitespace.</exception>
    public async Task SendMessage<T>(T message, BaseMqBusinessConfig busConfig, RabbitMQMessageTypeEnum MessageType)
    {
        // Guard before the try/catch: the failure log reads busConfig, so a null config
        // would otherwise throw a second time from inside the catch block.
        ArgumentNullException.ThrowIfNull(busConfig);

        await PublishToQueueAsync(message, busConfig.QueueName, busConfig.Durable, busConfig.AutoDelete, MessageType);
    }

    /// <summary>
    /// Sends a message to a named queue, using the durable / auto-delete flags of the
    /// <c>imMq</c> configuration section.
    /// </summary>
    /// <typeparam name="T">Payload type.</typeparam>
    /// <param name="message">Payload to send.</param>
    /// <param name="queueName">Name of the target queue.</param>
    /// <param name="MessageType">Message type stored in the envelope.</param>
    /// <exception cref="ArgumentException"><paramref name="queueName"/> is null, empty or whitespace.</exception>
    public async Task SendMessage<T>(T message, string queueName, RabbitMQMessageTypeEnum MessageType)
    {
        await PublishToQueueAsync(message, queueName, _config.imMq.Durable, _config.imMq.AutoDelete, MessageType);
    }

    /// <summary>
    /// Sends a message to a topic exchange, using the durable / auto-delete flags of the
    /// <c>imMq</c> configuration section.
    /// </summary>
    /// <typeparam name="T">Payload type.</typeparam>
    /// <param name="message">Payload to send.</param>
    /// <param name="exchangeName">Name of the target exchange.</param>
    /// <param name="routingKey">Routing key to publish with.</param>
    /// <param name="MessageType">Message type stored in the envelope.</param>
    /// <exception cref="ArgumentException"><paramref name="exchangeName"/> is null, empty or whitespace.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="routingKey"/> is null.</exception>
    public async Task SendMessageToExchange<T>(T message, string exchangeName, string routingKey, RabbitMQMessageTypeEnum MessageType)
    {
        await PublishToExchangeAsync(message, exchangeName, routingKey, _config.imMq.Durable, _config.imMq.AutoDelete, MessageType);
    }

    /// <summary>
    /// Sends a message to the topic exchange described by a business configuration.
    /// </summary>
    /// <typeparam name="T">Payload type.</typeparam>
    /// <param name="message">Payload to send.</param>
    /// <param name="busConfig">Supplies the exchange name, routing key and durable / auto-delete flags.</param>
    /// <param name="MessageType">Message type stored in the envelope.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="busConfig"/> or its routing key is null.
    /// </exception>
    /// <exception cref="ArgumentException">The configured exchange name is null, empty or whitespace.</exception>
    public async Task SendMessageToExchange<T>(T message, BaseMqBusinessConfig busConfig, RabbitMQMessageTypeEnum MessageType)
    {
        // Same reasoning as the queue overload: the failure log dereferences busConfig.
        ArgumentNullException.ThrowIfNull(busConfig);

        await PublishToExchangeAsync(message, busConfig.ExchangeName, busConfig.RoutingKey, busConfig.Durable, busConfig.AutoDelete, MessageType);
    }

    /// <summary>
    /// Declares the queue and publishes the enveloped message to it through the default exchange.
    /// </summary>
    private async Task PublishToQueueAsync<T>(T message, string queueName, bool durable, bool autoDelete, RabbitMQMessageTypeEnum messageType)
    {
        // The queue name is used both as the queue to declare and as the routing key,
        // so a missing name can never address the intended queue.
        if (string.IsNullOrWhiteSpace(queueName))
        {
            throw new ArgumentException("Queue name must not be null or empty.", nameof(queueName));
        }

        try
        {
            using var channel = await _connectionService.CreateChannel();

            // Declare the queue before publishing to it.
            await channel.QueueDeclareAsync(queue: queueName,
                durable: durable,
                exclusive: false,
                autoDelete: autoDelete,
                arguments: null);

            var body = SerializeEnvelope(message, exchangeName: "", routingKey: queueName, messageType: messageType);

            // Default exchange ("") with the queue name as routing key.
            await channel.BasicPublishAsync(exchange: "",
                routingKey: queueName,
                body: body);

            Console.WriteLine($"消息发送成功 - 队列: {queueName}, 消息类型: {typeof(T).Name}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"消息发送失败 - 队列: {queueName}, 消息类型: {typeof(T).Name}, 错误: {ex.Message}");
            throw;
        }
    }

    /// <summary>
    /// Declares the topic exchange and publishes the enveloped message to it.
    /// </summary>
    private async Task PublishToExchangeAsync<T>(T message, string exchangeName, string routingKey, bool durable, bool autoDelete, RabbitMQMessageTypeEnum messageType)
    {
        if (string.IsNullOrWhiteSpace(exchangeName))
        {
            throw new ArgumentException("Exchange name must not be null or empty.", nameof(exchangeName));
        }

        // Only null is rejected; an empty routing key is passed through unchanged.
        ArgumentNullException.ThrowIfNull(routingKey);

        try
        {
            using var channel = await _connectionService.CreateChannel();

            // Declare the exchange (always as a topic exchange) before publishing to it.
            await channel.ExchangeDeclareAsync(exchange: exchangeName,
                type: ExchangeType.Topic,
                durable: durable,
                autoDelete: autoDelete);

            var body = SerializeEnvelope(message, exchangeName: exchangeName, routingKey: routingKey, messageType: messageType);

            await channel.BasicPublishAsync(exchange: exchangeName,
                routingKey: routingKey,
                body: body);

            Console.WriteLine($"消息发送成功 - 交换机: {exchangeName}, 路由键: {routingKey}, 消息类型: {typeof(T).Name}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"消息发送失败 - 交换机: {exchangeName}, 路由键: {routingKey}, 消息类型: {typeof(T).Name}, 错误: {ex.Message}");
            throw;
        }
    }

    /// <summary>
    /// Wraps the payload in a message envelope (new GUID correlation id, UTC creation time,
    /// generated id) and returns the envelope as UTF-8 encoded JSON.
    /// </summary>
    private static byte[] SerializeEnvelope<T>(T message, string exchangeName, string routingKey, RabbitMQMessageTypeEnum messageType)
    {
        // NOTE(review): RabbitMQMessageDto is declared elsewhere; if it is generic over the
        // payload type this must be RabbitMQMessageDto<T> — confirm against its declaration.
        var envelope = new RabbitMQMessageDto()
        {
            CorrelationId = Guid.NewGuid().ToString(),
            CreatedAt = DateTime.UtcNow,
            Data = message,
            ExchangeName = exchangeName,
            Id = YitIdHelper.NextId(),
            MessageType = messageType,
            RoutingKey = routingKey
        };

        var json = JsonConvert.SerializeObject(envelope);
        return Encoding.UTF8.GetBytes(json);
    }
}
}