当前位置: 首页 > news >正文

清远做网站seo免费虚拟空间wordpress

清远做网站seo,免费虚拟空间wordpress,办公室装修计入什么费用,做网站的程序员基于KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群#xff0c;这对于需要访问多套kafka集群的程序来说#xff0c;是有效的解决方案。这里需要注意的是#xff0c;此时的消费者配置信息需使用原生kafka的配置信息格式#xff08;如#xff1a;ConsumerC…基于KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群这对于需要访问多套kafka集群的程序来说是有效的解决方案。这里需要注意的是此时的消费者配置信息需使用原生kafka的配置信息格式如ConsumerConfig.MAX_POLL_RECORDS_CONFIG “max.poll.records”与自动装载KafkaConsumer时的配置信息格式不同。详情如下 依赖项其实spring-kafka包含了kafka-clients !-- spring-kafka -- dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdversion2.6.0/version /dependency !-- kafka-clients -- dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.6.0/version /dependency配置文件 配置参数的格式和含义参见《spring-kafka的配置使用》 生产代码 Component Slf4j public class KafKaProducer {Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic, Object object) {/** 这里的 ListenableFuture 类是 spring 对 java 原生 Future 的扩展增强,是一个泛型接口,用于监听异步方法的回调 而对于* kafka send 方法返回值而言这里的泛型所代表的实际类型就是 SendResultK, V,而这里 K,V 的泛型实际上 被用于* ProducerRecordK, V producerRecord,即生产者发送消息的 key,value 类型*/ListenableFutureSendResultString, Object future kafkaTemplate.send(topic, object);future.addCallback(new ListenableFutureCallbackSendResultString, Object() {Overridepublic void onFailure(Throwable throwable) {log.error(发送消息失败: throwable.getMessage());}Overridepublic void onSuccess(SendResultString, Object sendResult){// log.info(发送消息成功: sendResult.toString());}});} }消费者配置类其中可配置多个kafka集群每个kafka集群生成一个KafkaListenerContainerFactory实例 Data Slf4j Configuration public class KafkaConfig {ResourceEnvironment environment;Beanpublic KafkaListenerContainerFactory? containerFactory() {Integer concurrency environment.getProperty(kafka.concurrency, Integer.class, 1);Integer pollTimeout environment.getProperty(kafka.poll.timeout, Integer.class, 3000);ConcurrentKafkaListenerContainerFactoryString, String containerFactory new ConcurrentKafkaListenerContainerFactory();containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory(this.consumerConfigs()));containerFactory.setConcurrency(concurrency); // 消费并发数量containerFactory.setBatchListener(true); // 批量监听消息containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // 批量提交偏移containerFactory.getContainerProperties().setPollTimeout(pollTimeout); // 消息拉取时限return containerFactory;}Beanpublic MapString, Object consumerConfigs() {String servers environment.getProperty(kafka.servers, 127.0.0.1:9092);String groupId environment.getProperty(kafka.groupId, consumer-group);String sessionTimeout environment.getProperty(kafka.session.timeout.ms, 60000);String maxPollRecords environment.getProperty(kafka.max.poll.records, 100);String maxPollInterval environment.getProperty(kafka.max.poll.interval, 600000);String jaasConfig environment.getProperty(kafka.sasl.jaas.config);MapString, Object props new HashMap();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);/// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, latest);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put(security.protocol, SASL_PLAINTEXT);props.put(sasl.mechanism, SCRAM-SHA-256);props.put(sasl.jaas.config, jaasConfig);return props;} }消费代码 KafkaListener注解的containerFactory参数指定了KafkaListenerContainerFactory实例也就指定了kafka集群 Slf4j Component public class KafkaConsumerListen implements BatchMessageListenerString, String {Autowiredprivate Environment environment;Autowiredprivate KafkaMsgHandleService msgHandleService;Autowiredprivate ThreadPoolTaskExecutor taskExecutor;/************************* 接收消息************************/OverrideKafkaListener( containerFactory containerFactory, groupId ${kafka.groupId}, topics #{${kafka.topics}.split(,)}, concurrency ${kafka.concurrency})public void onMessage(ListConsumerRecordString, String records) {try {final ListString msgs records.stream().map(ConsumerRecord::value).collect(Collectors.toList());log.info(收到消息体size{} content:{}, msgs.size(), JSON.toJSONString(msgs));/// 处理消息msgs.forEach(this::processRecord);} catch (Exception e) {log.error(KafkaListener_kafka_consume_error., e);}}/************************* 处理消息************************/private void processRecord(String msg) {taskExecutor.submit(() - {if (!environment.getProperty(kafka1.switch, Boolean.class,true)) {log.warn(KafkaListener_turn_off_drop_message.);return;}msgHandleService.handle(msg);});} }
http://www.dnsts.com.cn/news/103491.html

相关文章:

  • 昆明网站建设费用招聘网58同城官网
  • 苏醒主题做的网站织梦网站图片不显示
  • 上海建设行政主管部门政务网站做足球预测的网站
  • 响应式网站制作教程电子商务网站开发实务石道元
  • 免网站域名注册asp.net 网站授权
  • 关于写策划的一个网站哪里的赣州网站建设
  • 手机电商网站 模板网站建设跟加入会员哪个效果好
  • 株洲网站建设 李wordpress登录才能看内容
  • 网站如何建设与安全管理制度实业+东莞网站建设
  • 在网上找做设计是什么网站电子计算机哪个专业最吃香
  • 网站后台文章编辑器长春工作招聘网
  • php个人网站模板下载代码共享网站
  • html5 网站模版婚纱影楼网站建设
  • 宝应县天宇建设网站dj那个网站做的好
  • 百度网站做不做关于网站建设的英文歌
  • 网站开发需要的技术自己做衣服的网站
  • 一个网站数据库手机网站建设 jz.woonl
  • 网站建设销售该学的深圳网站设计灵点网络公司不错
  • 品牌型网站建设哪里好南通seo快速排名
  • 建设网站怎样通过流量赚钱网络工程毕设做网站
  • 公司 网站建设成都网站搜索优化
  • 上海标志设计公司上海品牌设计网站建设优化解析
  • 网站内容页显示不出来途牛网网站是哪家公司做的
  • 江苏广兴建设集团网站wordpress每页不显示文章
  • 建设网站是否等于网络营销wordpress 文章密码
  • 网站的pdf目录怎么做的网络优化网站
  • 网站后台上传图片 不可用成都住建局官网登录入口查询
  • 对接空间站做外贸网站建设
  • 网站编辑器失效包装设计公司排行
  • 沈阳城市建设学院官网网站免费行情软件在线网站