【C#教程】RabbitMQ通知订阅模式实现

介绍

RabbitMQ 是一个开源的消息队列中间件,基于 Erlang 语言开发,实现了 AMQP 协议。它通过异步消息传递解耦生产者和消费者,支持可靠的消息持久化、灵活的路由策略(如 direct、topic、fanout 等交换机类型)以及高可用集群部署,广泛应用于分布式系统的流量削峰、任务异步处理和微服务通信等场景。

本文实现以下模块

组件作用本文实现
Exchange消息路由中枢Topic 类型,支持通配符匹配
Queue消息存储缓冲区支持持久队列 vs 临时队列
Routing Key路由规则(如 order.created支持多 Key 绑定
Ack/Nack消费确认机制自动确认 + 手动确认双模式

功能及实现

生产者

功能说明
双构造方式支持参数构造(主机、端口、账号密码)或连接字符串构造
延迟初始化线程安全的懒加载,首次发送时自动建立连接
显式初始化提供 InitializeAsync() 方法提前建立连接
交换机声明声明 Topic 类型交换机,支持持久化配置
消息发布支持任意对象序列化为 JSON 发送,自动添加消息属性
资源释放实现 IAsyncDisposable,优雅关闭连接

消息属性特性:

  • Persistent = true:消息持久化,防止重启丢失
  • MessageId:唯一标识,用于幂等性控制
  • Timestamp:消息时间戳,用于追踪
  • ContentType = "application/json":标识消息格式
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;

/// <summary>
/// RabbitMQ Topic 模式发布者(支持即时消息 + 延迟消息)
/// </summary>
public class TopicPublisher : IAsyncDisposable
{
    private readonly ConnectionFactory _factory;
    private IConnection? _connection;
    private IChannel? _channel;
    private readonly SemaphoreSlim _initLock = new(1, 1);
    private bool _initialized;

    // 延迟交换机缓存(避免重复声明)
    private readonly HashSet<string> _declaredDelayedExchanges = new();

    /// <summary>
    /// 初始化发布者(支持自定义端口)
    /// </summary>
    public TopicPublisher(
        string hostName, 
        int port = 5672, 
        string userName = "guest", 
        string password = "guest",
        string virtualHost = "/")
    {
        _factory = new ConnectionFactory
        {
            HostName = hostName,
            Port = port,
            UserName = userName,
            Password = password,
            VirtualHost = virtualHost,
            // 生产环境建议配置
            AutomaticRecoveryEnabled = true,
            TopologyRecoveryEnabled = true,
            RequestedHeartbeat = TimeSpan.FromSeconds(30)
        };
    }

    /// <summary>
    /// 使用连接字符串初始化
    /// </summary>
    public TopicPublisher(string connectionString)
    {
        _factory = new ConnectionFactory
        {
            Uri = new Uri(connectionString),
            AutomaticRecoveryEnabled = true,
            TopologyRecoveryEnabled = true,
            RequestedHeartbeat = TimeSpan.FromSeconds(30)
        };
    }

    /// <summary>
    /// 显式初始化(立即连接)
    /// </summary>
    public async Task InitializeAsync()
    {
        await EnsureInitializedAsync();
    }

    /// <summary>
    /// 延迟初始化(线程安全)
    /// </summary>
    private async Task EnsureInitializedAsync()
    {
        if (_initialized) return;

        await _initLock.WaitAsync();
        try
        {
            if (_initialized) return;
            
            _connection = await _factory.CreateConnectionAsync();
            _channel = await _connection.CreateChannelAsync();
            _initialized = true;
        }
        finally
        {
            _initLock.Release();
        }
    }

    /// <summary>
    /// 声明普通 Topic 交换机
    /// </summary>
    public async Task DeclareExchangeAsync(string exchangeName)
    {
        await EnsureInitializedAsync();
        
        await _channel!.ExchangeDeclareAsync(
            exchange: exchangeName,
            type: ExchangeType.Topic,
            durable: true,
            autoDelete: false
        );
    }

    /// <summary>
    /// 声明延迟交换机(x-delayed-message 类型,需安装插件)
    /// </summary>
    public async Task DeclareDelayedExchangeAsync(string exchangeName)
    {
        await EnsureInitializedAsync();

        if (_declaredDelayedExchanges.Contains(exchangeName)) 
            return;

        await _channel!.ExchangeDeclareAsync(
            exchange: exchangeName,
            type: "x-delayed-message",
            durable: true,
            autoDelete: false,
            arguments: new Dictionary<string, object>
            {
                { "x-delayed-type", "topic" }
            }
        );

        _declaredDelayedExchanges.Add(exchangeName);
    }

    /// <summary>
    /// 发送即时消息(原有接口,保持不变)
    /// </summary>
    public async Task PublishAsync<T>(T message, string exchangeName, string routingKey) where T : class
    {
        await EnsureInitializedAsync();

        var json = JsonSerializer.Serialize(message);
        var body = Encoding.UTF8.GetBytes(json);

        var props = new BasicProperties
        {
            Persistent = true,
            ContentType = "application/json",
            MessageId = Guid.NewGuid().ToString(),
            Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds())
        };

        await _channel!.BasicPublishAsync(
            exchange: exchangeName,
            routingKey: routingKey,
            mandatory: false,
            basicProperties: props,
            body: body
        );
    }

    /// <summary>
    /// 发送延迟消息(新增:分钟级延迟,不阻塞消费者)
    /// </summary>
    /// <param name="delayMinutes">延迟分钟数</param>
    public async Task PublishDelayedAsync<T>(T message, string exchangeName, string routingKey, int delayMinutes) where T : class
    {
        await EnsureInitializedAsync();

        // 确保延迟交换机已声明
        if (!_declaredDelayedExchanges.Contains(exchangeName))
        {
            await DeclareDelayedExchangeAsync(exchangeName);
        }

        var json = JsonSerializer.Serialize(message);
        var body = Encoding.UTF8.GetBytes(json);

        var props = new BasicProperties
        {
            Persistent = true,
            ContentType = "application/json",
            MessageId = Guid.NewGuid().ToString(),
            Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()),
            Headers = new Dictionary<string, object>
            {
                { "x-delay", delayMinutes * 60 * 1000 }  // 转毫秒
            }
        };

        await _channel!.BasicPublishAsync(
            exchange: exchangeName,
            routingKey: routingKey,
            mandatory: false,
            basicProperties: props,
            body: body
        );
    }

    /// <summary>
    /// 发送延迟消息(TimeSpan 重载)
    /// </summary>
    public async Task PublishDelayedAsync<T>(T message, string exchangeName, string routingKey, TimeSpan delay) where T : class
    {
        await PublishDelayedAsync(message, exchangeName, routingKey, (int)delay.TotalMinutes);
    }

    public async ValueTask DisposeAsync()
    {
        if (_channel != null) await _channel.DisposeAsync();
        if (_connection != null) await _connection.DisposeAsync();
        _initLock.Dispose();
    }
}

消费者

功能说明
双构造方式支持参数构造或连接字符串构造
双消费模式自动确认模式(简单)+ 手动确认模式(可靠)
队列类型选择临时队列(自动删除)或持久队列(显式命名)
多路由绑定支持同时绑定多个 Topic 路由键
流量控制QoS 限制每次只推送1条消息,防止过载
运维工具拉取积压消息、查看队列深度、预览消息(不消费)
停止消费支持随时停止消费,不关闭连接
资源释放实现 IAsyncDisposable,取消消费后关闭连接

消费模式对比:

模式适用场景特点
自动确认日志、监控等可丢数据场景业务处理成功后自动 ack,异常时 nack 重新入队
手动确认订单、支付等关键业务场景业务代码控制 ack/nack,失败可丢弃或重试
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

/// <summary>
/// RabbitMQ Topic 模式订阅者,支持手动确认和自动确认
/// </summary>
public class TopicSubscriber : IAsyncDisposable
{
    private readonly ConnectionFactory _factory;
    private readonly string _exchangeName;
    private readonly string[] _routingKeys;
    private readonly Func<string, string, Task>? _onMessageAutoAck;  // 自动确认回调
    private IConnection? _connection;
    private IChannel? _channel;
    private string? _queueName;
    private string? _consumerTag;
    private bool _isTemporaryQueue;

    /// <summary>
    /// 初始化订阅者(自动确认模式)
    /// </summary>
    public TopicSubscriber(
        string hostName,
        int port,
        string userName,
        string password,
        string exchangeName,
        string[] routingKeys,
        Func<string, string, Task> onMessage)
    {
        _factory = new ConnectionFactory
        {
            HostName = hostName,
            Port = port,
            UserName = userName,
            Password = password
        };
        _exchangeName = exchangeName;
        _routingKeys = routingKeys;
        _onMessageAutoAck = onMessage;
    }

    /// <summary>
    /// 初始化订阅者(连接字符串方式,自动确认模式)
    /// </summary>
    public TopicSubscriber(
        string connectionString,
        string exchangeName,
        string[] routingKeys,
        Func<string, string, Task> onMessage)
    {
        _factory = new ConnectionFactory { Uri = new Uri(connectionString) };
        _exchangeName = exchangeName;
        _routingKeys = routingKeys;
        _onMessageAutoAck = onMessage;
    }

    /// <summary>
    /// 启动订阅(自动确认模式,兼容旧代码)
    /// </summary>
    /// <param name="queueName">
    /// 队列名称:
    /// - null/空:自动生成临时队列(autoDelete=true, exclusive=true)
    /// - 指定名称:创建持久队列(durable=true, autoDelete=false)
    /// </param>
    public async Task StartAsync(string? queueName = null)
    {
        await StartAsyncInternal(queueName, null);
    }

    /// <summary>
    /// 启动订阅(手动确认模式)
    /// </summary>
    /// <param name="onMessage">
    /// 回调函数参数:
    /// - string: 消息内容(JSON)
    /// - string: 路由键(主题)
    /// - Func<Task>: 确认回调(ack),处理成功时调用
    /// - Func<bool, Task>: 拒绝回调(nack),参数requeue表示是否重新入队
    /// </param>
    /// <param name="queueName">队列名称,null/空则创建临时队列</param>
    public async Task StartAsync(
        Func<string, string, Func<Task>, Func<bool, Task>, Task> onMessage,
        string? queueName = null)
    {
        await StartAsyncInternal(queueName, onMessage);
    }

    /// <summary>
    /// 内部启动方法
    /// </summary>
    private async Task StartAsyncInternal(
        string? queueName,
        Func<string, string, Func<Task>, Func<bool, Task>, Task>? onMessageManual)
    {
        _connection = await _factory.CreateConnectionAsync();
        _channel = await _connection.CreateChannelAsync();

        // 声明交换机
        await _channel.ExchangeDeclareAsync(
            exchange: _exchangeName,
            type: ExchangeType.Topic,
            durable: true,
            autoDelete: false
        );

        // 判断队列类型
        _isTemporaryQueue = string.IsNullOrWhiteSpace(queueName);
        QueueDeclareOk queue;

        if (_isTemporaryQueue)
        {
            // 临时队列:自动生成名称,断开删除
            queue = await _channel.QueueDeclareAsync(
                queue: "",
                durable: false,
                exclusive: true,
                autoDelete: true,
                arguments: null
            );
            _queueName = queue.QueueName;
        }
        else
        {
            // 持久队列:指定名称,重启保留
            _queueName = queueName.Trim();
            queue = await _channel.QueueDeclareAsync(
                queue: _queueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null
            );
        }

        // 绑定路由键
        foreach (var key in _routingKeys)
        {
            await _channel.QueueBindAsync(_queueName, _exchangeName, key);
        }

        // QoS:每次只取1条
        await _channel.BasicQosAsync(0, 1, false);

        // 创建消费者
        var consumer = new AsyncEventingBasicConsumer(_channel);
        
        consumer.ReceivedAsync += async (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);

            if (onMessageManual != null)
            {
                // ========== 手动确认模式 ==========
                async Task AckAsync() => await _channel.BasicAckAsync(ea.DeliveryTag, false);
                async Task NackAsync(bool requeue) => await _channel.BasicNackAsync(ea.DeliveryTag, false, requeue);
                
                await onMessageManual(message, ea.RoutingKey, AckAsync, NackAsync);
            }
            else if (_onMessageAutoAck != null)
            {
                // ========== 自动确认模式 ==========
                try
                {
                    await _onMessageAutoAck(message, ea.RoutingKey);
                    await _channel.BasicAckAsync(ea.DeliveryTag, false);
                }
                catch
                {
                    // 异常时重新入队(可选:改为丢弃)
                    await _channel.BasicNackAsync(ea.DeliveryTag, false, requeue: true);
                    throw;
                }
            }
        };

        _consumerTag = await _channel.BasicConsumeAsync(
            queue: _queueName,
            autoAck: false,  // 统一手动确认,由回调控制
            consumer: consumer
        );
    }

    /// <summary>
    /// 获取队列信息
    /// </summary>
    public (string QueueName, bool IsTemporary, uint MessageCount) GetQueueInfo()
    {
        return (_queueName ?? "unknown", _isTemporaryQueue, 0);
    }

    /// <summary>
    /// 拉取积压消息(仅持久队列)
    /// </summary>
    public async Task<List<(string Body, string RoutingKey)>> PullPendingMessagesAsync(int maxMessages = 100)
    {
        EnsureChannel();
        
        if (_isTemporaryQueue)
        {
            throw new InvalidOperationException("临时队列不支持拉取积压");
        }

        var messages = new List<(string, string)>();
        
        for (int i = 0; i < maxMessages; i++)
        {
            var result = await _channel!.BasicGetAsync(_queueName, autoAck: false);
            if (result == null) break;
            
            var body = Encoding.UTF8.GetString(result.Body.ToArray());
            messages.Add((body, result.RoutingKey));
            await _channel.BasicAckAsync(result.DeliveryTag, multiple: false);
        }

        return messages;
    }

    /// <summary>
    /// 查看队列深度
    /// </summary>
    public async Task<uint> GetQueueDepthAsync()
    {
        EnsureChannel();

        try
        {
            var info = await _channel!.QueueDeclarePassiveAsync(_queueName!);
            return info.MessageCount;
        }
        catch
        {
            return 0;
        }
    }

    /// <summary>
    /// 查看消息(不消费,重新入队)
    /// </summary>
    public async Task<List<(string Body, string RoutingKey)>> PeekAllMessagesAsync(int maxMessages = 100)
    {
        EnsureChannel();
        
        var messages = new List<(string, string)>();
        
        for (int i = 0; i < maxMessages; i++)
        {
            var result = await _channel!.BasicGetAsync(_queueName, autoAck: false);
            if (result == null) break;
            
            var body = Encoding.UTF8.GetString(result.Body.ToArray());
            messages.Add((body, result.RoutingKey));
            
            // 拒绝并重新入队
            await _channel.BasicNackAsync(result.DeliveryTag, false, requeue: true);
        }

        return messages;
    }

    /// <summary>
    /// 停止消费
    /// </summary>
    public async Task StopConsumingAsync()
    {
        if (_consumerTag != null && _channel != null)
        {
            await _channel.BasicCancelAsync(_consumerTag);
            _consumerTag = null;
        }
    }

    private void EnsureChannel()
    {
        if (_channel == null || _connection?.IsOpen != true)
            throw new InvalidOperationException("通道未初始化");
    }

    public async ValueTask DisposeAsync()
    {
        if (_consumerTag != null && _channel != null)
            await _channel.BasicCancelAsync(_consumerTag);
        if (_channel != null) await _channel.DisposeAsync();
        if (_connection != null) await _connection.DisposeAsync();
    }
}

使用方式

发送消息

// 方式1:参数构造
await using var publisher = new TopicPublisher(
    hostName: "localhost",
    port: 5672,
    userName: "admin",
    password: "admin"
);

// 方式2:连接字符串构造
// await using var publisher = new TopicPublisher("amqp://admin:admin@localhost:5672/%2f");

// 定义订单对象
var order = new { OrderId = 123, Amount = 99.9, Status = "Created" };

// 发送到交换机,路由键为 "order.created"
await publisher.PublishAsync(order, "business.exchange", "order.created");

接收消息

// 自动确认模式:适合日志、监控等场景
var subscriber = new TopicSubscriber(
    hostName: "localhost",
    port: 5672,
    userName: "admin",
    password: "admin",
    exchangeName: "business.exchange",
    routingKeys: new[] { "order.*", "payment.#" },  // 监听 order. 开头和 payment. 开头的消息
    onMessage: async (message, routingKey) =>
    {
        Console.WriteLine($"[{routingKey}] 收到: {message}");
        // 业务处理...
        await Task.CompletedTask;
    }
);

// 启动消费,使用持久队列 "order-service"
await subscriber.StartAsync("order-service");

// 程序退出时释放
await subscriber.DisposeAsync();

延迟消息

部署前提

# 安装 RabbitMQ 延迟消息插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 重启服务
systemctl restart rabbitmq-server

# 验证
rabbitmq-plugins list | grep delayed
await using var publisher = new TopicPublisher("localhost", 5672, "admin", "admin");

var msg = new TaskEndMessage { TaskId = 456 };

// 方式1:指定分钟数(1分钟后投递)
await publisher.PublishDelayedAsync(msg, "business.exchange.delayed", "task.end", 1);

// 方式2:使用 TimeSpan(更灵活)
await publisher.PublishDelayedAsync(msg, "business.exchange.delayed", "task.end", TimeSpan.FromMinutes(2));

手动确认模式

await using var subscriber = new TopicSubscriber(
    "localhost", 5672, "admin", "admin",
    "business.exchange",
    new[] { "order.created", "order.paid" }
);

await subscriber.StartAsync(
    onMessage: async (message, routingKey, ack, nack) =>
    {
        try
        {
            Console.WriteLine($"处理 [{routingKey}]: {message}");
            
            // 反序列化
            var order = JsonSerializer.Deserialize<Order>(message);
            
            // 业务处理
            await ProcessOrder(order);
            
            // 成功确认(消息从队列删除)
            await ack();
        }
        catch (BusinessException ex)
        {
            // 业务异常:丢弃消息,不重新入队(可进死信队列)
            Console.WriteLine($"业务失败,丢弃: {ex.Message}");
            await nack(requeue: false);
        }
        catch (Exception ex)
        {
            // 系统异常:重新入队重试
            Console.WriteLine($"系统异常,重试: {ex.Message}");
            await nack(requeue: true);
        }
    },
    queueName: "order-worker-queue"  // 持久队列
);

临时队列(广播/通知场景)

// 不指定队列名,自动创建临时队列
await subscriber.StartAsync("order-service-queue");  // 持久队列
await subscriber.StartAsync(null);                    // 临时队列(随机名称,断开删除)

运维操作

// 查看队列堆积情况
var depth = await subscriber.GetQueueDepthAsync();
Console.WriteLine($"队列堆积: {depth} 条");

// 拉取积压消息(补偿消费)
var messages = await subscriber.PullPendingMessagesAsync(maxMessages: 50);
foreach (var (body, routingKey) in messages)
{
    Console.WriteLine($"拉取到 [{routingKey}]: {body}");
}

// 预览消息(不消费,仅查看)
var preview = await subscriber.PeekAllMessagesAsync(10);
Console.WriteLine($"预览到 {preview.Count} 条消息");

// 暂停消费(不断开连接)
await subscriber.StopConsumingAsync();
// 恢复消费(重新调用 StartAsync)

完整示例

public class Program
{
    public static async Task Main()
    {
        // ========== 生产者 ==========
        await using (var publisher = new TopicPublisher("localhost", 5672, "admin", "admin"))
        {
            for (int i = 1; i <= 10; i++)
            {
                var order = new { Id = i, Name = $"订单-{i}", Time = DateTime.Now };
                
                // 偶数发 created,奇数发 paid
                var routingKey = i % 2 == 0 ? "order.created" : "order.paid";
                
                await publisher.PublishAsync(order, "shop.exchange", routingKey);
                Console.WriteLine($"发送 [{routingKey}]: {order.Name}");
            }
        }

        // ========== 消费者 ==========
        await using var subscriber = new TopicSubscriber(
            "localhost", 5672, "admin", "admin",
            "shop.exchange",
            new[] { "order.*" }
        );

        await subscriber.StartAsync(
            async (msg, key, ack, nack) =>
            {
                Console.WriteLine($"[{key}] 处理: {msg}");
                await Task.Delay(100); // 模拟处理
                await ack(); // 确认
            },
            "order-consumer-1"
        );

        Console.WriteLine("按任意键停止...");
        Console.ReadKey();
    }
}

Topic 路由规则速查

路由键匹配 order.*匹配 order.#匹配 *.created匹配 #.created
order
order.created
order.paid
order.created.success
user.created

建议

  1. 连接配置:启用自动恢复 AutomaticRecoveryEnabled = true
  2. 消息持久化:队列和消息都设置 durable = true, Persistent = true
  3. 幂等性:利用 MessageId 去重,防止重复消费
  4. 死信队列:配置 x-dead-letter-exchange 处理失败消息
  5. 监控告警:关注队列深度、消费者连接数、内存使用率

© 版权声明
THE END
喜欢就支持一下吧
点赞9 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容