卡片式网站,专业分销网站建设,网站添加背影音乐怎么做,北京市建设工程交易中心网站RabbitMQ 基本使用方法
在你的代码中#xff0c;涉及到了 RabbitMQ 的基本使用#xff0c;包括队列定义、交换机的配置、消息的发送与接收等内容。下面我将详细总结 RabbitMQ 的基本使用方法#xff0c;重点解释如何在 Spring Boot 项目中与 RabbitMQ 集成。
1. 引入依赖 …RabbitMQ 基本使用方法
在你的代码中涉及到了 RabbitMQ 的基本使用包括队列定义、交换机的配置、消息的发送与接收等内容。下面我将详细总结 RabbitMQ 的基本使用方法重点解释如何在 Spring Boot 项目中与 RabbitMQ 集成。
1. 引入依赖
在 Spring Boot 项目中使用 Spring AMQP 组件来集成 RabbitMQ。首先需要在 pom.xml 中添加相关的依赖
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency
dependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactId
/dependency该依赖会自动导入 Spring AMQP 库以及 RabbitMQ 的客户端允许你在 Spring 环境下方便地使用 RabbitMQ。
2. 配置 RabbitMQ
首先你需要配置 RabbitMQ 的相关参数如连接信息、交换机、队列等。在你的例子中RabbitConfig 类就负责了这些配置。
package com.easylive.entity.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitConfig {Beanpublic MessageConverter messageConverter() {// 使用自定义的消息转换器来更严格地处理反序列化return new Jackson2JsonMessageConverter();}// 队列定义Beanpublic Queue transferFileQueue() {return new Queue(transferFileRouting, true); // durable 确保队列持久化}Beanpublic Queue videoPlayQueue() {return new Queue(videoPlayRouting, true);}// Direct 类型交换机Beanpublic DirectExchange directExchange() {return new DirectExchange(directExchange, true, false); // durable是否持久化}// 队列和交换机的绑定Beanpublic Binding transferFileBinding(Queue transferFileQueue, DirectExchange directExchange) {return BindingBuilder.bind(transferFileQueue).to(directExchange).with(transferFileRoutingKey);}Beanpublic Binding videoPlayBinding(Queue videoPlayQueue, DirectExchange directExchange) {return BindingBuilder.bind(videoPlayQueue).to(directExchange).with(videoPlayRoutingKey);}
}
2.1 定义队列
在 RabbitMQ 中队列用于存储消息直到消费者从队列中取出。队列是消息传递的基础。
Bean
public Queue transferFileQueue() {return new Queue(transferFileRouting, true); // durable 确保队列持久化
}Queue(transferFileRouting, true)创建一个名为 transferFileRouting 的队列true 表示该队列是持久化的即 RabbitMQ 会在服务器重启后保留队列。
2.2 定义交换机
交换机Exchange负责接收来自生产者的消息并根据队列绑定的规则将消息路由到相应的队列。RabbitMQ 支持不同类型的交换机如 Direct, Fanout, Topic 等在这个例子中使用的是 Direct Exchange。
Bean
public DirectExchange directExchange() {return new DirectExchange(directExchange, true, false); // durable是否持久化
}DirectExchange(directExchange, true, false)创建一个名为 directExchange 的交换机true 表示该交换机是持久化的false 表示不自动删除。
2.3 队列与交换机的绑定
队列和交换机之间的绑定决定了消息如何路由。在你的例子中队列通过 Routing Key 和交换机进行绑定。
Bean
public Binding transferFileBinding(Queue transferFileQueue, DirectExchange directExchange) {return BindingBuilder.bind(transferFileQueue).to(directExchange).with(transferFileRoutingKey);
}BindingBuilder.bind(transferFileQueue).to(directExchange).with(transferFileRoutingKey)这表示将 transferFileQueue 队列与 directExchange 交换机进行绑定使用 transferFileRoutingKey 作为路由键。
3. 消息发送
消息生产者通过交换机发送消息到队列消费者从队列中获取消息并处理。你在代码中的生产者部分使用了 amqpTemplate.convertAndSend 方法来发送消息消息是发送给direct交换机并指定key值。
for (VideoInfoFilePost filePost : addFileList) {amqpTemplate.convertAndSend(directExchange, transferFileRoutingKey, filePost);
}amqpTemplate.convertAndSend(directExchange, videoPlayRoutingKey, videoPlayInfoDto);amqpTemplate.convertAndSend(directExchange, transferFileRoutingKey, filePost)这表示通过交换机 directExchange使用 transferFileRoutingKey 作为路由键发送 filePost 对象到消息队列。amqpTemplate.convertAndSend 会自动序列化对象这里使用 Jackson2JsonMessageConverter并发送到队列。
4. 消息接收
消费者使用 RabbitListener 注解监听队列中的消息。当队列中有消息时消费者会触发相应的处理方法。
RabbitListener(queues transferFileRouting)
public void consumeTransferFileQueue(Payload VideoInfoFilePost videoInfoFile) {try {videoInfoPostService.transferVideoFile(videoInfoFile);} catch (Exception e) {log.error(处理转码文件队列消息失败, e);}
}RabbitListener(queues transferFileRouting)表示该方法会监听名为 transferFileRouting 的队列。Payload 注解用于指定消息体的类型。这里接收的是 VideoInfoFilePost 类型的消息。消费者方法会在消息到达时被触发执行相应的业务逻辑。
5. 消息转换
Spring AMQP 提供了 MessageConverter 接口可以用于消息内容的转换。在你的配置中使用了 Jackson2JsonMessageConverter 作为消息转换器它会将消息对象转换为 JSON 格式发送并在接收时进行反序列化。
Bean
public MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();
}6. 完整流程总结 队列和交换机的定义与配置 定义队列和交换机设置持久化属性。使用 Binding 将队列与交换机绑定并指定路由键。 消息生产者发送消息 使用 amqpTemplate.convertAndSend 发送消息到指定的交换机并根据路由键将消息发送到特定的队列。发送的消息会经过 MessageConverter 转换成 JSON 格式。 消息消费者接收消息 使用 RabbitListener 注解来监听队列中的消息。当消息到达时消费者会自动触发相应的方法并处理消息。 异常处理 消费者中可以加入异常处理逻辑如 try-catch以确保在消息处理失败时记录日志并进行适当的处理。
7. 注意事项 持久化与确认机制 如果消息队列和交换机是持久化的那么即使 RabbitMQ 重启队列和交换机也会被保留。但这要求队列中的消息也需要持久化否则消息会丢失。 事务与确认 Spring AMQP 提供了事务支持可以在发送消息时确保消息的可靠性。消费者可以使用 RabbitListener 的 ackMode 属性来控制消息确认机制保证消息被成功消费后才从队列中移除。 消息格式与序列化 使用 Jackson2JsonMessageConverter 作为默认消息转换器可以方便地进行 Java 对象与 JSON 格式的互转。 死信队列与重试机制 RabbitMQ 支持死信队列DLX和消息重试机制用于处理消费失败的消息。
总结
RabbitMQ 在 Spring Boot 中的集成非常简便通过 Configuration 配置类定义队列、交换机和绑定关系生产者通过 amqpTemplate 发送消息消费者使用 RabbitListener 监听队列消息。结合 MessageConverter可以方便地进行消息的序列化和反序列化。整个流程中队列、交换机和消息的绑定机制是核心保证了消息的有效传递和处理。