MQ

MQ

MessageQueue消息队列。 队列是一种FIFO 先进先出的数据结构。消息由生产者发送到MQ进行排队,然后按原来的顺序交由消息的消费者进行处理。QQ和微信就是典型的MQ。

MQ作用以及优点

异步


例如:快递员送快递,直接送到家效率很低,把快递放入驿站,通知客户自己安排时间去驿站取快递。

主要作用:减少请求响应时间,实现非核心流程异步化,提高系统响应性能和吞吐量。

解耦

应用场景:例如电商系统下订单,交易服务整个过程中要调用四个服务,订单服务、库存服务、仓储服务、积分服务。如果积分服务挂了整个交易服务无法进行。引入MQ之后,交易服务只跟MQ交互,把消息发到MQ里面就行了,无需关心另外四个服务是否可用。

主要作用:

  • 服务之间进行解耦,减少服务之间的影响,提高系统整体的稳定性以及可扩展性。
  • 解耦后可以实现数据分发。生产者发送一个消息后,可以由一个或者多个消费者进行消费,并且消费者的增加或者减少对生产者没有影响。

削峰



主要作用:以稳定的系统资源应对突发的流量冲击。

MQ的缺点

系统可用性降低

系统引入的外部依赖增多,系统的稳定性就会变差。一旦MQ宕机,对业务会产生影响,需要考虑保证MQ的高可用。

系统复杂度提高

引入MQ后系统的复杂度会大大提高。以前服务之间可以进行同步的服务调用,引入MQ后,会变为异步调用,数据的链路就会变得更复杂。并且还会带来其他一些问题。比如:如何保证消费不会丢失?不会被重复调用?怎么保证消息的顺序性等问题。

消息一致性问题

A系统处理完业务,通过MQ发送消息给B、C系统进行后续的业务处理。如果B系统处理成功,C系统处理失败怎么办?这就需要考虑如何保证消息数据处理的一致性。

市面上MQ比较

优点 缺点 使用场景 活跃度,事务
Kafka 17W QPS 吞吐量非常大,性能非常好,集群高可用。 会丢数据,功能比较单—。 日志分析,大数据采集 活跃度高,成熟 不支持事务
RabbitMQ 5W QPS 消息可靠性高,功能全面。 吞吐量比较低,消息积累会影响性能,erlang语言不好定制。 小规模场景 活跃度高,成熟 不支持事务
RocketMQ 12W QPS 高吞吐,高性能,高可用;功能全面。 开源版功能不如云上版;官方文档比较简单;客户端只支持java。 几乎全场景 活跃度中,比较成熟 支持事务

RocketMQ

RocketMQ是阿里巴巴开源的一个消息中间件,在阿里内部历经了双十一等很多高并发场景的考验,能够处理亿万级别的消息。2016年开源后捐赠给Apache,现在是Apache的一个顶级项目。

目前RocketMQ在阿里云上有一个购买即可用的商业版本,商业版本集成了阿里内部一些更深层次的功能及运维定制。我们学习的是Apache的开源版本。开源版本相对于阿里云上的商业版本,功能上略有缺失,但是大体上功能是一样的。

安装

rocketmq-all-5.0.0-bin-release.zip

  1. 创建操作用户与root用户区分开,创建一个工作目录。
  2. 安装JDK,配置环境变量jdk路径
  3. rocketmq只依赖jdk,rocketmq-all-5.0.0-bin-release.zip解压放入工作目录,配置环境变量rocketmq的bin目录。作用是加载用来加载$ROCKETMQ_HOME/conf下的除broker.conf以外的几个配置文件。
  4. 如果不配置的话,启动NameSever和Broker都会报错。

运行

1. 修改runserver.sh配置, nohup ./mqnamesrv & 启动NameServer

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m 
    -XX:MaxMetaspaceSize=320m"

2.修改 runbroker.sh 配置, nohup ./mqbroker & 启动Broker

JAVA_OPT="${JAVA_OPT} -server -Xms512m 1 -Xmx512m -Xmn256m"
autoCreateTopicEnable=true ##测试中方便

3.快速验证,配置环境变量

export NAMESRV_ADDR='localhost:9876'

测试

bin/tools.sh org.apache.rocketmq.example.quickstart.Producer    ##测试生产者发送
bin/tools.sh org.apache.rocketmq.example.1 quickstart.Consumer  ##测试消费者接收

4.关闭

sh bin/mqshutdown namesrv       ##关闭NameServer
sh bin/mqshutdown broker        ##关闭Broker

架构

RocketMQ由以下这几个组件组成

  • NameServer : 提供轻量级的Broker路由服务。
  • Broker:实际处理消息存储、转发等服务的核心组件。
  • Producer:消息生产者集群。
  • Consumer:消息消费者集群。
  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
  • Message Queue:相当于是Topic的分区;用于并行发送和接收消息

基础概念

消息模型(Message Model)

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

消息生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式:**同步发送异步发送单向发送**、顺序发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

生产者中,会把同一类Producer组成一个集合,叫做生产者组,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:**拉取式消费推动式消费**。

  • 拉取式消费的应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
  • 推动式消费模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

消费者同样会把同一类Consumer组成一个集合,叫做消费者组,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

  • 集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
  • 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

主题(Topic)

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

同一个Topic下的数据,会分片保存到不同的Broker上,而每一个分片单位,就叫做MessageQueue。MessageQueue是生产者发送消息与消费者消费消息的最小单位。

代理服务器(Broker Server)

消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

名字服务(Name Server)

名称服务充当路由消息的提供者。Broker Server会在启动时向所有的Name Server注册自己的服务信息,并且后续通过心跳请求的方式保证这个服务信息的实时性。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

NameServer中任意的节点挂了,只要有一台服务节点正常,整个路由服务就不会有影响。

消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题Topic。

RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。并且Message上有一个为消息设置的标志,Tag标签。用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

存储消息

分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。

  1. MQ收到一条消息后,需要向生产者返回一个ACK响应,并将消息存储起来。
  2. MQ Push一条消息给消费者后,等待消费者的ACK响应,需要将消息标记为已消费。如果没有标记为消费,MQ会不断的尝试往消费者推送这条消息。
  3. MQ需要定期删除一些过期的消息,这样才能保证服务一直可用。

消息主从复制

如果Broker以一个集群的方式部署,会有一个master节点和多个slave节点,消息需要从Master复制到Slave上。而消息复制的方式分为同步复制和异步复制。

同步复制

同步复制是等Master和Slave都写入消息成功后才反馈给客户端写入成功的状态。
在同步复制下,如果Master节点故障,Slave上有全部的数据备份,这样容易恢复数据。但是同步复制会增大数据写入的延迟,降低系统的吞吐量。

异步复制

异步复制是只要master写入消息成功,就反馈给客户端写入成功的状态。然后再异步的将消息复制给Slave节点。

在异步复制下,系统拥有较低的延迟和较高的吞吐量。但是如果master节点故障,而有些数据没有完成复制,就会造成数据丢失。

配置:消息复制方式是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、SYNC_MASTER、SLAVE三个值中的一个。

消息重试

对于广播模式的消息, 是不存在消息重试的机制的。消息消费失败后,不会再重新进行发送,而只是继续消费新的消息。

对于普通模式的消息,当消费者消费消息失败后,可以通过设置返回状态达到消息重试的结果。

如何让消息进行重试

集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置。可以有三种配置方式:

  • 返回Action.ReconsumeLater-推荐
  • 返回null
  • 抛出异常
public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        //处理消息
        doConsumeMessage(message);
        //方式1:返回 Action.ReconsumeLater,消息将重试
        return Action.ReconsumeLater;
        //方式2:返回 null,消息将重试
        return null;
        //方式3:直接抛出异常, 消息将重试
        throw new RuntimeException("Consumer Message exceotion");
    }
}

如果希望消费失败后不重试,可以直接返回Action.CommitMessage。

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            doConsumeMessage(message);
        } catch (Throwable e) {
            //捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
            return Action.CommitMessage;
        }
        //消息处理正常,直接返回 Action.CommitMessage;
        return Action.CommitMessage;
    }
}

重试消息如何处理

重试的消息会进入一个 “%RETRY%”+ConsumeGroup 的队列中。

然后RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间如下:

重试次数 与上次重试的间隔时间 重试次数 与上次重试的间隔时间
1 10 秒 9 7 分钟
2 30 秒 10 8 分钟
3 1 分钟 11 9 分钟
4 2 分钟 12 10 分钟
5 3 分钟 13 20 分钟
6 4 分钟 14 30 分钟
7 5 分钟 15 1 小时
8 6 分钟 16 2 小时

这个重试时间跟延迟消息的延迟级别是对应的。不过取的是延迟级别的后16级别messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

重试次数

如果消息重试16次后仍然失败,消息将不再投递。转为进入死信队列。

另外一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。

然后关于这个重试次数,RocketMQ可以进行定制。例如通过consumer.setMaxReconsumeTimes(20);将重试次数设定为20次。当定制的重试次数超过16次后,消息的重试时间间隔均为2小时。

配置覆盖

消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效。并且最后启动的Consumer会覆盖之前启动的Consumer的配置。

死信队列

当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。

死信队列的名称是%DLQ%+ConsumGroup

死信队列的特征:

  • 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
  • 如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
  • 一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
  • 死信队列中的消息不会再被消费者正常消费。
  • 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。

通常一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查,然后对死信消息进行处理,比如转发到正常的Topic重新进行消费或者丢弃。

注: 默认创建出来的死信队列,他里面的消息是无法读取的,在控制台和消费者中都无法读取。这是因为这些默认的死信队列,权限perm被设置成了2:禁读(权限有三种 2:禁读,4:禁写,6:可读可写)。需要手动将死信队列的权限配置成6,才能被消费(可以通过mqadmin指定或者web控制台)。

消息幂等

基础概念

在MQ系统中,对于消息幂等有三种实现语义:

  • at most once 最多一次:每条消息最多只会被消费一次
  • at least once 至少一次:每条消息至少会被消费一次
  • exactly once 正好一次:每条消息都只会确定的消费一次

这三种语义都有他适用的业务场景。


at most once是最好保证的,在RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证。


at least once,RocketMQ也有同步发送、事务消息等很多方式能够保证。


exactly once是MQ中最理想也是最难保证的一种语义,需要有非常精细的设计才行。RocketMQ只能保证at least once,保证不了exactly once。使用RocketMQ时,需要由业务系统自行保证消息的幂等性

消息幂等的必要性

在实际应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,可以概括为以下情况

  • 发送时消息重复 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
  • 投递时消息重复 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
  • 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启) 当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

处理方式

在RocketMQ中无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。而要处理这个问题,RocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。

但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。

消息存储介质

RocketMQ采用的是类似于Kafka的文件存储机制,即直接用磁盘文件来保存消息,而不需要借助MySQL这一类索引工具。

目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度。

零拷贝技术加速文件读写

服务器把本机磁盘文件的内容发送到客户端,一般分为两个步骤:

1)read;读取本地文件内容;

2)write;将读取的内容通过网络发送出去。

实际进行了4 次数据复制

1. 从磁盘复制数据到内核态内存;
2. 从内核态内存复 制到用户态内存;
3. 然后从用户态 内存复制到网络驱动的内核态内存;
4. 最后是从网络驱动的内核态内存复 制到网卡中进行传输。

通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过NIO包中MappedByteBuffer实现的。RocketMQ充分利用了“零拷贝”技术,提高消息存盘和网络发送的速度。

JAVA的NIO中提供了两种实现方式,mmap和sendfile,其中mmap适合比较小的文件,而sendfile适合传递比较大的文件。

消息存储结构

RocketMQ消息的存储分为三个部分:

  • CommitLog:存储消息的元数据。所有消息都会顺序存入到CommitLog文件当中。CommitLog由多个文件组成,每个文件固定大小1G。以第一条消息的偏移量为文件名。
  • ConsumerQueue:存储消息在CommitLog的索引。一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog。
  • IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程

刷盘机制

RocketMQ需要将消息存储到磁盘上,这样才能保证断电后消息不会丢失。同时这样才可以让存储的消息量可以超出内存的限制。RocketMQ为了提高性能,会尽量保证磁盘的顺序写。消息在写入磁盘时,有两种写磁盘的方式,同步刷盘和异步刷盘

  • 同步刷盘 在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。
  • 异步刷盘 在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
  • 配置方式:刷盘方式是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。

源码环境搭建

1.从RocketMQ的官网下载5.0.0 https://rocketmq.apache.org/dowloading/releases/,解压导入到 IDEA。

2.对源码进行编译。编译指令 clean install -Dmaven.test.skip=true

3.在项目目录下创建一个conf目录,从distribution拷贝broker.conf和logback_broker.xml和logback_namesrv.xml

4.配置ROCKETMQ_HOME环境变量,运行NamesrvStartup类启动,看到"The Name Server boot success. serializeType=JSON"日志启动成功

5.修改之前复制的broker.conf文件配置,配置-c 参数指向broker.conf配置文件,broker模块下的BrokerStartup启动

brokerClusterName = DefaultCluster
 brokerName = broker-a
 brokerId = 0
    deleteWhen = 04
    fileReservedTime = 48
    brokerRole = ASYNC_MASTER
 flushDiskType = ASYNC_FLUSH

 # 自动创建Topic
 autoCreateTopicEnable=true
 # nameServ地址
 namesrvAddr=127.0.0.1:9876
 # 存储路径
 storePathRootDir=D:\\RocketMQ\\data\\rocketmq\\dataDir
 # commitLog路径
 storePathCommitLog=D:\\RocketMQ\\data\\rocketmq\\dataDir\\commitlog
 # 消息队列存储路径
 storePathConsumeQueue=D:\\RocketMQ\\data\\rocketmq\\dataDir\\consumequeue
 # 消息索引存储路径
 storePathIndex=D:\\RocketMQ\\data\\rocketmq\\dataDir\\index
 # checkpoint文件路径
 storeCheckpoint=D:\\RocketMQ\\data\\rocketmq\\dataDir\\checkpoint
 # abort文件存储路径
 abortFile=D:\\RocketMQ\\data\\rocketmq\\dataDir\\abort

6.在测试源码中,通过代码"producer.setNamesrvAddr("127.0.0.1:9876");"指定NameServer,就可以发送消息了。

7.同样,通过代码"consumer.setNamesrvAddr("192.168xx.xx:9876");"指定NameServer,就可以消费消息了。

源码解读

源码下很多的功能模块,我们只关注下几个最为重要的模块:

  • broker:broker模块, broker启动
  • client:消息客户端,包含消息生产者、消息消费者相关类
  • example:RocketMQ 例代码
  • namesrv:NameServer实现相关类,NameServer启动
  • store:消息存储实现相关类

NameServer启动

NameServer的核心作用其实就只有两个:
1.维护Broker的服务地址并进行及时的更新
2.给Producer和Consumer提供服务获取Broker列表。

public static NamesrvController main0(String[] args) {
    try {
        //创建namesrv控制器启动
        NamesrvController controller = createNamesrvController(args);
        start(controller);
        String tip = "The Name Server boot success. serializeType=" 
            + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

在创建NamesrvController对象时,有两个关键的配置文件NamesrvConfig这个是NameServer自己运行需要的配置信息,还一个NettyServerConfig包含Netty服务端的配置参数,固定的占用了9876端口。

流程如下

在启动服务时,启动RemotingServer,用来响应请求的。在关闭服务时,关闭了四个东西remotingServer,响应请求的服务,remotingExecutor Netty服务线程池,scheduledExecutorService 定时任务,fileWatchService 跟踪acl配置的。

Broker启动

Broker的内部架构,有点类似于JavaWeb开发的MVC架构。有Controller负责响应请求,各种Service组件负责具体业务,然后还有负责消息存盘的功能模块则类似于Dao。

BrokerController对象先创建,然后再启动,Broker中有一大堆的功能组件负责具体的业务。

public BrokerController(final BrokerConfig brokerConfig, 
        final NettyServerConfig nettyServerConfig,
        final NettyClientConfig nettyClientConfig,
        final MessageStoreConfig messageStoreConfig) {
    this.brokerConfig = brokerConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.nettyClientConfig = nettyClientConfig;
    this.messageStoreConfig = messageStoreConfig;

    //消费端的offset管理
    this.consumerOffsetManager = new ConsumerOffsetManager(this);
    //topic的管理
    this.topicConfigManager = new TopicConfigManager(this);

    //pull消息的处理
    this.pullMessageProcessor = new PullMessageProcessor(this);
    //pull消息的服务
    this.pullRequestHoldService = new PullRequestHoldService(this);

    //消息到达后执行的监听处理
    this.messageArrivingListener = 
        new NotifyMessageArrivingListener(this.pullRequestHoldService);
    //消费消息的id监听
    this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
    //消费者管理,基于特定的消费id
    this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
    this.consumerFilterManager = new ConsumerFilterManager(this);

    //发送方管理
    this.producerManager = new ProducerManager();
    //监听客户端的网络
    this.clientHousekeepingService = new ClientHousekeepingService(this);
    //broker作为客户端进行心跳鉴定对访问者的操作
    this.broker2Client = new Broker2Client(this);
    //订阅消息的管理
    this.subscriptionGroupManager = new SubscriptionGroupManager(this);
    //broker的接口服务管理,主要和namesrv交互
    this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
    //过滤管理
    this.filterServerManager = new FilterServerManager(this);

    //主从同步管理,主要对slave有效,同步元数据
    this.slaveSynchronize = new SlaveSynchronize(this);

    //内部操作队列
    this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(
        this.brokerConfig.getSendThreadPoolQueueCapacity());
    this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(
        this.brokerConfig.getPullThreadPoolQueueCapacity());
    this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(
        this.brokerConfig.getQueryThreadPoolQueueCapacity());
    this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(
        this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
    this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(
        this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
    this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(
        this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
    this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(
        this.brokerConfig.getEndTransactionPoolQueueCapacity());

    //当前broker的状态管理
    this.brokerStatsManager = new BrokerStatsManager(
        this.brokerConfig.getBrokerClusterName());
    this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), 
        this.getNettyServerConfig().getListenPort()));

    //快速失败策略
    this.brokerFastFailure = new BrokerFastFailure(this);
    this.configuration = new Configuration(log,
        BrokerPathConfigHelper.getBrokerConfigPath(),
        this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, 
        this.messageStoreConfig
    );
}

Broker注册

BrokerController.this.registerBrokerAll方法会发起向NameServer注册心跳。启动时会立即注册,同时也会启动一个线程池,以10秒延迟,默认30秒的间隔 持续向NameServer发送心跳。

public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
        boolean oneway, boolean forceRegister) {

    // 获取所有Topic信息
    TopicConfigSerializeWrapper topicConfigWrapper = 
        this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
        ConcurrentHashMap<String, TopicConfig> topicConfigTable = 
            new ConcurrentHashMap<String, TopicConfig>();
        for (TopicConfig topicConfig : 
                topicConfigWrapper.getTopicConfigTable().values()) {
            TopicConfig tmp = new TopicConfig(topicConfig.getTopicName(), 
                topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                this.brokerConfig.getBrokerPermission());
            topicConfigTable.put(topicConfig.getTopicName(), tmp);
        }
        topicConfigWrapper.setTopicConfigTable(topicConfigTable);
    }

    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}

Producer

Producer有两种

1.普通发送者:DefaultMQProducer,需要构建一个Netty客户端。

2.事务消息发送者:TransactionMQProducer,需要构建一个Netty客户端同时也要构建Netty服务端。

Producer的流程大步骤

1.关于Borker路由信息的管理: Producer需要拉取Broker列表,然后跟Broker建立连接等等很多核心的流程,都是在发送消息时建立的。因为在启动时,还不知道要拉取哪个Topic的Broker列表呢。所以对于这个问题,我们关注的重点,不应该是start方法,而是send方法。在send方法中,首先需要获得Topic的路由信息,会从本地缓存中获取,如果本地缓存中没有,就从NameServer中去申请。

路由信息大致的管理流程

2.获取路由信息后,会选出一个MessageQueue去发送消息。这个选MessageQueue的方法就是一个索引自增然后取模的方式。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, 
        final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            //Producer选择MessageQueue的方法就是自增,然后取模,并且只有这一种方法。
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName 
                            || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            //选择一个相对好的broker,不考虑可用性的消息队列
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                //自增取模
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() 
                            % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        //随机选择一个消息队列
        return tpInfo.selectOneMessageQueue();
    }
    //获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

3.封装Netty请求发送消息。消息发从到Borker后,会由一个CommitLog类写入到CommitLog文件中。

消息存储

接着上面的流程来关注下Broker是如何把消息进行存储,消息存储的入口在:DefaultMessageStore.putMessage

最终存储的文件:

  • commitLog:消息存储目录
  • config:运行期间一些配置信息
  • consumerqueue:消息消费队列存储目录
  • index:消息索引文件存储目录
  • abort:如果存在改文件寿命Broker非正常关闭
  • checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerquueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。

commitLog写入

CommitLog的doAppend方法就是Broker写入消息的实际入口。这个方法最终会把消息追加到MappedFile映射的一块内存里,并没有直接写入磁盘。写入消息的过程是串行的,一次只会允许一个线程写入。

分发ConsumeQueue和IndexFile

当CommitLog写入一条消息后,会有一个后台线程reputMessageService每隔1毫秒就会去拉取CommitLog中最新更新的一批消息,然后分别转发到ComsumeQueue和IndexFile里去,这就是他底层的实现逻辑。

如果服务异常宕机,会造成CommitLog和ConsumeQueue、IndexFile文件不一致,有消息写入CommitLog后,没有分发到索引文件,这样消息就丢失了。DefaultMappedStore的load方法提供了恢复索引文件的方法,入口在load方法。

文件同步刷盘与异步刷盘

入口:CommitLog.putMessage -> CommitLog.handleDiskFlush

其中主要涉及到是否开启了对外内存。TransientStorePoolEnable。如果开启了堆外内存,会在启动时申请一个跟CommitLog文件大小一致的堆外内存,这部分内存就可以确保不会被交换到虚拟内存中。

过期文件删除

入口: DefaultMessageStore.addScheduleTask -> DefaultMessageStore.this.cleanFilesPeriodically()

默认情况下, Broker会启动后台线程,每60秒,检查CommitLog、ConsumeQueue文件。然后对超过72小时的数据进行删除。也就是说,默认情况下, RocketMQ只会保存3天内的数据。这个时间可以通过fileReservedTime来配置。注意他删除时,并不会检查消息是否被消费了。

整个文件存储的核心入口入口在DefaultMessageStore的start方法中。

文件存储总结

RocketMQ的存储文件包括消息文件(Commitlog)、消息消费队列文件(ConsumerQueue)、Hash索引文件(IndexFile)、监测点文件(checkPoint)、关闭异常文件(abort)。单个消息存储文件、消息消费队列文件、Hash索引文件长度固定以便使用内存映射机制进行文件的读写操作。RocketMQ组织文件以文件的起始偏移量来命令文件,这样根据偏移量能快速定位到真实的物理文件。RocketMQ基于内存映射文件机制提供了同步刷盘和异步刷盘两种机制,异步刷盘是指在消息存储时先追加到内存映射文件,然后启动专门的刷盘线程定时将内存中的文件数据刷写到磁盘。

CommitLog,消息存储文件,RocketMQ为了保证消息发送的高吞吐量,采用单一文件存储所有主题消息,保证消息存储是完全的顺序写,但这样给文件读取带来了不便,为此RocketMQ为了方便消息消费构建了消息消费队列文件,基于主题与队列进行组织,同时RocketMQ为消息实现了Hash索引,可以为消息设置索引键,根据所以能够快速从CommitLog文件中检索消息。

当消息达到CommitLog后,会通过ReputMessageService线程接近实时地将消息转发给消息消费队列文件与索引文件。为了安全起见,RocketMQ引入abort文件,记录Broker的停机是否是正常关闭还是异常关闭,在重启Broker时为了保证CommitLog文件,消息消费队列文件与Hash索引文件的正确性,分别采用不同策略来恢复文件。

RocketMQ不会永久存储消息文件、消息消费队列文件,而是启动文件过期机制并在磁盘空间不足或者默认凌晨4点删除过期文件,文件保存72小时并且在删除文件时并不会判断该消息文件上的消息是否被消费。

消费者

消费者以消费者组的模式开展。消费者组之间有集群模式和广播模式两种消费模式。然后消费模式有推模式和拉模式。推模式是由拉模式封装组成。

集群模式下,消费队列负载均衡的通用原理:一个消费队列同一时间只能被一个消费者消费,而一个消费者可以同时消费多个队列。

消息顺序:RocketMQ只支持一个队列上的局部消息顺序,不保证全局消息顺序。 要实现顺序消息,可以把有序的消息指定为一个queue,或者给Topic只指定一个Queue,这个不推荐。

启动

DefaultMQPushConsumer.start方法,然后客户端启动的核心是mQClientFactory 主要是启动了一大堆的服务。

这些服务可以结合具体场景再进行深入。例如pullMessageService主要处理拉取消息服务,rebalanceService主要处理客户端的负载均衡。

消息拉取

拉模式:PullMessageService。PullRequest里有messageQueue和processQueue,其中messageQueue负责拉取消息,拉取到后,将消息存入processQueue,进行处理。 存入后就可以清空messageQueue,继续拉取了。

长轮询拉取机制

在Broker的配置中,有一个配置项longPollingEnable可以配置为true开启长轮询模式。我们看下这个是干什么的。

消息长轮询的处理入口在Broker端的PullMessageProcessor.processReuquest方法

caseResponseCode.PULL_NOT_FOUND:
    if (brokerAllowSuspend&&hasSuspendFlag) {
        longpollingTimeMills=suspendTimeoutMillisLong;
        //消息长轮询
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = 
                this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }

        Stringtopic=requestHeader.getTopic();
        longoffset=requestHeader.getQueueOffset();
        intqueueId=requestHeader.getQueueId();
        PullRequestpullRequest=newPullRequest(request, channel, pollingTimeMills,
            this.brokerController.getMessageStore().now(), offset, subscriptionData, 
            messageFilter);
        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, 
            queueId, pullRequest);
        response = null;
        break;
    }

如果开启了长轮询机制,PullRequestHoldService会每隔5S被环境去尝试检查是否有新的消息到来,并给客户端响应,或者直到超时才给客户端响应。消息的实时性比较差,为了避免这种情况,RocketMQ还有另外一个机制,当消息到达时唤醒挂起的线程再检查一次。

这个机制的入口在DefaultMessageStore的start方法中,会启动一个reputMessageService。然后在commitLog消息分发成功后,会检查如果开启了长轮询,就会唤醒NotifyMessageArrivingListener,进行一起请求线程的检查。

if (dispatchRequest.isSuccess()) {
    if (size>0) {
        //分发CommitLog写入消息
        DefaultMessageStore.this.doDispatch(dispatchRequest);
        //K2 长轮询: 如果有消息到了主节点,并且开启了长轮询。
        if (BrokerRole.SLAVE != 
                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
            &&DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
            //唤醒NotifyMessageArrivingListener的arriving方法,进行一次请求线程的检查
            DefaultMessageStore.this.messageArrivingListener.arriving(
                dispatchRequest.getTopic(), dispatchRequest.getQueueId(), 
                dispatchRequest.getConsumeQueueOffset() + 1, 
                dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
        }
    }
}

客户端负载均衡策略

在消费者示例的start方法中,启动RebalanceService,这个是客户端进行负载均衡 策略的启动服务。他只负责根据负载均衡策略获取当前客户端分配到的 MessageQueue示例。五种负载策略,可以由Consumer的allocateMessageQueueStrategy属性来选 择。最常用的是AllocateMessageQueueAveragely平均分配和 AllocateMessageQueueAveragelyByCircle平均轮询分配。

平均分配是把MessageQueue按组内的消费者个数平均分配。而平均轮询分配就是把MessageQueue按组内的消费者一个一个轮询分配。

例如,六个队列q1,q2,q3,q4,q5,q6,分配给三个消费者c1,c2,c3

平均分配的结果就是: c1:{q1,q2},c2:{q3,q4},c3{q5,q6}

平均轮询分配的结果就是:c1:{q1,q4},c2:{q2,q5},c3:{q3,q6}

延迟消息

延迟消息的处理入口在scheduleMessageService这个组件中,会在broker启动时也一起加载。

整个延迟消息的实现方式如下:

消息写入时,会将延迟消息转为写入到SCHEDULE_TOPIC_XXX这个Topic中。这个系统内置的Topic有18个队列,对应18个延迟级别4。

ScheduleMessageService会每隔1秒钟执行一个executeOnTimeup任务,将消息从延迟队列中写入正常Topic中。

消费者总结

RocketMQ消息消费方式分别为集群模式、广播模式。

消息队列负载由RebalanceService线程默认每隔20s进行一次消息队列负载,根据当前消费者组内消费者个数与主题队列数量按照某一种负载算法进行队列分配,分配原则为同一个消费者可以分配多个消息消费队列,同一个消息消费队列同一个时间只会分配给一个消费者。

消息拉取由PullMessageService线程根据RebalanceService线程创建的拉取任务进行拉取,默认每次拉取32条消息,提交给消费者消费线程后继续下一次消息拉取。如果消息消费过慢产生消息堆积会触发消息消费拉取流控。

并发消息消费指消费线程池中的线程可以并发对同一个消息队列的消息进行消费,消费成功后,取出消息队列中最小的消息偏移量作为消息消费进度偏移量存储在于消息消费进度存储文件中,集群模式消息消费进度存储在Broker(消息服务器),广播模式消息消费进度存储在消费者端。

RocketMQ不支持任意精度的定时调度消息,只支持自定义的消息延迟级别,例如1s、2s、5s等,可通过在broker配置文件中设置messageDelayLevel。

顺序消息一般使用集群模式,是指对消息消费者内的线程池中的线程对消息消费队列只能串行消费。并并发消息消费最本质的区别是消息消费时必须成功锁定消息消费队列,在Broker端会存储消息消费队列的锁占用情况。

使用RocketMQ如何保证消息不丢失

这个是在面试时,关于MQ面试官最喜欢问的问题。这个问题是所有MQ都需要面对的一个共性问题。大致的解决思路都是一致的,但是针对不同的MQ产品又有不同的解决方案,分析这个问题要从以下几个角度入手:

哪些环节会有丢消息的可能

通用的MQ场景:

其中1,2,4三个场景都是跨网络的,跨网络就肯定会有丢消息的可能。关于3这个环节,通常MQ存盘时都会先写入操作系统的缓存page cache中,然后再由操作系统异步的将消息写入硬盘。这个中间有个时间差,也有可能会造成消息丢失。如果此时服务挂了,缓存中还没有来得及写入硬盘的消息就会丢失。

RocketMQ消息零丢失方案

1.生产者使用事务消息机制保证消息零丢失

RocketMQ的事务消息机制就是为了保证零丢失来设计的,并且经过阿里的验证是靠谱的。以最常见的电商订单场景为例,来简单分析下事务消息机制如何保证消息不丢失。
如下这个流程图:

1.发送half消息有什么作用?

half消息是在订单系统进行下单操作前发送,并且对下游服务的消费者是不可见的。这个消息的作用更多的体现在确认RocketMQ的服务是否正常。如果没有half消息这个流程,通常是会在订单系统中先完成下单,再发送消息给MQ,如果这时写入消息到MQ失败就会非常尴尬。half消息如果写入失败,就可以认为MQ的服务是有问题的,就不能通知下游服务了。可以在下单时给订单一个状态标记,等待MQ服务正常后再进行补偿操作重新下单通知下游服务。

2.订单系统写数据库失败了怎么办?

这个问题可以同样比较没有使用事务消息机制时会怎么办?如果没有使用事务消息,我们只能判断下单失败,然后抛出异常,就不往MQ发消息了,至少保证不会对下游服务进行错误的通知。但是如果过一段时间数据库恢复过来了,这个消息就无法再次发送了。当然也可以设计另外的补偿机制,例如将订单数据缓存起来,再启动一个线程定时尝试往数据库写。

如果使用事务消息机制,就可以有一种更优雅的方案。下单时写数据库失败,可以使用Redis把订单消息先缓存起来,然后给RocketMQ返回一个UNKNOWN状态。RocketMQ就会过一段时间来回查事务状态,可以在回查事务状态时再尝试把订单数据写入数据库,如果数据库这时候已经恢复了,那就能完整正常的下单,再继续后面的业务,这样这个订单的消息就不会因为数据库临时崩了而丢失。

3.half消息写入成功后RocketMQ挂了怎么办?

需要注意在事务消息的处理机制中,未知状态的事务状态回查是由RocketMQ的Broker主动发起的。也就是说如果出现了这种挂了的情况,RocketMQ就不会回调到事务消息中回查事务状态的服务。我们就可以将订单一直标记为"新下单"的状态,等RocketMQ恢复后,只要存储的消息没有丢失,RocketMQ就会再次继续状态回查的流程。

4.下单成功后如何优雅的等待支付成功?

在订单场景下,通常会要求下单完成后,客户在一定时间内(例如10分钟)完成订单支付,支付完成后才会通知下游服务进行进一步的营销补偿。如果不用事务消息,最简单的方式是启动一个定时任务,每隔一段时间扫描订单表,比对未支付的订单的下单时间,将超过时间的订单回收。这种方式显然是有很大问题的,需要定时扫描很庞大的一个订单信息,这对系统是个不小的压力。
更进一步的方案是使用RocketMQ提供的延迟消息机制。

往MQ发一个延迟1分钟的消息,消费到这个消息后去检查订单的支付状态,如果订单已经支付,就往下游发送下单的通知。如果没有支付,则再发一个延迟1分钟的消息。最终在第10个消息时把订单回收,这个方案就不用对全部的订单表进行扫描,只需要每次处理一个单独的订单消息。

如果使用事务消息,可以用事务消息的状态回查机制来替代定时的任务。在下单时,给Broker返回一个UNKNOWN的未知状态,在状态回查的方法中去查询订单的支付状态。这样整个业务逻辑就会简单,只需要配置RocketMQ中的事务消息回查次数(默认15次)和事务回查间隔时间(messageDelayLevel),就可以更优雅的完成这个支付状态检查的需求。

5.事务消息机制的作用

在订单这个场景下,消息不丢失的问题实际上转化成了下单业务与下游服务的业务的分布式事务一致性问题。事务一致性问题一直以来都是一个非常复杂的问题,RocketMQ的事务消息机制实际上只保证了整个事务消息的一半,即订单系统下单和发消息这两个事件的事务一致性,对下游服务的事务并没有保证。但是即便如此,也是目前业内最好的降级方案。

RocketMQ配置同步刷盘+Dledger主从架构保证MQ自身不会丢消息

  1. 同步刷盘
    简单的把RocketMQ的刷盘方式 flushDiskType配置成同步刷盘就可以保证消息在刷盘过程中不会丢失了。
  2. Dledger的文件同步
    在使用Dledger技术搭建的RocketMQ集群中,Dledger会通过两阶段提交的方式保证文件在主从之间成功同步。

数据同步会通过两个阶段,一个是uncommitted阶段,一个是commited阶段。

  1. Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,接着通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件。
  2. Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态。
  3. Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。这样就基于Raft协议完成了两阶段的数据同步。

3 消费者端不要使用异步消费机制

通常情况下,消费者端需要先处理本地事务,再给MQ一个ACK响应。此时MQ就会修改Offset,将消息标记为已消费,从而不再往其他消费者推送消息。在Broker的这种重新推送机制下,消息是不会在传输过程中丢失的。但是需要注意在下面这种情况异步消费的方式,就有可能造成消息状态返回后消费者本地业务逻辑处理失败造成消息丢失的可能:

DefaultMQPushConsumer consumer = new 
        DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        new Thread(() -> {
            //处理业务逻辑
            System.out.printf("%s Receive New Messages: %s %n", 
                              Thread.currentThread().getName(), msgs);[
        };
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

4.RocketMQ特有的问题,NameServer挂了如何保证消息不丢失?

NameServer在RocketMQ中,是扮演的一个路由中心的角色,提供到Broker的路由功能,集群中任意多的节点挂掉,都不会影响他提供的路由功能。路由中心功能在所有的MQ中都是需要的。
● kafka是用zookeeper和一个作为Controller的Broker一起来提供路由服务,整个功能是相当复杂纠结的。
● RabbitMQ是由每一个Broker来提供路由服务。
● RocketMQ把这个路由中心单独抽取了出来,并独立部署。
如果集群中所有的NameServer节点都挂了,很多人认为在生产者和消费者中都会有全部路由信息的缓存副本,整个服务可以正常工作一段时间。其实可以测试发现当NameServer全部挂了后,生产者和消费者是立即就无法工作了的。

在这种情况下,RocketMQ相当于整个服务都不可用了,无法保证消息不丢失。我们只能自己设计一个降级方案来处理这个问题了。例如在订单系统中,如果多次尝试发送RocketMQ不成功,就另外使用Redis把订单消息缓存下来,起一个线程定时的扫描这些失败的订单消息,尝试往RocketMQ发送。这样等RocketMQ的服务恢复过来后,就能第一时间把这些消息重新发送出去。在大型互联网项目中,整个这套降级的机制都是必要的。

5.RocketMQ消息零丢失方案总结

整套的消息零丢失方案,在各个环节都大量的降低了系统的处理性能以及吞吐量。在很多场景下,这套方案带来的性能损失的代价可能远远大于部分消息丢失的代价,方案如下:
● 生产者使用事务消息机制。
● Broker配置同步刷盘+Dledger主从架构
● 消费者不要使用异步消费。
● 整个MQ挂了之后准备降级方案

我们在设计RocketMQ使用方案时,要根据实际的业务情况来考虑。例如:
● 针对所有服务器都在同一个机房的场景,完全可以把Broker配置成异步刷盘来提升吞吐量。
● 针对消息可靠性要求没有那么高的场景,在生产者端就可以采用一些更简单的方案来提升吞吐,定时对账补偿的机制来提高消息的可靠性。
● 针对消费者不需要进行消息存盘的场景,可以使用异步消费的机制来提升性能。

如何保证消息有序?

MQ的顺序问题分为全局有序和局部有序。


●全局有序:整个MQ系统的所有消息严格按照队列先入先出顺序进行消费。
●局部有序:只保证一部分关键消息的消费顺序。

在通常MQ业务场景中,只需要能够保证局部有序,全局有序可以压缩成局部有序问题。例如常用的聊天室就是个典型的需要保证消息全局有序的场景。我们可以改成整个系统只有一个聊天通道,就可以用QQ那种保证一个聊天窗口消息有序的方式来保证整个系统的全局消息有序。

发送者发送消息时,会通过MessageQueue轮询的方式保证消息尽量均匀的分布到所有的MessageQueue上,消费者也就同样需要从多个MessageQueue上消费消息。MessageQueue是RocketMQ存储消息的最小单元,他们之间的消息都是互相隔离的,在这种情况下是无法保证消息全局有序的。

对于局部有序的要求,只需要将有序的一组消息都存入同一个MessageQueue里,这样MessageQueue的FIFO设计天生就可以保证这一组消息的有序。
RocketMQ中,可以在发送者发送消息时指定一个MessageSelector对象,让这个对象来决定消息发入哪一个MessageQueue,这样就可以保证一组有序的消息能够发到同一个MessageQueue里。

另外,保证Topic全局消息有序的方式,就是将Topic配置成只有一个MessageQueue队列(默认是4个)。这样就能保证消息全局有序了。这个说法其实就是我们将聊天室场景压缩成只有一个聊天窗口的QQ一样的理解方式。这种方式对整个Topic的消息吞吐影响是非常大的,如果这样用基本上就没有使用MQ的必要了。

使用RocketMQ如何快速处理积压消息?

1.如何确定RocketMQ有大量的消息积压?

在正常情况下,使用MQ都会要尽量保证他的消息生产速度和消费速度整体上是平衡的,但是如果部分消费者系统出现故障,就会造成大量的消息积累。例如某天一个数据库突然挂了把数据库恢复过来,或者网络波动等情况,这时就会导致消息大量的积累。这在一些大型的互联网项目中,消息积压的速度是相当恐怖的,需要时间关注。

  1. 最简单的方式就是使用web控制台可以直接看到消息的积压情况,通过Consumer管理按钮实时看到消息的积压情况。
  2. 通过mqadmin指令在后台检查各个Topic的消息延迟情况。
  3. RocketMQ会在"${storePathRootDir}/config" 目录下的json文件也可以用来跟踪消息积压情况。

2.如何处理大量积压的消息?

1.如果Topic下的MessageQueue配置得足够多,每个Consumer实际上会分配多个MessageQueue来进行消费,可以简单的通过增加Consumer的服务节点数量来加快消息的消费,等积压消息消费完再恢复成正常情况。最极限的情况是把Consumer的节点个数设置成跟MessageQueue的个数相同。

2.如果Topic下的MessageQueue配置得不够多,就不能使用上面这种增加Consumer节点个数的方法。我们可以创建一个新的Topic,配置足够多MessageQueue,然后把所有消费者节点的目标Topic转向新的Topic,并紧急上线一组新的消费者,只负责消费旧Topic中的消息,并转储到新的Topic中。在新的Topic上,就可以通过增加消费者个数来提高消费速度了,之后再根据情况恢复成正常情况。

RocketMQ的消息轨迹

  1. RocketMQ消息轨迹数据的关键属性
  2. 消息轨迹配置
    默认关闭,打开消息轨迹功需要在broker.conf中配置:traceTopicEnable=true。
  3. 消息轨迹数据存储
    默认情况下,消息轨迹数据是存于一个系统级别的

Topic:RMQ_SYS_TRACE_TOPIC。

这个Topic在Broker节点启动时,会自动创建出来。另外也支持客户端自定义轨迹数据存储的Topic。

在客户端的两个核心对象 DefaultMQProducer和DefaultMQPushConsumer,他们的构造函数中,都有两个可选的参数来打开消息轨迹存储

●enableMsgTrace:是否打开消息轨迹。默认是false。
●customizedTraceTopic:配置将消息轨迹数据存储到用户指定的Topic 。

最后修改:2022 年 10 月 13 日
如果觉得我的文章对你有用,请随意赞赏