当前位置: 首页 > news >正文

代做安装预算的网站渭南做网站电话

代做安装预算的网站,渭南做网站电话,网站logo如何替换,网站建设及运营 经营范围文章目录 1、重试和死信主题2、死信队列3、代码演示3.1、appication.yml3.2、引入spring-kafka依赖3.3、创建SpringBoot启动类3.4、创建生产者发送消息3.5、创建消费者消费消息 1、重试和死信主题 kafka默认支持重试和死信主题 重试主题#xff1a;当消费者消费消息异常时当消费者消费消息异常时手动ack如果有异常继续向后消费为了保证消息尽可能成功消费可以将消费异常的消息扔到重试主题再通过重试主题的消费者尝试消费死信主题当一个消息消费多次仍然失败可以将该消息存到死信主题。避免漏消息以后可以人工介入手动解决消息消费失败的问题 2、死信队列 在Kafka中DLT通常指的是 Dead Letter Topic死信队列。 Dead Letter TopicDLT的定义与功能 背景原生Kafka是不支持Retry Topic和DLT的但Spring Kafka在客户端实现了这两个功能。功能当消息在Kafka中被消费时如果消费逻辑抛出异常并且重试策略如默认重试10次后仍无法成功处理该消息Spring Kafka会将该消息发送到DLT中。自定义处理可以通过自定义SeekToCurrentErrorHandler来控制消费失败后的处理逻辑例如添加重试间隔、设置重试次数等。如果在重试后消息仍然消费失败Spring Kafka会将该消息发送到DLT。 DLT的使用与意义 错误处理DLT为Kafka提供了一种机制来处理那些无法被成功消费的消息从而避免了消息的丢失或阻塞。监控与告警通过对DLT的监控可以及时发现并处理那些无法被成功消费的消息从而保障Kafka系统的稳定性和可靠性。二次处理对于发送到DLT的消息可以进行二次处理如手动干预、修复数据等以确保这些消息能够最终得到处理。 总之在Kafka中DLT是一个用于处理无法被成功消费的消息的特殊Topic它提供了一种灵活且可靠的机制来保障Kafka系统的稳定性和可靠性。 3、代码演示 3.1、appication.yml server:port: 8120 # v1 spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别read-uncommitted会导致脏读可以读取生产者事务还未提交的消息# 消费者是否自动ack true自动ack 消费者获取到消息后kafka提交消费者偏移量# 调用ack方法时才会提交ack给kafka # enable-auto-commit: false# 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始# earliest:从最早的消息开始消费# latest:第一次从LEO位置开始消费# none:如果主题分区没有偏移量则抛出异常auto-offset-reset: earliest #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer# json反序列化器 # value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializerproperties:spring.json.trusted.packages: *listener:# 手动ackmanual手动ack时 如果有异常会尝试一直消费 # ack-mode: manual# 手动ack消费有异常时停止ack-mode: manual_immediate 3.2、引入spring-kafka依赖 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.0.5/versionrelativePath/ !-- lookup parent from repository --/parent!-- Generated by https://start.springboot.io --!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 https://springdoc.cn --groupIdcom.atguigu/groupIdartifactIdspring-kafka-consumer/artifactIdversion0.0.1-SNAPSHOT/versionnamespring-kafka-consumer/namedescriptionspring-kafka-consumer/descriptionpropertiesjava.version17/java.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build/project 3.3、创建SpringBoot启动类 package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io // 优质的 spring/boot/data/security/cloud 框架中文文档尽在 https://springdoc.cn SpringBootApplication public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}} 3.4、创建生产者发送消息 package com.atguigu.spring.kafka.consumer; import jakarta.annotation.Resource; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.core.KafkaTemplate; import java.time.LocalDateTime; SpringBootTest public class KafkaProducerApplicationTests {ResourceKafkaTemplate kafkaTemplate;Testvoid sendRertyMsg(){kafkaTemplate.send(retry_topic,0,,重试机制和死信队列_ LocalDateTime.now());} } 没有指定分区默认会创建一个分区即使此时有3个kafka实例也只会用一个因为只有一个分区 [[{partition: 0,offset: 0,msg: 重试机制和死信队列_2024-06-07T15:49:37.398841600,timespan: 1717746578357,date: 2024-06-07 07:49:38}] ]3.5、创建消费者消费消息 给一个消费者配重试主题的时候死信主题消费者一般不写 package com.atguigu.spring.kafka.consumer.listener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.RetryableTopic; import org.springframework.kafka.annotation.TopicPartition; import org.springframework.kafka.support.Acknowledgment; import org.springframework.retry.annotation.Backoff; Component public class MyKafkaListenerAck {/*** 自动ack可能会导致漏消息* spring-kafka* 自动ack 如果有异常会死循环获取消息重新消费* 不能继续向后消费消息会导致消息积压** 手动ack 配置了手动ack且ack-mode为manual_immediate时* 如果消息消费失败会继续向后消费* param record*///指定消费者消费异常使用重试死信主题 必须结合手动ack使用RetryableTopic(attempts 3,numPartitions 3, //重试主题的分区数量backoff Backoff(value 2_000L), //重试的时间间隔autoCreateTopics true,retryTopicSuffix -myRetry, //指定重试主题创建时的后缀名使用原主题的名称拼接后缀生成名称 -retrydltTopicSuffix -myDlt //指定DLT主题创建时的后缀名使用原主题的名称拼接后缀生成名称 -dlt)KafkaListener(groupId my_group1,topicPartitions {TopicPartition(topic retry_topic,partitions {0,1,2})})public void consumeByRetry(ConsumerRecordString, String record, Acknowledgment ack) {System.out.println(consumeByRetry消费者获取到消息topic record.topic()partition:record.partition()offset record.offset()key record.key()value record.value());int i 1/0;//手动ackack.acknowledge();}/* //死信队列默认名称在原队列后拼接-dltKafkaListener(groupId my_group2,topicPartitions {TopicPartition(topic topic_dlt,partitions {0,1,2})})public void consumeByDLT(ConsumerRecordString, String record, Acknowledgment ack) {System.out.println(consumeByDLT消费者获取到消息topic record.topic()partition:record.partition()offset record.offset()key record.key()value record.value());//手动ackack.acknowledge();}*/} 此时运行SpringKafkaConsumerApplication控制台会报错因为我们手动写了异常 int i1/0 [[{partition: 0,offset: 0,msg: 重试机制和死信队列_2024-06-07T15:49:37.398841600,timespan: 1717746979414,date: 2024-06-07 07:56:19}] ]我们发现原本在重试主题中的消息死信主题也有一份
http://www.dnsts.com.cn/news/140731.html

相关文章:

  • 营销网站文章去那找如何用ps做照片模板下载网站
  • 个人网站设计作业dede网站源码 如何修改
  • 创建视频网站免费注册wordpress 论坛app
  • 一般用网站服务器网站建设+荆州
  • 网站开发环境是什么意思呼市网站制作招聘
  • 不一样的婚恋网站怎么做网页关键词优化难度
  • 三门峡高端网站建设下载企业微信app免费
  • 沧州黄骅港赶海的地方动态ip做网站影响seo吗
  • 网站中的打赏怎么做的手游推广赚佣金的平台
  • 网站租用服务器价格企业网站运营推广
  • 全屏的翻页网站这么做为什么wordpress有广告
  • 地方网站宁波网络公司排行榜
  • 大连网站建设服务网站关键词提高
  • 水果网站系统的建设与实现东莞专业网站推广需要多少钱
  • 健身会所网站模板网站建设的实施方案
  • 网站建设力度公司网站的功能
  • 网站建设流程有几个阶段cms建站系统安装
  • 做的比较好的分享网站网站设计宽屏
  • asp网站开发的实训wordpress自动增加阅读数代码
  • 湘潭响应式网站建设 速来磐石网络电子商务 网站开发
  • 青岛手机建站哪家好如何制作app软件游戏
  • 知名企业网站分析 比较评估企业网站建设要求
  • 咸宁市网站建设莆田手表网站
  • 用网站做淘客怎么做简单php企业网站源码
  • 深圳外文网站制作seo公司软件
  • 100m的光纤可以做网站吗南宁电商网络推广
  • 对接空间站中国风古典网站模板
  • 企业网站建设都能做哪些工作安徽新站优化
  • 电子商务网站建设人才调研电商网站建设实训要求
  • wap购物网站模板下载肃宁县做网站