清远做网站seo,免费虚拟空间wordpress,办公室装修计入什么费用,做网站的程序员基于KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群#xff0c;这对于需要访问多套kafka集群的程序来说#xff0c;是有效的解决方案。这里需要注意的是#xff0c;此时的消费者配置信息需使用原生kafka的配置信息格式#xff08;如#xff1a;ConsumerC…基于KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群这对于需要访问多套kafka集群的程序来说是有效的解决方案。这里需要注意的是此时的消费者配置信息需使用原生kafka的配置信息格式如ConsumerConfig.MAX_POLL_RECORDS_CONFIG “max.poll.records”与自动装载KafkaConsumer时的配置信息格式不同。详情如下
依赖项其实spring-kafka包含了kafka-clients
!-- spring-kafka --
dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdversion2.6.0/version
/dependency
!-- kafka-clients --
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.6.0/version
/dependency配置文件 配置参数的格式和含义参见《spring-kafka的配置使用》
生产代码
Component
Slf4j
public class KafKaProducer {Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic, Object object) {/** 这里的 ListenableFuture 类是 spring 对 java 原生 Future 的扩展增强,是一个泛型接口,用于监听异步方法的回调 而对于* kafka send 方法返回值而言这里的泛型所代表的实际类型就是 SendResultK, V,而这里 K,V 的泛型实际上 被用于* ProducerRecordK, V producerRecord,即生产者发送消息的 key,value 类型*/ListenableFutureSendResultString, Object future kafkaTemplate.send(topic, object);future.addCallback(new ListenableFutureCallbackSendResultString, Object() {Overridepublic void onFailure(Throwable throwable) {log.error(发送消息失败: throwable.getMessage());}Overridepublic void onSuccess(SendResultString, Object sendResult){// log.info(发送消息成功: sendResult.toString());}});}
}消费者配置类其中可配置多个kafka集群每个kafka集群生成一个KafkaListenerContainerFactory实例
Data
Slf4j
Configuration
public class KafkaConfig {ResourceEnvironment environment;Beanpublic KafkaListenerContainerFactory? containerFactory() {Integer concurrency environment.getProperty(kafka.concurrency, Integer.class, 1);Integer pollTimeout environment.getProperty(kafka.poll.timeout, Integer.class, 3000);ConcurrentKafkaListenerContainerFactoryString, String containerFactory new ConcurrentKafkaListenerContainerFactory();containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory(this.consumerConfigs()));containerFactory.setConcurrency(concurrency); // 消费并发数量containerFactory.setBatchListener(true); // 批量监听消息containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // 批量提交偏移containerFactory.getContainerProperties().setPollTimeout(pollTimeout); // 消息拉取时限return containerFactory;}Beanpublic MapString, Object consumerConfigs() {String servers environment.getProperty(kafka.servers, 127.0.0.1:9092);String groupId environment.getProperty(kafka.groupId, consumer-group);String sessionTimeout environment.getProperty(kafka.session.timeout.ms, 60000);String maxPollRecords environment.getProperty(kafka.max.poll.records, 100);String maxPollInterval environment.getProperty(kafka.max.poll.interval, 600000);String jaasConfig environment.getProperty(kafka.sasl.jaas.config);MapString, Object props new HashMap();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);/// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, latest);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put(security.protocol, SASL_PLAINTEXT);props.put(sasl.mechanism, SCRAM-SHA-256);props.put(sasl.jaas.config, jaasConfig);return props;}
}消费代码 KafkaListener注解的containerFactory参数指定了KafkaListenerContainerFactory实例也就指定了kafka集群
Slf4j
Component
public class KafkaConsumerListen implements BatchMessageListenerString, String {Autowiredprivate Environment environment;Autowiredprivate KafkaMsgHandleService msgHandleService;Autowiredprivate ThreadPoolTaskExecutor taskExecutor;/************************* 接收消息************************/OverrideKafkaListener( containerFactory containerFactory, groupId ${kafka.groupId}, topics #{${kafka.topics}.split(,)}, concurrency ${kafka.concurrency})public void onMessage(ListConsumerRecordString, String records) {try {final ListString msgs records.stream().map(ConsumerRecord::value).collect(Collectors.toList());log.info(收到消息体size{} content:{}, msgs.size(), JSON.toJSONString(msgs));/// 处理消息msgs.forEach(this::processRecord);} catch (Exception e) {log.error(KafkaListener_kafka_consume_error., e);}}/************************* 处理消息************************/private void processRecord(String msg) {taskExecutor.submit(() - {if (!environment.getProperty(kafka1.switch, Boolean.class,true)) {log.warn(KafkaListener_turn_off_drop_message.);return;}msgHandleService.handle(msg);});}
}