当前位置: 首页 > news >正文

做ps合成的网站北京兄弟搬家公司

做ps合成的网站,北京兄弟搬家公司,广州网站制作服务,个人网站制作成品图片目录 前言一、情景介绍二、问题分析三、代码实现 前言 之前接到一个需求#xff0c;我们项目的技术负责人希望通过配置的形式#xff0c;在项目启动的时候自动根据配置生成对应的消费者 觉得还有点意思#xff0c;随即记录一下~ 一、情景介绍 比如我这里有一个消费者 Mes… 目录 前言一、情景介绍二、问题分析三、代码实现 前言 之前接到一个需求我们项目的技术负责人希望通过配置的形式在项目启动的时候自动根据配置生成对应的消费者 觉得还有点意思随即记录一下~ 一、情景介绍 比如我这里有一个消费者 MessageConsumer Slf4j Service RocketMQMessageListener(consumerGroup mike-group,topic mike-message,selectorExpression TAG_MESSAGE_CONSUMER,consumeThreadMax 6,consumeTimeout 60L) public class MessageConsumer implements RocketMQListenerNotifyMessage {Overridepublic void onMessage(NotifyMessage notifyMessage) {System.err.println(我收到啦~~);System.err.println(message notifyMessage);} }在项目启动的时候会根据 RocketMQMessageListener 注解上的配置生成一个消费者 假如我还需要一个 MessageConsumer 消费者其 selectorExpression 的配置为 TAG_MESSAGE_CONSUMER_01consumeThreadMax 要设置为 8 通常情况下我们会再复制一个 MessageConsumer 命名为 MessageConsumer_01然后在新的消费者上改对应的配置例如 Slf4j Service RocketMQMessageListener(consumerGroup mike-group-01,topic mike-message,selectorExpression TAG_MESSAGE_CONSUMER_01,consumeThreadMax 8,consumeTimeout 60L) public class MessageConsumer_01 implements RocketMQListenerNotifyMessage {Overridepublic void onMessage(NotifyMessage notifyMessage) {System.err.println(我收到啦~~);System.err.println(message notifyMessage);} }这样做虽然没啥问题只是这两个类除了配置不一样其它的代码都是一摸一样的倘若之后还要有一个 selectorExpression TAG_MESSAGE_CONSUMER_02 的消费者那我又得再复制一个 MessageConsumer这样就造成了大量的代码冗余 所以就希望通过读取配置文件生成对应配置的消费者 二、问题分析 要如何实现这个功能可以去看下 RocketMQ 的源码看 Spring 是如何创建 RocketMQ 的消费者的 源码如下 org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer#initRocketMQPushConsumer 在该方法中可以看到 Spring 是如何初始化消费者参照这个方法只需要在项目启动完成后将初始化从注解上获取消费者配置的地方换成从配置文件上获取就可以了 通过实现 ApplicationListenerApplicationReadyEvent 可以监听项目是否启动完成 三、代码实现 因为消费者是需要通过配置文件的配置来自动生成那么可以将需要自动生成的消费者比如 MessageConsumer其 RocketMQMessageListener 的配置注释掉 Slf4j Service //RocketMQMessageListener( // consumerGroup mike-group, // topic mike-message, // selectorExpression TAG_MESSAGE_CONSUMER, // consumeThreadMax 6, // consumeTimeout 60L) public class MessageConsumer implements RocketMQListenerNotifyMessage {Overridepublic void onMessage(NotifyMessage notifyMessage) {System.err.println(我收到啦~~);System.err.println(message notifyMessage);} }配置文件上自动注入消费者的配置最好和 RocketMQMessageListener 的属性相同并且可以配置多个自动注入的消费者那么对应的映射文件可以这么写 AutoConsumerProperties.java import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.context.annotation.Configuration;import java.util.List;Data RefreshScope Configuration ConfigurationProperties(prefix auto-consumer) public class AutoConsumerProperties {private ListAutoConsumer messageConsumer; }AutoConsumer.java import lombok.Data; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.SelectorType;Data public class AutoConsumer {private String consumerGroup;private String topic;private SelectorType selectorType SelectorType.TAG;private String selectorExpression *;private ConsumeMode consumeMode ConsumeMode.CONCURRENTLY;private MessageModel messageModel MessageModel.CLUSTERING;private int consumeThreadMin 64;private int consumeThreadMax 64;private long consumeTimeout 15L;private String accessKey;private String secretKey;private boolean enableMsgTrace;private String customizedTraceTopic;private String nameServer;private String accessChannel; }核心代码 ConsumerStarted.java import cn.hutool.core.collection.CollUtil; import com.mike.common.core.utils.JacksonUtil; import com.mike.server.message.config.properties.AutoConsumer; import com.mike.server.message.config.properties.AutoConsumerProperties; import com.mike.server.message.consumer.MessageConsumer; import com.mike.server.message.domain.entity.NotifyMessage; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.support.RocketMQUtil; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import org.springframework.util.Assert;import javax.annotation.Resource; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Objects;Slf4j Component public class ConsumerStarted implements ApplicationContextAware, ApplicationListenerApplicationReadyEvent /* , InitializingBean, SmartLifecycle */ {Value(${rocketmq.name-server:})private String nameServer;Value(${rocketmq.consumer.topic:})private String topic;Value(${rocketmq.consumer.access-key:})private String accessKey;Value(${rocketmq.consumer.secret-key:})private String secretKey;Resourceprivate AutoConsumerProperties autoConsumerProperties;Resourceprivate MessageConsumer messageConsumer;private ApplicationContext applicationContext;private final static boolean enableMsgTrace true;private final static String customizedTraceTopic null;OverrideSuppressWarnings(all)public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {// 需要等到程序启动完全之后再去启动initConsumer();}public void initConsumer() {ListAutoConsumer messageConsumers autoConsumerProperties.getMessageConsumer();if (CollUtil.isEmpty(messageConsumers)) return;final RocketMQListenerNotifyMessage messageConsumerListener messageConsumer;this.autoGenerateConsumer(messageConsumers, messageConsumerListener, NotifyMessage.class);}SuppressWarnings(all)private R void autoGenerateConsumer(ListAutoConsumer autoConsumers, RocketMQListenerR rocketMQListener, ClassR objClass) {// 根据 tag 自动生成对应的消费者for (AutoConsumer autoConsumer : autoConsumers) {String consumerGroup autoConsumer.getConsumerGroup();String nameServer getValueOrDefault(autoConsumer.getNameServer(), this.nameServer);String topic getValueOrDefault(autoConsumer.getTopic(), this.topic);String accessKey getValueOrDefault(autoConsumer.getAccessKey(), this.accessKey);String secretKey getValueOrDefault(autoConsumer.getSecretKey(), this.secretKey);try {Assert.notNull(consumerGroup, Property consumerGroup is required);Assert.notNull(nameServer, Property nameServer is required);Assert.notNull(topic, Property topic is required);DefaultMQPushConsumer consumer;RPCHook rpcHook RocketMQUtil.getRPCHookByAkSk(this.applicationContext.getEnvironment(), accessKey, secretKey);if (Objects.nonNull(rpcHook)) {consumer new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);consumer.setVipChannelEnabled(false);} else {consumer new DefaultMQPushConsumer(consumerGroup, enableMsgTrace, customizedTraceTopic);}consumer.setInstanceName(RocketMQUtil.getInstanceName(this.nameServer));consumer.setNamesrvAddr(this.nameServer);consumer.setAccessChannel(AccessChannel.LOCAL);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setConsumeThreadMin(autoConsumer.getConsumeThreadMin());consumer.setConsumeThreadMax(autoConsumer.getConsumeThreadMax());if (consumer.getConsumeThreadMax() consumer.getConsumeThreadMin()) {consumer.setConsumeThreadMin(consumer.getConsumeThreadMax());}switch (autoConsumer.getMessageModel()) {case BROADCASTING:consumer.setMessageModel(MessageModel.BROADCASTING);break;case CLUSTERING:consumer.setMessageModel(MessageModel.CLUSTERING);break;default:throw new IllegalArgumentException(Property messageModel was wrong.);}switch (autoConsumer.getSelectorType()) {case TAG:consumer.subscribe(topic, autoConsumer.getSelectorExpression());break;case SQL92:consumer.subscribe(topic, MessageSelector.bySql(autoConsumer.getSelectorExpression()));break;default:throw new IllegalArgumentException(Property selectorType was wrong.);}switch (autoConsumer.getConsumeMode()) {case ORDERLY:consumer.setMessageListener(new DefaultMessageListenerOrderly(autoConsumer, rocketMQListener, objClass));break;case CONCURRENTLY:consumer.setMessageListener(new DefaultMessageListenerConcurrently(autoConsumer, rocketMQListener, objClass));break;default:throw new IllegalArgumentException(Property consumeMode was wrong.);}consumer.start();log.info(Consumer Start Success: {}:{}, topic, autoConsumer.getSelectorExpression());} catch (MQClientException e) {e.printStackTrace();log.info(Consumer Start Failed: {}:{}, topic, autoConsumer.getSelectorExpression());}}}private String getValueOrDefault(String value, String defaultValue) {return StringUtils.isNotBlank(value)? value: defaultValue;}OverrideSuppressWarnings(all)public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext applicationContext;}public class DefaultMessageListenerOrderlyT implements MessageListenerOrderly {private final AutoConsumer autoConsumer;private final RocketMQListenerT rocketMQListener;private final ClassT objClass;public DefaultMessageListenerOrderly(AutoConsumer autoConsumer, RocketMQListenerT rocketMQListener, ClassT objClass) {this.autoConsumer autoConsumer;this.rocketMQListener rocketMQListener;this.objClass objClass;}public ConsumeOrderlyStatus consumeMessage(ListMessageExt msgList, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgList) {log.info(group[{}]-tag[{}] consume start -, autoConsumer.getConsumerGroup(), autoConsumer.getSelectorExpression());log.debug(received msg: {}, messageExt);try {long now System.currentTimeMillis();this.rocketMQListener.onMessage(doConvertMessage(messageExt, this.objClass));long costTime System.currentTimeMillis() - now;log.debug(consume {} cost: {} ms, messageExt.getMsgId(), costTime);} catch (Exception var9) {log.warn(consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}, messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), var9);final long suspendCurrentQueueTimeMillis 1000L;context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}return ConsumeOrderlyStatus.SUCCESS;}}public class DefaultMessageListenerConcurrently T implements MessageListenerConcurrently {private final AutoConsumer autoConsumer;private final RocketMQListenerT rocketMQListener;private final ClassT objClass;public DefaultMessageListenerConcurrently(AutoConsumer autoConsumer, RocketMQListenerT rocketMQListener, ClassT objClass) {this.autoConsumer autoConsumer;this.rocketMQListener rocketMQListener;this.objClass objClass;}public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgList, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgList) {log.info(group[{}]-tag[{}] consume start -, autoConsumer.getConsumerGroup(), autoConsumer.getSelectorExpression());log.debug(received msg: {}, messageExt);try {long now System.currentTimeMillis();this.rocketMQListener.onMessage(doConvertMessage(messageExt, objClass));long costTime System.currentTimeMillis() - now;log.debug(consume {} cost: {} ms, messageExt.getMsgId(), costTime);} catch (Exception var9) {log.warn(consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}, messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), var9);final int delayLevelWhenNextConsume 0;context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}SuppressWarnings(unchecked)private T T doConvertMessage(MessageExt messageExt, ClassT objClass) {if (Objects.equals(objClass, MessageExt.class)) {return (T)messageExt;} else {String str new String(messageExt.getBody(), StandardCharsets.UTF_8);if (Objects.equals(objClass, String.class)) {return (T)str;} else {if (objClass ! null) {return JacksonUtil.fromJson(str, objClass);} else {log.info(convert failed. str:{}, msgType:{}, str, null);throw new RuntimeException(cannot convert message to null);}}}} }配置文件 yml 新增自动注入消费者的配置 auto-consumer:message-consumer:- consumer-group: mico-grouptopic: mike-messageselector-expression: TAG_MESSAGE_CONSUMERconsume-thread-max: 6- consumer-group: mike-group-01topic: mike-messageselector-expression: TAG_MESSAGE_CONSUMER_01consume-thread-max: 8- consumer-group: mike-group-02topic: mike-messageselector-expression: TAG_MESSAGE_CONSUMER_02consume-thread-max: 10如果是配置在 properties 文件中配置如下 auto-consumer.message-consumer[0].consumer-group mico-group auto-consumer.message-consumer[0].topic mike-message auto-consumer.message-consumer[0].selector-expression TAG_MESSAGE_CONSUMER auto-consumer.message-consumer[0].consume-thread-max 6auto-consumer.message-consumer[1].consumer-group mico-group-01 auto-consumer.message-consumer[1].topic mike-message auto-consumer.message-consumer[1].selector-expression TAG_MESSAGE_CONSUMER_01 auto-consumer.message-consumer[1].consume-thread-max 8auto-consumer.message-consumer[2].consumer-group mico-group-02 auto-consumer.message-consumer[2].topic mike-message auto-consumer.message-consumer[2].selector-expression TAG_MESSAGE_CONSUMER_02 auto-consumer.message-consumer[2].consume-thread-max 10启动项目进行验证观察是否有三个消费者被创建 从日志上看确实根据配置文件自动创建了三个不同的消费者
http://www.dnsts.com.cn/news/133131.html

相关文章:

  • 自己怎么做可以让百度收录的网站浦东区建设工程监督网站
  • 合肥网站关键词优化贵州网站建设服务平台
  • 做网站有什么市场风险王烨燃
  • 网站微信登录怎么做的杭州十大设计公司
  • 合肥网站建设高端多多进宝cms网站建设
  • 饰品企业网站建设婚庆网站源码
  • 云虚拟主机建设网站一定要域名基础展示型网站和cms
  • 长清网站建设价格东莞市塘厦网站建设
  • ui网站模板网站开发需求报告模板下载
  • 京东导购网站开发软件技术女生学怎么样
  • 东莞品牌网站设计公司计公司长沙网络推广平台
  • 宜宾市规划建设局网站网页设计尺寸用1440还是1920
  • 建站公司怎么备案室内设计公司取名
  • 宁波网站建设制作网络公司有什么免费企业网站是做企业黄页的
  • 网站推广策划案seo教程软件推广兼职可以做吗
  • 室内设计资料网站做钓鱼网站怎么赚钱
  • 菏泽网站建设公司官网自助建站帮助网
  • 如何做美发店网站企业seo指的是
  • 网站建设招聘内容北京关键词优化
  • 五百亿网站搬家公司怎么自己制作公众号
  • 网站建设报告模板旧宫做网站的公司
  • 四川建设门户网站推广的含义
  • 建设网站技术人员先进事迹网站备案名称怎么修改
  • 广州建网站有哪些外贸柒夜网站建设
  • 建设网站对服务器有什么要求网站建设与管理基础
  • 网站建设的企业目标购物网站开发的背景介绍
  • 中国人做英文网站徐州免费网站制作
  • 诚信档案建设网站首页wordpress构建企业网站
  • 二七免费网站建设深圳网站建设公司联
  • 1万网站建设费入什么科目免费建网站讨论