219 lines
8.8 KiB
C#
219 lines
8.8 KiB
C#
using LearningOfficer.OA.Common.Attributes;
|
|
using LearningOfficer.OA.Common.Configs;
|
|
using LearningOfficer.OA.Common.Dtos.RabbitMQ;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Options;
|
|
using Newtonsoft.Json;
|
|
using RabbitMQ.Client;
|
|
using System.Text;
|
|
using System.Threading.Tasks;
|
|
using Yitter.IdGenerator;
|
|
|
|
namespace LearningOfficer.OA.Infrastructure.RabbitMQ
|
|
{
|
|
/// <summary>
|
|
/// RabbitMQ 生产者服务
|
|
/// </summary>
|
|
public class RabbitMQProducerService
|
|
{
|
|
private readonly RabbitMQConfig _config;
|
|
private readonly RabbitMQConnectionService _connectionService;
|
|
|
|
public RabbitMQProducerService(
|
|
IOptionsMonitor<RabbitMQConfig> config,
|
|
RabbitMQConnectionService connectionService)
|
|
{
|
|
_config = config.CurrentValue;
|
|
_connectionService = connectionService;
|
|
}
|
|
/// <summary>
|
|
/// 发送消息到队列
|
|
/// </summary>
|
|
/// <typeparam name="T">消息类型</typeparam>
|
|
/// <param name="message">消息内容</param>
|
|
/// <param name="busConfig">队列配置</param>
|
|
public async Task SendMessage<T>(T message, BaseMqBusinessConfig busConfig, RabbitMQMessageTypeEnum MessageType)
|
|
{
|
|
try
|
|
{
|
|
using var channel = await _connectionService.CreateChannel();
|
|
|
|
var actualQueueName = busConfig.QueueName;
|
|
|
|
// 声明队列
|
|
await channel.QueueDeclareAsync(queue: actualQueueName,
|
|
durable: busConfig.Durable,
|
|
exclusive: false,
|
|
autoDelete: busConfig.AutoDelete,
|
|
arguments: null);
|
|
var data = new RabbitMQMessageDto()
|
|
{
|
|
CorrelationId = Guid.NewGuid().ToString(),
|
|
CreatedAt = DateTime.UtcNow,
|
|
Data = message,
|
|
ExchangeName = "",
|
|
Id = YitIdHelper.NextId(),
|
|
MessageType = MessageType,
|
|
RoutingKey = actualQueueName
|
|
};
|
|
// 序列化消息
|
|
var json = JsonConvert.SerializeObject(data);
|
|
var body = Encoding.UTF8.GetBytes(json);
|
|
|
|
// 发布消息
|
|
await channel.BasicPublishAsync(exchange: "",
|
|
routingKey: actualQueueName,
|
|
body: body);
|
|
|
|
Console.WriteLine($"消息发送成功 - 队列: {actualQueueName}, 消息类型: {typeof(T).Name}");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Console.WriteLine($"消息发送失败 - 队列: {busConfig.QueueName}, 消息类型: {typeof(T).Name}, 错误: {ex.Message}");
|
|
throw;
|
|
}
|
|
}
|
|
/// <summary>
|
|
/// 发送消息到队列
|
|
/// </summary>
|
|
/// <typeparam name="T">消息类型</typeparam>
|
|
/// <param name="message">消息内容</param>
|
|
/// <param name="queueName">队列名称</param>
|
|
public async Task SendMessage<T>(T message, string queueName, RabbitMQMessageTypeEnum MessageType)
|
|
{
|
|
try
|
|
{
|
|
using var channel = await _connectionService.CreateChannel();
|
|
|
|
var actualQueueName = queueName;
|
|
|
|
// 声明队列
|
|
await channel.QueueDeclareAsync(queue: actualQueueName,
|
|
durable: _config.imMq.Durable,
|
|
exclusive: false,
|
|
autoDelete: _config.imMq.AutoDelete,
|
|
arguments: null);
|
|
|
|
var data = new RabbitMQMessageDto()
|
|
{
|
|
CorrelationId = Guid.NewGuid().ToString(),
|
|
CreatedAt = DateTime.UtcNow,
|
|
Data = message,
|
|
ExchangeName = "",
|
|
Id = YitIdHelper.NextId(),
|
|
MessageType = MessageType,
|
|
RoutingKey = actualQueueName
|
|
};
|
|
// 序列化消息
|
|
var json = JsonConvert.SerializeObject(data);
|
|
var body = Encoding.UTF8.GetBytes(json);
|
|
|
|
// 发布消息
|
|
await channel.BasicPublishAsync(exchange: "",
|
|
routingKey: actualQueueName,
|
|
body: body);
|
|
|
|
Console.WriteLine($"消息发送成功 - 队列: {actualQueueName}, 消息类型: {typeof(T).Name}");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Console.WriteLine($"消息发送失败 - 队列: {queueName}, 消息类型: {typeof(T).Name}, 错误: {ex.Message}");
|
|
throw;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 发送消息到交换机
|
|
/// </summary>
|
|
/// <typeparam name="T">消息类型</typeparam>
|
|
/// <param name="message">消息内容</param>
|
|
/// <param name="exchangeName">交换机名称</param>
|
|
/// <param name="routingKey">路由键</param>
|
|
public async Task SendMessageToExchange<T>(T message, string exchangeName, string routingKey, RabbitMQMessageTypeEnum MessageType)
|
|
{
|
|
try
|
|
{
|
|
using var channel = await _connectionService.CreateChannel();
|
|
// 声明交换机(主题交换机)
|
|
await channel.ExchangeDeclareAsync(exchange: exchangeName,
|
|
type: ExchangeType.Topic,
|
|
durable: _config.imMq.Durable,
|
|
autoDelete: _config.imMq.AutoDelete);
|
|
|
|
var data = new RabbitMQMessageDto()
|
|
{
|
|
CorrelationId = Guid.NewGuid().ToString(),
|
|
CreatedAt = DateTime.UtcNow,
|
|
Data = message,
|
|
ExchangeName = exchangeName,
|
|
Id = YitIdHelper.NextId(),
|
|
MessageType = MessageType,
|
|
RoutingKey = routingKey
|
|
};
|
|
// 序列化消息
|
|
var json = JsonConvert.SerializeObject(data);
|
|
var body = Encoding.UTF8.GetBytes(json);
|
|
|
|
// 发布消息
|
|
await channel.BasicPublishAsync(exchange: exchangeName,
|
|
routingKey: routingKey,
|
|
body: body);
|
|
|
|
Console.WriteLine($"消息发送成功 - 交换机: {exchangeName}, 路由键: {routingKey}, 消息类型: {typeof(T).Name}");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Console.WriteLine($"消息发送失败 - 交换机: {exchangeName}, 路由键: {routingKey}, 消息类型: {typeof(T).Name}, 错误: {ex.Message}");
|
|
throw;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 发送消息到交换机
|
|
/// </summary>
|
|
/// <typeparam name="T">消息类型</typeparam>
|
|
/// <param name="message">消息内容</param>
|
|
/// <param name="busConfig">交换机配置</param>
|
|
public async Task SendMessageToExchange<T>(T message, BaseMqBusinessConfig busConfig, RabbitMQMessageTypeEnum MessageType)
|
|
{
|
|
try
|
|
{
|
|
using var channel = await _connectionService.CreateChannel();
|
|
|
|
var actualExchangeName = busConfig.ExchangeName;
|
|
var actualRoutingKey = busConfig.RoutingKey;
|
|
|
|
// 声明交换机(主题交换机)
|
|
await channel.ExchangeDeclareAsync(exchange: actualExchangeName,
|
|
type: ExchangeType.Topic,
|
|
durable: busConfig.Durable,
|
|
autoDelete: busConfig.AutoDelete);
|
|
var data = new RabbitMQMessageDto()
|
|
{
|
|
CorrelationId = Guid.NewGuid().ToString(),
|
|
CreatedAt = DateTime.UtcNow,
|
|
Data = message,
|
|
ExchangeName = busConfig.ExchangeName,
|
|
Id = YitIdHelper.NextId(),
|
|
MessageType = MessageType,
|
|
RoutingKey = busConfig.RoutingKey
|
|
};
|
|
// 序列化消息
|
|
var json = JsonConvert.SerializeObject(data);
|
|
var body = Encoding.UTF8.GetBytes(json);
|
|
|
|
// 发布消息
|
|
await channel.BasicPublishAsync(exchange: actualExchangeName,
|
|
routingKey: actualRoutingKey,
|
|
body: body);
|
|
|
|
Console.WriteLine($"消息发送成功 - 交换机: {actualExchangeName}, 路由键: {actualRoutingKey}, 消息类型: {typeof(T).Name}");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Console.WriteLine($"消息发送失败 - 交换机: {busConfig.ExchangeName}, 路由键: {busConfig.RoutingKey}, 消息类型: {typeof(T).Name}, 错误: {ex.Message}");
|
|
throw;
|
|
}
|
|
}
|
|
}
|
|
} |