diff --git a/MassTransit.Message/CheckInventoryRequest.cs b/MassTransit.Message/CheckInventoryRequest.cs
new file mode 100644
index 0000000..64fe64c
--- /dev/null
+++ b/MassTransit.Message/CheckInventoryRequest.cs
@@ -0,0 +1,3 @@
+namespace MassTransit.Message;
+
+public record CheckInventoryRequest(string ProductId, int Quantity);
diff --git a/MassTransit.Message/CheckInventoryResponse.cs b/MassTransit.Message/CheckInventoryResponse.cs
new file mode 100644
index 0000000..f8f8d49
--- /dev/null
+++ b/MassTransit.Message/CheckInventoryResponse.cs
@@ -0,0 +1,3 @@
+namespace MassTransit.Message;
+
+public record CheckInventoryResponse(bool IsAvailable, int Quantity);
diff --git a/MassTransit.Message/MassTransit.Message.csproj b/MassTransit.Message/MassTransit.Message.csproj
new file mode 100644
index 0000000..090c1f0
--- /dev/null
+++ b/MassTransit.Message/MassTransit.Message.csproj
@@ -0,0 +1,13 @@
+
+
+
+ net10.0
+ enable
+ enable
+
+
+
+
+
+
+
diff --git a/MassTransit.Message/OrderInventoryNotificationRequest.cs b/MassTransit.Message/OrderInventoryNotificationRequest.cs
new file mode 100644
index 0000000..e6cfc63
--- /dev/null
+++ b/MassTransit.Message/OrderInventoryNotificationRequest.cs
@@ -0,0 +1,134 @@
+using System;
+
+namespace MassTransit.Message;
+
+///
+/// ============================================================
+/// Point-to-Point 通信模式示例 - 消息契约
+/// ============================================================
+///
+/// Point-to-Point(点对点)通信模式特点:
+///
+/// 1. 单向消息传递
+/// - 发送方只发送消息,不等待响应
+/// - 消息发送后立即返回,提高性能
+/// - 适用于不需要立即知道处理结果的场景
+///
+/// 2. 消息只被一个消费者处理
+/// - RabbitMQ 队列确保消息只被一个消费者消费
+/// - 多个消费者会形成竞争关系,每个消息只被处理一次
+/// - 适合负载均衡和水平扩展
+///
+/// 3. 高性能
+/// - 无需等待响应,消息发送后立即返回
+/// - 减少网络往返时间(RTT)
+/// - 提高系统吞吐量
+///
+/// 4. 可靠性
+/// - 消息持久化保证消息不丢失
+/// - 死信队列(DLQ)处理失败的消息
+/// - 重试机制处理临时性错误
+///
+/// 使用场景:
+/// - 订单创建后异步通知库存服务扣减库存
+/// - 用户注册后发送欢迎邮件(不等待邮件发送完成)
+/// - 数据同步和事件通知
+/// - 日志和监控数据收集
+///
+/// 与 Request-Response 模式的区别:
+/// - Request-Response: 使用 IRequestClient.GetResponse(),等待响应
+/// - Point-to-Point: 使用 IBus.Send(),不等待响应
+///
+/// ============================================================
+public record OrderInventoryNotificationRequest
+{
+ ///
+ /// 消息唯一标识符
+ /// 用于消息去重和追踪
+ ///
+ public string MessageId { get; init; } = Guid.NewGuid().ToString("N");
+
+ ///
+ /// 关联ID,用于关联相关消息
+ /// 例如:订单ID、用户ID等业务标识
+ ///
+ public string CorrelationId { get; init; } = string.Empty;
+
+ ///
+ /// 通知类型
+ /// - OrderCreated: 订单创建
+ /// - OrderCancelled: 订单取消
+ ///
+ public string NotificationType { get; init; } = string.Empty;
+
+ ///
+ /// 产品信息列表
+ ///
+ public List Products { get; init; } = new();
+
+ ///
+ /// 消息创建时间
+ ///
+ public DateTime CreatedAt { get; init; } = DateTime.UtcNow;
+
+ ///
+ /// 无参构造函数(用于反序列化)
+ ///
+ public OrderInventoryNotificationRequest() { }
+
+ ///
+ /// 带参数构造函数
+ ///
+ public OrderInventoryNotificationRequest(
+ string correlationId,
+ string notificationType,
+ List products)
+ {
+ CorrelationId = correlationId;
+ NotificationType = notificationType;
+ Products = products;
+ }
+}
+
+///
+/// 产品信息
+///
+public record ProductInfo
+{
+ ///
+ /// 产品ID
+ ///
+ public string ProductId { get; init; } = string.Empty;
+
+ ///
+ /// 产品名称
+ ///
+ public string ProductName { get; init; } = string.Empty;
+
+ ///
+ /// 操作数量
+ /// 正数表示增加库存,负数表示减少库存
+ ///
+ public int Quantity { get; init; }
+
+ ///
+ /// 操作原因
+ ///
+ public string Reason { get; init; } = string.Empty;
+
+ ///
+ /// 无参构造函数(用于反序列化)
+ ///
+ public ProductInfo() { }
+
+ ///
+ /// 带参数构造函数
+ ///
+ public ProductInfo(string productId, string productName, int quantity, string reason)
+ {
+ ProductId = productId;
+ ProductName = productName;
+ Quantity = quantity;
+ Reason = reason;
+ }
+}
diff --git a/MicoService.Demo.slnx b/MicoService.Demo.slnx
index 33f9471..d9b4106 100644
--- a/MicoService.Demo.slnx
+++ b/MicoService.Demo.slnx
@@ -1,4 +1,5 @@
+
diff --git a/MicoService.Demo/Controllers/TestController.cs b/MicoService.Demo/Controllers/TestController.cs
index 590e0f2..4c67088 100644
--- a/MicoService.Demo/Controllers/TestController.cs
+++ b/MicoService.Demo/Controllers/TestController.cs
@@ -1,11 +1,10 @@
-using Microsoft.AspNetCore.Mvc;
-using Microsoft.Extensions.Configuration;
-using Microsoft.Extensions.Logging;
-using Nacos.V2;
-using System.Threading.Tasks;
+using MassTransit;
+using MassTransit.Message;
using Microservice.Common;
using Microservice.Common.Models;
using Microservice.Common.Services;
+using Microsoft.AspNetCore.Mvc;
+using Nacos.V2;
namespace MicoService.Demo.Controllers
{
@@ -18,18 +17,21 @@ namespace MicoService.Demo.Controllers
private readonly INacosNamingService _nacosNamingService;
private readonly IServiceClient _serviceClient;
private readonly IUserService _userService;
+ private readonly IRequestClient _checkInvRequestClient;
private readonly ILogger _logger;
public TestController(IConfiguration configuration,
INacosNamingService nacosNamingService,
IServiceClient serviceClient,
IUserService userService,
+ IRequestClient checkInvRequestClient,
ILogger logger)
{
this._configuration = configuration;
this._nacosNamingService = nacosNamingService;
this._serviceClient = serviceClient;
this._userService = userService;
+ this._checkInvRequestClient = checkInvRequestClient;
this._logger = logger;
}
@@ -90,5 +92,44 @@ namespace MicoService.Demo.Controllers
}
+ ///
+ /// 演示 :通过MassTransit 使用 【请求-响应模式】 调用其他微服务的接口,检查库存是否充足
+ ///
+ ///
+ ///
+ ///
+ [HttpGet("check-inventory")]
+ public async Task CheckInventoryAsync(string productId, int quantity)
+ {
+ try
+ {
+ var response = await _checkInvRequestClient.GetResponse(new CheckInventoryRequest(productId, quantity));
+ return response.Message.IsAvailable;
+ }
+ catch (RequestTimeoutException)
+ {
+ return false;
+ }
+ catch (Exception)
+ {
+ return false;
+ }
+ }
+
+
+ ///
+ /// 演示 :通过MassTransit 使用 【点对点模式】 调用其他微服务的接口,扣减库存
+ ///
+ ///
+ ///
+ ///
+ [HttpGet("dec-inventory")]
+ public async Task DecInventoryAsync()
+ {
+
+
+
+
+ }
}
}
diff --git a/MicoService.Demo/DTOs/OrderDto.cs b/MicoService.Demo/DTOs/OrderDto.cs
new file mode 100644
index 0000000..ca59ffe
--- /dev/null
+++ b/MicoService.Demo/DTOs/OrderDto.cs
@@ -0,0 +1,16 @@
+namespace MicoService.Demo.DTOs
+{
+
+
+ public class OrderItemResponseDto
+ {
+ public int Id { get; set; }
+ public string ProductId { get; set; }
+ public string ProductName { get; set; }
+ public int Quantity { get; set; }
+ public decimal UnitPrice { get; set; }
+ public decimal Subtotal { get; set; }
+ }
+
+
+}
diff --git a/MicoService.Demo/MicoService.Demo.csproj b/MicoService.Demo/MicoService.Demo.csproj
index 58aee2c..26004f1 100644
--- a/MicoService.Demo/MicoService.Demo.csproj
+++ b/MicoService.Demo/MicoService.Demo.csproj
@@ -7,14 +7,28 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/MicoService.Demo/Program.cs b/MicoService.Demo/Program.cs
index 159b287..987be48 100644
--- a/MicoService.Demo/Program.cs
+++ b/MicoService.Demo/Program.cs
@@ -1,8 +1,10 @@
-using Nacos.AspNetCore.V2;
-using Nacos.V2.DependencyInjection;
+using MassTransit;
+using MassTransit.Message;
using Microservice.Common;
using Microservice.Common.Services;
+using Nacos.AspNetCore.V2;
+using Nacos.V2.DependencyInjection;
namespace MicoService.Demo
{
@@ -20,10 +22,46 @@ namespace MicoService.Demo
// 配置 Nacos 注册发现
builder.Services.AddNacosAspNet(builder.Configuration);
- builder.Services.AddNacosV2Naming(builder.Configuration);
+ builder.Services.AddNacosV2Naming(builder.Configuration);
#endregion
+
+
+ // MassTransit 配置
+ // ============================================================
+ // 配置 MassTransit 总线,使用 RabbitMQ 作为消息中间件
+ builder.Services.AddMassTransit(x =>
+ {
+
+ // 注册请求客户端,用于发送请求消息并等待响应
+ x.AddRequestClient();
+
+ // 配置 RabbitMQ 连接
+ x.UsingRabbitMq((context, cfg) =>
+ {
+ // 设置 RabbitMQ 主机连接信息
+ cfg.Host("rabbitmq://192.168.2.7:5672", h =>
+ {
+ h.Username("rabbit");
+ h.Password("qwe123!@#");
+ });
+
+ // 配置消息重试机制:最多重试3次,每次间隔1秒
+ cfg.UseMessageRetry(r => r.Interval(3, 1000));
+
+ // 自动配置消费者端点,将消费者注册为 RabbitMQ 队列
+ cfg.ConfigureEndpoints(context);
+ });
+
+ });
+
+
+
+
+
+
+
// Add services to the container.
builder.Services.AddControllers();
@@ -35,6 +73,8 @@ namespace MicoService.Demo
// 注册用户服务
builder.Services.AddScoped();
+ // 注册业务逻辑服务
+ //builder.Services.AddScoped();
var app = builder.Build();
@@ -42,6 +82,10 @@ namespace MicoService.Demo
if (app.Environment.IsDevelopment())
{
app.MapOpenApi();
+ app.UseSwaggerUI(options =>
+ {
+ options.SwaggerEndpoint("/openapi/v1.json", "API v1");
+ });
}
app.UseAuthorization();
diff --git a/MicoService.Demo/Properties/launchSettings.json b/MicoService.Demo/Properties/launchSettings.json
index 70c78b8..dfa0af3 100644
--- a/MicoService.Demo/Properties/launchSettings.json
+++ b/MicoService.Demo/Properties/launchSettings.json
@@ -5,7 +5,7 @@
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": false,
- "applicationUrl": "http://localhost:5252",
+ "applicationUrl": "http://localhost:5001",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
diff --git a/MicoService.Demo/Services/IOrderInventoryNotificationService.cs b/MicoService.Demo/Services/IOrderInventoryNotificationService.cs
new file mode 100644
index 0000000..16eaf18
--- /dev/null
+++ b/MicoService.Demo/Services/IOrderInventoryNotificationService.cs
@@ -0,0 +1,51 @@
+using MicoService.Demo.DTOs;
+
+namespace MicoService.Demo.Services
+{
+ ///
+ /// ============================================================
+ /// Point-to-Point 通信模式示例 - 发送服务
+ /// ============================================================
+ ///
+ /// 服务特点:
+ ///
+ /// 1. 使用 IBus.Send() 发送单向消息
+ /// - 不等待响应,消息发送后立即返回
+ /// - 提高性能,减少等待时间
+ ///
+ /// 2. 与 Request-Response 的区别
+ /// - Request-Response: 使用 IRequestClient.GetResponse(),等待响应
+ /// - Point-to-Point: 使用 IBus.Send(),不等待响应
+ ///
+ /// 3. 使用场景
+ /// - 订单创建后异步通知库存服务
+ /// - 不需要立即知道处理结果的场景
+ /// - 对性能要求较高的场景
+ ///
+ /// 4. 消息可靠性
+ /// - 消息持久化保证消息不丢失
+ /// - 死信队列处理失败的消息
+ /// - 重试机制处理临时性错误
+ ///
+ /// ============================================================
+ public interface IOrderInventoryNotificationService
+ {
+ ///
+ /// 发送订单创建通知(不等待响应)
+ ///
+ /// 订单ID
+ /// 用户ID
+ /// 订单项列表
+ /// 异步任务
+ Task SendOrderCreatedNotificationAsync(string orderId, string userId, List items);
+
+ ///
+ /// 发送订单取消通知(不等待响应)
+ ///
+ /// 订单ID
+ /// 用户ID
+ /// 订单项列表
+ /// 异步任务
+ Task SendOrderCancelledNotificationAsync(string orderId, string userId, List items);
+ }
+}
diff --git a/MicoService.Demo/Services/OrderInventoryNotificationService.cs b/MicoService.Demo/Services/OrderInventoryNotificationService.cs
new file mode 100644
index 0000000..701798e
--- /dev/null
+++ b/MicoService.Demo/Services/OrderInventoryNotificationService.cs
@@ -0,0 +1,114 @@
+using MassTransit;
+using MassTransit.Message;
+using MicoService.Demo.DTOs;
+
+namespace MicoService.Demo.Services
+{
+ ///
+ /// Point-to-Point 通信模式示例 - 发送服务实现
+ ///
+ public class OrderInventoryNotificationService : IOrderInventoryNotificationService
+ {
+ private readonly ILogger _logger;
+ private readonly IBus _bus;
+
+ ///
+ /// 构造函数注入依赖
+ ///
+ public OrderInventoryNotificationService(ILogger logger, IBus bus)
+ {
+ _logger = logger;
+ _bus = bus;
+ }
+
+ ///
+ /// 发送订单创建通知(不等待响应)
+ ///
+ /// 订单ID
+ /// 用户ID
+ /// 订单项列表
+ /// 异步任务
+ public async Task SendOrderCreatedNotificationAsync(string orderId, string userId, List items)
+ {
+ _logger.LogInformation(
+ "[Point-to-Point Example] 发送订单 {OrderId} 的创建通知 (用户ID: {UserId})",
+ orderId, userId);
+
+ try
+ {
+ // 构建通知消息
+ var notification = new OrderInventoryNotificationRequest(
+ correlationId: orderId,
+ notificationType: "OrderCreated",
+ products: items.Select(item => new ProductInfo(
+ productId: item.ProductId,
+ productName: item.ProductName,
+ quantity: item.Quantity,
+ reason: "订单创建扣减库存"
+ )).ToList()
+ );
+
+ // 发送消息(不等待响应)
+ // IBus.Send() 是单向的,发送后立即返回
+ await _bus.Send(notification);
+
+ _logger.LogInformation(
+ "[Point-to-Point Example] 订单 {OrderId} 的创建通知已发送",
+ orderId);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex,
+ "[Point-to-Point Example] 发送订单 {OrderId} 的创建通知失败",
+ orderId);
+
+ // 可以选择将失败的消息发送到死信队列或重试队列
+ throw;
+ }
+ }
+
+ ///
+ /// 发送订单取消通知(不等待响应)
+ ///
+ /// 订单ID
+ /// 用户ID
+ /// 订单项列表
+ /// 异步任务
+ public async Task SendOrderCancelledNotificationAsync(string orderId, string userId, List items)
+ {
+ _logger.LogInformation(
+ "[Point-to-Point Example] 发送订单 {OrderId} 的取消通知 (用户ID: {UserId})",
+ orderId, userId);
+
+ try
+ {
+ // 构建通知消息
+ var notification = new OrderInventoryNotificationRequest(
+ correlationId: orderId,
+ notificationType: "OrderCancelled",
+ products: items.Select(item => new ProductInfo(
+ productId: item.ProductId,
+ productName: item.ProductName,
+ quantity: item.Quantity,
+ reason: "订单取消恢复库存"
+ )).ToList()
+ );
+
+ // 发送消息(不等待响应)
+ await _bus.Send(notification);
+
+ _logger.LogInformation(
+ "[Point-to-Point Example] 订单 {OrderId} 的取消通知已发送",
+ orderId);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex,
+ "[Point-to-Point Example] 发送订单 {OrderId} 的取消通知失败",
+ orderId);
+ throw;
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/MicoService.Demo2/Consumers/CheckInventoryConsumer.cs b/MicoService.Demo2/Consumers/CheckInventoryConsumer.cs
new file mode 100644
index 0000000..b0f6d91
--- /dev/null
+++ b/MicoService.Demo2/Consumers/CheckInventoryConsumer.cs
@@ -0,0 +1,35 @@
+using MassTransit;
+using MassTransit.Message;
+
+namespace MicoService.Demo2.Consumers
+{
+ public class CheckInventoryConsumer : IConsumer
+ {
+ //private readonly IInventoryService _inventoryService;
+
+ public CheckInventoryConsumer(/*IInventoryService inventoryService*/)
+ {
+ //_inventoryService = inventoryService;
+ }
+
+ public async Task Consume(ConsumeContext context)
+ {
+ // 此处查询数据库,获取库存信息
+ //var inventory = await _inventoryService.GetByProductIdAsync(context.Message.ProductId);
+
+ // 模拟库存数据
+ var inventory = new
+ {
+ ProductId = context.Message.ProductId,
+ Quantity = 100 // 假设库存数量为100
+ };
+
+ var response = new CheckInventoryResponse(
+ inventory != null && inventory.Quantity >= context.Message.Quantity,
+ inventory?.Quantity ?? 0
+ );
+
+ await context.RespondAsync(response);
+ }
+ }
+}
diff --git a/MicoService.Demo2/MicoService.Demo2.csproj b/MicoService.Demo2/MicoService.Demo2.csproj
index 58aee2c..47f42a3 100644
--- a/MicoService.Demo2/MicoService.Demo2.csproj
+++ b/MicoService.Demo2/MicoService.Demo2.csproj
@@ -1,4 +1,4 @@
-
+
net10.0
@@ -7,13 +7,17 @@
+
+
+
+
diff --git a/MicoService.Demo2/Program.cs b/MicoService.Demo2/Program.cs
index d4e835c..41fa2a6 100644
--- a/MicoService.Demo2/Program.cs
+++ b/MicoService.Demo2/Program.cs
@@ -1,7 +1,9 @@
+using MassTransit;
+using MicoService.Demo2.Consumers;
+using Microservice.Common;
using Nacos.AspNetCore.V2;
using Nacos.V2.DependencyInjection;
-using Microservice.Common;
namespace MicoService.Demo2
{
@@ -21,6 +23,33 @@ namespace MicoService.Demo2
builder.Services.AddNacosV2Naming(builder.Configuration);
#endregion
+
+ // MassTransit 配置
+ // ============================================================
+ // 配置 MassTransit 总线,使用 RabbitMQ 作为消息中间件
+ builder.Services.AddMassTransit(x =>
+ {
+ // 注册消费者
+ x.AddConsumer();
+
+ // 配置 RabbitMQ 连接
+ x.UsingRabbitMq((context, cfg) =>
+ {
+ // 设置 RabbitMQ 主机连接信息
+ cfg.Host("rabbitmq://192.168.2.7:5672", h =>
+ {
+ h.Username("rabbit");
+ h.Password("qwe123!@#");
+ });
+
+ // 配置消息重试机制:最多重试3次,每次间隔1秒
+ cfg.UseMessageRetry(r => r.Interval(3, 1000));
+
+ // 自动配置消费者端点,将消费者注册为 RabbitMQ 队列
+ cfg.ConfigureEndpoints(context);
+ });
+ });
+
// Add services to the container.
builder.Services.AddControllers();
@@ -36,6 +65,10 @@ namespace MicoService.Demo2
if (app.Environment.IsDevelopment())
{
app.MapOpenApi();
+ app.UseSwaggerUI(options =>
+ {
+ options.SwaggerEndpoint("/openapi/v1.json", "API v1");
+ });
}
app.UseAuthorization();
diff --git a/MicoService.Demo2/Properties/launchSettings.json b/MicoService.Demo2/Properties/launchSettings.json
index f4639c3..056a519 100644
--- a/MicoService.Demo2/Properties/launchSettings.json
+++ b/MicoService.Demo2/Properties/launchSettings.json
@@ -5,7 +5,7 @@
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": false,
- "applicationUrl": "http://localhost:5181",
+ "applicationUrl": "http://localhost:5002",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
diff --git a/Microservice.Common.Tests/ServiceClientTests.cs b/Microservice.Common.Tests/ServiceClientTests.cs
deleted file mode 100644
index 58f5b09..0000000
--- a/Microservice.Common.Tests/ServiceClientTests.cs
+++ /dev/null
@@ -1,256 +0,0 @@
-using Microsoft.AspNetCore.Builder;
-using Microsoft.AspNetCore.Http;
-using Microsoft.AspNetCore.Mvc.Testing;
-using Microsoft.Extensions.Logging;
-using Moq;
-using Nacos.V2;
-using Nacos.V2.Common;
-using Nacos.V2.Naming.Dtos;
-using System.Net.Http;
-using System.Net.Http.Json;
-using System.Threading.Tasks;
-using Xunit;
-
-namespace Microservice.Common.Tests
-{
- public class ServiceClientTests
- {
- private readonly Mock _nacosNamingServiceMock;
- private readonly HttpClient _httpClient;
- private readonly Mock> _loggerMock;
- private readonly ServiceClient _serviceClient;
-
- public ServiceClientTests()
- {
- // 初始化模拟对象
- _nacosNamingServiceMock = new Mock();
- _loggerMock = new Mock>();
-
- // 创建一个测试用的 HttpClient
- var httpClientHandler = new HttpClientHandler
- {
- ServerCertificateCustomValidationCallback = (sender, cert, chain, sslPolicyErrors) => true
- };
- _httpClient = new HttpClient(httpClientHandler);
-
- // 创建 ServiceClient 实例
- _serviceClient = new ServiceClient(_nacosNamingServiceMock.Object, _loggerMock.Object);
- }
-
- [Fact]
- public async Task GetAsync_ShouldReturnSuccess_WhenServiceIsAvailable()
- {
- // Arrange
- var serviceName = "TestService";
- var endpoint = "/api/test";
- var expectedResponse = new { Message = "Test Response" };
-
- // 模拟 Nacos 服务发现返回一个健康的实例
- var instance = new Instance
- {
- Ip = "127.0.0.1",
- Port = 8080,
- Healthy = true,
- Enabled = true
- };
- _nacosNamingServiceMock.Setup(x => x.SelectOneHealthyInstance(serviceName)).ReturnsAsync(instance);
-
- // 创建一个测试服务器
- var testServer = new TestServer(new WebApplicationFactory