Skip to content

消息队列_RocketMQ_基础

一、基础概念

RocketMQ 是什么

RocketMQ 是一款开源的分布式消息和流处理平台,由阿里巴巴集团开发并贡献给 Apache 软件基金会。它被设计用于处理大规模、高吞吐量的消息传递,并在阿里巴巴集团内部及众多企业中广泛使用。

特点:高性能,高可靠,高实时,分布式

RocketMQ 是发布/订阅 的消息模型

优点:

  • 可靠性很好
  • 单机吞吐量:十万级
  • 源码是 Java,方便二次开发
分类优点缺点
高性能高吞吐、低延迟,顺序写磁盘与零拷贝,适合集群与高并发场景
高可用主从架构,多副本容错,支持 Broker 快速切换
扩展性Topic、Producer、Consumer 可水平扩展,适合大规模部署部分分布式特性仍需业务层配合,跨集群场景下运维复杂
消息模型支持顺序、事务、延时、批量消息等多种模型顺序消息并发受限,事务消息使用和容错管理较复杂
监控与追踪内置消息轨迹,Dashboard 管控便捷
安全性支持 ACL,权限粒度可控
生态与支持多语言支持、社区活跃、企业级应用多国际化社区、生态和第三方插件相对 Kafka 略弱
存储依赖-磁盘堆积大量消息时 IOPS 需求大,极端场景下需关注物理瓶颈
易用性-文档相对 Kafka 略少,使用门槛略高,批量消费不如 Kafka 方便

消息模型

消息队列有两种模型:队列模型发布/订阅模型。RocketMQ 是后者。

  • 队列模型
    • 队列方式,生产者消费者
  • 主题模型/发布订阅
    • 发布者将消息发送到指定主题中,订阅者需要 提前订阅主题 才能接受特定主题的消息。image.png|500

RocketMQ 的消息模型

基础模型:Producer、Broker、Consumer、Topic

  • Producer:消息生产者,负责向消息队列发送消息。
  • Broker:消息服务器,负责存储和转发消息,是 Producer 和 Consumer 的中间桥梁。
  • Consumer:消息消费者,从 Broker 拉取或订阅消息进行消费处理。
  • Topic:逻辑上的消息分类,Producer 按 Topic 发送消息,Consumer 可订阅感兴趣的 Topic。
mermaid
sequenceDiagram
participant Producer
participant Broker
participant Consumer
Producer->>Broker: 发送消息 (按 Topic 分类)
Broker->>Consumer: 推送或等待拉取消息
Consumer-->>Broker: 消费成功确认(ACK)

Rocket MQ 的主要概念

名称中文名作用/描述备注
Message消息RocketMQ 基本数据单元,包含正文、Topic/Tag/Keys/属性等Producer 发送/Consumer接收
Topic主题消息一级分类,区分业务主题一个 Topic 可含多个队列
Tag标签Topic 二级分类,便于细粒度过滤与分流可按 Tag 订阅/筛选消息
Group消费组一组消费者实例并发消费同一 Topic 下的消息保证消息唯一/分布广播模式
Message Queue消息队列Topic 下物理存储分片,实现并发消费绑定单 Consumer 实例消费
Offset消费偏移量消息队列消费游标,标识已消费进度便于断点续消费/回溯

RocketMQ 消息的组成: image.png

Message

RocketMQ 的最基本数据单元,由生产者发送到 Broker,再由消费者读取。每条消息通常包含主题(Topic)、内容(Body)、标签(Tag)、消息键(Keys)、属性(Properties)等字段。

Topic

用于对消息进行逻辑分类。Producer 按 Topic 发送消息,Consumer 订阅 Topic 获取消息。一个 Topic 下可划分多个队列(Message Queue),实现高并发和消费分片。

Tag

Topic 的二级分类,用于细化筛选消息。Producer 可为消息指定 Tag,Consumer 支持按 Tag 过滤订阅,实现业务分流和细粒度消费。

Topic 和 Tag 的区别

消息发送的时候一般会指定消息体,消息体对象包括 Topic, Tag (可选),消息数据

其中 Tag是进一步细化Topic消息分类的标签。

Group

Consumer Group 表示一组消费同一 Topic 的消费者实例。在 集群模式 下,同一 Group 的消费实例共同处理 Topic 下的所有消息(避免重复);在 广播模式 下,每个 Group 内所有 Consumer 都接收到完整消息。

Message Queue

物理实际分区,消息的实际存储地方

一个 Topic 下的实际存储分片,Broker 按原理可将 Topic 分为多个队列,实现消息并发存储与拉取。消费者可并行处理,每个队列的数据由某个 Consumer 实例负责。

Offset

Offset 是消费者组在 Queue 的消费位置下标

标识消费者当前已消费到队列中的哪条消息(唯一编号),用于保证消息不丢失、可断点续消费。Broker 和 Consumer 均管理 Offset,支持精确消费和消息重试/回溯。

image.png

消息的消费模式

消息消费模式有两种:Clustering(集群消费)和Broadcasting(广播消费)。

默认情况下就是集群消费,这种模式下一个消费者组共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。

而广播消费消息会发给消费者组中的每一个消费者进行消费。


消息投递/消费模式

模式名称描述适用场景
Push 模式Broker 自动推送消息到 Consumer(主动通知)实时消费、低延迟业务
Pull 模式Consumer 主动从 Broker 拉取消息(定时/轮训)高吞吐、灵活控制消费进度
广播消费每个 Consumer 都会收到一份消息群体通知、监控日志、同步事件
集群消费多个 Consumer 节点协作消费同一 Topic,消息分摊任务分布、业务分片

二、架构和组件

基础组件

基础组件有:NameServer 注册中心、Broker 消息队列服务器、Producer、Consumer

  • NameServer
    • ·去中心化的注册中心:进行Broker节点管理和路由信息管理
    • 负载均衡,路由管理,注册中心
    • 操作步骤
        1. Broker 会将自己的信息注册到 NameServer 中(此时,NameServer 会存放很多 Broker 的信息 ← Broker 的路由表信息)
        1. 消费者和生产者从 NameServer 获取路由表信息和对应的 Broker 进行通信(生产者和消费者定期会向 NameServer 去查询相关的 Broker 的信息)
    • 多个 NameServer 形成集群,但它们之间不同步信息。客户端(Producer 和 Consumer)可以连接任意一个 NameServer
  • Broker
    • 消息队列服务器,生产者生产消息到 Broker ,消费者从 Broker 拉取消息并消费。
      • Broker 可以是 Master 也可以是 Slave。
        • 每个 Master 可以有多个 Slave。Master 负责读写消息,Slave 可以进行消息的同步,保证数据的高可用性。
    • 定期向 Name Server 发送心跳包(包含自身的主题信息);Broker:消息处理的节点中心;NameSpace 管理的路由也是对应节点来处理的;一个Broker 节点下有多个Queue
    • 消息存储和转发:Broker 是消息存储和转发的核心。生产者将消息发送到 Broker,Broker 负责将消息保存在相应的队列中。
    • 消息处理:Broker 处理所有与消息相关的操作,包括消息的存储、检索、删除以及向消费者分发消息。
    • 维护队列状态:Broker 负责维护每个队列的状态,如消息偏移量(Offset),确保消息能够被正确地消费。
  • Producer
    • 生产者
    • 生产者会定期向 NameServer 查询 Broker 的信息
    • 当生产者准备发送消息时,它首先联系 NameServer 以获取 Topic 所在的 Broker 信息。然后,根据这些信息,生产者将消息发送到正确的 Broker 和对应的队列中。
  • Consumer
    • 消费者
    • 定期向 NameServer 查询 Broker 的信息
    • 订阅主题:消费者向 Broker 订阅特定的 Topic。这个过程可能会涉及 NameServer 来找到托管该 Topic 的 Broker。
    • 消息接收:一旦 Broker 中的 Topic 收到消息,符合条件的消费者(那些订阅了该 Topic 的消费者)就可以接收到这些消息。
    • 消费模式
      • 在集群消费模式下,每条消息只会被消费者组中的一个消费者处理。
      • 在广播消费模式下,每条消息会被发送到消费者组中的所有消费者。

常见问题

讲述一下NameServer。

NameServer 是 RocketMQ 的路由中心,负责 Broker 信息注册与发布,为 Producer、Consumer 提供快速、可靠的 Topic 路由服务

特性描述
组件定位轻量级名称服务、分布式注册路由管理
是否有状态无状态、天然分布式
运维特点可随时扩容缩容、无单点
功能Broker 注册/心跳/注销、Producer/Consumer 路由查询
与 Zookeeper 比较更轻量,专用于 Broker 路由和 Topic 查找
为什么RocketMQ不使用Zookeeper作为注册中心?

RocketMQ主要是保障最终⼀致性,它只需要⼀个轻量级的元数据服 务就⾏了,⽽Zookeeper是强⼀致性的解决⽅案,并且少使⽤⼀个中 间件可以减少维护成本

还有一方面RocketMQ 是设计是让注册中心尽量轻量化,使得消息发送弱依赖注册中心。

RocketMQ 自研的 NameServer 专为消息路由注册/查询设计,比 Zookeeper 更轻量、高效、弹性和易运维,完全贴合 RocketMQ 的极端高性能和灵活可扩展的业务场景,而 Zookeeper 主要用于需要分布式一致性、配置、事务等更复杂的系统。因此,RocketMQ 并不采用 Zookeeper 作为其注册中心

消息发送流程

生产消息和消费消息操作:

image.png

Q1:生产者发送消息的流程

生产者组发送消息以及消费者组是如何接受消息:

在RocketMQ 中,NameServer 负责服务发现和路由管理,当生产者组发送某一个主题的消息时候,Producer Groups 会先从 NameServer 获取到 Topic 主题消息 所在的 Broker 信息,根据这些信息,生产者组会发送消息到正确的 Broker 和 对应的队列中。

里面的一个核心观点是 Topic ,个人理解主题Topic 是 将Queue进行分类,类似分组、标签(逻辑上)

而 Queue 的实际作用是 物理实际分区,消息的实际存储地方;消息发送到正确的 Broker 后,Broker 会负责将消息保存在相应的队列中。

Consumer Groups 会从NameServer 查询 Broker 的信息(从 NameServer 来找到托管该 Topic 的 Broker),订阅对应主题的消息,当 一旦 Broker 中的 Topic 收到消息,就可以接收到对应消息。

路由注册

  • NameServer 每隔 10 s 扫描 Broker Live Table
  • 检查上次心跳时间与当前时间的时差,如果超过 120 s,则认为 Broker 不可用,移除路由表中 Broker 相关的信息
    • Broker 一般会每隔 30s 向 NameServer 报告是否存活

路由剔除

  • 定时扫描判断该路由节点是否可用

RocketMQ的整体工作流程

image.png

简单来说,RocketMQ是一个分布式消息队列,也就是消息队列+分布式系统

作为消息队列,它是--的一个模型,对应的就是Producer、Broker、Cosumer;作为分布式系统,它要有服务端、客户端、注册中心,对应的就是Broker、Producer/Consumer、NameServer

image.png

RocketMQ 的负载均衡是如何实现的

RocketMQ实现了客户端负载均衡,包括生产者(Producer)和消费者(Consumer)两个方面。

生产者

image.png

消费者

image.png

  • 在集群模式下,消息队列被均匀地分配给同一个消费组内的所有消费者,确保在一个时间点,一个消息队列只被一个消费者消费。
  • 在广播模式下,所有消费者都会消费到所有的消息队列中的消息。

消息长轮询

消息长轮询优化了消息拉取的过程

image.png

三、功能实现和常用概念

Topic 和 Queue 的关系

Topic Data ↑ Queue Num ↑

当某个 Topic 消息量很大,应该给它多配置几个队列,并且 尽量多分布在不同 Broker 上,以减轻某个 Broker 的压力

Topic 消息量都比较均匀的情况下,如果某个 broker 上的队列越多,则该 broker 压力越大。

消费者和Queue 的关系

Advice Num(Consumer) = Num(Queue)

通常建议消费者组中的消费者数量与主题中的队列数量保持一致

  • 这样做的目的是为了最大化并行处理的效率。
    • 如果消费者的数量多于队列的数量,那么会有一些消费者实际上没有分配到队列,从而闲置。
    • 相反,如果消费者的数量少于队列的数量,某些队列可能不会被及时消费,导致消息积压。

每个消费组在每个队列上维护一个消费位置

  • Offset 的作用:消费位移是指消费者在队列中消费到的位置。它用于跟踪消费者已经消费到哪里,以便在消费者重启或故障后能够从上次的位置继续消费。
  • 谁来维护:这个位移是由 Broker 维护的。消费者在处理完消息后,会向 Broker 报告自己的消费位移。
  • 消息的保留:队列中的消息通常会在所有消费者组消费之后才会被删除。这意味着,只要有一个消费者组还没有消费某条消息,这条消息就会被保留在队列中。

消息的持久化和删除

概念:消息的存储和删除

  • 数据的持久化:Broker 负责消息的存储。只要消息还没有被所有订阅的消费者组消费,它就会一直存储在 Broker 中。
  • 消息的删除:消息通常在满足一定条件后才会被删除,例如,当所有消费者组都消费了该消息,或者消息达到了其存储的最大时间(如设置了消息的存储时长)。

Topic 和 Queue 的关系

为什么一个主题中需要维护多个队列?

  1. 提高并发能力:在一个主题下配置多个队列的目的是为了提高消息处理的并发能力。这允许多个生产者同时向不同的队列发送消息,同时也允许多个消费者并行地从不同队列中拉取和处理消息。
  2. 负载均衡和效率:如果一个主题只有一个队列,那么所有的消息都会被顺序地放入这个单一的队列中,这限制了并行处理的可能性。在这种情况下,即使有多个消费者,也只能有一个消费者在任何给定时间处理队列中的消息,从而降低了整体的处理效率。

负载均衡

生产者发送消息(负载均衡转发到 Broker )

在RocketMQ中,一条消息在正常情况下是不会被发送到多个Broker的

所谓的负载均衡一般指的是:轮询,随机和其他策略

然后发送给某一个 Broker ,但是这个 Broker 如果是主从的话,就是消息默认发送到Master节点,Master节点负责将消息复制到Slave节点。

推拉模式

消费者的拉取请求

实际都是主动拉取的,不过消费者端的推送是有一个监听机制(定时拉取,不过对于用户来说这个过程是隐藏的)

RocketMQ 采用的是拉模式。

回溯消费

官方文档解释

回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,

RocketMQ 中, Broker 在向Consumer 投递成功消息后,消息仍然需要保留 。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。

生产者分组

待整理一下

消费者分组 🦌

  • PushConsumer
  • SimpleConsumer
  • PullConsumer

生产者分组和消费者分组

生产者分组

  1. RocketMQ 5.x 版本及以后
    • 生产者现在是“匿名”的,即不再强调生产者分组(ProducerGroup)的概念。
    • 在服务端,生产者不需要显式地配置为特定的分组,这简化了生产者的管理。
  2. RocketMQ 3.x 和 4.x 版本
    • 在这些历史版本中,生产者分组是一个重要的概念,用于标识和管理一组有着相同行为的生产者。
    • 如果你是从这些旧版本升级到 5.x,那么之前使用的生产者分组可以废弃,不再需要设置,且不会影响当前业务。

消费者分组

  1. 概念
    • 消费者分组是多个消费者的一个逻辑分组,这些消费者有着一致的消费行为。
    • 通过消费者分组,可以实现消费行为的负载均衡和高可用性。
  2. 功能
    • 订阅关系管理:RocketMQ 以消费者分组的粒度来管理和追溯订阅关系。
    • 投递顺序性:服务端支持顺序投递和并发投递,这些投递方式在消费者分组中统一配置。
    • 消费重试策略:包括重试次数和死信队列的设置,用于处理消费失败的情况。
  3. 版本差异
    • 5.x 版本:消费者的消费行为统一由关联的消费者分组在服务端配置和管理,确保同一分组内所有消费者的行为一致。
    • 3.x/4.x 历史版本:消费行为由消费者客户端接口定义,需要在消费者客户端设置时确保同一分组下的消费者行为一致。

RocketMQ 5.x 版本中对生产者分组的概念进行了简化,使其成为匿名的,而消费者分组则仍然是一个关键概念,用于管理消费者的行为和策略。

消费者分组

四、RocketMQ支持的消息类型

在 RocketMQ 中,一般使用的是普通消息,还有定时消息、顺序消息、事务消息;下面我们分别来了解一下具体的一些使用。

普通消息

  • 业务场景:适用于大多数标准的消息传递需求,如异步处理、系统解耦、数据分发等。
  • 使用:生产者发送消息到指定的主题(Topic),消费者从主题订阅并处理这些消息。消息按照到达 Broker 的顺序存储。

普通消息生命周期

  • 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。
  • 待消费:消息被发送到服务端,对消费者可见,等待消费者消费的状态。
  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理。
  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
  • 消息删除:RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

定时消息

  • 业务场景:用于那些需要在指定时间点执行的任务,如延迟通知、定时任务等。
  • 使用:生产者发送消息时指定一个延时级别,消息将在指定的延时后才被投递。RocketMQ 不支持任意的延时时间,而是提供了几个预设的延时级别(如1分钟、5分钟、10分钟等)。

定时消息生命周期

  • 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。
  • 定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达。
  • 待消费:定时时刻到达后,服务端将消息重新写入普通存储引擎,对下游消费者可见,等待消费者消费的状态。
  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理。
  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
  • 消息删除:Apache RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。

事务消息

  • 业务场景:适用于需要保证消息发送和本地事务同时成功或失败的场景,如支付和订单服务间的交互。
  • 使用
    • 生产者发送半消息(预备消息)。
    • 执行本地事务(比如数据库操作)。
    • 根据本地事务执行结果,提交或回滚消息。提交的消息将被消费者消费,回滚的消息将被删除。

顺序消息(顺序消费)

顺序消息分为全局顺序消息和部分顺序消息,全局顺序消息指某个 Topic 下的所有消息都要保证顺序;

部分顺序消息只要保证每一组消息被顺序消费即可,比如订单消息,只要保证同一个订单 ID 个消息能按顺序消费即可。

全局顺序消费

image.png

部分顺序消息

image.png

部分顺序消息:顺序投递,顺序消费

image.png

发送端使用 MessageQueueSelector 类来控制 把消息发往哪个 Message Queue 。

image.png

消费端通过使用 MessageListenerOrderly 来解决单 Message Queue 的消息被并发处理的问题。

image.png


  • 分为全局消息和分区消息
  • 全局消息
    • 将一个主题的所有消息都发送到同一个队列中
    • 消费者从这个队列中顺序拉取并消费消息,从而实现全局顺序。
  • 分区消息
    • 生产者在发送消息时指定消息的Key(业务标识),RocketMQ根据这个Key的hash值将消息发送到指定的队列。
    • 消费者从这个队列中拉取消息进行消费,由于消息是按顺序发送到队列的,消费者也会按这个顺序来消费消息。
  • 分区消息一般是为了某一个事件的全链路顺序,全局消息是这个类型的全部事件都顺序。

延迟消息(延时队列)

RocketMQ实现延时消息的方式是通过预设的延时级别来实现的。当发送一条消息时,可以指定这条消息的延时级别。RocketMQ内部定义了多个延时级别,每个级别对应一定的延时时间,如1s、5s、10s等。

  1. 设置延时级别:生产者在发送消息时,设置消息的延时级别属性(delayTimeLevel)。
  2. 存储延时消息:Broker接收到这条延时消息后,根据其延时级别,将消息存储在对应的延时队列中。
  3. 计时等待:消息在延时队列中等待,直到延时时间过去。
  4. 转移至正常队列:达到延时时间后,消息会从延时队列转移到正常的消费队列中,此时消费者可以消费到这条消息

RocketMQ的延时队列提供了一种简便的延时消息处理机制,非常适合需要消息定时处理的应用场景。

示例:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class DelayProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        int messageCount = 100;
        for (int i = 0; i < messageCount; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TestTopic", "TagA", ("Hello RocketMQ " + i).getBytes());
            // 设置延时级别3,这个时间大约延时10s
            msg.setDelayTimeLevel(3);
            // 发送消息
            producer.send(msg);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

在这个示例中,消息通过设置setDelayTimeLevel(3)来指定延时级别。RocketMQ预定义了多个延时级别,每个级别对应不同的延时时间,从1s到几个小时不等。具体的延时级别和对应的时间可以在RocketMQ的文档中找到或通过查看Broker配置文件(broker.conf)来了解。

  • 延时消息的使用需要注意,延时级别是预设的,不能自定义具体的延时时间,只能选择最接近的延时级别。
  • 延时消息可能会因为Broker的重启而导致延时不准确。
  • 使用延时消息时,要考虑到消息的存储问题,因为消息在延时期间仍占用存储空间。

原理实现

先暂存到 SCHEDULE_TOPIC_xxxx 的 Topic 中

image.png

RocketMQ怎么实现延时消息的?

image.png

重试消息

重试消息并不是我们业务中主动发送的,而是指当消费者消费消息失败之后,会间隔一段时间之后再次消费这条消息

重试的机制在并发消费模式和顺序消费模式下实现的原理并不相同

顺序消费

当消费失败的时候,他并不会将消息发送到服务端,而是直接在本地等1s钟之后重试(无限重试)

在这个等待的期间其它消息是不能被消费的

并发消费

RocetMQ会为每个消费者组创建一个重试消息所在的Topic,名字格式为:%RETRY% + 消费者组名称

举个例子,假设消费者组为sanyouConsumer,那么重试Topic的名称为:%RETRY%sanyouConsumer

当消息消费失败后,RocketMQ会把消息存到这个Topic底下

重试消息是如何实现间隔一段时间来消费呢?

并发消费模式下间隔一段时间底层就是使用的延迟消息来实现的

RocetMQ会为重试消息设置一个延迟级别,并且延迟级别与重试次数的关系为:delayLevel = 3 + 已经重试次数

  • 比如第一次消费失败,那么已经重试次数就是0,那么此时延迟级别就是3
  • 对应的默认的延迟时间就是10s,也就是一次消息重试消费间隔时间是10s
  • 随着重试次数越多,延迟级别也越来越高,重试的间隔也就越来越长,但是最大也是最大延迟级别的时间

不过需要注意的是,在并发消费模式下,只有集群消费才支持消息重试,对于广播消费模式来说,是不支持消息重试的,消费失败就失败了,不会管。

死信消息(死信队列)

死信队列用于处理无法被正常消费的消息,即死信消息。

发送失败自动重试后仍然失败放入到死信队列中

image.png

命名规则:

  • RocketMQ为死信队列使用了特殊的Topic命名约定。默认情况下,当消息在消费过程中重试次数超过设定阈值而无法被消费时,这些消息会被自动发送到一个特殊的死信队列。
  • 这个队列的Topic通常是原Topic名称前加上%DLQ%前缀,表示这是一个用于存放死信的队列

关于重试的说明:

  • 并发消费的话,最大重试次数默认是16次
  • 顺序消费,默认最大重试次数就是 Integer.MAX_VALUE,基本上就是无限次重试,所以默认情况下顺序消费的消息几乎不可能成为死信消息

五、高可用和高级特性

分布式事务

这里建议用实际例子去补充一下具体如何完成的,目前我们讲述一下 RocketMQ 是如何实现分布式事务的。

分布式事务的常见实现方式(每种方法都有其适用场景和局限性)

  • 2PC(两阶段提交)
  • 三阶段提交(3PC)
  • 分布式事务框架
    • Seata
    • TCC(Try-Confirm-Cancel)
  • 本地消息表
  • 消息队列【 事务消息(half 半消息机制)】

RocketMQ 的分布式事务实现

在 RocketMQ 中,其采用的实现机制是:事务消息加上事务反查机制。

参考一下这篇博客中关于订单关于的一个流程使用: https://juejin.cn/post/6844904099993878536

image.png

上述内容提及到自己认为需要注意的点:

  • 本地事务进行下一步操作后,服务方才会提交或者回滚半消息到 RocketMQ 的节点 Broker

关于 Half Message,半消息 和 事务状态回查 的一个基本表述:

  1. 事务消息(Half 消息)
    • 第一步:发送 Half 消息:所谓的 Half 消息是指一种特殊状态的消息,它在事务完成之前对消费者是不可见的。
    • 实现机制:当发送一个事务消息时,RocketMQ 首先存储这个消息,但将其标记为不可见状态(Half 状态)。这是通过改变消息的主题为 RMQ_SYS_TRANS_HALF_TOPIC 实现的,因为消费者通常不订阅这个特殊主题,所以不会消费这些 Half 消息。
  2. 事务反查机制
    • 定时任务:RocketMQ 会启动一个定时任务,定期检查 RMQ_SYS_TRANS_HALF_TOPIC 中的消息。
    • 反查请求:对于每条 Half 消息,RocketMQ 会根据其生产者组向相应的服务发送事务状态回查请求。
    • 事务状态决定:根据回查结果,RocketMQ 会决定是提交还是回滚这条消息。如果事务成功,消息将变为正常状态并可被消费;如果事务失败,消息将被回滚或删除。

RocketMQ 主要是依赖半消息,实现的分布式消息事务,通过 二次确认以及消息回查机制实现。

流程描述:

  • 1、Producer 向 broker 发送半消息
  • 2、Producer 端收到响应,消息发送成功,此时消息是半消息,标记为 “不可投递” 状态,Consumer 消费不了。
  • 3、Producer 端执行本地事务。
  • 4、正常情况本地事务执行完成,Producer 向 Broker 发送 Commit/Rollback,如果是 Commit,Broker 端将半消息标记为正常消息,Consumer 可以消费,如果是 Rollback,Broker 丢弃此消息。
  • 5、异常情况,Broker 端迟迟等不到二次确认。在一定时间后,会查询所有的半消息,然后到 Producer 端查询半消息的执行情况。
  • 6、Producer 端查询本地事务的状态
  • 7、根据事务的状态提交 commit/rollback 到 broker 端。(5,6,7 是消息回查)
  • 8、消费者段消费到消息之后,执行本地事务,执行本地事务。

image.png

半消息:

image.png

事务回查机制

一般会有一个定时机制,然后如果长时间没有收到二次确认,会进行事务回查

  • 这个是生产者反而是消费者,收到这条消息后,通过本地实现的 checker 接口的 checkLocalTransaction 方法进行检查本地事务是否执行完成,并将结果告知 MQ 服务器。

image.png

高可用

高可用情景下,

  • 会构建 broker 集群并且进行主从部署,同时NameServer 也进行集群部署。
  • 同步复制 + 异步刷盘

NameServer因为是无状态,且不相互通信的,所以只要集群部署就可以保证高可用。

RocketMQ的高可用主要是在体现在Broker的读和写的高可用,Broker的高可用是通过集群主从实现的。

看一下这句话:关于如何实现写的高可用

  • 当 Master 挂机之后,其他节点的 Master 是仍然可用的
  • 该节点下的Slave 仍然是提供读服务的,但是如果需要从 Slave 改为 Master ,需要修改配置文件

image.png

官网架构图

image.png

构建 broker 集群并且进行主从部署,同时NameServer 也进行集群部署(去中心化)

第一,构建 broker 集群并且进行主从部署,此时,消息会分布在多个 broker

当某个 Broker 宕机,在Rocketmqmaster/slave 结构下,salve 定时从 master 同步数据(同步刷盘或者异步刷盘),如果 master 宕机,则 slave 提供消费服务,但是不能写入消息

第二, NameServer 进行了集群部署( 去中心化 的,没有主节点);

Broker 会和所有 NameServer 保持长连接 ,并且在每隔 30 秒 Broker会向所有Nameserver发送心跳,心跳包含了自身的Topic配置信息,这个步骤就对应图中的Routing Info`

第三,在生产者需要向 Broker 发送消息的时候,需要先从 NameServer 获取关于 Broker 的路由信息,然后通过 轮询 的方法去向每个队列中生产数据以达到 负载均衡 的效果。

第四、消费者通过 NameServer 获取所有 Broker 的路由信息后,向 Broker 发送 Pull 请求来获取消息数据。

Consumer 可以以两种模式启动—— 广播(Broadcast)和集群(Cluster)。广播模式下,一条消息会发送给 同一个消费组中的所有消费者 ,集群模式下消息只会发送给一个消费者。


消息过滤

有两种方案:

  • 一种是在 Broker 端按照 Consumer 的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到 Consumer 端,缺点是加重了 Broker 的负担,实现起来相对复杂。

  • 另一种是在 Consumer 端过滤,比如按照消息设置的 tag 去重,这样的好处是实现起来简单,缺点是有大量无用的消息到达了 Consumer 端只能丢弃不处理。

一般采用Cosumer端过滤,如果希望提高吞吐量,可以采用Broker过滤。

对消息的过滤有三种方式:

image.png

  • 根据Tag过滤:这是最常见的一种,用起来高效简单
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");  
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
  • SQL 表达式过滤:SQL表达式过滤更加灵活
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");  
// 只有订阅的消息有这个属性a, a >=0 and a <= 3  
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");  
consumer.registerMessageListener(new MessageListenerConcurrently() {  
   @Override  
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
   }  
});  
consumer.start();
  • Filter Server 方式:最灵活,也是最复杂的一种方式,允许用户自定义函数进行过滤

六、读写(内存、日志)

Broker 如何保存数据( 消息持久化)

  • CommitLog 消息存储文件
  • ComsumerQueue 消费消息队列索引文件
  • Indexfile 消息索引(快速检索消息)

image.png

RocketMQ主要的存储文件包括CommitLog文件、ConsumeQueue文件、Indexfile文件。

image.png

消息存储的整体设计

image.png

以下是这些组件的功能和它们在整个存储结构中的作用:

CommitLog 核心文件
  • 功能:CommitLog 是消息存储的核心文件,它存储了生产者写入的消息主体内容及其元数据。
  • 存储结构:CommitLog 文件的大小默认为1GB,文件命名基于起始偏移量,例如 00000000001073741824
  • 写入方式:消息主要是顺序写入CommitLog文件,提高了写入效率。当一个文件写满后,写入会继续在下一个文件。

image.png

ConsumeQueue 索引文件
  • 功能:ConsumeQueue 作为消费消息的索引,提升读取效率。
  • 结构:ConsumeQueue 保存了特定Topic下队列消息在CommitLog中的起始物理偏移量、消息大小和消息Tag的HashCode值。
  • 存储路径:采用 topic/queue/file 的三层组织结构,文件路径为 $HOME/store/consumequeue/{topic}/{queueId}/{fileName}
  • 条目组织:每个条目固定20字节,包括commitlog的物理偏移量、消息长度和tag hashcode,允许像数组一样随机访问每个条目。

image.png

IndexFile
  • 功能:提供了通过key或时间区间查询消息的能力,但不涉及消息的主体内容。

image.png

总结 🐉

image.png

ReputMessageService 服务会将对应的 CommitLog 文件中的消息异步更新内容到 ComsumeQueue 和 IndexFile 中

image.png

RocketMQ 是默认开启 Broker 持久化机制的,消费者 Comsumer 的消费实际是根据 ComsumerQueue 来进行消费的。

image.png

高性能读写

RocketMQ怎么对文件进行读写的

RocketMQ对文件的读写巧妙地利用了操作系统的一些高效文件读写方式——PageCache顺序读写零拷贝

PageCache、顺序读取

image.png

顺序读写提高了 CommitLog 和 ComsumerQueue 的写入和读取速率

image.png

关于 PageCache 的理解,因为 CommitLog 是顺序写入的,而 PageCache 技术能够当第一次加载文件的时候会将数据放入到缓存中,通过直接对内存的读取加快对消息处理的性能。

image.png

零拷贝

通过了零拷贝技术提高了⽂件传输速度,实现方式是 mmap

  • mmap 就是通过⽂件位置与进程地址空间建⽴映射关系,程序可 以直接访问⽂件内容
  • 这种机制在Java中是通过MappedByteBuffer实现的

传统方式:

在操作系统中,使用传统的方式,数据需要经历几次拷贝,还要经历用户态/内核态切换

image.png

  1. 从磁盘复制数据到内核态内存;
  2. 从内核态内存复制到用户态内存;
  3. 然后从用户态内存复制到网络驱动的内核态内存;
  4. 最后是从网络驱动的内核态内存复制到网卡中进行传输

零拷贝方式:

可以通过零拷贝的方式,减少用户态与内核态的上下文切换内存拷贝的次数,用来提升I/O的性能。零拷贝比较常见的实现方式是mmap,这种机制在Java中是通过MappedByteBuffer实现的。

image.png

零拷贝的好处:

image.png

看一下这个对比:

image.png

扩展:

RocketMQ主要通过MappedByteBuffer对文件进行读写操作。

其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO,将磁盘文件数据在操作系统内核地址空间的缓冲区,和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)

刷盘机制

同步刷盘和异步刷盘

在 RocketMQ 中,同步刷盘和异步刷盘是两种不同的消息持久化策略,它们各有优劣,并适用于不同的业务场景。

image.png

以下是对这两种策略的详细说明和比较:

同步刷盘(SYNC_FLUSH)

  • 工作原理:在同步刷盘模式下,每当生产者发送一条消息,Broker 需要将消息写入磁盘并完成刷盘操作后,才会向生产者返回确认(ACK)。只有收到这个确认,生产者才知道消息已经安全存储。
  • 优点:提供了更高的消息可靠性。即使在Broker意外宕机的情况下,也不会丢失已确认的消息。
  • 缺点:影响性能。由于每条消息都需要等待磁盘写入完成,这会增加消息的延迟时间,降低吞吐量。
  • 适用场景:适用于对消息可靠性有极高要求的场景,如金融交易、重要业务数据处理等。

异步刷盘(ASYNC_FLUSH)

  • 工作原理:在异步刷盘模式下,Broker 将消息写入内存后,就立即向生产者返回确认。实际的刷盘操作是由后台线程异步完成的,不会阻塞消息的发送。
  • 优点:提高了性能和吞吐量。由于消息发送不需要等待磁盘操作完成,因此减少了延迟。
  • 缺点:消息可靠性降低。如果Broker在消息还未刷入磁盘前发生宕机,那么这部分消息可能会丢失。
  • 适用场景:适用于对性能要求较高,但对消息可靠性要求不是特别严格的场景,如日志收集、一些非关键的业务通知等。

刷盘策略的设置

  • 配置参数:在 RocketMQ 中,可以通过设置Broker的FlushDiskType参数来调整刷盘策略。
  • 可以选择ASYNC_FLUSH(异步刷盘)或SYNC_FLUSH(同步刷盘)。

同步刷盘和异步刷盘在消息可靠性和性能之间提供了不同的平衡点。

在实际应用中,应根据业务需求的不同,选择最合适的刷盘策略。对于那些要求高可靠性的场景,同步刷盘是更好的选择;而对于那些更看重性能和吞吐量的应用,异步刷盘则更为适合

同步复制和异步复制

同步复制和异步复制在 RocketMQ 中指的是主从Broker之间的数据复制方式,这直接影响消息的可靠性和系统的可用性。这两种复制策略在保证数据一致性和提高系统性能之间提供了不同的平衡点。以下是对这两种策略的详细说明和比较:

同步复制(同步双写)

  • 工作原理:在同步复制模式下,当消息被写入主Broker后,它必须同时被复制到从Broker上。只有当消息在主从Broker上都成功写入后,才向生产者返回写入成功的确认。
  • 优点:提高了数据的可靠性。即使主Broker宕机,从Broker上仍然有消息的完整副本。
  • 缺点:影响性能。由于消息写入操作需要在主从Broker上都完成,这可能导致较高的延迟和降低吞吐量。
  • 适用场景:适用于对数据可靠性要求极高的场景,如金融交易。

异步复制

  • 工作原理:在异步复制模式下,当消息写入主Broker之后,系统就直接返回写入成功。消息的复制到从Broker是异步进行的。
  • 优点:提高了性能。由于不需要等待从Broker确认,消息的写入和确认速度更快。
  • 缺点:可能存在数据不一致的风险。如果主Broker在数据复制完成前宕机,从Broker可能缺少最新的数据。
  • 适用场景:适用于对性能要求较高,但数据可靠性要求不是极端严格的场景。

可用性与顺序性的考虑

  • 可用性问题:由于RocketMQ不支持自动主从切换,如果主Broker宕机,整个系统将无法继续生产消息。虽然消费者可以从从Broker消费消息,但在主Broker宕机期间会存在短暂的主从数据不一致情况。
  • 解决方案:通过部署多个主从集群(多Broker架构)来提高可用性。每个Topic分布在不同的Broker中,可以在一定程度上解决单个Broker宕机问题。
  • 顺序性问题:在多Broker架构下,如果某个主Broker负责特定顺序消息宕机,其他Broker无法替代它处理这些消息,可能影响消息的顺序性。
  • Dledger:RocketMQ通过引入Dledger技术,支持半数以上节点的消息复制后才确认写入成功,并支持主节点的动态选举,解决了严格顺序性和高可用性的问题。

同步复制和异步复制在RocketMQ中提供了不同的数据一致性和性能平衡。在设计系统时,需要根据业务需求的具体情况选择合适的复制策略。

Dledger作为一种解决方案,虽然提高了可用性和顺序性保证,但在选举过程中可能会暂时无法提供服务,并且对节点的数量有一定要求。在实际应用中,需综合考虑业务场景的特性和对数据可靠性、系统性能的具体需求来决定使用哪种复制策略。

七、常见生产问题

消息发送有时候会遇到的问题(消息过大、发送异常)

发送消息会遇到的一些问题(特殊情况处理)

  • 不建议单一进程创建大量生产者
    • 对于生产者的创建和初始化,建议遵循够用即可、最大化复用原则
  • 不建议频繁创建和销毁生产者

创建和销毁生产者

正确示例:

Producer p = ProducerBuilder.build();
for (int i =0;i<n;i++){
    Message m= MessageBuilder.build();
    p.send(m);
 }
p.shutdown();

发送异常

选择队列后会与 Broker 建立连接,通过网络请求将消息发送到 Broker 上,如果 Broker 挂了或者网络波动发送消息超时此时 RocketMQ 会进行重试。

重新选择其他 Broker 中的消息队列进行发送,默认重试两次,可以手动设置。

producer.setRetryTimesWhenSendFailed(5);

消息过大

消息超过 4k 时 RocketMQ 会将消息压缩后在发送到 Broker 上,减少网络资源的占用。

消息堆积问题

image.png

一般是采用增加消费者实例的方式

个人理解:如果是应对生产快的,一方面是需要需要这个队列数量和消费者数量,然后应对堆积的情况,就要加一个中转的,把这个堆积的消息分散到多个Queu,然后多个Comsumer消费


发生了消息积压,这时候就得想办法赶紧把积压的消息消费完,就得考虑提高消费能力,一般有两种办法:

image.png

image.png

消息迁移扩容消费 image.png

如何保证消息不丢失

可能会在这三个阶段发生丢失:生产阶段、存储阶段、消费阶段。

image.png

  • 可能的点:生产 → Broker;Broker 持久化;Broker → 消费
  • 生产 → Broker
    • RockerMQ生产端有三种发送模式
      • 同步发送(需要等 Broker 有反馈之后才能继续发)
      • 单向发送(生产端直接发)
      • 异步发送(发送消息的同时注册一个回调 去处理响应,安全性低,容易丢消息)
    • 防止消息丢失,可以采用同步发送
  • Broker 持久化
    • 采用同步刷盘以及2PC 两阶段提交,来保证同步时不会丢消息
    • 异步刷盘一断电就会丢消息
  • Broker → 消费
    • 采用同步消费机制,不要使用异步消费机制
      • 在同步消费情况下,消费完消息之后再去给Broker端反馈,然后Broker端会去维护消息偏移量,如果消费失败可以进⾏⼀定次数的重试
      • 在异步消费情况下,消费完消息的同时也会向Broker端反馈,然后Broker端会去维护消息偏移量,如果处理失败了,不会进⾏重试因为偏移量已经变更
  • 另外RocketMQ服务需要有降级⽅案,对于RocketMQ来说,NameServer挂了,本身就⽆法保证消息不丢失了,所以应对这种场景,我们可以使⽤服务降级⽅案,将消息暂存到Redis、⽂件或内存中,等MQ服务恢复之后再将消息转移过去

生产阶段

image.png

生产者可以利用RocketMQ提供的查询API来确认消息是否已经成功存储在Broker上。

存储阶段

如果需要保证消息不丢失,可以使用 同步复制(节点主从) + 同步刷盘(日志存储)

不过一般集群是 同步复制 + 异步刷盘

image.pngimage.png

消费阶段

image.png

如何避免重复消费

处理消息重复问题,主要有业务端自己保证,主要的方式有两种:业务幂等消息去重

image.png

在消息队列系统中,由于各种原因(如网络波动、Broker 重启等),消息可能会被重复发送给消费者。

实现幂等性的方法

  • 1、通过唯一标识符标记某条消息
  • 2、通过数据库唯一约束或者唯一主键进行幂等操作
  • 3、分布式锁:通过 Redis 的 setnx 进行判断是否相同标识符

参考: