如何申请网站,怎么在百度免费推广,电商网站要素,云虚拟主机wordpress前言
最近有个想法想整理一个内容比较完整springboot项目初始化Demo。
SpringBoot集成RabbitMQ RabbitMQ中的一些角色#xff1a; publisher#xff1a;生产者 consumer#xff1a;消费者 exchange个#xff1a;交换机#xff0c;负责消息路由 queue#xff1a;队列…前言
最近有个想法想整理一个内容比较完整springboot项目初始化Demo。
SpringBoot集成RabbitMQ RabbitMQ中的一些角色 publisher生产者 consumer消费者 exchange个交换机负责消息路由 queue队列存储消息 virtualHost虚拟主机隔离不同租户的exchange、queue、消息的隔离
SpringAMQP是基于RabbitMQ封装的一套模板并且还利用SpringBoot对其实现了自动装配使用起来非常方便。
junit用于测试。
一、pom引入依赖amqp !--rabbitmq--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdjunit/groupIdartifactIdjunit/artifactIdscopetest/scope/dependency!--rabbitmq-- 二、application-dev.yaml 增加RabbitMQ相关配置
spring:#RabbitMQ服务器配置地址账号密码virtualhost等配置rabbitmq:host: 127.0.0.1port: 5672username: murgpassword: 123456virtual-host: murg-host#队列中没有消息阻塞等待时间template:receive-timeout: 2000
logging:level:org.springframework.security: debug
三、发布/订阅
RabbitMQ官方提供了5个不同的Demo示例对应了不同的消息模型。此处只列举发布/订阅模式。
此模式下根据交换机类型又分为三种。
1.Fanout类型 广播模式 把消息交给所有绑定到交换机的队列
2.Direct类型 路由模式 把消息交给符合指定routing key 的队列
3Topic类型 主题模式 把消息交给符合主题通配符的队列
3.1 Fanout类型
在广播模式下消息发送流程是这样的
1 可以有多个队列
2 每个队列都要绑定到Exchange交换机
3 生产者发送的消息只能发送到交换机交换机来决定要发给哪个队列生产者无法决定
4 交换机把消息发送给绑定过的所有队列
5 订阅队列的消费者都能拿到消
3.1.1 声明Fanout类型交换机和队列 将交换机和队列绑定在一起
创建配置类FanoutConfig
声明一个Fanout类型交换机命名为murg.fanout
声明两个Queue队列分别为fanout.queue1和fanout.queue2
分别将两个队列和交换机绑定后续用于消费消息。
package com.murg.bootdemo.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Fanout 广播模式下*声明交换机和队列*/
Configuration
public class FanoutConfig {/*** 声明交换机和队列 将交换机和队列绑定在一起* return Fanout类型交换机*/Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(murg.fanout);}/*** 第1个队列*/Beanpublic Queue fanoutQueue1(){return new Queue(fanout.queue1);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/Beanpublic Queue fanoutQueue2(){return new Queue(fanout.queue2);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}3.1.2创建消息生产服务
创建消息生产服务MessageProducerService 注入RabbitTemplate 用于发送消息。 注意一点是rabbittemplate.convertAndSend不会自己创建队列要先在控制台手动创建一个队列或者再消费者配置中声明一个队列
新增测试广播模式下发送消息方法
package com.murg.bootdemo.rabbitmq;import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;/*** Rabbitmq消息生产者*/
Service
RequiredArgsConstructor
public class MessageProducerService {private final RabbitTemplate rabbitTemplate;/*** 测试广播模式下发送消息* param msg*/public void testFanoutExchange(String msg) {// 发送消息//murg.fanout交换机名字 已经在 FanoutConfig中配置并且初始化了//rabbittemplate.convertAndSend不会自己创建队列要先在控制台手动创建一个队列或者再消费者配置中声明一个队列rabbitTemplate.convertAndSend(murg.fanout,,msg);}}3.13创建消息消费服务
创建消息生产服务MessageConsumerService
RabbitListener是 SpringAMQP AMQP提供的注解用于简化 RabbitMQ 消息监听器的创建。通过在方法上添加 RabbitListener 注解可以将方法注册为消息监听器用于处理从 RabbitMQ 中接收到的消息。queues 参数定义队列名字 此处创建两个监听 在上述FanoutConfig 中已经将这两个队列和交换机murg.fanout绑定所有可同时消费murg.fanout交换机的消息。
package com.murg.bootdemo.rabbitmq;import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;/*** 消息消费者**/
Service
RequiredArgsConstructor
public class MessageConsumerService {/*** 下面是Fanout广播模式的监听* 通过RabbitListener监听队列名字FanoutConfig 中定义的 fanout.queue1 和fanout.queue2* param msg*/RabbitListener(queues fanout.queue1)public void listenFanoutQueue1(String msg) {System.out.println(消费者1接收到Fanout消息【 msg 】);}RabbitListener(queues fanout.queue2)public void listenFanoutQueue2(String msg) {System.out.println(消费者2接收到Fanout消息【 msg 】);}}3.14修改测试类进行测试
修改创建项目时生成的 BootdemoApplicationTests.class
增加以下注解
ActiveProfiles(dev)
指定运行环境为开发环境 RunWith(SpringRunner.class)
指定测试类的运行器Runner。其主要作用是将Spring的测试支持集成到JUnit测试中使得在运行JUnit测试时Spring的上下文可以被正确地加载和配置。
SpringBootTest(classes{BootdemoApplication.class})
指定启动类
package com.murg.bootdemo;import com.murg.bootdemo.rabbitmq.MessageProducerService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Profile;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;ActiveProfiles(dev)
RunWith(SpringRunner.class)
SpringBootTest(classes{BootdemoApplication.class})// 指定启动类
public class BootdemoApplicationTests {AutowiredMessageProducerService messageProducerService;Testpublic void contextLoads() {System.out.printf(aaaaaaaaaaaaa);}//测试Testpublic void testFanoutExchange(){String msg 遍身罗绮者不是养蚕人;for (int i0;i10;i){messageProducerService.testFanoutExchange(msg);}}}3.15 增加一个测试方法调用消息生产服务发送 Fanout类型的消息。运行测试控制台输出结果 两个队列都可消费。
3.2 Direct类型
3.2.1MessageProducerService生产服务增加方法testDirectExchange
在Direct模型下 队列与交换机的绑定不能是任意绑定了而是要指定一个RoutingKey路由key 消息的发送方在 向 Exchange发送消息时也必须指定消息的 RoutingKey。 Exchange不再把消息交给每一个绑定的队列而是根据消息的Routing Key进行判断只有队列的Routingkey与消息的 Routing key完全一致才会接收到消息
/*** 测试Direct模式下发送消息* 在Direct模型下** 队列与交换机的绑定不能是任意绑定了而是要指定一个RoutingKey路由key** 消息的发送方在 向 Exchange发送消息时也必须指定消息的 RoutingKey。** Exchange不再把消息交给每一个绑定的队列而是根据消息的Routing Key进行判断只有队列的Routingkey与消息的 Routing key完全一致才会接收到消息* murg.Direct* param msg*/public void testDirectExchange(String msg,String rountingkey) {// 发送消息//murg.fanout交换机名字 已经在 FanoutConfig中配置并且初始化了//rabbittemplate.convertAndSend不会自己创建队列要先在控制台手动创建一个队列或者再消费者配置中声明一个队列rabbitTemplate.convertAndSend(murg.direct,rountingkey,msg);}
3.2.2MessageConsumerService消费服务增加方法testDirectExchange
Direct模式的消费监听修改队列和交换机的方式改为注解方式。 基于Bean的方式声明队列和交换机比较麻烦Spring还提供了基于注解方式来声明。 在consumer的SpringRabbitListener中添加两个消费者同时基于注解来声明队列和交换机Exchange(name murg.directtype ExchangeTypes.DIRECT)声明交换机名字及类型 通过key的值声明接收不同路由的消息
direct.queue1 消费 rountingkey为“蚕妇”的消息
direct.queue2 消费 rountingkey为“自京赴奉先县咏怀五百字”的消息 /**** 下面是rountingKey 路由key 模式的消费监听* 基于Bean的方式声明队列和交换机比较麻烦Spring还提供了基于注解方式来声明。** 在consumer的SpringRabbitListener中添加两个消费者同时基于注解来声明队列和交换机**/RabbitListener(bindings QueueBinding(value Queue(name direct.queue1),//定义队列名字 direct.queue1exchange Exchange(name murg.direct, type ExchangeTypes.DIRECT),//指定交换机和交换机类型key {蚕妇} //指定消费rountingkey))public void listenDirectQueue1(String msg){System.out.println(消费者接收到direct.queue1的消息【 msg 】);}RabbitListener(bindings QueueBinding(value Queue(name direct.queue2),//定义队列名字 direct.queue2exchange Exchange(name murg.direct, type ExchangeTypes.DIRECT),//指定交换机和交换机类型key {自京赴奉先县咏怀五百字}//指定消费rountingkey))public void listenDirectQueue2(String msg){System.out.println(消费者接收到direct.queue2的消息【 msg 】);}3.2.3测试类增加测试方法 Testpublic void testDirectExchange() throws InterruptedException {String msg1 遍身罗绮者不是养蚕人;String key1 蚕妇;String msg2 朱门酒肉臭路有冻死骨;String key2 自京赴奉先县咏怀五百字;for (int i0;i10;i){if (i % 2 0){messageProducerService.testDirectExchange(msg2,key2);Thread.sleep(1000);}else {messageProducerService.testDirectExchange(msg1,key1);Thread.sleep(1000);}}}
调用消息生产服务发送 Direct类型的消息。运行测试控制台输出结果 3.3 Topic类型
3.3.1MessageProducerService生产服务增加方法testTopicExchange
Topic类型的Exchange与Direct相比都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符 Routingkey 一般都是有一个或多个单词组成多个单词之间以”.”分割例如 item.insert 通配符规则 #匹配一个或多个词 *匹配不多不少恰好1个词 举例 demo.#能够匹配demo.spu.insert 或者 demo.spu #.demo写法也可以 demo.*只能匹配demo.spu public void testTopicExchange(String msg, String key) {rabbitTemplate.convertAndSend(murg.topic,key,msg);} 3.3.2MessageConsumerService消费服务增加方法testDirectExchange
Exchange(name murg.topictype ExchangeTypes.TOPIC)声明交换机名字及类型 通过key的值声明接收不同路由的消息
topic.queue1 消费 rountingkey为“罗隐.*”的消息
topic.queue2 消费 rountingkey为“#.贫女”的消息
RabbitListener(bindings QueueBinding(value Queue(name topic.queue1),//定义队列名字 topic.queue1exchange Exchange(name murg.topic, type ExchangeTypes.TOPIC),//指定交换机和交换机类型key {罗隐.*} //指定消费rountingkey))public void listenTopicQueue1(String msg){System.out.println(消费者接收到topic.queue1的消息【 msg 】);}RabbitListener(bindings QueueBinding(value Queue(name topic.queue2),//定义队列名字 topic.queue2exchange Exchange(name murg.topic, type ExchangeTypes.TOPIC),//指定交换机和交换机类型key {#.贫女}//指定消费rountingkey))public void listenTopicQueue2(String msg){System.out.println(消费者接收到topic.queue2的消息【 msg 】);}
3.3.3测试类增加测试方法 Testpublic void testTopicExchange() throws InterruptedException {String msg 采得百花成蜜后为谁辛苦为谁甜;String key 罗隐.蜂;String msg2 苦恨年年压金线为他人作嫁衣裳;String key2 秦韬玉.贫女;for (int i0;i10;i){if (i % 2 0){messageProducerService.testTopicExchange(msg,key);Thread.sleep(1000);}else {messageProducerService.testTopicExchange(msg2,key2);Thread.sleep(1000);}}}
调用消息生产服务发送 Topic类型的消息。运行测试控制台输出结果