可视化域名网站模块被删了,网站 制作公司,自由人网站开发,关键词优化软件排行#作者#xff1a;孙德新 文章目录 分区分配操作(kafka-reassign-partitions.sh)1.1 分区扩容、数据均衡、迁移(kafka-reassign-partitions.sh)1.2、修改topic分区partition的副本数#xff08;扩缩容副本#xff09;1.3、Partition Reassign场景限流1.4、节点内副本移动到不…#作者孙德新 文章目录 分区分配操作(kafka-reassign-partitions.sh)1.1 分区扩容、数据均衡、迁移(kafka-reassign-partitions.sh)1.2、修改topic分区partition的副本数扩缩容副本1.3、Partition Reassign场景限流1.4、节点内副本移动到不同目录的场景限流1.5、集群Broker恢复时副本数据同步场景限流 分区分配操作(kafka-reassign-partitions.sh)
1.1 分区扩容、数据均衡、迁移(kafka-reassign-partitions.sh)
Kafka系统提供了一个分区重新分配工具kafka-reassign-partitions.sh该工具可用于在Broker之间迁移分区。理想情况下将确保所有Broker的数据和分区均匀分配。分区重新分配工具无法自动分析Kafka群集中的数据分布并迁移分区以实现均匀的负载均衡。因此管理员在操作的时候必须弄清楚应该迁移哪些Topic或分区。 Kafka物理节点扩容方法只要每个kafka配置文件的brokerid 需要全局唯一不冲突启动新kafka节点即可自动加入原有kafka集群。但是原集群topic数据不会自动移动到新kafka节点上需要手动迁移。新topic则可以自动分配到新节点上。
–topics-to-move-json-file 指定json文件,文件内容为topic配置 Json文件格式如下:
{topics: [{topic: test_create_topic1}],version: 1
}–generate 尝试给出副本重分配的策略,该命令并不实际执行 –broker-list 指定想要分配的Broker节点用于尝试给出分配策略,与–generate搭配使用 。–broker-list “0,1,2,3” –broker-list举例 ./kafka-reassign-partitions.sh --zookeeper xxx:2181 --topics-to-move-json-file config/move-json-file.json --broker-list “0,1,2,3” --generate
执行完毕之后会打印如下需求注意的是此时分区移动尚未开始它只是告诉你当前的分配和建议。保存当前分配以防你想要回滚它 Current partition replica assignment//当前副本分配方式
Current partition replica assignment//当前副本分配方式
{version:1,partitions:[{topic:test_create_topic1,partition:2,replicas:[1],log_dirs:[any]},{topic:test_create_topic1,partition:1,replicas:[3],log_dirs:[any]},{topic:test_create_topic1,partition:0,replicas:[2],log_dirs:[any]}]}Proposed partition reassignment configuration//期望的重新分配方式
{version:1,partitions:[{topic:test_create_topic1,partition:2,replicas:[2],log_dirs:[any]},{topic:test_create_topic1,partition:1,replicas:[1],log_dirs:[any]},{topic:test_create_topic1,partition:0,replicas:[0],log_dirs:[any]}]}–reassignment-json-file 指定要重分配的json文件,与–execute搭配使用
–execute 执行。开始执行重分配任务,与–reassignment-json-file搭配使用
–verify 验证任务是否执行成功检查分区重新分配的状态。当有使用–throttle限流的话,该命令还会移除限流;该命令很重要,不移除限流对正常的副本之间同步会有影响。 该选项用于检查分区重新分配的状态同时–throttle流量限制也会被移除掉; 否则可能会导致定期复制操作的流量也受到限制。
–throttle 迁移过程Broker之间现在流程传输的速率,单位 bytes/sec – throttle 500000 。迁移过程注意流量陡增对集群的影响。Kafka提供一个broker之间复制传输的流量限制限制了副本从机器到另一台机器的带宽上限当重新平衡集群引导新broker添加或移除broker时候这是很有用的。因为它限制了这些密集型的数据操作从而保障了对用户的影响 sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx:2181 --reassignment-json-file config/reassignment-json-file.json --execute -- throttle 50000000加上一个–throttle 50000000 参数, 那么执行移动分区的时候,会被限制流量在50000000 B/s加上参数后可以看到如下
The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.需要注意的是,如果你迁移的时候包含 副本跨路径迁移(同一个Broker多个路径)那么这个限流措施不会生效,你需要再加上–replica-alter-log-dirs-throttle 这个限流参数,它限制的是同一个Broker不同路径直接迁移的限流;
如果你想在重新平衡期间修改限制增加吞吐量以便完成的更快。你可以重新运行execute命令用相同的reassignment-json-file
–replica-alter-log-dirs-throttle broker内部副本跨路径迁移数据流量限制功能限制数据拷贝从一个目录到另外一个目录带宽上。单位bytes/sec --replica-alter-log-dirs-throttle 100000
–disable-rack-aware 关闭机架感知能力,在分配的时候就不参考机架的信息
–bootstrap-server 如果是副本跨路径迁移必须有此参数
下面是topic数据均衡操作迁移操作也是这个思路只是最终kafka节点目标少 背景原有3台kafka集群192.168.40.11–13扩容一台kafka 192.168.40.14把原有3台数据均衡到4台kafka上 首先在现有集群的基础上再添加⼀个Kafka节点进行物理组件节点扩容然后使⽤Kafka⾃带的kafka-reassign-partitions.sh ⼯具来重新分布分区进行数据均衡。该⼯具有三种使⽤模式 generate模式给定需要重新分配的Topic⾃动⽣成reassign plan并不执⾏。在此模式下给定Topic列表和Broker列表该工具会生成候选重新分配以将指定Topic的所有分区迁移到新Broker中。此选项仅提供了一种方便的方法可在给定Topic和目标Broker列表的情况下生成分区重新分配计划。 execute模式根据指定的reassign plan重新分配Partition。在此模式下该工具将根据用户提供的重新分配计划启动分区的重新分配。 使用–reassignment-json-file选项。由管理员手动制定自定义重新分配计划也可以使用–generate选项提供。 verify模式验证重新分配Partition是否成功。在此模式下该工具将验证最后一次–execute期间列出的所有分区的重新分配状态。状态可以有成功、失败或正在进行等状态。
基本步骤如下
查看集群中当前所有可用的topic目标是名为three的topic进行数据均衡到12344个kafka节点上
[rootlocalhost bin]# ./kafka-topics.sh --list --bootstrap-server 192.168.40.11:9092
__consumer_offsets
first
second
three
topic_first查看计划数据均衡的topic的详细信息 列出 对应的topic名称的详细信息包括topic的分区数量副本及leader等。
./kafka-topics.sh --describe --zookeeper zk-ip:port --topic topic名称 命令模板
[rootlocalhost bin]# ./kafka-topics.sh --describe --bootstrap-server 192.168.40.11:9092 --topic three
[2024-03-06 01:25:53,446] WARN [AdminClient clientIdadminclient-1] Connection to node 4 (/192.168.40.14:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Topic: three PartitionCount: 5 ReplicationFactor: 3 Configs: segment.bytes1073741824Topic: three Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 2,3,1Topic: three Partition: 1 Leader: 2 Replicas: 2,1,3 Isr: 2,3,1Topic: three Partition: 2 Leader: 3 Replicas: 3,2,1 Isr: 2,3,1Topic: three Partition: 3 Leader: 1 Replicas: 1,2,3 Isr: 2,3,1Topic: three Partition: 4 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
编制负载均衡topic主题清单的json文件 借助kafka-reassign-partitions.sh⼯具⽣成reassign 计划不过我们先得按照要求定义⼀个json⽂件文件名自定义。⾥⾯说明哪些topic需要重新分区版本号固定为1⽂件内容如下
[rootnode1 ~]# cat topics-to-move.json
{ topics: [ { topic:three } ], version:1}或如下
[rootlocalhost ~]# cat topic-to-move.json
{topics: [{topic: three}],
version:1
}生成副本均衡迁移计划 然后使⽤ kafka-reassign-partitions.sh⼯具利用–generate⽣成reassign计划
./kafka-reassign-partitions.sh --zookeeper 192.168.12.100:2181 --topics-to-move-json-file ./plans/topic-to-move.json --broker-list 1,2,3,4,5,6 --generate 命令模板参数–broker-list “1,2,3,4” 表示期望均衡、迁移的本kafka集群全部节点也可是部分节点但节点个数不能小于副本数量。执行后输出两段内容其中黄色标识部分就是副本存储计划文件内容⽣成的就是将分区重新分布到broker 上的结果
[rootlocalhost bin]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --topics-to-move-json-file /root/topic-to-move.json --broker-list 1,2,3,4 --generate
Current partition replica assignment //目前的副本分配状态这个需要保留可以用它回滚
{version:1,partitions:[{topic:three,partition:0,replicas:[1,3,2],log_dirs:[any,any,any]},{topic:three,partition:1,replicas:[2,1,3],log_dirs:[any,any,any]},{topic:three,partition:2,replicas:[3,2,1],log_dirs:[any,any,any]},{topic:three,partition:3,replicas:[1,2,3],log_dirs:[any,any,any]},{topic:three,partition:4,replicas:[2,3,1],log_dirs:[any,any,any]}]}Proposed partition reassignment configuration //期望计划的副本分配状态
{version:1,partitions:[{topic:three,partition:0,replicas:[2,4,1],log_dirs:[any,any,any]},{topic:three,partition:1,replicas:[3,1,2],log_dirs:[any,any,any]},{topic:three,partition:2,replicas:[4,2,3],log_dirs:[any,any,any]},{topic:three,partition:3,replicas:[1,3,4],log_dirs:[any,any,any]},{topic:three,partition:4,replicas:[2,1,3],log_dirs:[any,any,any]}]}注
上述黄色输出内容说明解释partitions:[{topic:three,partition:0,replicas:[2,4,1]是计划将topic:three,partition:0分区分配到broker2、4、1上后面内容也同样编制存储计划json文件 上面最后一段黄色标识就是副本存储计划文件内容⽣成的就是将分区重新分布到broker 上的结果再创建一个存储计划json文件文件名自定义
[rootlocalhost bin]# vim increase-replication-factor.json
{version:1,partitions:[{topic:three,partition:0,replicas:[2,4,1],log_dirs:[any,any,any]},{topic:three,partition:1,replicas:[3,1,2],log_dirs:[any,any,any]},{topic:three,partition:2,replicas:[4,2,3],log_dirs:[any,any,any]},{topic:three,partition:3,replicas:[1,3,4],log_dirs:[any,any,any]},{topic:three,partition:4,replicas:[2,1,3],log_dirs:[any,any,any]}]}执行存储计划 该命令会将指定分区的副本重新分配到新的 broker上。集群控制器通过为每个分区添加新副本实现重新分配(增加复制系数)。新的副本将从分区的首领那里复制所有数据。根据分区大小的不同复制过程可能需要花一些时间因为数据是通过网络复制到新副本上的。在复制完成之后控制器将旧副本从副本清单里移除(恢复到原先的复制系数)。
[rootlocalhost bin]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/increase-replication-factor.json --execute
Current partition replica assignment{version:1,partitions:[{topic:three,partition:0,replicas:[1,3,2],log_dirs:[any,any,any]},{topic:three,partition:1,replicas:[2,1,3],log_dirs:[any,any,any]},{topic:three,partition:2,replicas:[3,2,1],log_dirs:[any,any,any]},{topic:three,partition:3,replicas:[1,2,3],log_dirs:[any,any,any]},{topic:three,partition:4,replicas:[2,3,1],log_dirs:[any,any,any]}]}Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for three-0,three-1,three-2,three-3,three-4验证副本存储计划 在重分配进行过程中或者完成之后可以使用 kafka-reassign-partitions.sh 工具验证重分配 的状态。它可以显示重分配的进度、已经完成重分配的分区以及错误信息。
[rootlocalhost bin]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition three-0 is complete.
Reassignment of partition three-1 is complete.
Reassignment of partition three-2 is complete.
Reassignment of partition three-3 is complete.
Reassignment of partition three-4 is complete.Clearing broker-level throttles on brokers 1,2,3,4 //说明已经分配在kafka集群的全部4个节点上了
Clearing topic-level throttles on topic three再次验证副本存储计划 说明没问题成功。14主机上分配了分区0、2、3共3个分区
[rootlocalhost bin]# ./kafka-topics.sh --describe --bootstrap-server 192.168.40.11:9092 --topic three
Topic: three PartitionCount: 5 ReplicationFactor: 3 Configs: segment.bytes1073741824Topic: three Partition: 0 Leader: 1 Replicas: 2,4,1 Isr: 2,1,4Topic: three Partition: 1 Leader: 2 Replicas: 3,1,2 Isr: 2,3,1Topic: three Partition: 2 Leader: 4 Replicas: 4,2,3 Isr: 2,3,4Topic: three Partition: 3 Leader: 1 Replicas: 1,3,4 Isr: 3,1,4Topic: three Partition: 4 Leader: 2 Replicas: 2,1,3 Isr: 2,3,1到kafka-14主机查看数据已经有数据了分区0、2、3在14主机上
[rootlocalhost log]# ll
total 16
-rw-r--r--. 1 root root 0 Mar 4 21:48 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 4 Mar 6 06:55 log-start-offset-checkpoint
-rw-r--r-- 1 root root 88 Mar 6 01:47 meta.properties
-rw-r--r-- 1 root root 34 Mar 6 06:55 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 34 Mar 6 06:55 replication-offset-checkpoint
drwxr-xr-x 2 root root 141 Mar 6 03:01 three-0
drwxr-xr-x 2 root root 141 Mar 6 03:06 three-2
drwxr-xr-x 2 root root 141 Mar 6 03:01 three-3
[rootlocalhost log]# cd three-0
[rootlocalhost three-0]# ll
total 0
-rw-r--r-- 1 root root 10485760 Mar 6 03:01 00000000000000000000.index
-rw-r--r-- 1 root root 0 Mar 6 03:01 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Mar 6 03:01 00000000000000000000.timeindex
-rw-r--r-- 1 root root 0 Mar 6 03:01 leader-epoch-checkpoint1.2、修改topic分区partition的副本数扩缩容副本
分区重分配工具提供了一些特性用于改变分区的复制系数副本数。如果在创建分区时指定了错误的复制系数、副本数(比如在创建主题时没有足够多可用的 broker)那么就有必要修改它们。这可以通过创建一个 JSON 对象来完成该对象使用分区重新分配的执行步骤中使用的格式显式指定分区所需的副本数量。集群将完成重分配过程并使用新的复制系数、副本数。 Kafka不能通过命令行方式修改副本需要通过json等其他方式修改。无论是增加或减少副本都可以针对每个分区进行个性增加或减少不是必须所有分区都增加或同时减少。
增加副本操作
创建json文件 1方法一使用命令自动创建有待进一步验证 指定要修改副本数的Topic和分区以及新的副本分配。这个JSON文件可以通过kafka-reassign-partitions.sh脚本的–generate选项自动生成。命令模板如下11 14 18为kafka的broker id。 首先创建一个文件addReplicas.json现有业务topic名称为aaa
[rootkafka18 ~]# cat addReplicas.json
{topics: [{topic: aaa}],version: 1
}[rootkafka18 ~]# /usr/local/kafka_2.13-2.7.1/bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.40.18:9092 --topics-to-move-json-file addReplicas.json --broker-list 11,14,18 --generate
Current partition replica assignment
{version:1,partitions:[{topic:aaa,partition:0,replicas:[14,18,11],log_dirs:[any,any,any]},{topic:aaa,partition:1,replicas:[11,14,18],log_dirs:[any,any,any]},{topic:aaa,partition:2,replicas:[18,11,14],log_dirs:[any,any,any]}]}Proposed partition reassignment configuration
{version:1,partitions:[{topic:aaa,partition:0,replicas:[11,14,18],log_dirs:[any,any,any]},{topic:aaa,partition:1,replicas:[14,18,11],log_dirs:[any,any,any]},{topic:aaa,partition:2,replicas:[18,11,14],log_dirs:[any,any,any]}]}
2方法二手动编辑创建 假设topic已有3个分区2个副本需要增加到3个副本。创建一个预定义的topic分区副本json文件即副本存储计划
]# cat addReplicas.json //该名称自定义该文件与下面reassign.json文件内容一致仅格式不同使用reassign.json文件{version: 1, //固定写法partitions: [{topic: test, //业务的topic名称partition: 0, //partition的序号必须从0开始是第一个分区replicas: [ //期望增加到的副本数数字为broker的id说明期望增加到0123个broker上为3个副本0,1,2]},{topic: test,partition: 1,replicas: [1,0,2]},{topic: test,partition: 2,replicas: [2,0,1]}]
}或如下格式编写 整体操作步骤如下
首先查看topic的副本情况发现有5个分区每个分区有2个副本 bin]# ./kafka-topics.sh --bootstrap-server 192.168.40.11:9092 --describe Topic: three PartitionCount: 5 ReplicationFactor: 2 Configs: segment.bytes1073741824 Topic: three Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3 Topic: three Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1 Topic: three Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2 Topic: three Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: three Partition: 4 Leader: 2 Replicas: 2,3 Isr: 2,3
首先编写配置文件把上面查询输出复制过来即可
vim reassign.json
{version:1,partitions:[{topic:three,partition:0,replicas:[1,3,2]},{topic:three,partition:1,replicas:[2,1,3]},{topic:three,partition:2,replicas:[3,2,1]},{topic:three,partition:3,replicas:[1,2,3]},{topic:three,partition:4,replicas:[2,3,1]}
]}执行副本存储计划
]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/reassign.json --execute
Current partition replica assignment{version:1,partitions:[{topic:three,partition:0,replicas:[1,3],log_dirs:[any,any]},{topic:three,partition:1,replicas:[2,1],log_dirs:[any,any]},{topic:three,partition:2,replicas:[3,2],log_dirs:[any,any]},{topic:three,partition:3,replicas:[1,2],log_dirs:[any,any]},{topic:three,partition:4,replicas:[2,3],log_dirs:[any,any]}]}Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for three-0,three-1,three-2,three-3,three-4查看执行的状态
[rootlocalhost bin]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/reassign.json --verify
Status of partition reassignment:
Reassignment of partition three-0 is complete.
Reassignment of partition three-1 is complete.
Reassignment of partition three-2 is complete.
Reassignment of partition three-3 is complete.
Reassignment of partition three-4 is complete.Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic three再次查看验证结果黄色部分broker的id说明副本都增加成功了分布在原有kafka集群了
]# ./kafka-topics.sh --bootstrap-server 192.168.40.11:9092,192.168.40.12:9092 --describe
Topic: three PartitionCount: 5 ReplicationFactor: 3 Configs: segment.bytes1073741824Topic: three Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2Topic: three Partition: 1 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3Topic: three Partition: 2 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1Topic: three Partition: 3 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3Topic: three Partition: 4 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1减少topic副本操作即从某个kafka节点下线副本
把下面4分区3副本名为topic_first的topic减少计划如下黄色为计划减掉的副本
[rootlocalhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.40.11:9092,192.168.40.12:9092 --describe
Topic: topic_first PartitionCount: 4 ReplicationFactor: 3 Configs: segment.bytes1073741824Topic: topic_first Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2Topic: topic_first Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3Topic: topic_first Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1Topic: topic_first Partition: 3 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1我们必须保留Leader副本再把计划保留的其他Follower副本编辑上首先编写配置文件把上面查询输出复制过来即可
vim reassign.json
{version:1,partitions:[{topic:topic_first,partition:0,replicas:[3,1]},{topic:topic_first,partition:1,replicas:[1]},{topic:topic_first,partition:2,replicas:[2,1]},{topic:topic_first,partition:3,replicas:[3,2]}
]}执行计划
]# ./kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --execute //有zk写法
]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file reassign.json --execute //无zkJson文件要用绝对路径或者确保可以找到json文件
]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/reassign.json --execute
Current partition replica assignment{version:1,partitions:[{topic:topic_first,partition:0,replicas:[3,1,2],log_dirs:[any,any,any]},{topic:topic_first,partition:1,replicas:[1,2,3],log_dirs:[any,any,any]},{topic:topic_first,partition:2,replicas:[2,3,1],log_dirs:[any,any,any]},{topic:topic_first,partition:3,replicas:[3,2,1],log_dirs:[any,any,any]}]}Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for topic_first-0,topic_first-1,topic_first-2,topic_first-3查询执行结果
]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/reassign.json --verify
Status of partition reassignment:
Reassignment of partition topic_first-0 is complete.
Reassignment of partition topic_first-1 is complete.
Reassignment of partition topic_first-2 is complete.
Reassignment of partition topic_first-3 is complete.Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic topic_first再次查看 Topic 的详情否按照预想减少副本结果成功
[rootlocalhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.40.11:9092,192.168.40.12:9092 --describe
Topic: topic_first PartitionCount: 4 ReplicationFactor: 2 Configs: segment.bytes1073741824Topic: topic_first Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1Topic: topic_first Partition: 1 Leader: 1 Replicas: 1 Isr: 1Topic: topic_first Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1Topic: topic_first Partition: 3 Leader: 3 Replicas: 3,2 Isr: 3,2故障恢复方法 宕机如何恢复 少部分副本宕机 当leader宕机了会从follower选择⼀个作为leader。当宕机的重新恢复时会把之前commit的数据清空重新从leader⾥pull数据。 全部副本宕机 当全部副本宕机了有两种恢复⽅式: 等待ISR中的⼀个恢复后并选它作为leader。等待时间较⻓降低可⽤性 选择第⼀个恢复的副本作为新的leader⽆论是否在ISR中。并未包含之前leader commit的数据因此造成数据丢失
只有那些跟Leader保持同步的Follower才应该被选作新的Leader。 Kafka会在Zookeeper上针对每个Topic维护⼀个称为ISRin-sync replica已同步的副本的集合该集合中是⼀些分区的副本。 只有当这些副本都跟Leader中的副本同步了之后kafka才会认为消息已提交并反馈给消息的⽣产者。 如果这个集合有增减kafka会更新zookeeper上的记录。
如果所有的ISR副本都失败了怎么办此时有两种⽅法可选 等待ISR集合中的副本复活 选择任何⼀个⽴即可⽤的副本⽽这个副本不⼀定是在ISR集合中。需要设置 unclean.leader.election.enabletrue如果设置为true就意味着当leader下线时候可以从非ISR集合中选举出新的leader这样有可能造成数据的丢失 这两种⽅法各有利弊实际⽣产中按需选择。 如果要等待ISR副本复活虽然可以保证⼀致性但可能需要很⻓时间。⽽如果选择⽴即可⽤的副本则很可能该副本并不⼀致。
总结 Kafka中Leader分区选举通过维护⼀个动态变化的ISR集合来实现⼀旦Leader分区丢掉则从ISR中随机挑选⼀个副本做新的Leader分区。 如果ISR中的副本都丢失了则 可以等待ISR中的副本任何⼀个恢复接着对外提供服务需要时间等待。 从OSR中选出⼀个副本做Leader副本此时会造成数据丢失
还有一个ISR该参数全称in-sync replica它维护了一个集合例如截图里的2,0,1代表2,0,1副本保存的消息日志与leader 副本是保持一致的只有保持一致的副本包括所有副本才会被维护在ISR集合里当出现一定程度的不同步时就会将该对应已经不一致的副本移出ISR集合但是这种移出并非永久的一旦被移出的副本慢慢又恢复与leader一样时那么又会被加回isr集合当中。注意一点只有在这个ISR里的副本服务器才能在leader出现问题时有机会被选举为新的leader。
补充 用步骤1.1的 --generate 获取一下当前的分配情况,得到如下json
{version: 1,partitions: [{topic: test_create_topic1,partition: 2,replicas: [2],log_dirs: [any]}, {topic: test_create_topic1,partition: 1,replicas: [1],log_dirs: [any]}, {topic: test_create_topic1,partition: 0,replicas: [0],log_dirs: [any]
}]}假如想把所有分区的副本都变成2, 那只需修改replicas: []里面的值了,这里面是Broker列表,排在第一个的是Leader; 所以根据自己想要的分配规则修改一下json文件就变成如下
{version: 1,partitions: [{topic: test_create_topic1,partition: 2,replicas: [2,0],log_dirs: [any,any]}, {topic: test_create_topic1,partition: 1,replicas: [1,2],log_dirs: [any,any]}, {topic: test_create_topic1,partition: 0,replicas: [0,1],log_dirs: [any,any]}]}注意log_dirs里面的数量要和replicas数量匹配;或者直接把log_dirs选项删除掉; 这个log_dirs是副本跨路径迁移时候的绝对路径 执行–execute
如果想在重新平衡期间修改限制增加吞吐量以便完成的更快。可以重新运行execute命令用相同的reassignment-json-file: 验证–verify完事之后,副本数量就增加了
副本缩容 副本缩容跟扩容是一个意思; 当副本分配少于之前的数量时候,多出来的副本会被删除; 比如刚新增了一个副本,想重新恢复到一个副本 执行下面的json文件
{version: 1,partitions: [{topic: test_create_topic1,partition: 2,replicas: [2],log_dirs: [any]}, {topic: test_create_topic1,partition: 1,replicas: [1],log_dirs: [any]}, {topic: test_create_topic1,partition: 0,replicas: [0],log_dirs: [any]}]}1.3、Partition Reassign场景限流
Reassign中文重新分配 Partition Reassign限流注意事项
限流速度不能过小如果限流速度过小将不能触发实际的reassign复制过程。 限流参数不会对正常的副本fetch流量进行限速。 任务完成后您需要通过verify参数移除Topic和Broker上的限速参数配置。 如果刚开始已经设置了throttle参数则可以通过execute命令再次修改throttle参数。 如果刚开始没有设置throttle参数则需要使用kafka-configs.sh命令修改Topic上的leader.replication.throttled.replicas和follower.replication.throttled.replicas参数、修改Broker上的leader.replication.throttled.rate和follower.replication.throttled.rate参数。
使用kafka-reassign-partitions.sh来进行Partition Reassign操作通过使用throttle参数来设置限流的大小。示例如下所示
创建测试Topic。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create通过以下命令查看Topic详情。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe执行以下命令模拟数据写入。
./kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks-1 linger.ms0 bootstrap.serverscore-1-1:9092设置throttle参数并执行reassign操作。 创建reassignment-json-file文件reassign.json写入如下内容。
{version:1,partitions:[{topic:test-throttled,partition:0,replicas:[2,0,3],log_dirs:[any,any,any]}]}执行reassign操作。 由于模拟的写入速度为10 Mbit/s所以将reassign限流速度设置为30 Mbit/s。
./kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --throttle 30000000 --execute查看限流参数。 查看指定节点的Broker参数。2是broker的id ./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name 2 --describe
查看指定Topic的参数。 ./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name test-throttled --describe
查看reassign任务执行情况。 ./kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify
说明任务完成后您需要重复执行上述命令以移除限流参数。
1.4、节点内副本移动到不同目录的场景限流
通过kafka-reassign-partitions.sh可以进行Broker节点内的副本迁移参数replica-alter-log-dirs-throttle可以对节点内的迁移IO进行限制。示例如下所示
创建测试Topic。 执行以下命令创建测试Topic。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create可以通过以下命令查看Topic详情。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe执行以下命令模拟数据写入。
./kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks-1 linger.ms0 bootstrap.serverscore-1-1:9092设置参数replica-alter-log-dirs-throttle并执行reassign操作。
创建文件reassign.json将目标目录写入reassignment文件中内容如下。
{version:1,partitions:[{topic:test-throttled,partition:0,replicas:[2,0,3],log_dirs:[any,/mnt/disk1/kafka/log,any]}]}执行replicas movement操作。
./kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --replica-alter-log-dirs-throttle 30000000 --execute查看限流参数。 Broker节点内目录间移动副本会在Broker上配置限流参数参数名为Brokerreplica.alter.log.dirs.io.max.bytes.per.second。
执行以下命令查看指定节点的Broker参数。
./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --describe --entity-name 0查看reassign任务执行情况。 ./kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify
说明任务完成后您需要重复执行上述命令以移除限流参数。
1.5、集群Broker恢复时副本数据同步场景限流
重要提示 限流速度不能过小如果限流速度过小将不能触发实际的reassign复制过程。 限流参数不会对正常的副本fetch流量进行限速。 数据恢复完成后需要使用kafka-configs.sh命令删除相应的参数。
步骤 当Broker重启时需要从leader副本进行副本数据的同步。在Broker节点迁移、坏盘修复重新上线等场景时由于之前的副本数据完全丢失、副本数据恢复会产生大量的同步流量有必要对恢复过程进行限流避免恢复流量过大影响正常流量。
创建测试Topic。 执行以下命令创建测试Topic。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create可以通过以下命令查看Topic详情。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe执行以下命令写入测试数据。
./kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks-1 linger.ms0 bootstrap.serverscore-1-1:9092通过kafka-configs.sh命令设置限流参数。
//设置Topic上的限流参数。
./kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --entity-name test-throttled --alter --add-config leader.replication.throttled.replicas*,follower.replication.throttled.replicas*//设置Broker上的限流参数。
./kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config leader.replication.throttled.rate1024,follower.replication.throttled.rate1024 --entity-name 0
./kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config leader.replication.throttled.rate1024,follower.replication.throttled.rate1024 --entity-name 1
./kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config leader.replication.throttled.rate1024,follower.replication.throttled.rate1024 --entity-name 2停止Broker 1节点进程。 删除Broker 1上的副本数据模拟数据丢失的场景。 rm -rf /mnt/disk2/kafka/log/test-throttled-0/ 启动Broker 1节点观察限流参数是否起作用。 待Broker 1相应的副本恢复到ISR列表后使用kafka-configs.sh命令删除限流参数的配置。
//删除Topic上的限流参数。
./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --alter --delete-config leader.replication.throttled.replicas,follower.replication.throttled.replicas --entity-name test-throttled//删除Broker上的限流参数
./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --alter --delete-config leader.replication.throttled.replicas,follower.replication.throttled.replicas,leader.replication.throttled.rate,follower.replication.throttled.rate --entity-name 0