介绍
RabbitMQ 是一个开源的消息队列中间件,基于 Erlang 语言开发,实现了 AMQP 协议。它通过异步消息传递解耦生产者和消费者,支持可靠的消息持久化、灵活的路由策略(如 direct、topic、fanout 等交换机类型)以及高可用集群部署,广泛应用于分布式系统的流量削峰、任务异步处理和微服务通信等场景。
本文实现以下模块
| 组件 | 作用 | 本文实现 |
|---|---|---|
| Exchange | 消息路由中枢 | Topic 类型,支持通配符匹配 |
| Queue | 消息存储缓冲区 | 支持持久队列 vs 临时队列 |
| Routing Key | 路由规则(如 order.created) | 支持多 Key 绑定 |
| Ack/Nack | 消费确认机制 | 自动确认 + 手动确认双模式 |
功能及实现
生产者
| 功能 | 说明 |
|---|---|
| 双构造方式 | 支持参数构造(主机、端口、账号密码)或连接字符串构造 |
| 延迟初始化 | 线程安全的懒加载,首次发送时自动建立连接 |
| 显式初始化 | 提供 InitializeAsync() 方法提前建立连接 |
| 交换机声明 | 声明 Topic 类型交换机,支持持久化配置 |
| 消息发布 | 支持任意对象序列化为 JSON 发送,自动添加消息属性 |
| 资源释放 | 实现 IAsyncDisposable,优雅关闭连接 |
消息属性特性:
Persistent = true:消息持久化,防止重启丢失MessageId:唯一标识,用于幂等性控制Timestamp:消息时间戳,用于追踪ContentType = "application/json":标识消息格式
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
/// <summary>
/// RabbitMQ Topic 模式发布者(支持即时消息 + 延迟消息)
/// </summary>
public class TopicPublisher : IAsyncDisposable
{
private readonly ConnectionFactory _factory;
private IConnection? _connection;
private IChannel? _channel;
private readonly SemaphoreSlim _initLock = new(1, 1);
private bool _initialized;
// 延迟交换机缓存(避免重复声明)
private readonly HashSet<string> _declaredDelayedExchanges = new();
/// <summary>
/// 初始化发布者(支持自定义端口)
/// </summary>
public TopicPublisher(
string hostName,
int port = 5672,
string userName = "guest",
string password = "guest",
string virtualHost = "/")
{
_factory = new ConnectionFactory
{
HostName = hostName,
Port = port,
UserName = userName,
Password = password,
VirtualHost = virtualHost,
// 生产环境建议配置
AutomaticRecoveryEnabled = true,
TopologyRecoveryEnabled = true,
RequestedHeartbeat = TimeSpan.FromSeconds(30)
};
}
/// <summary>
/// 使用连接字符串初始化
/// </summary>
public TopicPublisher(string connectionString)
{
_factory = new ConnectionFactory
{
Uri = new Uri(connectionString),
AutomaticRecoveryEnabled = true,
TopologyRecoveryEnabled = true,
RequestedHeartbeat = TimeSpan.FromSeconds(30)
};
}
/// <summary>
/// 显式初始化(立即连接)
/// </summary>
public async Task InitializeAsync()
{
await EnsureInitializedAsync();
}
/// <summary>
/// 延迟初始化(线程安全)
/// </summary>
private async Task EnsureInitializedAsync()
{
if (_initialized) return;
await _initLock.WaitAsync();
try
{
if (_initialized) return;
_connection = await _factory.CreateConnectionAsync();
_channel = await _connection.CreateChannelAsync();
_initialized = true;
}
finally
{
_initLock.Release();
}
}
/// <summary>
/// 声明普通 Topic 交换机
/// </summary>
public async Task DeclareExchangeAsync(string exchangeName)
{
await EnsureInitializedAsync();
await _channel!.ExchangeDeclareAsync(
exchange: exchangeName,
type: ExchangeType.Topic,
durable: true,
autoDelete: false
);
}
/// <summary>
/// 声明延迟交换机(x-delayed-message 类型,需安装插件)
/// </summary>
public async Task DeclareDelayedExchangeAsync(string exchangeName)
{
await EnsureInitializedAsync();
if (_declaredDelayedExchanges.Contains(exchangeName))
return;
await _channel!.ExchangeDeclareAsync(
exchange: exchangeName,
type: "x-delayed-message",
durable: true,
autoDelete: false,
arguments: new Dictionary<string, object>
{
{ "x-delayed-type", "topic" }
}
);
_declaredDelayedExchanges.Add(exchangeName);
}
/// <summary>
/// 发送即时消息(原有接口,保持不变)
/// </summary>
public async Task PublishAsync<T>(T message, string exchangeName, string routingKey) where T : class
{
await EnsureInitializedAsync();
var json = JsonSerializer.Serialize(message);
var body = Encoding.UTF8.GetBytes(json);
var props = new BasicProperties
{
Persistent = true,
ContentType = "application/json",
MessageId = Guid.NewGuid().ToString(),
Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds())
};
await _channel!.BasicPublishAsync(
exchange: exchangeName,
routingKey: routingKey,
mandatory: false,
basicProperties: props,
body: body
);
}
/// <summary>
/// 发送延迟消息(新增:分钟级延迟,不阻塞消费者)
/// </summary>
/// <param name="delayMinutes">延迟分钟数</param>
public async Task PublishDelayedAsync<T>(T message, string exchangeName, string routingKey, int delayMinutes) where T : class
{
await EnsureInitializedAsync();
// 确保延迟交换机已声明
if (!_declaredDelayedExchanges.Contains(exchangeName))
{
await DeclareDelayedExchangeAsync(exchangeName);
}
var json = JsonSerializer.Serialize(message);
var body = Encoding.UTF8.GetBytes(json);
var props = new BasicProperties
{
Persistent = true,
ContentType = "application/json",
MessageId = Guid.NewGuid().ToString(),
Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()),
Headers = new Dictionary<string, object>
{
{ "x-delay", delayMinutes * 60 * 1000 } // 转毫秒
}
};
await _channel!.BasicPublishAsync(
exchange: exchangeName,
routingKey: routingKey,
mandatory: false,
basicProperties: props,
body: body
);
}
/// <summary>
/// 发送延迟消息(TimeSpan 重载)
/// </summary>
public async Task PublishDelayedAsync<T>(T message, string exchangeName, string routingKey, TimeSpan delay) where T : class
{
await PublishDelayedAsync(message, exchangeName, routingKey, (int)delay.TotalMinutes);
}
public async ValueTask DisposeAsync()
{
if (_channel != null) await _channel.DisposeAsync();
if (_connection != null) await _connection.DisposeAsync();
_initLock.Dispose();
}
}
消费者
| 功能 | 说明 |
|---|---|
| 双构造方式 | 支持参数构造或连接字符串构造 |
| 双消费模式 | 自动确认模式(简单)+ 手动确认模式(可靠) |
| 队列类型选择 | 临时队列(自动删除)或持久队列(显式命名) |
| 多路由绑定 | 支持同时绑定多个 Topic 路由键 |
| 流量控制 | QoS 限制每次只推送1条消息,防止过载 |
| 运维工具 | 拉取积压消息、查看队列深度、预览消息(不消费) |
| 停止消费 | 支持随时停止消费,不关闭连接 |
| 资源释放 | 实现 IAsyncDisposable,取消消费后关闭连接 |
消费模式对比:
| 模式 | 适用场景 | 特点 |
|---|---|---|
| 自动确认 | 日志、监控等可丢数据场景 | 业务处理成功后自动 ack,异常时 nack 重新入队 |
| 手动确认 | 订单、支付等关键业务场景 | 业务代码控制 ack/nack,失败可丢弃或重试 |
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
/// <summary>
/// RabbitMQ Topic 模式订阅者,支持手动确认和自动确认
/// </summary>
public class TopicSubscriber : IAsyncDisposable
{
private readonly ConnectionFactory _factory;
private readonly string _exchangeName;
private readonly string[] _routingKeys;
private readonly Func<string, string, Task>? _onMessageAutoAck; // 自动确认回调
private IConnection? _connection;
private IChannel? _channel;
private string? _queueName;
private string? _consumerTag;
private bool _isTemporaryQueue;
/// <summary>
/// 初始化订阅者(自动确认模式)
/// </summary>
public TopicSubscriber(
string hostName,
int port,
string userName,
string password,
string exchangeName,
string[] routingKeys,
Func<string, string, Task> onMessage)
{
_factory = new ConnectionFactory
{
HostName = hostName,
Port = port,
UserName = userName,
Password = password
};
_exchangeName = exchangeName;
_routingKeys = routingKeys;
_onMessageAutoAck = onMessage;
}
/// <summary>
/// 初始化订阅者(连接字符串方式,自动确认模式)
/// </summary>
public TopicSubscriber(
string connectionString,
string exchangeName,
string[] routingKeys,
Func<string, string, Task> onMessage)
{
_factory = new ConnectionFactory { Uri = new Uri(connectionString) };
_exchangeName = exchangeName;
_routingKeys = routingKeys;
_onMessageAutoAck = onMessage;
}
/// <summary>
/// 启动订阅(自动确认模式,兼容旧代码)
/// </summary>
/// <param name="queueName">
/// 队列名称:
/// - null/空:自动生成临时队列(autoDelete=true, exclusive=true)
/// - 指定名称:创建持久队列(durable=true, autoDelete=false)
/// </param>
public async Task StartAsync(string? queueName = null)
{
await StartAsyncInternal(queueName, null);
}
/// <summary>
/// 启动订阅(手动确认模式)
/// </summary>
/// <param name="onMessage">
/// 回调函数参数:
/// - string: 消息内容(JSON)
/// - string: 路由键(主题)
/// - Func<Task>: 确认回调(ack),处理成功时调用
/// - Func<bool, Task>: 拒绝回调(nack),参数requeue表示是否重新入队
/// </param>
/// <param name="queueName">队列名称,null/空则创建临时队列</param>
public async Task StartAsync(
Func<string, string, Func<Task>, Func<bool, Task>, Task> onMessage,
string? queueName = null)
{
await StartAsyncInternal(queueName, onMessage);
}
/// <summary>
/// 内部启动方法
/// </summary>
private async Task StartAsyncInternal(
string? queueName,
Func<string, string, Func<Task>, Func<bool, Task>, Task>? onMessageManual)
{
_connection = await _factory.CreateConnectionAsync();
_channel = await _connection.CreateChannelAsync();
// 声明交换机
await _channel.ExchangeDeclareAsync(
exchange: _exchangeName,
type: ExchangeType.Topic,
durable: true,
autoDelete: false
);
// 判断队列类型
_isTemporaryQueue = string.IsNullOrWhiteSpace(queueName);
QueueDeclareOk queue;
if (_isTemporaryQueue)
{
// 临时队列:自动生成名称,断开删除
queue = await _channel.QueueDeclareAsync(
queue: "",
durable: false,
exclusive: true,
autoDelete: true,
arguments: null
);
_queueName = queue.QueueName;
}
else
{
// 持久队列:指定名称,重启保留
_queueName = queueName.Trim();
queue = await _channel.QueueDeclareAsync(
queue: _queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null
);
}
// 绑定路由键
foreach (var key in _routingKeys)
{
await _channel.QueueBindAsync(_queueName, _exchangeName, key);
}
// QoS:每次只取1条
await _channel.BasicQosAsync(0, 1, false);
// 创建消费者
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
if (onMessageManual != null)
{
// ========== 手动确认模式 ==========
async Task AckAsync() => await _channel.BasicAckAsync(ea.DeliveryTag, false);
async Task NackAsync(bool requeue) => await _channel.BasicNackAsync(ea.DeliveryTag, false, requeue);
await onMessageManual(message, ea.RoutingKey, AckAsync, NackAsync);
}
else if (_onMessageAutoAck != null)
{
// ========== 自动确认模式 ==========
try
{
await _onMessageAutoAck(message, ea.RoutingKey);
await _channel.BasicAckAsync(ea.DeliveryTag, false);
}
catch
{
// 异常时重新入队(可选:改为丢弃)
await _channel.BasicNackAsync(ea.DeliveryTag, false, requeue: true);
throw;
}
}
};
_consumerTag = await _channel.BasicConsumeAsync(
queue: _queueName,
autoAck: false, // 统一手动确认,由回调控制
consumer: consumer
);
}
/// <summary>
/// 获取队列信息
/// </summary>
public (string QueueName, bool IsTemporary, uint MessageCount) GetQueueInfo()
{
return (_queueName ?? "unknown", _isTemporaryQueue, 0);
}
/// <summary>
/// 拉取积压消息(仅持久队列)
/// </summary>
public async Task<List<(string Body, string RoutingKey)>> PullPendingMessagesAsync(int maxMessages = 100)
{
EnsureChannel();
if (_isTemporaryQueue)
{
throw new InvalidOperationException("临时队列不支持拉取积压");
}
var messages = new List<(string, string)>();
for (int i = 0; i < maxMessages; i++)
{
var result = await _channel!.BasicGetAsync(_queueName, autoAck: false);
if (result == null) break;
var body = Encoding.UTF8.GetString(result.Body.ToArray());
messages.Add((body, result.RoutingKey));
await _channel.BasicAckAsync(result.DeliveryTag, multiple: false);
}
return messages;
}
/// <summary>
/// 查看队列深度
/// </summary>
public async Task<uint> GetQueueDepthAsync()
{
EnsureChannel();
try
{
var info = await _channel!.QueueDeclarePassiveAsync(_queueName!);
return info.MessageCount;
}
catch
{
return 0;
}
}
/// <summary>
/// 查看消息(不消费,重新入队)
/// </summary>
public async Task<List<(string Body, string RoutingKey)>> PeekAllMessagesAsync(int maxMessages = 100)
{
EnsureChannel();
var messages = new List<(string, string)>();
for (int i = 0; i < maxMessages; i++)
{
var result = await _channel!.BasicGetAsync(_queueName, autoAck: false);
if (result == null) break;
var body = Encoding.UTF8.GetString(result.Body.ToArray());
messages.Add((body, result.RoutingKey));
// 拒绝并重新入队
await _channel.BasicNackAsync(result.DeliveryTag, false, requeue: true);
}
return messages;
}
/// <summary>
/// 停止消费
/// </summary>
public async Task StopConsumingAsync()
{
if (_consumerTag != null && _channel != null)
{
await _channel.BasicCancelAsync(_consumerTag);
_consumerTag = null;
}
}
private void EnsureChannel()
{
if (_channel == null || _connection?.IsOpen != true)
throw new InvalidOperationException("通道未初始化");
}
public async ValueTask DisposeAsync()
{
if (_consumerTag != null && _channel != null)
await _channel.BasicCancelAsync(_consumerTag);
if (_channel != null) await _channel.DisposeAsync();
if (_connection != null) await _connection.DisposeAsync();
}
}
使用方式
发送消息
// 方式1:参数构造
await using var publisher = new TopicPublisher(
hostName: "localhost",
port: 5672,
userName: "admin",
password: "admin"
);
// 方式2:连接字符串构造
// await using var publisher = new TopicPublisher("amqp://admin:admin@localhost:5672/%2f");
// 定义订单对象
var order = new { OrderId = 123, Amount = 99.9, Status = "Created" };
// 发送到交换机,路由键为 "order.created"
await publisher.PublishAsync(order, "business.exchange", "order.created");
接收消息
// 自动确认模式:适合日志、监控等场景
var subscriber = new TopicSubscriber(
hostName: "localhost",
port: 5672,
userName: "admin",
password: "admin",
exchangeName: "business.exchange",
routingKeys: new[] { "order.*", "payment.#" }, // 监听 order. 开头和 payment. 开头的消息
onMessage: async (message, routingKey) =>
{
Console.WriteLine($"[{routingKey}] 收到: {message}");
// 业务处理...
await Task.CompletedTask;
}
);
// 启动消费,使用持久队列 "order-service"
await subscriber.StartAsync("order-service");
// 程序退出时释放
await subscriber.DisposeAsync();
延迟消息
部署前提
# 安装 RabbitMQ 延迟消息插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 重启服务
systemctl restart rabbitmq-server
# 验证
rabbitmq-plugins list | grep delayed
await using var publisher = new TopicPublisher("localhost", 5672, "admin", "admin");
var msg = new TaskEndMessage { TaskId = 456 };
// 方式1:指定分钟数(1分钟后投递)
await publisher.PublishDelayedAsync(msg, "business.exchange.delayed", "task.end", 1);
// 方式2:使用 TimeSpan(更灵活)
await publisher.PublishDelayedAsync(msg, "business.exchange.delayed", "task.end", TimeSpan.FromMinutes(2));
手动确认模式
await using var subscriber = new TopicSubscriber(
"localhost", 5672, "admin", "admin",
"business.exchange",
new[] { "order.created", "order.paid" }
);
await subscriber.StartAsync(
onMessage: async (message, routingKey, ack, nack) =>
{
try
{
Console.WriteLine($"处理 [{routingKey}]: {message}");
// 反序列化
var order = JsonSerializer.Deserialize<Order>(message);
// 业务处理
await ProcessOrder(order);
// 成功确认(消息从队列删除)
await ack();
}
catch (BusinessException ex)
{
// 业务异常:丢弃消息,不重新入队(可进死信队列)
Console.WriteLine($"业务失败,丢弃: {ex.Message}");
await nack(requeue: false);
}
catch (Exception ex)
{
// 系统异常:重新入队重试
Console.WriteLine($"系统异常,重试: {ex.Message}");
await nack(requeue: true);
}
},
queueName: "order-worker-queue" // 持久队列
);
临时队列(广播/通知场景)
// 不指定队列名,自动创建临时队列
await subscriber.StartAsync("order-service-queue"); // 持久队列
await subscriber.StartAsync(null); // 临时队列(随机名称,断开删除)
运维操作
// 查看队列堆积情况
var depth = await subscriber.GetQueueDepthAsync();
Console.WriteLine($"队列堆积: {depth} 条");
// 拉取积压消息(补偿消费)
var messages = await subscriber.PullPendingMessagesAsync(maxMessages: 50);
foreach (var (body, routingKey) in messages)
{
Console.WriteLine($"拉取到 [{routingKey}]: {body}");
}
// 预览消息(不消费,仅查看)
var preview = await subscriber.PeekAllMessagesAsync(10);
Console.WriteLine($"预览到 {preview.Count} 条消息");
// 暂停消费(不断开连接)
await subscriber.StopConsumingAsync();
// 恢复消费(重新调用 StartAsync)
完整示例
public class Program
{
public static async Task Main()
{
// ========== 生产者 ==========
await using (var publisher = new TopicPublisher("localhost", 5672, "admin", "admin"))
{
for (int i = 1; i <= 10; i++)
{
var order = new { Id = i, Name = $"订单-{i}", Time = DateTime.Now };
// 偶数发 created,奇数发 paid
var routingKey = i % 2 == 0 ? "order.created" : "order.paid";
await publisher.PublishAsync(order, "shop.exchange", routingKey);
Console.WriteLine($"发送 [{routingKey}]: {order.Name}");
}
}
// ========== 消费者 ==========
await using var subscriber = new TopicSubscriber(
"localhost", 5672, "admin", "admin",
"shop.exchange",
new[] { "order.*" }
);
await subscriber.StartAsync(
async (msg, key, ack, nack) =>
{
Console.WriteLine($"[{key}] 处理: {msg}");
await Task.Delay(100); // 模拟处理
await ack(); // 确认
},
"order-consumer-1"
);
Console.WriteLine("按任意键停止...");
Console.ReadKey();
}
}
Topic 路由规则速查
| 路由键 | 匹配 order.* | 匹配 order.# | 匹配 *.created | 匹配 #.created |
|---|---|---|---|---|
order | ❌ | ✅ | ❌ | ❌ |
order.created | ✅ | ✅ | ✅ | ✅ |
order.paid | ✅ | ✅ | ❌ | ❌ |
order.created.success | ❌ | ✅ | ❌ | ✅ |
user.created | ❌ | ❌ | ✅ | ✅ |
建议
- 连接配置:启用自动恢复
AutomaticRecoveryEnabled = true - 消息持久化:队列和消息都设置
durable = true,Persistent = true - 幂等性:利用
MessageId去重,防止重复消费 - 死信队列:配置
x-dead-letter-exchange处理失败消息 - 监控告警:关注队列深度、消费者连接数、内存使用率
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END










暂无评论内容