网站建设柒首先金手指7,财经投资公司网站建设方案,做网站手机模板app,免费建设一个可以访问的网站背景 随着物联网行业的发展、智能设备数量越来越多#xff0c;随着设备活跃量过大#xff0c;常常存在一些高并发的请求#xff0c;形成了流量尖峰#xff0c;过多的请求会压垮服务器#xff0c;影响其他服务运行。因此#xff0c;为了保护云端服务#xff0c;需要对请求…背景 随着物联网行业的发展、智能设备数量越来越多随着设备活跃量过大常常存在一些高并发的请求形成了流量尖峰过多的请求会压垮服务器影响其他服务运行。因此为了保护云端服务需要对请求进行缓冲RocketMQ就是一款非常优秀消息队列的中间件在互联网领域久经考验也被各个行业广泛应用。 在上一篇文章中介绍了RocketMQ的工作原理使用。物联网中如何使用RockeMQ
如何在配置文件端配置消费者线程大小 当生产者将大量的消息堆积到消息队列中时需要同步启用消费者去消费队列中的消息达到动态平衡的效果。 如下代码所示在消费者类上会使用RocketMQMessageListener注解并填写必要的属性consumerThreadNumber消费线程、主题、消费组。
RocketMQMessageListener(// 指定消费线程大小consumeThreadNumber 16,topic TOPIC_DEMO,consumerGroup consumer_demo_group)
Component
public class Consumer implements RocketMQListenerMessageExt {private final String CHARSET Charset.defaultCharset().name();Overridepublic void onMessage(MessageExt message) {byte[] body message.getBody();String str new String(body, Charset.forName(this.CHARSET));System.out.println(消费者消费的消息为 str);}
}其中的topic、consumerGroup可以指定一次就不会变啦但是consumerThreadNumber会根据机器的性能发生变化因此需要将其提出放到配置文件中方便修改比如application.yaml。 那应该如何实现呢 其中 consumerThreadNumber 16表明填入的是一个static的变量因此如果简单地使用Value来进行注入变量是不成功的因为它只能注入非静态变量。为了能实现从配置文件中读取变量并转为static变量采用了显示set方式赋值变量的方法。
/**
* 注入mq消费的线程数量
*/
public static int RocketMQThreadSize;Value(${biz.RocketMQThreadSize})
public void setRocketMQThreadSize(int threadSize) {
RocketMQThreadSize threadSize;
}那在配置文件中就可以配置RocketMQThreadSize的大小啦。如下在application.yaml就可以搞定。
biz: RocketMQThreadSize: 200如何使得自定义的线程大小生效 如上一章所示可以得到静态的变量那如何在消费者中生效呢幸好RocketMQ提供一个接口可以实现消费者线程的自定义。 在消费者的类需要实现RocketMQPushConsumerLifecycleListener接口即可然后在类中实现prepareStart方法即可。具体如下所示
RocketMQMessageListener(topic TOPIC_DEMO,consumerGroup consumer_demo_group)
Component
public class Consumer implements RocketMQListenerMessageExt, RocketMQPushConsumerLifecycleListener {private final String CHARSET Charset.defaultCharset().name();Overridepublic void onMessage(MessageExt message) {byte[] body message.getBody();String str new String(body, Charset.forName(this.CHARSET));System.out.println(消费者消费的消息为 str);}Overridepublic void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {// 指定消费线程大小defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);}
}在prepareStart方法中指定一些必要的线程参数
最大线程defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);最小线程defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);
并且通过实验和查看源码其中最大、最小设置一样才会生效。
如何确定合适的线程大小 以上的步骤已经帮忙把线程设置提取出来啦之后只需在配置文件中修改线程数大小而不需去代码中修改避免代码导致系统出现问题。那如何去确定线程的数量大小呢 线程是计算机执行任务的基本的单位即消费任务可以交给线程去执行。 当线程数量较少时CPU性能不能充分发挥。但是线程数量过的就会导致过多的线程处于等待中机器的负载升高。因此需要确定适合当前机器的线程数量。 在RocketMQ线程调优有两个指标可以帮助大致确定消费线程大小
消费者的TPS表明消费者的能力机器负载分为CPU负载和IO负载和自身的核心数量有关。 RocketMQ提供web界面可以监测TPS的大小这个数量当然是越大越好但是也要考虑负载。 在服务器输入top命令就可以看大当前机器的负载 分别为1、5、15分钟负载。 在进行压测的时候需要知道机器的核心数量监测负载的时候负载的大小就不能超过核心数量。 在测试的时候可以从小到大调节线程数大小并且关注TPS和负载。
结尾 以上就是确定消费者线程大小的整个过程有疑问欢迎留言交流 线程介绍