设计方案翻译,做搜狗网站优化排,网络推广是什么职业,114网站做推广怎么样Kafka生产调优 文章目录 Kafka生产调优一、Kafka 硬件配置选择场景说明服务器台数选择磁盘选择内存选择CPU选择 二、Kafka Broker调优Broker 核心参数配置服役新节点/退役旧节点增加副本因子调整分区副本存储 三、Kafka 生产者调优生产者如何提高吞吐量数据可靠性数据去重数据乱…Kafka生产调优 文章目录 Kafka生产调优一、Kafka 硬件配置选择场景说明服务器台数选择磁盘选择内存选择CPU选择 二、Kafka Broker调优Broker 核心参数配置服役新节点/退役旧节点增加副本因子调整分区副本存储 三、Kafka 生产者调优生产者如何提高吞吐量数据可靠性数据去重数据乱序 四、Kafka 消费者调优消费者重要参数消费者再平衡指定offset进行消费指定时间进行消费消费者如何提高吞吐量 五、Kafka总体调优如何提升吞吐量数据精准一次合理设置分区数单条日志大于 1m的问题集群压力测试Kafka Producer 压力测试Kafka Consumer 压力测试 一、Kafka 硬件配置选择
场景说明
100 万日活每人每天 100 条日志每天总共的日志条数是 100 万 * 100 条 1 亿条。
1 亿 / 24 小时 / 60 分 / 60 秒 1150 条/每秒钟。
每条日志大小0.5k ~ 2k约1k。
1150 条/每秒钟 * 1k ≈ 1m/s 。
高峰期每秒钟1150 条 * 20 倍 23000 条。
所以高峰每秒数据量20MB/s。
服务器台数选择
服务器台数 2 * 生产者峰值生产速率 * 副本 / 100 1
2 * 20m/s * 2 / 100 1
3 台
磁盘选择
kafka 底层主要是顺序写固态硬盘和机械硬盘的顺序写速度差不多。建议选择普通的机械硬盘。
每天总数据量1 亿条 * 1k ≈ 100g
100g * 副本 2 * 保存时间 3 天 / 0.7 ≈ 1T
建议三台服务器硬盘总大小大于等于 1T。
内存选择
Kafka 内存组成堆内存 页缓存
1Kafka 堆内存建议每个节点10g ~ 15g在 kafka-server-start.sh 中修改
if [ x$KAFKA_HEAP_OPTS x ]; thenexport KAFKA_HEAP_OPTS-Xmx10G -Xms10G
fi2页缓存页缓存是 Linux 系统服务器的内存。我们只需要保证 1 个 segment1g中25%的数据在内存中就好。
每个节点页缓存大小 分区数 * 1g * 25%/ 节点数。例如 10 个分区页缓存大小10 * 1g * 25%/ 3 ≈ 1g
建议服务器内存大于等于 11G。
CPU选择
num.io.threads 8 负责写磁盘的线程数整个参数值要占总核数的 50%。num.replica.fetchers 1 副本拉取线程数这个参数占总核数的 50%的 1/3。num.network.threads 3 数据传输线程数这个参数占总核数的 50%的 2/3。
建议 32 个 cpu core。
二、Kafka Broker调优
Broker 核心参数配置 参数名称描述replica.lag.time.max.msISR 中如果 Follower 长时间未向 Leader 发送通信请求或同步数据则该 Follower 将被踢出 ISR。该时间阈值默认30s。auto.leader.rebalance.enable默认是 true。 自动 Leader Partition 平衡。leader.imbalance.per.broker.percentage默认是 10%。每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值控制器会触发 leader 的平衡。leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔时间。log.segment.bytesKafka 中 log 日志是分成一块块存储的此配置是指 log 日志划分成块的大小默认值 1G。log.index.interval.bytes默认 4kbkafka 里面每当写入了 4kb 大小的日志.log然后就往 index 文件里面记录一个索引。log.retention.hoursKafka 中数据保存的时间默认 7 天。log.retention.minutesKafka 中数据保存的时间分钟级别默认关闭。log.retention.msKafka 中数据保存的时间毫秒级别默认关闭。log.retention.check.interval.ms检查数据是否保存超时的间隔默认是 5 分钟。log.retention.bytes默认等于-1表示无穷大也表示关闭。超过设置的所有日志总大小删除最早的 segment。log.cleanup.policy默认是 delete表示所有数据启用删除策略如果设置值为 compact表示所有数据启用压缩策num.io.threads默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。num.replica.fetchers默认是 1。副本拉取线程数这个参数占总核数的 50%的 1/3num.network.threads默认是 3。数据传输线程数这个参数占总核数的50%的 2/3 。log.flush.interval.messages强制页缓存刷写到磁盘的条数默认是 long 的最大值9223372036854775807。一般不建议修改 交给系统自己管理。log.flush.interval.ms每隔多久刷数据到磁盘默认是 null。一般不建议修改交给系统自己管理。
服役新节点/退役旧节点
1创建一个要均衡的主题。
$ vim topics-to-move.json
{topics: [{topic: first}],version: 1
}2生成一个负载均衡的计划。
$ bin/kafka-reassign-partitions.sh --bootstrap-server node102:9092 --topics-to-move-json-file topics-to-move.json --broker-list 0,1,2,3 --generate3创建副本存储计划所有副本存储在 broker0、broker1、broker2、broker3 中由步骤2生成的
$ vim increase-replication-factor.json4执行副本存储计划。
$ bin/kafka-reassign-partitions.sh --bootstrap-server node102:9092 --reassignment-json-file increase-replication-factor.json --execute5验证副本存储计划。
$ bin/kafka-reassign-partitions.sh --bootstrap-server node102:9092 --reassignment-json-file increase-replication-factor.json --verify增加副本因子
1创建测试 topic3分区、1副本
$ bin/kafka-topics.sh --bootstrap-server node102:9092 --create --partitions 3 --replication-factor 1 --
topic four2手动增加副本存储3分区、3副本创建副本存储计划所有副本都指定存储在 broker0、broker1、broker2 中
vim increase-replication-factor.json{version:1,partitions:[{topic:four,partition:0,replicas:[0,1,2]},{topic:four,partition:1,replicas:[0,1,2]},{topic:four,partition:2,replicas:[0,1,2]}
]}3执行副本存储计划。
$ bin/kafka-reassign-partitions.sh --bootstrap-server node102:9092 --reassignment-json-file increase-replication-factor.json --execute调整分区副本存储
1创建副本存储计划所有副本都指定存储在 broker0、broker1、broker2 中。 vim increase-replication-factor.json{version:1,partitions:[{topic:three,partition:0,replicas:[0,1]},{topic:three,partition:1,replicas:[0,1]},{topic:three,partition:2,replicas:[1,0]},{topic:three,partition:3,replicas:[1,0]}]
}2执行副本存储计划。
$ bin/kafka-reassign-partitions.sh --bootstrap-server node102:9092 --reassignment-json-file increase-replication-factor.json --execute3验证副本存储计划
$ bin/kafka-reassign-partitions.sh --bootstrap-server node102:9092 --reassignment-json-file increase-replication-factor.json --verify三、Kafka 生产者调优 生产者如何提高吞吐量
参数名称描述buffer.memoryRecordAccumulator 缓冲区总大小默认 32m。batch.size缓冲区一批数据最大值默认 16k。适当增加该值可 缓冲区一批数据最大值默认 16k。适当增加该值可以提高吞吐量但是如果该值设置太大会导致数据传 以提高吞吐量但是如果该值设置太大会导致数据传输延迟增加。 输延迟增加。linger.ms如果数据迟迟未达到 batch.sizesender 等待 linger.time之后就会发送数据。单位 ms默认值是 0ms表示没有延迟。生产环境建议该值大小为 5-100ms 之间。compression.type生产者发送的所有数据的压缩方式。默认是 none也就是不压缩。 支持压缩类型none、gzip、snappy、lz4 和 zstd。
数据可靠性
参数名称描述acks0生产者发送过来的数据不需要等数据落盘应答。 1生产者发送过来的数据Leader 收到数据后应答。 -1all生产者发送过来的数据Leader和 isr 队列 里面的所有节点收齐数据后应答。默认值是-1
至少一次 ACK 级别设置为-1 分区副本大于等于 2 ISR 里应答的最小副本数量大于等于 2
数据去重
参数名称描述enable.idempotence是否开启幂等性默认 true表示开启幂等性。
Kafka 的事务一共有如下 5 个 API
// 1 初始化事务
void initTransactions();// 2 开启事务
void beginTransaction() throws ProducerFencedException;// 3 在事务内提交已经消费的偏移量主要用于消费者
void sendOffsetsToTransaction(MapTopicPartition, OffsetAndMetadata offsets,String consumerGroupId) throws
ProducerFencedException;// 4 提交事务
void commitTransaction() throws ProducerFencedException;// 5 放弃事务类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;数据乱序
参数名称描述enable.idempotence是否开启幂等性默认 true表示开启幂等性。max.in.flight.requests.per.connection允许最多没有返回 ack 的次数默认为 5开启幂等性 要保证该值是 1-5 的数字。
四、Kafka 消费者调优
消费者重要参数 参数描述bootstrap.servers向 Kafka 集群建立初始连接用到的 host/port 列表。key.deserializer 和 value.deserializer指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。group.id标记消费者所属的消费者组。enable.auto.commit自动提交offset开关默认值为 true消费者会自动周期性地向服务器提交偏移量。auto.commit.interval.ms消费者偏移量向Kafka提交的频率默认5s。如果设置自动提交offset时才生效auto.offset.reset当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在如数据被删除了该如何处理 earliest自动重置偏移量到最早的偏移量。latest默认自动重置偏移量为最新的偏移量。none如果消费组原来的previous偏移量offsets.topic.num.partitions__consumer_offsets 的分区数默认是 50 个分区。heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间默认 3s。 该条目的值必须小于 session.timeout.ms 也不应该高于session.timeout.ms 的 1/3。☘️session.timeout.msKafka consumer 和 coordinator 之间连接超时时间默认 45s。 超过该值该消费者被移除消费者组执行再平衡。☘️max.poll.interval.ms消费者处理消息的最大时长默认是 5 分钟。超过该值该消费者被移除消费者组执行再平衡。fetch.min.bytes消费者获取服务器端一批消息最小的字节数。 默认 1 个字节fetch.max.wait.ms默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到仍然会返回数据。fetch.max.bytes默认 Default: 5242880050 m。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值 50m仍然可以拉取回来这批数据因此这不是一个绝对最大值。一批次的大小受 message.max.bytes broker configor max.message.bytes topic config影响。max.poll.records一次 poll 拉取数据返回消息的最大条数默认是 500 条。
消费者再平衡
参数名称描述heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间默认 3s。 该条目的值必须小于 session.timeout.ms也不应该高于 session.timeout.ms 的 1/3。session.timeout.msKafka 消费者和 coordinator 之间连接超时时间默认 45s。 超过该值该消费者被移除消费者组执行再平衡。max.poll.interval.ms消费者处理消息的最大时长默认是 5 分钟。超过该值该消费者被移除消费者组执行再平衡。partition.assignment.strategy消费者分区分配策略 默 认 策 略 是 Range CooperativeSticky。Kafka 可以同时使用多个分区分配策略。 可以选择的策略包括Range、RoundRobin、Sticky、CooperativeSticky
指定offset进行消费
public class CustomConsumerByHandSync {public static void main(String[] args) {// 1. 创建 kafka 消费者配置类Properties properties new Properties();// 2. 添加配置参数// 添加连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);// 配置消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, test);// 是否自动提交 offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//3. 创建 kafka 消费者KafkaConsumerString, String consumer new KafkaConsumer(properties);// 2 订阅一个主题ArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);// 获取消费者分区信息并指定offset进行消费SetTopicPartition assignment new HashSet();while (assignment.size() 0) {kafkaConsumer.poll(Duration.ofSeconds(1));// 获取消费者分区分配信息有了分区分配信息才能开始消费assignment kafkaConsumer.assignment();}// 遍历所有分区并指定 offset 从 1700 的位置开始消费for (TopicPartition tp: assignment) {kafkaConsumer.seek(tp, 1700);}// 消费数据while (true){// 读取消息ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));// 输出消息for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());}}}
} 指定时间进行消费
在生产环境中会遇到最近消费的几个小时数据异常想重新按照时间消费。例如要求按照时间消费前一天的数据怎么处理
public class CustomConsumerByHandSync {public static void main(String[] args) {// 1. 创建 kafka 消费者配置类Properties properties new Properties();// 2. 添加配置参数// 添加连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);// 配置消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, test);// 是否自动提交 offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//3. 创建 kafka 消费者KafkaConsumerString, String consumer new KafkaConsumer(properties);// 2 订阅一个主题ArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);// 获取消费者分区信息并指定offset进行消费SetTopicPartition assignment new HashSet();while (assignment.size() 0) {kafkaConsumer.poll(Duration.ofSeconds(1));// 获取消费者分区分配信息有了分区分配信息才能开始消费assignment kafkaConsumer.assignment();}HashMapTopicPartition, Long timestampToSearch new HashMap(); // 封装集合存储每个分区对应一天前的数据for (TopicPartition topicPartition : assignment) {timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);}// 获取从 1 天前开始消费的每个分区的 offsetMapTopicPartition, OffsetAndTimestamp offsets kafkaConsumer.offsetsForTimes(timestampToSearch);// 遍历每个分区对每个分区设置消费时间。for (TopicPartition topicPartition : assignment) {OffsetAndTimestamp offsetAndTimestamp offsets.get(topicPartition);// 根据时间指定开始消费的位置if (offsetAndTimestamp ! null){kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());}}// 消费数据while (true){// 读取消息ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));// 输出消息for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());}}}
} 消费者如何提高吞吐量
增加分区数
$ bin/kafka-topics.sh --bootstrap-server node102:9092 --alter --topic first --partitions 3参数名称描述fetch.max.bytes默认 Default: 5242880050 m。消费者获取服务器端一批 消息最大的字节数。如果服务器端一批次的数据大于该值 50m仍然可以拉取回来这批数据因此这不是一个绝对最大值。一批次的大小受 message.max.bytes broker config or max.message.bytes topic config影响。max.poll.records一次 poll 拉取数据返回消息的最大条数默认是 500 条。
五、Kafka总体调优
如何提升吞吐量
1提升生产吞吐量
buffer.memory发送消息的缓冲区大小默认值是 32m可以增加到 64m。batch.size默认是 16k。如果 batch 设置太小会导致频繁网络请求吞吐量下降如果 batch 太大会导致一条消息需要等待很久才能被发送出去增加网络延时linger.ms这个值默认是 0意思就是消息必须立即被发送。一般设置一个 5~100毫秒。如果 linger.ms 设置的太小会导致频繁网络请求吞吐量下降如果 linger.ms 太长会导致一条消息需要等待很久才能被发送出去增加网络延时。compression.type默认是 none不压缩但是也可以使用 lz4 压缩效率还是不错的压缩之后可以减小数据量提升吞吐量但是会加大 producer 端的 CPU 开销。
2增加分区
3消费者提高吞吐量
调整 fetch.max.bytes 大小默认是 50m。调整 max.poll.records 大小默认是 500 条。
4增加下游消费者处理能力
数据精准一次
1生产者角度
acks 设置为-1 acks-1幂等性enable.idempotence true 事务
2broker 服务端角度
分区副本大于等于 2 –replication-factor 2ISR 里应答的最小副本数量大于等于 2 min.insync.replicas 2
3消费者
事务 手动提交 offset enable.auto.commit false消费者输出的目的地必须支持事务MySQL、Kafka
合理设置分区数
1创建一个只有 1 个分区的 topic。
2测试这个 topic 的 producer 吞吐量和 consumer 吞吐量。
3假设他们的值分别是 Tp 和 Tc单位可以是 MB/s。
4然后假设总的目标吞吐量是 Tt那么分区数 Tt / minTpTc。
例如producer 吞吐量 20m/sconsumer 吞吐量 50m/s期望吞吐量 100m/s
分区数 100 / 20 5 分区
分区数一般设置为3-10 个
分区数不是越多越好也不是越少越好需要搭建完集群进行压测再灵活调整分区个数。
单条日志大于 1m的问题
参数名称描述message.max.bytes默认 1mbroker 端接收每个批次消息最大值max.request.size默认 1m生产者发往 broker 每个请求消息最大值。针对 topic级别设置消息体的大小replica.fetch.max.bytes默认 1m副本同步数据每个批次消息最大值fetch.max.bytes默认 Default: 5242880050 m。消费者获取服务器端一批 消息最大的字节数。如果服务器端一批次的数据大于该值50m仍然可以拉取回来这批数据因此这不是一个绝对 最大值。一批次的大小受 message.max.bytes broker config or max.message.bytes topic config影响
集群压力测试
用 Kafka 官方自带的脚本对 Kafka 进行压测
生产者压测kafka-producer-perf-test.sh消费者压测kafka-consumer-perf-test.sh
Kafka Producer 压力测试
1创建一个 test topic设置为 3 个分区 3 个副本
$ bin/kafka-topics.sh --bootstrap-server node102:9092 --create --replication-factor 3 --partitions 3 --topic test2在/opt/module/kafka/bin 目录下面有这两个文件。我们来测试一下
$ bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.serversnode102:9092,node103:9092,node104:9092 batch.size16384 linger.ms0测试参数说明
record-size 是一条信息有多大单位是字节本次测试设置为 1k。num-records 是总共发送多少条信息本次测试设置为 100 万条。throughput 是每秒多少条信息设成-1表示不限流尽可能快的生产数据可测出生产者最大吞吐量。本次设置为每秒钟 1 万条。producer-props 后面可以配置生产者相关参数batch.size 配置为 16k
调优参数
更改不同的值进行压测
参数名称描述buffer.memoryRecordAccumulator 缓冲区总大小默认 32m。batch.size缓冲区一批数据最大值默认 16k。适当增加该值可 缓冲区一批数据最大值默认 16k。适当增加该值可以提高吞吐量但是如果该值设置太大会导致数据传 以提高吞吐量但是如果该值设置太大会导致数据传输延迟增加。 输延迟增加。linger.ms如果数据迟迟未达到 batch.sizesender 等待 linger.time之后就会发送数据。单位 ms默认值是 0ms表示没有延迟。生产环境建议该值大小为 5-100ms 之间。compression.type生产者发送的所有数据的压缩方式。默认是 none也就是不压缩。 支持压缩类型none、gzip、snappy、lz4 和 zstd。
Kafka Consumer 压力测试
1修改/opt/module/kafka/config/consumer.properties 文件中的一次拉取条数为 500
max.poll.records5002消费 100 万条日志进行压测
$ bin/kafka-consumer-perf-test.sh --bootstrap-server node102:9092,node103:9092,hadoop104:9092 --topic test --messages 1000000 --consumer.config config/consumer.properties参数说明
–bootstrap-server 指定 Kafka 集群地址–topic 指定 topic 的名称–messages 总共要消费的消息个数。本次实验 100 万条
调优参数优化在consumer.properties中进行修改
修改一次拉取的数量max.poll.records2000调整文件中的拉取一批数据大小 100mfetch.max.bytes104857600