梅县区建设工程交易中心网站,魔方网站建设,知名设计公司网站,wordpress highslide文章目录 MQ 消息顺序性保证概述原因分析解决方案基于 spring-cloud-stream 实现分区消费 消息挤压问题概述原因分析解决方案 MQ 消息顺序性保证 概述
a#xff09;消息顺序性#xff1a;消费者消费的消息的顺序 和 生产者发送消息的顺序是一致的.
例如 生产者 发送消息顺序… 文章目录 MQ 消息顺序性保证概述原因分析解决方案基于 spring-cloud-stream 实现分区消费 消息挤压问题概述原因分析解决方案 MQ 消息顺序性保证 概述
a消息顺序性消费者消费的消息的顺序 和 生产者发送消息的顺序是一致的.
例如 生产者 发送消息顺序是 msg1、msg2、msg3那么消费者也需要按照 msg1、msg2、msg3 的顺序进行消费.
b顺序不一致可能会导致哪些问题
例如用户系统中用户需要对昵称进行了两次修改此时生产者发送两条消息
消息1修改 用户318 的昵称为 “白天”.消息2修改 用户318 的昵称为 “黑夜”. 那么按正常的逻辑来讲用户318 的名称最后因该为 “黑夜”但如果 消息1 是最后一个被消费者消费的消息那么 用户318 的名称就变成了 “白天”.
原因分析 Note以下场景成立的前提是只能有一个生产者因为生产者发送消息给 mq中间都需要经过网络传输而网络的不确定性是非常大的因此无法保证多个生产者的消息谁先到 mq. a多个消费者 多个消息会被不同的消费者并行处理也就意味着有的消费者消费的快有的消费者消费的慢从而导致消息处理的顺序性无法保证.
b网络波动 网络波动可能会导致消费者消费完消息返回的 ack 丢失从而使得 mq 以为消息发给消费者的中途丢失了进而使得消息重新入队这就意味着 如果队列中此时还有其他消息那么这个重新入队的消息就会排在队列尾部而头部的消息会被优先消费导致顺序性问题. 实际上也就意味着只要触发了消息重新入队的操作就会导致顺序性问题. c消息路由问题 在复杂的路由场景中例如大量应用 Topic 交换机消息可能会根据 routingKey 被分发到不同的队列使得无法保证全局的顺序性.
d死信队列 消息因为一些原因例如被消费者返回 nack requeuefalse然后放入死信队列那么死信队列无论是网络传输还是处理死信队列的消费者和普通队列的消费者并行处理都会导致顺序不一致的情况.
解决方案
顺序性的保证分为 局部顺序性保证 和 全局顺序性保证. 例如如下假设消息入队的顺序为 msg1、msg2、msg3、msg4、msg5… 消息顺序性保证的常见策略 Note以下顺序性保证策略往往不是单独使用进行保证的而是多种组合使用. a单队列单消费者全局顺序性 最简单的方式就是使用单个队列并由单个消费者进行消息. 对于消息在队列先进先出这是 RabbitMQ 给我们保证的.
b业务逻辑控制全局顺序性 例如给每个消息引入一个序号类似 TCP 确认应答序号 3 消费之前要保证序号 2 被成功消费…
c手动消息确认机制局部顺序性 消费者在处理完消息后显式的发送确认这样 RabbitMQ 才会移除并继续处理下一个消息. Ps在 RabbitMQ 中当消费者接收到一条消息时这条消息并不会立即从队列中删除。相反消息会保持在队列中直到 RabbitMQ 收到消费者发回的确认. d分区消费局部顺序性 单个消费者的吞吐量太低了当需要多个消费者来提高处理速度时可以使用分区消费. 也就是把一个队列分割成多个分区例如根据订单系统将 订单id 进行 hash 或者其他算法 - 保证同样的订单 id经过这个算法后得到的队列名称是一致的如果 同样的订单 id 一会跑到队列1一会跑到队列2就会导致多个消费并行消费最终消费顺序不一致最后每个分区由一个消费者处理保证每个分区内消息的顺序性. PsRabbitMQ 本身没有实现分区消费 基于 spring-cloud-stream 实现分区消费 Note: https://docs.spring.io/spring-cloud-stream/reference/rabbit/rabbit_partitions.html RabbitMQ 并没有实现分区消费因此这里可以引入一些其他的机制来实现.
a引入依赖 parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.3.2/versionrelativePath/ !-- lookup parent from repository --/parentdependencyManagementdependenciesdependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-dependencies/artifactIdversion2023.0.2/versiontypepom/typescopeimport/scope/dependency/dependencies/dependencyManagementdependenciesdependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-stream-binder-rabbit/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency/dependenciesb配置文件如下
spring:rabbitmq:host: env-baseport: 5672username: rootpassword: 1111cloud:stream:bindings: # bindings 表示消息通道绑定配置generate-out-0: # generate-out-0 是一个输出通信的名称表示这是生成消息的第一个通道(还可能由类似 generate-out-1 的其他通道)destination: partitioned.destination # 消息发送的名称为 partitioned.destination 的目的地(目的地在这里就是 mq 消息队列).producer: # 生产者配置# partitioned: truepartition-key-expression: headers[partitionKey] # 表示消息应该发送到哪个分区(这个跟代码里配置的 header 有关)partition-count: 2 # 表示有两个分区(两个队列). 生产者会根据 partition-key-expression 计算的结果将消息分配到这两个分区之一required-groups: # 配置消费组- myGroupc代码如下
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.util.Random;
import java.util.function.Supplier;SpringBootApplication
public class SpringCloudStreamMqApplication {private static final Random RANDOM new Random(System.currentTimeMillis());private static final String[] data new String[] {abc1, abc2, abc3,abc4,};public static void main(String[] args) {new SpringApplicationBuilder(SpringCloudStreamMqApplication.class).web(WebApplicationType.NONE) //不运行其他 web 组件.run(args);}/*** 分区消息:* 方法返回一个函数这个函数每次调用都会从 data 中随机选择一个字符串* 生成一个带有分区键(partitionKey)的消息并将这个消息返回.*/Beanpublic SupplierMessage? generate() {return () - {String value data[RANDOM.nextInt(data.length)];System.out.println(Sending: value);return MessageBuilder.withPayload(value).setHeader(partitionKey, value).build();};}}d效果演示 在 mq 管理平台可以看到多出来了一个交换机 和 两个队列分区 在 partitioned.destination.myGroup-0 中获取消息可以看到都是 “abc2” 和 “abc4” 在 partitioned.destination.myGroup-1 中获取消息可以看到都是 “abc1” 和 “abc3”
消息挤压问题 概述
消息挤压在消息队列中待处理的消息数量超过了消费者的处理能力导致消息在队列中不断堆积的现象.
原因分析
a消息生产过快 在流量较大的情况下生产者发送消息速率大于消费者消费消息速率.
b消费者处理能力不足
消费端业务复杂耗时长.系统资源限制例如 CPU、内存、磁盘I/O 限制消费者处理速度.消费者在处理消息时出现异常导致消息无法被正确处理和确认.服务器端配置过低
c网络问题 由于网络抖动消费者没有及时反馈 ack/nack导致消息不断重发.
d消费者代码逻辑异常引发重试 消费者配置了手动 ack requeue true导致一旦由于消费者代码逻辑引发异常就会造成消息不断重新入队不断重试进而导致消息积压.
解决方案 Note实际工作中更多的是处理消费者的效率 a提高消费者效率
提高消费者的数量比如新增机器.如果消费端业务分散耗时可以考虑使用 CompletableFuture 实现多线程异步编排.设置 prefetchCount当一个消费者阻塞时消息转发到其他没有阻塞的消费者.消息引发异常时考虑配置重试机制或者转入死信队列.
b限制生产者速率
使用限流工具限制消息发送速率的上限.设置消息过期时间. 如果消息过期没有消费可以配置死信队列不仅避免消息丢失还减少了主队列的压力.