当前位置: 首页 > news >正文

h5网站建设wordpress下载附件

h5网站建设,wordpress下载附件,施工企业如何获取竞争优势,成都seo技术

动态初始化Kafka消费者实例 一.Kafka 环境搭建 二.动态初始化消费者 1.Topic定义 2.方法处理器工厂 3.参数解析器（Copy SpringBoot 源码） 4.消费接口和消费实现 5.动态初始化 1.关键类简介 2.动态初始化实现 一.Kafka 环境搭建 参考：Kafka搭建和测试 …

# 动态初始化Kafka消费者实例

- 一.Kafka 环境搭建
- 二.动态初始化消费者
  - 1.Topic定义
  - 2.方法处理器工厂
  - 3.参数解析器（Copy SpringBoot 源码）
  - 4.消费接口和消费实现
  - 5.动态初始化
    - 1.关键类简介
    - 2.动态初始化实现

## 一.Kafka 环境搭建

参考：Kafka搭建和测试

## 二.动态初始化消费者

### 1.Topic定义

动态初始化，即不通过注解和配置文件实现消费者的初始化。定义一个 Topic 对象，用于设置消费者参数：

```java
package com.demo.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author
 * @date 2023-02-08 15:06
 * @since 1.8
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Topic {
    private String id;
    private String topic;
    private Integer partitions;
    private String group = "test";
    private String clientPrefix;
}
```

### 2.方法处理器工厂

此类直接使用 SpringBoot 源码（原实现为私有类）：

```java
package com.demo.manual;

import org.springframework.context.ApplicationContext;
import org.springframework.core.convert.TypeDescriptor;
import org.springframework.core.convert.converter.ConditionalGenericConverter;
import org.springframework.core.convert.converter.Converter;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.util.Assert;
import org.springframework.validation.Validator;

import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * @author
 * @date 2023-02-08 14:18
 * @since 1.8
 */
public class MessageHandlerMethodFactory implements org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory {

    private ApplicationContext applicationContext;

    private Validator validator;

    private List<HandlerMethodArgumentResolver> customMethodArgumentResolvers = new ArrayList<>();

    private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService();

    private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory handlerMethodFactory;

    public MessageHandlerMethodFactory(Validator validator, ApplicationContext applicationContext) {
        this.validator = validator;
        this.applicationContext = applicationContext;
    }

    public void setHandlerMethodFactory(org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {
        this.handlerMethodFactory = kafkaHandlerMethodFactory1;
    }

    private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory getHandlerMethodFactory() {
        if (this.handlerMethodFactory == null) {
            this.handlerMethodFactory = createDefaultMessageHandlerMethodFactory();
        }
        return this.handlerMethodFactory;
    }

    private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
        if (this.validator != null) {
            defaultFactory.setValidator(this.validator);
        }
        defaultFactory.setBeanFactory(this.applicationContext);
        this.defaultFormattingConversionService.addConverter(new BytesToStringConverter(StandardCharsets.UTF_8));
        this.defaultFormattingConversionService.addConverter(new BytesToNumberConverter());
        defaultFactory.setConversionService(this.defaultFormattingConversionService);
        GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService);
        defaultFactory.setMessageConverter(messageConverter);
        List<HandlerMethodArgumentResolver> customArgumentsResolver =
                new ArrayList<>(Collections.unmodifiableList(this.customMethodArgumentResolvers));
        // Has to be at the end - look at PayloadMethodArgumentResolver documentation
        customArgumentsResolver.add(new NullAwarePayloadArgumentResolver(messageConverter, this.validator));
        defaultFactory.setCustomArgumentResolvers(customArgumentsResolver);
        defaultFactory.afterPropertiesSet();
        return defaultFactory;
    }

    @Override
    public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
        return getHandlerMethodFactory().createInvocableHandlerMethod(bean, method);
    }

    private static class BytesToStringConverter implements Converter<byte[], String> {

        private final Charset charset;

        BytesToStringConverter(Charset charset) {
            this.charset = charset;
        }

        @Override
        public String convert(byte[] source) {
            return new String(source, this.charset);
        }
    }

    private final class BytesToNumberConverter implements ConditionalGenericConverter {

        BytesToNumberConverter() {
        }

        @Override
        @Nullable
        public Set<ConvertiblePair> getConvertibleTypes() {
            HashSet<ConvertiblePair> pairs = new HashSet<>();
            pairs.add(new ConvertiblePair(byte[].class, long.class));
            pairs.add(new ConvertiblePair(byte[].class, int.class));
            pairs.add(new ConvertiblePair(byte[].class, short.class));
            pairs.add(new ConvertiblePair(byte[].class, byte.class));
            pairs.add(new ConvertiblePair(byte[].class, Long.class));
            pairs.add(new ConvertiblePair(byte[].class, Integer.class));
            pairs.add(new ConvertiblePair(byte[].class, Short.class));
            pairs.add(new ConvertiblePair(byte[].class, Byte.class));
            return pairs;
        }

        @Override
        @Nullable
        public Object convert(@Nullable Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
            byte[] bytes = (byte[]) source;
            if (targetType.getType().equals(long.class) || targetType.getType().equals(Long.class)) {
                Assert.state(bytes.length >= 8, "At least 8 bytes needed to convert a byte[] to a long"); // NOSONAR
                return ByteBuffer.wrap(bytes).getLong();
            }
            else if (targetType.getType().equals(int.class) || targetType.getType().equals(Integer.class)) {
                Assert.state(bytes.length >= 4, "At least 4 bytes needed to convert a byte[] to an integer"); // NOSONAR
                return ByteBuffer.wrap(bytes).getInt();
            }
            else if (targetType.getType().equals(short.class) || targetType.getType().equals(Short.class)) {
                Assert.state(bytes.length >= 2, "At least 2 bytes needed to convert a byte[] to a short"); // NOSONAR
                return ByteBuffer.wrap(bytes).getShort();
            }
            else if (targetType.getType().equals(byte.class) || targetType.getType().equals(Byte.class)) {
                Assert.state(bytes.length >= 1, "At least 1 byte needed to convert a byte[] to a byte"); // NOSONAR
                return ByteBuffer.wrap(bytes).get();
            }
            return null;
        }

        @Override
        public boolean matches(TypeDescriptor sourceType, TypeDescriptor targetType) {
            if (sourceType.getType().equals(byte[].class)) {
                Class<?> target = targetType.getType();
                return target.equals(long.class) || target.equals(int.class) || target.equals(short.class) // NOSONAR
                        || target.equals(byte.class) || target.equals(Long.class) || target.equals(Integer.class)
                        || target.equals(Short.class) || target.equals(Byte.class);
            }
            else {
                return false;
            }
        }
    }
}
```

### 3.参数解析器（Copy SpringBoot 源码）

此类直接使用 SpringBoot 源码（原实现为私有类）：

```java
package com.demo.manual;

import org.springframework.core.MethodParameter;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
import org.springframework.validation.Validator;

import java.util.List;

/**
 * @author
 * @date 2023-02-08 14:36
 * @since 1.8
 */
public class NullAwarePayloadArgumentResolver extends PayloadMethodArgumentResolver {

    NullAwarePayloadArgumentResolver(MessageConverter messageConverter, Validator validator) {
        super(messageConverter, validator);
    }

    @Override
    public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception { // NOSONAR
        Object resolved = super.resolveArgument(parameter, message);
        /*
         * Replace KafkaNull list elements with null.
         */
        if (resolved instanceof List) {
            List<?> list = ((List<?>) resolved);
            for (int i = 0; i < list.size(); i++) {
                if (list.get(i) instanceof KafkaNull) {
                    list.set(i, null);
                }
            }
        }
        return resolved;
    }

    @Override
    protected boolean isEmptyPayload(Object payload) {
        return payload == null || payload instanceof KafkaNull;
    }

}
```

### 4.消费接口和消费实现

当前接口和实现用于做统一的数据处理，可以在实现类内再根据 Topic 去调用对应的数据解析方法。

接口：

```java
package com.demo.manual;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * @author
 * @date 2023-02-08 13:46
 * @since 1.8
 */
public interface Handler {
    void deal(ConsumerRecord<String, String> cRecord);
}
```

实现：

```java
package com.demo.manual;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * @author
 * @date 2023-02-08 11:49
 * @since 1.8
 */
@Slf4j
public class ManualHandler implements Handler {

    @Override
    public void deal(ConsumerRecord<String, String> cRecord) {
        log.info("Topic:{} Partition:{} Content:{}", cRecord.topic(), cRecord.partition(), cRecord.value());
    }
}
```

### 5.动态初始化

#### 1.关键类简介

此处通过接口调用实现创建、暂停和恢复消费，可根据实际应用场景进行设计。

| 关键类 | 说明 |
| --- | --- |
| KafkaListenerEndpointRegistry | Spring 的 Kafka 监听容器，可以通过 Id 获取 Listener 实例，从而暂停或恢复消费监听 |
| ConcurrentKafkaListenerContainerFactory | Listener 工厂，定义代码可参考上面链接的 2.3 节 |
| ConsumerAwareListenerErrorHandler | 消费异常处理器，定义代码可参考上面链接的 2.3 节 |
| ApplicationContext | Spring 的上下文容器，MessageHandlerMethodFactory 初始化用 |
| MethodKafkaListenerEndpoint | Kafka 配置节点，详细逻辑可参考源码 |

SpringBoot 自动初始化 Kafka 消费者的主要实现类和方法：

```java
package org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor

    /**
     * 此处为相关源码，仅供参考：寻找带有 @KafkaListener 注解的类并初始化
     */
    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            final boolean hasClassLevelListeners = !classLevelListeners.isEmpty();
            final List<Method> multiMethods = new ArrayList<>();
            Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                        Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (hasClassLevelListeners) {
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        (ReflectionUtils.MethodFilter) method ->
                                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                multiMethods.addAll(methodsWithHandler);
            }
            if (annotatedMethods.isEmpty() && !hasClassLevelListeners) {
                this.nonAnnotatedClasses.add(bean.getClass());
                this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
            }
            else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (KafkaListener listener : entry.getValue()) {
                        processKafkaListener(listener, method, bean, beanName);
                    }
                }
                this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                        + beanName + "': " + annotatedMethods);
            }
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
            }
        }
        return bean;
    }
```

#### 2.动态初始化实现

```java
package com.demo.controller;

import com.demo.entity.Topic;
import com.demo.manual.MessageHandlerMethodFactory;
import com.demo.manual.ManualHandler;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author
 * @date 2023-02-07 13:40
 * @since 1.8
 */
@Slf4j
@RestController
@RequestMapping("/listener")
public class ListenerController {

    @Autowired
    KafkaListenerEndpointRegistry registry;

    @Autowired
    @Qualifier("batchTestContainerFactory")
    ConcurrentKafkaListenerContainerFactory<String, String> batchTestContainerFactory;

    @Autowired
    ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler;

    @Autowired
    ApplicationContext applicationContext;

    MessageHandlerMethodFactory factory;

    @PostConstruct
    private void init() {
        factory = new MessageHandlerMethodFactory(null, applicationContext);
    }

    static Map<String, Topic> map = new ConcurrentHashMap<>();

    static {
        map.put("test_manual_1_id", new Topic("test_manual_1_id", "test-topic-new.1", 2, "mygroup", "test_manual_1_batch"));
        map.put("test_manual_2_id", new Topic("test_manual_2_id", "test-topic-new.2", 1, "mygroup", "test_manual_2_batch"));
    }

    /**
     * 停止消费：自行选择停止时是否需要从监听容器内移除实例，容器为 Map 实现
     * Map<String, MessageListenerContainer>
     * @param id
     */
    @GetMapping("/close")
    public void close(String id) {
        MessageListenerContainer container = registry.unregisterListenerContainer(id);
        container.destroy();
    }

    /**
     * 开始消费：如果是已注册的，则判断是否暂停，暂停则恢复；
     * 如果不存在，则定义一个消费者注册到容器内并启动
     * @param id
     * @throws NoSuchMethodException
     */
    @GetMapping("/open")
    public void open(String id) throws NoSuchMethodException {
        MessageListenerContainer container = registry.getListenerContainer(id);
        if (null != container) {
            if (!container.isRunning()) {
                container.start();
                container.resume();
            }
        } else {
            //TODO 新建一个对应 Topic 的实例
            Topic topic = map.get(id);
            if (null == topic) {
                return;
            }
            ManualHandler bean = new ManualHandler();
            MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
            endpoint.setMessageHandlerMethodFactory(factory);
            endpoint.setBean(bean);
            Method[] methods = bean.getClass().getDeclaredMethods();
            endpoint.setMethod(checkProxy(methods[0], bean));
            endpoint.setId(topic.getId());
            endpoint.setTopics(topic.getTopic());
            endpoint.setGroupId(topic.getGroup());
            endpoint.setClientIdPrefix(topic.getClientPrefix());
            endpoint.setConcurrency(topic.getPartitions());
            endpoint.setErrorHandler(consumerAwareListenerErrorHandler);
            registry.registerListenerContainer(endpoint, batchTestContainerFactory);
            container = registry.getListenerContainer(id);
            container.start();
        }
    }

    /**
     * Copy Spring 源码
     * @param methodArg
     * @param bean
     * @return
     */
    private Method checkProxy(Method methodArg, Object bean) {
        Method method = methodArg;
        if (AopUtils.isJdkDynamicProxy(bean)) {
            try {
                // Found a @KafkaListener method on the target class for this JDK proxy ->
                // is it also present on the proxy itself?
                method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
                Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
                for (Class<?> iface : proxiedInterfaces) {
                    try {
                        method = iface.getMethod(method.getName(), method.getParameterTypes());
                        break;
                    }
                    catch (@SuppressWarnings("unused") NoSuchMethodException noMethod) {
                        // NOSONAR
                    }
                }
            }
            catch (SecurityException ex) {
                ReflectionUtils.handleReflectionException(ex);
            }
            catch (NoSuchMethodException ex) {
                throw new IllegalStateException(String.format(
                        "@KafkaListener method '%s' found on bean target class '%s', "
                                + "but not found in any interface(s) for bean JDK proxy. Either "
                                + "pull the method up to an interface or switch to subclass (CGLIB) "
                                + "proxies by setting proxy-target-class/proxyTargetClass "
                                + "attribute to true", method.getName(),
                        method.getDeclaringClass().getSimpleName()), ex);
            }
        }
        return method;
    }
}
```
http://www.dnsts.com.cn/news/9847.html

相关文章:

  • 建设银行梅州分行网站淘客网站建设要求
  • 青岛 网站优化网站建设维护工作职责
  • 淘客网站推广免备案企业网站搜索优化外
  • 企业网站系统排名看网站有没有做404
  • 怎样自己免费建设一个网站wordpress图片超链接
  • 建设网站业务竞争大百度竞价排名案例分析
  • 响应式网站模板代码seo网站优化怎么做
  • 一个网站开发成本在百度做橱柜网站
  • 自己做简历网站应用商城软件下载 app
  • 新的网站建设一般多少钱如何做网页游戏代理
  • 浙江五联建设有限公司网站直接在原备案号下增加新网站
  • 网站直播的功能怎样做阳江网络问政
  • 建设网站费用评估做一个网站设计要多久
  • 网站建设sem怎么做wordpress修改个人头像
  • 重庆网站建设方案书上门做指甲哪个网站
  • 网站管理系统安装西安软件开发公司
  • 朋友让你做网站如何拒绝高水平高职院校 建设网站
  • 如何防止网站挂黑链在线制作图片书
  • h5网站建设服务织梦网站程序模板下载
  • 自己电脑做采集网站wordpress 插件选项
  • 网站怎么发布信息好用的html 模板网站
  • 顺企网南昌网站建设外汇跟单网站建设
  • 手机网站微信登陆Wordpress怎么做引导页
  • 山东高级网站建设小米商城官方网站入口
  • 想自己做网站推广北京营销策划公司有哪些
  • 成都免费建站遵义制作公司网站的公司
  • iis php服务器搭建网站顺义建站设计
  • 制作网站注意哪些问题杭州互联网企业有哪些
  • 企业电子商务网站优化方案中国商标网官方查询网站
  • 购物网站排名2017做网站设计软件