当前位置: 首页 > news >正文

广州seo网站排名优化吉林省白山市建设局官方网站

广州seo网站排名优化,吉林省白山市建设局官方网站,网站建设价格如何,wordpress4.7.8文章目录 1. 自动提交消费位移2. 自动提交消费位移存在的问题#xff1f;3. 手动提交消费位移1. 同步提交消费位移2. 异步提交消费位移3. 同步和异步组合提交消费位移4. 提交特定的消费位移5. 按分区提交消费位移 4. 消费者查找不到消费位移时怎么办#xff1f;5. 如何从特定… 文章目录 1. 自动提交消费位移2. 自动提交消费位移存在的问题3. 手动提交消费位移1. 同步提交消费位移2. 异步提交消费位移3. 同步和异步组合提交消费位移4. 提交特定的消费位移5. 按分区提交消费位移 4. 消费者查找不到消费位移时怎么办5. 如何从特定分区位移处读取消息6. 如何优雅地退出轮询循环消费 1. 自动提交消费位移 最简单的提交方式是让消费者自动提交偏移量自动提交 offset 的相关参数 enable.auto.commit是否开启自动提交 offset 功能默认为 true;auto.commit.interval.ms自动提交 offset 的时间间隔默认为5秒 如果 enable.auto.commit 被设置为true那么每过5秒消费者就会自动提交 poll() 返回的最大偏移量即将拉取到的每个分区中最大的消息位移进行提交。提交时间间隔通过 auto.commit.interval.ms 来设定默认是5秒。与消费者中的其他处理过程一样自动提交也是在轮询循环中进行的。消费者会在每次轮询时检查是否该提交偏移量了如果是就会提交最后一次轮询返回的偏移量。 ① 启动消费者消费程序并设置为自动提交消费者位移的方式 public class CustomConsumer {public static void main(String[] args) {Properties properties new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,10.65.132.2:9093);properties.put(ConsumerConfig.GROUP_ID_CONFIG,group-ni);// 显式配置消费者自动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);// 显式配置消费者自动提交位移的事件间隔properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,4);// 创建消费者KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);// 订阅主题consumer.subscribe(Arrays.asList(ni));// 消费数据while (true){ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String record : consumerRecords) {System.out.printf(主题 %s, 分区 %d, 位移 %d, 消息键 %s, 消息值 %s\n,record.topic(), record.partition(), record.offset(), record.key(), record.value());}}} }② 启动生产者程序发送3条消息消息的内容都为 hello,kafka ③ 查看消费者消费的消息记录 主题 ni, 分区 0, 位移 0, 消息键 null, 消息值 hello,kafka 主题 ni, 分区 0, 位移 1, 消息键 null, 消息值 hello,kafka 主题 ni, 分区 0, 位移 2, 消息键 null, 消息值 hello,kafka可以看到消费者消费分区的最新消息的位移为 offset 2即消费者的消息位移为 offset 2; ④ 查看消费者提交的位移 [rootmaster01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter --from-beginning[group-ni,ni,0]::OffsetAndMetadata(offset3, leaderEpochOptional[0], metadata, commitTimestamp1692168114999, expireTimestampNone)可以看到消费者的消息位移为 offset 2但是消费者的提交位移为 offset 3; 2. 自动提交消费位移存在的问题 假设刚刚提交完一次消费位移然后拉取一批消息进行消费在下一次自动提交消费位移之前消费者崩溃了那么又得从上一次位移提交的地方重新开始消费这样便发生了重复消费的现象对于再均衡的情况同样适用再均衡完成之后接管分区的消费者将从最后一次提交的偏移量的位置开始读取消息。可以通过修改提交时间间隔来更频繁地提交偏移量缩小可能导致重复消息的时间窗口但无法完全避免。 在使用自动提交时到了该提交偏移量的时候轮询方法将提交上一次轮询返回的偏移量但它并不知道具体哪些消息已经被处理过了。所以在再次调用poll()之前要确保上一次poll()返回的所有消息都已经处理完毕调用close()方法也会自动提交偏移量。通常情况下这不会有什么问题但在处理异常或提前退出轮询循环时需要特别小心。 虽然自动提交很方便但是没有为避免开发者重复处理消息留有余地。 3. 手动提交消费位移 在Kafka中还提供了手动位移提交的方式这样可以使得开发人员对消费位移的管理控制更加灵活。很多时候并不是说拉取到消息就算消费完成而是需要将消息写入数据库、写入本地缓存或者是更加复杂的业务处理。在这些场景下所有的业务处理完成才能认为消息被成功消费手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。 开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为 false让应用程序自己决定何时提交偏移量。手动提交可以细分为同步提交和异步提交对应于 KafkaConsumer 中的 commitSync() 和 commitAsync() 两种类型的方法。 ① 同步提交位移是指消费者在提交位移时会阻塞直到提交完成并收到确认。它会提交 poll() 返回的最新偏移量提交成功后马上返回如果由于某些原因提交失败就抛出异常。 commitAsync() 方法有四个不同的重载方法具体定义如下 public void commitSync() public void commitSync(Duration timeout) public void commitSync(MapTopicPartition, OffsetAndMetadata offsets) public void commitSync(MapTopicPartition, OffsetAndMetadata offsets, Duration timeout) ② 异步提交位移在执行的时候消费者线程不会被阻塞可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交可以使消费者的性能得到一定的增强。commitAsync方法有三个不同的重载方法具体定义如下 public void commitAsync() public void commitAsync(OffsetCommitCallback callback) public void commitAsync(MapTopicPartition, OffsetAndMetadata offsets, OffsetCommitCallback callback) 1. 同步提交消费位移 在消费消息的循环中处理完当前批次的消息后在轮询更多的消息之前调用 commitSync() 方法提交当前批次最新的偏移量这会阻塞当前线程直到位移提交完成并收到确认。 只要没有发生不可恢复的错误commitSync() 方法就会一直尝试直至提交成功。如果提交失败就把异常记录到错误日志里。 public void commitSync()Slf4j public class CustomConsumer {public static void main(String[] args) {Properties properties new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,10.65.132.2:9093);properties.put(ConsumerConfig.GROUP_ID_CONFIG,group-topic-01);// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 创建消费者KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);// 订阅主题consumer.subscribe(Arrays.asList(topic-01));// 消费数据while (true){ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String record : consumerRecords) {// 业务处理拉取的消息}try{// 消费者手动提交消费位移同步提交方式consumer.commitSync();}catch (CommitFailedException exception){log.error(commit failed....);}}} }还可以将消费者程序修改为批量处理批量提交的方式 Slf4j public class CustomConsumer {public static void main(String[] args) {Properties properties new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,10.65.132.2:9093);properties.put(ConsumerConfig.GROUP_ID_CONFIG,group-topic-01);// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 创建消费者KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);// 订阅主题consumer.subscribe(Arrays.asList(topic-01));// 消费数据while (true){ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));int minSize 200;ListConsumerRecordString, String buffer new ArrayList();for (ConsumerRecordString, String record : consumerRecords) {buffer.add(record);}try{// 消费者手动提交消费位移同步提交方式if(buffer.size()minSize){// 批量处理消息// ...}// 手动提交位移同步方式consumer.commitSync();}catch (CommitFailedException exception){log.error(commit failed....);}}} }上面的示例中将拉取到的消息存入缓存 buffer等到积累到足够多的时候也就是大于等于200个的时候再做相应的批量处理之后再做批量提交。 commitSync() 方法会根据 poll() 方法拉取的最新位移来进行提交只要没有发生不可恢复的错误它就会阻塞消费者线程直至位移提交完成。对于不可恢复的错误比如 CommitFailedException、WakeupException、InterruptException、AuthenticationException、AuthorizationException 等我们可以将其捕获并做针对性的处理。 需要注意的是同步提交位移时需要确保在处理完消息后再进行提交因为 commitSync() 将会提交 poll() 返回的最新偏移量如果你在处理完所有记录之前就调用了 commitSync()那么一旦应用程序发生崩溃就会有丢失消息的风险消息已被提交但未被处理。如果应用程序在处理记录时发生崩溃但 commitSync() 还没有被调用那么从最近批次的开始位置到发生再均衡时的所有消息都将被再次处理——这或许比丢失消息更好或许更坏。 2. 异步提交消费位移 同步提交有一个缺点在broker对请求做出回应之前应用程序会一直阻塞这样会限制应用程序的吞吐量。可以通过降低提交频率来提升吞吐量但如果发生了再均衡则会增加潜在的消息重复。这个时候可以使用异步提交API。只管发送请求无须等待broker做出响应。 public void commitAsync() Slf4j public class CustomConsumer {public static void main(String[] args) {Properties properties new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,10.65.132.2:9093);properties.put(ConsumerConfig.GROUP_ID_CONFIG,group-topic-01);// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 创建消费者KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);// 订阅主题consumer.subscribe(Arrays.asList(topic-01));// 消费数据while (true){ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String consumerRecord : consumerRecords) {// 业务逻辑处理}// 异步提交消费位移consumer.commitAsync();}} }在提交成功或碰到无法恢复的错误之前commitSync() 会一直重试但commitAsync()不会这是commitAsync() 的一个缺点。之所以不进行重试是因为 commitAsync() 在收到服务器端的响应时可能已经有一个更大的位移提交成功。假设我们发出一个提交位移2000的请求这个时候出现了短暂的通信问题服务器收不到请求自然也不会做出响应。与此同时我们处理了另外一批消息并成功提交了位移3000。如果此时 commitAsync() 重新尝试提交位移2000则有可能在位移3000之后提交成功。这个时候如果发生再均衡就会导致消息重复。 之所以提到这个问题并强调提交顺序的重要性是因为 commitAsync() 也支持回调回调会在broker返回响应时执行。回调经常被用于记录位移提交错误或生成指标如果要用它来重试提交位移那么一定要注意提交顺序。 public void commitAsync(OffsetCommitCallback callback)Slf4j public class CustomConsumer {public static void main(String[] args) {Properties properties new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,10.65.132.2:9093);properties.put(ConsumerConfig.GROUP_ID_CONFIG,group-topic-01);// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 创建消费者KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);// 订阅主题consumer.subscribe(Arrays.asList(topic-01));// 消费数据while (true){ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String consumerRecord : consumerRecords) {// 业务逻辑处理}// 异步提交消费位移consumer.commitAsync(new OffsetCommitCallback() {Overridepublic void onComplete(MapTopicPartition, OffsetAndMetadata offsetAndMetadataMap, Exception exception) {if(exception!null){log.info(fail to commit offsets:{},offsetAndMetadataMap,exception);}}});}} }异步提交中如何实现重试我们可以设置一个递增的序号来维护异步提交的顺序每次位移提交之后就增加序号相对应的值。在遇到位移提交失败需要重试的时候可以检查所提交的位移和序号的值的大小如果前者小于后者则说明有更大的位移已经提交了不需要再进行本次重试如果两者相同则说明可以进行重试提交。 3. 同步和异步组合提交消费位移 一般情况下偶尔提交失败但不进行重试不会有太大问题因为如果提交失败是由于临时问题导致的后续的提交总会成功。如果消费者异常退出那么这个重复消费的问题就很难避免因为这种情况下无法及时提交消费位移但如果这是发生在消费者被关闭或再均衡前的最后一次提交则要确保提交是成功的可以在退出或再均衡执行之前使用同步提交的方式做最后的把关。 Slf4j public class CustomConsumer {public static void main(String[] args) {Properties properties new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,10.65.132.2:9093);properties.put(ConsumerConfig.GROUP_ID_CONFIG,group-topic-01);// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);consumer.subscribe(Arrays.asList(topic-01));try {while (true) {ConsumerRecordsString, String records consumer.poll( Duration.ofMillis(100));for (ConsumerRecordString, String record : records) {// 业务逻辑处理}// 异步提交位移consumer.commitAsync();}} catch (Exception e) {log.error(Unexpected error, e);} finally {try {// 同步提交位移consumer.commitSync();}finally{consumer.close();}}} }4. 提交特定的消费位移 对于采用 commitSync() 的无参方法而言它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的。但如果想要更频繁地提交位移该怎么办如果 poll() 返回了一大批数据那么为了避免可能因再均衡引起的消息重复想要在批次处理过程中提交位移该怎么办这个时候不能只是调用 commitSync() 或commitAsync()因为它们只会提交消息批次里的最后一个位移。 幸运的是消费者API允许在调用 commitSync() 和 commitAsync() 时传给它们想要提交的分区和位移 public void commitSync(MapTopicPartition, OffsetAndMetadata offsets) public void commitAsync(MapTopicPartition, OffsetAndMetadata offsets, OffsetCommitCallback callback)如图消费者的提交位移当前一次poll拉取的分区消息的最大位移offset 1这个提交位移就是下次 Slf4j public class CustomConsumer {public static void main(String[] args) {Properties properties new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,10.65.132.2:9093);properties.put(ConsumerConfig.GROUP_ID_CONFIG,group-topic-01);// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);consumer.subscribe(Arrays.asList(topic-01));ConcurrentHashMapTopicPartition,OffsetAndMetadata offsets new ConcurrentHashMap();int count 0;while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records) {// 消息所属的主题和分区TopicPartition topicPartition new TopicPartition(record.topic(), record.partition());// 消费者提交的消费位移当前消费消息的位移1OffsetAndMetadata offsetAndMetadata new OffsetAndMetadata(record.offset() 1);offsets.put(topicPartition, offsetAndMetadata);if(count % 1000 0){consumer.commitAsync(offsets,null);}count;}}} }5. 按分区提交消费位移 Slf4j public class CustomConsumer {public static void main(String[] args) {Properties properties new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,10.65.132.2:9093);properties.put(ConsumerConfig.GROUP_ID_CONFIG,group-topic-01);// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);consumer.subscribe(Arrays.asList(topic-01));while (true){ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));// 获取拉取的消息包含的所有分区列表SetTopicPartition partitions consumerRecords.partitions();for (TopicPartition partition : partitions) {// 获取当前分区要消费的消息ListConsumerRecordString, String partitionRecords consumerRecords.records(partition);// 获取当前分区消息的最大位移long lastConsumerOffset partitionRecords.get(partitionRecords.size() - 1).offset();// 当前分区的消费位移提交 当前分区消息的最大位移 1MapTopicPartition, OffsetAndMetadata topicPartitionOffsetAndMetadataMap Collections.singletonMap(partition, new OffsetAndMetadata(lastConsumerOffset 1));consumer.commitSync(topicPartitionOffsetAndMetadataMap);}}} }4. 消费者查找不到消费位移时怎么办 当一个新的消费组建立的时候它根本没有可以查找的消费位移。或者消费组内的一个新消费者订阅了一个新的主题它也没有可以查找的消费位移。当__consumer_offsets 主题中有关这个消费组的位移信息过期而被删除后它也没有可以查找的消费位移。当 Kafka 中没有初始位移或服务器上不再存在当前位移时该怎么办 此时会根据消费者客户端参数 auto.offset.reset 的配置来决定从何处开始进行消费auto.offset.reset 参数的取值如下 latest默认值表示从分区末尾开始消费消息。earliest 表示消费者会从起始处也就是0开始消费。none查到不到消费位移的时候既不从最新的消息位置处开始消费也不从最早的消息位置处开始消费此时会报出NoOffsetForPartitionException异常。如果能够找到消费位移那么配置为“none”不会出现任何异常。 如果配置的不是“latest”、“earliest”和“none”则会报出ConfigException异常。 auto.offset.reset 参数用于指定消费者在启动时如果找不到消费位移应该从哪里开始消费消息。 如果能够找到消费位移那么消费者会从该位移处开始消费消息那么 auto.offset.reset 参数并不会奏效只有在找不到消费位移时才会生效。如果发生位移越界即消费位移超出了消息队列中消息的数量或位置范围那么 auto.offset.reset 参数也会生效。 5. 如何从特定分区位移处读取消息 如果消费者能够找到消费位移使用 poll() 可以从各个分区的最新位移处读取消息 而且提供的 auto.offset.reset 参数也可以在找不到消费位移或位移越界的情况下粗粒度地从开头或末尾开始消费。但是有些时候我们需要一种更细粒度的掌控可以让我们从特定的位移处开始拉取消息而 KafkaConsumer 中的 seek() 方法正好提供了这个功能让我们得以追前消费或回溯消费。 public void seek(TopicPartition partition, long offset) public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)① seek() 方法中的参数 partition 表示分区而 offset 参数用来指定从分区的哪个位置开始消费。seek() 方法只能重置消费者分配到的分区的消费位置而分区的分配是在 poll() 方法的调用过程中实现的。也就是说在执行 seek() 方法之前需要先执行一次poll()方法等到分配到分区之后才可以重置消费位置 Slf4j public class CustomConsumer {public static void main(String[] args) {Properties properties new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,10.65.132.2:9093);properties.put(ConsumerConfig.GROUP_ID_CONFIG,group-topic-01);// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);consumer.subscribe(Arrays.asList(topic-01));// 执行一次poll() 方法完成分区分配的逻辑// ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(0));ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(10000));SetTopicPartition topicPartitions consumer.assignment();for (TopicPartition topicPartition : topicPartitions) {consumer.seek(topicPartition,10);}while (true) {ConsumerRecordsString, String poll consumer.poll(Duration.ofMillis(1000));// ...}} }② 如果 poll() 方法中的参数为0此方法立刻返回那么 poll() 方法内部进行分区分配的逻辑就会来不及实施也就是说消费者此时并未分配到任何分区那么 topicPartitions 便是一个空列表。那么这里的 timeout 参数设置为多少合适呢太短会使分配分区的动作失败太长又有可能造成一些不必要的等待。我们可以通过 KafkaConsumer的 assignment方法来判定是否分配到了相应的分区 Slf4j public class CustomConsumer {public static void main(String[] args) {Properties properties new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,10.65.132.2:9093);properties.put(ConsumerConfig.GROUP_ID_CONFIG,group-topic-01);// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);consumer.subscribe(Arrays.asList(topic-01));SetTopicPartition topicPartitions consumer.assignment();// 此时说明还未完成分区分配while (topicPartitions.size()0){consumer.poll(Duration.ofMillis(100));topicPartitions consumer.assignment();}for (TopicPartition topicPartition : topicPartitions) {// 重置每个分区的消费位置为10consumer.seek(topicPartition,10);}while (true) {ConsumerRecordsString, String poll consumer.poll(Duration.ofMillis(1000));// 消费消息}} }③ 如果对未分配到的分区执行seek() 方法那么会报出 IllegalStateException 的异常。类似在调用subscribe() 方法之后直接调用seek() 方法 Slf4j public class CustomConsumer {public static void main(String[] args) {Properties properties new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,10.65.132.2:9093);properties.put(ConsumerConfig.GROUP_ID_CONFIG,group-topic-01);// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);consumer.subscribe(Arrays.asList(topic-01));// 未完成分区分配直接调用seek方法,重置分区1的消费位置为10consumer.seek(new TopicPartition(topic-01,1),10);while (true) {ConsumerRecordsString, String poll consumer.poll(Duration.ofMillis(1000));// 消费消息}} }报错 Exception in thread main java.lang.IllegalStateException: No current assignment for partition topic-01-1④ 如果消费组内的消费者在启动的时候能够找到消费位移那么消费者就会从该位移处开始消费消息。除非发生位移越界即消费位移超出了消息队列中消息的数量或位置范围否则 auto.offset.reset 参数并不会奏效此时如果想指定从开头或末尾开始消费就需要seek() 方法的帮助了指定从分区末尾开始消费 endOffsets() 方法用来获取指定分区的末尾的消息位置 endOffsets 的具体方法定义如下 public MapTopicPartition, Long endOffsets(CollectionTopicPartition partitions) public MapTopicPartition, Long endOffsets(CollectionTopicPartition partitions, Duration timeout)其中 partitions 参数表示分区集合而 timeout 参数用来设置等待获取的超时时间。如果没有指定 timeout 参数的值那么 endOffsets() 方法的等待时间由客户端参数 request.timeout.ms 来设置默认值为 30000。与 endOffsets 对应的是 beginningOffset() 方法一个分区的起始位置起初是0但并不代表每时每刻都为0因为日志清理的动作会清理旧的数据所以分区的起始位置会自然而然地增加beginningOffsets() 方法的具体定义如下 public MapTopicPartition, Long beginningOffsets(CollectionTopicPartition partitions) public MapTopicPartition, Long beginningOffsets(CollectionTopicPartition partitions, Duration timeout)beginningOffsets() 方法中的参数内容和含义都与 endOffsets() 方法中的一样配合这两个方法我们就可以从分区的开头或末尾开始消费。其实KafkaConsumer中直接提供了seekToBeginning() 方法和seekToEnd() 方法来实现这两个功能这两个方法的具体定义如下 public void seekToBeginning(CollectionTopicPartition partitions) public void seekToEnd(CollectionTopicPartition partitions)⑤ 有时候我们并不知道特定的消费位置却知道一个相关的时间点比如我们想要消费昨天8点之后的消息这个需求更符合正常的思维逻辑。此时我们无法直接使用seek() 方法来追溯到相应的位置。KafkaConsumer同样考虑到了这种情况它提供了一个offsetsForTimes() 方法通过timestamp来查询与此对应的分区位置 public MapTopicPartition, OffsetAndTimestamp offsetsForTimes(MapTopicPartition, Long timestampsToSearch) public MapTopicPartition, OffsetAndTimestamp offsetsForTimes(MapTopicPartition, Long timestampsToSearch, Duration timeout)offsetsForTimes() 方法的参数 timestampsToSearch 是一个Map类型key为待查询的分区而 value 为待查询的时间戳该方法会返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳对应于 OffsetAndTimestamp 中的 offset 和 timestamp字段。下面的示例演示了 offsetsForTimes() 和 seek() 之间的使用方法首先通过 offsetsForTimes() 方法获取一天之前的消息位置然后使用 seek() 方法追溯到相应位置开始消费 Slf4j public class CustomConsumer {public static void main(String[] args) {Properties properties new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,10.65.132.2:9093);properties.put(ConsumerConfig.GROUP_ID_CONFIG,group-topic-01);// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);consumer.subscribe(Arrays.asList(topic-01));MapTopicPartition,Long timestampToSearch new HashMap();SetTopicPartition topicPartitionSet consumer.assignment();// 查询的分区以及查询的时间戳for (TopicPartition topicPartition : topicPartitionSet) {timestampToSearch.put(topicPartition,System.currentTimeMillis()-1*24*3600*1000);}// 获取时间戳大于等于待查询时间的第一条消息对应的位置和时间戳MapTopicPartition, OffsetAndTimestamp topicPartitionOffsetAndTimestampMap consumer.offsetsForTimes(timestampToSearch);for (TopicPartition topicPartition : topicPartitionSet) {OffsetAndTimestamp offsetAndTimestamp topicPartitionOffsetAndTimestampMap.get(topicPartition);// seek 方法重置消费的位移if(offsetAndTimestamp ! null){consumer.seek(topicPartition,offsetAndTimestamp.offset());}}} }⑥ 位移越界也会触发 auto.offset.reset 参数的执行位移越界是指知道消费位置却无法在实际的分区中查找到比如原本拉取位置为101fetch offset 101但已经越界了out of range所以此时会根据 auto.offset.reset 参数的默认值来将拉取位置重置resetting offset为100我们也能知道此时分区中最大的消息 offset 为99。 6. 如何优雅地退出轮询循环消费 如何优雅地退出轮询循环如果你确定马上要关闭消费者即使消费者还在等待一个poll()返回那么可以在另一个线程中调用consumer.wakeup()。如果轮询循环运行在主线程中那么可以在ShutdownHook里调用这个方法。需要注意的是consumer.wakeup() 是消费者唯一一个可以在其他线程中安全调用的方法。调用 consumer.wakeup() 会导致poll()抛出WakeupException如果调用 consumer.wakeup() 时线程没有在轮询那么异常将在下一次调用 poll() 时抛出。不一定要处理WakeupException但在退出线程之前必须调用consumer.close() 。消费者在被关闭时会提交还没有提交的偏移量并向消费者协调器发送消息告知自己正在离开群组。协调器会立即触发再均衡被关闭的消费者所拥有的分区将被重新分配给群组里其他的消费者不需要等待会话超时。
http://www.dnsts.com.cn/news/7670.html

相关文章:

  • 郑州网站推广公司排名东海网站建设
  • 南昌网站建设公司信息wordpress导航怎么设置
  • 河南网络洛阳网站建设河南网站建设建设工程施工合同2017
  • 网站空间和服务器的区别怎样建设凡科网站
  • 网上做网站广告投放大学生dw网页设计作业
  • 返利网app网站开发在wordpress教程视频
  • 易经网站开发公司北京seo网站优化培训
  • 个人网站尺寸商标设计logo图案设计软件
  • logo网站设计中建材建设有限公司网站
  • 如何建造免费的网站怎么样才能把网站关键词做有排名
  • python网站开发视频庆阳做网站的公司
  • 雁塔区网站建设网站上的图是怎么做的
  • 什么网站可做浏览器首页珠海公司网站制作公
  • 英文站 wordpress seo优化电销系统外呼软件
  • 佛山找人做网站p2p网站策划
  • 做盗市相关网站福建路桥建设有限公司网站
  • 叫外包公司做网站不肯给源代码的上海政务服务网官网
  • 电子商务网站开发报告目前网站建设采用什么技术
  • 网站建设公司创业计划书谷歌浏览器搜索入口
  • 北京免费网站建站模板安徽注册公司网站
  • 网站建设培训需要多少钱安卓优化大师hd
  • 做网站和app多少费用沈阳高端网站设计
  • 做电子请帖的网站网站建设步骤及推广方法
  • 青岛网站seo价格国外建设网站情况报告
  • 字体在线设计网站大数据网站建设费用
  • 旅游网站建设目标分析wordpress 跟随插件
  • 什么软件可以做网站晋江网站建设价格
  • 深圳市做网站知名公司汕头网站推广多少钱
  • 建设高校网站的现实意义挂号网站制作
  • 优惠券个人网站怎么做wordpress 成功案例