前言
在企业级系统里,RabbitMQ 常用于异步通知、日志审计、业务解耦、任务拆分和跨系统事件传递。如果每个业务模块都自己创建连接、Channel、声明队列、序列化消息和处理 ACK,后期会出现重复代码多、连接管理混乱、拓展困难等问题。
所以 HQServer 这次把 RabbitMQ 封装到 HQ.Common/Messaging 中,作为框架级通用能力。业务层只需要依赖发布接口或消费接口,不直接关心 RabbitMQ 底层连接和通道生命周期。
实现目标
- 通过
RabbitMQOptions统一管理连接配置。 - 通过
IRabbitMQConnectionFactory统一管理 RabbitMQ 连接。 - 通过
IRabbitMQChannelProvider统一管理 Channel。 - 通过
IRabbitMQTopologyManager统一声明队列和绑定交换机。 - 通过
IRabbitMQPublisher统一发布对象消息和原始字节消息。 - 通过
IRabbitMQConsumerManager统一订阅、反序列化和手动 ACK。 - 通过
AddHQRabbitMQ一行代码完成 DI 注册。
项目结构
HQ.Common
└── Messaging
├── IRabbitMQChannelProvider.cs
├── IRabbitMQConnectionFactory.cs
├── RabbitMQChannelProvider.cs
├── RabbitMQConnectionFactory.cs
├── RabbitMQMessaging.cs
├── RabbitMQOptions.cs
├── RabbitMQPublishOptions.cs
├── RabbitMQQueueDeclareOptions.cs
├── RabbitMQServiceCollectionExtensions.cs
└── RabbitMQTopologyManager.cs
RabbitMQ 属于系统底层通用能力,后续订单、通知、日志、审批、任务等模块都可能使用,所以统一放到 Common 层更符合 HQServer 的项目结构。
安装 RabbitMQ.Client
<PackageReference Include="RabbitMQ.Client" Version="7.1.2" />
项目使用 RabbitMQ.Client 7.1.2,新版本 API 已经大量异步化,所以封装时也统一采用 async/await。
RabbitMQOptions 配置对象
namespace HQ.Common.Messaging;
public sealed class RabbitMQOptions
{
public bool Enabled { get; set; } = true;
public string HostName { get; set; } = "localhost";
public int Port { get; set; } = 5672;
public string UserName { get; set; } = "guest";
public string Password { get; set; } = "guest";
public string VirtualHost { get; set; } = "/";
public string ClientProvidedName { get; set; } = "HQServer";
public ushort RequestedChannelMax { get; set; } = 0;
public uint RequestedHeartbeat { get; set; } = 60;
public bool AutomaticRecoveryEnabled { get; set; } = true;
public bool TopologyRecoveryEnabled { get; set; } = true;
public int NetworkRecoveryIntervalSeconds { get; set; } = 5;
public bool DispatchConsumersAsync { get; set; } = true;
public int ConsumerConcurrency { get; set; } = 1;
public bool PersistentMessages { get; set; } = true;
public string ExchangeName { get; set; } = "";
}
这里集中管理 RabbitMQ 的主机、端口、账号、虚拟主机、客户端名称、心跳、自动恢复、拓扑恢复、网络恢复间隔、默认交换机等配置。配置集中之后,部署环境只需要调整配置文件,不需要修改业务代码。
连接工厂
RabbitMQ 连接属于较重资源,不能在业务代码中频繁创建。项目中通过 IRabbitMQConnectionFactory 统一获取连接:
public interface IRabbitMQConnectionFactory
{
Task<IConnection> GetConnectionAsync(CancellationToken cancellationToken = default);
}
RabbitMQConnectionFactory 内部会复用已打开连接,并通过 SemaphoreSlim 避免并发重复创建连接。创建连接时会读取 RabbitMQOptions,并启用自动恢复、拓扑恢复、心跳和网络恢复间隔等配置。
Channel Provider
消息发布、队列声明和消息消费都需要 Channel。项目中通过 IRabbitMQChannelProvider 统一获取 Channel:
public interface IRabbitMQChannelProvider
{
Task<IChannel> GetChannelAsync(CancellationToken cancellationToken = default);
}
RabbitMQChannelProvider 会复用已打开 Channel;如果 Channel 不存在或已关闭,则通过连接工厂重新创建。这样业务层不直接处理连接和 Channel 生命周期。
队列声明与拓扑管理
public sealed class RabbitMQQueueDeclareOptions
{
public string QueueName { get; set; } = string.Empty;
public bool Durable { get; set; } = true;
public bool Exclusive { get; set; } = false;
public bool AutoDelete { get; set; } = false;
public IDictionary<string, object?>? Arguments { get; set; }
public string? ExchangeName { get; set; }
public string RoutingKey { get; set; } = string.Empty;
}
IRabbitMQTopologyManager 负责声明队列。如果设置了 ExchangeName,则继续通过 RoutingKey 绑定交换机:
public interface IRabbitMQTopologyManager
{
Task DeclareQueueAsync(RabbitMQQueueDeclareOptions options, CancellationToken cancellationToken = default);
}
消息发布
public sealed class RabbitMQPublishOptions
{
public string? Exchange { get; set; }
public string RoutingKey { get; set; } = string.Empty;
public bool Persistent { get; set; } = true;
public string? MessageId { get; set; }
public string? CorrelationId { get; set; }
public IDictionary<string, object?>? Headers { get; set; }
}
public interface IRabbitMQPublisher
{
Task PublishAsync<T>(T message, RabbitMQPublishOptions? options = null, CancellationToken cancellationToken = default);
Task PublishRawAsync(ReadOnlyMemory<byte> body, RabbitMQPublishOptions? options = null, CancellationToken cancellationToken = default);
}
PublishAsync<T> 会把对象序列化为 JSON,然后调用 PublishRawAsync。发布时会设置 Persistent、MessageId、CorrelationId、ContentType 和 Headers。如果发布参数中没有指定 Exchange,则使用 RabbitMQOptions.ExchangeName 作为默认交换机。
业务层使用示例:
await publisher.PublishAsync(new OrderCreatedMessage
{
OrderNo = "SO20260519001",
UserId = 1001,
Amount = 399.00m
}, new RabbitMQPublishOptions
{
Exchange = "order.exchange",
RoutingKey = "order.created"
});
消息消费
public interface IRabbitMQConsumerManager
{
Task<string> SubscribeAsync<T>(
RabbitMQQueueDeclareOptions queueOptions,
Func<T, CancellationToken, Task> handler,
CancellationToken cancellationToken = default);
}
RabbitMQConsumerManager 订阅时会先调用拓扑管理器声明队列,再创建 AsyncEventingBasicConsumer。收到消息后,将消息体按 UTF-8 转成 JSON,再反序列化为指定类型,最后调用业务 handler。业务处理成功后执行 BasicAckAsync 手动确认。
await consumerManager.SubscribeAsync<OrderCreatedMessage>(
new RabbitMQQueueDeclareOptions
{
QueueName = "order.created.queue",
ExchangeName = "order.exchange",
RoutingKey = "order.created"
},
async (message, ct) =>
{
Console.WriteLine($"收到订单消息:{message.OrderNo}");
await Task.CompletedTask;
});
统一 DI 注册
Common 层提供 AddHQRabbitMQ 扩展方法,统一注册 RabbitMQ 相关服务:
public static IServiceCollection AddHQRabbitMQ(
this IServiceCollection services,
Action<RabbitMQOptions> configure)
{
services.AddOptions<RabbitMQOptions>()
.Configure(configure)
.Validate(options => options.Validate(out _), "RabbitMQ 配置无效")
.ValidateOnStart();
services.TryAddSingleton<IRabbitMQConnectionFactory, RabbitMQConnectionFactory>();
services.TryAddSingleton<IRabbitMQChannelProvider, RabbitMQChannelProvider>();
services.TryAddSingleton<IRabbitMQPublisher, RabbitMQPublisher>();
services.TryAddSingleton<IRabbitMQConsumerManager, RabbitMQConsumerManager>();
services.TryAddSingleton<IRabbitMQTopologyManager, RabbitMQTopologyManager>();
return services;
}
在 Program.cs 中接入:
using HQ.Common.Messaging;
builder.Services.AddHQRabbitMQ(options =>
builder.Configuration.GetSection("RabbitMQ").Bind(options));
appsettings 配置示例
"RabbitMQ": {
"Enabled": true,
"HostName": "localhost",
"Port": 5672,
"UserName": "guest",
"Password": "guest",
"VirtualHost": "/",
"ClientProvidedName": "HQServer",
"RequestedHeartbeat": 60,
"AutomaticRecoveryEnabled": true,
"TopologyRecoveryEnabled": true,
"NetworkRecoveryIntervalSeconds": 5,
"DispatchConsumersAsync": true,
"ConsumerConcurrency": 1,
"PersistentMessages": true,
"ExchangeName": "hq.default.exchange"
}
实际生产环境中,账号密码建议通过环境变量或配置中心管理。文章中的配置只作为示例,不展示真实敏感信息。
构建验证
完成封装和注册后,项目执行构建:
dotnet build HQ.Application/HQ.Application.csproj
构建结果为 0 个错误。当前项目中存在的 UserService 异步 warning 属于原有代码提示,不影响本次 RabbitMQ 封装。
总结
本篇完成了 HQServer 中 RabbitMQ 底层能力的封装。整体设计保持轻量,不引入复杂框架,只围绕企业项目最常用的连接、Channel、队列声明、消息发布、消息消费和 DI 注册做统一封装。
封装之后,业务模块只需要依赖 IRabbitMQPublisher 或 IRabbitMQConsumerManager,就可以完成消息发布和订阅。后续如果要扩展死信队列、延迟消息、重试策略、消费失败处理、消息轨迹等能力,也可以继续在 HQ.Common/Messaging 层统一增强。










暂无评论内容