银川做网站哪家好,店面设计说明,logo图案生成器,小程序登录失败是什么原因文章目录 一、初识 MQ1. 同步通讯2. 异步通讯3. MQ 常见框架 二、RabbitMQ 入门1. 概述和安装2. 常见消息模型3. 基础模型练习 三、SpringAMQP1. 简单队列模型2. 工作队列模型3. 发布订阅模型3.1 Fanout Exchange3.2 Direct Exchange3.3 Topic Exchange 一、初识 MQ
1. 同步通… 文章目录 一、初识 MQ1. 同步通讯2. 异步通讯3. MQ 常见框架 二、RabbitMQ 入门1. 概述和安装2. 常见消息模型3. 基础模型练习 三、SpringAMQP1. 简单队列模型2. 工作队列模型3. 发布订阅模型3.1 Fanout Exchange3.2 Direct Exchange3.3 Topic Exchange 一、初识 MQ
1. 同步通讯 同步和异步的区别 ① 同步通讯类似于打电话是一对一的同时发生的通讯因此它的时效性更好一步走完才能走下一步 ② 异步通讯类似于发短信给一个人发的同时还可以给别人发支持多线操作但是由于通讯不同步所以它的时效性差多步可以同时走。 微服务间基于 Feign 的调用就属于同步调用比如支付服务内部一般都会调用订单服务这个时候它就必须要等待对方的回应才能进行后面的操作显然这个过程是实时性的但也存在一定的问题。
同步通讯劣势 ① 耦合度高。每次加入新的需求都要修改原来的代码 ② 性能下降。调用者需要等待服务提供者的响应响应时间等于每次调用的时间之和如果调用链过长会直接导致性能下降 ③ 资源浪费。调用链中的每个服务在等待响应的过程中不能释放所占用的资源。 A 服务调用 B 服务在 B 未执行完之前A 一直干等着它占用的资源也不会释放 ④ 级联失败。如果服务提供者出现问题所有的调用方也会跟着出现问题。 A 服务里面同时调用了 B 服务和 C 服务B 服务出故障导致 A 所占用的资源一直得不到释放一次、两次、无数次这样的请求最终导致 A 的资源被耗尽而 A 服务里面还有 CC 是正常的但由于 A 的资源已被耗尽了导致 A 调用 C 的业务时也会出现问题。
2. 异步通讯
异步调用常见的实现就是事件驱动模式。
异步调用引入一个 Broker 事件代理者当服务消费者接收到前端请求后就会发布一个事件给 Broker然后由 Broker 广播将事件通知给所有的提供者让这些提供者各自开始行动。
同步通讯中消费者会在代码中依次调用提供者而异步通讯中的消费者不再负责调用提供者它只管发布事件到 Broker由提供者自行订阅事件就可以了。
异步通讯优势 ① 解除了服务与服务之间的耦合。消费者不再负责调用提供者只管发布事件到 Broker 就可以了 所以一旦有新的业务出现直接去订阅事件就可以了 ② 性能提升吞吐量提高。消费者发布完事件后立马就会响应给客户至于提供者什么时候运行完我们并不在意 只要最终能做完就行了所以业务的总耗时就为消费者的请求时间和事件的发布时间之和 ③ 不存在资源浪费。 什么是资源浪费就是占着资源却不用。以前消费者会占着资源等待提供者运行完毕现在服务之间已经彻底解耦了又怎么会浪费 ④ 解决级联失败问题。服务之间没有强依赖不必担心级联失败问题 即使某一个提供者挂掉了也不会影响消费者 ⑤ 流量削峰。 当大量的用户请求过来的时候消费者会发布大量的事件给 Broker因为提供者处理业务的能力有限所以 Broker 在这里会对事件做一个缓冲具体的提供者根据自己的能力处理业务压力全给到 Broker 就可以了最终对微服务起到一个保护的作用。
异步通讯劣势 ① 依赖于 Broker 的可靠性、安全性和吞吐能力 ② 架构复杂业务没有明显的流程线不好追踪管理。
异步通讯只是通知提供者去干至于什么时候干完谁都不知道。实际上大多数情况下都会使用同步通讯因为提供者返回的结果消费者立马要用我们更追求的是时效性而对并发并没有很高的要求
3. MQ 常见框架
MQ 即消息队列也就是事件驱动架构中的 Broker。 比较 ① RabbitMQ 支持多种协议这意味着它支持的功能更多其中 AMQP 可以实现跨语言通讯而 RocketMQ 和 Kafka是自定义的协议 ② RabbitMQ、RocketMQ 和 Kafka 都支持主从集群可用性较高 ③ Kafka的吞吐量非常高但是高吞吐量带来的牺牲就是消息的延迟性和不可靠性容易出现消息丢失的情况。 Kafka 适用于海量数据的传输但是对数据的安全要求不高的情况比如日志信息RabbitMQ 和 RocketMQ 的稳定性更强更适合对稳定性要求较高的场景比如业务通信如果要做自定义协议我们可以选择 RocketMQ因为 RockerMQ 是基于 Java 语言开发的且支持自定义协议。
二、RabbitMQ 入门
1. 概述和安装
RabbitMQ 的整体结构如下
消息的发布者将来会把消息发送到 exchange 交换机交换机负责将消息路由到具体的 queue 队列队列负责暂存消息消息的接收者就会从队列中拿取并处理消息。每个用户会有自己的虚拟主机各个虚拟主机之间是相互隔离的这样可以避免干扰
① 将本地的压缩文件上传至 Linux并加载为一个镜像 ② 创建并运行 MQ 容器15672 是 RibbitMQ 控制台的端口5672 是消息通讯的端口发消息、收消息
docker run \
-e RABBITMQ_DEFAULT_USERzxe \
-e RABBITMQ_DEFAULT_PASS856724bb \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management③ 浏览器访问一下 RabbitMQ 的控制台根据刚才设置的用户名和密码登录 每个模块的含义 ① Overview总览主要可以看到 RabbitMQ 的一些节点信息 ② Connections连接将来消息的发布者和接收者都应该与 RabbitMQ 建立连接 ③ Channels通道是操作 MQ 的工具建立连接之后必须创建一个 Channel 通道提供者和消费者才能基于 Channel 完成消息的发送或接收 ④ Exchanges交换机消息的路由器负责将消息路由到对面 ⑤ Queues队列用来做消息存储 ⑥ Admin管理管理当前用户信息 ⑦ virtual host虚拟主机是对 queue、exchange 等资源的逻辑分组。 2. 常见消息模型
RabbitMQ 官方文档rabbitmq.com
查看官方文档可以看到官方给出的很多 Demo 案例但是只有前五个和消息的接收发送有关。
其中前两个是基于队列发送的也就是说消息的发布者和订阅者之间直接通过队列连接没有交换机。而后三个属于发布订阅模式有交换机根据交换机的类型分为广播、路由和主题三种。 3. 基础模型练习
官方的 HelloWorld 是基于最基础的消息队列模型来实现的只包括三个角色消息发布者、消息队列和订阅者。
① 发布消息
发布消息基本步骤创建连接工厂 → 设置连接参数 → 建立连接 → 创建通道 → 创建队列 → 发送消息 → 关闭通道和连接。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class PublisherTest {Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory factory new ConnectionFactory();// 2.设置连接参数分别是主机名、端口号、vhost、用户名、密码factory.setHost(192.168.149.101);factory.setPort(5672);factory.setVirtualHost(/);factory.setUsername(zxe);factory.setPassword(123321);// 3.建立连接Connection connection factory.newConnection();// 4.创建通道ChannelChannel channel connection.createChannel();// 5.创建队列String queueName simple.queue;channel.queueDeclare(queueName, false, false, false, null);// 6.发送消息String message hello, rabbitmq!;channel.basicPublish(, queueName, null, message.getBytes());System.out.println(发送消息成功【 message 】);// 7.关闭通道和连接channel.close();connection.close();}
}② 订阅消息
订阅消息基本步骤创建连接工厂 → 设置连接参数 → 建立连接 → 创建通道 → 创建队列 → 订阅消息 → 处理消息。
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory new ConnectionFactory();// 1.1.设置连接参数分别是主机名、端口号、vhost、用户名、密码factory.setHost(192.168.149.101);factory.setPort(5672);factory.setVirtualHost(/);factory.setUsername(zxe);factory.setPassword(123321);// 1.2.建立连接Connection connection factory.newConnection();// 2.创建通道ChannelChannel channel connection.createChannel();// 3.创建队列String queueName simple.queue;channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息队列一有消息函数就会执行body就是消息体String message new String(body);System.out.println(接收到消息【 message 】);}});System.out.println(等待接收消息。。。。);}
}发布者已经创建了队列订阅者为什么还要创建这是因为它们的先后执行顺序是不确定的以防万一订阅者在发布者之前执行队列不存在的问题其实最终也不会创建两个队列而是有则不创建没有就创建
从 MQ 上接收消息也需要时间所以函数外的代码先执行函数内的代码后执行这就是异步。 消息一旦被接收就会立马从队列中消失。 三、SpringAMQP
1. 简单队列模型 上面案例中发布消息和订阅消息的步骤十分繁琐吧接下来的 SpringAMQP 就是用来简化这些步骤的它是一种模板协议与语言和平台无关利用 SpringAMQP 提供好的模板工具直接可以发送和接收消息其实底层封装的还是 Rabbit 的客户端。 ① 父工程中引入 AMQP 依赖
!--AMQP依赖包含RabbitMQ--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency② 编写 publisher 和 consumer 的 application.yml添加连接信息
spring:rabbitmq:host: 192.168.150.101 #主机名port: 5672 #端口virtual-host: / #虚拟主机username: zxe #用户名password: 123321 #密码③ SpringAMQP 方式并不会主动创建队列需要我们提前去控制台创建 simple.queue 队列 ④ 在 publisher 中编写测试方法向 simple.queue 队列发送消息
主机名、端口等信息都写在配置文件里面了代码中只需要传入队列名和所需发送的消息即可。
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;RunWith(SpringRunner.class)
SpringBootTest
public class SpringAmqpTest {Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSendMessage2SimpleQueue() {String queueName simple.queue;String message hello, spring amqp!;//发送消息rabbitTemplate.convertAndSend(queueName, message);}
}⑤ 发布一个消息去 RabbitMQ 控制台看一下 ⑥ 在 consumer 服务中新建一个类用于编写消费逻辑
主机名、端口等信息都写在配置文件里面了代码中只需要指定监听的队列名和接下来的行为方法就可以了Spring 会自动将监听到的消息以参数的形式传递给行为方法。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class SpringRabbitListener {RabbitListener(queues simple.queue)public void listenSimpleQueue(String msg) {System.out.println(消息者已接收到simple.queue的消息【 msg 】);}
}⑦ 运行 Consumer 的启动类同时去看一个 RabbitMQ 控制台发现队列中的消息条数变成了 0说明消息已经被成功消费了 2. 工作队列模型 以前我们发布者发送过来的消息都由一个消费者去处理但是当发布者发送的消息很多时一个消费者就可能处理不过来了处理不过来的消息全部堆积在队列中长期下去队列肯定会爆满最终造成数据丢失。 解决办法就是我们可以安排多个消费者让它们共同处理队列中的消息。 消费者之间是一种合作的关系一条消息如果被某一消费者接收了它就会立马从队列中消失所以不会出现重复消费的问题。
Work Queue工作队列可以提高消息的处理速度避免队列消息堆积。 模拟 WorkQueue基本思路如下 ① 在 publisher 服务中定义测试方法每秒产生 50 条消息发送到 simple.queue ② 在 consumer 服务中定义两个消息监听者都监听 simple.queue 队列 ③ 消费者 1 每秒处理 50 条消息消费者 2 每秒处理 10 条消息。 ① 编写 publish 代码for 循环模拟发送 50 条消息每发送一次我们让线程休眠 20 毫秒这样发完 50 条消息就是 1 秒
Test
public void testSendMessage2WorkQueue() throws InterruptedException {String queueName simple.queue;String message hello, message_;for (int i 1; i 50; i) {rabbitTemplate.convertAndSend(queueName, message i);Thread.sleep(20);}
}② 编写 consumer 代码两个方法模拟两个消费者同样设置休眠时间模拟消费者的处理速度
RabbitListener(queues simple.queue)
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println(消费者1接收到消息【 msg 】 LocalTime.now());Thread.sleep(20);
}RabbitListener(queues simple.queue)
public void listenWorkQueue2(String msg) throws InterruptedException {//err打印出红色字体比较容易区分System.err.println(消费者2接收到消息【 msg 】 LocalTime.now());Thread.sleep(200);
}③ 运行之后我们发现消费者并没有在一秒内将消息处理完且消费者1 只处理偶数消息消费者2 只处理奇数消息且消费者1 提前就处理完了消费者2 晚了好久才把消息处理完 这是因为我们 RabbitMQ 的预取机制所导致的。当消息进入队列之后我们的消费者并不是立马去消费这些消息而是先根据轮询的方式把消息平均分配给所有的消费者这就是预取。 它不管每个消费者的消费能力如何一律平均分配这就出现了消费者1 处理完毕消费者2 还在继续的问题。 ④ 所以接下来可以去配置文件里设置消费预取限制设置 preFetch 值可以控制预期消息的上限
spring:rabbitmq:host: 192.168.149.100 #主机名port: 5672 #端口virtual-host: / #虚拟主机username: zxe #用户名password: 856724bb #密码listener:simple:prefetch: 1 #每次只能获取一条消息处理完成后才能获取下一个消息⑤ 配置完之后重启服务果然在一秒内完成消费 3. 发布订阅模型
发布订阅模式与之前案例的区别就是它允许将同一消息发送给多个消费者这里的多个消费者指的是多个不同的服务实现方式是加入了 exchange 交换机。
发布者将消息发送给交换机交换机负责将消息路由到不同的队列即一个消息同时被多个消费者消费。
注意交换机只负责消息的路由而不是存储如果路由失败则消息丢失
3.1 Fanout Exchange
Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的 queue因此称为广播模式。
一次发布多个消费者都能接受。 ① 声明队列、交换机并将队列绑定到交换机上
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class FanoutConfig {//声明交换机Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(zxe.fanout);}//声明队列Beanpublic Queue fanoutQueue1() {return new Queue(fanout.queue1);}Beanpublic Queue fanoutQueue2() {return new Queue(fanout.queue2);}//将队列绑定到交换机参数为上面声明的方法名Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}② 运行项目测试以下 ② 编写消费者监听代码分别监听两个队列的消息
RabbitListener(queues fanout.queue1)
public void listenFanoutQueue1(String msg) {System.out.println(消费者已接收到fanout.queue1的消息【 msg 】);
}
RabbitListener(queues fanout.queue2)
public void listenFanoutQueue(String msg) {System.out.println(消费者已接收到fanout.queue2的消息【 msg 】);
}③ 编写发布者代码将消息直接发送到交换机
Test
public void testSendFanoutExchange() {//交换机名称String exchangeName zxe.fanout;//消息String message hello, every one!;//发送消息rabbitTemplate.convertAndSend(exchangeName, , message);
}④ 启动项目发布消息并消费 步骤总结 ① 首先声明交换机和队列并将队列绑定到交换机 ② 对于发布者需要指定交换机名称和所要发布的消息 ③ 对于消费者直接指定想要监听的队列名和行为方法即可。 3.2 Direct Exchange
Direct Exchange 会将接收到的消息根据规则路由到指定的 queue因此称为路由模式。 原理 ① 每一个队列都会设置一个 BindingKey ② 发布者发送消息时会指定消息的 RoutingKey ③ 交换机负责将消息路由到 BindingKey 和 RoutingKey 一致的队列。 BindingKey 和 RoutingKey 相当于对暗号暗号相同则路由。
一个队列可以指定多个 BindingKey如果多个队列的 BindingKey 都可以与 RoutingKey 匹配那交换机就会将消息发送给多个队列。
① 用 RabbitListener 注解声明交换机、队列和 BindingKey
//用RabbitListener注解声明交换机、队列和 BindingKey
RabbitListener(bindings QueueBinding(value Queue(name direct.queue1),exchange Exchange(zxe.direct),key {red, blue}
))
public void listenDirectQueue1(String msg) {System.out.println(消费者已接收到direct.queue1的消息【 msg 】);
}
RabbitListener(bindings QueueBinding(value Queue(name direct.queue2),exchange Exchange(zxe.direct),key {red, yellow}
))
public void listenDirectQueue2(String msg) {System.out.println(消费者已接收到direct.queue2的消息【 msg 】);
}② 编写发布者代码指定交换机名要发布的消息以及 RoutingKey ③ 启动项目可以看到队列1 接收到消息因为它里面包含 blue 3.3 Topic Exchange
Topic Exchange 与 Direct Exchange 类似区别在于 Topic Exchange 的 routingKey 必须是由多个单词组成的列表并以 . 分隔比如 china.newschina.weather所以它又称为话题模式。
Queue 与 Exchange 指定 BindingKey 时可以使用通配符。 #代指 0 个或多个单词 *代指一个单词
比如中国相关的一切可以这样表示china.* 所有的新闻可以这样表示#.news
以前 bindingKey 需要绑定 news、weather、food 等多个 key现在只需要一个 china.* 就搞定了只要是 china 的我都要
① 用 RabbitListener 声明队列、交换机和 BindingKey这里使用通配符来指定 key
RabbitListener(bindings QueueBinding(value Queue(name topic.queue1),exchange Exchange(name zxe.topic, type ExchangeTypes.TOPIC),key china.*))public void listenTopicQueue1(String msg) {System.out.println(消费者已接收到topic.queue1的消息【 msg 】);}RabbitListener(bindings QueueBinding(value Queue(name topic.queue2),exchange Exchange(name zxe.topic, type ExchangeTypes.TOPIC),key #.news))public void listenTopicQueue2(String msg) {System.out.println(消费者已接收到topic.queue2的消息【 msg 】);}② 编写发布者代码指定 RoutingKey
Testpublic void testSendTopicExchange() {String exchangeName zxe.topic;String message hello, china!;rabbitTemplate.convertAndSend(exchangeName, china.weather, message);}③ 运行一下