前言
上一篇我们已经把 RabbitMQ 的发布、消费、手动 Ack/Nack 和消费失败处理补进了 HQServer。正常业务里,仅仅把消息消费失败后 BasicNackAsync 掉还不够,因为失败消息如果直接丢弃,问题排查会很困难;如果无限重新入队,又可能造成消息反复失败、消费者被拖垮。
所以这一篇继续优化 RabbitMQ:给 HQServer 增加死信队列(Dead Letter Queue,简称 DLQ)支持。它的目标很明确:当消息消费失败、不重新入队时,失败消息可以进入一个专门的死信队列,方便后续排查、补偿或人工处理。
一、什么是死信队列
死信队列不是 RabbitMQ 里一种特殊队列,本质上还是普通队列。区别在于:业务队列配置了死信交换机后,满足条件的消息会被 RabbitMQ 转发到死信交换机,再路由到死信队列。
常见进入死信队列的情况有:
- 消费者拒绝消息,并且不重新入队。
- 消息过期。
- 队列长度达到上限,旧消息被挤出。
在 HQServer 当前封装里,最常见的是第一种:业务消费异常后执行 BasicNackAsync,并且 RequeueOnConsumerError = false,消息就可以进入死信队列。
二、本次封装目标
这次死信队列封装遵循一个原则:默认不影响现有队列,想用时一行配置打开。
主要实现了这些能力:
- 全局开关:可以统一启用死信队列。
- 单队列开关:某个队列可以单独启用或关闭死信队列。
- 自动声明死信交换机。
- 自动声明死信队列。
- 自动绑定死信队列。
- 自动给业务队列增加
x-dead-letter-exchange和x-dead-letter-routing-key参数。
三、RabbitMQOptions 增加死信配置
先在 RabbitMQ 全局配置类中增加死信队列相关选项:
public sealed class RabbitMQOptions
{
public bool EnableDeadLetter { get; set; } = false;
public string DeadLetterExchangeName { get; set; } = "hq.deadletter.exchange";
public string DeadLetterExchangeType { get; set; } = "direct";
public string DeadLetterQueueSuffix { get; set; } = ".deadletter";
public bool DeadLetterQueueDurable { get; set; } = true;
}
这里默认 EnableDeadLetter = false,也就是说升级后不会改变原来队列行为。需要启用时再打开。
四、队列声明参数增加单队列覆盖
有些项目不是所有队列都需要死信队列,所以队列声明参数里也加了覆盖配置:
public sealed class RabbitMQQueueDeclareOptions
{
public string QueueName { get; set; } = string.Empty;
public bool Durable { get; set; } = true;
public bool Exclusive { get; set; } = false;
public bool AutoDelete { get; set; } = false;
public IDictionary<string, object?>? Arguments { get; set; }
public string? ExchangeName { get; set; }
public string RoutingKey { get; set; } = string.Empty;
public bool? EnableDeadLetter { get; set; }
public string? DeadLetterExchangeName { get; set; }
public string? DeadLetterQueueName { get; set; }
public string? DeadLetterRoutingKey { get; set; }
}
EnableDeadLetter 使用 nullable bool,是为了区分三种状态:
null:跟随全局配置。true:当前队列强制启用死信。false:当前队列强制关闭死信。
五、拓扑管理器自动声明 DLQ
核心逻辑放在 RabbitMQTopologyManager 中。声明业务队列时,如果启用了死信队列,就自动给业务队列补充两个参数:
arguments.TryAdd("x-dead-letter-exchange", deadLetterExchangeName);
arguments.TryAdd("x-dead-letter-routing-key", deadLetterRoutingKey);
同时自动声明死信交换机、死信队列,并完成绑定:
await channel.ExchangeDeclareAsync(
deadLetterExchangeName,
_options.DeadLetterExchangeType,
durable: true,
autoDelete: false,
arguments: null,
cancellationToken: cancellationToken);
await channel.QueueDeclareAsync(
deadLetterQueueName,
_options.DeadLetterQueueDurable,
exclusive: false,
autoDelete: false,
arguments: null,
cancellationToken: cancellationToken);
await channel.QueueBindAsync(
deadLetterQueueName,
deadLetterExchangeName,
deadLetterRoutingKey,
arguments: null,
cancellationToken: cancellationToken);
这样业务方不需要手动写一堆 RabbitMQ 拓扑声明代码,只要声明队列时打开死信即可。
六、用法一:全局启用死信队列
如果希望所有默认队列都启用死信队列,可以在配置中打开:
{
"RabbitMQ": {
"Enabled": true,
"EnableDeadLetter": true,
"DeadLetterExchangeName": "hq.deadletter.exchange",
"DeadLetterExchangeType": "direct",
"DeadLetterQueueSuffix": ".deadletter",
"RequeueOnConsumerError": false
}
}
注意这里建议保持:
"RequeueOnConsumerError": false
这样消费异常时,消息不会无限重新入队,而是进入死信队列。
七、用法二:单个队列启用死信队列
如果不想全局启用,也可以只给某个队列开启:
await consumerManager.SubscribeAsync<OrderCreatedMessage>(
new RabbitMQQueueDeclareOptions
{
QueueName = "order.created.queue",
ExchangeName = "order.exchange",
RoutingKey = "order.created",
EnableDeadLetter = true
},
async (message, cancellationToken) =>
{
// 处理订单创建消息
await orderService.HandleCreatedAsync(message, cancellationToken);
});
如果队列名是 order.created.queue,默认死信队列名会是:
order.created.queue.deadletter
默认死信路由键会是:
order.created.queue.deadletter
八、用法三:自定义死信队列名称和路由键
如果希望更明确地控制死信队列命名,可以这样写:
await consumerManager.SubscribeAsync<OrderCreatedMessage>(
new RabbitMQQueueDeclareOptions
{
QueueName = "order.created.queue",
ExchangeName = "order.exchange",
RoutingKey = "order.created",
EnableDeadLetter = true,
DeadLetterExchangeName = "order.deadletter.exchange",
DeadLetterQueueName = "order.created.deadletter.queue",
DeadLetterRoutingKey = "order.created.deadletter"
},
async (message, cancellationToken) =>
{
await orderService.HandleCreatedAsync(message, cancellationToken);
});
这种方式适合业务线比较多、需要按模块管理死信队列的项目。
九、消息什么时候进入死信队列
HQServer 消费端失败处理逻辑如下:
try
{
await handler(message, cancellationToken);
await channel.BasicAckAsync(args.DeliveryTag, false, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "RabbitMQ 消息消费失败");
await channel.BasicNackAsync(
args.DeliveryTag,
false,
_options.RequeueOnConsumerError,
cancellationToken);
}
当满足下面条件时,失败消息会进入死信队列:
- 当前业务队列启用了死信队列。
- 消费处理抛出异常。
RequeueOnConsumerError = false。- 消费者执行
BasicNackAsync。
如果 RequeueOnConsumerError = true,消息会重新入队,不会立刻进入死信队列。这个配置要谨慎使用,避免坏消息无限循环。
十、如何处理死信消息
死信队列不是终点,它只是把失败消息保存下来。后续可以按业务需要做几种处理:
- 后台管理页面查看死信消息。
- 定时任务扫描死信队列并告警。
- 人工确认后重新投递。
- 记录到数据库,生成异常工单。
- 针对可恢复异常做延迟重试。
当前封装先完成最基础也最重要的一步:失败消息不丢,先进入可观察、可补偿的队列里。
十一、构建验证
本次修改完成后执行构建:
dotnet build HQ.Application/HQ.Application.csproj --nologo
构建结果:
已成功生成。
0 个警告
0 个错误
十二、总结
这次我们给 HQServer 的 RabbitMQ 封装补上了死信队列能力。它解决的不是“怎么消费消息”,而是“消息消费失败后怎么办”。
到这一步,RabbitMQ 基础能力已经更接近生产可用:
- 消息发布统一封装。
- 消费者手动 Ack/Nack。
- 消费异常可控是否重新入队。
- 失败消息可进入死信队列。
- 队列、交换机、绑定关系由框架统一声明。
后续还可以继续扩展延迟重试队列、死信消息管理后台、失败消息告警等能力,让消息队列在企业项目里真正可观测、可恢复。










暂无评论内容