Mico.Demo/MassTransit示例代码/MassTransitDemo.Consumer/Consumers/RequestResponseConsumer.cs

116 lines
4.0 KiB
C#

using MassTransit;
using MassTransitDemo.Contracts;
namespace MassTransitDemo.Consumer.Consumers;
/// <summary>
/// Consumer for the request/response pattern.
/// Receives a <see cref="RequestMessage"/>, runs the requested string operation
/// and replies to the requester with a <see cref="ResponseMessage"/>.
/// </summary>
public class RequestResponseConsumer : IConsumer<RequestMessage>
{
    private readonly ILogger<RequestResponseConsumer> _logger;

    /// <summary>
    /// Creates the consumer.
    /// </summary>
    /// <param name="logger">Logger used to trace request handling.</param>
    public RequestResponseConsumer(ILogger<RequestResponseConsumer> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Handles a request message and sends exactly one response.
    /// </summary>
    /// <remarks>
    /// Business failures (unsupported operation, processing error) are reported
    /// through <c>Success = false</c> on the response. A failure while sending the
    /// response is logged and rethrown instead of attempting a second
    /// <c>RespondAsync</c> on the same context, so the error is not hidden from
    /// the caller of this method.
    /// </remarks>
    /// <param name="context">Consume context carrying the request message.</param>
    public async Task Consume(ConsumeContext<RequestMessage> context)
    {
        var request = context.Message;

        _logger.LogInformation("【请求/响应消费者】收到请求");
        _logger.LogInformation(" RequestId: {RequestId}", request.RequestId);
        _logger.LogInformation(" RequestedAt: {RequestedAt}", request.RequestedAt);
        _logger.LogInformation(" Operation: {Operation}", request.Operation);
        _logger.LogInformation(" Parameters: {Parameters}", request.Parameters);

        // ProcessOperation never throws: failures come back in the result tuple.
        var (success, data, errorMessage) = ProcessOperation(request.Operation, request.Parameters);

        var responseMessage = new ResponseMessage
        {
            ResponseId = Guid.NewGuid(),
            CorrelatedRequestId = request.RequestId,
            RespondedAt = DateTime.UtcNow,
            Success = success,
            Data = data,
            ErrorMessage = errorMessage
        };

        _logger.LogInformation("【请求/响应消费者】准备返回响应,成功: {Success}", success);

        try
        {
            await context.RespondAsync(responseMessage);
        }
        catch (Exception ex)
        {
            // Responding again here would either fail the same way or risk a
            // duplicate reply, so log and let the exception propagate.
            _logger.LogError(ex, "【请求/响应消费者】请求处理失败");
            throw;
        }

        _logger.LogInformation("【请求/响应消费者】响应已发送");
    }

    /// <summary>
    /// Executes the requested string operation.
    /// </summary>
    /// <remarks>
    /// The operation name is matched case-insensitively using the invariant
    /// culture, so matching does not depend on the host's current culture.
    /// A null <paramref name="parameters"/> is treated as an empty string and a
    /// null <paramref name="operation"/> is reported as unsupported.
    /// </remarks>
    /// <param name="operation">Operation name: echo, uppercase, lowercase, reverse or length.</param>
    /// <param name="parameters">Input text the operation is applied to.</param>
    /// <returns>
    /// A tuple with the success flag, the result data (empty on failure) and an
    /// error message (null on success).
    /// </returns>
    private static (bool Success, string Data, string? ErrorMessage) ProcessOperation(string? operation, string? parameters)
    {
        try
        {
            var input = parameters ?? string.Empty;

            switch (operation?.ToLowerInvariant())
            {
                case "echo":
                    return (true, $"Echo: {input}", null);
                case "uppercase":
                    return (true, input.ToUpperInvariant(), null);
                case "lowercase":
                    return (true, input.ToLowerInvariant(), null);
                case "reverse":
                    return (true, ReverseTextElements(input), null);
                case "length":
                    return (true, $"Length: {input.Length}", null);
                default:
                    return (false, string.Empty, $"不支持的操作类型: {operation}。支持的操作: echo, uppercase, lowercase, reverse, length");
            }
        }
        catch (Exception ex)
        {
            return (false, string.Empty, $"操作执行失败: {ex.Message}");
        }
    }

    /// <summary>
    /// Reverses a string by text element rather than by UTF-16 code unit, so
    /// surrogate pairs and combining sequences are kept intact.
    /// </summary>
    /// <param name="text">Text to reverse; must not be null.</param>
    /// <returns>The text with its text elements in reverse order.</returns>
    private static string ReverseTextElements(string text)
    {
        if (text.Length < 2)
        {
            return text;
        }

        // Start index of every text element, in ascending order.
        int[] starts = System.Globalization.StringInfo.ParseCombiningCharacters(text);
        var reversed = new System.Text.StringBuilder(text.Length);

        for (var i = starts.Length - 1; i >= 0; i--)
        {
            var begin = starts[i];
            var end = i + 1 < starts.Length ? starts[i + 1] : text.Length;
            reversed.Append(text, begin, end - begin);
        }

        return reversed.ToString();
    }
}