云南网站推广,哪个网站做相册好,买了个服务器 怎么做网站,凤蝶直播一、MQ概述
异步通信的优点#xff1a;
耦合度低吞吐量提升故障隔离流量削峰
异步通信的缺点#xff1a;
依赖于Broker的可靠性、安全性、吞吐能力架构复杂#xff0c;业务么有明显的流程线#xff0c;不方便追踪管理
什么是的MQ MQ#xff08;Message Queue#xf…一、MQ概述
异步通信的优点
耦合度低吞吐量提升故障隔离流量削峰
异步通信的缺点
依赖于Broker的可靠性、安全性、吞吐能力架构复杂业务么有明显的流程线不方便追踪管理
什么是的MQ MQMessage Queue,消息队列就是放消息的队列。也是事件驱动架构中的Broker。
RabbitMQActiveMQRocketMQKafka公司/社区RabbitApache阿里Apache开发语言ErlangJavaJavaScalaJava协议支持AMQP、XMPP、SMTP、STOMPOpenWire、STOMP、REST、XMPP、AMQP自定义协议自定义协议可用性高一般高高单机吞吐量一般差高非常高消息延迟微秒级毫秒级毫秒级毫秒级消息可靠性高一般高一般
二、RabbitMQ概述
1. RabbitMQ的结构和概念
Channel操作MQ的工具Exchange路由消息到队列中Queue缓存消息Virtual Host虚拟主机是对Queue、Exchange等资源的逻辑分组 2. 常见消息模型 基本消息队列BasicQueue Publisher消息发布者将消息发送到队列QueueQueue消息队列负责接受并缓存消息Consumer订阅队列处理队列中的消息 工作消息队列WorkQueue Work queues也被称为Task queues任务模型。简单来说就是让多个消费者绑定到一个队列共同消费队列中的消息。 当消息处理比较耗时的时候可能生产消息的速度会远远大于消息的消费速度。长此以往消息就会堆积越来越多无法及时处理。此时就可以使用work 模型多个消费者共同进行消息处理提高消费速度。 发布订阅Publish、Subscribe根据交换机类型不同分为三种 Publisher生产者也就是要发送消息的程序但是不再发送到队列中而是发给X交换机 Exchange交换机图中的X。一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。Exchange有以下3种类型 Fanout广播将消息交给所有绑定到交换机的队列Direct定向把消息交给符合指定routing key 的队列Topic通配符把消息交给符合routing pattern路由模式 的队列 Consumer消费者与以前一样订阅队列没有变化 Queue消息队列也与以前一样接收消息、缓存消息。 Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与Exchange绑定或者没有符合路由规则的队列那么消息会丢失 Fanout Exchange 广播 在广播模式下消息发送流程 - 1 可以有多个队列
- 2 每个队列都要绑定到Exchange交换机
- 3 生产者发送的消息只能发送到交换机交换机来决定要发给哪个队列生产者无法决定
- 4 交换机把消息发送给绑定过的所有队列
- 5 订阅队列的消费者都能拿到消息Direct Exchange路由 在Fanout模式中一条消息会被所有订阅的队列都消费。但是在某些场景下希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。 在Direct模型下 队列与交换机的绑定不能是任意绑定了而是要指定一个RoutingKey路由key消息的发送方在 向 Exchange发送消息时也必须指定消息的 RoutingKey。Exchange不再把消息交给每一个绑定的队列而是根据消息的Routing Key进行判断只有队列的Routingkey与消息的 Routing key完全一致才会接收到消息 Topic Exchange主题
Topic类型的Exchange与Direct相比都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符。
通配符规则
#匹配一个或多个词*匹配不多不少恰好1个词
如下图
Queue1绑定的是china.# 因此凡是以 china.开头的routing key 都会被匹配到。包括china.news和china.weatherQueue2绑定的是#.news 因此凡是以 .news结尾的 routing key 都会被匹配。包括china.news和japan.news
3. RabbitMQ的安装
1、安装ErlangRabbitMQ是用Erlang编写的因此首先需要安装Erlang运行环境注意Erlang与RabbitMQ的对应版本。运行以下命令进行安装sudo apt install erlang
2、在线拉取镜像docker pull rabbitmq:3-management
3、运行以下命令来下载并启动RabbitMQ Docker镜像docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
4、浏览器访问RabbitMQ管理页面http://IP:15672/注意若网页无法访问可能是rabbitmq_management插件未启用
5、进入sbin目录下查看插件命令rabbitmq-plugins list
6、 若 rabbitmq_management 插件未启用状态无 * 通过命令启用该插件rabbitmq-plugins enable rabbitmq_management
7、启用后重新访问地址用户名/密码默认guest/guest
三、SpringAMQP
AMQPAdanced Message Queuing Protocol是用于在应用程序之间传递业务消息的开发标准与语言和平台无关。
SpringAMQP是基于RabbitMQ封装的一套模板并且还利用SpringBoot对其实现了自动装配使用起来非常方便。
SpringAmqp的官方地址https://spring.io/projects/spring-amqp
SpringAMQP提供了三个功能
自动声明队列、交换机及其绑定关系基于注解的监听器模式异步接收消息封装了RabbitTemplate工具用于发送消息
1、使用SpringBootAMQP- SimpleQueue的步骤
引入AMQP的Starter依赖
!--AMQP依赖包含RabbitMQ--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency配置RabbitMQ地址
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /listener:simple:prefetch: 3 # 每次只能获取一条消息处理完成才能获取下一个消息利用RabbitTemplate的convertAndSend方法
package com.example.rabbitmq_demo;import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;RunWith(SpringRunner.class)
SpringBootTest
class RabbitmqDemoApplicationTests {Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSimpleQueue() {//queueNameString queueName ty.simple.queue;//messageString message hello world ;//send MessagerabbitTemplate.convertAndSend(queueName, message);}}2、使用SpringBootAMQP- FanoutExchange的步骤
创建Spring配置类绑定交换机 - 队列
package com.example.rabbitmq_demo.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class FanoutConfig {Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(ty.fanout);}Beanpublic Queue fanoutQueue1(){return new Queue(fanout.queue1);}Beanpublic Queue fanoutQueue2(){return new Queue(fanout.queue2);}/*** 绑定交换机与队列*/Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
利用RabbitTemplate的convertAndSend方法
package com.example.rabbitmq_demo;import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;RunWith(SpringRunner.class)
SpringBootTest
class RabbitmqDemoApplicationTests {Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSendFanoutExchange() {//exchangeNameString exchangeName ty.fanout;//messageString message hello world fanout;//send MessagerabbitTemplate.convertAndSend(exchangeName, , message);}}
2、使用SpringBootAMQP- Direct的步骤
基于注解来声明队列和交换机
package com.example.rabbitmq_demo.consumer;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class ConsumerDemo {/*** 基于Bean的方式声明队列和交换机比较麻烦Spring还提供了基于注解方式来声明* 在consumer的SpringRabbitListener中添加两个消费者同时基于注解来声明队列和交换机* param msg*/RabbitListener(bindings QueueBinding(value Queue(name ty.direct.queue1),exchange Exchange(name ty.direct, type ExchangeTypes.DIRECT),key {red, green}))public void listenDirectQueue1(String msg){System.out.println(listener ty.direct.queue1 Get message : msg);}RabbitListener(bindings QueueBinding(value Queue(name ty.direct.queue2),exchange Exchange(name ty.direct, type ExchangeTypes.DIRECT),key {red, blue}))public void listenDirectQueue2(String msg){System.out.println(listener ty.direct.queue2 Get message : msg);}
}通过convertAndSend发送消息会根据的RoutingKey将消息发送至指定队列。
package com.example.rabbitmq_demo;import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;RunWith(SpringRunner.class)
SpringBootTest
class RabbitmqDemoApplicationTests {Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSendDirectExchange(){String exchangeName ty.direct;String message hello ty;rabbitTemplate.convertAndSend(exchangeName, red, message);}
}
3、使用SpringBootAMQP- Tpic的步骤
基于注解来声明队列和交换机
package com.example.rabbitmq_demo.consumer;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class ConsumerDemo {/*** Topic类型的Exchange与Direct相比都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符* param msg*/RabbitListener(bindings QueueBinding(value Queue(name ty.topic.queue1),exchange Exchange(name ty.topic, type ExchangeTypes.TOPIC),key ty.#))public void listenTopicQueue1(String msg){System.out.println(listener ty.topic.queue1 Get message : msg);}RabbitListener(bindings QueueBinding(value Queue(name ty.topic.queue2),exchange Exchange(name ty.topic, type ExchangeTypes.TOPIC),key #.tyty))public void listenTopicQueue2(String msg){System.out.println(listener ty.topic.queue2 Get message : msg);}
}根据RoutingKey通配符发送到对应Queue
package com.example.rabbitmq_demo;import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;RunWith(SpringRunner.class)
SpringBootTest
class RabbitmqDemoApplicationTests {Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSendTopicExchange(){String exchangeName ty.topic;String message hello ty;rabbitTemplate.convertAndSend(exchangeName, ty.tyty, message);}}
4、SpringBootAMQP对象序列化
SpringBootAMQP默认使用的是 x-java-serialized-objectJDK序列化数据体积过大、有安全漏洞且可读性差。 可通过配置JSON转换器使用Json的方式做序列化和反序列化。
引入jar
dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactIdversion2.9.10/version
/dependency配置类中增加Bean
Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}