做一个网站花多少钱,怎样编辑网站标题,乐清市网论坛,企业网站备案 过户Kafka 服务器#xff08;Broker#xff09; 的配置
server.properties
# broker.id: 每个 Kafka Broker 的唯一标识符。broker.id 必须在整个 Kafka 集群中唯一。
broker.id0# 配置 Kafka Broker 监听客户端请求的地址和端口。这个配置决定了 Kafka 服务将接受来自生产者、…Kafka 服务器Broker 的配置
server.properties
# broker.id: 每个 Kafka Broker 的唯一标识符。broker.id 必须在整个 Kafka 集群中唯一。
broker.id0# 配置 Kafka Broker 监听客户端请求的地址和端口。这个配置决定了 Kafka 服务将接受来自生产者、消费者以及其他客户端的连接。
listenersPLAINTEXT://192.168.65.60:9092# Kafka 消息日志文件的存储目录
log.dir/usr/local/data/kafka‐logs# Kafka 连接到 Zookeeper 的地址
zookeeper.connect192.168.65.60:2181每个 Kafka 集群中的节点Broker都需要有一个 server.properties 配置文件并且每个节点的配置可以有所不同。
生产者
生产者配置
Properties props new Properties();// Kafka服务器地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094);// 把发送的key和value从字符串序列化为字节数组
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); /* * 1. 发出消息持久化机制参数* acks0 表示producer不需要等待任何broker确认收到消息的回复就可以继续发送下一条消息。性能最高但是最容易丢消息* acks1 至少要等待leader已经成功将数据写入本地log但是不需要等待所有follower是否成功写入就可以继续发送下一条消息* 如果follower没有成功备份数据而此时leader又挂掉则消息会丢失* acks‐1或all 需要等待 min.insync.replicas(默认为1推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志。这是最强的数据保证。一般金融级别才会使用这种配置*/
props.put(ProducerConfig.ACKS_CONFIG, 1);// 2. 重试相关
//2.1 发送失败重试次数重试能保证消息发送的可靠性但是也可能造成消息重复发送需要接收者做好消息接收的幂等性处理
props.put(ProducerConfig.RETRIES_CONFIG, 3);// 2.2 重试间隔设置默认重试间隔100ms
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);/ * 3. 本地缓冲区和延迟发送相关* 在设置本地缓冲区/延迟发送后消息会先发送到本地缓冲区当达到批量发送消息的大 * 小时本地线程会从缓冲区取数据一个batch批量发送到broker。同时需要设置 * batch最大的延迟发送时间如果一条消息在本地缓冲区中等待的时间达到设置的时间后 * batch没满那么也必须把消息发送出去* /// 3.1 设置本地缓冲区大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// 3.2 设置batch大小
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);/* * 3.3 batch最大的延迟发送时间* 默认值是0意思就是消息必须立即被发送但这样会影响性能* 一般设置10毫秒左右就是说这个消息发送完后会进入本地的一个batch如果10毫秒内这个batch满了16kb就会随batch一起被发送出去* 如果10毫秒内batch没满那么也必须把消息发送出去不能让消息的发送延迟时间太长* * 消息 - 本地缓冲区32M- batch16k- 发送10ms batch不满也发送*/
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);生产者发送消息
// 创建 Kafka 生产者
KafkaProducerString, String producer new KafkaProducer(properties);// 发送消息
String topic test-topic; // 主题名称
String key order1; // 消息的 key
String value Order details: 123; // 消息的内容// 创建消息记录
ProducerRecordString, String record new ProducerRecord(topic, key, value);try {// 发送消息这里的lambda函数就是onCompletion()方法producer.send(record, (metadata, exception) - {if (exception ! null) {System.out.println(Error sending message: exception.getMessage());} else {System.out.println(Message sent successfully to topic metadata.topic() partition metadata.partition() with offset metadata.offset());}});} catch (Exception e) {e.printStackTrace();
} finally {// 关闭生产者producer.close();
}// 指定发送分区
var producerRecord new ProducerRecordString, String(TOPIC_NAME, 0, key_json, value_json);// 也可以指定发送分区
var producerRecord new ProducerRecordString, String(TOPIC_NAME, key_json, value_json);// 等待消息发送成功的同步阻塞方法
RecordMetadata metadata producer.send(producerRecord).get();// 异步回调方式发送消息
producer.send(producerRecord, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {// 处理异常}
});
// 关闭
producer.close();
此外为了保证生产者的消息发送成功可以通过添加回调函数的方式在send成功后打印日志。
详细内容参考Kafka如何保证消息不丢失
ListenableFutureSendResultString, Object future kafkaTemplate.send(topic, o);
future.addCallback(result - logger.info(生产者成功发送消息到topic:{} partition:{}的消息, result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),ex - logger.error(生产者发送消失败原因{}, ex.getMessage()));消费者
消费者配置
Properties properties new Properties();// Kafka服务器地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094);// 把发送的key和value从字符串序列化为字节数组
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);// 是否自动提交offset默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 自动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);/* * 当消费主题的是一个新的消费组或者指定offset的消费方式offset不存在那么应该如何消费* latest(默认) 只消费自己启动之后发送到主题的消息* earliest第一次从头开始消费以后按照消费offset记录继续消费这个需要区别于 consumer.seekToBeginning(每次都从头开始消费)*/
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);// consumer给broker发送心跳的间隔时间broker接收到心跳如果此时有rebalance发生会通过心跳响应将rebalance方案下发给consumer这个时间可以稍微短一点
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);// 服务端broker多久感知不到一个consumer心跳就认为他故障了会将其踢出消费组对应的Partition也会被重新分配给其他consumer默认是10秒
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);// 一次poll最大拉取消息的条数如果消费者处理速度很快可以设置大点如果处理速度一般可以设置小点
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);// 如果两次poll操作间隔超过了这个时间broker就会认为这个consumer处理能力太弱会将其踢出消费组将分区分配给别的consumer消费
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);消费者消费消息
// 创建 Kafka 消费者
KafkaConsumerString, String consumer new KafkaConsumer(properties);// 订阅主题
consumer.subscribe(Collections.singletonList(test-topic));// 消费指定分区这段代码指定了消费者从TOPIC_NAME的第一个分区分区0开始消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));/* 回溯消费从头消费 - seekToBeginning* seekToBeginning()方法使消费者回溯到该分区的最初位置意味着从头开始消费该分 区的所有消息。* 这对于重新消费主题中的消息或重新同步时非常有用。* /
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));// 指定offset消费即消费者将跳过之前的消息从该offset开始消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);/* 从指定时间点开始消费 - 1小时前
* partitionsFor()方法获取指定主题TOPIC_NAME的所有分区信息。
* fetchDataTime 是一个时间戳表示1小时前的时间new Date().getTime() - 1000 * 60 * 60 用来计算这个时间戳。
* map 用于存储每个分区与其对应的时间戳fetchDataTime。这个时间戳将用于从Kafka中拉取时间戳较早的消息。
*/
ListPartitionInfo topicPartitions consumer.partitionsFor(TOPIC_NAME);
long fetchDataTime new Date().getTime() ‐ 1000 * 60 * 60;
MapTopicPartition, Long map new HashMap();
for (PartitionInfo par : topicPartitions) {map.put(new TopicPartition(topicName, par.partition()), fetchDataTime);
}// 消费消息
try {while (true) {consumer.poll(1000).forEach(record - {// 可以修改为具体业务逻辑System.out.println(Consumed record with key: record.key() , value: record.value() , from partition: record.partition());});}
} catch (Exception e) {e.printStackTrace();
} finally {// 关闭消费者consumer.close();
}消费者提交offset
手动提交offset的意义
控制消费进度
手动提交offset能够让消费者在每个消息或消息批次消费后明确地告诉Kafka“我已经消费到这个offset了”。这对于控制消息消费的精确性非常重要尤其在需要精确控制消费位置的场景中。
避免消息丢失或重复消费
如果自动提交offset可能会发生消费者在处理中出现异常如程序崩溃导致已消费的消息的offset提交失败导致消息丢失或重复消费。手动提交则可以在处理完消息并确保成功时再提交offset避免这种问题。比如在金融交易、日志收集系统等场景中需要确保消息的处理不会丢失并且不会重复处理。
灵活的错误处理与恢复
通过手动提交offset消费者可以在消费过程中灵活地处理错误。如果在消费某条消息时发生异常消费者可以选择不提交offset这样在消费者重启或恢复时会重新消费该消息。它使得消费者在出错时能更好地控制重试策略。
代码实现
同步提交
consumer.commitSync();当调用该方法时消费者会将当前消费的偏移量提交到Kafka集群并且当前线程会阻塞直到该提交操作完成。
优势
阻塞会等待offset提交成功不会继续执行后续代码直到提交完成。可靠性如果提交失败commitSync()会抛出异常可以捕获并进行处理确保提交正确。
缺点会导致性能问题因为它会阻塞当前线程直到提交完成。
异步提交
consumer.commitAsync(new OffsetCommitCallback() {Overridepublic void onComplete(MapTopicPartition, OffsetAndMetadata offsets, Exception ex) {// 处理异常}
});回调函数异步提交会接受一个OffsetCommitCallback回调接口作为参数该接口的onComplete()方法会在提交操作完成时被调用。这个方法会接收到两个参数 offsets包含提交的偏移量信息TopicPartition和OffsetAndMetadata。 ex如果提交发生错误该参数会包含异常信息。
优势
非阻塞不会等待提交完成允许程序继续执行其他操作。提高吞吐量减少等待时间尤其是在批量消费和提交的情况下可以提高整体的吞吐量和性能。
缺点可能会出现提交失败的情况回调函数中的异常处理需要做好以确保异常得到及时处理。
Spring boot集成
1. 添加依赖
在pom.xml 中添加 Kafka 的相关依赖
dependencies!-- Spring Boot Starter for Apache Kafka --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependency!-- Spring Boot Starter Web (optional if you need a web app) --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- Spring Boot Starter for Actuator (optional for monitoring) --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-actuator/artifactId/dependency!-- Spring Boot Starter Test (optional for testing) --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency
/dependencies2. 配置文件
application.yml
spring:kafka:# Kafka broker 地址bootstrap‐servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094producer:retries: 3batch‐size: 16384buffer‐memory: 33554432acks: 1key‐serializer: org.apache.kafka.common.serialization.StringSerializervalue‐serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group‐id: default‐groupenable‐auto‐commit: falseauto‐offset‐reset: earliestkey‐deserializer: xxx.StringDeserializervalue‐deserializer: xxx.StringDeserializerlistener:ack‐mode: manual_immediate注意 ack‐mode RECORD当每一条记录被消费者监听器ListenerConsumer处理之后提交 BATCH当每一批poll()的数据被消费者监听器处理之后提交 TIME当每一批poll()的数据被消费者监听器处理之后距离上次提交时间大于TIME时提交 COUNT当每一批poll()的数据被消费者监听器处理之后被处理record数量大于等于COUNT时提交 TIME | COUNT有一个条件满足时提交 MANUAL当每一批poll()的数据被消费者监听器处理之后, 手动调用Acknowledgment.acknowledge()后提交 MANUAL_IMMEDIATE手动调用Acknowledgment.acknowledge()后立即提交一般使用这种一次提交一条消息
3. 启动类
package com.example.kafka;import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.beans.factory.annotation.Autowired;SpringBootApplication
public class KafkaApplication implements CommandLineRunner {Autowiredprivate KafkaProducer kafkaProducer;public static void main(String[] args) {SpringApplication.run(KafkaApplication.class, args);}Overridepublic void run(String... args) throws Exception {// 发送消息kafkaProducer.sendMessage(test-topic, Hello, Kafka!);}
}
4. 生产者类
package com.example.kafka.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;Service
public class KafkaProducer {private final KafkaTemplateString, String kafkaTemplate;public KafkaProducer(KafkaTemplateString, String kafkaTemplate) {this.kafkaTemplate kafkaTemplate;}public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);System.out.println(Message sent: message);}
}5. 消费者类
package com.example.kafka.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;Service
public class KafkaConsumer {KafkaListener(topics test-topic, groupId test-group)public void consume(String message) {System.out.println(Consumed message: message);}KafkaListener(topics test-topic,groupId test-group)public void consume1(ConsumerRecordString, String record, Acknowledgment ack) {String value record.value();ack.acknowledge(); //手动提交offset}// 配置多个topicconcurrency就是同组下的消费者个数就是并发消费数必须小于等于分区总数KafkaListener(groupId testGroup, topicPartitions {TopicPartition(topic topic1, partitions {0, 1}), // 从topic1的分区0和1读取消息TopicPartition(topic topic2, partitions 0,partitionOffsets PartitionOffset(partition 1, initialOffset 100)) // 从topic2的分区0读取消息并设置分区1的初始偏移量为100}, concurrency 6)public void listenToMultipleTopics(String message) {// 消费消息的逻辑System.out.println(Group: testGroup, Message: message);}
}Kafka事务
Properties props new Properties();
props.put(bootstrap.servers, localhost:9092);
props.put(transactional.id, my‐transactional‐id);
ProducerString, String producer new KafkaProducer(props, new StringSerializer(), new StringSerializer());// 初始化事务
producer.initTransactions();
try {// 开启事务producer.beginTransaction();// 发到不同的主题的不同分区producer.send(/*...*/);// 提交事务producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {producer.close();
} catch (KafkaException e) {// 回滚事务producer.abortTransaction();
}
// 关闭
producer.close();spring框架下Kafka事务
可以通过**Transactional**实现
配置
可以通过在application.yml文件或KafkaConfig配置类中添加配置的方式提供事务支持。
1. application.yml
spring:kafka:bootstrap-servers: localhost:9092 # Kafka 集群地址producer:acks: all # 确保消息被所有副本确认transactional-id-prefix: tx- # 事务前缀Kafka 事务需要一个事务 ID 前缀consumer:group-id: test-group # 消费者组 IDenable-auto-commit: false # 手动提交 offsetlistener:ack-mode: manual # 设置为手动提交确认
2. 配置类
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;Configuration
EnableKafka
public class KafkaConfig {Beanpublic KafkaTemplateString, String kafkaTemplate() {// 设置 Kafka 生产者的事务管理器KafkaTransactionManagerString, String transactionManager new KafkaTransactionManager(producerFactory());KafkaTemplateString, String kafkaTemplate new KafkaTemplate(producerFactory());kafkaTemplate.setTransactionManager(transactionManager);return kafkaTemplate;}Beanpublic DefaultKafkaProducerFactoryString, String producerFactory() {MapString, Object configProps new HashMap();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.ACKS_CONFIG, all);configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, tx-); // 事务 IDreturn new DefaultKafkaProducerFactory(configProps);}
}生产者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.Transactional;
import org.springframework.stereotype.Service;Service
public class KafkaTransactionProducer {Autowiredprivate KafkaTemplateString, String kafkaTemplate;Transactionalpublic void sendTransactionalMessages() {try {// 发送事务消息kafkaTemplate.send(topic1, key1, message1);kafkaTemplate.send(topic2, key2, message2);// 你可以在此处加入其他业务逻辑如果出现异常会回滚事务if (someConditionFails()) {throw new RuntimeException(Simulating failure to trigger rollback);}// 如果没有异常事务提交消息将被正常发送} catch (Exception e) {// 事务回滚System.out.println(Transaction failed, rolling back...);throw e;}}private boolean someConditionFails() {// 模拟某些条件下事务失败return true;}
}消费者
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;Service
EnableKafka
public class KafkaTransactionConsumer {KafkaListener(topics topic1, groupId test-group)public void listenTopic1(String message) {System.out.println(Received message from topic1: message);}KafkaListener(topics topic2, groupId test-group)public void listenTopic2(String message) {System.out.println(Received message from topic2: message);}
}在生产者的配置中启用事务配置 transactional.id并设置事务管理器 KafkaTransactionManager它会自动管理 Kafka 事务的开始、提交和回滚。
事务管理Transactional 注解用于标识在发送消息的过程是一个事务操作。如果其中任何消息发送失败Spring Kafka 会自动回滚事务。
回滚机制在 sendTransactionalMessages() 中模拟了一个失败的条件确保事务在遇到异常时会被回滚。