app开发公司推荐安徽创逸科技有限公司,品牌关键词优化,wordpress 股票主题,百度推广代理怎么加盟目录 一、简单模型
1、首先控制台创建一个队列
2、父工程导入依赖
3、生产者配置文件 4、写测试类
5、消费者配置文件
6、消费者接收消息
二、WorkQueues模型
1、在控制台创建一个新的队列
2、生产者生产消息
3、创建两个消费者接收消息
4、能者多劳充分利用每一个消…目录 一、简单模型
1、首先控制台创建一个队列
2、父工程导入依赖
3、生产者配置文件 4、写测试类
5、消费者配置文件
6、消费者接收消息
二、WorkQueues模型
1、在控制台创建一个新的队列
2、生产者生产消息
3、创建两个消费者接收消息
4、能者多劳充分利用每一个消费者的能力
三、交换机
四、Fanout交换机
1、 声明队列
2、 创建交换机
编辑 3、 绑定交换机
4、示例
五、Diect交换机
1、 声明队列
2、创建交换机 3、绑定交换机 4、示例
六、Topic交换机
1、创建队列
2、创建交换机
3、绑定队列
4、示例
7、、声明队列交换机
1、SpringAMQP提供的类声明
2、基于注解声明
七、消息转换器
配置JSON转换器 一、简单模型
创建一个父工程和两个子工程consumer和publisher 1、首先控制台创建一个队列
命名为simple.queue 2、父工程导入依赖 dependenciesdependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency!--AMQP依赖包含RabbitMQ--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency!--单元测试--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependency/dependencies
3、生产者配置文件
spring:rabbitmq:host: 192.168.200.129 # 你的虚拟机IPport: 5672 # 端口virtual-host: / # 虚拟主机username: admin # 用户名password: 123456 # 密码 4、写测试类
SpringBootTest
public class SpringAmqpTest {Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSimpleQueue() {// 队列名称String queueName simple.queue;// 消息String message hello, rabbitmq!;// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
} 查看消息 5、消费者配置文件
spring:rabbitmq:host: 192.168.200.129 # 你的虚拟机IPport: 5672 # 端口virtual-host: / # 虚拟主机username: admin # 用户名password: 123456 # 密码
6、消费者接收消息
Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息就会推送给当前服务调用当前方法处理消息。// 可以看到方法体中接收的就是消息体的内容RabbitListener(queues simple.queue)public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println(spring 消费者接收到消息【 msg 】);}
} 结果
二、WorkQueues模型 当消息处理比较耗时的时候可能生产消息的速度会远远大于消息的消费速度。长此以往消息就会堆积越来越多无法及时处理。 此时就可以使用work 模型多个消费者共同处理消息处理消息处理的速度就能大大提高了
1、在控制台创建一个新的队列
命名为work.queue 2、生产者生产消息
/*** workQueue* 向队列中不停发送消息模拟消息堆积。*/Testpublic void testWorkQueue() throws InterruptedException {// 队列名称String queueName work.queue;// 消息String message hello, message_;for (int i 0; i 50; i) {// 发送消息每20毫秒发送一次相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message i);Thread.sleep(20);}}
3、创建两个消费者接收消息 RabbitListener(queues work.queue)public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println(消费者1接收到消息【 msg 】);}RabbitListener(queues work.queue)public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println(消费者2........接收到消息【 msg 】);}
结果 如果消费者睡眠时间不同 RabbitListener(queues work.queue)public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println(消费者1接收到消息【 msg 】);Thread.sleep(20);}RabbitListener(queues work.queue)public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println(消费者2........接收到消息【 msg 】);Thread.sleep(200);}
消费者1 sleep了20毫秒相当于每秒钟处理50个消息消费者2 sleep了200毫秒相当于每秒处理5个消息 消息是平均分配给每个消费者并没有考虑到消费者的处理能力。导致1个消费者空闲另一个消费者忙的不可开交。没有充分利用每一个消费者的能力最终消息处理的耗时远远超过了1秒。这样显然是有问题的。
4、能者多劳充分利用每一个消费者的能力
在spring中有一个简单的配置可以解决这个问题。我们修改consumer服务的application.yml文件添加配置
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息处理完成才能获取下一个消息可以发现由于消费者1处理速度较快所以处理了更多的消息消费者2处理速度较慢。而最终总的执行耗时也大大提升。 正所谓能者多劳这样充分利用了每一个消费者的处理能力可以有效避免消息积压问题。
三、交换机 在订阅模型中多了一个exchange角色而且过程略有变化
Publisher生产者不再发送消息到队列中而是发给交换机Exchange交换机一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。Queue消息队列也与以前一样接收消息、缓存消息。不过队列一定要与交换机绑定。Consumer消费者与以前一样订阅队列没有变化
Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与Exchange绑定或者没有符合路由规则的队列那么消息会丢失
交换机的类型有四种
Fanout广播将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机Direct订阅基于RoutingKey路由key发送给订阅了消息的队列Topic通配符订阅与Direct类似只不过RoutingKey可以使用通配符Headers头匹配基于MQ的消息头匹配用的较少。
四、Fanout交换机
Fanout英文翻译是扇出我觉得在MQ中叫广播更合适。 在广播模式下消息发送流程是这样的 1 可以有多个队列2 每个队列都要绑定到Exchange交换机3 生产者发送的消息只能发送到交换机4 交换机把消息发送给绑定过的所有队列5 订阅队列的消费者都能拿到消息
1、 声明队列 创建两个队列fanout.queue1和fanout.queue2绑定到交换机hmall.fanout 2、 创建交换机
创建一个名为fanout的交换机类型是Fanout 3、 绑定交换机 4、示例 生产者
/*** Fanout交换机*/Testpublic void testFanoutExchange() {// 交换机名称String exchangeName mq.fanout;// 消息String message hello, everyone!;rabbitTemplate.convertAndSend(exchangeName, , message);}
消费者
RabbitListener(queues fanout.queue1)public void listenFanoutQueue1(String msg) {System.out.println(消费者1接收到Fanout消息【 msg 】);}RabbitListener(queues fanout.queue2)public void listenFanoutQueue2(String msg) {System.out.println(消费者2接收到Fanout消息【 msg 】);} 五、Diect交换机
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue因此称为定向路由。 在Direct模型下
队列与交换机的绑定不能是任意绑定了而是要指定一个RoutingKey路由key消息的发送方在 向 Exchange发送消息时也必须指定消息的 RoutingKey。Exchange不再把消息交给每一个绑定的队列而是根据消息的Routing Key进行判断只有队列的Routingkey与消息的 Routing key完全一致才会接收到消息
1、 声明队列
首先在控制台声明两个队列direct.queue1和direct.queue2 2、创建交换机 3、绑定交换机 一个队列绑定两个RoutingKey 4、示例
生产者RoutingKey为red /*** Direct交换机*/Testpublic void testSendDirectExchange() {// 交换机名称String exchangeName mq.direct;// 消息String message hello direct red;// 发送消息rabbitTemplate.convertAndSend(exchangeName, red, message);}
消费者
RabbitListener(queues direct.queue1)public void listenDirectQueue1(String msg) {System.out.println(消费者1接收到direct.queue1的消息【 msg 】);}RabbitListener(queues direct.queue2)public void listenDirectQueue2(String msg) {System.out.println(消费者2接收到direct.queue2的消息【 msg 】);} 两个队列都收到消息
RoutingKey为blue /*** Direct交换机*/Testpublic void testSendDirectExchange() {// 交换机名称String exchangeName mq.direct;// 消息String message hello direct blue;// 发送消息rabbitTemplate.convertAndSend(exchangeName, blue, message);} 只有队列2收到消息
六、Topic交换机
Topic类型的Exchange与Direct相比都是可以根据RoutingKey把消息路由到不同的队列。 只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符
BindingKey 一般都是有一个或多个单词组成多个单词之间以.分割例如 item.insert
通配符规则
#匹配一个或多个词*匹配不多不少恰好1个词
举例
item.#能够匹配item.spu.insert 或者 item.spuitem.*只能匹配item.spu 示例
publisher发送的消息使用的RoutingKey共有四种
china.news 代表有中国的新闻消息china.weather 代表中国的天气消息japan.news 则代表日本新闻japan.weather 代表日本的天气消息
解释
topic.queue1绑定的是china.# 凡是以 china.开头的routing key 都会被匹配到包括 china.newschina.weathertopic.queue2绑定的是#.news 凡是以 .news结尾的 routing key 都会被匹配。包括: china.newsjapan.news
1、创建队列 2、创建交换机 3、绑定队列 4、示例
生产者RoutingKey为china.news /*** topicExchange*/Testpublic void testSendTopicExchange() {// 交换机名称String exchangeName mq.topic;// 消息String message hello topis china.news;// 发送消息rabbitTemplate.convertAndSend(exchangeName, china.news, message);}
消费者 RabbitListener(queues topic.queue1)public void listenTopicQueue1(String msg){System.out.println(消费者1接收到topic.queue1的消息【 msg 】);}RabbitListener(queues topic.queue2)public void listenTopicQueue2(String msg){System.out.println(消费者2接收到topic.queue2的消息【 msg 】);} RoutingKey为china.people /*** topicExchange*/Testpublic void testSendTopicExchange() {// 交换机名称String exchangeName mq.topic;// 消息String message hello topis china.people;// 发送消息rabbitTemplate.convertAndSend(exchangeName, china.people, message);}
只有消费者1收到消息 7、、声明队列交换机
SpringAMQP提供了几个类用来声明队列、交换机及其绑定关系:
1、Queue:用于声明队列可以用工厂类QueueBuilder构建 2、Exchange:用于声明交换机可以用工厂类ExchangeBuilder构建 3、Binding:用于声明队列和交换机的绑定关系可以用工厂类BindingBuilder构建 1、SpringAMQP提供的类声明
示例创建Fanout交换机队列
Configuration
public class FanoutConfig {/*** 声明交换机* return Fanout类型交换机*/Beanpublic FanoutExchange fanoutExchange2(){return new FanoutExchange(mq.fanout2);}/*** 第1个队列*/Beanpublic Queue fanoutQueue3(){return new Queue(fanout.queue3);}/*** 绑定队列和交换机1*/Beanpublic Binding bindingQueue1(Queue fanoutQueue3, FanoutExchange fanoutExchange3){return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange3);}/*** 第2个队列*/Beanpublic Queue fanoutQueue4(){return new Queue(fanout.queue4);}/*** 绑定队列和交换机2*/Beanpublic Binding bindingQueue2(){return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange2());}
} direct示例
Configuration
public class DirectConfig {/*** 声明交换机* return Direct类型交换机*/Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange(mq.direct2).build();}/*** 第1个队列*/Beanpublic Queue directQueue1(){return new Queue(direct.queue1);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with(red);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with(blue);}/*** 第2个队列*/Beanpublic Queue directQueue2(){return new Queue(direct.queue2);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with(red);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with(yellow);}
}direct模式由于要绑定多个KEY会非常麻烦每一个Key都要编写一个binding
2、基于注解声明
/*** 注解声明交换机* param msg*/RabbitListener(bindings QueueBinding(value Queue(name direct.queue3),//队列名称exchange Exchange(name mq.direct, //交换机名称type ExchangeTypes.DIRECT),//交换机类型key {red, blue}//RoutingKey))public void listenDirectQueue3(String msg){System.out.println(消费者3接收到direct.queue3的消息【 msg 】);}RabbitListener(bindings QueueBinding(value Queue(name direct.queue4),exchange Exchange(name mq.direct, type ExchangeTypes.DIRECT),key {red, yellow}))public void listenDirectQueue4(String msg){System.out.println(消费者4接收到direct.queue4的消息【 msg 】);} 队列 交换机
七、消息转换器 Testpublic void testSendMap() throws InterruptedException {// 准备消息MapString,Object msg new HashMap();msg.put(name, 张三);msg.put(age, 21);// 发送消息rabbitTemplate.convertAndSend(object.queue, msg);}
当发送的数据为Objiet类型时会出现乱码现象而在数据传输时它会把你发送的消息序列化为字节发送给MQ接收消息的时候还会把字节反序列化为Java对象。 只不过默认情况下Spring采用的序列化方式是JDK序列化 配置JSON转换器
在publisher和consumer两个服务中都引入依赖 dependencies!--mq消息转换为json--dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactIdversion2.9.10/version/dependency/dependencies
注意如果项目中引入了spring-boot-starter-web依赖则无需再次引入Jackson依赖。
配置消息转换器在publisher和consumer两个服务的启动类中添加一个Bean即可
Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter new Jackson2JsonMessageConverter();// 2.配置自动创建消息id用于识别不同消息也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}
结果 消费者接收Object
我们在consumer服务中定义一个新的消费者publisher是用Map发送那么消费者也一定要用Map接收格式如下 RabbitListener(queues object.queue)public void listenSimpleQueueMessage(MapString, Object msg) throws InterruptedException {System.out.println(消费者接收到object.queue消息【 msg 】);}