当前位置: 首页 > news >正文

武汉手机网站建设价位成人本科报考官网

武汉手机网站建设价位,成人本科报考官网,网站建设项目wbs,dw做网站实例Kafka 消费组位移消费者 API命令行Kafka : 基于日志结构#xff08;log-based#xff09;的消息引擎 消费消息时#xff0c;只是从磁盘文件上读取数据#xff0c;不会删除消息数据位移数据能由消费者控制#xff0c;能很容易修改位移的值#xff0c;实现重复消费历史数据… Kafka 消费组位移消费者 API命令行Kafka : 基于日志结构log-based的消息引擎 消费消息时只是从磁盘文件上读取数据不会删除消息数据位移数据能由消费者控制能很容易修改位移的值实现重复消费历史数据 重设位移的两个维度 : 位移维度 : 根据位移值重设 : 把消费者的位移值重设成自定义位移值时间维度 : 将位移调整成 该时间的最小位移 7 种重设策略 : 维度策略含义位移维度Earliest位移调整当前最早位移处Latest位移调整到当前最新位移处Current位移调整到当前最新提交位移处Specified-Offset位移调整成指定位移Shift-By-N位移调整到当前位移 N 处时间维度DataTime位移调整到大于给定时间的最小位移处Duration位移调整到离当前时间指定间隔的位移处 Earliest 策略 : 将位移调整到主题当前最早位移处 不一定是 0很久的消息会被 Kafka 自动删除所以当前最早位移可能是 0 的值当要重新消费主题的所有消息就用 Earliest 策略 Latest 策略 : 把位移重设成最新末端位移 当向某个主题发送 15 条消息最新末端位移是 15当要跳过所有历史消息从最新的消息处开始消费就用 Latest 策略 Current 策略 : 将位移调整成消费者当前提交的最新位移 修改消费者代码并重启消费者结果发现代码有问题回滚前的代码变更还把位移重设为消费者重启时的位置 Specified-Offset 策略 : 消费者把位移值调整到你指定的位移处 消费者处理某条错误消息时 , 能手动跳过此消息的处理生产中 , 出现 corrupted 消息无法被消费时消费者会抛出异常无法继续工作 Specified-Offset 策略 : 指定位移的绝对数值 Shift-By-N 策略 : 指定位移的相对数值 : 跳过一段消息的距离 把位移重设成当前位移的前 100 条位移处指定 N 为 -100 DateTime : 指定一个时间将位移重置到该时间之后的最早位移处 使用场景 : 重新消费昨天的数据并使用该策略重设位移到昨天 0 点 Duration 策略 : 相对的时间间隔将位移调整到距离当前给定时间间隔的位移处格式是 PnDTnHnMnS Java 8 引入 Duration : 以字母 P 开头后面由 4 部分组成 (D、H、M、S : 天、小时、分钟、秒)将位移调回到 15 分钟前就指定 PT0H15M0S 消费者 API Earliest 策略 : Properties consumerProperties new Properties(); // 禁止自动提交位移 consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 重设的消费者组的组 ID consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);String topic test; // 要重设位移的 Kafka 主题 try (final KafkaConsumerString, String consumer new KafkaConsumer(consumerProperties)) {consumer.subscribe(Collections.singleton(topic));consumer.poll(0);// 一次性构造主题的所有分区对象consumer.seekToBeginning( consumer.partitionsFor(topic).stream().map(partitionInfo -new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList())); } Latest 策略 : consumer.seekToEnd(consumer.partitionsFor(topic).stream().map(partitionInfo - new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));Current 策略 : 获取当前提交的最新位移 : consumer.partitionsFor(topic).stream().map(info - new TopicPartition(topic, info.partition())).forEach(tp - {long committedOffset consumer.committed(tp).offset();consumer.seek(tp, committedOffset); });Specified-Offset 策略 : long targetOffset 1234L;for (PartitionInfo info : consumer.partitionsFor(topic)) {TopicPartition tp new TopicPartition(topic, info.partition());consumer.seek(tp, targetOffset); }实现 Shift-By-N 策略 for (PartitionInfo info : consumer.partitionsFor(topic)) {TopicPartition tp new TopicPartition(topic, info.partition());// 假设向前跳 123 条消息long targetOffset consumer.committed(tp).offset() 123L; consumer.seek(tp, targetOffset); }DateTime 策略 : 重设位移到 2022 年 12 月 11 日晚上 8 点 long ts LocalDateTime.of(2022, 12, 11, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();MapTopicPartition, Long timeToSearch consumer.partitionsFor(topic).stream().map(info - new TopicPartition(topic, info.partition())).collect(Collectors.toMap(Function.identity(), tp - ts));for (Map.EntryTopicPartition, OffsetAndTimestamp entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {consumer.seek(entry.getKey(), entry.getValue().offset()); }Duration 策略 : 将位移调回 30 分钟前 MapTopicPartition, Long timeToSearch consumer.partitionsFor(topic).stream().map(info - new TopicPartition(topic, info.partition())).collect(Collectors.toMap(Function.identity(), tp - System.currentTimeMillis() - 30 * 1000 * 60));for (Map.EntryTopicPartition, OffsetAndTimestamp entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {consumer.seek(entry.getKey(), entry.getValue().offset()); }命令行 Earliest 策略 : bin/kafka-consumer-groups.sh \ --bootstrap-server kafka-host:port \ --group test-group --reset-offsets \ --all-topics --to-earliest –executeLatest 策略 : bin/kafka-consumer-groups.sh \ --bootstrap-server kafka-host:port \ --group test-group --reset-offsets \ --all-topics --to-latest --executeCurrent 策略 : bin/kafka-consumer-groups.sh \ --bootstrap-server kafka-host:port \ --group test-group --reset-offsets \ --all-topics --to-current --executeSpecified-Offset 策略 : bin/kafka-consumer-groups.sh \ --bootstrap-server kafka-host:port \ --group test-group --reset-offsets \ --all-topics --to-offset offset --executeShift-By-N 策略 : bin/kafka-consumer-groups.sh \ --bootstrap-server kafka-host:port \ --group test-group --reset-offsets \ --shift-by offset_N --executeDateTime 策略 : bin/kafka-consumer-groups.sh \ --bootstrap-server kafka-host:port \ --group test-group --reset-offsets \ --to-datetime 2019-06-20T20:00:00.000 --executeDuration 策略 : bin/kafka-consumer-groups.sh \ --bootstrap-server kafka-host:port \ --group test-group --reset-offsets \ --by-duration PT0H30M0S --execute
http://www.dnsts.com.cn/news/1951.html

相关文章:

  • 登陆中国建设银行网站我的账户密码怎么就有了?怎么清除2021最火营销方案
  • 电话网站域名到期品牌营销策划十大要点
  • 网站建设开发感想网上怎么发布广告
  • 做网站选用什么域名比较好潍坊网站建设平台
  • 做外贸网站需要多少钱中央广播电视总台
  • 伪静态 多个网站广州seo外包公司
  • 朝阳周边网站建设申请自己的网站
  • dw怎么做网站注册登入页面台州seo优化公司
  • 什么做的网站推广网络推广优化招聘
  • 有没有做废品的网站seo自学教程seo免费教程
  • 网站开发外键做外贸网站的公司
  • 企业网设计方案整站关键词排名优化
  • 建设项目查询网站seo课
  • 网站建设设计公司关键词云图
  • 网店推广的常用方法有哪些杭州seo技术培训
  • wordpress设置页面403权限宁波seo智能优化
  • ps个人网站怎么做百度地图在线使用
  • dz网站收款即时到账怎么做的流氓网站
  • 沈阳网站百度竞价投放
  • 网站建设项目策划书游戏优化是什么意思
  • 外贸网站做开关行业的哪个好北京首页关键词优化
  • 自制网站除了购买域名还要怎么做提升网页优化排名
  • 北京手机站建站爱站网关键词查询系统
  • 践行新使命忠诚保大庆网站建设重庆seo务
  • 邮箱登陆嵌入网站百度广告太多
  • 做网站 设备郑州优化公司有哪些
  • 电子商务网站开发技术免费推广软件
  • 企业建设网站公司简介网上有免费的网站吗
  • 海洋公司做网站推广游戏推广论坛
  • 网站手册网站快速收录