Appearance
如何消息不丢失
一般来说,在以下几种情况下都有可能发生消息丢失的情况
- 1、消息未到达交换机
- 2、消息未到达队列
- 3、队列中消息丢失
- 4、消费者未接收到消息

RabbitMQ提供了多种机制来确保消息在传递过程中不丢失,主要通过以下几个方面实现
发布确认
发布确认(Publisher Confirms)
发布确认是RabbitMQ提供的一种轻量级、高效的保证消息不丢失的机制,相比事务机制有更好的性能。生产者可以在发送消息后异步接收一个来自RabbitMQ的确认,表明消息已被RabbitMQ正确接收。如果RabbitMQ没有确认消息,生产者可以选择重试发送消息。
- 开启发布确认模式,首先需要将信道(Channel)设置为发布确认模式(
confirmSelect)。 - 发送消息后,通过监听信道上的确认回调来检查消息是否被成功接收。
如果开启发布确认模式后,当生产者发送消息到交换机或者队列失败会返回不同的信息给生产者
发送失败之后的处理方式:
- 1、回调方法即时重发
- 2、记录日志
- 3、保存到数据库进行定时重发,重发后修改标识或者删除记录
消息持久化
为了防止消息中间件服务宕机造成的消息丢失,可以使用消息持久化避免。
消息持久化是防止消息丢失的重要手段之一。在RabbitMQ中,可以将队列(Queue)和消息(Message)设置为持久化,以确保即使在RabbitMQ服务器重启的情况下,消息也不会丢失。
- 持久化队列:在创建队列时,将其声明为持久化(durable)。这样,队列的元数据和状态会被保存到磁盘上。
- 持久化消息:发送消息时,将消息的
delivery_mode属性设置为2(表示持久化消息)。这样,消息会被写入到磁盘上,直到被完全消费。
消息持久化:如果需要开启持久化,可以将交换机、队列、消息都进行持久化操作。

消费者确认
消费者确认(Consumer Acknowledgements)
为了防止队列到消费者这一步中消息丢失,可以开启消费者确认机制,
消费者确认(ACK)机制确保消息被消费者正确处理。在消费者成功处理消息后,它会发送一个ACK给RabbitMQ,RabbitMQ收到ACK后会从队列中删除该消息。
如果消费者处理消息失败(例如,处理过程中发生异常),可以发送拒绝(nack)或未确认(reject),根据配置,RabbitMQ可以重新将消息放回队列中,等待重新消费。

消费者确认机制
这里再讲述一下消费者确认机制相关的内容:
消费者确认机制(Consumer Acknowledgements)在RabbitMQ中是一种保证消息被正确处理的机制。当消息从队列传递给消费者后,消费者需要对处理的结果进行反馈。如果消息被成功处理,消费者会发送一个ACK(Acknowledgement)信号给RabbitMQ,告知消息已经被成功消费,随后RabbitMQ会从队列中删除该消息。如果消费失败,消费者可以选择不发送ACK,或者显式地发送NACK(negative acknowledgement)或拒绝(reject)信号,根据配置,RabbitMQ会相应地处理这些未被确认的消息。
所以说如果开启了这个机制,消息在未收到反馈消息前是会保留在队列中的
消费失败的处理
当消费者消费失败,且没有返回ACK时,RabbitMQ会根据消息的配置来决定如何处理这条消息:
- 消息重新排队:如果消费者因为某些原因处理消息失败(例如异常退出或逻辑错误),并且没有发送ACK,那么RabbitMQ默认会将消息重新放回队列中,位置可能在队列头部或尾部,这取决于消息的
requeue标志。如果设置为true,消息会被重新排队,等待其他消费者或再次被同一消费者消费。 - 死信队列(DLX):在某些情况下,不希望无限制地重新尝试处理消息(可能会造成死循环)。可以通过配置死信队列来处理这些"无法处理"的消息。如果消息消费失败,并且达到了最大重试次数(这需要在应用逻辑中实现重试机制),可以选择将消息拒绝并重新排队到死信队列中。死信队列是一个普通的RabbitMQ队列,专门用来接收这些无法处理的消息,便于后续分析和处理。
实现重试机制
在实际应用中,通常需要在消费者侧实现消息消费的重试机制,这包括:
- 延迟重试:通过配置消息的延迟(例如,使用消息TTL和死信队列),在消息消费失败时将消息发送到一个延迟队列中,等待一段时间后再次尝试处理。
- 重试次数限制:在消息属性中记录重试次数,每次消费失败时更新这个计数器。当重试次数达到预设的阈值后,可以将消息发送到死信队列或进行其他处理,以避免无限重试。
通过这样的机制,可以灵活地控制消息的重试策略和失败处理,确保消息可以被正确处理,同时避免因重试机制不当导致的问题,如消息顺序混乱、系统资源过度占用等。
小结
如何保证消息不丢失
- 开启生产者确认机制,确保生产者的消息能到达队列
- 开启持久化功能,确保消息末消费前在队列中不会丢失
- 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
- 开启消费者失数重试机别(重试次数一般设置为3),多次重试失数后将消息投递到异常交换机,交由人工处理
面试者回答参考:
要求确保消息的高可用情景:实现Redis 的双写一致性采用了RabbitMQ 进行异步更新/删除操作
回答见下:👇

消息重复消费
出现消息重复消费的问题一般是在网络抖动或者消息者挂掉,由于开启了消费者确认机制,导致消息重发
一般解决方式有两种:
- 一种是每条消息设置一个唯一的标识 ID 从而避免重发消费
- 也可以考虑幂等性设计问题,考虑使用分布式锁、数据库锁(悲观锁、乐观锁)等,但锁会对性能有影响,使用的时候需要考虑使用
一般回答可以倾向回答第一种回答。

参考回答:

如何处理消息堆积问题
消息堆积(也称“消息积压”)是指消息队列中的消息由于未能及时消费而长时间滞留,导致队列消息数量不断增加。长期堆积会影响系统性能、延迟和可用性。
一般的处理手段:
提升消费能力、合理限流、故障隔离、定期清理、完善监控,是解决消息堆积的核心手段
面试题:RabbitQM 如果有100万消息堆积在MQ,如何解决(消息堆积怎么解决)

解决消息堆积有三种种思路:
- 1、增加更多消费者,提高消费速度
- 2、在消费者内开启线程池加快消息处理速度
- 3、扩大队列容积,提高堆积上限,采用惰性队列
- 在声明队列的时候可以设置属性x-queue-mode为lazy,即为惰性队列
- 基于磁盘存储,消息上限高
- 性能比较稳定,但基于磁盘存储,受限于磁盘 IO,时效性会降低 · 参考回答:

惰性队列
惰性队列(Lazy Queue)是 RabbitMQ 3.6 及以上版本推出的一种特殊队列类型,目的是大幅度提升海量消息场景下的性能和可靠性。它通过将消息优先存储在磁盘上,减少对内存的消耗,从而更适合消息堆积、历史消息回放、大数据批量处理等应用场景。
- 普通队列(默认模式):RabbitMQ 会尽量把消息都存放在内存中,只有在内存压力大、或消息未被消费很久才移到磁盘。
- 惰性队列:收到消息后直接持久化到磁盘,只有消费者需要处理时才读入内存。这大幅度降低了内存的压力,即使消息量很大也能稳定运行。
不过惰性队列是在声明的时候指定配置参数,然后作为惰性队列存在,它虽然降低了内存压力,但每次消费需要从磁盘加载,单条消息延迟略高,不适合对实时性要求极高的场景。

死信队列
死信队列(Dead Letter Queue, DLQ) 是消息队列中用来存放无法被正常消费的“死信”消息的队列。 在 RabbitMQ 及其他消息队列产品中,死信队列是一种核心机制,主要用于消息可靠性、异常消息处理和系统解耦。
所谓“死信”(Dead Letter),是指在消息队列中的消息由于各种原因无法被正常消费、投递或者处理,这些消息会被队列通过特定机制转发到事先指定的死信队列中。
常见产生死信的场景包括:
| 场景 | 说明 |
|---|---|
| 消息被拒绝(basic.reject/basic.nack) | 消费者拒绝消费消息,并且不再重新入队 |
| 消息过期(TTL到期) | 消息在队列中存放时间超过了设置的存活时间(TTL) |
| 队列达到最大长度限制 | 队列长度已满,新消息到来时,早期消息会被丢弃,丢弃的消息变成死信 |
死信交换机
什么是死信交换机
死信交换机(Dead-Letter Exchange, DLX)是RabbitMQ中用于处理无法正常投递的消息的一种机制。
当消息在队列中变成死信(Dead Letter)后,可以被自动重新路由到另一个交换机,这个交换机就是所谓的死信交换机。
RabbitMQ 使用 死信交换机(Dead Letter Exchange,DLX) 将死信消息路由到另一个指定队列(即死信队列)。
消息变成死信的情况
消息变成死信的情况通常包括:
- 消息被拒绝(Basic.Reject/Basic.Nack)并且设置了requeue参数为false,不重新入队。
- 消息TTL过期(消息设置了生存时间,超过这个时间还未被消费),超时无人消费
- 队列达到最大长度(队列满了,无法再添加更多消息到队列中),最早的消息可能成为死信
配置死信交换机

使用死信交换机需要进行以下配置:
- 声明死信交换机:首先需要声明一个交换机作为死信交换机,可以是任何类型(Direct, Topic, Fanout等)。
- 声明队列并指定死信交换机:在声明队列时,通过arguments参数设置
x-dead-letter-exchange属性为死信交换机的名称。可选地,还可以设置x-dead-letter-routing-key来指定发送到死信交换机的消息将使用的路由键。 - 绑定死信队列:如果需要,可以创建一个或多个专门的队列作为死信队列,并将其绑定到死信交换机上。这样,成为死信的消息就会被路由到这些队列中。
使用场景
- 错误处理:死信交换机和死信队列常用于错误处理。当消息因为某种原因无法被正常消费时,将其路由到死信队列,后续可以对这些消息进行分析和处理。
- 延迟消息:通过设置消息的TTL和死信交换机,可以实现延迟消息的功能。消息首先被发送到一个普通队列中,在过期后变成死信,然后被发送到死信交换机,并最终路由到最终的队列中被消费。
- 消息重试:在处理消息时,如果因为临时的问题导致消息处理失败,可以将消息发送到死信交换机,再通过特定的逻辑将消息重新投递到原队列或另一个队列进行重试。
简单示例(在Spring AMQP中配置一个死信交换机和队列):
@Bean
public Queue myQueue() {
return QueueBuilder.durable("myQueue") //指定队列名称,并持久化
.ttl(10000) //设置队列的超时时间,10秒
// 设置死信交换机的名称,当消息变成死信后将被路由到此交换机
.withArgument("x-dead-letter-exchange", "myDlxExchange")
// 设置死信交换机的路由键,指定消息从死信交换机路由到死信队列时使用的路由键
.withArgument("x-dead-letter-routing-key", "dlx.routing.key")
.build();
}
------
// 声明死信交换机
@Bean
public DirectExchange myDlxExchange() {
return new DirectExchange("myDlxExchange");
}
// 声明死信队列
@Bean
public Queue myDlxQueue() {
// 使用QueueBuilder构建一个持久化的队列"myDlxQueue",这个队列用于接收死信消息
return QueueBuilder.durable("myDlxQueue").build();
}
// 将死信队列绑定到死信交换机
@Bean
public Binding myDlxBinding() {
// 使用 BindingBuilder 将死信队列"myDlxQueue"绑定到死信交换机"myDlxExchange"
// 绑定键为"dlx.routing.key",表示只有路由键匹配此值的消息才会被路由到"myDlxQueue"队列
return BindingBuilder.bind(myDlxQueue()).to(myDlxExchange()).with("dlx.routing.key");
}这个配置声明了一个队列myQueue和一个死信交换机myDlxExchange,以及一个死信队列myDlxQueue。当myQueue中的消息变成死信后,它们会被路由到myDlxQueue中。
总之,死信交换机提供了一种有效的方式来处理RabbitMQ中无法正常消费的消息,使得系统能够更加健壮和可靠。
TTL
TTL(Time-To-Live)是一种设置数据存活时间的机制,在RabbitMQ中,可以用来控制消息或队列的存活时间。
如果一个队列中的消息TTL结束仍未消费,则会变为死信, ttl 超时分为两种情况:
- 1、消息所在的队列设置了存活时间
- 2、消息本身设置了存活时间

消息TTL
- 消息TTL是指一条消息在队列中可以存活的最大时间。如果消息在队列中的存活时间超过了设置的TTL,它将变成死信(Dead Letter)。对于未及时消费的消息,这是一种有效的过期策略。
- 在RabbitMQ中,消息TTL可以在发送消息时通过设置消息属性中的
expiration字段来指定,单位是毫秒。例如,将expiration设置为60000,意味着消息的TTL为60秒。
示例代码:
设置消息TTL
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.expiration("60000") // 消息TTL设置为60000毫秒(60秒)
.build();
channel.basicPublish("", "myQueue", props, message.getBytes());队列TTL
- 队列TTL是指队列中的所有消息的统一存活时间。不同于为每条消息单独设置TTL,队列TTL为队列中所有消息设定了统一的最大存活时间。
- 队列TTL是在声明队列时通过队列的
arguments参数设置的,使用x-message-ttl属性来指定,单位也是毫秒。
设置队列TTL
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000); // 队列中所有消息的TTL设置为60000毫秒(60秒)
channel.queueDeclare("myQueue", false, false, false, args);注意事项:
- 如果队列和消息都设置了TTL,以较短的那个TTL为准。
- 设置了TTL的消息,如果在TTL到期时还没有被消费,会被RabbitMQ自动丢弃或发送到设置的死信交换机。
- 消息的TTL从它被推送到队列的那一刻开始计算,而不是从它被消费者接收的那一刻开始。
延迟队列
延迟队列实现的几种方式:
利用 TTL + 死信队列实现延迟队列
实现思路
- 普通队列设置消息 TTL(Time-To-Live,存活时间),即消息在队列内存活的时间。
- TTL 到期后消息会被 RabbitMQ 刪除并变成“死信”,转入指定的死信交换机和死信队列。
- 消费者监听死信队列,消费消息,实现延迟效果。
RabbitMQ 的延迟队列是通过死信交换机 + TTL (生存时间)来实现的;
- 延迟队列:进入队列的消息会被延迟消费的队列
延迟队列的使用场景较多,比如:超市订单、限时优惠、定时发布等
使用 RabbitMQ 官方插件实现延迟队列
RabbitMQ 提供了 rabbitmq-delayed-message-exchange 插件,可以直接实现消息延迟投递,无需借助死信队列。
实现方式
- 安装插件后,声明一种特殊交换机
"x-delayed-message"类型。 - 发送消息时通过
headers设置延迟时间,消息到达队列时才被消费
在 RabbitMQ 中,延迟队列的使用是有一个插件:叫做:DelayExchange
地址在官方插件社区
插件下载地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
使用示例:
