4 RabbitMQ 进阶

felix.shao2025-02-18

4 RabbitMQ 进阶

TIP

 本小节主要介绍以下知识:

  • 一些更具特色的细节和功能。

概述

 略。

消息何去何从

 mandatory 和 immediate 是 channel.basicPublish 方法中的两个参数,它们都有当消息传递过程中不可达目的地时将消息返给给生产者的功能。RabbitMQ 提供的备份交换器(Alternate Exchange)可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定)存储起来,而不用返回给客户端。

mandatory 参数

 当mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当设置为 false 时,出现上述情形,则消息直接被丢弃。
 生产者可以通过调用channel.addReturnListener来添加 ReturnListener 监听器来获取没有被正确路由到合适队列的消息。
 使用 mandatory 参数的关键代码如下所示:

channel.basicPublish(EXCHANGE_NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes());
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange,
                                String routingKey, AMQP.BasicProperties properties,
                                byte[] body) throws IOException {
        String message = new String(body);
        System.out.println("Basic.Return 返回的结果是:" + message);
    }
});

 从 AMQP 协议层面来说,其对应的流转过程如下图所示:

uml diagram

immediate 参数

 当 immediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return 返回至生产者。
 官方不建议使用,这里就不写源码展开了。

TIP

  • mandatory 参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。
  • immediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。
     RabbitMQ 3.0 版本开始去掉了 immediate 参数的支持,对此 RabbitMQ 官方解释是:immediate 参数会影响镜像队列的性能,增加了代码复杂性,建议采用 TTL 和 DLX 的方法替代。

备份交换器

 备份交换器,英文名称为 Alternate Exchange ,简称 AE,或者更直白地称之为“备胎交换器”。
 使用 mandatory 参数的话,代码会比较复杂,如果既不想复杂化的生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在 RabbitMQ 中,再去需要的时候去处理这些消息。
 可以通过在声明交换器的时候添加alternate-exchange参数来实现,也可以通过策略(Policy,后续章节会介绍)的方式实现。如果两者同时使用,则前者的优先级更高,会覆盖掉 Policy 的设置。
 使用alternate-exchange参数的关键代码如下所示:

Map<String, Object> params = new HashMap<>();
params.put("alternate-exchange", "myAe");
// 声明两个交换器 normalExchange 和 myAe
channel.exchangeDeclare("normalExchange", "direct", true, false, params);
channel.exchangeDeclare("myAe", "fanout", true, false, null);
// 声明 normalQueue 并绑定 normalExchange
channel.queueDeclare("normalQueue", true, false, false, null);
channel.queueBind("normalQueue", "normalExchange", "normalKey");
// 声明 unroutedQueue 并绑定 myAe
channel.queueDeclare("unroutedQueue", true, false, false, null);
channel.queueBind("unroutedQueue", "myAe", "");
// 发送消息会经过 normalExchange-> normalQueue
channel.basicPublish("normalExchange", "normalKey", true,
        MessageProperties.PERSISTENT_TEXT_PLAIN, "testAe body".getBytes());
// 发送消息会经过 normalExchange->myAe->unroutedQueue
channel.basicPublish("normalExchange", "errorKey", true,
        MessageProperties.PERSISTENT_TEXT_PLAIN, "testAe body".getBytes());

 同样,如果采用 Policy 的方式来设置备份交换器,可以参考如下 $ rabbitmqctl set_policy AE "^normalExchange$" '{"alternate-exchange":"myAE"}'

 备份交换器和普通的交换器没有太大的区别,为了方便使用,建议设置为 fanout 类型,也可以设置为其他类型(设置为direct时,key不匹配照样会丢失消息)。需要注意的是,消息被重新发送到备份交换器时的路由键和从生产者发出的路由键是一样的。

TIP

 对于备份交换器,总结了以下几种特殊情况:

  • 如果设置的备份交换器不存在,客户端和 RabbitMQ 服务器都不会有异常出现,此时消息会丢失。
  • 如果备份交换器没有绑定任何队列,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器没有任何匹配的队列,客户端和 RabbitMQ 服务端都不会有异常出现,此时销毁会丢失。
  • 如果备份交换器和 mandatoty 参数一起使用,那么 mandatory 参数无效。

过期时间(TTL)

 TTL,Time to Live 的简称,即过期时间。RabbitMQ 可以对消息和队列设置 TTL。

设置消息的 TTL

 目前有两种方法可以设置消息的 TTL。

  1. 通过队列属性设置(channel.queueDeclare),队列中所有消息都有相同的过期时间。
  2. 对消息本身进行单独设置,每条消息的 TTL 可以不同。

 如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的 TTL 值时,就会变成"死信",消费者将无法再收到该消息(不是绝对的)。
 使用x-message-ttl参数的关键代码如下所示:

Map<String, Object> queueParams = new HashMap<>();
queueParams.put("x-message-ttl", 60000);
channel.queueDeclare("ttlQueue", true, false, false, queueParams);

 同时,如果采用 Policy 的方式来设置 TTL,可以参考如下
$ rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues

 还可以通过调用 HTTP API 接口设置:

$ curl -i -u root:root -H "content-type:application/json" -X PUT 
-d '{"auto_delete":false, "durable":true, "arguments": {"x-message-ttl":60000}}'  
http://localhost:15672/api/queues/{vhost}/{queuename}

 如果不设置 TTL,则表示此消息不会过期;如果将 TTL 设置为 0,则表示除非此时可以直接将消息投递给消费者,否则该消息会被立即丢弃。
 针对每条消息设置 TTL 的方法是在channel.basicPublish方法中假如 expiration 的属性参数,单位为毫秒。
 每条消息设置 TTL 的关键代码如下所示:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2); // 持久化消息
builder.expiration("60000"); //设置 TTL = 60000ms
AMQP.BasicProperties props = builder.build();
channel.basicPublish("ttlExchange", "ttlKey", true, props, "testTtl body".getBytes());

 还可以通过 HTTP API 接口设置,具体配置略。

TIP

 第一种设置队列 TTL 属性的方法,一旦消息过期,就会从队列中抹去,而在第二种方法中,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的。
 为什么这两种方法处理的方式不一样?
 因为第一种方法里,队列中已过期的消息肯定在队列头部,RabbitMQ 只要定期从队头开始扫描是否有过期的消息即可。而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。

设置队列的 TTL

 通过 channel.queueDeclare 方法中的 x-expires 参数可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过 Basic.Get 命令。
 RabbitMQ 会确保在过期时间到达后将队列删除,但是不保障删除的动作有多及时。在 RabbitMQ 重启后,持久化的队列的过期时间会被重新计算。
 用于表示过期时间的 x-expires 参数以毫秒为单位,并且服从和 x-message-ttl 一样的约束条件,不过不能设置为0。
 设置队列 TTL 的关键代码如下所示:

Map<String, Object> queueParams = new HashMap<>();
queueParams.put("x-expires", 600000); //设置队列的ttl
channel.queueDeclare("ttlQueue2", true, false, false, queueParams);

死信队列

 DLX,全称为 Dead-Letter-Exchange,可以称之为私信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。
 消息变成死信一般是由于以下几种情况:

  • 消息被拒绝(Basic.Reject/Basic.Nack),并且设置 requeue 参数为 false;
  • 消息过期;
  • 消息达到最大长度。

 DLX 也是一个正常的交换器,和一般的交换器没有区别(设置某个队列的属性指定其为死信队列)。当这个队列中存在死信时,RabbitMQ 会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消息以进行相应的处理,这个特性与将消息的 TTL 设置为0配合使用可以弥补 immediate 参数的功能。
 设置死信队列的关键代码如下所示:

channel.exchangeDeclare("dlxExchange", "direct");
Map<String, Object> queueParams = new HashMap<>();
queueParams.put("x-dead-letter-exchange", "dlxExchange");
// 也可以为这个 DLX 指定路由键,如果没有特殊指定,则使用原队列的路由键
// queueParams.put("x-dead-letter-routing-key", "dlx-routing-key");
channel.queueDeclare("dlxQueue", false, false, false, queueParams);

 对应的 Policy 方式设置:$ rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"dlxExchange"}' --apply-to queues

 TTL + DLX 示例代码如下:

/**
 * 生产者首先发送一条路由键为“rk”的消息,然后经过交换器e xchange.normal 顺利地存储到对立 queue.normal 中。
 * 由于队列 queue.normal 设置了过期时间 10s,在这 10s 内没有消费者消费这条消息,那么判定这条消息为过期。
 * 由于设置了 SLX,过期之时,消息被丢给交换器 exchange.dlx 中,这时找到与 exchange.dlx 匹配的队列 queue.dlx,
 * 最后消息被存储在 queue.dlx 这个死信队列中。
 */
channel.exchangeDeclare("exchange.dlx", "direct", true);
channel.exchangeDeclare("exchange.normal", "fanout", true);
// 队列设置 ttl dlx
Map<String, Object> queueParams = new HashMap<>();
queueParams.put("x-message-ttl", 10000);
queueParams.put("x-dead-letter-exchange", "exchange.dlx");
queueParams.put("x-dead-letter-routing-key", "routingKey");
channel.queueDeclare("queue.normal", true, false, false, queueParams);
channel.queueBind("queue.normal", "exchange.normal", "");

channel.queueDeclare("queue.dlx", true, false, false, queueParams);
channel.queueBind("queue.dlx", "exchange.dlx", "routingKey");

channel.basicPublish("exchange.normal", "rk", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx body".getBytes());

 DLX 配合 TTL 使用还可以实现延迟队列的功能。

延迟队列

 延迟队列存储的对下是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的 DLX 和 TTL 模拟出延迟队列的功能,即如上"TTL+DLX"示例代码。
 在真实应用中,对于延迟队列可以根据延迟时间的长短可分多个等级,一般分为 5 秒、10 秒、30 秒、1 分钟、5 分钟、10 分钟、30 分钟、1 小时这几个维度。
 延迟队列示意图如下: delay_queue.jpg

优先级队列

 优先级队列:具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。
 可以通过设置队列的 x-max-priority 参数来实现。
 核心代码如下:

Map<String, Object> queueParams = new HashMap<>();
//设置最大优先级
queueParams.put("x-max-priority", 10);
channel.queueDeclare("queue.priority", true, false, false, queueParams);

channel.queueBind("queue.priority", "exchange.priority", "priorityKey");
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
// 设置消息的优先级为 5,最低优先级默认为 0,最高为队列设置的最大优先级
builder.priority(5);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange.priority", "priorityKey", properties, "test priority body".getBytes());

 优先级高的消息可以被优先消费,但是如果在消费者的消费速度大于生产者的速度,且Broker中没有消息堆积的情况下,对发送的消息设置优先级也就没有什么实际意义。因为刚发送完一条消息就被消费者消费了。

RPC 实现

 一般在 RabbitMQ 中做 RPC 是很简单的。客户端发送请求消息,服务器回复响应的消息。为了接受响应的消息,我们需要在请求消息中发送一个回调队列。可以使用默认的队列。
 以下是客户端 RPC 相关代码:

String callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish("", "rpc_queue",props,message.getBytes());
// then code to read a response message from the callback_queue...

 其中属性:

  • deliveryMode: 标记消息传递模式,2-消息持久化,其他值-瞬态。
  • contentType:内容类型,用于描述编码的 mime-type. 例如经常为该属性设置 JSON 编码。
  • replyTo:应答,通用的回调队列名称,
  • correlationId:关联 ID,方便 RPC 相应与请求关联。

 在上述方法中为每个 RPC 请求创建一个回调队列。这是很低效的。幸运的是,一个解决方案:可以为每个客户端创建一个单一的回调队列。
 新的问题被提出,对于回调队列而言,在其接收到一条回复的消息之后,它并不知道这条消息应该和哪一个请求匹配。这里就用到 correlationId 这个属性了,我们应该为每一个请求设置唯一的 correlationId。之后在回调队列接收到回复的消息时,可以根据这个属性匹配到相应的请求。如果回调队列接收到一条未知 correlationId 的回复消息,可以简单地将其丢弃。
 你有可能会问,为什么要忽略回调队列中未知的信息,而不是当作一个失败?这是由于在服务器端竞争条件的导致的。虽然不太可能,但是如果RPC服务器在发送给我们结果后,发送请求反馈前就挂掉了,这有可能会发送未知 correlationId 属性值的消息。如果发生了这种情况,重启 RPC 服务器将会重新处理该请求。这就是为什么在客户端必须很好的处理重复响应,RPC 应该是幂等的。
RPC 示意图如下:
rpc_signal.jpg

 RPC 的处理流程:

  1. 当客户端启动时,创建一个匿名的回调队列。
  2. 客户端为 RPC 请求设置 2 个属性:replyTo,设置回调队列名字;correlationId,标记 request。
  3. 请求被发送到 rpc_queue 队列中。
  4. RPC 服务器端监听 rpc_queue 队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是 replyTo 设定的回调队列。
  5. 客户端监听回调队列,当有消息时,检查 correlationId 属性,如果与 request 中匹配,那就是结果了。

 参考 RabbitMQ RPC 官网例子的 RPC 代码略,可见全文示例代码。

持久化

 持久化可以提高 RabbitMQ 的可靠性,以防在异常情况(重启、关闭、沓机等)下的数据丢失。RabbitMQ 的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化。
 交换器的持久化是通过在声明交换器时将 durable 参数设置为 true 实现的。如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换器元数据会丢失了,不过消息不会丢失,只是不能将消息发送到这个交换器中了。对一个长期使用的交换器来说,将以将其置为持久化的。
 队列的持久化是通过在声明队列时将 durable 参数设置为 true 实现的。如果队列不设置持久化,那么在 RabbitMQ 服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。
 队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。通过将消息的投递模式设置为2即可实现消息的持久化。前面示例中多次提及的MessageProperties.PERSISTENT_TEXT_PLAIN实际上是封装了这个属性。

public static final BasicProperties PERSISTENT_TEXT_PLAIN =
        new BasicProperties("text/plain",
                null,
                null,
                2,
                0, null, null, null,
                null, null, null, null,
                null, null);

 设置了队列和消息的持久化,当 RabbitMQ 服务重启之后,消息依旧存在。单单只设置队列持久化,重启之后消息会丢失;单单只设置消息的持久化,重启之后队列丢失,继而消息也丢失。单单设置消息持久化而不设置队列的持久化显得毫无意义。

TIP

 可以将所有的消息都设置为持久化,但是这样会严重影响 RabbitMQ 的性能。在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。

 将 queue,exchange, message 等都设置了持久化之后就能保证100%保证数据不丢失了吗?答案是否定的。

  1. 从 consumer 端来说,如果这时a utoAck = true,那么当 consumer 接收到相关消息之后,还没来得及处理就沓机(crash)掉了,那么这样也算数据丢失,这种情况也好只需将 autoAck 设置为 false (方法定义如下),然后在正确处理完消息之后进行手动 ack(channel.basicAck)。
  2. 在持久化的消息正确存入 RabbitMQ 之后,还需要有一段时间(这个时间很短,但不可忽视)才能存入磁盘之中,RabbitMQ 并不是为每条消息都做同步存盘(fsync)的可能仅仅保存到 cache 中而不是物理磁盘上,在这段时间内 RabbitMQ broker 发生 crash, 消息保存到 cache 但是还没来得及落盘,那么这些消息将会丢失。那么这个怎么?这里可以引入 RabbitMQ 的 mirrored-queue 即镜像队列,这个相当于配置了副本,当 master 在此特殊时间内 crash 掉,可以自动切换到 slave,这样有效的保障了 HA, 除非整个集群都挂掉,这样也不能完全的 100% 保障 RabbitMQ 不丢消息,但比没有 mirrored-queue 的要好很多,很多现实生产环境下都是配置了 mirrored-queue 的。
  3. 还可以在发送端(producer)引入事务机制或者确认(Confirm)机制来确保消息已经正确的发送至 broker 端,前提还要保证在 channel.basicPublish 方法的时候交换器能够将消息正确路由到相应的队列之中。详细可参考下一节。

生产者确认

 默认情况下生产者是不知道消息有没有正确地到达服务器。如果在消息到达服务器之前已经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
 RabbitMQ 针对这个问题,提供了两种解决方式:

  • 通过事务机制实现;
  • 通过发送方确认机制(publisher confirm)实现。

事务机制

 RabbitMQ 客户端中与事务机制相关的方法有三个:

  • channel.txSelect:将当前的信道设置成事务模式;
  • channel.txCommit:提交事务;
  • channel.txRollback:事务回滚。

 在通过 channel.txSelect 方法开启事务之后,我们便可以发布消息给 RabbitMQ 了,如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交执行之前由于 RabbitMQ 一次崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行 channel.txRollback 方法来实现事务回滚。
 事务提交代码如下:

channel.txSelect();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, 
        MessageProperties.PERSISTENT_TEXT_PLAIN, "test tran body".getBytes());
channel.txCommit();

 上面代码对应的 AMQP 协议转让流程如下图所示:

uml diagram

 事务回滚代码如下:

try {
        channel.txSelect();
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, 
                MessageProperties.PERSISTENT_TEXT_PLAIN, "test tran body".getBytes());
        int result = 1 / 0;
        channel.txCommit();
} catch (Exception e) {
        e.printStackTrace();
        channel.txRollback();
}

 上面代码对应的 AMQP 协议转让流程如下图所示:

uml diagram

 发送多条消息代码如下:

channel.txSelect();
for (int i = 0; i < 5; i++) {
        try {
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, MessageProperties.PERSISTENT_TEXT_PLAIN, "test tran body".getBytes());
        channel.txCommit();
        } catch (Exception e) {
        e.printStackTrace();
        channel.txRollback();
        }
}

发送方确认机制

 使用事务机制会严重降低 RabbitMQ 的消息吞吐量,RabbitMQ 提供了一个轻量级的改进方案,即发送方确认机制(publisher confirm)。
 生产者将信道设置成 confirm (确认)模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID (从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是持久化的,那么确认消息会在消息写入磁盘之后发出。RabbitMQ 回传给生产者的确认消息中的 deliveryTag 包含了确认消息的序号,此外 RabbitMQ 也可以设置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息都已经得到了处理。注意识别这里的确认和消费时的确认之间的异同。
producer_ack.jpg
 事务机制在一条消息发送之后会使发送端阻塞,相比之下,发送方确认机制最大的好处在于它是异步的,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack (Basic.Nack)命令,生产者应用的程序同样可以在回调方法中处理该 nack 命令。
 生产者通过调用 channel.confirmSelect 方法(即Confirm.Select命令)将信道设置为 confirm 模式,之后 RabbitMQ 会返回 Confirm.Select-Ok 命令表示统一生产者将当前信道设置为 confirm 模式。所有被发送的后续消息都被 ack 或者 nack 一次,不会出现一条消息被 ack 又被 nack 的情况,并且 RabbitMQ 也并没有对消息被 confirm 的快慢做任何保证。
 以下是 publisher confirm 机制简要示例代码:

channel.confirmSelect(); // 将信道设置为 publisher confirm 模式
for (int i = 0; i < 2; i++) {
        try {
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true,
                        MessageProperties.PERSISTENT_TEXT_PLAIN, "test confirm body".getBytes());
                if(!channel.waitForConfirms()){
                        System.out.println("send message failed");
                }
        } catch (Exception e) {
                e.printStackTrace();
        }
}

 在 publisher confirm 模式下发送多条消息的 AMQP 协议流转过程如下图所示:

uml diagram

 对于 channel.waitForConfirms () 而言,在 RabbitMQ 客户端中它有 4 个同类的方法:

/**
 *  客户端收到了相应的 Basic.Ack/.Nack 或者被中断
 */
boolean waitForConfirms() throws InterruptedException;

/**
 *  一旦等待 RabbitMQ 回应超时就会抛出 java.util.concurrent.TimeOutException 的异常。
 */
boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;

/**
 *  接收到 RabbitMQ 返回的 Basic.Nack 之后会抛出 java.io.IOException
 */
void waitForConfirmsOrDie() throws IOException, InterruptedException;

void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException

 如果信道没有开启 publisher confirm 模式,则调用 waitForConfirms 方法都会爆出 java.lang.IllegalStateException。

TIP

 发送方确认机制的 QPS 比事务机制略快。  发送方确认机制也是一种串行同步等待的方式。在同步等待的状态下,publisher confirm 机制发送一条消息需要通信交互的命令是2条:Basic.Publish 和 Basic.Ack;事务机制是3条:Basic.Publish、Tx.Commit/.Commit-OK(或者Tx.Rollback/.Rollback-ok),事务机制多了一个命令帧报文的交互,所以 QPS 会略微下降。

 注意要点:

  • 事务机制和 publisher confirm 机制两者是互斥的,不能共存。
  • 事务机制和 publisher confirm 机制确保的是消息能够正确地发送到 RabbitMQ。即消息被正确地发送至 RabbitMQ 的交换器,如果此交换器没有匹配的队列,那么消息也会丢失。简而言之:发送方要配合 mandatory 参数或者备份交换器一起使用来提供消息传输的可靠性。

 publisher confirm 的优势在于并不一定需要同步确认。以下是两种改进方式:

  • 批量 confirm 方法:每发送一批消息后,调用 channel.waitForConfirms 方法,等待服务器的确认返回。
  • 异步 confirm 方法:提供一个回调方法,服务器确认了一条或者多条消息后客户端会回调这个方法进行处理。

批量 confirm 方法
 客户端程序需要定期或者定量(达到多少条,亦或者两者结合起来调用 channel.waitForConfirms 来等待 RabbitMQ 的确认返回)。相比于普通 confirm 方法,极大地提升了 confirm 的效率,但是问题在于出现返回Basic.Nack 或者超时情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且当消息经常丢失时,批量 confirm 的性能应该是不升反降的。

 批量 onfirm 方法的示例代码如下所示:

try {
        channel.confirmSelect(); // 将信道设置为 publisher confirm 模式
        int msgCount = 0;
        while (true) {
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true,
                        MessageProperties.PERSISTENT_TEXT_PLAIN, "test batch confirm body".getBytes());
                // 将发送出去的消息存入缓存中,缓存可以是一个 ArrayList 或者 BolockingQueue
                if (++msgCount >= 10) {
                        msgCount = 0;
                        try {
                                if (channel.waitForConfirms()) {
                                        // 将缓冲中的消息清空
                                        Thread.sleep(5000L);
                                        continue;
                                }
                        } catch (InterruptedException e) {
                                e.printStackTrace();    
                                // 将缓存中的消息清空
                        } catch (Exception e) {
                                e.printStackTrace();
                                // 将缓存中的消息清空
                        }
                }
        }
} catch (IOException e) {
        e.printStackTrace();
}

异步 confirm 方法
 最为复杂。在客户端 Channel 接口中提供的 addConfirmListener 方法可以添加 ConfirmListener 这个回调接口,这个 ConfirmListener 接口包含两个方法: handleAck 和 handleNack,分别用来处理 RabbitMQ 回传的 Basic.Ack 和 Basic.Nack。 在这两个方法中都包含有一个参数 deliveryTag (在 publisher confirm 模式下用来标记消息的唯一有序序号)。我们需要为每一个信道维护一个“unconfirm”的消息序号集合,每发送一条消息,集合中的元素加 1。每当调用 ConfirmListener 中的 handleAck 方法时,“unconfirm”集合中删掉相应的一条(multiple设置为false)或者多条(multiple设置为true)记录。从程序运行效率上来看,这个“unconfirm”集合最好采用有序集合 SortedSet 的存储结构。事实上,Java 客户端 SDK中的 waitForConfirms 方法也是通过 SortedSet 维护消息序号的。

 异步 confirm 方法的示例代码如下所示,其中的 ConfirmSet 就是一个 SortedSet 类型的集合:

channel.confirmSelect(); // 将信道设置为 publisher confirm 模式
channel.addConfirmListener(new ConfirmListener() {
        @Override
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("ack, SeqNo: " + deliveryTag + ", multiple:" + multiple);
                // 处理消息 集合减 1,如果批量则减掉 tag 前面的所有的消息
                if(multiple){
                        confirmSet.headSet(deliveryTag + 1).clear();
                } else {
                        confirmSet.remove(deliveryTag);
                }
        }

        @Override
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple:" + multiple);
                if(multiple){
                        confirmSet.headSet(deliveryTag + 1).clear();
                } else {
                        confirmSet.remove(deliveryTag);
                }
                // 这里需要添加处理消息重发的场景
        }
});
// 下面是演示一直发送消息的场景
int i = 1;
while(i < 1000){
        // 发送 1 条消息,集合 + 1
        long nextSeqNo = channel.getNextPublishSeqNo();
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true,
                MessageProperties.PERSISTENT_TEXT_PLAIN, "test confirm aysnc body".getBytes());
        confirmSet.add(nextSeqNo);
        Thread.sleep(5000);
}

 批量 confirm 和异步 confirm 两种方式的性能要比事务机制和普通 confirm 的方式吞吐量好得多。

  • 事务机制和普通 confirm 的方式吞吐量很低,但是编程方式简单,不需要在客户端维护状态(指维护deliveryTag及缓存未确认的消息)。
  • 批量 confirm 方式的问题在于遇到 RabbitMQ 服务器返回 Basic.Nack 需要重发批量消息而导致的性能降低。
  • 异步 confirm 方式编程模型最为复杂,而且和批量 confirm 方式一样需要在客户端维护状态。

 在实际生产环境中采用何种方式,这里就看实际情况了,还是推荐使用异步或者批量 confirm 的方式。

消费者要点介绍

 消费者客户端可以通过推模式或者拉模式的方式来获取并消费消息,当消费者处理完业务逻辑需要手动确认消息已被接收,这样 RabbitMQ 才能把当前消息从队列中标记清除。当然如果消费者由于某些原因无法处理当前接收到的消息,可以通过 channel.basicNack 或者 channel.basicReject 来拒绝掉。
 还有几点需要注意:

  • 消息分发;
  • 消息顺序性;
  • 弃用 QueueingConsumer。

消息分发

 当 RabbitMQ 队列拥有多个消费者时,队列收到的消息将以轮询(round-robin)的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。这种方式拓展消息者非常方便。
 轮询分发机制的缺陷:默认情况下,如果有 n 个消费者,那么 RabbitMQ 会将第 m 条消息分发给第 m % n 个消费者,RabbitMQ 不管消费者是否消费并已经确认了消息。如果某些消费者任务繁重,来不及消费那么多的消息,而某些其他消费者由于某些原因(比如业务逻辑简单、机器性能卓越等)很快地处理完了所分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降。我们可以用channel.basicQos(int prefetchCount)方法限制信道上的消费者所能保持的最大未确认消息的数量。
 举例说明如下:在订阅消费队列之前,消费端程序调用了 channel.basicQos(5),之后订阅了某个队列进行消费。RabbitMQ 会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么 RabbitMQ 就不会向这个消费者再发送任何消息。直到消费者确认了某几条消息之后,RabbitMQ 将相应的计数减 1,之后消费者可以继续接收消息,直到再次到达计数上限。这种机制可以类比于 TCP/IP 中的“滑动窗口”。

TIP

 Basic.Qos 的使用对于拉模式的消费方式无效。

 channel.basicQos有三种类型的重载方法:

void basicQos(int prefetchCount) throws IOException;

void basicQos(int prefetchCount, boolean global) throws IOException;

/**
 * 限制信道上的消费者所能保持的最大未确认消息的数量
 * @param prefetchSize  表示消费者所能接收未确认消息的总体大小的上限,单位为 B,设置为 0 表示没有上限
 * @param prefetchCount 表示消费者所能保持的最大未确认消息的数量,设置为 0 表示没有上限
 * @param global        下面详细介绍
 */
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

 对于一个信道来说,它可以同时消费多个队列,当设置了 prefetchCount 这个参数,当 prefetchCount 大于 0 时,这个信道需要和各个队列协调以确保发送的消息都没有超过所限定的 prefetchCount 的值,这样会使 RabbitMQ 的性能降低,尤其是这些队列分散在集群中的多个 Broker 节点之中。RabbitMQ 为了提升相关的性能,在 AMQP 0-9-1 协议之上重新定义了 global 这个参数,对比如下表所示。

global参数AMQP 0-9-1RabbitMQ
false信道上所有的消费者都需要遵从 prefetchCount 的限定值信道上新的消费者需要遵从prefetchCount的限定值
true当前通信链路(Connection)上所有的消费者都需要遵从 prefetchCount的限定值信道上所有的消费者都需要遵从 prefetchCount 的限定值

 前面介绍的 channel.basicQos 方法的示例都是针对单个消费者的,而对于同一个信道上的多个消费者而言,如果设置了 prefetchCount 的值,那么都会生效。
 以下代码有两个消费者,各自的能接收到的未确认消息的上限都为10。

Channel channel = connection.createChannel();
// 设置客户端最多接收未被 ack 的消息个数,实测每批消费者消费 prefetchCount 数消息
channel.basicQos(10);
Consumer consumer1 = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("recv message1: " + new String(body) + ", consumerTag = " + consumerTag + ", deliveryTag = " + envelope.getDeliveryTag());
                try {
                        TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                        e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
        }
};
Consumer consumer2 = ...;
channel.basicConsume(QUEUE_NAME, false, consumer1);
channel.basicConsume(QUEUE_NAME, false, consumer2);

 如果在订阅消息之前,既设置了 global 为 true 的限制,又设置了 global 为 false 的限制,那么哪个会生效呢?RabbitMQ 会确保两者都会生效。  举例说明如下代码:

Channel channel = connection.createChannel();
// 设置客户端最多接收未被 ack 的消息个数,实测每批消费者消费 prefetchCount 数消息
Consumer consumer1 = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("recv message1: " + new String(body) + ", consumerTag = " + consumerTag + ", deliveryTag = " + envelope.getDeliveryTag());
                try {
                        TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                        e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
        }
};
channel.basicQos(3, false);
channel.basicQos(2, true);
Consumer consumer2 = ...
channel.basicConsume(QUEUE_NAME, false, consumer1);
channel.basicConsume(QUEUE_NAME, false, consumer2);

 这里每个消费者最多只能收到 3 个未确认的消息,两个消费者能收到的未确认的消息个数之和的上限为 5。在未确认消息的情况下,如果 consumer1 接收到了消息 1、2、3,那么 consumer2 至多只能收到 11 和 12。如果 consumer1 接收到了消息 1、2 和 3,那么 consumer2 至多只能收到 11 和 12。如果像这样同时使用两种 global 的模式,则会增加 RabbitMQ 的负载,因为 RabbitMQ 需要更多的资源来协调完成这些限制。如无特殊需要,最好只使用 global 为 false 的设置,这也是默认的配置。

消息顺序性

 消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。
 目前很多资料显示 RabbitMQ 的消息能够保障顺序性,这个是不正确的,或者说这个观点有很大的局限性。在不使用任何 RabbitMQ 的高级特性,也没有消息丢失、网络故障之类异常的情况发生,并且只有一个消费者的情况下,最好也只有一个生产者的情况下可以保证消息的顺序性。如果有多个生产者同时发送消息,无法确定消息到达 Broker 的前后顺序,也就无法验证消息的顺序性。
 那么哪些情况下 RabbitMQ 的消息顺序性会被打破呢?以下是几种常见的情形。

  1. 如果生产者使用了事务机制,在发送消息之后遇到异常进行了事务回滚,那么需要重新补偿发送这条消息,如果补偿发送是在另一个线程实现的,那么消息在生产者这个源头就出现了错序。同样,如果启用 publisher confirm 时,在发生超时、中断,又或者是收到 RabbitMQ 的 Basic.Nack 命令时,那么同样需要补偿发送,结果与事务机制一样会错序。或者这种说法有些牵强,我们可以固执地认为消息的顺序性保障是从存入队列之后开始的,而不是在发送的时候开始的
  2. 如果生产者发送的消息设置了不同的超时时间,并且也设置了死信队列,整体上来说相当于一个延迟队列,那么消费者在消费这个延迟队列的时候,消息的顺序必然不会和生产者发送消息的顺序一致。
  3. 如果消息设置了优先级,那么消费者消费到的消息也必然不是顺序性的。
  4. 如果一个队列按照前后顺序分有 msg1、msg2、msg3、msg4 这 4 个消息,同时有 ConsumerA 和 ConsumerB 这两个消费者同时订阅了这个队列。队列中的消息轮询分发到各个消费者之中,ConsumerA 中的消息为 msg1 和 msg3,ConsumerB 中的消息为msg2、msg4。ConsumerA 收到消息 msg1 之后并不想处理而调用了 Basic.Nack/.Reject 将消息拒绝,与此同时将 requeue 设置为 true,这样这条消息就可以重新存入队列中。消息 msg1 之后被发送到了 ConsumerB 中,此时 ConsumerB 已经消费了 msg2、msg4,之后再消费 msg1,这样消息顺序性也就错乱了。或者消息 msg1 又重新发往 ConsumerA 中,此时 ConsumerA 已经消费了 msg3,那么再消费 msg1,消费顺序性也无法得到保障。同样可以用在 Basic.Revocer 这个 AMQP 命令中。

 包括但不仅限于以上几种情形会使 RabbitMQ 消息错序。如果要保证消息的顺序性,需要也无法使用 RabbitMQ 之后做进一步的处理,比如在消息体内添加全局有序标识(类似 Sequence ID)来实现。

弃用 QueueingConsumer

 前面介绍的订阅消费的方式都是通过继承 DefaultConsumer 类来实现的。还有 QueueingConsumer 这个类,但是并不建议使用这个类来实现订阅消费。  QueueingConsumer 包括(但不限于)以下一些缺陷:

  • 内存溢出。如果由于某些原因,队列之中堆积了比较多的消息,就可能导致消费者客户端内存溢出假死,于是发生恶性循环,队列消息不断堆积而得不到消化。
     这个内存溢出的问题可以使用 Basic.Qos 来得到有效的解决,Basic.Qos 可以限制某个消费者所保持未确认消息的数量,也就是间接地限制了 QueueingConsumer 中的 LinkedBlockingQueue 的大小。注意一定要在调用 Basic.Consume 之前调用 Basic.Qos 才能生效。
  • QueueingConsumer 会拖累同一个 Connection 下的所有信道,使其性能降低;
  • 同步递归调用 QueueingConsumer 会产生死锁;
  • RabbitMQ 的自动连接恢复机制(automatic connection revovery)不支持 QueueingConsumer 的这种形式;
  • QueueingConsumer 不是事件驱动的。

 为了避免不必要的麻烦,建议在消费的时候尽量使用继承 DefaultConsumer 的方式。

消息传输保障

 消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障分为三个层级。

  • At most once:最多一次。消息可能会丢失,但绝不会重复传输。
  • At least once:最少一次。消息绝不会丢失,但可能会重复传输。
  • Exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次。

  RabbitMQ 支持其中的“最多一次”和“最少一次”。其中“最少一次”投递实现需要考虑以下这几个方面的内容:

  1. 消息生产者需要开启事务机制或者 publisher confirm 机制,以确保消息可以可靠地传输到 RabbitMQ 中。
  2. 消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。
  3. 消息和队列都需要进行持久化处理,以确保 RabbitMQ 服务器在遇到异常情况时不会造成消息丢失。
  4. 消费者在消费消息的同时需要将 autoAck 设置为 false,然后通过手动确认的方式去确认已经正确消费的消息,以避免在消费端引起不必要的消息丢失。

 “最多一次”的方式就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这样很难确保消息不会丢失。
 “恰好一次”是 RabbitMQ 目前无法保障的。  那么 RabbitMQ 有没有去重的机制来保证“恰好一次”呢?答案是并没有,不仅是 RabbitMQ,目前大多数主流的消息中间件都没有消息去重机制,也不保障“恰好一次”。去重处理一般是在业务端实现,比如引入 GUID(Globally Unique Identifier)的概念。针对 GUID,如果从客户端的角度去重,那么需要引入集中式缓存,必然会增加依赖复杂度,另外缓存的大小也难以界定。建议在实际生产环境中,业务方根据自身的业务特性进行去重,比如业务消息本身具备幂等性,或者借助 Redis 等其他产品进行去重处理。

参考文献

  • [RabbitMQ实战指南]
Last Updated 2/18/2025, 5:05:12 PM