using MassTransit;
using Microsoft.AspNetCore.Mvc;
using MassTransitDemo.Contracts;
namespace MassTransitDemo.Publisher.Controllers;
///
/// 消息发布控制器
/// 提供发布/订阅模式和请求/响应模式的API接口
///
[ApiController]
[Route("api/[controller]")]
public class MessageController : ControllerBase
{
private readonly IPublishEndpoint _publishEndpoint;
private readonly IRequestClient _requestClient;
private readonly ILogger _logger;
///
/// 构造函数
///
/// 发布端点,用于发布/订阅模式
/// 请求客户端,用于请求/响应模式
/// 日志记录器
public MessageController(
IPublishEndpoint publishEndpoint,
IRequestClient requestClient,
ILogger logger)
{
_publishEndpoint = publishEndpoint;
_requestClient = requestClient;
_logger = logger;
}
///
/// 发布/订阅模式:发布一条广播消息
///
/// 消息发布请求
/// 操作结果
[HttpPost("publish")]
public async Task PublishMessage([FromBody] PublishMessageRequest request)
{
try
{
_logger.LogInformation("开始发布消息,标题: {Title}", request.Title);
// 创建发布/订阅消息实例
var message = new PublishSubscribeMessage
{
MessageId = Guid.NewGuid(),
CreatedAt = DateTime.UtcNow,
Title = request.Title,
Content = request.Content,
Priority = request.Priority
};
// 使用MassTransit发布消息(广播给所有订阅者)
await _publishEndpoint.Publish(message);
_logger.LogInformation("消息发布成功,MessageId: {MessageId}", message.MessageId);
return Ok(new PublishMessageResponse
{
Success = true,
MessageId = message.MessageId,
Message = "消息发布成功"
});
}
catch (Exception ex)
{
_logger.LogError(ex, "消息发布失败");
return StatusCode(500, new ErrorResponse
{
Success = false,
Error = ex.Message
});
}
}
///
/// 请求/响应模式:发送请求并等待响应
///
/// 请求内容
/// 响应结果
[HttpPost("request")]
public async Task SendRequest([FromBody] SendRequestRequest request)
{
try
{
_logger.LogInformation("开始发送请求,操作: {Operation}", request.Operation);
// 创建请求消息实例
var requestMessage = new RequestMessage
{
RequestId = Guid.NewGuid(),
RequestedAt = DateTime.UtcNow,
Operation = request.Operation,
Parameters = request.Parameters
};
// 使用MassTransit发送请求并等待响应
var response = await _requestClient.GetResponse(requestMessage);
_logger.LogInformation("请求处理完成,RequestId: {RequestId}, 成功: {Success}",
requestMessage.RequestId, response.Message.Success);
return Ok(new RequestResponseResult
{
Success = true,
RequestId = requestMessage.RequestId,
Response = response.Message
});
}
catch (RequestTimeoutException)
{
_logger.LogError("请求超时");
return StatusCode(408, new ErrorResponse
{
Success = false,
Error = "请求超时"
});
}
catch (Exception ex)
{
_logger.LogError(ex, "请求处理失败");
return StatusCode(500, new ErrorResponse
{
Success = false,
Error = ex.Message
});
}
}
}
///
/// 发布消息请求模型
///
public class PublishMessageRequest
{
///
/// 消息标题
///
public string Title { get; set; } = string.Empty;
///
/// 消息内容
///
public string Content { get; set; } = string.Empty;
///
/// 消息优先级(默认:1)
///
public int Priority { get; set; } = 1;
}
///
/// 发布消息响应模型
///
public class PublishMessageResponse
{
///
/// 操作是否成功
///
public bool Success { get; set; }
///
/// 消息唯一标识符
///
public Guid MessageId { get; set; }
///
/// 响应消息
///
public string Message { get; set; } = string.Empty;
}
///
/// 发送请求模型
///
public class SendRequestRequest
{
///
/// 操作类型
///
public string Operation { get; set; } = string.Empty;
///
/// 请求参数(JSON格式字符串)
///
public string Parameters { get; set; } = string.Empty;
}
///
/// 请求响应结果模型
///
public class RequestResponseResult
{
///
/// 操作是否成功
///
public bool Success { get; set; }
///
/// 请求唯一标识符
///
public Guid RequestId { get; set; }
///
/// 响应消息
///
public ResponseMessage? Response { get; set; }
}
///
/// 错误响应模型
///
public class ErrorResponse
{
///
/// 操作是否成功
///
public bool Success { get; set; }
///
/// 错误信息
///
public string Error { get; set; } = string.Empty;
}