当前位置: 首页 > news >正文

网站运营推广方案网络营销广告策划

网站运营推广方案,网络营销广告策划,制作网站的专业公司,贵州省水利建设管理总站网站Spring Boot – 动态启动/停止 Kafka 监听器 当 Spring Boot 应用程序启动时#xff0c;Kafka Listener 的默认行为是开始监听某个主题。但是#xff0c;有些情况下我们不想在应用程序启动后立即启动它。 要动态启动或停止 Kafka Listener#xff0c;我们需要三种主要方法…Spring Boot – 动态启动/停止 Kafka 监听器 当 Spring Boot 应用程序启动时Kafka Listener 的默认行为是开始监听某个主题。但是有些情况下我们不想在应用程序启动后立即启动它。 要动态启动或停止 Kafka Listener我们需要三种主要方法即在需要处理 Kafka 消息时启动/停止、使用KafkaListener注释、使用 kafkaListenerEndpointRegistry 在本文中我们将介绍如何动态启动或停止 Kafka 监听器。 启动/停止 Kafka 监听器的不同方法 方法一 当需要处理 Kafka 消息时启动一个应用程序。处理成功后停止应用程序。 方法 2在注册 Kafka Listener 时我们可以设置以下 id 属性。 KafkaListener(id id-1, groupId group-1, topics Message-topic, containerFactory messageListenerFactory, autoStartup false)public void consumeMessage(Message message) 方法 3自动连接KafkaListenerEndpointRegistry bean 来控制 Kafka Listener 的启动或停止。 AutowiredKafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; 开始 public boolean startListener(String listenerId) {MessageListenerContainer listenerContainer kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer ! null : false;listenerContainer.start(); 停止 public boolean stopListener(String listenerId) {MessageListenerContainer listenerContainer kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer ! null : false;listenerContainer.stop();logger.info({} Kafka Listener Stopped., listenerId); 下面我们将以上述句法方法为例进行实现。 启动或停止特定 Kafka Listener的实现 创建一个类其对象将被 Kafka 侦听器使用。 文件Message.java import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;Data AllArgsConstructor NoArgsConstructor public class Message {private String message; } 配置 Kafka Listener 将使用的消费者。 文件KakfaConsumerConfig.java import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap; import java.util.Map;Configuration public class KafkaConsumerConfig {private String kafkaUrl localhost:9092;Beanpublic ConsumerFactoryString, Message messageConsumerFactory() {JsonDeserializerMessage deserializer new JsonDeserializer(Message.class, false);deserializer.setRemoveTypeHeaders(false);deserializer.addTrustedPackages(*);deserializer.setUseTypeMapperForKey(true);MapString, Object config new HashMap();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);config.put(ConsumerConfig.GROUP_ID_CONFIG, group-1);return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);}Beanpublic ConcurrentKafkaListenerContainerFactoryString, Message messageListenerFactory() {ConcurrentKafkaListenerContainerFactoryString, Message containerFactory new ConcurrentKafkaListenerContainerFactory();containerFactory.setConsumerFactory(messageConsumerFactory());return containerFactory;} } 创建一个具有必要参数的 Kafka 监听器。 id此侦听器的容器唯一标识符。如果未指定则使用自动生成的 ID。groupId仅为该监听器使用该值覆盖消费者工厂的 group.id 属性。主题此侦听器的主题。条目可以是“主题名称”、“属性占位符键”或“表达式”。主题名称必须从表达式解析。这使用组管理Kafka 将为组成员分配分区。containerFactoryKafkaListenerContainerFactory的 bean 名称将用于创建为该端点提供服务的消息侦听器容器。autoStartup设置为 true 或 false 以覆盖容器工厂的默认设置。默认情况下该值设置为 true因此它将在我们的应用程序启动时立即开始使用消息。 文件KafkaMessageListener.java import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListener;Configuration public class KafkaMessageListener {Logger logger LoggerFactory.getLogger(KafkaMessageListener.class);KafkaListener(id id-1, groupId group-1, topics Message-topic, containerFactory messageListenerFactory, autoStartup false)public void consumeMessage(Message message) {logger.info(Message received : - {}, message);} } KafkaListenerEndpointRegistry 类可用于通过 listenerId 获取 Kafka 侦听器容器。这里我们使用了KafkaListener注释来将 bean 方法声明为 Kafka 侦听器容器的侦听器。现在可以使用此容器启动或停止 Kafka 侦听器。 文件KafkaListenerAutomation.java import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.stereotype.Component;Component public class KafkaListenerAutomation {private final Logger logger LoggerFactory.getLogger(KafkaListenerAutomation.class);AutowiredKafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;public boolean startListener(String listenerId) {MessageListenerContainer listenerContainer kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer ! null : false;listenerContainer.start();logger.info({} Kafka Listener Started, listenerId);return true;}public boolean stopListener(String listenerId) {MessageListenerContainer listenerContainer kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer ! null : false;listenerContainer.stop();logger.info({} Kafka Listener Stopped., listenerId);return true;} } 使用 API 端点我们可以通过提供 listenerID 来启动或停止特定的 Kafka 监听器。 文件StartOrStopListenerController.java import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;RestController public class StartOrStopListenerController {AutowiredKafkaListenerAutomation kafkaListenerAutomation;GetMapping(/start)public void start(RequestParam(id) String listenerId) {kafkaListenerAutomation.startListener(listenerId);}GetMapping(/stop)public void stop(RequestParam(id) String listenerId) {kafkaListenerAutomation.stopListener(listenerId);} } 输出 1.Kafka Listener启动 2.Kafka Listener 收到消息 3. Kafka Listener 停止 最后 理想情况下应用程序应在需要处理 Kafka 消息时启动并在该过程完成后立即停止。限制 Kafka 侦听器以有效利用它是一种很好的做法。
http://www.dnsts.com.cn/news/139360.html

相关文章:

  • 优质做网站公司wordpress媒体页
  • 怎么自己做网站版面设计网络推广平台有哪些?
  • 四川省建设人才网站seo研究中心怎么样
  • 在电脑新建网站站点重庆建设摩托车价格
  • 景安网站上传完还要怎么做网站数据不变重新安装wordpress
  • 郑州经纬网络做网站吗行业论坛网站
  • 介绍自己的家乡遵义网站建设网站的内容
  • 申请网站建设网站设计中下拉列表怎么做
  • 最好的网站推广法人变更流程
  • 宁波网站建设接单昆明网站制作策划
  • 想在网站上放百度广告怎么做苏州大写的网站建设
  • 网站404页面做晚了网站建设学什么专业
  • 网站信息推广途径包括哪些成都住建局官网查询入口
  • 上海网络网站建wordpress 放大镜
  • 做兼职那个网站比较好网站查询功能怎么做
  • 新新手手网网站站建建设设wordpress 布局修改
  • 网站建设技术清单注册工商企业
  • 网站logo设计创意建筑模板厂投资多少钱
  • 网站建设的大功效wordpress观点
  • ip网站架设wordpress略缩图压缩
  • 网站做代理服务器如何找百度做网站
  • 如何建设淘宝网站js 网站怎么做中英文
  • 利用切片做网站背景图片网站设计谈判
  • 中山网站建设方案报价网站的开发包括什么东西
  • 网站优化竞争对手分析户外运动网站模板
  • 常州网站建设招聘做网站用电脑自带的
  • 聊城网站托管做百度联盟做什么类型网站
  • 网站一般用什么做的怎样做才能让自己的网站
  • 浙江银安建设有限公司网站网站开发商城app
  • 石家庄做手机网站推广模板网站制作多少钱