🌟 后端 | RabbitMQ 基础



同步与异步

微服务注册与发现后,相互调用可以采用之前提到的 OpenFeign 。但是 OpenFeign 调用发起后需要等待服务返回结果后才能继续执行后面的业务,也就是所说的 同步调用 ,为了不阻断后边的程序,我们通常会使用到 异步通信 ,例如前端中的方法前边加 async 。

同步通讯面临的 问题 有:扩展性差,阻塞导致性能下降,级联失败导致整个事务回滚。

异步通讯可以实现 操作挂起 ,不影响其他流程。比如我们在淘宝完成一笔交易,采用异步通讯可以同时调用多个服务:

  1. 用户服务 - 余额减扣
  2. 订单服务 - 更新订单状态
  3. 通知服务 - 聊天窗通知用户下单成功
  4. 卖家服务 - 商品被下单通知发货
  5. 积分服务 - 购买累加积分
  6. 物流服务 - 更新物流状态页

以上服务均可异步调用,自行完成,而整个下单环节不应通知消息发不了而足赛住不动导致失败退还金额,哪个步骤出问题了,由对应步骤处理而不影响全流程。

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方
  • 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器
  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

假如产品经理提出了新的需求,比如要在支付成功后更新用户积分。支付代码完全不用变更,而仅仅是让积分服务也订阅消息即可。

不管后期增加了多少消息订阅者,作为支付服务来讲,执行问扣减余额、更新支付流水状态后,发送消息即可。业务耗时仅仅是这三部分业务耗时,大大提高了业务性能。

RabbitMQ

docker 安装

1
2
3
4
5
6
7
8
9
10
11
docker run \
-e RABBITMQ_DEFAULT_USER=itheima \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hm-net\
-d \
rabbitmq:3.8-management

两个端口:

  • 15672:RabbitMQ 提供的管理控制台的端口
  • 5672:RabbitMQ 的消息发送处理接口

MQ 的架构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
              ————————————————————————————————————————
| —————- |
| ————————— |queue| ↘️ ————————
| |exchange| ↗️ —————- | | consumer|
—————————— ↗️ ————————— ————— | ————————
Publisher | | |queue| |
—————————— ↘️ ————————— ↗️ ————— ↘️
| |exchange| ————— | ————————
| ————————— ➡️ |quene| ➡️ |consumer|
| ————— | ————————
| |————
| VirtualHost 1 | |
————————————————————————————————————————— |
| VirtualHost 2 |
—————————————————————————————————————————
RabbitMQ server Broker
  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

网页操作

  1. RabbitMQ 用 docker run 后,访问 ip + 15672 就可以进入管理控制台,使用指定的密码登录

  2. 进入 Queues 菜单可以创建一个队列 hello.queue1,创建一个队列 hello.queue2

  3. 进入 Exchanges 菜单 - amq.fanout 交换机 - Bindings 菜单 - 绑定上边创建的两个队列
  4. 通过 Exchange 页面的交换机 amq.fanout ,可以发送消息
  5. 在 Queues 页面可以发现创建的两个队列均收到 1 条消息
  6. 如有消费者监听 MQ 这两个队列,将收到消息通知

对于公司有多个不同项目的情况,通常会共同使用一套 MQ 集群,这时 MQ 的 virtual host 会提供隔离的功能。
只需要给每个项目创建独立的运维账号,将管理权限隔离;给每个项目下创建不同的 virtual host ,将每个项目的数据隔离。

Spring AMQP

RabbitMQ 采用了 AMQP 协议,因此它具备跨语言的特性。任何语言只要遵循 AMQP 协议收发消息,都可以与RabbitMQ 交互。并且 RabbitMQ 官方也提供了各种不同语言的客户端。

Spring 的官方刚好基于 RabbitMQ 提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于 SpringBoot 对其实现了自动装配,使用起来非常方便。

SpringAMQP 提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了 RabbitTemplate 工具,用于发送消息

项目配置依赖

在父工程配置依赖,子工程包含两个模块 publisher 和 consumer ,均可以使用 SpringAMQP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>cn.itcast.demo</groupId>
<artifactId>mq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>publisher</module>
<module>consumer</module>
</modules>
<packaging>pom</packaging>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.12</version>
<relativePath/>
</parent>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>

消息发送

publisher 配置 mq 地址

首先配置 MQ 地址,在 publisher 服务的 application.yml 中添加配置

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.150.101 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码

消息发送类 SpringAmqpTest

publisher 服务中编写测试类 SpringAmqpTest,并利用 RabbitTemplate 实现消息发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.itheima.publisher.amqp;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class SpringAmqpTest {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}

消息接收

consumer 配置 mq 地址

在 consumer 服务的 application.yml 中添加配置:

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.150.101 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码

消息监听类 SpringRabbitListener

然后在 consumer 服务的 com.itheima.consumer.listener 包中新建一个类 SpringRabbitListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.itheima.consumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitListener {
// 利用 RabbitListener 来声明要监听的队列信息
// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
// 可以看到方法体中接收的就是消息体的内容
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}

启动 consumer 服务,然后在 publisher 服务中运行测试代码,发送 MQ 消息。最终 consumer 收到消息

WorkQueues 模型

任务模型。让多个消费者绑定到一个队列,共同消费队列中的消息。消息处理的速度就能大大提高了。

配置服务消费能力,这样使得处理快的消费者可以能者多劳,充分利用到消费者能力,有效避免消息积压。

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

交换机类型

Exchange(交换机)只负责接收生产者的消息并转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失。

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于 RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与 Direct 类似,只不过 RoutingKey 可以使用通配符
  • Headers:头匹配,基于 MQ 的消息头匹配,用的较少。

Fanout 交换机

  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机
  • 4) 交换机把消息发送给绑定过的所有队列
  • 5) 订阅队列的消费者都能拿到消息

Direct 交换机

  • 队列与交换机的绑定,不是任意绑定,而是要指定一个 RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key完全一致,才会接收到消息

例如,创建两个队列 direct.queue1 和 direct.queue2, 分别绑定 routing key:

To Routing key Arguments
direct.queue1 blue /
direct.queue1 red /
direct.queue2 red /
direct.queue2 yellow /

消息接收

在 consumer 服务的 SpringRabbitListener 中添加方法

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

消息发送

在 publisher 服务的 SpringAmqpTest 类中添加测试方法

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "hmall.direct";
// 消息
String message1 = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message1);
// 换个 key
String message2 = "最新报道,哥斯拉是居民自治巨型气球,虚惊一场!";
rabbitTemplate.convertAndSend(exchangeName, "blue", message2);
}

测试发送 red 的 routing key,所以 message1 将被发到 direct.queue1 和 direct.queue2
测试发送 blue 的 routing key,所以 message2 将被发到 direct.queue1

Topic 交换机

Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。
只不过 Topic 类型 Exchange 可以让队列在绑定 BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以 . 分割,例如: item.insert

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配 item.spu.insert 或者 item.spu
  • item.*:只能匹配 item.spu 到第二层截止

假如此时 publisher 发送的消息使用的 RoutingKey 共有四种:

  • china.news 代表有中国的新闻消息;
  • china.weather 代表中国的天气消息;
  • japan.news 则代表日本新闻
  • japan.weather 代表日本的天气消息;

现在有一个消费者订阅 china 的消息,另一个消费者订阅 new 相关的消息:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
    • china.news
    • china.weather
  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
    • china.news
    • japan.news

声明队列和交换机

程序启动时检查队列和交换机是否存在,如果不存在自动创建。

SpringAMQP 提供了一个 Queue 类,用来创建队列:

1
2
3
public class Queue extends AbstractDeclarable implements Cloneable {

}

SpringAMQP 提供了 ExchangeBuilder 来表示所有不同类型的交换机.

绑定队列和交换机时,则需要使用 BindingBuilder 来创建 Binding 对象.

fanout 声明

在consumer中创建一个类,声明队列和交换机.创建 hmall.fanout 交换机;创建 fanout.queue1 和 fanout.queue2 两个队列,接着分别把两个队列绑定到交换机上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.itheima.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("hmall.fanout");
}

/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}

/**
* 绑定队列到交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}

/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}

/**
* 绑定队列到交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}

direct 声明

direct 模式由于要绑定多个 KEY,会非常麻烦,每一个 Key 都要编写一个 binding:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.itheima.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

/**
* 声明交换机
* @return Direct类型交换机
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange("hmall.direct").build();
}

/**
* 第1个队列
*/
@Bean
public Queue directQueue1(){
return new Queue("direct.queue1");
}

/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}

/**
* 第2个队列
*/
@Bean
public Queue directQueue2(){
return new Queue("direct.queue2");
}

/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
}
}

基于注解声明

基于 @Bean 的方式声明队列和交换机比较麻烦,Spring 还提供了基于注解方式来声明。这样可以直接将队列名、交换机、routing key 同时传入进行注册监听(消费者)

Direct 注解声明

例如,我们同样声明 Direct 模式的交换机和队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

Topic 注解声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

消息转换器

Spring 的消息发送代码接收的消息体是一个 Object,在数据传输时会将消息序列化为字节发送给MQ, 接收消息时则把字节反序列化为 Java 对象。 Spring 采用序列化方式是 JDK 序列化。然而 JDK 序列化存在 数据体积过大、有安全漏洞、可读性差。

所以我们需要配置 JSON 转换器做序列化和反序列化

在 publisher 和 consumer 两个服务中都引入依赖:

1
2
3
4
5
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>

配置消息转换器,在 publisher 和 consumer 两个服务的启动类中添加一个 Bean 即可:

1
2
3
4
5
6
7
8
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}

发送消息

publisher 模块的 SpringAmqpTest 中新增一个消息发送的代码,发送一个 Map 对象

1
2
3
4
5
6
7
8
9
@Test
public void testSendMap() throws InterruptedException {
// 准备消息
Map<String,Object> msg = new HashMap<>();
msg.put("name", "柳岩");
msg.put("age", 21);
// 发送消息
rabbitTemplate.convertAndSend("object.queue", msg);
}

接收 Object

1
2
3
4
@RabbitListener(queues = "object.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
System.out.println("消费者接收到object.queue消息:【" + msg + "】");
}