新郑市住房建设局网站,义乌网站建设九,包头企业微网站开发,wordpress二级目录创建1. 问题引发 由某个服务BI-collector-xx队列出现阻塞#xff0c;影响很整个rabbitMQ集群服务不可用#xff0c;多个应用MQ生产者服务出现假死状态#xff0c;系统影响面较广#xff0c;业务影响很大。当时为了应急处理#xff0c;恢复系统可用#xff0c;运维相对粗暴的把…1. 问题引发 由某个服务BI-collector-xx队列出现阻塞影响很整个rabbitMQ集群服务不可用多个应用MQ生产者服务出现假死状态系统影响面较广业务影响很大。当时为了应急处理恢复系统可用运维相对粗暴的把一堆阻塞队列信息清空然后重启整个集群。 
在复盘整个故障过程中我心中有不少疑惑至少存在以下几个问题点 
为什么出现队列阻塞某个队列出现阻塞为什么会影响到其他队列的运行即多队列间相互影响某个应用MQ队列出现问题为什么会导致应用不可用呢 2. 试验队列阻塞 
某天周末在家里找个测试环境安装rabbitmq尝试重现这过程并做模拟测试。 
写两个测试应用Demo假设是两个项目应用分别有生产者和消费者并分别使用队列testA和testB。 
为了尽可能还原生产的情况一开始测试使用了同一个vhost后面分别设置不同vhost。 生产者A示例代码如下 消费者A MQ配置 生产者B每次生产10万条消息 消费者B代码故意写错模拟出现异常的情况不是正常的json串导致解释json时抛出异常 先了解一下Rabbitmq客户端启动连接工作过程通过wireshark抓包分析如下 先对AMQP做一个简单的介绍请求的AMQP协议方法信息AMQP协议方法包含类名方法名参数这一列主要展示了类名和方法名 
Connection.Start请求服务端开始建立连接Channel.Open请求服务端建立信道Queue.Declare声明队列Basic.Consume开始一个消费者请求指定队列的消息 详细方法可以查看amqp官网https://www.rabbitmq.com/amqp-0-9-1-reference.html 工作过程分析 
Basic.Publish 客户端发送Basic.Publish方法请求将消息发布到exchangerabbitmq server会根据路由规则转发到队列中 
Basic.Deliver 服务端发送Basic.Deliver方法请求投递消息到监听队列的客户端消费者 
Basic.Ack 客户端发送Basic.Ack方法请求告知rabbimq server,消息已接收处理。 两个应用程序启动后通过rabbitmq管理控制台可以观察一些参数和监控指标 一开始A应用生产和消费都是正常的。 
B消费端错误代码异常狂刷报错信息 经过大概30分钟运行观察A生产者应用控制台也有出现异常信息 查看服务端连接状态出现blocked情况与生产故障发生情景很类似。 此时客户端即本机器CPU和内存上涨明显风扇声音很响明显卡顿再过30分钟应用基本不可用状态。 分析原因 
上面错误代码展示了消费者B无法ack由于没有进行ack导致队里阻塞。那么问题来了这是为什么呢其实这是RabbitMQ的一种保护机制。防止当消息激增的时候海量的消息进入consumer而引发consumer宕机。 RabbitMQ提供了一种QOS(服务质量保证)功能即在非自动确认的消息的前提下限制信道上的消费者所能保持的最大未确认的数量。可以通过设置prefetchCount实现自动确认prefetchCount设置无效。 
举例说明:可以理解为在consumer前面加了一个缓冲容器容器能容纳最大的消息数量就是PrefetchCount。如果容器没有满RabbitMQ就会将消息投递到容器内如果满了就不投递了。当consumer对消息进行ack以后就会将此消息移除从而放入新的消息。 
通过上面的配置发现prefetch初始我只配置了2并且concurrency配置的只有1所以当我发送了2条错误消息以后由于解析失败这2条消息一直没有被ack。将缓冲区沾满了这个时候RabbitMQ认为这个consumer已经没有消费能力了就不继续给它推送消息了所以就造成了队列阻塞。 
判断队列是否有阻塞的风险。 当ack模式为manual并且线上出现了unacked消息这个时候不用慌。由于QOS是限制信道channel上的消费者所能保持的最大未确认的数量。所以允许出现unacked的数量可以通过channelCount * prefetchCount *消费节点数量得出。 
channlCount就是由concurrency,max-concurrency决定的。 
min  concurrency * prefetch *消费节点数量max  max-concurrency * prefetch *消费节点数量 
由此可以得出结论 
unacked_msg_count  min 队列不会阻塞。但需要及时处理unacked的消息。unacked_msg_count  min 可能会出现堵塞。unacked_msg_count  max 队列一定阻塞。 
重点注意 
1、unacked的消息在consumer切断连接后(如重启)再连接会自动回到队头。 
2、若将ack模式改成auto自动这样会使QOS不生效。会出现大量消息涌入consumer从而可能造成consumer宕机风险。 再回看程序配置做一些分析和调整 对B消费端问题代码加个try-catch-finally不管中间有何问题都进行消息签收ACK。 代码调整之后两个队列正常运行客户端两个应用也正常运行。 经过一段时间消费B消费者端已经把堆积的消息消费完了。 3、    第三个问题原因分析 
还是查看抓包信息 Basic.Reject 客户端发送Basic.Reject方法请求表示无法处理消息拒绝消息此时的requeue参数为true将消息返回原来的队列 
Basic.Deliver 服务端调用Basic.Deliver方法和第一次Basic.Deliver方法不同的是此时的redeliver参数为true表示重新投递消息到监听队列的消费者然后这两步会一直重复下去。 
RabbitMQ消息监听程序异常时consumer会向rabbitmq server发送Basic.Reject表示消息拒绝接受由于Spring默认requeue-rejected配置为true消息会重新入队然后rabbitmq server重新投递。就相当于死循环了所以容易导致消费端资源占用过高特别是TCP连接数、线程数、IO飙升如果个别程序带事务或数据库操作等连接资源得不到释放也会占满导致应用假死状态出现问题的时候查看问题应用出现大量的connection timeout错误报错日志。 
因此针对性的有些业务场景不强调数据强一致性的场景比如日志收集可以设置default-requeue-rejected: false即可。 
factory.setDefaultRequeueRejected(false); 会根据异常类型选择直接丢弃或加入dead-letter-exchange中。 
消费者端正确的使用手动确认示例结构代码很重要 
try {// 业务逻辑。
}catch (Exception e){// 输出错误日志。
}finally {// 消息签收。
} 4、    验证队列设置最大长度限制 
设置queueLengthLimit队列最大长度限制 x-max-length5 生产者原本想要生产10条消息 由于受到队列最大长度限制实际上只有5条入队列里面。 消费者拿出来的消息仅有5条从NO.6~NO.10 改变消费者程序让生产者一直产生消息消费者消费速度明显赶不上生产者的生产速度。 从消费端来看消息是随机性入队的队列里面一直最多5条消息发再多也进不了消息者和生产者也不会发生什么异常只是消息会随机性丢失并没有全部入队。 运行情况良好除了消息没有全部入队列 没有出现异常情况 消费比较慢本机器CPU和内存各项指标正常没有异常。 搞一个异常情况出现unack最大队列长度限制是不算unack数量的如下图所示 异常之后此观察MQ监控管理后台 生产者不停一直在生产消息运行30分钟观察生产者应用也是正常的的就是消息入不了队列。 5、  检查实际的业务端代码 
再看我们业务系统消费端代码消费端各种不规范写法都有以下例举几个典型 
1、手动签收有ACK但是没有try-catch-finally结构消费端业务代码如下 2、有try-catch-finally结构但是deliverTag是一个固定值0一样的会出问题。 3、自动签收确认的大量消息的时候容易搞死消费端应用。 6. 总结 
生产环境不建议使用自动ack模式这样会使QOS无法生效。在使用手动ack的时候需要非常注意消息签收业务代码使用try-catch-finally处理结构防止业务代码异常时无法签收。规范约束mq客户端代码正确的使用Rabbitmq配置。不同业务项目设置不同的vhost可以隔离一些影响提升rabbitmq资源使用。考虑设置dead-letter-exchange当设置了 requeuefalse时可以放入dead-letter-exchange可以快速排查定位问题。Exchange和队列的最大长度限制可以是限制消息的数量参数x-max-length或者是消息的总字节数总字节数表示的是所有的消息体的字节数忽略消息的属性和任何头部信息又或者两者都进行了限制两者取小值生效只有处于ready状态的消息被计数未被确认的消息不会被计数受到limit的限制。最大队列设置可以限制生产端但会造成消息丢失风险最大消息数量限制不能完全解决队列阻塞问题。尽量使用Direct-exchangeDirect 类型的 Exchange 投递消息是最快的。 Direct处理路由键需要将一个队列绑定到交换机上要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键为“A”则只有路由键为“A”的消息才被转发不会转发路由键为B只会转发路由键为“A”Topic将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词符号“*”只能匹配一个词Fanout不处理路由键。只需要简单的将队列绑定到交换机上。一个发送到该类型交换机的消息都会被广播到与该交换机绑定的所有队列上Headers不处理路由键而是根据发送的消息内容中的 headers 属性进行匹配。在绑定 Queue 与 Exchange 时指定一组键值对当消息发送到 RabbitMQ 时会取到该消息的 headers 与 Exchange 绑定时指定的键值对进行匹配如果完全匹配则消息会路由到该队列否则不会路由到该队列。