互联网博客网站,汕头网站搜索优化,网站备案背景幕布打印多大,网站站群RabbitMQ RabbitMQ#xff0c;作为当今流行的开源消息代理软件#xff0c;以其卓越的可靠性、灵活性和易用性在微服务架构和分布式系统中扮演着至关重要的角色。它不仅能够确保消息在不同系统组件间的高效传递#xff0c;还能通过其高级消息队列协议#xff08;AMQP#x…RabbitMQ RabbitMQ作为当今流行的开源消息代理软件以其卓越的可靠性、灵活性和易用性在微服务架构和分布式系统中扮演着至关重要的角色。它不仅能够确保消息在不同系统组件间的高效传递还能通过其高级消息队列协议AMQP支持复杂的路由功能从而满足各种消息分发场景。RabbitMQ的高性能和可扩展性使其成为处理大规模消息传递任务的理想选择同时其丰富的API和工具集也极大地简化了开发人员在不同编程环境中的集成和使用。无论是应对日常的消息传递需求还是构建复杂的事件驱动架构RabbitMQ都能提供强大而稳定的支持。 kafka消息中间件在上一篇文章SpringBoot3全面复习已经写过下面主要介绍RabbitMQ的内容。 一、初始MQ 同步调用的优势是时效性强等待到结果后返回。 异步调用 在异步调用中发送者不再直接同步调用接收者的业务接口而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后接受者都能获取消息并处理。 综上异步调用的优势包括
耦合度更低 扩展性强异步调用无需等待性能更好缓存消息流量削峰填谷故障隔离避免级联失败
当然异步通信也并非完美无缺它存在下列缺点
完全依赖于Broker的可靠性、安全性和性能 架构复杂后期维护和调试麻烦 技术选型 消息Broker目前常见的实现方案就是消息队列MessageQueue简称为MQ. 追求可用性Kafka、 RocketMQ 、RabbitMQ 追求可靠性RabbitMQ、RocketMQ 追求吞吐能力RocketMQ、Kafka 追求消息低延迟RabbitMQ、Kafka 安装RabbitMQ 同样基于Docker来安装RabbitMQ使用下面的命令即可
docker run \-e RABBITMQ_DEFAULT_USERitheima \-e RABBITMQ_DEFAULT_PASS123321 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hmall \-d \rabbitmq:3.8-management同样基于Docker来安装RabbitMQ使用下面的命令即可 15672RabbitMQ提供的管理控制台的端口 5672RabbitMQ的消息发送处理接口 安装完成后我们访问 http://虚拟机ip:15672即可看到管理控制台。首次访问需要登录默认的用户名和密码在配置文件中已经指定了。 publisher生产者也就是发送消息的一方 consumer消费者也就是消费消息的一方 queue队列存储消息。生产者投递的消息会暂存在消息队列中等待消费者处理 exchange交换机负责消息路由转发没有存储消息的能力。生产者发送的消息由交换机决定投递到哪个队列。 virtual host虚拟主机起到数据隔离的作用。每个虚拟主机相互独立有各自的exchange、queue 数据隔离 Nameitheima也就是用户名 Tagsadministrator说明itheima用户是超级管理员拥有所有权限 Can access virtual host /可以访问的virtual host这里的/是默认的virtual host 此时hmall用户没有任何virtual host的访问权限 SpringAMQP
已有依赖
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcn.itcast.demo/groupIdartifactIdmq-demo/artifactIdversion1.0-SNAPSHOT/versionmodulesmodulepublisher/modulemoduleconsumer/module/modulespackagingpom/packagingparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.7.12/versionrelativePath//parentpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/propertiesdependenciesdependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency!--AMQP依赖包含RabbitMQ--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency!--单元测试--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependency/dependencies
/project消息发送 首先配置MQ地址在publisher服务的application.yml中添加配置
spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码编写测试类利用rabbitTemplate实现消息发送:
package com.itheima.publisher.amqp;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;SpringBootTest
public class SpringAmqpTest {Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSimpleQueue() {// 队列名称String queueName simple.queue;// 消息String message hello, spring amqp!;// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}消息接受 配置MQ地址在consumer服务的application.yml中添加配置
spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码consumer服务的com.itheima.consumer.listener包中新建一个类SpringRabbitListener代码如下
package com.itheima.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息就会推送给当前服务调用当前方法处理消息。// 可以看到方法体中接收的就是消息体的内容RabbitListener(queues simple.queue)public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println(spring 消费者接收到消息【 msg 】);}
}WorkQueues模型 当消息处理比较耗时的时候可能生产消息的速度会远远大于消息的消费速度。长此以往消息就会堆积越来越多无法及时处理。 此时就可以使用work 模型多个消费者共同处理消息处理消息处理的速度就能大大提高了。 消息发送
这次我们循环发送模拟大量消息堆积现象。 在publisher服务中的SpringAmqpTest类中添加一个测试方法
/*** workQueue* 向队列中不停发送消息模拟消息堆积。*/
Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName simple.queue;// 消息String message hello, message_;for (int i 0; i 50; i) {// 发送消息每20毫秒发送一次相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message i);Thread.sleep(20);}
}消息接受 要模拟多个消费者绑定同一个队列我们在consumer服务的SpringRabbitListener中添加2个新的方法
RabbitListener(queues work.queue)
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println(消费者1接收到消息【 msg 】 LocalTime.now());Thread.sleep(20);
}RabbitListener(queues work.queue)
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println(消费者2........接收到消息【 msg 】 LocalTime.now());Thread.sleep(200);
}消费者1 sleep了20毫秒相当于每秒钟处理50个消息 消费者2 sleep了200毫秒相当于每秒处理5个消息 也就是说消息是平均分配给每个消费者并没有考虑到消费者的处理能力。导致1个消费者空闲另一个消费者忙的不可开交。没有充分利用每一个消费者的能力最终消息处理的耗时远远超过了1秒。这样显然是有问题的。 能者多劳 修改consumer服务的application.yml文件添加配置
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息处理完成才能获取下一个消息由于消费者1处理速度较快所以处理了更多的消息消费者2处理速度较慢只处理了6条消息。而最终总的执行耗时也在1秒左右大大提升。 正所谓能者多劳这样充分利用了每一个消费者的处理能力可以有效避免消息积压问题。 交换机 Fanout广播将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机 Direct订阅基于RoutingKey路由key发送给订阅了消息的队列 Topic通配符订阅与Direct类似只不过RoutingKey可以使用通配符 Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与Exchange绑定或者没有符合路由规则的队列那么消息会丢失 1 可以有多个队列 2 每个队列都要绑定到Exchange交换机 3 生产者发送的消息只能发送到交换机 4 交换机把消息发送给绑定过的所有队列 5 订阅队列的消费者都能拿到消息 消息发送
Test
public void testFanoutExchange() {// 交换机名称String exchangeName hmall.fanout;// 消息String message hello, everyone!;rabbitTemplate.convertAndSend(exchangeName, , message);
}消息接收
RabbitListener(queues fanout.queue1)
public void listenFanoutQueue1(String msg) {System.out.println(消费者1接收到Fanout消息【 msg 】);
}RabbitListener(queues fanout.queue2)
public void listenFanoutQueue2(String msg) {System.out.println(消费者2接收到Fanout消息【 msg 】);
}一条消息会被所有订阅的队列都消费。但是在某些场景下我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。 Direct交换机 消息接收 在consumer服务的SpringRabbitListener中添加方法
RabbitListener(queues direct.queue1)
public void listenDirectQueue1(String msg) {System.out.println(消费者1接收到direct.queue1的消息【 msg 】);
}RabbitListener(queues direct.queue2)
public void listenDirectQueue2(String msg) {System.out.println(消费者2接收到direct.queue2的消息【 msg 】);
}消息发送
Test
public void testSendDirectExchange() {// 交换机名称String exchangeName hmall.direct;// 消息String message 红色警报日本乱排核废水导致海洋生物变异惊现哥斯拉;// 发送消息rabbitTemplate.convertAndSend(exchangeName, red, message);
}Topic Queue1绑定的是china.# 因此凡是以 china.开头的routing key 都会被匹配到。包括china.news和china.weather Queue2绑定的是#.news 因此凡是以 .news结尾的 routing key 都会被匹配。包括china.news和japan.news .消息发送
/*** topicExchange*/
Test
public void testSendTopicExchange() {// 交换机名称String exchangeName itcast.topic;// 消息String message 喜报孙悟空大战哥斯拉胜!;// 发送消息rabbitTemplate.convertAndSend(exchangeName, china.news, message);
}
.消息接收
RabbitListener(bindings QueueBinding(value Queue(name topic.queue1),exchange Exchange(name itcast.topic, type ExchangeTypes.TOPIC),key china.#
))
public void listenTopicQueue1(String msg){System.out.println(消费者接收到topic.queue1的消息【 msg 】);
}RabbitListener(bindings QueueBinding(value Queue(name topic.queue2),exchange Exchange(name itcast.topic, type ExchangeTypes.TOPIC),key #.news
))
public void listenTopicQueue2(String msg){System.out.println(消费者接收到topic.queue2的消息【 msg 】);
} 声明队列交换机
Exchange 只是一个接口其具体的实现类分别是对应的几个不同类型的交换机如 FanoutExchange 、DirectExchange 、TopicExchange 等等。 其中绑定关系的构建是 BindingBuilder.bind(队列).to(交换机).with(RoutingKey) 这样的。
RabbitListener 注解声明 可以使用当时用于定义消费者的注解 RabbitListener 来定义队列、交换机、及绑定关系只需其中的 bindings 属性在其中使用 QueueBinding 注解进行定义。 1.value Queue(…) 定义了队列的具体属性。 2.exchange Exchange(…) 指定关联的交换机详情。 3.key {“hi”} 设置了绑定的路由键。 .消息转换器 在 Spring AMQP 在内部进行消息转化的时候会使用 JDK 自带的序列化方式这种方法存在着问题首先 JDK 的序列化存在安全风险反序列化时容易被代码注入其次序列化后的消息占用空间太多可读性差。 建议使用 JSON 序列化代替默认的 JDK 序列化。
1.在消息的接收者和消费者中都引入 jackson 的依赖 !--jackson--dependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactIdversion2.13.4/version/dependency两者中都要配置 MessageConverter 成 Bean可在启动类中配置 发送者的可靠性 发送者重连 发送者确认机制 MQ的可靠性 数据持久化 在默认情况下是非持久的可以选择 2 发送持久化的消息而 Spring AMQP 发送的消息默认是持久化的我们也可以通过自定义构建消息来发送非持久化的消息。
Message message MessageBuilder.withBody(holle, SpringAMQP.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();其中 setDeliveryMode 用于设置投递模式为持久化或非持久化。 持久化的优点在于重启后持久化的交换机、队列、消息仍然会存在提高了效率。 Lazy Queue惰性队列 接收者的可靠性 消费者确认机制
spring:rabbitmq:listener:simple:ackonwledge-mode: auto # 配置为 auto 模式失败重试机制 Spring AMQP 提供了消费者重试机制在消费者出现异常时利用本地重试而不是无限的发送消息到 MQ 中我们可以通过在 yml 配置文件中添加相关配置来开启重试机制。
spring:rabbitmq:listener:simple:retry:enabled: true # 开启重试机制initial-interval: 1000ms # 第一次重试间隔时间multiplier: 1 # 失败后重试间隔倍数max-attempts: 3 # 最大重试次数stateless: true # true无状态false有状态。如果业务中包含事务则设置为false业务幂等性 延迟消息 延迟消息插件