🌟 后端 | RabbitMQ 实现延迟消息



延迟消息

在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用 MQ 的延迟消息了。

在 RabbitMQ 中实现延迟消息也有两种方案:

  • 死信交换机+TTL
  • 延迟消息插件

死信交换机 dead-letter

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过 dead-letter-exchange 属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。

死信交换机的作用:

  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因TTL(有效期)到期的消息

延迟消息

通过发送消息到指定交换机(fanout),设置 routingkey 和消息有效期(eg: 5000ms),消息投递到指定队列(queue),但该队列没有消费者,因此 5s 之后消息过期成为死信,接着消息被投递到死信交换机,且沿用之前的 routingkey,而该死信交换机下有某个队列对应的 bindingkey 和该消息的 routingkey 相同,因此落入死信交换机下的此队列,如果有消费者绑定了该队列,那么这条消息也会被成功消费。此时刚好过了 5s,由此我们实现了 5s 的延迟消息。

Publishing the message to the specific exchange(fanout) configured with routing key and message valid peroid. The message will be published to the specific queue without the consumer. So the message will become a dead letter after 5s and then will be published to dead-letter exchange using the former routing key. The queue in dead letter exchange have the same routing key with the message. So the message will fall into this queue and will be consumed successfully if the queue has a bound the consumer. After of that,exactly 5s have passed. So wo achieve the 5s delayed message.

DelayExchange 插件

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此 RabbitMQ 社区提供了一个延迟消息插件来实现相同的效果。

install the extension

1
docker volume inspect mq-plugins
1
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

declare the delay exchange with annotations

1
2
3
4
5
6
7
8
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}

declare the delay exchange with @Bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.itheima.consumer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class DelayExchangeConfig {

@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}

@Bean
public Queue delayedQueue(){
return new Queue("delay.queue");
}

@Bean
public Binding delayQueueBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}

send the delayed message

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}