湖州网站建设哪家好,成立网站,谷歌浏览器 官网下载,建设通网站登录不进去我的前一篇博客《kafka:AdminClient获取指定主题的所有消费者的消费偏移(一)》为了忽略忽略掉上线之前的所有消息#xff0c;从获取指定主题的所有消费者的消费偏移并计算出最大偏移来解决此问题。 但这个方案需要使用不常用的AdminClient类#xff0c;而且如果该主题如果是第…我的前一篇博客《kafka:AdminClient获取指定主题的所有消费者的消费偏移(一)》为了忽略忽略掉上线之前的所有消息从获取指定主题的所有消费者的消费偏移并计算出最大偏移来解决此问题。 但这个方案需要使用不常用的AdminClient类而且如果该主题如果是第一次被消费者拉取消息时因为得不到消费者的消费偏移最后的结果就是从0偏移开始拉取所有消息。并不能真正实现忽略上线之前所有消息的目的。 所以我又优化了方案。基本的原理就是使用KafkaConsumer.offsetsForTimes方法获取消费者的所有主题分区的指定时间的偏移并将这个偏移作为消费开始的偏移(KafkaConsumer.seek方法) 。 Testpublic void test3SeekToTime() {// 配置Kafka消费者的属性Properties props new Properties();props.put(bootstrap.servers, localhost:9092);props.put(group.id, my_consumer_group);props.put(key.deserializer, StringDeserializer.class.getName());props.put(value.deserializer, StringDeserializer.class.getName());// 创建Kafka消费者实例try(ConsumerString, String consumer new KafkaConsumer(props)){ boolean seek false;/** * 循环开始的时间* 忽略该时间之前的消息*/long startMills System.currentTimeMillis();while (true) {try {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(4000));if(!seek) {if(!records.isEmpty()) {/** * 获取第一批消息时更新消息偏移到循环开始的时间*/consumer.offsetsForTimes(Maps.asMap(consumer.assignment(),t-startMills)).forEach((k,v)-{if(null ! v) {System.out.println(seek %s to %s,k,v.offset());consumer.seek(k,v.offset());}});seek true;}/** 跳过第一批获取到的消息继续循环 */continue;}records.forEach(record - {String value record.value();System.out.println(Received message: value);});}catch (Exception e) {e.printStackTrace();}}}}