网站设计是什么意思,seo关键词优化经验技巧,照片制作动态图片软件,尚普咨询市场调研公司文章目录 RabbitMQ 消息分发概述如何实现消费分发机制#xff08;限制每个队列消息数量#xff09;使用场景限流背景实现 demo 非公平发送#xff08;负载均衡#xff09;背景实现 demo RabbitMQ 消息分发 概述
RabbitMQ 的队列在有多个消费者订阅时#xff0c;默认会通过… 文章目录 RabbitMQ 消息分发概述如何实现消费分发机制限制每个队列消息数量使用场景限流背景实现 demo 非公平发送负载均衡背景实现 demo RabbitMQ 消息分发 概述
RabbitMQ 的队列在有多个消费者订阅时默认会通过轮询的机制将消息分发给不同的消费者但是有些消费者消费速度慢有些消费者消费速度快就会导致消费速度慢的消费者影响整个的任务的吞吐量下降.
例如公司有1个正式员工和1个实习生现在有 10 个任务分配平均给他们各 5 个而由于实习生干活比较慢就会导致整个完成任务的吞吐量下降.
消息分发机制给 “正式工” 多分一些任务给 “实习生” 少分一些任务.
如何实现消费分发机制限制每个队列消息数量
可以在配置文件中配置 prefetchCount或者使用原生的 channel.basicQos(int prefetchCount)来限制当前消息通道上channel的每一个消费所能保持的最大未确认消息的数量.
例如 prefetchCount 为 10并且一个 channel 上有两个消费者那么每个消费者都最多接收 10 条未确认的消息. 此时整个 channel 上未确认消息总数可能达到 20 条.
具体使用例如配置 prefetch 5那么 RabbitMQ 就会为消费者计数. 发送一条消息计数1消费一条消息计数-1当达到了上限5mq队列 就不会再发送消息直到消费者确认了某条消息类似 TCP 中的华滑动窗口.
使用场景
限流
背景
假设订单系统每秒最多处理 1000 请求正常情况下该订单系统可以满足日常使用. 但是在突发的秒杀场景下请求瞬间增多每秒 1w qps这不得把订单系统打成筛子. 问题mq 在中间的话不是已经有削峰填谷的作用了么为什么还要使用 mq 的 prefetch 限流机制 尽管消息队列可以延缓高峰压力但消费者的处理能力还是有限的如果不配置 prefetch消费者自身从队列中取消息的量是不可控的. 如果消费者一次性取走过多的消息就可能会导致资源紧张. prefetch 限流就是用来控制每个消费者取消息的数量确保消费者不会过载. 实现 demo
假设限制未确认消息上限为 5发送消息数量为 20.
a配置 prefetch 参数设置应答方式为手动应答.
spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: manual # 手动确认prefetch: 5b配置交换机队列
Configuration
class MQConfig {Beanfun transQueue() Queue(MQConst.TRANS_QUEUE)Beanfun qosExchange() DirectExchange(MQConst.QOS_EXCHANGE)Beanfun qosQueue() Queue(MQConst.QOS_QUEUE)Beanfun qosBinding(): Binding BindingBuilder.bind(qosQueue()).to(qosExchange()).with(MQConst.QOS_BINDING_KEY)}c接口生产者
RestController
RequestMapping(/mq)
class MQApi(val rabbitTemplate: RabbitTemplate,
) {RequestMapping(/qos)fun qos(): String {for (i in 1..20) {rabbitTemplate.convertAndSend(MQConst.QOS_EXCHANGE, MQConst.QOS_BINDING_KEY, qos msg $i)}return ok}}
d消费者
Component
class QosListener {RabbitListener(queues [MQConst.QOS_QUEUE])fun handMessage(message: Message,channel: Channel) {val deliverTag message.messageProperties.deliveryTagtry {println(接收到消息: ${String(message.body, charset(UTF-8))}, ${message.messageProperties.messageId})// 这里不主动应答模拟超长业务// channel.basicAck(deliverTag, false)} catch (e: Exception) {channel.basicNack(deliverTag, false, true)}}}e效果如下 可以观察到消费者只接收到 5 个消息但由于没有主动应答队列 就不会给消费者发送新的消息. Ps此时如果直接关闭程序这 5 个为应答的消息就会重回队列成为 Ready 状态. 如下可以直接清理掉这些消息 非公平发送负载均衡
背景
假设有两个消费者mq 默认会按照轮询的策略将消息分发给消费者.
*但有一个中情况就比较尴尬打个比方 一个是正式工另一个是实习生正式工就处理的很快而实习生就很慢就会造成整个任务的进度被拖慢. *
因此我们可以通过 负载均衡 的方式让处理的快的消费者多处理一些处理慢的消费者少处理一些.
具体的只需要配置 prefetch并开启自动应答即可. 这样一来处理的快的消费者自动应答的就更快接收的消息也就更多.
实现 demo
a配置文件
spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: manual # 手动确认prefetch: 1 # 具体配置为多少需要根据实际业务以及系统承受能力压测b生产者 RequestMapping(/qos)fun qos(): String {for (i in 1..20) {rabbitTemplate.convertAndSend(MQConst.QOS_EXCHANGE, MQConst.QOS_BINDING_KEY, qos msg $i)}return ok}c两个消费者
Component
class QosListener {RabbitListener(queues [MQConst.QOS_QUEUE])fun fastHandMessage(message: Message,channel: Channel) {val deliverTag message.messageProperties.deliveryTagtry {println(接收到消息: ${String(message.body, charset(UTF-8))}, ${message.messageProperties.messageId})Thread.sleep(1000)println(正式工: 任务处理完成)channel.basicAck(deliverTag, false)} catch (e: Exception) {channel.basicNack(deliverTag, false, true)}}RabbitListener(queues [MQConst.QOS_QUEUE])fun slowHandMessage(message: Message,channel: Channel) {val deliverTag message.messageProperties.deliveryTagtry {println(接收到消息: ${String(message.body, charset(UTF-8))}, ${message.messageProperties.messageId})Thread.sleep(2000)println(实习生: 任务处理完成)channel.basicAck(deliverTag, false)} catch (e: Exception) {channel.basicNack(deliverTag, false, true)}}}d效果如下