RabbitMQ学习笔记,内容包括:Rabbit在Linux的部署,SpringAMQP的基本操作,RabbitMQ的5种工作模型以及配置消息转换器
1.部署RabbitMQ
1.1.下载镜像
1
| docker pull rabbitmq:3-management
|
1.2.创建运行容器
执行下面的命令来运行MQ容器:
–hostname 主机名,单机可不配,集群中需要
-p 15672:15672 图形化管理界面UI的端口
-p 5672:5672 消息通信使用的端口
1 2 3 4 5 6 7 8 9
| docker run \ -e RABBITMQ_DEFAULT_USER=zgl \ -e RABBITMQ_DEFAULT_PASS=123456 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management
|
通过浏览器访问15672端口来到管理界面,输入账号密码
2.RabbitMQ消息模型
RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型:
3.SpringAMQP
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
以下对消息队列的使用均采用SpringAMQP
4.Basic Queue 基本消息队列
在父工程mq-demo中引入依赖
1 2 3 4 5
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
4.1.消息发送
首先配置MQ地址,在publisher服务的application.yml中添加配置:
1 2 3 4 5 6 7
| spring: rabbitmq: host: 192.168.222.130 port: 5672 virtual-host: / username: zgl password: 123456
|
然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @SpringBootTest public class SpringAmqpTest {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void testSimpleQueue() { String queueName = "simple.queue"; String message = "hello, spring amqp!"; rabbitTemplate.convertAndSend(queueName, message); } }
|
4.2.消息接收
首先配置MQ地址,在consumer服务的application.yml中添加配置:
1 2 3 4 5 6 7
| spring: rabbitmq: host: 192.168.222.130 port: 5672 virtual-host: / username: zgl password: 123456
|
然后在consumer服务的cn.itcast.mq.listener包中新建一个类SpringRabbitListener,代码如下:
1 2 3 4 5 6 7 8
| @Component public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { System.out.println("spring 消费者接收到消息:【" + msg + "】"); } }
|
5.WorkQueue 工作消息队列
Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。
5.1 消息发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
@Test public void testWorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "hello, message_"; for (int i = 0; i < 50; i++) { rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }
|
5.2 消息接收
1 2 3 4 5 6 7 8 9 10
| @RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(200); }
|
5.3 能者多劳
以上的例子中,消费者1消费消息的速度是消费者2的10倍,但两者确实获取同样数量的消息,也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
1 2 3 4 5
| spring: rabbitmq: listener: simple: prefetch: 1
|
这样配置以后,就可以使消费者获取的消息数量和他的处理能力相匹配。
6.发布订阅
6.1 Fanout 广播
向与交换机绑定的所有队列发送消息
消息接收
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Component public class SpringRabbitListener { @RabbitListener(bindings = @QueueBinding( value = @Queue("fanout.queue1"), exchange = @Exchange(name = "fanout.exchange",type = ExchangeTypes.FANOUT) )) public void listenFanoutQueue1(String msg){ System.out.println("FanoutQueue1:"+msg); }
@RabbitListener(bindings = @QueueBinding( value = @Queue("fanout.queue2"), exchange = @Exchange(name = "fanout.exchange",type = ExchangeTypes.FANOUT) )) public void listenFanoutQueue2(String msg){ System.out.println("FanoutQueue2:"+msg); }
}
|
启动后可以看到与之前不同的是,交换机和队列已经自动创建了
消息发送
1 2 3 4 5 6 7 8 9
| @Test public void testSendFanoutExchange() { String exchangeName = "fanout.exchange"; String message = "fanout.....fanout....."; rabbitTemplate.convertAndSend(exchangeName, "", message); }
|
接收成功,两个队列都得到了消息:
6.2 Direct 路由
发送消息时通过key来匹配指定的队列
消息接收
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Component public class SpringRabbitListener { @RabbitListener(bindings = @QueueBinding( value = @Queue("direct.queue1"), exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT), key = {"red","yellow"} //只有key包含在这其中的消息才能接收到 )) public void listenDirectQueue1(String msg){ System.out.println("DirectQueue1:"+msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue("direct.queue2"), exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT), //比如key为red,则这两个队列都能接收到 //如果key为yellow,则只有queue1能接收到 key = {"red","blue"} )) public void listenDirectQueue2(String msg){ System.out.println("DirectQueue2:"+msg); } }
|
消息发送
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Test public void testSendDirectExchange() { String exchangeName = "direct.exchange"; String messageRed = "Direct.....red....."; String messageBlue = "Direct.....blue....."; String messageYellow = "Direct.....yellow....."; rabbitTemplate.convertAndSend(exchangeName, "red", messageRed); rabbitTemplate.convertAndSend(exchangeName, "blue", messageBlue); rabbitTemplate.convertAndSend(exchangeName, "yellow", messageYellow); }
|
结果:
6.3 Topic 主题
Topic交换机接收的消息RoutingKey必须是多个单词,以“.”分割,并且可以使用通配符
#:代表0个或多个词
*:代表1个词
举例:
item.#:能够匹配item.spu.insert 或者 item.spu
item.*:只能匹配item.spu
消息接收
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Component public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding( value = @Queue("topic.queue1"), exchange = @Exchange(value = "topic.exchange",type = ExchangeTypes.TOPIC), key = "#.news" //接收news相关的消息 )) public void listenTopicQueue1(String msg){ System.out.println("TopicQueue1:"+msg); }
@RabbitListener(bindings = @QueueBinding( value = @Queue("topic.queue2"), exchange = @Exchange(value = "topic.exchange",type = ExchangeTypes.TOPIC), key = "china.#" //接收china相关的消息 )) public void listenTopicQueue2(String msg){ System.out.println("TopicQueue2:"+msg); }
}
|
消息发送
1 2 3 4 5 6 7 8 9 10 11
| @Test public void testSendTopicExchange() { String exchangeName = "topic.exchange"; String messageOfChinaNews = "China NO.1!"; String messageOfJapNews = "Jap...out..."; rabbitTemplate.convertAndSend(exchangeName, "china.news", messageOfChinaNews); rabbitTemplate.convertAndSend(exchangeName, "jap.news", messageOfJapNews); }
|
结果:
7.消息转换器
在rabbitTemplate.convertAndSend()中我们可以看到message的类型是Object,也就是说也可以传递对象
下面试一下发送一个对象
1 2 3 4 5 6 7 8 9 10 11
| @Test public void testSimpleQueue() { String queueName = "simple.queue"; Map<String,Object> msg = new HashMap<>(); msg.put("name", "Jack"); msg.put("age", 21); rabbitTemplate.convertAndSend(queueName,msg); }
|
可以看到对象成功传入队列中了,但是却是一堆乱码
这是因为Spring
- 发送消息的时候会把消息序列化为字节发送给MQ
- 接收消息的时候,还会把字节反序列化为Java对象
而默认情况下Spring采用的序列化方式是JDK序列化
配置JSON转换器
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
在publisher和consumer两个服务中都引入依赖:
1 2 3 4 5
| <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
|
配置消息转换器。
在启动类中添加一个Bean即可:
1 2 3 4
| @Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }
|
测试结果
重新发送可以看到结果已经变为JSON格式:
consumer接收对象:
1 2 3 4 5 6 7
| @Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(Map<String,Object> msg){ System.out.println("Message:"+msg); } }
|
反序列化也成功了: