From d73495df340a95e6c31f8b42ec086ea4a0ef4fe4 Mon Sep 17 00:00:00 2001 From: YangQiang Date: Thu, 19 Mar 2026 17:48:20 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9EMassTransit+RabbitMQ=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E9=98=9F=E5=88=97=E6=BC=94=E7=A4=BA=E9=A1=B9=E7=9B=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 本次提交包含完整的发布/订阅与请求/响应模式示例,新增三大项目(Contracts、Publisher、Consumer),实现消息契约、消费者与发布者API,配置RabbitMQ连接,集成Swagger,提供详细测试说明文档,便于分布式消息通信学习与测试。 --- MassTransit示例代码/.gitattributes | 63 +++ MassTransit示例代码/.gitignore | 363 ++++++++++++++++++ .../Consumers/PublishSubscribeConsumer1.cs | 49 +++ .../Consumers/PublishSubscribeConsumer2.cs | 51 +++ .../Consumers/RequestResponseConsumer.cs | 115 ++++++ .../MassTransitDemo.Consumer.csproj | 20 + .../MassTransitDemo.Consumer/Program.cs | 69 ++++ .../Properties/launchSettings.json | 15 + .../appsettings.Development.json | 8 + .../MassTransitDemo.Consumer/appsettings.json | 17 + .../MassTransitDemo.Contracts.csproj | 9 + .../PublishSubscribeMessage.cs | 33 ++ .../MassTransitDemo.Contracts/RequestMessage.cs | 28 ++ .../MassTransitDemo.Contracts/ResponseMessage.cs | 38 ++ .../Controllers/MessageController.cs | 228 +++++++++++ .../MassTransitDemo.Publisher.csproj | 20 + .../MassTransitDemo.Publisher/Program.cs | 50 +++ .../Properties/launchSettings.json | 14 + .../appsettings.Development.json | 8 + .../MassTransitDemo.Publisher/appsettings.json | 17 + MassTransit示例代码/MassTransitDemo.slnx | 5 + MassTransit示例代码/测试说明.md | 179 +++++++++ 22 files changed, 1399 insertions(+) create mode 100644 MassTransit示例代码/.gitattributes create mode 100644 MassTransit示例代码/.gitignore create mode 100644 MassTransit示例代码/MassTransitDemo.Consumer/Consumers/PublishSubscribeConsumer1.cs create mode 100644 MassTransit示例代码/MassTransitDemo.Consumer/Consumers/PublishSubscribeConsumer2.cs create mode 100644 MassTransit示例代码/MassTransitDemo.Consumer/Consumers/RequestResponseConsumer.cs create mode 100644 MassTransit示例代码/MassTransitDemo.Consumer/MassTransitDemo.Consumer.csproj create mode 100644 MassTransit示例代码/MassTransitDemo.Consumer/Program.cs create mode 100644 MassTransit示例代码/MassTransitDemo.Consumer/Properties/launchSettings.json create mode 100644 MassTransit示例代码/MassTransitDemo.Consumer/appsettings.Development.json create mode 100644 MassTransit示例代码/MassTransitDemo.Consumer/appsettings.json create mode 100644 MassTransit示例代码/MassTransitDemo.Contracts/MassTransitDemo.Contracts.csproj create mode 100644 MassTransit示例代码/MassTransitDemo.Contracts/PublishSubscribeMessage.cs create mode 100644 MassTransit示例代码/MassTransitDemo.Contracts/RequestMessage.cs create mode 100644 MassTransit示例代码/MassTransitDemo.Contracts/ResponseMessage.cs create mode 100644 MassTransit示例代码/MassTransitDemo.Publisher/Controllers/MessageController.cs create mode 100644 MassTransit示例代码/MassTransitDemo.Publisher/MassTransitDemo.Publisher.csproj create mode 100644 MassTransit示例代码/MassTransitDemo.Publisher/Program.cs create mode 100644 MassTransit示例代码/MassTransitDemo.Publisher/Properties/launchSettings.json create mode 100644 MassTransit示例代码/MassTransitDemo.Publisher/appsettings.Development.json create mode 100644 MassTransit示例代码/MassTransitDemo.Publisher/appsettings.json create mode 100644 MassTransit示例代码/MassTransitDemo.slnx create mode 100644 MassTransit示例代码/测试说明.md diff --git a/MassTransit示例代码/.gitattributes b/MassTransit示例代码/.gitattributes new file mode 100644 index 0000000..1ff0c42 --- /dev/null +++ b/MassTransit示例代码/.gitattributes @@ -0,0 +1,63 @@ +############################################################################### +# Set default behavior to automatically normalize line endings. +############################################################################### +* text=auto + +############################################################################### +# Set default behavior for command prompt diff. +# +# This is need for earlier builds of msysgit that does not have it on by +# default for csharp files. +# Note: This is only used by command line +############################################################################### +#*.cs diff=csharp + +############################################################################### +# Set the merge driver for project and solution files +# +# Merging from the command prompt will add diff markers to the files if there +# are conflicts (Merging from VS is not affected by the settings below, in VS +# the diff markers are never inserted). Diff markers may cause the following +# file extensions to fail to load in VS. An alternative would be to treat +# these files as binary and thus will always conflict and require user +# intervention with every merge. To do so, just uncomment the entries below +############################################################################### +#*.sln merge=binary +#*.csproj merge=binary +#*.vbproj merge=binary +#*.vcxproj merge=binary +#*.vcproj merge=binary +#*.dbproj merge=binary +#*.fsproj merge=binary +#*.lsproj merge=binary +#*.wixproj merge=binary +#*.modelproj merge=binary +#*.sqlproj merge=binary +#*.wwaproj merge=binary + +############################################################################### +# behavior for image files +# +# image files are treated as binary by default. +############################################################################### +#*.jpg binary +#*.png binary +#*.gif binary + +############################################################################### +# diff behavior for common document formats +# +# Convert binary document formats to text before diffing them. This feature +# is only available from the command line. Turn it on by uncommenting the +# entries below. +############################################################################### +#*.doc diff=astextplain +#*.DOC diff=astextplain +#*.docx diff=astextplain +#*.DOCX diff=astextplain +#*.dot diff=astextplain +#*.DOT diff=astextplain +#*.pdf diff=astextplain +#*.PDF diff=astextplain +#*.rtf diff=astextplain +#*.RTF diff=astextplain diff --git a/MassTransit示例代码/.gitignore b/MassTransit示例代码/.gitignore new file mode 100644 index 0000000..9491a2f --- /dev/null +++ b/MassTransit示例代码/.gitignore @@ -0,0 +1,363 @@ +## Ignore Visual Studio temporary files, build results, and +## files generated by popular Visual Studio add-ons. +## +## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore + +# User-specific files +*.rsuser +*.suo +*.user +*.userosscache +*.sln.docstates + +# User-specific files (MonoDevelop/Xamarin Studio) +*.userprefs + +# Mono auto generated files +mono_crash.* + +# Build results +[Dd]ebug/ +[Dd]ebugPublic/ +[Rr]elease/ +[Rr]eleases/ +x64/ +x86/ +[Ww][Ii][Nn]32/ +[Aa][Rr][Mm]/ +[Aa][Rr][Mm]64/ +bld/ +[Bb]in/ +[Oo]bj/ +[Oo]ut/ +[Ll]og/ +[Ll]ogs/ + +# Visual Studio 2015/2017 cache/options directory +.vs/ +# Uncomment if you have tasks that create the project's static files in wwwroot +#wwwroot/ + +# Visual Studio 2017 auto generated files +Generated\ Files/ + +# MSTest test Results +[Tt]est[Rr]esult*/ +[Bb]uild[Ll]og.* + +# NUnit +*.VisualState.xml +TestResult.xml +nunit-*.xml + +# Build Results of an ATL Project +[Dd]ebugPS/ +[Rr]eleasePS/ +dlldata.c + +# Benchmark Results +BenchmarkDotNet.Artifacts/ + +# .NET Core +project.lock.json +project.fragment.lock.json +artifacts/ + +# ASP.NET Scaffolding +ScaffoldingReadMe.txt + +# StyleCop +StyleCopReport.xml + +# Files built by Visual Studio +*_i.c +*_p.c +*_h.h +*.ilk +*.meta +*.obj +*.iobj +*.pch +*.pdb +*.ipdb +*.pgc +*.pgd +*.rsp +*.sbr +*.tlb +*.tli +*.tlh +*.tmp +*.tmp_proj +*_wpftmp.csproj +*.log +*.vspscc +*.vssscc +.builds +*.pidb +*.svclog +*.scc + +# Chutzpah Test files +_Chutzpah* + +# Visual C++ cache files +ipch/ +*.aps +*.ncb +*.opendb +*.opensdf +*.sdf +*.cachefile +*.VC.db +*.VC.VC.opendb + +# Visual Studio profiler +*.psess +*.vsp +*.vspx +*.sap + +# Visual Studio Trace Files +*.e2e + +# TFS 2012 Local Workspace +$tf/ + +# Guidance Automation Toolkit +*.gpState + +# ReSharper is a .NET coding add-in +_ReSharper*/ +*.[Rr]e[Ss]harper +*.DotSettings.user + +# TeamCity is a build add-in +_TeamCity* + +# DotCover is a Code Coverage Tool +*.dotCover + +# AxoCover is a Code Coverage Tool +.axoCover/* +!.axoCover/settings.json + +# Coverlet is a free, cross platform Code Coverage Tool +coverage*.json +coverage*.xml +coverage*.info + +# Visual Studio code coverage results +*.coverage +*.coveragexml + +# NCrunch +_NCrunch_* +.*crunch*.local.xml +nCrunchTemp_* + +# MightyMoose +*.mm.* +AutoTest.Net/ + +# Web workbench (sass) +.sass-cache/ + +# Installshield output folder +[Ee]xpress/ + +# DocProject is a documentation generator add-in +DocProject/buildhelp/ +DocProject/Help/*.HxT +DocProject/Help/*.HxC +DocProject/Help/*.hhc +DocProject/Help/*.hhk +DocProject/Help/*.hhp +DocProject/Help/Html2 +DocProject/Help/html + +# Click-Once directory +publish/ + +# Publish Web Output +*.[Pp]ublish.xml +*.azurePubxml +# Note: Comment the next line if you want to checkin your web deploy settings, +# but database connection strings (with potential passwords) will be unencrypted +*.pubxml +*.publishproj + +# Microsoft Azure Web App publish settings. Comment the next line if you want to +# checkin your Azure Web App publish settings, but sensitive information contained +# in these scripts will be unencrypted +PublishScripts/ + +# NuGet Packages +*.nupkg +# NuGet Symbol Packages +*.snupkg +# The packages folder can be ignored because of Package Restore +**/[Pp]ackages/* +# except build/, which is used as an MSBuild target. +!**/[Pp]ackages/build/ +# Uncomment if necessary however generally it will be regenerated when needed +#!**/[Pp]ackages/repositories.config +# NuGet v3's project.json files produces more ignorable files +*.nuget.props +*.nuget.targets + +# Microsoft Azure Build Output +csx/ +*.build.csdef + +# Microsoft Azure Emulator +ecf/ +rcf/ + +# Windows Store app package directories and files +AppPackages/ +BundleArtifacts/ +Package.StoreAssociation.xml +_pkginfo.txt +*.appx +*.appxbundle +*.appxupload + +# Visual Studio cache files +# files ending in .cache can be ignored +*.[Cc]ache +# but keep track of directories ending in .cache +!?*.[Cc]ache/ + +# Others +ClientBin/ +~$* +*~ +*.dbmdl +*.dbproj.schemaview +*.jfm +*.pfx +*.publishsettings +orleans.codegen.cs + +# Including strong name files can present a security risk +# (https://github.com/github/gitignore/pull/2483#issue-259490424) +#*.snk + +# Since there are multiple workflows, uncomment next line to ignore bower_components +# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622) +#bower_components/ + +# RIA/Silverlight projects +Generated_Code/ + +# Backup & report files from converting an old project file +# to a newer Visual Studio version. Backup files are not needed, +# because we have git ;-) +_UpgradeReport_Files/ +Backup*/ +UpgradeLog*.XML +UpgradeLog*.htm +ServiceFabricBackup/ +*.rptproj.bak + +# SQL Server files +*.mdf +*.ldf +*.ndf + +# Business Intelligence projects +*.rdl.data +*.bim.layout +*.bim_*.settings +*.rptproj.rsuser +*- [Bb]ackup.rdl +*- [Bb]ackup ([0-9]).rdl +*- [Bb]ackup ([0-9][0-9]).rdl + +# Microsoft Fakes +FakesAssemblies/ + +# GhostDoc plugin setting file +*.GhostDoc.xml + +# Node.js Tools for Visual Studio +.ntvs_analysis.dat +node_modules/ + +# Visual Studio 6 build log +*.plg + +# Visual Studio 6 workspace options file +*.opt + +# Visual Studio 6 auto-generated workspace file (contains which files were open etc.) +*.vbw + +# Visual Studio LightSwitch build output +**/*.HTMLClient/GeneratedArtifacts +**/*.DesktopClient/GeneratedArtifacts +**/*.DesktopClient/ModelManifest.xml +**/*.Server/GeneratedArtifacts +**/*.Server/ModelManifest.xml +_Pvt_Extensions + +# Paket dependency manager +.paket/paket.exe +paket-files/ + +# FAKE - F# Make +.fake/ + +# CodeRush personal settings +.cr/personal + +# Python Tools for Visual Studio (PTVS) +__pycache__/ +*.pyc + +# Cake - Uncomment if you are using it +# tools/** +# !tools/packages.config + +# Tabs Studio +*.tss + +# Telerik's JustMock configuration file +*.jmconfig + +# BizTalk build output +*.btp.cs +*.btm.cs +*.odx.cs +*.xsd.cs + +# OpenCover UI analysis results +OpenCover/ + +# Azure Stream Analytics local run output +ASALocalRun/ + +# MSBuild Binary and Structured Log +*.binlog + +# NVidia Nsight GPU debugger configuration file +*.nvuser + +# MFractors (Xamarin productivity tool) working folder +.mfractor/ + +# Local History for Visual Studio +.localhistory/ + +# BeatPulse healthcheck temp database +healthchecksdb + +# Backup folder for Package Reference Convert tool in Visual Studio 2017 +MigrationBackup/ + +# Ionide (cross platform F# VS Code tools) working folder +.ionide/ + +# Fody - auto-generated XML schema +FodyWeavers.xsd \ No newline at end of file diff --git a/MassTransit示例代码/MassTransitDemo.Consumer/Consumers/PublishSubscribeConsumer1.cs b/MassTransit示例代码/MassTransitDemo.Consumer/Consumers/PublishSubscribeConsumer1.cs new file mode 100644 index 0000000..d3cbdc5 --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Consumer/Consumers/PublishSubscribeConsumer1.cs @@ -0,0 +1,49 @@ +using MassTransit; +using MassTransitDemo.Contracts; + +namespace MassTransitDemo.Consumer.Consumers; + +/// +/// 发布/订阅模式的消费者1 +/// 用于接收并处理广播消息 +/// +public class PublishSubscribeConsumer1 : IConsumer +{ + private readonly ILogger _logger; + + /// + /// 构造函数 + /// + /// 日志记录器 + public PublishSubscribeConsumer1(ILogger logger) + { + _logger = logger; + } + + /// + /// 消费消息的方法 + /// + /// 消息上下文 + public Task Consume(ConsumeContext context) + { + try + { + _logger.LogInformation("【消费者1】收到发布/订阅消息"); + _logger.LogInformation(" MessageId: {MessageId}", context.Message.MessageId); + _logger.LogInformation(" CreatedAt: {CreatedAt}", context.Message.CreatedAt); + _logger.LogInformation(" Title: {Title}", context.Message.Title); + _logger.LogInformation(" Content: {Content}", context.Message.Content); + _logger.LogInformation(" Priority: {Priority}", context.Message.Priority); + + // 模拟处理逻辑 + _logger.LogInformation("【消费者1】消息处理完成"); + + return Task.CompletedTask; + } + catch (Exception ex) + { + _logger.LogError(ex, "【消费者1】消息处理失败"); + throw; + } + } +} diff --git a/MassTransit示例代码/MassTransitDemo.Consumer/Consumers/PublishSubscribeConsumer2.cs b/MassTransit示例代码/MassTransitDemo.Consumer/Consumers/PublishSubscribeConsumer2.cs new file mode 100644 index 0000000..1344c07 --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Consumer/Consumers/PublishSubscribeConsumer2.cs @@ -0,0 +1,51 @@ +using MassTransit; +using MassTransitDemo.Contracts; + +namespace MassTransitDemo.Consumer.Consumers; + +/// +/// 发布/订阅模式的消费者2 +/// 用于演示多个消费者接收同一条广播消息 +/// +public class PublishSubscribeConsumer2 : IConsumer +{ + private readonly ILogger _logger; + + /// + /// 构造函数 + /// + /// 日志记录器 + public PublishSubscribeConsumer2(ILogger logger) + { + _logger = logger; + } + + /// + /// 消费消息的方法 + /// + /// 消息上下文 + public Task Consume(ConsumeContext context) + { + try + { + _logger.LogInformation("【消费者2】收到发布/订阅消息"); + _logger.LogInformation(" MessageId: {MessageId}", context.Message.MessageId); + _logger.LogInformation(" CreatedAt: {CreatedAt}", context.Message.CreatedAt); + _logger.LogInformation(" Title: {Title}", context.Message.Title); + _logger.LogInformation(" Content: {Content}", context.Message.Content); + _logger.LogInformation(" Priority: {Priority}", context.Message.Priority); + + // 模拟不同的处理逻辑(与消费者1不同的处理方式) + var processedContent = $"[消费者2处理] {context.Message.Content.ToUpper()}"; + _logger.LogInformation(" 处理结果: {ProcessedContent}", processedContent); + _logger.LogInformation("【消费者2】消息处理完成"); + + return Task.CompletedTask; + } + catch (Exception ex) + { + _logger.LogError(ex, "【消费者2】消息处理失败"); + throw; + } + } +} diff --git a/MassTransit示例代码/MassTransitDemo.Consumer/Consumers/RequestResponseConsumer.cs b/MassTransit示例代码/MassTransitDemo.Consumer/Consumers/RequestResponseConsumer.cs new file mode 100644 index 0000000..fd43fbf --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Consumer/Consumers/RequestResponseConsumer.cs @@ -0,0 +1,115 @@ +using MassTransit; +using MassTransitDemo.Contracts; + +namespace MassTransitDemo.Consumer.Consumers; + +/// +/// 请求/响应模式的消费者 +/// 用于接收请求消息并返回响应 +/// +public class RequestResponseConsumer : IConsumer +{ + private readonly ILogger _logger; + + /// + /// 构造函数 + /// + /// 日志记录器 + public RequestResponseConsumer(ILogger logger) + { + _logger = logger; + } + + /// + /// 消费请求消息并返回响应 + /// + /// 消息上下文 + public async Task Consume(ConsumeContext context) + { + try + { + _logger.LogInformation("【请求/响应消费者】收到请求"); + _logger.LogInformation(" RequestId: {RequestId}", context.Message.RequestId); + _logger.LogInformation(" RequestedAt: {RequestedAt}", context.Message.RequestedAt); + _logger.LogInformation(" Operation: {Operation}", context.Message.Operation); + _logger.LogInformation(" Parameters: {Parameters}", context.Message.Parameters); + + // 根据操作类型处理请求 + var (success, data, errorMessage) = ProcessOperation(context.Message.Operation, context.Message.Parameters); + + // 创建响应消息实例 + var responseMessage = new ResponseMessage + { + ResponseId = Guid.NewGuid(), + CorrelatedRequestId = context.Message.RequestId, + RespondedAt = DateTime.UtcNow, + Success = success, + Data = data, + ErrorMessage = errorMessage + }; + + _logger.LogInformation("【请求/响应消费者】准备返回响应,成功: {Success}", success); + + // 发送响应 + await context.RespondAsync(responseMessage); + + _logger.LogInformation("【请求/响应消费者】响应已发送"); + } + catch (Exception ex) + { + _logger.LogError(ex, "【请求/响应消费者】请求处理失败"); + + // 发送错误响应 + var errorResponse = new ResponseMessage + { + ResponseId = Guid.NewGuid(), + CorrelatedRequestId = context.Message.RequestId, + RespondedAt = DateTime.UtcNow, + Success = false, + Data = string.Empty, + ErrorMessage = ex.Message + }; + + await context.RespondAsync(errorResponse); + } + } + + /// + /// 根据操作类型处理业务逻辑 + /// + /// 操作类型 + /// 参数 + /// 处理结果 + private (bool Success, string Data, string? ErrorMessage) ProcessOperation(string operation, string parameters) + { + try + { + switch (operation.ToLower()) + { + case "echo": + return (true, $"Echo: {parameters}", null); + + case "uppercase": + return (true, parameters.ToUpper(), null); + + case "lowercase": + return (true, parameters.ToLower(), null); + + case "reverse": + char[] charArray = parameters.ToCharArray(); + Array.Reverse(charArray); + return (true, new string(charArray), null); + + case "length": + return (true, $"Length: {parameters.Length}", null); + + default: + return (false, string.Empty, $"不支持的操作类型: {operation}。支持的操作: echo, uppercase, lowercase, reverse, length"); + } + } + catch (Exception ex) + { + return (false, string.Empty, $"操作执行失败: {ex.Message}"); + } + } +} diff --git a/MassTransit示例代码/MassTransitDemo.Consumer/MassTransitDemo.Consumer.csproj b/MassTransit示例代码/MassTransitDemo.Consumer/MassTransitDemo.Consumer.csproj new file mode 100644 index 0000000..1faeb97 --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Consumer/MassTransitDemo.Consumer.csproj @@ -0,0 +1,20 @@ + + + + net10.0 + enable + enable + + + + + + + + + + + + + + diff --git a/MassTransit示例代码/MassTransitDemo.Consumer/Program.cs b/MassTransit示例代码/MassTransitDemo.Consumer/Program.cs new file mode 100644 index 0000000..a571141 --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Consumer/Program.cs @@ -0,0 +1,69 @@ +using MassTransit; +using MassTransitDemo.Consumer.Consumers; +using MassTransitDemo.Contracts; + +var builder = WebApplication.CreateBuilder(args); + +// 添加控制器服务 +builder.Services.AddControllers(); + +// 添加API探索器(用于Swagger) +builder.Services.AddOpenApi(); + + +// 配置MassTransit和RabbitMQ +builder.Services.AddMassTransit(x => +{ + // 注册所有消费者 + x.AddConsumer(); + x.AddConsumer(); + x.AddConsumer(); + + // 配置RabbitMQ传输 + x.UsingRabbitMq((context, cfg) => + { + // 从配置中读取RabbitMQ连接信息 + var rabbitMqConfig = builder.Configuration.GetSection("RabbitMQ"); + var host = rabbitMqConfig["Host"]; + var port = ushort.Parse(rabbitMqConfig["Port"]!); + var username = rabbitMqConfig["Username"]; + var password = rabbitMqConfig["Password"]; + var virtualHost = rabbitMqConfig["VirtualHost"]; + + // 配置RabbitMQ主机连接 + cfg.Host(host, port, virtualHost, h => + { + h.Username(username); + h.Password(password); + }); + + // 配置消息端点 + cfg.ReceiveEndpoint("publish-subscribe-queue-1", e => + { + e.ConfigureConsumer(context); + }); + + cfg.ReceiveEndpoint("publish-subscribe-queue-2", e => + { + e.ConfigureConsumer(context); + }); + + cfg.ReceiveEndpoint("request-response-queue", e => + { + e.ConfigureConsumer(context); + }); + }); +}); + +var app = builder.Build(); + +app.MapOpenApi(); +app.UseSwaggerUI(options => +{ + options.SwaggerEndpoint("/openapi/v1.json", "API v1"); +}); + +// 映射控制器路由 +app.MapControllers(); + +app.Run(); diff --git a/MassTransit示例代码/MassTransitDemo.Consumer/Properties/launchSettings.json b/MassTransit示例代码/MassTransitDemo.Consumer/Properties/launchSettings.json new file mode 100644 index 0000000..d39d7fe --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Consumer/Properties/launchSettings.json @@ -0,0 +1,15 @@ +{ + "$schema": "https://json.schemastore.org/launchsettings.json", + "profiles": { + "http": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": false, + "applicationUrl": "http://localhost:5002", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + } + + } +} diff --git a/MassTransit示例代码/MassTransitDemo.Consumer/appsettings.Development.json b/MassTransit示例代码/MassTransitDemo.Consumer/appsettings.Development.json new file mode 100644 index 0000000..0c208ae --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Consumer/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/MassTransit示例代码/MassTransitDemo.Consumer/appsettings.json b/MassTransit示例代码/MassTransitDemo.Consumer/appsettings.json new file mode 100644 index 0000000..ade226e --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Consumer/appsettings.json @@ -0,0 +1,17 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning", + "MassTransit": "Information" + } + }, + "AllowedHosts": "*", + "RabbitMQ": { + "Host": "192.168.2.7", + "Port": 5672, + "Username": "rabbit", + "Password": "qwe123!@#", + "VirtualHost": "/" + } +} diff --git a/MassTransit示例代码/MassTransitDemo.Contracts/MassTransitDemo.Contracts.csproj b/MassTransit示例代码/MassTransitDemo.Contracts/MassTransitDemo.Contracts.csproj new file mode 100644 index 0000000..b760144 --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Contracts/MassTransitDemo.Contracts.csproj @@ -0,0 +1,9 @@ + + + + net10.0 + enable + enable + + + diff --git a/MassTransit示例代码/MassTransitDemo.Contracts/PublishSubscribeMessage.cs b/MassTransit示例代码/MassTransitDemo.Contracts/PublishSubscribeMessage.cs new file mode 100644 index 0000000..c7e8b11 --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Contracts/PublishSubscribeMessage.cs @@ -0,0 +1,33 @@ +namespace MassTransitDemo.Contracts; + +/// +/// 发布/订阅模式的消息契约 +/// 用于向多个消费者广播消息 +/// +public class PublishSubscribeMessage +{ + /// + /// 消息唯一标识符 + /// + public Guid MessageId { get; set; } + + /// + /// 消息创建时间 + /// + public DateTime CreatedAt { get; set; } + + /// + /// 消息标题 + /// + public string Title { get; set; } = string.Empty; + + /// + /// 消息内容 + /// + public string Content { get; set; } = string.Empty; + + /// + /// 消息优先级 + /// + public int Priority { get; set; } +} diff --git a/MassTransit示例代码/MassTransitDemo.Contracts/RequestMessage.cs b/MassTransit示例代码/MassTransitDemo.Contracts/RequestMessage.cs new file mode 100644 index 0000000..3a07620 --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Contracts/RequestMessage.cs @@ -0,0 +1,28 @@ +namespace MassTransitDemo.Contracts; + +/// +/// 请求/响应模式的请求消息契约 +/// 用于客户端向服务端发送请求 +/// +public class RequestMessage +{ + /// + /// 请求唯一标识符 + /// + public Guid RequestId { get; set; } + + /// + /// 请求创建时间 + /// + public DateTime RequestedAt { get; set; } + + /// + /// 请求操作类型 + /// + public string Operation { get; set; } = string.Empty; + + /// + /// 请求参数 + /// + public string Parameters { get; set; } = string.Empty; +} diff --git a/MassTransit示例代码/MassTransitDemo.Contracts/ResponseMessage.cs b/MassTransit示例代码/MassTransitDemo.Contracts/ResponseMessage.cs new file mode 100644 index 0000000..0fe6461 --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Contracts/ResponseMessage.cs @@ -0,0 +1,38 @@ +namespace MassTransitDemo.Contracts; + +/// +/// 请求/响应模式的响应消息契约 +/// 用于服务端向客户端返回响应 +/// +public class ResponseMessage +{ + /// + /// 响应唯一标识符 + /// + public Guid ResponseId { get; set; } + + /// + /// 原始请求标识符 + /// + public Guid CorrelatedRequestId { get; set; } + + /// + /// 响应创建时间 + /// + public DateTime RespondedAt { get; set; } + + /// + /// 操作是否成功 + /// + public bool Success { get; set; } + + /// + /// 响应数据 + /// + public string Data { get; set; } = string.Empty; + + /// + /// 错误信息(如果有) + /// + public string? ErrorMessage { get; set; } +} diff --git a/MassTransit示例代码/MassTransitDemo.Publisher/Controllers/MessageController.cs b/MassTransit示例代码/MassTransitDemo.Publisher/Controllers/MessageController.cs new file mode 100644 index 0000000..f32a711 --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Publisher/Controllers/MessageController.cs @@ -0,0 +1,228 @@ +using MassTransit; +using Microsoft.AspNetCore.Mvc; +using MassTransitDemo.Contracts; + +namespace MassTransitDemo.Publisher.Controllers; + +/// +/// 消息发布控制器 +/// 提供发布/订阅模式和请求/响应模式的API接口 +/// +[ApiController] +[Route("api/[controller]")] +public class MessageController : ControllerBase +{ + private readonly IPublishEndpoint _publishEndpoint; + private readonly IRequestClient _requestClient; + private readonly ILogger _logger; + + /// + /// 构造函数 + /// + /// 发布端点,用于发布/订阅模式 + /// 请求客户端,用于请求/响应模式 + /// 日志记录器 + public MessageController( + IPublishEndpoint publishEndpoint, + IRequestClient requestClient, + ILogger logger) + { + _publishEndpoint = publishEndpoint; + _requestClient = requestClient; + _logger = logger; + } + + /// + /// 发布/订阅模式:发布一条广播消息 + /// + /// 消息发布请求 + /// 操作结果 + [HttpPost("publish")] + public async Task PublishMessage([FromBody] PublishMessageRequest request) + { + try + { + _logger.LogInformation("开始发布消息,标题: {Title}", request.Title); + + // 创建发布/订阅消息实例 + var message = new PublishSubscribeMessage + { + MessageId = Guid.NewGuid(), + CreatedAt = DateTime.UtcNow, + Title = request.Title, + Content = request.Content, + Priority = request.Priority + }; + + // 使用MassTransit发布消息(广播给所有订阅者) + await _publishEndpoint.Publish(message); + + _logger.LogInformation("消息发布成功,MessageId: {MessageId}", message.MessageId); + + return Ok(new PublishMessageResponse + { + Success = true, + MessageId = message.MessageId, + Message = "消息发布成功" + }); + } + catch (Exception ex) + { + _logger.LogError(ex, "消息发布失败"); + return StatusCode(500, new ErrorResponse + { + Success = false, + Error = ex.Message + }); + } + } + + /// + /// 请求/响应模式:发送请求并等待响应 + /// + /// 请求内容 + /// 响应结果 + [HttpPost("request")] + public async Task SendRequest([FromBody] SendRequestRequest request) + { + try + { + _logger.LogInformation("开始发送请求,操作: {Operation}", request.Operation); + + // 创建请求消息实例 + var requestMessage = new RequestMessage + { + RequestId = Guid.NewGuid(), + RequestedAt = DateTime.UtcNow, + Operation = request.Operation, + Parameters = request.Parameters + }; + + // 使用MassTransit发送请求并等待响应 + var response = await _requestClient.GetResponse(requestMessage); + + _logger.LogInformation("请求处理完成,RequestId: {RequestId}, 成功: {Success}", + requestMessage.RequestId, response.Message.Success); + + return Ok(new RequestResponseResult + { + Success = true, + RequestId = requestMessage.RequestId, + Response = response.Message + }); + } + catch (RequestTimeoutException) + { + _logger.LogError("请求超时"); + return StatusCode(408, new ErrorResponse + { + Success = false, + Error = "请求超时" + }); + } + catch (Exception ex) + { + _logger.LogError(ex, "请求处理失败"); + return StatusCode(500, new ErrorResponse + { + Success = false, + Error = ex.Message + }); + } + } +} + +/// +/// 发布消息请求模型 +/// +public class PublishMessageRequest +{ + /// + /// 消息标题 + /// + public string Title { get; set; } = string.Empty; + + /// + /// 消息内容 + /// + public string Content { get; set; } = string.Empty; + + /// + /// 消息优先级(默认:1) + /// + public int Priority { get; set; } = 1; +} + +/// +/// 发布消息响应模型 +/// +public class PublishMessageResponse +{ + /// + /// 操作是否成功 + /// + public bool Success { get; set; } + + /// + /// 消息唯一标识符 + /// + public Guid MessageId { get; set; } + + /// + /// 响应消息 + /// + public string Message { get; set; } = string.Empty; +} + +/// +/// 发送请求模型 +/// +public class SendRequestRequest +{ + /// + /// 操作类型 + /// + public string Operation { get; set; } = string.Empty; + + /// + /// 请求参数(JSON格式字符串) + /// + public string Parameters { get; set; } = string.Empty; +} + +/// +/// 请求响应结果模型 +/// +public class RequestResponseResult +{ + /// + /// 操作是否成功 + /// + public bool Success { get; set; } + + /// + /// 请求唯一标识符 + /// + public Guid RequestId { get; set; } + + /// + /// 响应消息 + /// + public ResponseMessage? Response { get; set; } +} + +/// +/// 错误响应模型 +/// +public class ErrorResponse +{ + /// + /// 操作是否成功 + /// + public bool Success { get; set; } + + /// + /// 错误信息 + /// + public string Error { get; set; } = string.Empty; +} diff --git a/MassTransit示例代码/MassTransitDemo.Publisher/MassTransitDemo.Publisher.csproj b/MassTransit示例代码/MassTransitDemo.Publisher/MassTransitDemo.Publisher.csproj new file mode 100644 index 0000000..1faeb97 --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Publisher/MassTransitDemo.Publisher.csproj @@ -0,0 +1,20 @@ + + + + net10.0 + enable + enable + + + + + + + + + + + + + + diff --git a/MassTransit示例代码/MassTransitDemo.Publisher/Program.cs b/MassTransit示例代码/MassTransitDemo.Publisher/Program.cs new file mode 100644 index 0000000..62d7799 --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Publisher/Program.cs @@ -0,0 +1,50 @@ +using MassTransit; +using MassTransitDemo.Contracts; + +var builder = WebApplication.CreateBuilder(args); + +// 添加控制器服务 +builder.Services.AddControllers(); + +// 添加API探索器(用于Swagger) +builder.Services.AddOpenApi(); + + +// 配置MassTransit和RabbitMQ +builder.Services.AddMassTransit(x => +{ + // 配置RabbitMQ传输 + x.UsingRabbitMq((context, cfg) => + { + // 从配置中读取RabbitMQ连接信息 + var rabbitMqConfig = builder.Configuration.GetSection("RabbitMQ"); + var host = rabbitMqConfig["Host"]; + var port = ushort.Parse(rabbitMqConfig["Port"]!); + var username = rabbitMqConfig["Username"]; + var password = rabbitMqConfig["Password"]; + var virtualHost = rabbitMqConfig["VirtualHost"]; + + // 配置RabbitMQ主机连接 + cfg.Host(host, port, virtualHost, h => + { + h.Username(username); + h.Password(password); + }); + + // 配置消息端点(发布者不需要配置消费者,只需要配置发送) + cfg.ConfigureEndpoints(context); + }); +}); + +var app = builder.Build(); + +app.MapOpenApi(); +app.UseSwaggerUI(options => +{ + options.SwaggerEndpoint("/openapi/v1.json", "API v1"); +}); + +// 映射控制器路由 +app.MapControllers(); + +app.Run(); diff --git a/MassTransit示例代码/MassTransitDemo.Publisher/Properties/launchSettings.json b/MassTransit示例代码/MassTransitDemo.Publisher/Properties/launchSettings.json new file mode 100644 index 0000000..dfa0af3 --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Publisher/Properties/launchSettings.json @@ -0,0 +1,14 @@ +{ + "$schema": "https://json.schemastore.org/launchsettings.json", + "profiles": { + "http": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": false, + "applicationUrl": "http://localhost:5001", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + } + } +} diff --git a/MassTransit示例代码/MassTransitDemo.Publisher/appsettings.Development.json b/MassTransit示例代码/MassTransitDemo.Publisher/appsettings.Development.json new file mode 100644 index 0000000..0c208ae --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Publisher/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/MassTransit示例代码/MassTransitDemo.Publisher/appsettings.json b/MassTransit示例代码/MassTransitDemo.Publisher/appsettings.json new file mode 100644 index 0000000..ade226e --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.Publisher/appsettings.json @@ -0,0 +1,17 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning", + "MassTransit": "Information" + } + }, + "AllowedHosts": "*", + "RabbitMQ": { + "Host": "192.168.2.7", + "Port": 5672, + "Username": "rabbit", + "Password": "qwe123!@#", + "VirtualHost": "/" + } +} diff --git a/MassTransit示例代码/MassTransitDemo.slnx b/MassTransit示例代码/MassTransitDemo.slnx new file mode 100644 index 0000000..f13523b --- /dev/null +++ b/MassTransit示例代码/MassTransitDemo.slnx @@ -0,0 +1,5 @@ + + + + + diff --git a/MassTransit示例代码/测试说明.md b/MassTransit示例代码/测试说明.md new file mode 100644 index 0000000..1557d93 --- /dev/null +++ b/MassTransit示例代码/测试说明.md @@ -0,0 +1,179 @@ +# MassTransit Demo 测试说明 + +## 项目结构 + +``` +MassTransit.Demo/ +├── MassTransitDemo.Contracts/ # 消息契约项目 +│ ├── PublishSubscribeMessage.cs # 发布/订阅消息契约 +│ ├── RequestMessage.cs # 请求消息契约 +│ └── ResponseMessage.cs # 响应消息契约 +├── MassTransitDemo.Publisher/ # 消息发布者API +│ ├── Controllers/ +│ │ └── MessageController.cs # 消息发布控制器 +│ ├── Program.cs # 服务配置 +│ └── appsettings.json # 配置文件 +└── MassTransitDemo.Consumer/ # 消息消费者API + ├── Consumers/ + │ ├── PublishSubscribeConsumer1.cs # 消费者1 + │ ├── PublishSubscribeConsumer2.cs # 消费者2 + │ └── RequestResponseConsumer.cs # 请求/响应消费者 + ├── Program.cs # 服务配置 + └── appsettings.json # 配置文件 +``` + +## 前置条件 + +1. 确保RabbitMQ服务器已启动并运行在:192.168.2.7:5672 +2. 确保使用用户名:rabbit,密码:qwe123!@# 可以正常连接 +3. 安装.NET 10 SDK + +## 运行步骤 + +### 1. 还原依赖 + +```bash +dotnet restore +``` + +### 2. 构建解决方案 + +```bash +dotnet build +``` + +### 3. 启动Consumer项目 + +打开一个终端,进入Consumer项目目录: + +```bash +cd MassTransitDemo.Consumer +dotnet run +``` + +Consumer项目将监听RabbitMQ队列并等待消息。 + +### 4. 启动Publisher项目 + +打开另一个终端,进入Publisher项目目录: + +```bash +cd MassTransitDemo.Publisher +dotnet run +``` + +Publisher项目将提供API接口用于发送消息。 + +## API测试 + +### 发布/订阅模式测试 + +使用Postman或curl发送POST请求: + +**URL:** `POST /api/message/publish` + +**请求体:** +```json +{ + "title": "测试消息", + "content": "这是一条发布/订阅模式的测试消息", + "priority": 1 +} +``` + +**curl示例:** +```bash +curl -X POST "https://localhost:/api/message/publish" \ + -H "Content-Type: application/json" \ + -d '{ + "title": "测试消息", + "content": "这是一条发布/订阅模式的测试消息", + "priority": 1 + }' +``` + +**预期结果:** +- 响应包含成功状态和MessageId +- Consumer项目的日志中会看到【消费者1】和【消费者2】都接收到了消息 + +### 请求/响应模式测试 + +**URL:** `POST /api/message/request` + +**支持的操作类型:** +- `echo`: 原样返回参数 +- `uppercase`: 将参数转换为大写 +- `lowercase`: 将参数转换为小写 +- `reverse`: 反转参数字符串 +- `length`: 返回参数长度 + +**请求体示例 (echo):** +```json +{ + "operation": "echo", + "parameters": "Hello, MassTransit!" +} +``` + +**请求体示例 (uppercase):** +```json +{ + "operation": "uppercase", + "parameters": "Hello, MassTransit!" +} +``` + +**curl示例:** +```bash +curl -X POST "https://localhost:/api/message/request" \ + -H "Content-Type: application/json" \ + -d '{ + "operation": "uppercase", + "parameters": "Hello, MassTransit!" + }' +``` + +**预期结果:** +- 响应包含成功状态、RequestId和处理后的数据 +- Consumer项目的日志中会看到【请求/响应消费者】处理请求的过程 + +## RabbitMQ配置说明 + +配置文件位置:`appsettings.json` + +```json +{ + "RabbitMQ": { + "Host": "192.168.2.7", + "Port": 5672, + "Username": "rabbit", + "Password": "qwe123!@#", + "VirtualHost": "/" + } +} +``` + +如需修改RabbitMQ连接信息,请更新Publisher和Consumer项目的`appsettings.json`文件。 + +## 消息队列说明 + +- `publish-subscribe-queue-1`: 消费者1的队列 +- `publish-subscribe-queue-2`: 消费者2的队列 +- `request-response-queue`: 请求/响应队列 + +## 故障排查 + +1. **连接RabbitMQ失败** + - 检查RabbitMQ服务器是否启动 + - 检查IP地址、端口、用户名和密码是否正确 + - 检查网络连接是否正常 + +2. **消息没有被消费** + - 确保Consumer项目正在运行 + - 检查RabbitMQ管理界面,确认队列存在且有绑定 + - 查看Consumer项目的日志输出 + +3. **请求超时** + - 确保Consumer项目正在运行 + - 检查网络延迟 + - 确认请求队列配置正确