h5网站建设,wordpress下载附件,施工企业如何获取竞争优势,成都seo技术
动态初始化Kafka消费者实例
目录：一.Kafka 环境搭建；二.动态初始化消费者（1.Topic定义；2.方法处理器工厂；3.参数解析器（Copy SpringBoot 源码）；4.消费接口和消费实现；5.动态初始化：1.关键类简介，2.动态初始化实现）
一.Kafka 环境搭建
参考：Kafka搭建和测试
…
动态初始化Kafka消费者实例
目录：一.Kafka 环境搭建；二.动态初始化消费者（1.Topic定义；2.方法处理器工厂；3.参数解析器（Copy SpringBoot 源码）；4.消费接口和消费实现；5.动态初始化：1.关键类简介，2.动态初始化实现）
一.Kafka 环境搭建
参考：Kafka搭建和测试
二.动态初始化消费者
1.Topic定义
动态初始化，即不通过注解和配置文件实现消费者的初始化；定义一个 Topic 对象，用于设置消费者参数：
package com.demo.entity;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** author * date 2023-02-08 15:06* since 1.8*/
Data
AllArgsConstructor
NoArgsConstructor
public class Topic {private String id;private String topic;private Integer partitions;private String group test;private String clientPrefix;
}2.方法处理器工厂
此类直接使用 SpringBoot 源码（原实现为私有类）：
package com.demo.manual;import org.springframework.context.ApplicationContext;
import org.springframework.core.convert.TypeDescriptor;
import org.springframework.core.convert.converter.ConditionalGenericConverter;
import org.springframework.core.convert.converter.Converter;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.util.Assert;
import org.springframework.validation.Validator;import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;/*** author * date 2023-02-08 14:18* since 1.8*/
public class MessageHandlerMethodFactory implements org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory {private ApplicationContext applicationContext;private Validator validator;private ListHandlerMethodArgumentResolver customMethodArgumentResolvers new ArrayList();private final DefaultFormattingConversionService defaultFormattingConversionService new DefaultFormattingConversionService();private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory handlerMethodFactory;public MessageHandlerMethodFactory(Validator validator, ApplicationContext applicationContext) {this.validator validator;this.applicationContext applicationContext;}public void setHandlerMethodFactory(org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {this.handlerMethodFactory kafkaHandlerMethodFactory1;}private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory getHandlerMethodFactory() {if (this.handlerMethodFactory null) {this.handlerMethodFactory createDefaultMessageHandlerMethodFactory();}return this.handlerMethodFactory;}private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {DefaultMessageHandlerMethodFactory defaultFactory new DefaultMessageHandlerMethodFactory();if (this.validator ! 
null) {defaultFactory.setValidator(this.validator);}defaultFactory.setBeanFactory(this.applicationContext);this.defaultFormattingConversionService.addConverter(new BytesToStringConverter(StandardCharsets.UTF_8));this.defaultFormattingConversionService.addConverter(new BytesToNumberConverter());defaultFactory.setConversionService(this.defaultFormattingConversionService);GenericMessageConverter messageConverter new GenericMessageConverter(this.defaultFormattingConversionService);defaultFactory.setMessageConverter(messageConverter);ListHandlerMethodArgumentResolver customArgumentsResolver new ArrayList(Collections.unmodifiableList(this.customMethodArgumentResolvers));// Has to be at the end - look at PayloadMethodArgumentResolver documentationcustomArgumentsResolver.add(new NullAwarePayloadArgumentResolver(messageConverter, this.validator));defaultFactory.setCustomArgumentResolvers(customArgumentsResolver);defaultFactory.afterPropertiesSet();return defaultFactory;}Overridepublic InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {return getHandlerMethodFactory().createInvocableHandlerMethod(bean, method);}private static class BytesToStringConverter implements Converterbyte[], String {private final Charset charset;BytesToStringConverter(Charset charset) {this.charset charset;}Overridepublic String convert(byte[] source) {return new String(source, this.charset);}}private final class BytesToNumberConverter implements ConditionalGenericConverter {BytesToNumberConverter() {}OverrideNullablepublic SetConvertiblePair getConvertibleTypes() {HashSetConvertiblePair pairs new HashSet();pairs.add(new ConvertiblePair(byte[].class, long.class));pairs.add(new ConvertiblePair(byte[].class, int.class));pairs.add(new ConvertiblePair(byte[].class, short.class));pairs.add(new ConvertiblePair(byte[].class, byte.class));pairs.add(new ConvertiblePair(byte[].class, Long.class));pairs.add(new ConvertiblePair(byte[].class, Integer.class));pairs.add(new 
ConvertiblePair(byte[].class, Short.class));pairs.add(new ConvertiblePair(byte[].class, Byte.class));return pairs;}OverrideNullablepublic Object convert(Nullable Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {byte[] bytes (byte[]) source;if (targetType.getType().equals(long.class) || targetType.getType().equals(Long.class)) {Assert.state(bytes.length 8, At least 8 bytes needed to convert a byte[] to a long); // NOSONARreturn ByteBuffer.wrap(bytes).getLong();}else if (targetType.getType().equals(int.class) || targetType.getType().equals(Integer.class)) {Assert.state(bytes.length 4, At least 4 bytes needed to convert a byte[] to an integer); // NOSONARreturn ByteBuffer.wrap(bytes).getInt();}else if (targetType.getType().equals(short.class) || targetType.getType().equals(Short.class)) {Assert.state(bytes.length 2, At least 2 bytes needed to convert a byte[] to a short); // NOSONARreturn ByteBuffer.wrap(bytes).getShort();}else if (targetType.getType().equals(byte.class) || targetType.getType().equals(Byte.class)) {Assert.state(bytes.length 1, At least 1 byte needed to convert a byte[] to a byte); // NOSONARreturn ByteBuffer.wrap(bytes).get();}return null;}Overridepublic boolean matches(TypeDescriptor sourceType, TypeDescriptor targetType) {if (sourceType.getType().equals(byte[].class)) {Class? target targetType.getType();return target.equals(long.class) || target.equals(int.class) || target.equals(short.class) // NOSONAR|| target.equals(byte.class) || target.equals(Long.class) || target.equals(Integer.class)|| target.equals(Short.class) || target.equals(Byte.class);}else {return false;}}}
}
3.参数解析器（Copy SpringBoot 源码）
此类直接使用 SpringBoot 源码（原实现为私有类）：
package com.demo.manual;import org.springframework.core.MethodParameter;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
import org.springframework.validation.Validator;import java.util.List;/*** author * date 2023-02-08 14:36* since 1.8*/
public class NullAwarePayloadArgumentResolver extends PayloadMethodArgumentResolver {NullAwarePayloadArgumentResolver(MessageConverter messageConverter, Validator validator) {super(messageConverter, validator);}Overridepublic Object resolveArgument(MethodParameter parameter, Message? message) throws Exception { // NOSONARObject resolved super.resolveArgument(parameter, message);/** Replace KafkaNull list elements with null.*/if (resolved instanceof List) {List? list ((List?) resolved);for (int i 0; i list.size(); i) {if (list.get(i) instanceof KafkaNull) {list.set(i, null);}}}return resolved;}Overrideprotected boolean isEmptyPayload(Object payload) {return payload null || payload instanceof KafkaNull;}}
4.消费接口和消费实现
当前接口和实现用于做统一的数据处理，可以在实现类内再根据 Topic 去调用对应的数据解析方法。
接口
package com.demo.manual;import org.apache.kafka.clients.consumer.ConsumerRecord;/*** author * date 2023-02-08 13:46* since 1.8*/
public interface Handler {void deal(ConsumerRecordString, String cRecord);
}
实现
package com.demo.manual;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;/*** author * date 2023-02-08 11:49* since 1.8*/
Slf4j
public class ManualHandler implements Handler{Overridepublic void deal(ConsumerRecordString, String cRecord) {log.info( Topic:{} Partition:{} Content:{},cRecord.topic(),cRecord.partition(),cRecord.value());}
}
5.动态初始化
1.关键类简介
此处通过接口调用实现创建、暂停和恢复消费，可根据实际应用场景进行设计。
关键类说明：
- KafkaListenerEndpointRegistry：Spring 的 Kafka 监听容器，可以通过 Id 获取 Listener 实例，从而暂停或恢复消费监听
- ConcurrentKafkaListenerContainerFactory：Listener 工厂，定义代码可参考上面链接的 2.3 节
- ConsumerAwareListenerErrorHandler：消费异常处理器，定义代码可参考上面链接的 2.3 节
- ApplicationContext：Spring 的上下文容器，MessageHandlerMethodFactory 初始化用
- MethodKafkaListenerEndpoint：Kafka 配置节点，详细逻辑可参考源码
SpringBoot 自动初始化 Kafka 消费者的主要实现类和方法
package org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor /*** 此处为相关源码仅供参考 寻找带有 KafkaListener 注解的类并初始化/Overridepublic Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {if (!this.nonAnnotatedClasses.contains(bean.getClass())) {Class? targetClass AopUtils.getTargetClass(bean);CollectionKafkaListener classLevelListeners findListenerAnnotations(targetClass);final boolean hasClassLevelListeners !classLevelListeners.isEmpty();final ListMethod multiMethods new ArrayList();MapMethod, SetKafkaListener annotatedMethods MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookupSetKafkaListener) method - {SetKafkaListener listenerMethods findListenerAnnotations(method);return (!listenerMethods.isEmpty() ? listenerMethods : null);});if (hasClassLevelListeners) {SetMethod methodsWithHandler MethodIntrospector.selectMethods(targetClass,(ReflectionUtils.MethodFilter) method -AnnotationUtils.findAnnotation(method, KafkaHandler.class) ! null);multiMethods.addAll(methodsWithHandler);}if (annotatedMethods.isEmpty() !hasClassLevelListeners) {this.nonAnnotatedClasses.add(bean.getClass());this.logger.trace(() - No KafkaListener annotations found on bean type: bean.getClass());}else {// Non-empty set of methodsfor (Map.EntryMethod, SetKafkaListener entry : annotatedMethods.entrySet()) {Method method entry.getKey();for (KafkaListener listener : entry.getValue()) {processKafkaListener(listener, method, bean, beanName);}}this.logger.debug(() - annotatedMethods.size() KafkaListener methods processed on bean beanName : annotatedMethods);}if (hasClassLevelListeners) {processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);}}return bean;}2.动态初始化实现
package com.demo.controller;import com.demo.entity.Topic;
import com.demo.manual.MessageHandlerMethodFactory;
import com.demo.manual.ManualHandler;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** author * date 2023-02-07 13:40* since 1.8*/
Slf4j
RestController
RequestMapping(/listener)
public class ListenerController {AutowiredKafkaListenerEndpointRegistry registry;AutowiredQualifier(batchTestContainerFactory)ConcurrentKafkaListenerContainerFactoryString,String batchTestContainerFactory;AutowiredConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler;AutowiredApplicationContext applicationContext;MessageHandlerMethodFactory factory;PostConstructprivate void init(){factory new MessageHandlerMethodFactory(null,applicationContext);}static MapString, Topic map new ConcurrentHashMap();static {map.put(test_manual_1_id,new Topic(test_manual_1_id,test-topic-new.1,2,mygroup,test_manual_1_batch));map.put(test_manual_2_id,new Topic(test_manual_2_id,test-topic-new.2,1,mygroup,test_manual_2_batch));}/*** 停止消费 自行选择停止时是否需要从监听容器内移除实例容器为 Map 实现* MapString, MessageListenerContainer* param id*/GetMapping(/close)public void close(String id){MessageListenerContainer container registry.unregisterListenerContainer(id);container.destroy();}/*** 开始消费 若果是已注册的则判断是否暂停暂停则恢复* 如果不存在则定义一个消费者注册到容器内并启动* param id* throws NoSuchMethodException*/GetMapping(/open)public void open(String id) throws NoSuchMethodException {MessageListenerContainer container registry.getListenerContainer(id);if (null!container){if (!container.isRunning()){container.start();container.resume();}} else {//TODO 新建一个对应 Topic 的实例Topic topic map.get(id);if (nulltopic){return;}ManualHandler bean new ManualHandler();MethodKafkaListenerEndpointString, String endpoint new MethodKafkaListenerEndpoint();endpoint.setMessageHandlerMethodFactory(factory);endpoint.setBean(bean);Method[] methods 
bean.getClass().getDeclaredMethods();endpoint.setMethod(checkProxy(methods[0],bean));endpoint.setId(topic.getId());endpoint.setTopics(topic.getTopic());endpoint.setGroupId(topic.getGroup());endpoint.setClientIdPrefix(topic.getClientPrefix());endpoint.setConcurrency(topic.getPartitions());endpoint.setErrorHandler(consumerAwareListenerErrorHandler);registry.registerListenerContainer(endpoint,batchTestContainerFactory);container registry.getListenerContainer(id);container.start();}}/*** Copy Spring 源码* param methodArg* param bean* return*/private Method checkProxy(Method methodArg, Object bean) {Method method methodArg;if (AopUtils.isJdkDynamicProxy(bean)) {try {// Found a KafkaListener method on the target class for this JDK proxy -// is it also present on the proxy itself?method bean.getClass().getMethod(method.getName(), method.getParameterTypes());Class?[] proxiedInterfaces ((Advised) bean).getProxiedInterfaces();for (Class? iface : proxiedInterfaces) {try {method iface.getMethod(method.getName(), method.getParameterTypes());break;}catch (SuppressWarnings(unused) NoSuchMethodException noMethod) {// NOSONAR}}}catch (SecurityException ex) {ReflectionUtils.handleReflectionException(ex);}catch (NoSuchMethodException ex) {throw new IllegalStateException(String.format(KafkaListener method %s found on bean target class %s, but not found in any interface(s) for bean JDK proxy. Either pull the method up to an interface or switch to subclass (CGLIB) proxies by setting proxy-target-class/proxyTargetClass attribute to true, method.getName(),method.getDeclaringClass().getSimpleName()), ex);}}return method;}
}