做ps合成的网站,北京兄弟搬家公司,广州网站制作服务,个人网站制作成品图片目录 前言一、情景介绍二、问题分析三、代码实现 前言
之前接到一个需求，我们项目的技术负责人希望通过配置的形式，在项目启动的时候自动根据配置生成对应的消费者
觉得还有点意思，随即记录一下~ 一、情景介绍
比如我这里有一个消费者 Mes… 目录 前言一、情景介绍二、问题分析三、代码实现 前言
之前接到一个需求：我们项目的技术负责人希望通过配置的形式，在项目启动的时候自动根据配置生成对应的消费者。
觉得还有点意思，随即记录一下~
一、情景介绍
比如我这里有一个消费者 MessageConsumer
Slf4j
Service
RocketMQMessageListener(consumerGroup mike-group,topic mike-message,selectorExpression TAG_MESSAGE_CONSUMER,consumeThreadMax 6,consumeTimeout 60L)
public class MessageConsumer implements RocketMQListenerNotifyMessage {Overridepublic void onMessage(NotifyMessage notifyMessage) {System.err.println(我收到啦~~);System.err.println(message notifyMessage);}
}在项目启动的时候会根据 RocketMQMessageListener 注解上的配置生成一个消费者
假如我还需要一个 MessageConsumer 消费者，其 selectorExpression 的配置为 TAG_MESSAGE_CONSUMER_01，consumeThreadMax 要设置为 8。
通常情况下，我们会再复制一个 MessageConsumer，命名为 MessageConsumer_01，然后在新的消费者上改对应的配置，例如：
Slf4j
Service
RocketMQMessageListener(consumerGroup mike-group-01,topic mike-message,selectorExpression TAG_MESSAGE_CONSUMER_01,consumeThreadMax 8,consumeTimeout 60L)
public class MessageConsumer_01 implements RocketMQListenerNotifyMessage {Overridepublic void onMessage(NotifyMessage notifyMessage) {System.err.println(我收到啦~~);System.err.println(message notifyMessage);}
}这样做虽然没啥问题只是这两个类除了配置不一样其它的代码都是一摸一样的倘若之后还要有一个 selectorExpression TAG_MESSAGE_CONSUMER_02 的消费者那我又得再复制一个 MessageConsumer这样就造成了大量的代码冗余
所以就希望通过读取配置文件，生成对应配置的消费者。

二、问题分析
要如何实现这个功能？可以去看下 RocketMQ 的源码，看 Spring 是如何创建 RocketMQ 的消费者的。
源码如下：
org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer#initRocketMQPushConsumer
在该方法中可以看到 Spring 是如何初始化消费者的。参照这个方法，只需要在项目启动完成后，将初始化时从注解上获取消费者配置的地方换成从配置文件上获取就可以了。
通过实现 ApplicationListener<ApplicationReadyEvent> 可以监听项目是否启动完成。

三、代码实现
因为消费者需要通过配置文件的配置来自动生成，所以可以将需要自动生成的消费者（比如 MessageConsumer）上的 @RocketMQMessageListener 注解注释掉：
Slf4j
Service
//RocketMQMessageListener(
// consumerGroup mike-group,
// topic mike-message,
// selectorExpression TAG_MESSAGE_CONSUMER,
// consumeThreadMax 6,
// consumeTimeout 60L)
public class MessageConsumer implements RocketMQListenerNotifyMessage {Overridepublic void onMessage(NotifyMessage notifyMessage) {System.err.println(我收到啦~~);System.err.println(message notifyMessage);}
}配置文件上自动注入消费者的配置最好和 RocketMQMessageListener 的属性相同并且可以配置多个自动注入的消费者那么对应的映射文件可以这么写
AutoConsumerProperties.java
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;import java.util.List;Data
RefreshScope
Configuration
ConfigurationProperties(prefix auto-consumer)
public class AutoConsumerProperties {private ListAutoConsumer messageConsumer;
}AutoConsumer.java
import lombok.Data;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.SelectorType;Data
public class AutoConsumer {private String consumerGroup;private String topic;private SelectorType selectorType SelectorType.TAG;private String selectorExpression *;private ConsumeMode consumeMode ConsumeMode.CONCURRENTLY;private MessageModel messageModel MessageModel.CLUSTERING;private int consumeThreadMin 64;private int consumeThreadMax 64;private long consumeTimeout 15L;private String accessKey;private String secretKey;private boolean enableMsgTrace;private String customizedTraceTopic;private String nameServer;private String accessChannel;
}核心代码
ConsumerStarted.java
import cn.hutool.core.collection.CollUtil;
import com.mike.common.core.utils.JacksonUtil;
import com.mike.server.message.config.properties.AutoConsumer;
import com.mike.server.message.config.properties.AutoConsumerProperties;
import com.mike.server.message.consumer.MessageConsumer;
import com.mike.server.message.domain.entity.NotifyMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;Slf4j
Component
public class ConsumerStarted implements ApplicationContextAware, ApplicationListenerApplicationReadyEvent /* , InitializingBean, SmartLifecycle */ {Value(${rocketmq.name-server:})private String nameServer;Value(${rocketmq.consumer.topic:})private String topic;Value(${rocketmq.consumer.access-key:})private String accessKey;Value(${rocketmq.consumer.secret-key:})private String secretKey;Resourceprivate AutoConsumerProperties autoConsumerProperties;Resourceprivate MessageConsumer messageConsumer;private ApplicationContext applicationContext;private final static boolean enableMsgTrace true;private final static String customizedTraceTopic null;OverrideSuppressWarnings(all)public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {// 需要等到程序启动完全之后再去启动initConsumer();}public void initConsumer() {ListAutoConsumer messageConsumers autoConsumerProperties.getMessageConsumer();if (CollUtil.isEmpty(messageConsumers)) return;final RocketMQListenerNotifyMessage messageConsumerListener messageConsumer;this.autoGenerateConsumer(messageConsumers, messageConsumerListener, NotifyMessage.class);}SuppressWarnings(all)private R void autoGenerateConsumer(ListAutoConsumer autoConsumers, RocketMQListenerR rocketMQListener, ClassR objClass) {// 根据 tag 自动生成对应的消费者for (AutoConsumer autoConsumer : autoConsumers) {String consumerGroup autoConsumer.getConsumerGroup();String nameServer getValueOrDefault(autoConsumer.getNameServer(), this.nameServer);String topic getValueOrDefault(autoConsumer.getTopic(), this.topic);String accessKey getValueOrDefault(autoConsumer.getAccessKey(), this.accessKey);String secretKey getValueOrDefault(autoConsumer.getSecretKey(), this.secretKey);try {Assert.notNull(consumerGroup, Property consumerGroup is required);Assert.notNull(nameServer, Property nameServer is required);Assert.notNull(topic, Property topic is required);DefaultMQPushConsumer consumer;RPCHook rpcHook RocketMQUtil.getRPCHookByAkSk(this.applicationContext.getEnvironment(), accessKey, 
secretKey);if (Objects.nonNull(rpcHook)) {consumer new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);consumer.setVipChannelEnabled(false);} else {consumer new DefaultMQPushConsumer(consumerGroup, enableMsgTrace, customizedTraceTopic);}consumer.setInstanceName(RocketMQUtil.getInstanceName(this.nameServer));consumer.setNamesrvAddr(this.nameServer);consumer.setAccessChannel(AccessChannel.LOCAL);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setConsumeThreadMin(autoConsumer.getConsumeThreadMin());consumer.setConsumeThreadMax(autoConsumer.getConsumeThreadMax());if (consumer.getConsumeThreadMax() consumer.getConsumeThreadMin()) {consumer.setConsumeThreadMin(consumer.getConsumeThreadMax());}switch (autoConsumer.getMessageModel()) {case BROADCASTING:consumer.setMessageModel(MessageModel.BROADCASTING);break;case CLUSTERING:consumer.setMessageModel(MessageModel.CLUSTERING);break;default:throw new IllegalArgumentException(Property messageModel was wrong.);}switch (autoConsumer.getSelectorType()) {case TAG:consumer.subscribe(topic, autoConsumer.getSelectorExpression());break;case SQL92:consumer.subscribe(topic, MessageSelector.bySql(autoConsumer.getSelectorExpression()));break;default:throw new IllegalArgumentException(Property selectorType was wrong.);}switch (autoConsumer.getConsumeMode()) {case ORDERLY:consumer.setMessageListener(new DefaultMessageListenerOrderly(autoConsumer, rocketMQListener, objClass));break;case CONCURRENTLY:consumer.setMessageListener(new DefaultMessageListenerConcurrently(autoConsumer, rocketMQListener, objClass));break;default:throw new IllegalArgumentException(Property consumeMode was wrong.);}consumer.start();log.info(Consumer Start Success: {}:{}, topic, autoConsumer.getSelectorExpression());} catch (MQClientException e) {e.printStackTrace();log.info(Consumer Start Failed: {}:{}, topic, autoConsumer.getSelectorExpression());}}}private 
String getValueOrDefault(String value, String defaultValue) {return StringUtils.isNotBlank(value)? value: defaultValue;}OverrideSuppressWarnings(all)public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext applicationContext;}public class DefaultMessageListenerOrderlyT implements MessageListenerOrderly {private final AutoConsumer autoConsumer;private final RocketMQListenerT rocketMQListener;private final ClassT objClass;public DefaultMessageListenerOrderly(AutoConsumer autoConsumer, RocketMQListenerT rocketMQListener, ClassT objClass) {this.autoConsumer autoConsumer;this.rocketMQListener rocketMQListener;this.objClass objClass;}public ConsumeOrderlyStatus consumeMessage(ListMessageExt msgList, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgList) {log.info(group[{}]-tag[{}] consume start -, autoConsumer.getConsumerGroup(), autoConsumer.getSelectorExpression());log.debug(received msg: {}, messageExt);try {long now System.currentTimeMillis();this.rocketMQListener.onMessage(doConvertMessage(messageExt, this.objClass));long costTime System.currentTimeMillis() - now;log.debug(consume {} cost: {} ms, messageExt.getMsgId(), costTime);} catch (Exception var9) {log.warn(consume message failed. 
messageId:{}, topic:{}, reconsumeTimes:{}, messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), var9);final long suspendCurrentQueueTimeMillis 1000L;context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}return ConsumeOrderlyStatus.SUCCESS;}}public class DefaultMessageListenerConcurrently T implements MessageListenerConcurrently {private final AutoConsumer autoConsumer;private final RocketMQListenerT rocketMQListener;private final ClassT objClass;public DefaultMessageListenerConcurrently(AutoConsumer autoConsumer, RocketMQListenerT rocketMQListener, ClassT objClass) {this.autoConsumer autoConsumer;this.rocketMQListener rocketMQListener;this.objClass objClass;}public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgList, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgList) {log.info(group[{}]-tag[{}] consume start -, autoConsumer.getConsumerGroup(), autoConsumer.getSelectorExpression());log.debug(received msg: {}, messageExt);try {long now System.currentTimeMillis();this.rocketMQListener.onMessage(doConvertMessage(messageExt, objClass));long costTime System.currentTimeMillis() - now;log.debug(consume {} cost: {} ms, messageExt.getMsgId(), costTime);} catch (Exception var9) {log.warn(consume message failed. 
messageId:{}, topic:{}, reconsumeTimes:{}, messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), var9);final int delayLevelWhenNextConsume 0;context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}SuppressWarnings(unchecked)private T T doConvertMessage(MessageExt messageExt, ClassT objClass) {if (Objects.equals(objClass, MessageExt.class)) {return (T)messageExt;} else {String str new String(messageExt.getBody(), StandardCharsets.UTF_8);if (Objects.equals(objClass, String.class)) {return (T)str;} else {if (objClass ! null) {return JacksonUtil.fromJson(str, objClass);} else {log.info(convert failed. str:{}, msgType:{}, str, null);throw new RuntimeException(cannot convert message to null);}}}}
}配置文件 yml 新增自动注入消费者的配置
auto-consumer:
  message-consumer:
    - consumer-group: mike-group
      topic: mike-message
      selector-expression: TAG_MESSAGE_CONSUMER
      consume-thread-max: 6
    - consumer-group: mike-group-01
      topic: mike-message
      selector-expression: TAG_MESSAGE_CONSUMER_01
      consume-thread-max: 8
    - consumer-group: mike-group-02
      topic: mike-message
      selector-expression: TAG_MESSAGE_CONSUMER_02
      consume-thread-max: 10
如果是配置在 properties 文件中，配置如下：
auto-consumer.message-consumer[0].consumer-group=mike-group
auto-consumer.message-consumer[0].topic=mike-message
auto-consumer.message-consumer[0].selector-expression=TAG_MESSAGE_CONSUMER
auto-consumer.message-consumer[0].consume-thread-max=6

auto-consumer.message-consumer[1].consumer-group=mike-group-01
auto-consumer.message-consumer[1].topic=mike-message
auto-consumer.message-consumer[1].selector-expression=TAG_MESSAGE_CONSUMER_01
auto-consumer.message-consumer[1].consume-thread-max=8

auto-consumer.message-consumer[2].consumer-group=mike-group-02
auto-consumer.message-consumer[2].topic=mike-message
auto-consumer.message-consumer[2].selector-expression=TAG_MESSAGE_CONSUMER_02
auto-consumer.message-consumer[2].consume-thread-max=10
启动项目进行验证，观察是否有三个消费者被创建。从日志上看，确实根据配置文件自动创建了三个不同的消费者。