RabbitMQ

RabbitMQ学习笔记,内容包括:Rabbit在Linux的部署SpringAMQP的基本操作RabbitMQ的5种工作模型以及配置消息转换器

1.部署RabbitMQ

1.1.下载镜像

1
docker pull rabbitmq:3-management

1.2.创建运行容器

执行下面的命令来运行MQ容器:

–hostname 主机名,单机可不配,集群中需要

-p 15672:15672 图形化管理界面UI的端口

-p 5672:5672 消息通信使用的端口

1
2
3
4
5
6
7
8
9
docker run \
-e RABBITMQ_DEFAULT_USER=zgl \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management

通过浏览器访问15672端口来到管理界面,输入账号密码

2.RabbitMQ消息模型

RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型:

3.SpringAMQP

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

以下对消息队列的使用均采用SpringAMQP

4.Basic Queue 基本消息队列

在父工程mq-demo中引入依赖

1
2
3
4
5
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

4.1.消息发送

首先配置MQ地址,在publisher服务的application.yml中添加配置:

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.222.130 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: zgl # 用户名
password: 123456 # 密码

然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@SpringBootTest
public class SpringAmqpTest {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
// 队列需要提前创建好,因为这里不会自动创建队列
// 就算运行成功,consumer也无法接收到消息
rabbitTemplate.convertAndSend(queueName, message);
}
}

4.2.消息接收

首先配置MQ地址,在consumer服务的application.yml中添加配置:

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.222.130 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: zgl # 用户名
password: 123456 # 密码

然后在consumer服务的cn.itcast.mq.listener包中新建一个类SpringRabbitListener,代码如下:

1
2
3
4
5
6
7
8
@Component
public class SpringRabbitListener {

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}

5.WorkQueue 工作消息队列

Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。

5.1 消息发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* workQueue
* 向队列中不停发送消息,模拟消息堆积。
*/
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, message_";
for (int i = 0; i < 50; i++) {
// 发送消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}

5.2 消息接收

1
2
3
4
5
6
7
8
9
10
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}

5.3 能者多劳

以上的例子中,消费者1消费消息的速度是消费者2的10倍,但两者确实获取同样数量的消息,也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

这样配置以后,就可以使消费者获取的消息数量和他的处理能力相匹配。

6.发布订阅

6.1 Fanout 广播

向与交换机绑定的所有队列发送消息

消息接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
public class SpringRabbitListener {
//通过注解的方式,自动生成队列和交换机,并将队列绑定到交换机上
@RabbitListener(bindings = @QueueBinding(
value = @Queue("fanout.queue1"),
exchange = @Exchange(name = "fanout.exchange",type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue1(String msg){
System.out.println("FanoutQueue1:"+msg);
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue("fanout.queue2"),
exchange = @Exchange(name = "fanout.exchange",type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue2(String msg){
System.out.println("FanoutQueue2:"+msg);
}

}

启动后可以看到与之前不同的是,交换机和队列已经自动创建了

消息发送

1
2
3
4
5
6
7
8
9
@Test
public void testSendFanoutExchange() {
// 交换机名称
String exchangeName = "fanout.exchange";
// 消息
String message = "fanout.....fanout.....";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "", message);
}

接收成功,两个队列都得到了消息:

6.2 Direct 路由

发送消息时通过key来匹配指定的队列

消息接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class SpringRabbitListener {

@RabbitListener(bindings = @QueueBinding(
value = @Queue("direct.queue1"),
exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
key = {"red","yellow"} //只有key包含在这其中的消息才能接收到
))
public void listenDirectQueue1(String msg){
System.out.println("DirectQueue1:"+msg);
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue("direct.queue2"),
exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
//比如key为red,则这两个队列都能接收到
//如果key为yellow,则只有queue1能接收到
key = {"red","blue"}
))
public void listenDirectQueue2(String msg){
System.out.println("DirectQueue2:"+msg);
}
}

消息发送

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "direct.exchange";
// 消息
String messageRed = "Direct.....red.....";
String messageBlue = "Direct.....blue.....";
String messageYellow = "Direct.....yellow.....";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", messageRed);
rabbitTemplate.convertAndSend(exchangeName, "blue", messageBlue);
rabbitTemplate.convertAndSend(exchangeName, "yellow", messageYellow);
}

结果:

6.3 Topic 主题

Topic交换机接收的消息RoutingKey必须是多个单词,以“.”分割,并且可以使用通配符

#:代表0个或多个词

*:代表1个词

举例:

item.#:能够匹配item.spu.insert 或者 item.spu

item.*:只能匹配item.spu

消息接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
public class SpringRabbitListener {

@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue1"),
exchange = @Exchange(value = "topic.exchange",type = ExchangeTypes.TOPIC),
key = "#.news" //接收news相关的消息
))
public void listenTopicQueue1(String msg){
System.out.println("TopicQueue1:"+msg);
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue2"),
exchange = @Exchange(value = "topic.exchange",type = ExchangeTypes.TOPIC),
key = "china.#" //接收china相关的消息
))
public void listenTopicQueue2(String msg){
System.out.println("TopicQueue2:"+msg);
}

}

消息发送

1
2
3
4
5
6
7
8
9
10
11
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "topic.exchange";
// 消息
String messageOfChinaNews = "China NO.1!";
String messageOfJapNews = "Jap...out...";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", messageOfChinaNews);
rabbitTemplate.convertAndSend(exchangeName, "jap.news", messageOfJapNews);
}

结果:

7.消息转换器

rabbitTemplate.convertAndSend()中我们可以看到message的类型是Object,也就是说也可以传递对象

下面试一下发送一个对象

1
2
3
4
5
6
7
8
9
10
11
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 准备消息
Map<String,Object> msg = new HashMap<>();
msg.put("name", "Jack");
msg.put("age", 21);
// 发送消息
rabbitTemplate.convertAndSend(queueName,msg);
}

可以看到对象成功传入队列中了,但是却是一堆乱码

这是因为Spring

  • 发送消息的时候会把消息序列化为字节发送给MQ
  • 接收消息的时候,还会把字节反序列化为Java对象

而默认情况下Spring采用的序列化方式是JDK序列化

配置JSON转换器

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

在publisher和consumer两个服务中都引入依赖:

1
2
3
4
5
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>

配置消息转换器。

在启动类中添加一个Bean即可:

1
2
3
4
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}

测试结果

重新发送可以看到结果已经变为JSON格式:

consumer接收对象:

1
2
3
4
5
6
7
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(Map<String,Object> msg){
System.out.println("Message:"+msg);
}
}

反序列化也成功了: