当前位置: 首页 > news >正文

国家电网账号注册网站帐号是什么企业建设营销型网站步骤

国家电网账号注册网站帐号是什么,企业建设营销型网站步骤,简单的网站开发流程,温州手机网站制作联系电话一、初识 MQ 传统的单体架构#xff0c;分布式架构的同步调用里#xff0c;无论是方法调用#xff0c;还是 OpenFeign 难免会有以下问题#xff1a; 扩展性差#xff08;高耦合#xff0c;需要依赖对应的服务#xff0c;同样的事件#xff0c;不断有新需求#xff0…一、初识 MQ 传统的单体架构分布式架构的同步调用里无论是方法调用还是 OpenFeign 难免会有以下问题 扩展性差高耦合需要依赖对应的服务同样的事件不断有新需求这个事件的业务代码会越来越臃肿这些子业务都写在一起性能下降等待响应最终整个业务的响应时长就是每次远程调用的执行时长之和级联失败如果是一个事务一个服务失败就会导致全部回滚若是分布式事务就更加麻烦了但其实一些行为的失败不应该导致整体回滚服务宕机如果服务调用者未考虑服务提供者的性能导致提供者因为过度请求而宕机 但如果不是很要求同步调用其实也可以用异步调用如果是单体架构你可能很快能想到一个解决方案就是阻塞队列实现消息通知 但是在分布式架构下可能就需要一个中间件级别的阻塞队列这就是我们要学习的 Message Queue 消息队列简称 MQ而现在流行的 MQ 还不少在实现其基本的消息通知功能外还有一些不错的扩展 以 RabbitMQ 和 Kafka 为例 RabbitMQKafka公司/社区RabbitApache开发语言ErlangScala Java协议支持AMQPXMPPSMTPSTOMP自定义协议可用性高高单机吞吐量一般非常高Kafka 亮点消息延迟微秒级毫秒以内消息可靠性高一般 消息延迟指的是消息到队列并在队列中“就绪”的时间与预期时间的差距其实就是数据在中间件中流动的耗时预期时间可以是现在、几毫秒后、几秒后、几天后… 据统计目前国内消息队列使用最多的还是 RabbitMQ再加上其各方面都比较均衡稳定性也好因此我们课堂上选择 RabbitMQ 来学习。 二、RabbitMQ 安装 Docker 安装 RabbitMQ mkdir /root/mq cd /root/mqdocker rm mq-server -f docker rmi rabbitmq:3.8-management -f docker volume rm mq-plugins -fdocker pull rabbitmq:3.8-management# 插件数据卷最好还是直接挂载 volume而不是挂载我们的目录 docker run \ --name mq-server \ -e RABBITMQ_DEFAULT_USERxxx \ -e RABBITMQ_DEFAULT_PASSxxx \ --hostname mq1 \ -v mq-plugins:/plugins \ -p 15672:15672 \ -p 5672:5672 \ -d rabbitmq:3.8-management三、RabbitMQ 基本知识 1架构 15672RabbitMQ 提供的管理控制台的端口 5672RabbitMQ 的消息发送处理接口 用户名密码就是安装时启动容器时指定的用户名密码 MQ 对应的就是这里的消息代理 Broker RabbitMQ 详细架构图 其中包含几个概念 publisher生产者也就是发送消息的一方consumer消费者也就是消费消息的一方queue队列存储消息。生产者投递的消息会暂存在消息队列中等待消费者处理exchange交换机负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。virtual host虚拟主机起到数据隔离的作用。每个虚拟主机相互独立有各自的 exchange、queue 现在你可能只认识生产者、消费者、队列其他是什么呢 其实你可以理解为 MQ 也是存储东西的存储的就是消息virtual host 就是数据库queue 就是表消息就是一行数据而 MQ 有特殊的机制消息先通过 exchange 再决定前往哪个 queue 管理控制台的使用就不多说了 2五大模式 这只是最常见的五种模式 简单模式 工作模式 发布订阅模式 关联交换机的队列都能收到一份消息广播 路由模式 关联交换机时提供 routing key可以是多个队列之间可以重复发布消息时提供一个 routing key由此发送给指定的队列 值得注意的是简单模式和工作模式其实也是有交换机的任何队列都会绑定一个默认交换机 类型是 directrouting key 为队列的名称 主题模式 路由模式的基础上队列关联交换机时 routing key 可以是带通配符的 routing key 的单词通过 . 分割 # 匹配 n 个单词n ≥ 0* 只匹配一个单词 例如 #.red 可以匹配的 routing keyp1.red、red、p2.p1.red 在发布消息时要使用具体的 routing key交换机发送给匹配的队列 3数据隔离 隔离 virtual host 隔离用户赋予访问权限 四、RabbitMQ 基本使用 Spring AMQP 引入 RabbitMQ 相关的 SDK可以通过创建连接 Connection、创建通道 Channel用 Channel 进行操作接受消息也差不多不过多演示 public class PublisherTest {Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory new ConnectionFactory();// 1.1.设置连接参数分别是主机名、端口号、vhost、用户名、密码factory.setHost(xx.xx.xx.xx);factory.setPort(5672);factory.setVirtualHost(/);factory.setUsername(xxx);factory.setPassword(xxx);// 1.2.建立连接Connection connection factory.newConnection();// 2.创建通道ChannelChannel channel connection.createChannel();// 3.创建队列String queueName simple.queue;channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message hello, rabbitmq!;channel.basicPublish(, queueName, null, message.getBytes());System.out.println(发送消息成功【 message 】);// 5.关闭通道和连接channel.close();connection.close();} }但比较麻烦Spring AMQP 框架可以自动装配 RabbitMQ 的操作对象 RabbitTemplate这样我们就可以更方便的操作 MQ并充分发挥其特性 !--AMQP依赖包含RabbitMQ-- dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId /dependency默认包含 RabbitMQ 的实现如果你想对接其他 AMQP 协议的 MQ得自己实现其抽象封装的接口 1发送消息 注意下面是 Spring3 的写法所以会有点不一样可能看不懂稍后解释 消息发送器封装 Repository RequiredArgsConstructor Slf4j public class RabbitMQSender {private final static ThreadPoolExecutor EXECUTOR ThreadPoolUtil.getIoTargetThreadPool(Rabbit-MQ-Thread);private final RabbitTemplate rabbitTemplate;PostConstructpublic void init() {rabbitTemplate.setTaskExecutor(EXECUTOR);}private final static FunctionThrowable, ? extends CorrelationData.Confirm ON_FAILURE ex - {log.error(处理 ack 回执失败, {}, ex.getMessage());return null;};private MessagePostProcessor delayMessagePostProcessor(long delay) {return message - {// 小于 0 也是立即执行// setDelay 才是给 RabbitMQ 看的setReceivedDelay 是给 publish-returns 看的message.getMessageProperties().setDelay((int) Math.max(delay, 0));return message;};};private CorrelationData newCorrelationData() {return new CorrelationData(UUIDUtil.uuid32());}/*** param exchange 交换机* param routingKey routing key* param msg 消息* param delay 延迟时间如果是延迟交换机delay 才有效* param maxRetries 最大重试机会* param T 消息的对象类型*/private T void send(String exchange, String routingKey, T msg, long delay, int maxRetries){log.info(准备发送消息exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {},exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries);CorrelationData correlationData newCorrelationData();MessagePostProcessor delayMessagePostProcessor delayMessagePostProcessor(delay);correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer() {private int retryCount 0; // 一次 send 从始至终都用的是一个 Consumer 对象所以作用的都是同一个计数器Overridepublic void accept(CorrelationData.Confirm confirm) {Optional.ofNullable(confirm).ifPresent(c - {if(c.isAck()) {log.info(ACK {} 消息成功到达{}, correlationData.getId(), c.getReason());} else {log.warn(NACK {} 消息未能到达{}, correlationData.getId(), c.getReason());if(retryCount maxRetries) {log.error(次数到达上限 {}, maxRetries);return;}retryCount;log.warn(开始第 {} 次重试, retryCount);CorrelationData cd newCorrelationData();cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR);rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd);}});}}, EXECUTOR);rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData);}public void sendMessage(String exchange, String routingKey, Object msg) {send(exchange, routingKey, msg, 0, 0);}public void sendDelayMessage(String exchange, String routingKey, Object msg, long delay){send(exchange, routingKey, msg, delay, 0);}public void sendWithConfirm(String exchange, String routingKey, Object msg, int maxReties) {send(exchange, routingKey, msg, 0, maxReties);}public void sendDelayMessageWithConfirm(String exchange, String routingKey, Object msg, long delay, int maxReties) {send(exchange, routingKey, msg, delay, maxReties);}}2接受消息 监听器 RabbitTemplate 是可以主动获取消息的也可以不实时监听但是一般情况都是监听有消息就执行监听的是 queue若 queue 不存在就会根据注解创建一遍 RabbitListener(bindings QueueBinding(value Queue(name xxx),exchange Exchange(name xxx, delayed true),key {xxx} )) public void xxx(X x) {}3声明交换机与队列 可以通过 Bean 创建 Bean 对象的方式去声明可以自行搜索我更喜欢监听器注解的形式而且 Bean 的方式可能会因为配置不完全一样导致其他配置类的交换机队列无法声明现象如此底层为啥我不知道 4消息转换器 消息是一个字符串但为了满足更多需求需要将一个对象序列化成一个字符串但默认的序列化实现貌似用的是 java 对象的序列化这种方式可能得同一个程序的 java 类才能反序列化成功所以我们应该选择分布式的序列化方式比如 json Configuration RequiredArgsConstructor Slf4j public class MessageConverterConfig {Beanpublic MessageConverter messageConverter(){// 1. 定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter new Jackson2JsonMessageConverter(JsonUtil.OBJECT_MAPPER);// 2. 配置自动创建消息 id用于识别不同消息jackson2JsonMessageConverter.setCreateMessageIds(Boolean.TRUE);return jackson2JsonMessageConverter;} }这里的 JsonUtil.OBJECT_MAPPER就是框架的或者自己实现的 ObjectMapper 5配置文件 spring:rabbitmq:host: ${xxx.mq.host} # rabbitMQ 的 ip 地址port: ${xxx.mq.port} # 端口username: ${xxx.mq.username}password: ${xxx.mq.password}virtual-host: ${xxx.mq.virtual-host}publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true # 若是 false 则直接丢弃了并不会发送者回执listener:simple:prefetch: 1 # 预取为一个消费完才能拿下一个concurrency: 2 # 消费者最少 2 个线程max-concurrency: 10 # 消费者最多 10 个线程auto-startup: true # 为 false 监听者不会实时创建和监听为 true 监听的过程中若 queue 不存在会再根据注解进行创建创建后只监听 queuedeclare false 才是不自动声明default-requeue-rejected: false # 拒绝后不 requeue成为死信若没有绑定死信交换机就真的丢了acknowledge-mode: auto # 消费者执行成功 ack、异常 nackmanual 为手动、none 代表无论如何都是 ackretry: # 这个属于 spring amqp 的 retry 机制enabled: false # 不开启失败重试 # initial-interval: 1000 # multiplier: 2 # max-attempts: 3 # stateless: true # true 代表没有状态若有消费者包含事务这里改为 false五、常见问题 1RabbitMQ 如何保证消息可靠性 保证消息可靠性、不丢失。主要从三个层面考虑 如果报错可以先记录到日志中再去修复数据保底 1、生产者确认机制 生产者确认机制确保生产者的消息能到达队列 publisher-confirm针对的是消息从发送者到交换机的可靠性成功则进行下一步失败返回 NACK private final static ThreadPoolExecutor EXECUTOR ThreadPoolUtil.getIoTargetThreadPool(Rabbit-MQ-Thread);private final RabbitTemplate rabbitTemplate;PostConstruct public void init() {rabbitTemplate.setTaskExecutor(EXECUTOR); }private final static FunctionThrowable, ? extends CorrelationData.Confirm ON_FAILURE ex - {log.error(处理 ack 回执失败, {}, ex.getMessage());return null; };private MessagePostProcessor delayMessagePostProcessor(long delay) {return message - {// 小于 0 也是立即执行// setDelay 才是给 RabbitMQ 看的setReceivedDelay 是给 publish-returns 看的message.getMessageProperties().setDelay((int) Math.max(delay, 0));return message;}; };private CorrelationData newCorrelationData() {return new CorrelationData(UUIDUtil.uuid32()); }/*** param exchange 交换机* param routingKey routing key* param msg 消息* param delay 延迟时间如果是延迟交换机delay 才有效* param maxRetries 最大重试机会* param T 消息的对象类型*/ private T void send(String exchange, String routingKey, T msg, long delay, int maxRetries){log.info(准备发送消息exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {},exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries);CorrelationData correlationData newCorrelationData();MessagePostProcessor delayMessagePostProcessor delayMessagePostProcessor(delay);correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer() {private int retryCount 0; // 一次 send 从始至终都用的是一个 Consumer 对象所以作用的都是同一个计数器Overridepublic void accept(CorrelationData.Confirm confirm) {Optional.ofNullable(confirm).ifPresent(c - {if(c.isAck()) {log.info(ACK {} 消息成功到达{}, correlationData.getId(), c.getReason());} else {log.warn(NACK {} 消息未能到达{}, correlationData.getId(), c.getReason());if(retryCount maxRetries) {log.error(次数到达上限 {}, maxRetries);return;}retryCount;log.warn(开始第 {} 次重试, retryCount);CorrelationData cd newCorrelationData();cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR);rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd);}});}}, EXECUTOR);rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData); }Spring3 的 RabbitMQ Confirm需要配置为 correlated发送消息时提供 CorrelationData也就是与消息关联的数据包括发送者确认时的回调方法 要想提供 Confirm 的回调办法需要配置 correlationData.getFuture() 返回的 CompletableFuture 对象新的 JUC 工具类可以查一查如何使用 配置后在未来根据回调函数进行处理当然也可以直接设置在 RabbitTemplate 对象的 ConfirmCallBack 还可以自己实现消息的发送者重试 publisher-returns针对的是消息从交换机到队列的可靠性成功则返回 ACK失败触发 returns 的回调方法 Component RequiredArgsConstructor Slf4j public class PublisherReturnsCallBack implements RabbitTemplate.ReturnsCallback {// 不存在 routing key 对应的队列那在我看来转发到零个是合理的现象但在这里也认为是路由失败MQ 认为消息一定至少要进入一个队列之后才能被处理这就是可靠性反正就是回执了你爱咋处理是你自己的事情Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {// 可能一些版本的 mq 会因为是延时交换机导致发送者回执只要没有 NACK 这种情况其实并不是不可靠其实我也不知道有没有版本会忽略// 但是其实不忽略也不错毕竟者本来就是特殊情况一般交换机是不存储的但是这个临时存储消息// 但这样也就代表了延时后消息路由失败是没法再次处理的因为我们交给延时交换机后就不管了可靠性有 mq 自己保持MessageProperties messageProperties returnedMessage.getMessage().getMessageProperties();// 这里的 message 并不是原本的 message是额外的封装x-delay 在 publish-returns 里面封装到 receiveDelay 里了Integer delay messageProperties.getReceivedDelay();// 如果不是延时交换机却设置了 delay 大于 0是不会延时的所以是其他原因导致的以防万一把消息记录到日志里if(Objects.nonNull(delay) delay.compareTo(0) 0) {log.info(交换机 {}, 路由键 {} 消息 {} 延迟 {} s, returnedMessage.getExchange(), returnedMessage.getRoutingKey(), messageProperties, TimeUnit.MILLISECONDS.toSeconds(delay));return;}log.warn(publisher-returns 发送者回执(应答码{}应答内容{})(消息 {} 成功到达交换机 {}但路由失败路由键为 {}),returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getMessage(),returnedMessage.getExchange(), returnedMessage.getRoutingKey());} } RabbitMQSender private final static ThreadPoolExecutor EXECUTOR ThreadPoolUtil.getIoTargetThreadPool(Rabbit-MQ-Thread);private final RabbitTemplate rabbitTemplate;private final PublisherReturnsCallBack publisherReturnsCallBack;PostConstruct public void init() {rabbitTemplate.setTaskExecutor(EXECUTOR);// 设置统一的 publisher-returnsconfirm 也可以设置统一的但最好还是在发送时设置在 future 里// rabbitTemplate 的 publisher-returns 同一时间只能存在一个// 因为 publisher confirm 后其实 exchange 有没有转发成功publisher 没必要每次发送都关注这个 exchange 的内部职责更多的是“系统与 MQ 去约定”rabbitTemplate.setReturnsCallback(publisherReturnsCallBack); }同理你也可以按照自己的想法进行重试… 在测试练习阶段里这个过程是异步回调的如果是单元测试发送完消息进程就结束了可能就没回调程序就结束了自然就看不到回调时的日志 如果既没有 ACK 也没有 NACK也没有发布者回执那就相当于这个消息销声匿迹了没有任何的回应那么就会抛出异常我们可以处理这个异常比如打印日志、重发之类的… private final static FunctionThrowable, ? extends CorrelationData.Confirm ON_FAILURE ex - {log.error(处理 ack 回执失败, {}, ex.getMessage());return null; };2、持久化 消息队列的数据持久化确保消息未消费前在队列中不会丢失其中的交换机、队列、和消息都要做持久化 默认都是持久化的 3、消费者确认 队列的消息出队列并不会立即删除而是等待消费者返回 ACK 或者 NACK 消费者要什么时候发送 ACK 呢 1RabbitMQ投递消息给消费者2消费者获取消息后返回ACK给RabbitMQ3RabbitMQ删除消息4消费者宕机消息尚未处理 如果出现这种场景就是不可靠的所以应该是消息处理后再发送 ACK Spring AMQP 有三种消费者确认模式 manual手段 ack自己用 rabbitTemplate 去发送 ACK/NACK这个比较麻烦不用 RabbitListener 接受消息才必须用这个auto配合 RabbitListener 注解代码若出现异常NACK成功则 ACKnone获得消息后直接 ACK无论是否执行成功 出现 NACK 后要如何处理此过程还在我们的服务器 拒绝默认重新入队列返回 ACK消费者重新发布消息指定的交换机 Configuration RequiredArgsConstructor Slf4j public class MessageRecovererConfig {Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RejectAndDontRequeueRecoverer(); // nack、直接 reject 和不 requeue成为死信默认 // return new ImmediateRequeueMessageRecoverer(); // nack、requeue // return new RepublishMessageRecoverer(rabbitTemplate, error.direct, error); // ack、发送给指定的交换机confirm 机制需要设置到 rabbitTemplate 里}}Spring 提供的 retry 机制在消费者出现异常时利用本地重试而不是无限制的requeue到mq队列。 spring:rabbitmq:listener:simple:acknowledge-mode: auto # 消费者执行成功 ack、异常 nackmanual 为手动、none 代表无论如何都是 ackretry: # 这个属于 spring amqp 的 retry 机制enabled: false # 不开启失败重试initial-interval: 1000 # 第一次重试时间间隔multiplier: 3 # 每次重试间隔的倍数max-attempts: 4 # 最大接受次数stateless: true # true 代表没有状态若有消费者包含事务这里改为 false解释第一次失败一秒后重试、第二次失败三秒后重试第三次失败九秒后重试第四次失败就没机会了SpringAMQP会抛出异常AmqpRejectAndDontRequeueException 失败之后根据对应的处理策略进行处理 2死信交换机 消息过期、消息执行失败并且不重试也不重新入队列堆积过多等情况消息会成为死信若队列绑定死信交换机则转发给死信交换机若没有则直接丢弃 队列1 - 死信交换机 - 队列2这个过程是消息队列内部保证的可靠性消息也没有包含原发送者的信息甚至连接已经断开了所以没有 publisher-confirm 也没有 publisher-returns 这个机制和 republish 有点像但是有本质的区别republish 是消费者重发而这里是队列将死信转发给死信交换机 死信的情况 nack requeue false超时未消费队列满了由于队列的特性队列头会先成为死信 3延迟功能如何实现 刚才提到死信的诞生可能是超时未消费那么其实这个点也可以简单的实现一个延迟队列 队列为一个不被监听的专门用来延迟消息发送的缓冲带其死信交换机才是目标交换机 message.getMessageProperties().setExpiration(1000);设置的是过期时间其本意并不是延迟是可以实现延迟~ 另外队列本身也能设置 ttl 过期时间但并不是队列的过期时间显然不合理截止后无论啥都丢了冤不冤啊至少我想不到这种场景而是队列中的消息存活的最大时间消息的过期时间和这个取一个最小值才是真实的过期时间 值得注意的是虽然能实现延时消息的功能但是 实现复杂延迟可能不准确因为队列的特性如果队列头未出队列哪怕其后者出现死信也只能乖乖等前面的先出去之后才能前往死信交换机例如消息的 ttl 分别为 9s、3s、1s最终三个消息会被同时转发因为“最长寿的”排在了前面 这种方式的顺序优先级大于时间优先级 而 RabbitMQ 也提供了一个插件叫 DelayExchange 延时交换机专门用来实现延时功能 Scheduling Messages with RabbitMQ | RabbitMQ 请自行上网下载 延时交换机的声明 RabbitListener(bindings QueueBinding(value Queue(name delay.queue, durable true),exchange Exchange(name delay.direct, delayed true),key delay )) public void listenDelayMessage(String msg){log.info(接收到delay.queue的延迟消息{}, msg); }延时消息的发送 private MessagePostProcessor delayMessagePostProcessor(long delay) {return message - {// 小于 0 也是立即执行message.getMessageProperties().setDelay((int) Math.max(delay, 0));return message;}; };这里设置的是 Delay不是过期时间哪怕超过了时间也不叫做死信 期间一直存在延时交换机的硬存里延迟消息插件内部会维护一个本地数据库表同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长可能会导致堆积的延迟消息非常多会带来较大的CPU开销同时延迟消息的时间会存在误差。 4消息堆积如何解决 死信的成因还可能是堆叠过多 我在实际的开发中没遇到过这种情况不过如果发生了堆积的问题解决方案也所有很多的 提高消费者的消费能力 ,可以使用多线程消费任务增加更多消费者提高消费速度使用工作队列模式, 设置多个消费者消费消费同一个队列中的消息扩大队列容积提高堆积上限 但是RabbitMQ 队列占的是内存间接性的落盘提高上限最终的结果很有可能就是反复落库特别不稳定且并没有解决消息堆积过多的问题 我们可以使用 RabbitMQ 惰性队列惰性队列的好处主要是 接收到消息后直接存入磁盘而非内存虽然慢但没有间歇性的 page-out性能比较稳定消费者要消费消息时才会从磁盘中读取并加载到内存正常消费后就删除了基于磁盘存储消息上限高支持数百万条的消息存储 声明方式 而要设置一个队列为惰性队列只需要在声明队列时指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列 rabbitmqctl set_policy Lazy ^lazy-queue$ {queue-mode:lazy} --apply-to queues 命令解读 rabbitmqctl RabbitMQ的命令行工具set_policy 添加一个策略Lazy 策略名称可以自定义^lazy-queue$ 用正则表达式匹配队列的名字{queue-mode:lazy} 设置队列模式为lazy模式--apply-to queues 策略的作用对象是所有的队列 x-queue-mode 参数的值为 lazy RabbitListener(bindings QueueBinding(exchange Exchange(name xxx),value Queue(name xxx, arguments Argument(name x-queue-mode, value lazy)),key xxx ))交换机、队列扩展属性叫参数消息的拓展属性叫头部扩展属性一般都以 x- 开头extra 消息堆积问题的解决方案 队列上绑定多个消费者提高消费速度使用惰性队列可以再mq中保存更多消息 惰性队列的优点有哪些 基于磁盘存储消息上限高没有间歇性的 page-out性能比较稳定 惰性队列的缺点有哪些 基于磁盘存储消息时效性会降低性能受限于磁盘的IO 5高可用如何保证 RabbitMQ 在服务大规模项目时一般情况下不会像数据库那样存储的瓶颈用惰性队列已经是很顶天了的其特性和用途不会有太极端的存储压力 更多的是在并发情况下处理消息的能力有瓶颈可能出现节点宕机的情况而避免单节点宕机数据丢失、无法提供服务等问题需要解决也就是需要保证高可用性 Erlang 是一种面向并发的语言天然支持集群模式RabbitMQ 的集群有两种模式 普通集群是一种分布式集群将队列分散到集群的各个节点从而提高整个集群的并发能力镜像集群是一种主从集群在普通集群的基础上添加了主从备份的功能提高集群的数据可用性 镜像集群虽然支持主从但主从同步并不是强一致的某些情况下可能有数据丢失的风险虽然重启能解决但那不是强一致而是最终一致因此 RabbitMQ 3.8 以后推出了新的功能仲裁队列来代替镜像集群底层采用 Raft 协议确保主从的数据一致性 1、普通集群 各个节点之间实时同步 MQ 元数据一些静态的共享的数据 交换机的信息队列的信息 但不包括队列中的消息动态的数据不同步 监听队列的时候如果监听的节点不存在该队列只是知道元数据当前节点会访问队列所在的节点该节点返回数据到当前节点并返回给监听者 队列所在节点宕机队列中的消息就会“丢失”是在重启之前这个消息就消失无法被处理的意思 如何部署上网搜搜就行 2、镜像集群 各个节点之间实时同步 MQ 元数据一些静态的共享的数据 交换机的信息队列的信息 本质是主从模式创建队列的节点为主节点其他节点为镜像节点队列中的消息会从主节点备份到镜像节点中 注意 像 Redis 那样的主从集群同步都是全部同步来着但 RabbitMQ 集群的主从模式比较特别他的粒度是队列而不是全部 也就是说一个队列的主节点可能是另一个队列的镜像节点所以分析某个场景的时候要确认是哪个队列单独进行观察分析讨论 不同队列之间只有交互不会相互影响数据同步 针对某一个队列所有写操作都在主节点完成然后同步给镜像节点读操作任何一个都 ok 主节点宕机镜像节成为新的主节点 镜像集群有三种模式 exactly 准确模式指定副本数 count 主节点数 1 镜像节点数集群会尽可能的维护这个数值如果镜像节点出现故障就在另一个节点上创建镜像比较建议这种模式可以设置为 N/2 1all 全部模式count N主节点外全部都是镜像节点nodes 模式指定镜像节点名称列表随机一个作为主节点如果列表里的节点都不存在或不可用则创建队列时的节点作为主节点之后访问集群列表中的节点若存在才会创建镜像节点 没有镜像节点其实就相当于普通模式了 如何配置上网搜搜就行比较麻烦需要设置策略以及匹配的队列不同队列分开来讨论可以设置不同的策略 3、仲裁队列 RabbitMQ 3.8 以后推出了新的功能仲裁队列来 代替镜像集群都是主从模式支持主从数据同步默认是 exactly count 5约定大于配置使用非常简单没有复杂的配置队列的类型选择 Quorum 即可底层采用 Raft 协议确保主从的数据强一致性 Spring Boot 配置 仲裁队列声明 RabbitListener(bindings QueueBinding(exchange Exchange(name xxx),value Queue(name xxx, arguments Argument(name x-queue-type, value quorum)),key xxx ))队列不声明默认就是普通集群这里声明的仲裁队列也只是针对一个队列 6消息重复消费问题 在保证MQ消息不重复的情况下MQ 的一条消息被消费者消费了多次 消费者消费消息成功后在给MQ发送消息确认的时候出现了网络异常或者是服务宕机MQ 迟迟没有接收到 ACK 也没有 NACK此时 MQ 不会将发送的消息删除按兵不动消费者重新监听或者有其他消费者的时候交由它消费而这条消息如果在之前就消费过了的话则会导致重复消费 解决方案 消息消费的业务本身具有幂等性再次处理相同消息时不会产生副作用一些时候可能需要用到分布式锁去维护幂等性 比如一个订单的状态设置为结束那重复消费的结果一致 记录消息的唯一标识如果消费过了的则不再消费 消费成功将 id 缓存起来消费时查询缓存里是否有这条消息设置允许的缓存时间时你不必想得太极端一般很快就有消费者继续监听拿到消息哪怕真有那个情况这里带来的损失大概率可以忽略不记了一切要结合实际情况 有时候两种方案没有严格的界定
http://www.dnsts.com.cn/news/106369.html

相关文章:

  • 中小公司做网站做网站一般需要什么
  • 做推广可以上那些网站哪里有建设哪里有我们
  • 网站对应的ip西安外包网络推广
  • 网站名称和域名有关系页面设计参评
  • 做问卷的网站有那些网站注销备案表下载
  • 网站系统改教程西青天津网站建设
  • 网站流量统计分析的误区安卓门户网站开发
  • 网站一键提交网站的字体做多大合适
  • 潍坊网站建设熊掌号php做网站安性如何
  • 怎么免费从网站上做宣传android显示wordpress
  • 建网站备案好麻烦网站交换链接如何实施
  • 网站开发税率多少钱河北建设厅网站开通账号
  • 广州海珠区培训机构网站建设代理网点
  • 万维网网站注册网站开发seo
  • 四川城乡建设厅官方网站网站备案icp
  • 重庆卓光网站建设网站建设冫金手指谷哥十四
  • 大连网站推广爱得科技网站公司 模板
  • cms免费企业网站wordpress图片添加音乐
  • 华为云做网站如何建设网站挣钱
  • 双语外贸网站源码贵阳市建设局网站
  • 江西昌宇建设工程公司网站福州网站制作官网
  • 延安市建设局网站网站开发建站微信公众号小程序
  • 怎样用eclipse做网站济南电子商务网站开发
  • 网站空间是不是服务器2昌平区网站建设
  • 免费网站安全闵行网站建设推广
  • dede音乐网站源码阜宁做网站哪家公司最好
  • 青岛建设公司网站费用大连网站建设资讯
  • 枣强网站建设培训学校拼多多开店流程
  • 学校的网站如何建设提高网站权重的方法
  • 百色网站建设公司建网站点击率