营销型网站建站,企业网站seo优化交流,大连口碑最好的装修公司,个人做论坛网站有哪些实现思路
使用扇出交换机#xff08;Fanout Exchange#xff09;#xff1a;扇出交换机会将消息广播到所有绑定的队列#xff0c;确保每个消费者组都能接收到相同的消息。为每个消费者组创建独立的队列#xff1a;每个消费者组拥有自己的队列#xff0c;所有属于该组的消…实现思路
使用扇出交换机Fanout Exchange扇出交换机会将消息广播到所有绑定的队列确保每个消费者组都能接收到相同的消息。为每个消费者组创建独立的队列每个消费者组拥有自己的队列所有属于该组的消费者都订阅这个队列。确保同一组内的消费者竞争消费RabbitMQ 会将消息推送给组内第一个可用的消费者确保同一消息不会被同一组中的多个消费者处理。
示例场景
假设我们有两个消费者组GroupA 和 GroupB。我们希望发送的消息能够同时被 GroupA 和 GroupB 消费但每个组内的多个消费者只会有一个成员消费该消息。
Spring Boot RabbitMQ 广播式分组消费示例
1. 引入依赖
首先在 pom.xml 中添加 RabbitMQ 和 Spring AMQP 的依赖
dependencies!-- Spring Boot Starter for RabbitMQ --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency!-- 其他依赖... --
/dependencies
2. 配置 RabbitMQ 交换机和队列
在 application.yml 中配置 RabbitMQ 的交换机、队列和绑定关系。使用 fanout 交换机并为每个消费者组创建一个独立的队列。
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest# RabbitMQ 配置rabbitmq:listener:simple:acknowledge-mode: manual # 手动确认消息concurrency: 5 # 每个队列的并发消费者数量max-concurrency: 10 # 最大并发消费者数量# 自定义队列和交换机配置rabbitmq:queues:group-a-queue:name: group_a_queuedurable: truegroup-b-queue:name: group_b_queuedurable: trueexchanges:fanout-exchange:name: fanout_exchangetype: fanoutdurable: truebindings:group-a-binding:exchange: fanout_exchangequeue: group_a_queuegroup-b-binding:exchange: fanout_exchangequeue: group_b_queue
3. 创建 RabbitMQ 配置类
创建一个配置类来声明交换机、队列和绑定关系。Spring AMQP 会自动根据配置创建这些资源。
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitMQConfig {// 定义扇出交换机Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(fanout_exchange, true, false);}// 定义 Group A 队列Beanpublic Queue groupAQueue() {return new Queue(group_a_queue, true);}// 定义 Group B 队列Beanpublic Queue groupBQueue() {return new Queue(group_b_queue, true);}// 绑定 Group A 队列到扇出交换机Beanpublic Binding groupABinding(Queue groupAQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(groupAQueue).to(fanoutExchange);}// 绑定 Group B 队列到扇出交换机Beanpublic Binding groupBBinding(Queue groupBQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(groupBQueue).to(fanoutExchange);}
}
4. 发送消息
创建一个服务类来发送消息到扇出交换机。由于 fanout 交换机不使用路由键它会将消息广播到所有绑定的队列。
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;Service
public class MessageProducer {private final RabbitTemplate rabbitTemplate;Autowiredpublic MessageProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate rabbitTemplate;}// 发送消息到扇出交换机public void sendMessage(String message) {System.out.println(Sending message: message);rabbitTemplate.convertAndSend(fanout_exchange, , message);}
}
5. 创建消费者
为每个消费者组创建单独的消费者类。每个消费者组内的多个消费者会竞争性地从队列中消费消息。使用 RabbitListener 注解来监听队列中的消息。
Group A 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class GroupAConsumer {RabbitListener(queues group_a_queue)public void consumeMessage(String message) {System.out.println(Group A consumer received: message);// 处理 Group A 的逻辑}
}
Group B 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class GroupBConsumer {RabbitListener(queues group_b_queue)public void consumeMessage(String message) {System.out.println(Group B consumer received: message);// 处理 Group B 的逻辑}
}
6. 启动应用程序并测试
启动 Spring Boot 应用程序后GroupAConsumer 和 GroupBConsumer 会分别监听 group_a_queue 和 group_b_queue。通过以下方式测试消息的发送和消费
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;Component
public class TestRunner implements CommandLineRunner {private final MessageProducer messageProducer;Autowiredpublic TestRunner(MessageProducer messageProducer) {this.messageProducer messageProducer;}Overridepublic void run(String... args) throws Exception {// 发送一条消息messageProducer.sendMessage(This is a broadcast message);}
}
7. 运行结果 当你启动应用程序时TestRunner 会发送一条消息到 fanout_exchange。由于 fanout 交换机会将消息广播到所有绑定的队列因此 group_a_queue 和 group_b_queue 都会接收到这条消息。