【ASP.NET CORE】10.RabbitMQ消息队列底层封装

前言

在企业级系统里,RabbitMQ 常用于异步通知、日志审计、业务解耦、任务拆分和跨系统事件传递。如果每个业务模块都自己创建连接、Channel、声明队列、序列化消息和处理 ACK,后期会出现重复代码多、连接管理混乱、拓展困难等问题。

所以 HQServer 这次把 RabbitMQ 封装到 HQ.Common/Messaging 中,作为框架级通用能力。业务层只需要依赖发布接口或消费接口,不直接关心 RabbitMQ 底层连接和通道生命周期。

实现目标

  • 通过 RabbitMQOptions 统一管理连接配置。
  • 通过 IRabbitMQConnectionFactory 统一管理 RabbitMQ 连接。
  • 通过 IRabbitMQChannelProvider 统一管理 Channel。
  • 通过 IRabbitMQTopologyManager 统一声明队列和绑定交换机。
  • 通过 IRabbitMQPublisher 统一发布对象消息和原始字节消息。
  • 通过 IRabbitMQConsumerManager 统一订阅、反序列化和手动 ACK。
  • 通过 AddHQRabbitMQ 一行代码完成 DI 注册。

项目结构

HQ.Common
└── Messaging
    ├── IRabbitMQChannelProvider.cs
    ├── IRabbitMQConnectionFactory.cs
    ├── RabbitMQChannelProvider.cs
    ├── RabbitMQConnectionFactory.cs
    ├── RabbitMQMessaging.cs
    ├── RabbitMQOptions.cs
    ├── RabbitMQPublishOptions.cs
    ├── RabbitMQQueueDeclareOptions.cs
    ├── RabbitMQServiceCollectionExtensions.cs
    └── RabbitMQTopologyManager.cs

RabbitMQ 属于系统底层通用能力,后续订单、通知、日志、审批、任务等模块都可能使用,所以统一放到 Common 层更符合 HQServer 的项目结构。

安装 RabbitMQ.Client

<PackageReference Include="RabbitMQ.Client" Version="7.1.2" />

项目使用 RabbitMQ.Client 7.1.2,新版本 API 已经大量异步化,所以封装时也统一采用 async/await。

RabbitMQOptions 配置对象

namespace HQ.Common.Messaging;

public sealed class RabbitMQOptions
{
    public bool Enabled { get; set; } = true;
    public string HostName { get; set; } = "localhost";
    public int Port { get; set; } = 5672;
    public string UserName { get; set; } = "guest";
    public string Password { get; set; } = "guest";
    public string VirtualHost { get; set; } = "/";
    public string ClientProvidedName { get; set; } = "HQServer";
    public ushort RequestedChannelMax { get; set; } = 0;
    public uint RequestedHeartbeat { get; set; } = 60;
    public bool AutomaticRecoveryEnabled { get; set; } = true;
    public bool TopologyRecoveryEnabled { get; set; } = true;
    public int NetworkRecoveryIntervalSeconds { get; set; } = 5;
    public bool DispatchConsumersAsync { get; set; } = true;
    public int ConsumerConcurrency { get; set; } = 1;
    public bool PersistentMessages { get; set; } = true;
    public string ExchangeName { get; set; } = "";
}

这里集中管理 RabbitMQ 的主机、端口、账号、虚拟主机、客户端名称、心跳、自动恢复、拓扑恢复、网络恢复间隔、默认交换机等配置。配置集中之后,部署环境只需要调整配置文件,不需要修改业务代码。

连接工厂

RabbitMQ 连接属于较重资源,不能在业务代码中频繁创建。项目中通过 IRabbitMQConnectionFactory 统一获取连接:

public interface IRabbitMQConnectionFactory
{
    Task<IConnection> GetConnectionAsync(CancellationToken cancellationToken = default);
}

RabbitMQConnectionFactory 内部会复用已打开连接,并通过 SemaphoreSlim 避免并发重复创建连接。创建连接时会读取 RabbitMQOptions,并启用自动恢复、拓扑恢复、心跳和网络恢复间隔等配置。

Channel Provider

消息发布、队列声明和消息消费都需要 Channel。项目中通过 IRabbitMQChannelProvider 统一获取 Channel:

public interface IRabbitMQChannelProvider
{
    Task<IChannel> GetChannelAsync(CancellationToken cancellationToken = default);
}

RabbitMQChannelProvider 会复用已打开 Channel;如果 Channel 不存在或已关闭,则通过连接工厂重新创建。这样业务层不直接处理连接和 Channel 生命周期。

队列声明与拓扑管理

public sealed class RabbitMQQueueDeclareOptions
{
    public string QueueName { get; set; } = string.Empty;
    public bool Durable { get; set; } = true;
    public bool Exclusive { get; set; } = false;
    public bool AutoDelete { get; set; } = false;
    public IDictionary<string, object?>? Arguments { get; set; }
    public string? ExchangeName { get; set; }
    public string RoutingKey { get; set; } = string.Empty;
}

IRabbitMQTopologyManager 负责声明队列。如果设置了 ExchangeName,则继续通过 RoutingKey 绑定交换机:

public interface IRabbitMQTopologyManager
{
    Task DeclareQueueAsync(RabbitMQQueueDeclareOptions options, CancellationToken cancellationToken = default);
}

消息发布

public sealed class RabbitMQPublishOptions
{
    public string? Exchange { get; set; }
    public string RoutingKey { get; set; } = string.Empty;
    public bool Persistent { get; set; } = true;
    public string? MessageId { get; set; }
    public string? CorrelationId { get; set; }
    public IDictionary<string, object?>? Headers { get; set; }
}
public interface IRabbitMQPublisher
{
    Task PublishAsync<T>(T message, RabbitMQPublishOptions? options = null, CancellationToken cancellationToken = default);
    Task PublishRawAsync(ReadOnlyMemory<byte> body, RabbitMQPublishOptions? options = null, CancellationToken cancellationToken = default);
}

PublishAsync<T> 会把对象序列化为 JSON,然后调用 PublishRawAsync。发布时会设置 PersistentMessageIdCorrelationIdContentTypeHeaders。如果发布参数中没有指定 Exchange,则使用 RabbitMQOptions.ExchangeName 作为默认交换机。

业务层使用示例:

await publisher.PublishAsync(new OrderCreatedMessage
{
    OrderNo = "SO20260519001",
    UserId = 1001,
    Amount = 399.00m
}, new RabbitMQPublishOptions
{
    Exchange = "order.exchange",
    RoutingKey = "order.created"
});

消息消费

public interface IRabbitMQConsumerManager
{
    Task<string> SubscribeAsync<T>(
        RabbitMQQueueDeclareOptions queueOptions,
        Func<T, CancellationToken, Task> handler,
        CancellationToken cancellationToken = default);
}

RabbitMQConsumerManager 订阅时会先调用拓扑管理器声明队列,再创建 AsyncEventingBasicConsumer。收到消息后,将消息体按 UTF-8 转成 JSON,再反序列化为指定类型,最后调用业务 handler。业务处理成功后执行 BasicAckAsync 手动确认。

await consumerManager.SubscribeAsync<OrderCreatedMessage>(
    new RabbitMQQueueDeclareOptions
    {
        QueueName = "order.created.queue",
        ExchangeName = "order.exchange",
        RoutingKey = "order.created"
    },
    async (message, ct) =>
    {
        Console.WriteLine($"收到订单消息:{message.OrderNo}");
        await Task.CompletedTask;
    });

统一 DI 注册

Common 层提供 AddHQRabbitMQ 扩展方法,统一注册 RabbitMQ 相关服务:

public static IServiceCollection AddHQRabbitMQ(
    this IServiceCollection services,
    Action<RabbitMQOptions> configure)
{
    services.AddOptions<RabbitMQOptions>()
        .Configure(configure)
        .Validate(options => options.Validate(out _), "RabbitMQ 配置无效")
        .ValidateOnStart();

    services.TryAddSingleton<IRabbitMQConnectionFactory, RabbitMQConnectionFactory>();
    services.TryAddSingleton<IRabbitMQChannelProvider, RabbitMQChannelProvider>();
    services.TryAddSingleton<IRabbitMQPublisher, RabbitMQPublisher>();
    services.TryAddSingleton<IRabbitMQConsumerManager, RabbitMQConsumerManager>();
    services.TryAddSingleton<IRabbitMQTopologyManager, RabbitMQTopologyManager>();

    return services;
}

Program.cs 中接入:

using HQ.Common.Messaging;

builder.Services.AddHQRabbitMQ(options =>
    builder.Configuration.GetSection("RabbitMQ").Bind(options));

appsettings 配置示例

"RabbitMQ": {
  "Enabled": true,
  "HostName": "localhost",
  "Port": 5672,
  "UserName": "guest",
  "Password": "guest",
  "VirtualHost": "/",
  "ClientProvidedName": "HQServer",
  "RequestedHeartbeat": 60,
  "AutomaticRecoveryEnabled": true,
  "TopologyRecoveryEnabled": true,
  "NetworkRecoveryIntervalSeconds": 5,
  "DispatchConsumersAsync": true,
  "ConsumerConcurrency": 1,
  "PersistentMessages": true,
  "ExchangeName": "hq.default.exchange"
}

实际生产环境中,账号密码建议通过环境变量或配置中心管理。文章中的配置只作为示例,不展示真实敏感信息。

构建验证

完成封装和注册后,项目执行构建:

dotnet build HQ.Application/HQ.Application.csproj

构建结果为 0 个错误。当前项目中存在的 UserService 异步 warning 属于原有代码提示,不影响本次 RabbitMQ 封装。

总结

本篇完成了 HQServer 中 RabbitMQ 底层能力的封装。整体设计保持轻量,不引入复杂框架,只围绕企业项目最常用的连接、Channel、队列声明、消息发布、消息消费和 DI 注册做统一封装。

封装之后,业务模块只需要依赖 IRabbitMQPublisherIRabbitMQConsumerManager,就可以完成消息发布和订阅。后续如果要扩展死信队列、延迟消息、重试策略、消费失败处理、消息轨迹等能力,也可以继续在 HQ.Common/Messaging 层统一增强。

© 版权声明
THE END
喜欢就支持一下吧
点赞6 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容