🌟 后端 | RabbitMQ 消息的可靠性



本章关于 MQ 消息可靠性的介绍
This chapter is the introduce about the reliability of MQ messages.

发送者的可靠性 reliability of Publisher

生产者到消费者可能导致消息丢失:
It may lead to losing the messages from the publisher to the consumers.

  • 发送消息时丢失:
    • 生产者发送消息时连接 MQ 失败
    • 生产者发送消息到达 MQ 后未找到 Exchange
    • 生产者发送消息到达 Exchange 后,未找到合适的 Queue
    • 消息到达 MQ 后,处理消息的进程发生异常
  • MQ 导致消息丢失:
    • 消息到达 MQ,保存到队列后,尚未消费就突然宕机
  • 消费者处理消息时:
    • 消息接收后尚未处理突然宕机
    • 消息接收后处理过程中抛出异常

that is what we need to handle it by 3 steps if we want to fix the problems of message missing and ensure the reliability of MQ.

  • ensure that the publisher sends the messages to MQ.
  • ensure that MQ will not lost messages.
  • ensure that the consumer handles the messages.

生产者重试机制 retry mechanism

Sometimes messages might not be processed correctly or might fail. To deal with this, we can set up mechanisms like retries.

SpringAMQP provides retry mechanism for sending message. So if RabbitTemplate times out when trying to connect to the MQ, it will automatically retry multiple times.

modify ‘application.yaml’ in the module of publisher:

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数

生产者确认机制 Publisher Acknowledgements

Message ACK(acknowledgements) help make sure that a message is actually processed. After the publisher sends the message to MQ, MQ will send ack/nack back to the publisher to inform the message’s situation.

RabbitMQ 提供了生产者消息确认机制,包括 Publisher Confirm 和 Publisher Return 两种。在开启确认机制的情况下,当生产者发送消息给 MQ 后,MQ 会根据消息处理的情况返回不同的回执。

  • 当消息投递到 MQ,但是路由失败时,通过 Publisher Return 返回异常信息,同时返回 ack 的确认信息,代表投递成功
  • 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知投递成功
  • 持久消息投递到了 MQ,并且入队完成持久化,返回 ACK ,告知投递成功
  • 其它情况都会返回 NACK,告知投递失败

其中 ack 和 nack 属于 Publisher Confirm 机制,ack 是投递成功;nack 是投递失败。
而 return 则属于 Publisher Return 机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。

配置生产者确认 configure publisher Acknowledgement

生产者重试是在 spring 配置 rabbitmq 下的 template,而消费者这里是配置 listener

1
2
3
4
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制

定义 ReturnCallback

每个 RabbitTemplate 只能配置一个 ReturnCallback,因此我们可以在配置类中统一设置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// config/MqConfig
package com.itheima.publisher.config;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;

@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("触发return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
});
}
}

定义 ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此 ConfirmCallback 需要在每次发消息时定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Test
void testPublisherConfirm() {
// 1.创建CorrelationData
CorrelationData cd = new CorrelationData();
// 2.给Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// 2.1.Future发生异常时的处理逻辑,基本不会触发
log.error("send message fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
log.debug("发送消息成功,收到 ack!");
}else{ // result.getReason(),String类型,返回nack时的异常描述
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
}
}
});
// 3.发送消息
rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

开启生产者确认比较消耗 MQ 性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为 RoutingKey 错误导致,往往是编程导致
  • 交换机名称错误:同样是编程错误导致
  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。

MQ 的可靠性 reliability of MQ

消息到达 MQ 后,MQ 不能及时保存,也会导致消息丢失。

数据持久化

为了提升性能,默认情况下 MQ 的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化

交换机持久化

在控制台的 Exchanges 页面,添加交换机时可以配置交换机的 Durability 参数

队列持久化

在控制台的 Queues 页面,添加队列时,同样可以配置队列的 Durability 参数

消息持久化

在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个 properties,delivery mode 设置为 persistent

LazyQueue

RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:

  • 消费者宕机或出现网络故障
  • 消息发送量激增,超过了消费者处理速度
  • 消费者处理业务发生阻塞

一旦出现消息堆积问题,RabbitMQ 的内存占用就会越来越高,直到触发内存预警上限。此时 RabbitMQ 会将内存消息刷到磁盘上,这个过程会阻塞队列进程, RabbitMQ 不会再处理新的消息,生产者的所有请求都会被阻塞。

Lazy Queues 的模式,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

mq控制台中,在添加队列的时候,添加 x-queue-mod=lazy 参数即可设置队列为 Lazy 模式

or 在代码中:

1
2
3
4
5
6
7
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启Lazy模式
.build();
}

或基于注解来声明队列并设置为Lazy模式:

1
2
3
4
5
6
7
8
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}

消费者的可靠性 reliability of consumer

消费者确认机制 Consumer confirmation mechanism

When a consumer receives a message, it sends an ACK back to RabbitMQ.If RabbitMQ doesn’t get an ACK, it knows the message wasn’t processed and can resent it to another consumer or retry.

RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。即当消费者处理消息结束后,应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ 从队列中删除该消息
  • nack:消息处理失败,RabbitMQ 需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息

一般 reject 方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过 try catch 机制捕获,消息处理成功时返回 ack,处理失败时返回 nack.

消息回执的处理代码比较统一,SpringAMQP 帮我们实现了消息确认。并允许我们通过配置文件设置 ACK 处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻 ack,消息会立刻从 MQ 删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用 api,发送 ack 或 reject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack. 当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回 nack;
    • 如果是消息处理或校验异常,自动返回 reject;
1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做处理

acknowledge-mode: none

模拟一个消费者处理消息异常,可发现消息会被 RabbitMQ 删除。

1
2
3
4
5
6
7
8
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消费者接收到消息:【" + msg + "】");
if (true) {
throw new MessageConversionException("故意的");
}
log.info("消息处理完成");
}

acknowledge-mode: auto

按照上边的模拟异常,依旧会获得 reject ,而消息被删除.

将异常换为运行时异常,如打上断点,则会发现消息状态变成 unacked(未确认),放行后,会抛出业务异常,Spring 返回 nack,最终消息恢复至 Ready 状态,并且没有被 RabbitMQ 删除

1
2
3
4
5
6
7
8
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消费者接收到消息:【" + msg + "】");
if (true) {
throw new RuntimeException("故意的");
}
log.info("消息处理完成");
}

失败重试机制 failure retry mechanism

生产者重试是在 spring 配置 rabbitmq 下的 template,而消费者这里是配置 listener

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

本地重试 3 次以后,抛出了 AmqpRejectAndDontRequeueException 异常。查看 RabbitMQ 控制台,发现消息被删除了,说明最后 SpringAMQP 返回的是 reject

失败处理策略 failure processing strategy

上边配置 listener 最大重试数后,超出 3 次消息会直接被删除,且返回 reject。

Spring 允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由 MessageRecovery 接口来定义的,它有 3 个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

RepublishMessageRecoverer 可以让我们更换交换机:

  1. 在 consumer 服务中定义处理失败消息的交换机和队列
  2. 定义 RepublishMessageRecoverer 关联队列和交换机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.itheima.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct"); // 定义交换机
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true); // 定义队列
}
@Bean // 绑定队列和交换机
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

@Bean // 把新的队列、交换机关联至之前的消息 —— 覆盖
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}

业务幂等性

幂等指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据id删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

因此实际业务场景中,MQ 消息也可能重复被投递。为保证消息处理的幂等性,可以用:

  • 唯一消息 ID
  • 业务状态判断

唯一消息 ID

SpringAMQP 的 MessageConverter 自带了 MessageID 的功能,我们只要开启这个功能即可

1
2
3
4
5
6
7
8
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}

业务判断

处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。简言之就是对于待处理的业务,增加指定状态字段判断,对于符合要求的字段状态则继续处理,不符合字段则不执行。