怎么给网站图片加alt,如何创建一个网站0元,joomla 网站建设,运营团队架构#xff08;一#xff09;什么是MQ MQ#xff08;message queue#xff09;本质上是队列#xff0c;满足先入先出#xff0c;只不过队列中存放的内容是消息而已#xff0c;那什么是消息呢#xff1f; 消息可以是字符串#xff0c;json也可以是一些复杂对象 我们应用场…
一什么是MQ MQmessage queue本质上是队列满足先入先出只不过队列中存放的内容是消息而已那什么是消息呢 消息可以是字符串json也可以是一些复杂对象 我们应用场景通常是再分布式系统之间 系统通信方式
一般我们的系统调用方式有两种
一种是同步通信也就是a---b,直接调用对方服务数据从a出发直接到达b
另一种叫异步通信也就是a-----容器----b,数据从一段出发先进入一个容器进行临时存储当满足条件后由容器发给另一端这个容器的实现就是MQ
二MQ的作用
MQ主要的工作是接收并发送消息在不同场景下可以有不同的作用
这里说一些比较常见的作用
1.异步解耦 在业务流程中有一些操作时很耗时的并且可能阻塞其他任务但是这些操作并不要求立即执行我们就可以借助MQ来把这些操作异步化
2.流量削峰 我们在一些特殊时间可能会有访问量突增的情况那我们应用一般来说是不能立即全部处理的所以我们可以使用MQ把访问放到消息队列中然后依次的处理这些请求
3.消息分发 当多个系统要对同一个数据做出响应时我们可以使用MQ来进行消息分发比如支付成功后支付系统就可以向MQ发送消息然后其他系统来订阅这个消息就不需要轮询数据库了
4.延迟通知 在一些定时要进行处理的一些场景中我们可以使用MQ的延迟消息功能就比如在电商平台用户下单后一定时间未支付我们可以把限定的时间放到队列中时间到了自动取消订单
三RabbitMQ的核心概念
我们这里用RabbitMQ来学习消息队列
AMQP
首先我们来看一下这个AMQP是一种高级消息队列,定义了一套确定的消息交换功能包含交换机和队列如何映射等这些组件共同工作使消息能够正常被接收和发送AMQP还定义了一个网络协议允许客户端和AMQP模型进行通信
我们RabbitMQ就是实现了AMQP协议的RabbitMQ就是AMQP协议的Erlang的实现
所以他们两者的结构模型是一样的 首先我们来看一下RabbitMQ的工作流程 RabbitMQ是⼀个消息中间件,也是⼀个⽣产者消费者模型.它负责接收,存储并转发消息.
首先我们先来了解一下图中出现的一些概念
1.Producer和Consumer
Producer生产者是MQ的客户端用来向RabbitMQ发送消息的
Consumer消费者也是MQ的客户端是用来从RabbitMQ中取消息的
Brokeer就是RabbitMQ用来接收和转发消息的
我们也可以简化下上面的图只关注这三者 2.Connection和Channel
Connection连接是客户端和RabbitMQ服务器建立的一个TCP连接这个连接时建立消息传递的基础它负责传输客户端和服务器之间的所有数据和控制信息
Channel信道每个建立的connection都可以由很多个信道每个信道是一个独立的虚拟连接消息的发送和接收都是通过channel的
注信道的作用是为了把所有的读写操作都给复用到同一个TCP连接上这个可以减少建立和关闭连接的开销提高性能 3.Virtual host
Virtual host虚拟机这个是消息队列提供的一种逻辑上的隔离机制本质上没有隔离对于Rabbit一个Broker可以有多个虚拟机当多个不同的⽤⼾使⽤同⼀个 RabbitMQ Server提供的服务时可以虚拟划分出多个vhost每个⽤⼾在⾃⼰的vhost创建exchange/queue等
4.queue
queue队列是RabbitMQ中的内部对象是存储消息的地方
多个消费者可以订阅同一个队列 5.exchange
exchange交换机 是消息到达broker中的第一站因为虚拟机是逻辑上的本质上是不存在的它负责接收生产者传来的消息并根据routingkeybingingkey来把这些消息路由到一个或者多个queue中exchange起到了消息路由的作用
6.RabbitMQ的工作流程 1.生产者生产消息
2.生产者与RabbitMQ建立一个连接并且开启一个信道
3.生产者声明一个交换机
4.生产者声明一个队列
5.生产者发送消息到Broker
6.Broker接收消息并存入到相列中如果没有相应的队列就根据生产者的配置来进行一些处理
四RabbitMQ入门程序 首先我们要使用RabbitMQ我们要在服务器上进行安装那安装就先不说了我们可以在官方文档或者b站找到很多资料
1.引入依赖
dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.20.0/version/dependency
生产者代码
public class Produce {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(62.234.46.219);connectionFactory.setPort(5672);connectionFactory.setUsername(wd);connectionFactory.setPassword(w85315678);connectionFactory.setVirtualHost(bit);Connection connection connectionFactory.newConnection();//开启信道Channel channel connection.createChannel();//声明交换机这里使用默认交换机//声明队列/*** var1 队列名称* var2 是否持久化* var3 是否独占这个队列只有一个消费者* var4 是否自动删除* argument 一些高级参数* AMQP.Queue.DeclareOk queueDeclare(String var1, boolean var2, boolean var3, boolean var4, MapString, Object var5) throws IOException;*/channel.queueDeclare(hello,true,false,false,null);//发送消息/*** var1 交换机名称* var2 交换机通过什么映射到queue中内置交换机routingkey与队列名称保持一致* var3 属性配置* body 消息* void basicPublish(String var1, String var2, AMQP.BasicProperties var3, byte[] var4)*/String s1hello wd;channel.basicPublish(,hello,null,s1.getBytes());//释放信道channel.close();//释放连接connection.close();}
}消费者代码
public class consumer {public static void main(String[] args) throws IOException, TimeoutException {//创建连接ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(62.234.46.219);connectionFactory.setPort(5672);connectionFactory.setUsername(wd);connectionFactory.setPassword(w85315678);connectionFactory.setVirtualHost(bit);Connection connection connectionFactory.newConnection();//创建信道Channel channel connection.createChannel();//声明队列(如果队列没有才创建有就不创建)channel.queueDeclare(hello,true,false,false,null);//消费消息/*** String basicConsume(String queue, boolean autoAck, Consumer callback)* queue 队列名称* autoAck是否自动确认* callback接收消息后要执行的逻辑*/DefaultConsumer consumernew DefaultConsumer(channel){//从队列中获取到消息就会执行的方法Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(接收到: new String(body));}};channel.basicConsume(hello,true,consumer);//释放资源channel.close();connection.close();}
}消费者代码中有一个类叫Consumer
也就是我们channel.basicConsume()中的最后一个参数
这个参数是来定义消费者行为的当我们需要从MQ中接收消息时我们要提供一个实现了Consumer接口的对象这里我使用的是内部类
DefaultConsumer是RabbitMQ提供的一个默认消费者实现了Consumer接口
核心方法就是我们上面重写的方法
那上面我们没有说参数都是什么意思这里来说一下
consumerTag消费者标签通常是消费者在订阅队列时指定的
envelope有一些封包信息如队列名称交换机等
properties一些配置信息
body消息的具体内容
五可能出现的错误
1.
我们上面销毁的过程中一定要先销毁channel再销毁connection或者直接销毁connection不销毁channel
如果先销毁connection再销毁channel就会报错 2.
我们在消费者代码中如果不自己声明队列且MQ中没有这个队列时我们就会报错 3.
如果端口或者ip错误也会报错 4.账号密码不正确也会报错 5.用户对虚拟机没有操作权限会报错