当前位置: 首页 > news >正文

网站权重与排名浅谈杭州学校网站开发

网站权重与排名浅谈,杭州学校网站开发,项目管理软件有哪些,可以做商城网站的公司吗消息中间件#xff1a;RabbitMQ 前言安装Window安装Linux安装 管理页面什么是RabbitMQ#xff1f;入门基本概念简单队列工作队列#xff08;Work Queues#xff09;发布/订阅#xff08;Publish/Subscribe#xff09;临时队列 路由#xff08;Routing#xff09;主题RabbitMQ 前言安装Window安装Linux安装 管理页面什么是RabbitMQ入门基本概念简单队列工作队列Work Queues发布/订阅Publish/Subscribe临时队列 路由Routing主题Topics标题Headers远程过程调用 RPC死信队列延迟队列防止重复消费持久化交换机自动删除可靠传输唯一标识 面试题如何保证MQ顺序消费 前言 RabbitMQ 是一个开源的消息代理软件它实现了高级消息队列协议AMQP标准提供了可靠的消息传递机制用于应用解耦、异步提速、流量削锋、数据分发、错峰流控、日志收集等等。RabbitMQ 的主要特点包括 消息队列RabbitMQ 采用消息队列模型允许生产者将消息发送到队列中消费者可以从队列中获取消息进行处理。这种模型使得应用程序可以进行解耦提高了系统的灵活性和可扩展性。 可靠性RabbitMQ 提供了多种机制来确保消息的可靠传递包括持久化、确认机制、事务等保证消息不会丢失或重复传递。 灵活的路由RabbitMQ 支持多种消息路由方式例如直连、主题、广播等可以根据业务需求灵活地进行消息的路由和过滤。 可扩展性RabbitMQ 支持集群部署可以通过增加节点来实现横向扩展提高消息处理能力和系统的可用性。 管理界面RabbitMQ 提供了一个用户友好的管理界面可以通过界面监控队列、交换器、连接等信息并进行配置管理。 广泛的支持RabbitMQ 支持多种编程语言包括 Java、Python、Ruby、C# 等开发者可以使用不同语言的客户端库与 RabbitMQ 进行交互。 解耦应用通过引入消息队列不同的应用程序可以实现解耦发送方和接收方之间不需要直接通信降低了系统的耦合度。 异步通信消息队列支持异步通信发送方发送消息后即可继续处理其他任务接收方可以在合适的时间处理消息提高系统的响应速度和并发能力。 流量削锋是一种网络管理技术用于平稳控制网络流量的传输速率以避免网络拥堵或过载。 安装 进入官网看下RabbitMQ对应的Erlang版本免得到时候版本不对安装错误。 Window安装 先安装Erlang 进入官网。右边点击下载Download Windows installer下拉可以选择不同位数的系统下载速度那叫一个慢。 下载后安装有需要更换下安装目录直至下一步即可。 然后配置环境变量找到你的安装目录比如D:\Program Files\Erlang OTP 将目录配置的Path中 打开cmd输入erl弹出版本号安装成功。 再安装RabbitMQ 然后进入官网https://www.rabbitmq.com/docs/download#downloads-on-github点击Windows Installer。 下载完后点击安装可能等的时间比较久需要检查是否安装Erlang。 如有需要更换下安装目录直至下一步即可。 打开RabbitMQ的命令窗口 输入rabbitmq-plugins enable rabbitmq_management安装插件。 重启服务后输入http://127.0.0.1:15672/安装完成。 默认账号guest/guest Linux安装 先安装Erlang 再github上找到对应版本点击进入复制链接地址如果是centOS7就选el7否则安装时容易出问题 使用命令下载你也可以本地下载后上传到服务器 wget https://github.com/rabbitmq/erlang-rpm/releases/download/v26.2.2/erlang-26.2.2-1.el8.x86_64.rpm如图所示 执行命令 rpm -ivh erlang-26.2.2-1.el8.x86_64.rpm1-i表示安装软件包。 2-v在安装过程中显示详细的信息即 verbose 模式。 3-h以哈希标记的方式显示安装进度。 输入命令查看版本 erl -v安装完毕。 安装RabbitMQ 然后进入官网。找到RHEL, CentOS Stream 9.x, CentOS 8.x复制链接如图所示 执行命令下载你也可以本地下载后上传到服务器 wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.0/rabbitmq-server-3.13.0-1.el8.noarch.rpm如图所示 执行命令安装 rpm -ivh rabbitmq-server-3.13.0-1.el8.noarch.rpmRabbitmq默认安装路径再/usr/lib/rabbitmq目录。 开启界面管理 rabbitmq-plugins enable rabbitmq_management如果你是云服务器还需要开启防火墙 然后执行启动命令有两种方法 1传统的 SysVinit 命令格式 #启动 service rabbitmq-server start #停止 service rabbitmq-server stop #重启 service rabbitmq-server restart2systemd 命令格式 #启动 systemctl start rabbitmq-server #停止 systemctl start rabbitmq-server #重启 systemctl restart rabbitmq-server #查看状态 systemctl status rabbitmq-server输入http://ip地址:15672/安装完成。 添加用户 默认账号guest/guest由于RabbitMQ的默认账号仅允许本机访问我们需要远程添加一个用户 rabbitmqctl add_user 用户名 密码设置角色 执行以下命令 rabbitmqctl set_user_tags 用户 角色用户角色说明 1administrator可以登录控制台、查看所有信息、并对rabbitmq进行管理 2monToring监控者登录控制台查看rabbitmq节点的相关信息(进程数内存使用情况磁盘使用情况等) 3policymaker策略制定者可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息 4managment普通管理员无法看到节点信息也无法对策略进行管理。 5none无法登陆管理控制台通常就是普通的生产者和消费者。 设置权限 rabbitmqctl set_permissions -p / admin .* .* .*三个.* 分别代表配置、写入和读取的权限。 然后登录 再来讲讲其他几个常用命令 # 修改用户密码 rabbitmqctl change_password 用户名 密码 # 查看当前所有用户 rabbitmqctl list_users # 由于RabbitMQ默认的账号用户名和密码都是guest。为了安全起见, 先删掉默认用户 rabbitmqctl delete_user 用户名管理页面 下面来了解下RabbitMQ再管理页面上如何使用的。 连接这里可以查看、管理和关闭当前所有的TCP连接。 通道展示了所有当前打开的通道以及它们的详细信息。 交换器查看、创建和删除的交换机。 添加交换机Add a new exchange 1Name名称交换机的唯一标识符用于在RabbitMQ中识别交换机。 2Type类型指定交换机的类型决定了交换机的路由策略。常见的交换机类型有Direct、Fanout、Topic和Headers。不同类型的交换机对应不同的路由规则。 3Durability持久化指定交换机是否持久化到磁盘。如果将该参数设置为Durable交换机将在RabbitMQ服务器重启后仍然存在Transient来描述不持久化的消息或队列。 4Auto delete自动删除交换机在不被使用时是否自动删除。如果将该参数设置为yes当没有与之绑定的队列或连接时交换机将被自动删除。 5Internal内部交换机指定交换机是否为内部交换机。内部交换机只能被直接连接到的交换机使用而无法通过路由键绑定到队列。该参数为可选参数用于特定的高级使用场景。 6Arguments参数创建交换机时指定一些额外的自定义参数。这些参数可以根据特定的需求来定义交换机的行为和特性。Arguments参数是一个键值对的字典其中键和值的类型可以是字符串、数字、布尔值等。 队列展示了所有当前的队列以及它们的详细信息还可以添加队列。 添加队列 1Virtual host虚拟主机 2Type类型传统队列classic queue、Quorum 队列quorum queue以及流队列stream queue。 3Name名称队列的唯一标识符。 4Durability持久化指定交换机是否持久化到磁盘。 5Arguments参数键值对形式自定义参数输入框下面有常用参数。 比如现在创建一个队列 然后点击列表中的Name进入详情 用户查看系统中所有的操作用户。 添加用户 1username用户名。 2password可以在下拉处选择no password。 3tags设置权限。 点击Name可以进入详情。可以对用户进行修改密码、修改权限、删除用户等操作。 什么是RabbitMQ RabbitMQ是一个消息代理:它接受和转发消息。你可以把它想象成一个邮局当你把想要寄出的邮件放进邮箱时你可以确定邮递员最终会把邮件送到你的收件人手中。在这个类比中RabbitMQ是一个邮箱、一个邮局和一个信件载体。 RabbitMQ和邮局之间的主要区别在于它不处理纸张而是接受、存储和转发二进制数据 RabbitMQ和一般的消息传递使用了一些术语。 生产只不过意味着发送。发送消息的程序是生产者 队列是RabbitMQ中邮箱的名称。虽然消息流经RabbitMQ和你的应用程序但它们只能存储在队列中。队列只受主机的内存和磁盘限制它本质上是一个大的消息缓冲区。 许多生产者可以向一个队列发送消息而许多消费者可以尝试从一个队列接收数据。下面是我们表示队列的方式 消费和接受有着相似的含义。consumer是一个主要等待接收消息的程序 注意生产者、消费者和代理不必驻留在同一主机上;事实上在大多数应用程序中它们不需要。应用程序既可以是生产者也可以是消费者。 入门 我们以Maven项目为例先引入官方依赖示例代码如下 dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.20.0/version /dependencySpringBoot提供依赖如下 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactIdversion3.3.2/version /dependency我们将用Java编写“hello world”发送单个消息的生产者和接收消息并将其打印出来的消费者。 生产方发送消息 如果我们想连接到不同机器上的一个节点我们只需在setHost()方法中指定它的主机名或IP地址。使用try-with-resources语句不需要在代码中显式地关闭它们注15672是页面访问端口号5672为消息队列连接端口号请确保防火墙开启此端口号。 public class MQProduct {//创建队列名private final static String QUEUE_NAME hello;public static void main(String[] args) {//创建连接且创建新通道ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(localhost);connectionFactory.setPort(5672);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);try (Connection connection connectionFactory.newConnection();Channel channel connection.createChannel()){} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}} }用于管理与 RabbitMQ 代理的连接的中心组件是 ConnectionFactory 接口。 有三种连接工厂可供选择 PooledChannelConnectionFactory该工厂基于Apache Pool2管理一个连接和两个通道池。一个池用于事务性通道另一个用于非事务性通道。池是具有默认配置的GenericObjectPools;提供了一个回调来配置池 public static void main(String[] args) {ConnectionFactory rabbitConnectionFactory new ConnectionFactory();rabbitConnectionFactory.setHost(localhost);PooledChannelConnectionFactory pcf new PooledChannelConnectionFactory(rabbitConnectionFactory);pcf.setPoolConfigurer((pool, tx) - {if (tx) {// configure the transactional pool}else {// configure the non-transactional pool}});}ThreadChannelConnectionFactory这个工厂管理一个连接和两个ThreadLocal一个用于事务性通道另一个用于非事务性通道。该工厂确保同一线程上的所有操作使用相同的通道(只要通道保持打开状态)。为了避免内存泄漏如果应用程序使用许多短寿命线程则必须调用工厂的closeThreadChannel()来释放通道资源。从2.3.7版本开始线程可以将其通道传输给另一个线程。 public static void main(String[] args) {ConnectionFactory rabbitConnectionFactory new ConnectionFactory();rabbitConnectionFactory.setHost(localhost);ThreadChannelConnectionFactory tcf new ThreadChannelConnectionFactory(rabbitConnectionFactory);tcf.closeThreadChannel();}CachingConnectionFactory默认情况下它建立一个可以由应用程序共享的单一连接代理。CachingConnectionFactory实现支持对这些通道进行缓存并根据通道是否为事务性通道维护单独的缓存。要配置通道缓存的大小(默认为25)可以调用setChannelCacheSize()方法。 public static void main(String[] args) {CachingConnectionFactory connectionFactory new CachingConnectionFactory(somehost);connectionFactory.setUsername(guest);connectionFactory.setPassword(guest);connectionFactory.setChannelCacheSize(128);Connection connection connectionFactory.createConnection();}然后创建队列调用queueDeclare()方法 //声明一个队列是幂等的——只有当它不存在时才会被创建。 channel.queueDeclare(QUEUE_NAME,false,false,false,null);它一共有五个参数 1queue这是要声明的队列的名称。 2durable表示是否将队列持久化。如果设置为trueRabbitMQ将会将队列存储到磁盘上以便在RabbitMQ服务器重启后能够恢复。 3exclusive表示是否是一个排他队列。如果设置为true该队列只能被当前连接使用连接断开时会自动删除该队列。 4autoDelete表示是否在不再使用时自动删除队列。如果设置为true当最后一个消费者断开连接之后队列会自动删除。 5arguments额外的参数。 然后我们可以向队列发布一条消息调用basicPublish()方法 String message hello world; channel.basicPublish(,QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));它一共有四个参数 1exchange表示消息发送到的交换机的名称。根据消息的routingKey和交换机的类型交换机将消息路由到一个或多个队列设置为表示使用默认交换机。 2routingKey是消息路由的关键词。交换机根据这个关键词将消息路由到相应的队列。消息的路由规则是由交换机的类型决定的。 3props:是一个AMQP.BasicProperties对象用于设置消息的属性。消息的属性包括持久性、优先级、过期时间、消息标识符等。 4body是消息的实际内容以字节数组的形式传输。这个字节数组通常包含了需要传输的业务数据例如JSON数据、文本、二进制数据等。 消费方接收消息 消费者监听来自 RabbitMQ 的消息因此与发布单个消息的发布者不同我们将让消费者运行以侦听消息并将它们打印出来。 发布方相同我们打开一个连接和一个通道并声明我们要从中消费的队列。 为什么不使用 try-with-resource 语句来自动关闭通道和连接我们希望该过程保持活动状态而消费者正在异步侦听消息到达。 public class MQConsumer {//创建队列名private final static String QUEUE_NAME hello;public static void main(String[] args) {//创建连接且创建新通道ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(localhost);//如果我们想连接到不同机器上的一个节点我们只需在这里指定它的主机名或IP地址。connectionFactory.setPort(5672);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);try {Connection connection connectionFactory.newConnection();Channel channel connection.createChannel();//再次创建队列确保队列存在//channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消费消息的回调DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};// 取消消费的回调CancelCallback cancelCallback (consumerTag) - {System.out.println(消息消费被中断);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}} }如果先启动生产方先创建一个队列包含未消费的消息如图所示 然后启动消费方就会被消费执行结果如下 若最先启动消费方但是此时队列并未创建就会报队列找不到的错误信息 声明队列是幂等的 - 仅当队列不存在时才会创建队列。 由于我们可能在发布者之前启动使用者因此我们希望在尝试使用来自其中的消息之前确保队列存在。 Connection connection connectionFactory.newConnection(); Channel channel connection.createChannel(); //再次创建队列确保队列存在 channel.queueDeclare(QUEUE_NAME, false, false, false, null);执行结果如图所示 再RabbitMQ页面可以看到连接的信息、通道、交换机、队列等信息如图所示 基本概念 经过简单的示例介绍我们知道了RabbitMQ的基本用法AMQPAdvanced Message Queuing Protocol高级消息队列协议是一个面向消息的中间件协议其重要的组成部分包括 Producer消息生产者即生产方客户端生产方客户端将消息发送Consumer消息消费者即消费方客户端接收MQ转发的消息。Broker接收客户端的连接用于接收和分发消息实现 AMQP 实体服务。Connection连接生产者/消费者与Broker之间的TCP网络连接。Channel信道在 Connection内部建立的逻辑连接客户端可以建立多个信道每个信道代表一个会话任务Channel之间是完全隔离的。Virtual Host虚拟主机用于逻辑隔离。一个虚拟主机里面可以有若干个不相同的Exchange和Queue。Message消息服务与应用程序之间传送的数据消息可以很简单也可以很复杂由Properties和Body组成。Properties为外包装可以对消息进行修饰比如消息的优先级、延迟等高级特性Body就是消息体内容。Exchange交换嚣接收消息按照路由规则将消息路由到一个或者多个队列。如果路由不到或者返回给生产者或者直接丢弃。Queue消息队列用来保存消息转发消费者消费。Binding绑定Exchange和Queue之间的虚拟连接绑定中可以包含一个或者多个RoutingKey。Routingkey路由规则生产者将消息发送给Exchange通过路由规则发送给指定队列。 流程如图所示 简单队列 往后的内容我们以Spring Boot为例进行讲解和使用Spring Boot 提供了许多功能但我们在这里只重点介绍一些。首先Spring Boot 应用程序可以选择通过 application.properties 或 application.yml 文件提供其属性还有更多选项但这将让我们继续前进。 # 主机名 spring.rabbitmq.host192.168.2.101 # 端口 spring.rabbitmq.port5672 # 虚拟主机 spring.rabbitmq.virtual-host/ # 用户名 spring.rabbitmq.usernameadmin # 密码 spring.rabbitmq.passwordadmin在下图中“P”是我们的生产者“C”是我们的受害者。中间的框是一个队列 - RabbitMQ 代表消费者保留的消息缓冲区。 我们将创建一个 Java 配置文件描述Spring bean Configuration public class MQConfig {Beanpublic Queue queue(){return new Queue(hello);}Beanpublic MQReceiver receiver(){return new MQReceiver();}Beanpublic MQSender sender(){return new MQSender();} }现在需要进入发送方和接收方类的代码非常少。 发送方 您会注意到 Spring AMQP 删除了样板代码只留下需要关注的消息传递逻辑。在 MQConfig 类中的 bean 定义中配置的队列中自动连线并且像许多 Spring 连接抽象一样通过RabbitTemplate 类可以自动连接到发送器中。剩下的就是创建一个消息并调用模板 convertAndSend() 的方法从我们定义的 bean 和我们刚刚创建的消息中传入队列名称。 public class MQSender {Autowiredprivate RabbitTemplate template;Autowiredprivate Queue queue;public void send() {String message Hello World!;this.template.convertAndSend(queue.getName(), message);System.out.println( [x] Sent message );} }convertAndSend()方法有很多种传参方式 1void convertAndSend(Object message);只接受一个参数即要发送的消息对象。消息会被默认的消息转换器MessageConverter转换成消息体后发送到默认的交换机和路由键。 2void convertAndSend(String exchange, String routingKey, Object message);指定消息要发送到的交换机和路由键以及要发送的消息对象。 3void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);这种方式在前两种的基础上增加了messagePostProcessor修改消息属性的接口和correlationData关联数据用于消息的确认处理 接收方 接收器同样简单。我们用 RabbitListener 队列的名称来注释我们的接收器类并传入队列的名称。然后我们通过传入已推送到队列的有效负载来 RabbitHandler 注释我们 receive 的方法。 RabbitListener(queues hello) public class MQReceiver {RabbitHandlerpublic void receive(String in) {System.out.println( [x] Received in );} }调用发送方send()方法运行结果如图 RabbitListener 和 RabbitHandler 是 Spring AMQP高级消息队列协议中用于处理RabbitMQ消息的两个重要注解 RabbitListener注解它可以被用在类级别或者方法级别。被用在类级别时这个类中的所有带有RabbitHandler注解的方法都会被视为消息处理方法。如果被用在方法级别那么这个方法就直接作为消息处理方法而不需要再额外使用RabbitHandler。RabbitHandler注解用于指定一个方法作为消息处理器但它通常与RabbitListener注解一起使用在类级别。 工作队列Work Queues 在本文中我们将创建一个工作队列用于在多个工作线程之间分配耗时的任务。也就是一个生产者、一个队列、多个消费者让多个消费者绑定到一个队列共同消费队列中的消息。 工作队列又名任务队列背后的主要思想是避免立即执行资源密集型任务并不得不等待它完成。相反我们安排任务稍后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行多个工作线程时任务将在它们之间共享。 再Spring Bean中定义第二个消费方 Configuration public class MQConfig {Beanpublic Queue queue(){return new Queue(hello);}Beanpublic MQReceiver receiver(){return new MQReceiver();}Beanpublic MQReceiver2 receiver2(){return new MQReceiver2();}Beanpublic MQSender sender(){return new MQSender();} }发送方 比如循环10次向队列中不停发送消息模拟消息堆积。 public class MQSender {Autowiredprivate RabbitTemplate template;Autowiredprivate Queue queue;public void send() {String message Hello World!;//循环发送10条消息for (int i 1; i 10; i) {this.template.convertAndSend(queue.getName(), messagei);}} }消费方 两个消费方监听同一个队列进行消费 //消费方1 RabbitListener(queues hello) public class MQReceiver {RabbitHandlerpublic void receive(String in) {System.out.println( [x] Received in );} } //消费方2 RabbitListener(queues hello) public class MQReceiver2 {RabbitHandlerpublic void receive(String in) {System.out.println( [x] Received2 in );} }调用发送方send()方法执行两次后执行结果如图所示 由图可知一个消息只会被一个消费者消费消费方式也不一定按照队列顺序进行消费。工作队列Work Queues通常是通过轮询round-robin的方式进行消费的。 默认情况下 AbstractMessageListenerContainer 的值 DEFAULT_PREFETCH_COUNT 默认为 250则会告诉 RabbitMQ 一次不要向工作线程提供超过 250 条消息。相反它会将其分派给下一个尚未忙碌的工作人员。所以发送10条消息是有可能只被一个队列消费的。 公平派单与循环派单 默认情况下RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。在这种模式下调度不一定完全按照我们的要求工作。例如在有两个工作人员的情况下当所有奇数消息都很重偶数消息都很轻时一个工作人员将一直忙碌而另一个工作人员几乎不做任何工作。 “公平调度”是 Spring AMQP 的默认配置。将 AbstractMessageListenerContainer 的值 DEFAULT_PREFETCH_COUNT 定义为 250。 循环派单还有一个弊端因为每个消费者都会收到相同数量的消息所以当任意一个消费者执行时间过长导致后面的消息无法及时被消费其它执行速度快的消费者已经空闲下来示例代码如下 我们将任意消费方设置延迟时间模拟消费时间过长的问题 RabbitListener(queues hello) public class MQReceiver2 {RabbitHandlerpublic void receive(String in) {System.out.println( [x] Received2 in );Thread.sleep(1500);} } RabbitListener(queues hello) public class MQReceiver {RabbitHandlerpublic void receive(String in) {System.out.println( [x] Received in );} }执行结果如图 那么如何将执行过慢的消费者的剩余消息交给其它执行速度快的消费者处理 答我们可以通过prefetchCount预取值属性预取值是消息缓存区允许存在未确认消息的最大数量。如果消费者收到消息后未及时应答那么就会认为该消息区已满就不会接收其它消息反之应答后消息区空闲才会接收新的消息。 设置预取值的方式有两种 Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPrefetchCount(2); // 设置预取数量return factory;}或者使用yamlproperties配置文件 spring.rabbitmq.listener.simple.prefetch2将预取值设置为2执行结果如下 我们可以看到消费者1消费了8条数据消费者2只消费了2条数据。 如果将DEFAULT_PREFETCH_COUNT 设置为 1则行为将是最开始所述的循环传递。 在大多数情况下 prefetchCount 等于 1 过于保守并严重限制了消费者的吞吐量。如果所有工作人员都很忙您的队列可能会填满。你会想要密切关注这一点也许可以增加更多的工人或者有一些其他的策略。 发布/订阅Publish/Subscribe 在本部分中我们将实现交换机模式将消息传递给多个消费者此模式也称为“发布/订阅”。现在是时候在 RabbitMQ 中引入完整的消息传递模型了。 RabbitMQ 中消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上很多时候生产者甚至根本不知道消息是否会被传递到任何队列。 相反生产者只能向Exchange发送消息。交换是一件非常简单的事情。一方面它接收来自生产者的消息另一方面它将它们推送到队列。交易所必须确切地知道如何处理它收到的消息。是否应该将其附加到特定队列中是否应该将其附加到多个队列中或者它应该被丢弃。其规则由交换类型定义。 从本质上讲发布的消息将被广播到所有接收者。 Exchange也有一个由ExchangeTypes中定义的常量表示的“类型”。基本类型有fanout扇出、direct直连、 topic主题和 headers标题 。 我们先定义一个Fanout交换机然后定义两个队列示例代码如下 Configuration public class MQConfig {/*** 队列* return*/Beanpublic Queue queue(){return new Queue(hello);}/*** 队列2* return*/Beanpublic Queue queue2(){return new Queue(hello2);}/*** 交换机* return*/Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(fanout.exchange);} }交换机和队列之间的这种关系称为 绑定通过BindingBuilder.bind()方法将队列和交换机进行绑定示例代码如下 /*** 绑定队列到指定交换机* return*/Beanpublic Binding bindingFanout(){return BindingBuilder.bind(queue()).to(fanoutExchange());}/*** 绑定队列到指定交换机* return*/Beanpublic Binding bindingFanout2(){return BindingBuilder.bind(queue2()).to(fanoutExchange());}我们在页面上看一下交换机绑定队列是否绑定成功如图所示 Queue 类表示消息使用者从中接收消息的组件。 消费方监听不同两个队列代码如下 Component RabbitListener(queues hello) public class MQReceiver {RabbitHandlerpublic void receive(String message) {System.out.println( [x] Received message );} } Component RabbitListener(queues hello2) public class MQReceiver2 {RabbitHandlerpublic void receive(String message) {System.out.println( [x] Received2 message );} }发送方向交换机发送消息示例代码如下 Component public class MQSender {Autowiredprivate RabbitTemplate template;public void send() {String msg hello world;// 指定交换机发生消息this.template.convertAndSend(fanout.exchange, , msg);} }调用发送方send()方法执行结果如图 可以看到监听两个不同队列绑定同一个交换机的消费方都收到了消息。若其中某一个队列有多个监听则消息会按顺序进行消费。 临时队列 每当我们连接到 Rabbit 时我们都需要一个新的、空的队列。为此我们可以创建一个随机名称的队列或者 - 甚至更好的 - 让服务器为我们选择一个随机的队列名称。为了使用 Spring AMQP 客户端执行此操作我们定义了一个 AnonymousQueue它创建一个非持久的、排他性的、具有生成名称的自动删除队列 Configuration public class MQConfig {Beanpublic Queue hello(){return new AnonymousQueue();}Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(fanout.exchange);}Beanpublic Binding binding(){return BindingBuilder.bind(hello()).to(fanoutExchange());} }临时队列会随机生成一串字符串当作队列名称如图所示 或者你也可以通过设置autoDelete为true指定队列自动删除源码如图所示 示例代码如下 Beanpublic Queue queue() {return new Queue(normal_queue, true, false, true);}一旦我们断开了消费者的连接队列会自动删除。 路由Routing 在前面的教程中我们构建了一个简单的 fanout 交换。我们能够向许多接收者广播消息。它没有给我们太多的灵活性 - 它只能进行无意识的广播。 Direct Exchange直连交换机是RabbitMQ中的一种消息交换机类型其主要特点是基于路由键routingKey的精确匹配来进行消息的路由使用相同的routingKey绑定多个队列是完全可以的在这种情况下直接交换的行为将类似于 fanout 交换并将消息广播到所有匹配的队列。 我们将消息发送到DirectExchange 示例代码如下 Configuration public class MQConfig {/*** 队列* return*/Beanpublic Queue queue(){return new Queue(hello);}/*** 队列* return*/Beanpublic Queue queue2(){return new Queue(hello2);}/*** 交换机* return*/Beanpublic DirectExchange direct(){return new DirectExchange(direct.exchange);}/*** 绑定队列到指定交换机通过routingKeydirect.routing进行匹配* return*/Beanpublic Binding bindingDirect(){return BindingBuilder.bind(queue()).to(direct()).with(direct.routing);}/*** 绑定队列到指定交换机通过routingKeydirect.routing进行匹配* return*/Beanpublic Binding bindingDirect2(){return BindingBuilder.bind(queue2()).to(direct()).with(direct.routing2);} }我们可以再页面上确认交换机是否绑定队列如图所示 创建两个消费方监听不同队列示例代码如下 Component RabbitListener(queues hello) public class MQReceiver {RabbitHandlerpublic void receive(String in) {System.out.println( [x] Received in );} } Component RabbitListener(queues hello2) public class MQReceiver2 {RabbitHandlerpublic void receive(String in) {System.out.println( [x] Received2 in );} }SpringAMQP注解提供了简写这样你就不需要声明创建交换机和队列示例代码如下 Component RabbitListener(bindings QueueBinding(value Queue(value hello), exchange Exchange(value direct.exchange, type ExchangeTypes.DIRECT), key direct.routing )) public class MQReceiver {RabbitHandlerpublic void receive(String in) {System.out.println( [x] Received in );} }然后创建生产方指定交换机和RoutingKeydirect.routing发送消息示例代码如下 Component public class MQSender {Autowiredprivate RabbitTemplate template;public void send() {String message Hello World!;this.template.convertAndSend(direct.exchange, direct.routing, message);} }调用发送方send()方法监听改队列的消费方收到消息执行结果如下 主题Topics Topic主题模式允许发送者生产者将消息发送到特定的主题上这种模式通过使用通配符来匹配路由键Routing Key从而实现了比Direct模式更灵活的路由机制。 Topic模式支持两种通配符*星号和#井号。其中*可以匹配一个单词而#可以匹配零个或多个单词。这种机制使得Topic模式能够基于复杂的路由键进行灵活的匹配。 配置示例代码如下你也可以直接使用RabbitListener注解 Configuration public class MQConfig {/*** 队列* return*/Beanpublic Queue queue(){return new Queue(hello);}Beanpublic Queue queue2(){return new Queue(hello2);}/*** 交换机* return*/Beanpublic TopicExchange topicExchange(){return new TopicExchange(topic.exchange);}/*** 绑定队列到指定交换机* return*/Beanpublic Binding bindingTopic(){return BindingBuilder.bind(queue()).to(topicExchange()).with(*.*.rabbit);}/*** 绑定队列到指定交换机* return*/Beanpublic Binding bindingTopic2(){return BindingBuilder.bind(queue2()).to(topicExchange()).with(topic.#);} }交换机绑定队列信息如图所示 创建两个消费方监听不同队列示例代码如下 Component RabbitListener(queues hello) public class MQReceiver {RabbitHandlerpublic void receive(String in) {System.out.println( [x] Received in );} } Component RabbitListener(queues hello2) public class MQReceiver2 {RabbitHandlerpublic void receive(String in) {System.out.println( [x] Received2 in );} }然后创建生成指定交换机和路由键示例代码如下 Component public class MQSender {Autowiredprivate RabbitTemplate template;public void send() {String msg hello world;this.template.convertAndSend(topic.exchange, topic.routing.rabbit, msg);} }调用发送方send()方法执行结果如图 路由键设置为 “topic.routing.rabbit” 的消息将被传送到两个队列。如果设置为“test.routing.rabbit” 只会进入第一个队列而 “topic.routing.test” 只会进入第二个队列。“quick.brown.fox” 不匹配任何绑定因此它将被丢弃。 主题交换功能强大可以像其他交换一样运行。 当队列与 “#” 哈希 绑定键绑定时 - 它将接收所有消息而不管路由键如何 - 就像在Fanout交换中一样。 当绑定中不使用特殊字符 “*” 星号 和 “#” 哈希 时Topic的行为将与Direct主题交换类似。 标题Headers Headers标题模式它不像Direct直连模式和Topic主题模式那样依赖于路由键routing key进行消息路由而是依赖于消息的Headers属性进行路由决策。 创建队列需要设置绑定的头部信息有两种模式全部匹配和部分匹配。换机会根据生产者发送过来的头部信息携带的键值去匹配队列绑定的键值路由到对应的队列。 全部匹配 创建Headers交换机 然后将队列和Headers交换机进行绑定通过调用whereAll()方法对定义的Header进行全部匹配配置示例代码如下 Configuration public class MQConfig {/*** 队列* return*/Beanpublic Queue queue(){return new Queue(hello);}/*** 交换机* return*/Beanpublic HeadersExchange headersExchange(){return new HeadersExchange(header.exchange);}/*** 绑定队列到指定交换机全部匹配* return*/Beanpublic Binding bindingHeader(){MapString, Object headerMap new HashMap();headerMap.put(key, 1);headerMap.put(key2, 2);return BindingBuilder.bind(queue()).to(headersExchange()).whereAll(headerMap).match();} }然后定义MessageProperties对象调用setHeader()方法设置请求头信息发送方代码如下 Component public class MQSender {Autowiredprivate RabbitTemplate rabbitTemplate;public void send() {MessageProperties messageProperties new MessageProperties();//消息持久化messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);messageProperties.setContentType(UTF-8);//添加请求头messageProperties.setHeader(key, 1);messageProperties.setHeader(key2, 2);//设置消息String msg hello world;Message message new Message(msg.getBytes(), messageProperties);this.rabbitTemplate.convertAndSend(header.exchange, , message);} }Message 类的目的是将 body 和 properties 封装在单个实例中以便 API 反过来可以更简单。以下示例显示了 Message 类定义 消费方代码如下 Component RabbitListener(queues hello) public class MQReceiver {RabbitHandlerpublic void receive(byte[] message) {System.out.println( [x] Received new String(message) );} }你也可以使用注解方式节省你的配置代码示例代码如下 Component RabbitListener(bindings QueueBinding(value Queue(value hello),exchange Exchange(value header.exchange, type ExchangeTypes.HEADERS),arguments {Argument(name key, value 1),Argument(name key2, value 2),}) ) public class MQReceiver {RabbitHandlerpublic void receive(byte[] message) {System.out.println( [x] Received new String(message) );} }要注意接受的消息类型保持一致否则会报Failed to convert message消息转换错误。 调用发送方send()方法执行结果如图 2部分匹配 Headers交换机和队列绑定后通过调用whereAny()方法达到部分匹配配置示例代码如下 Configuration public class MQConfig {/*** 队列* return*/Beanpublic Queue queue(){return new Queue(hello);}/*** 交换机* return*/Beanpublic HeadersExchange headersExchange(){return new HeadersExchange(header.exchange);}/*** 绑定队列到指定交换机部分匹配* return*/Beanpublic Binding bindingHeader(){MapString, Object args new HashMap();args.put(key, 1);args.put(key2, 2);return BindingBuilder.bind(queue()).to(headersExchange()).whereAny(args).match();} }发送方MessageProperties对象设置部分请求头信息发送方代码如下 Component public class MQSender {Autowiredprivate RabbitTemplate template;public void send() {MessageProperties messageProperties new MessageProperties();//消息持久化messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);messageProperties.setContentType(UTF-8);//添加请求头messageProperties.setHeader(key, 1);//设置消息String msg hello world;Message message new Message(msg.getBytes(), messageProperties);this.rabbitTemplate.convertAndSend(header.exchange, , message);} }消费方代码如下 Component RabbitListener(queues hello2) public class MQReceiver2 {RabbitHandlerpublic void receive(byte[] message) {System.out.println( [x] Received2 new String(message) );} }调用发送方send()方法执行结果如图 最后我们看一下交换机绑定队列的信息如图 远程过程调用 RPC 如果我们需要在远程计算机上运行一个函数并等待结果这种模式通常称为远程过程调用 RPC。 在本教程中我们将使用 RabbitMQ 构建一个 RPC 系统一个客户端和一个可扩展的 RPC 服务器。 如有疑问请避免使用 RPC。如果可以您应该使用异步管道 - 而不是类似 RPC 的阻塞而是将结果异步推送到下一个计算阶段。 通过 RabbitMQ 执行 RPC 很容易。客户端发送请求消息服务器使用响应消息进行回复。Spring AMQP 的 RabbitTemplate 当我们使用上述 convertSendAndReceive() 方法时它会为我们处理回调队列。 我们以Direct交换机为例配置信息如下 Configuration public class MQConfig {/*** 队列* return*/Beanpublic Queue queue(){return new Queue(hello);}/*** 交换机* return*/Beanpublic DirectExchange topicExchange(){return new DirectExchange(direct.exchange);}/*** 绑定队列到指定交换机* return*/Beanpublic Binding bindingTopic(){return BindingBuilder.bind(queue()).to(topicExchange()).with(rpc);} }发送方调用convertSendAndReceive()方法接受回调参数示例代码如下 Component public class MQSender {Autowiredprivate RabbitTemplate template;public void send() {String msg hello world;String res (String) template.convertSendAndReceive(direct.exchange, rpc, msg);System.out.println(res);} }消费方只需要定义返回类型即可示例代码如下 Component RabbitListener(queues hello) public class MQReceiver {RabbitHandlerpublic String receive(String message) {System.out.println( [x] Received message );return hi;} }调用发送方send()方法执行结果如图 死信队列 死信队列Dead Letter Queue是RabbitMQ中一个特殊的队列用于存储因消息消费失败而被标记为死信的消息。 当消息变成一个死信之后如果这个消息所在的队列存在x-dead-letter-exchange参数那么它会被发送到x-dead-letter-exchange对应值的交换器上这个交换器就称之为死信交换器与这个死信交换器绑定的队列就是死信队列。 死信队列的原因有如下几种 1.消费者显式地拒绝了消息通过basic.reject()或basic.nack()方法并且设置了requeue参数为false 先将消费者设置为手动应答方式 #手动应答 spring.rabbitmq.listener.simple.acknowledge-mode manual先定义死信队列和死信交换机并绑定绑定后可以队列详情页面里看到配置信息如图所示 然后定义普通队列和普通交换机并绑定在普通队列里面设置死信队列和死信队列的routingKey示例代码如下 Configuration public class MQConfig {/*** 死信队列* return*/Beanpublic Queue deadQueue(){return new Queue(dead_queue);}/*** 死信队列交换机* return*/Beanpublic DirectExchange deadExchange(){return new DirectExchange(dead.exchange);}/*** 死信队列和死信交换机绑定* return*/Beanpublic Binding deadBinding(){return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(dead);}/*** 普通队列* return*/Beanpublic Queue queue(){ // 方法一 // Queue normalQueue new Queue(normal_queue); // normalQueue.addArgument(x-dead-letter-exchange, dead.exchange); // 死信队列 // normalQueue.addArgument(x-dead-letter-routing-key, dead); // 死信队列routingKey // 方法二return QueueBuilder.durable(normal_queue).deadLetterExchange(dead.exchange).deadLetterRoutingKey(dead).build();}/*** 普通交换机* return*/Beanpublic DirectExchange normalExchange(){return new DirectExchange(normal.exchange);}/*** 普通队列和普通交换机绑定* return*/Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(normalExchange()).with(normal);}}监听消费方普通队列调用basicReject()方法手动拒绝消息示例代码如下 Component RabbitListener(queues normal_queue) public class MQReceiver {RabbitHandlerpublic void receive(String msg, Message message, Channel channel) throws IOException {System.out.println(收到消息msg);// 参数一当前消息标签参数二false不重新放回队列true重新放回队列channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);// 参数一当前消息标签参数二true该条消息已经之前所有未消费设置为拒绝小于等于DeliveryTag的消息false只确认当前消息参数三false不重新放回队列true重新放回队列//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, false);} }监听消费方死信队列调用basicAck()方法接收消息示例代码如下 Component RabbitListener(queues dead_queue) public class MQReceiver2 {RabbitHandlerpublic void receive(String msg, Message message, Channel channel) throws IOException {System.out.println(死信队列收到消息 msg );// 参数一当前消息标签参数二true该条消息已经之前所有未消费设置为已消费小于等于DeliveryTag的消息false只确认当前消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} }发送方向普通交换机发送消息示例代码如下 Component public class MQSender {Autowiredprivate RabbitTemplate template;public void send() {String msg hello world;template.convertAndSend(normal.exchange, normal, msg);} }调用发送方send()方法执行结果如图 由于每个消息的TTL而过期 如果是之前创建的队列请重新创建队列否则会报队列不存在x-message-ttl属性如图所示 设置好的队列除了上述介绍的详情里面可以看到配置的信息外面也可以看到如图所示 我们需要先将手动确认改为自动确认或者删除配置默认手动确认示例代码如下 #spring.rabbitmq.listener.simple.acknowledge-mode auto1当你在队列级别设置TTL时这意味着该队列中的所有消息都将有一个统一的过期时间TTL和死信队列是两个独立的配置但是他们可以配合使用。 Configuration public class MQConfig {// 省略参考前面示例死信队列绑定代码... .../*** 普通队列* return*/Beanpublic Queue queue(){ // 方法一 // Queue normalQueue new Queue(normal_queue); // normalQueue.addArgument(x-dead-letter-exchange, dead.exchange); // 死信队列 // normalQueue.addArgument(x-dead-letter-routing-key, dead); // 死信队列routingKey // normalQueue.addArgument(x-message-ttl, 10000); // 方法二return QueueBuilder.durable(normal_queue).deadLetterExchange(dead.exchange).deadLetterRoutingKey(dead).ttl(10000).build();}// 省略参考前面示例普通交换机和队列绑定代码... ... }监听消费方普通队列模拟业务处理过程中出现异常情况示例代码如下 Component RabbitListener(queues normal_queue) public class MQReceiver {private static final Logger log LoggerFactory.getLogger(MQReceiver.class);RabbitHandlerpublic void receive(String msg) {log.info(收到消息msg);throw new RuntimeException();} }监听消费方死信队列示例代码如下 Component RabbitListener(queues dead_queue) public class MQReceiver2 {private static final Logger log LoggerFactory.getLogger(MQReceiver2.class);RabbitHandlerpublic void receive(String msg) {log.info(死信队列收到消息{},msg);} }发送方向普通交换机发送消息示例代码如下 Component public class MQSender {Autowiredprivate RabbitTemplate template;public void send() {String msg hello world;template.convertAndSend(normal.exchange, normal, msg);} }调用发送方send()方法执行结果如图 你会发现就算TTL到期后并不会立即进入死信队列 2另一方面你也可以在消息级别设置TTL。这意味着每个消息都可以有自己的过期时间。 其他代码不动参考上述示例发送方示例代码如下 Component public class MQSender {private static final Logger log LoggerFactory.getLogger(MQSender.class);Autowiredprivate RabbitTemplate template;public void send() throws UnsupportedEncodingException {String msg hello world;log.info(发送消息msg);// 消息后处理对象可以设置一些消息的参数信息MessagePostProcessor messagePostProcessor new MessagePostProcessor() {Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置消息过期时间5秒单位msmessage.getMessageProperties().setExpiration(5000);return message;}};template.convertAndSend(normal.exchange, normal, msg);} }调用发送方send()方法执行结果如图 当同时指定每个队列和每个消息的 TTL 时将选择两者之间的较低值。 在某些队列实现中过期的消息不会立即被删除过期的消息可能会排在未过期消息之后直到这些未过期消息被消费或也过期。这种延迟处理方式可能导致过期消息占用队列资源。 队列达到最大长度 再普通队列里设置最大长度自动确认模式将原来的队列删除后重新创建队列示例代码如下 Configuration public class MQConfig {// 省略参考前面示例死信队列绑定代码... .../*** 普通队列* return*/Beanpublic Queue queue(){ // 方法一 // Queue normalQueue new Queue(normal_queue); // normalQueue.addArgument(x-dead-letter-exchange, dead.exchange); // 死信队列 // normalQueue.addArgument(x-dead-letter-routing-key, dead); // 死信队列routingKey // normalQueue.addArgument(x-max-length, 5);//设置队列最大长度 // normalQueue.addArgument(x-overflow,reject-publish);//最近发布的消息将被丢弃 // 方法二return QueueBuilder.durable(normal_queue).deadLetterExchange(dead.exchange).deadLetterRoutingKey(dead).maxLength(5).build();}// 省略参考前面示例普通交换机和队列绑定代码... ... }为了达到演示的效果我们将MQ的预期值设置为1不然要发送很多条消息才可以有效果示例代码如下 spring.rabbitmq.listener.simple.prefetch1普通队列和死信队列的消费者监听示例代码如下 Component RabbitListener(queues normal_queue) public class MQReceiver {private static final Logger log LoggerFactory.getLogger(MQReceiver.class);RabbitHandlerpublic void receive(String msg) throws IOException, InterruptedException {log.info(收到消息msg);} } Component RabbitListener(queues dead_queue) public class MQReceiver2 {private static final Logger log LoggerFactory.getLogger(MQReceiver2.class);RabbitHandlerpublic void receive(String msg) {log.info(死信队列收到消息{},msg);} }发送方向普通交换机发送10条消息示例代码如下 Component public class MQSender {private static final Logger log LoggerFactory.getLogger(MQSender.class);Autowiredprivate RabbitTemplate template;public void send() throws UnsupportedEncodingException {for (int i 0; i 10; i) {String msg hello world_ i;template.convertAndSend(normal.exchange, normal, msg);}} }调用发送方send()方法执行结果如图 有时因为消费速度的问题不一定等于你设置的最大长度但基本上可以保证设置了最大长度在队列满的情况下按溢出方式进行处理。 当设置了最大队列长度或大小并达到最大值时RabbitMQ 的默认行为是从队列前面删除或死信消息即队列中最早的消息。 要修改此行为请使用下面描述的 overflow 设置 默认drop-head如果overflow设置为reject-publish或reject-publish-dlx则最近发布的消息将被丢弃。此外如果启用了发布者确认则将通过 basic.nack 消息通知发布者拒绝。 延迟队列 延迟队列是一种特殊的队列用于存放那些需要在指定时间之后才能被消费者消费的消息。这种机制在很多场景下都非常有用比如订单超时自动取消、定时任务调度、延迟发送通知等。 RabbitMQ 本身并不直接支持延迟队列但可以通过一些插件或者特定的消息属性来实现。最常用的方式是使用 RabbitMQ 的 Dead Letter ExchangesDLX结合消息的 TTLTime-To-Live属性来实现侧重点就是延迟与死信队列略有不同示例代码如下 Configuration public class MQConfig {/*** 死信队列* return*/Beanpublic Queue deadQueue(){return new Queue(dead_queue);}/*** 死信队列交换机* return*/Beanpublic DirectExchange deadExchange(){return new DirectExchange(dead.exchange);}/*** 死信队列和死信交换机绑定* return*/Beanpublic Binding deadBinding(){return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(dead);}/*** 普通队列* return*/Beanpublic Queue queue(){ // 方法一 // Queue normalQueue new Queue(normal_queue); // normalQueue.addArgument(x-dead-letter-exchange, dead.exchange); // 死信队列 // normalQueue.addArgument(x-dead-letter-routing-key, dead); // 死信队列routingKey // normalQueue.addArgument(x-message-ttl, 10000);//设置消息过期时间 // 方法二return QueueBuilder.durable(normal_queue).deadLetterExchange(dead.exchange).deadLetterRoutingKey(dead).ttl(10000).build();}/*** 普通交换机* return*/Beanpublic DirectExchange normalExchange(){return new DirectExchange(normal.exchange);}/*** 普通队列和普通交换机绑定* return*/Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(normalExchange()).with(normal);} }只需要监听死信队列即可示例代码如下 Component RabbitListener(queues dead_queue) public class MQReceiver2 {private static final Logger log LoggerFactory.getLogger(MQReceiver2.class);RabbitHandlerpublic void receive(String msg) {log.info(死信队列收到消息{},msg);} }发送方示例代码如下 Component public class MQSender {private static final Logger log LoggerFactory.getLogger(MQSender.class);Autowiredprivate RabbitTemplate template;public void send() throws UnsupportedEncodingException {String msg hello world;log.info(发送消息);template.convertAndSend(normal.exchange, normal, msg);} }调用发送方send()方法执行结果如图 另外通过RabbitMQ插件实现延迟队列下载 rabbitmq_delayed_message_exchange 插件然后选择想要下载的版本进行下载 安装完成后新建交换机类型多了一种延迟交换机x-delayed-message这里就不过多的演示有需求了解的可以自行百度学习。 防止重复消费 消息重复消费的根本原因在于消息队列的可靠性保证机制即确保消息至少被消费一次。这种机制确保了消息不会因为网络问题或消费者崩溃而丢失但也可能导致消息被多次投递给消费者。 在消息被消费者处理完成后消费者通过调用消息队列提供的basic.ack()方法确认接口来告知消息队列该消息已被成功处理消息队列随后会删除该消息从而避免其他消费者再次处理这条消息当然你也可以使用自动应答模式。 你需要先开启手动确认模式示例代码如下 spring.rabbitmq.listener.simple.acknowledge-mode manual创建普通交换机和队列并绑定示例代码如下 Configuration public class MQConfig {/*** 普通队列** return*/Beanpublic Queue queue() {return new Queue(normal_queue);}/*** 普通交换机** return*/Beanpublic DirectExchange normalExchange() {return new DirectExchange(normal.exchange);}/*** 普通队列和普通交换机绑定** return*/Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(normalExchange()).with(normal);} }消费方再方法上定义Message对象和Channel对象然后调用Channel对象的basicAck()或basicNack()、basicReject()方法进行消息确认或拒绝示例代码如下 Component RabbitListener(queues normal_queue) public class MQReceiver {private static final Logger log LoggerFactory.getLogger(MQReceiver.class);RabbitHandlerpublic void receive(String msg,Message message,Channel channel) throws IOException, InterruptedException {log.info(收到消息msg);try{channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch (Exception e){e.printStackTrace();channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}} }发送方示例代码如下 Component public class MQSender {private static final Logger log LoggerFactory.getLogger(MQSender.class);Autowiredprivate RabbitTemplate template;public void send() throws UnsupportedEncodingException {String msg hello world;log.info(发送消息);template.convertAndSend(normal.exchange, normal, msg);} }调用发送方send()方法执行结果如图 另外一种情况业务处理过程中出现异常重复消费导致死循环 Spring Boot 提供了消息重试的一个属性spring.rabbitmq.template.retry.max-attempts示例代码如下 # 开启消费重试 spring.rabbitmq.listener.simple.retry.enabledtrue # 设置消费最大重试次数 spring.rabbitmq.listener.simple.retry.max-attempts3现在假设再自动应答模式中业务处理过程中出现了异常消费方示例代码如下 // 省略参考前面示例绑定代码和发送方代码... ... Component RabbitListener(queues normal_queue) public class MQReceiver {private static final Logger log LoggerFactory.getLogger(MQReceiver.class);RabbitHandlerpublic void receive(String msg) {log.info(收到消息msg);throw new RuntimeException();} }调用发送方send()方法执行结果如图 你还可以设置每次重发的间隔时间示例代码如下 # 设置间隔时间单位秒 spring.rabbitmq.listener.simple.retry.initial-interval10000调用发送方send()方法执行结果如图 手动拒绝并重新入队的行为并不直接受spring.rabbitmq.listener.simple.retry.max-attempts配置的控制。这个配置项主要控制的是监听器在自动模式下对消息处理失败后的自动重试次数。当消息通过手动方式被拒绝并重新入队时它被视为一个新的消费请求而不是之前失败尝试的延续。 持久化 RabbitMQ的持久化机制主要用于确保消息在队列中即使在Broker重启后也不会丢失。持久化分为三个部分交换机持久化、队列持久化、消息持久化。 交换机和队列的持久化将durable设置为true前面我们讲解的所有内容中创建的交换机和队列默认都是持久化的如图所示 当服务器重启后不会被丢失下面我们模拟一下服务器重启的情况现在有一个持久化的交换机和队列含D字母为持久化如图所示 然后我们执行命令systemctl restart rabbitmq-server让RabbitMQ重启如图所示稍等一会 然后刷新页面后发现交换机和队列依然存在。 下面我们来演示一下不设置为持久化先将之前队列删除示例代码如下 Configuration public class MQConfig {Beanpublic Queue queue() {return new Queue(normal_queue, false);}Beanpublic DirectExchange normalExchange(){return new DirectExchange(normal.exchange, false, false);}Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(normalExchange()).with(normal);} }新创建的队列和交换机不含字母D如图所示 再次执行重启命令systemctl restart rabbitmq-server刷新页面后交换机和队列就不存在了如果有被监听的情况下重启后是不会删除的。 消息持久化 消息持久化并不能完全保证消息不丢失因为将消息写入磁盘比仅在内存中存储要慢。这可能导致生产者的发送速度降低。另一方面再存储到磁盘过程中消息还在缓存的一个间隔点此时宕机导致消息丢失。 如果队列本身被定义为持久化durable那么队列可以在RabbitMQ重启后保留其状态。这意味着队列的元数据和其结构会被保留。 如果消息的持久化是通过队列来确认那消息持久化就完全没啥必要了但是还是介绍下通过setDeliveryMode()方法设置为MessageDeliveryMode.PERSISTENT示例代码如下 Component public class MQSender {Autowiredprivate RabbitTemplate template;public void send() {String msg hello world;MessagePostProcessor messagePostProcessor new MessagePostProcessor() {Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 持久化设置message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}};template.convertAndSend(normal.exchange, normal, msg, messagePostProcessor);} }交换机自动删除 在临时队列章节中若断开了消费者的连接队列会自动删除。我们同样也可以将交换机的autodelete属性被设置为true当所有与该交换机绑定的队列都断开连接并且不再存在时该交换机将会自动被RabbitMQ删除。 源码如图所示 示例代码如下 Configuration public class MQConfig {public Queue queue() {return new Queue(normal_queue);}Beanpublic DirectExchange normalExchange() {return new DirectExchange(normal.exchange, true, true);}Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(normalExchange()).with(normal);} }创建成功后可以看到交换机对应的配置标明AD标签如图所示 当没有队列绑定时对应的交换机将自动删除可以设置为临时队列或者删除队列后查看效果。 可靠传输 确保RabbitMQ中的消息可靠传输有很多种策略比如前面介绍的持久化、死信队列、确认机制还有一种方式是生产方确认机制它有两种方式confirm机制和return机制。 Confirm机制 Confirm机制主要用于确认消息是否已经被RabbitMQ代理成功接收并路由到指定的Exchange交换机。 开启Confirm确认机制示例代码如下 # 开启生产方confirm确认机制 SIMPLE: 使用同步的 Confirm 模式 CORRELATED: 使用异步的 Confirm 模式 spring.rabbitmq.publisher-confirm-typecorrelated定义一个全局Bean通过setConfirmCallback()方法接收消息回调示例代码如下 Configuration public class MQConfig {private static final Logger log LoggerFactory.getLogger(MQConfig.class);// 省略绑定交换机和队列代码参考前面示例... .../*** 统一配置* param connectionFactory* return*/Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);// 表示如果消息无法被路由到一个或多个队列例如因为指定的路由键没有匹配的队列那么消息代理如RabbitMQ将返回一个未路由的消息给生产者发送者。rabbitTemplate.setMandatory(true);// 设置确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - {if (ack) {log.info(消息已经到达Exchange);} else {log.info(消息没有到达Exchange);}if (correlationData ! null) {log.info(相关数据 correlationData);}if (cause ! null) {log.info(原因 cause);}});return rabbitTemplate;} }生产方和消费方的代码就不展示了参考前面的示例启动后将交换机删除后调用send()方法执行结果如图 Return机制 Return机制用于处理那些由于路由问题而无法到达任何队列的消息。 开启Return确认机制示例代码如下 # 开启生产方return确认机制 spring.rabbitmq.publisher-returnstrue在全局Bean里通过setReturnsCallback()方法接收消息回调示例代码如下 Configuration public class MQConfig {private static final Logger log LoggerFactory.getLogger(MQConfig.class);// 省略绑定交换机和队列代码参考前面示例... .../*** 统一配置* param connectionFactory* return*/Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);// 表示如果消息无法被路由到一个或多个队列例如因为指定的路由键没有匹配的队列那么消息代理如RabbitMQ将返回一个未路由的消息给生产者发送者。rabbitTemplate.setMandatory(true);// 设置返回回调rabbitTemplate.setReturnsCallback(returnedMessage - {log.info(消息无法到达队列时触发);log.info(ReturnCallback: 消息 returnedMessage.getMessage());log.info(ReturnCallback: 回应码 returnedMessage.getReplyCode());log.info(ReturnCallback: 回应信息 returnedMessage.getReplyText());log.info(ReturnCallback: 交换机 returnedMessage.getExchange());log.info(ReturnCallback: 路由键 returnedMessage.getRoutingKey());});return rabbitTemplate;} }发送方发送一个不匹配的RoutingKey示例代码如下 Component public class MQSender {private static final Logger log LoggerFactory.getLogger(MQSender.class);Autowiredprivate RabbitTemplate template;public void send() throws UnsupportedEncodingException {String msg hello world;template.convertAndSend(normal.exchange, normal2, msg);} }调用send()方法执行结果如图 通过合理使用这两个机制一起使用生产者可以确保消息在发送到RabbitMQ时的可靠性并及时处理那些由于路由问题而无法到达队列的消息。 唯一标识 在 RabbitMQ 中设置消息的唯一标识Message ID可以帮助你跟踪消息的处理状态。你可以通过在发送消息时设置 MessageProperties 来实现这一点。 在发送消息的时候给消息设置指定标识示例代码如下 Component public class MQSender {Autowiredprivate RabbitTemplate template;public void send() {String msg hello world;MessageProperties messagePostProcessor new MessageProperties();messagePostProcessor.setMessageId(202409300000001); // 设置消息标识Message message null;try {message new Message(msg.getBytes(utf-8), messagePostProcessor);} catch (UnsupportedEncodingException e) {throw new RuntimeException(e);}template.convertAndSend(normal.exchange, normal, message);} }消费方示例代码如下 Component RabbitListener(queues normal_queue) public class MQReceiver {private static final Logger log LoggerFactory.getLogger(MQReceiver.class);RabbitHandlerpublic void receive(byte[] msg,Message message,Channel channel) throws IOException {log.info(收到消息new String(msg)消息标识message.getMessageProperties().getMessageId());channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} }执行结果如图 面试题如何保证MQ顺序消费 答可以在生产者端将同一组的消息发送到一个特定的队列。然后每个队列由一个单独的消费者处理使用basicAck() 来确认消息从而保持组内消息的顺序。
http://www.dnsts.com.cn/news/204009.html

相关文章:

  • 陕西省建设八大员官方网站最近几天的新闻
  • 陕西省建设厅执业资格注册中心网站报名系统企业网站备案所需材料 amp
  • 网站建设从零到精通.pdf公司内部网站的作用
  • 网站优缺点分析百度搜索排名怎么收费
  • 泉山微网站开发招商建设工程有限公司网站
  • 怎么查网站做百度竞价信息网站流量赚钱
  • 重庆网站建设中心网站页面设计报价表
  • 网站模板asp北京新闻媒体
  • 滁州市建设工程质量检测协会网站邮件网站怎么做
  • 溧阳网站优化wordpress getthetags
  • 甘肃省第八建设集团公司网站德州极速网站建设 小程序
  • 做分销网站做网站可以不用框架吗
  • 网站结构形式有哪些网站做icp备案需要多久
  • 做网站填写主要品牌怎么填写企石网站仿做
  • 怎么优化一个网站关键词举报网站建设公司
  • 手机网站设计宽度软件开发方式
  • 湖南省网站门户建设目标
  • 做网站联系电话网店设计流程
  • wordpress建哪些网站吗软件工程做项目网站
  • 网站编程培训机构上海室内设计有限公司
  • 什么免费网站可以链接域名wordpress音乐主题musik汉化
  • 学院网站设计说明书推广公司简介怎么写
  • 网站怎么做跳转页面兰州vx
  • 网站商城建设员招聘企管宝app下载
  • 三亚高端服务网站北京建设门户网站
  • 北京建网站费用怎么做境外电商平台
  • 北京网站外包公司在线销售网站设计文献
  • 建设网站的需要的工具wordpress插件清单 很多很全
  • 中山网站开发网页版传奇链接
  • 公司网站建设开发济南兴田德润优惠吗两个路由器做双网站