当前位置: 首页 > news >正文

网站建设公司株洲官方手表网站

网站建设公司株洲,官方手表网站,衡阳建设学校网站,电子商务网站建设重点难点springboot 连接 kafka集群 一、环境搭建1.1 springboot 环境1.2 kafka 依赖 二、 kafka 配置类2.1 发布者2.1.1 配置2.1.2 构建发布者类2.1.3 发布消息 2.2 消费者2.2.1 配置2.2.2 构建消费者类2.2.3 进行消息消费 一、环境搭建 1.1 springboot 环境 JDK 11 Maven 3.8.x spr… springboot 连接 kafka集群 一、环境搭建1.1 springboot 环境1.2 kafka 依赖 二、 kafka 配置类2.1 发布者2.1.1 配置2.1.2 构建发布者类2.1.3 发布消息 2.2 消费者2.2.1 配置2.2.2 构建消费者类2.2.3 进行消息消费 一、环境搭建 1.1 springboot 环境 JDK 11 Maven 3.8.x springboot 2.5.4 1.2 kafka 依赖 springboot的pom文件导入 dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion3.4.0/version/dependency二、 kafka 配置类 2.1 发布者 2.1.1 配置 发布者我们使用 KafkaTemplate 来进行消息发布所以需要先对其进行一些必要的配置。 Configuration EnableKafka public class KafkaConfig {/***** 发布者 *****///生产者工厂Beanpublic ProducerFactoryInteger, String producerFactory() {return new DefaultKafkaProducerFactory(producerConfigs());}//生产者配置Beanpublic MapString, Object producerConfigs() {MapString, Object props new HashMap();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}//生产者模板Beanpublic KafkaTemplateInteger, String kafkaTemplate() {return new KafkaTemplate(producerFactory());} }2.1.2 构建发布者类 配置完发布者下来就是发布消息我们需要继承 ProducerListenerK, V 接口该接口完整信息如下 public interface ProducerListenerK, V {void onSuccess(ProducerRecordK, V producerRecord, RecordMetadata recordMetadata);void onError(ProducerRecordK, V producerRecord, RecordMetadata recordMetadata,Exception exception);}实现该接口的方法我们可以获取包含发送结果成功或失败的异步回调也就是可以在这个接口的实现中获取发送结果。 我们简单的实现构建一个发布者类接收主题和发布消息参数并打印发布结果。 Component public class KafkaProducer implements ProducerListenerObject,Object {private static final Logger producerlog LoggerFactory.getLogger(KafkaProducer.class);private final KafkaTemplateInteger, String kafkaTemplate;public KafkaProducer(KafkaTemplateInteger, String kafkaTemplate) {this.kafkaTemplate kafkaTemplate;}public void producer (String msg,String topic){ListenableFutureSendResultInteger, String future kafkaTemplate.send(topic,0, msg);future.addCallback(new KafkaSendCallbackInteger, String() {Overridepublic void onSuccess(SendResultInteger, String result) {producerlog.info(发送成功 {}, result);}Overridepublic void onFailure(KafkaProducerException ex) {ProducerRecordInteger, String failed ex.getFailedProducerRecord();producerlog.info(发送失败 {},failed);}});}}2.1.3 发布消息 写一个controller类来测试我们构建的发布者类这个类中打印接收到的消息来确保信息接收不出问题。 RestController public class KafkaTestController {private static final Logger kafkaTestLog LoggerFactory.getLogger(KafkaTestController.class);Resourceprivate KafkaProducer kafkaProducer;GetMapping(/kafkaTest)public void kafkaTest(String msg,String topic){kafkaProducer.producer(msg,topic);kafkaTestLog.info(接收到消息 {} {},msg,topic);} } 一切准备就绪我们启动程序利用postman来进行简单的测试。 进行消息发布 发布结果 可以看到消息发送成功。 我们再看看kafka消费者有没有接收到消息 看以看到kakfa的消费者也接收到了消息。 2.2 消费者 2.2.1 配置 消息的接受有多种方式我们这里选择的是使用 KafkaListener 注解来进行消息接收。它的使用像下面这样 public class Listener {KafkaListener(id foo, topics myTopic, clientIdPrefix myClientId)public void listen(String data) {...}}看起来不是太难吧但使用这个注解我们需要配置底层 ConcurrentMessageListenerContainer.kafkaListenerContainerFactor。 我们在原来的kafka配置类 KafkaConfig 中继续配置消费者大概就像下面这样 Configuration EnableKafka public class KafkaConfig {/***** 发布者 *****///生产者工厂Beanpublic ProducerFactoryInteger, String producerFactory() {return new DefaultKafkaProducerFactory(producerConfigs());}//生产者配置Beanpublic MapString, Object producerConfigs() {MapString, Object props new HashMap();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}//生产者模板Beanpublic KafkaTemplateInteger, String kafkaTemplate() {return new KafkaTemplate(producerFactory());}/***** 消费者 *****///容器监听工厂BeanKafkaListenerContainerFactoryConcurrentMessageListenerContainerInteger, StringkafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactoryInteger, String factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}//消费者工厂Beanpublic ConsumerFactoryInteger, String consumerFactory() {return new DefaultKafkaConsumerFactory(consumerConfigs());}//消费者配置Beanpublic MapString, Object consumerConfigs() {MapString, Object props new HashMap();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);return props;} }注意要设置容器属性必须使用getContainerProperties()工厂方法。它用作注入容器的实际属性的模板 2.2.2 构建消费者类 配置好后我们就可以使用这个注解了。这个注解的使用有多种方式 1、用它来覆盖容器工厂的concurrency和属性 KafkaListener(id myListener, topics myTopic,autoStartup ${listen.auto.start:true}, concurrency ${listen.concurrency:3}) public void listen(String data) {... } 2、可以使用显式主题和分区以及可选的初始偏移量 KafkaListener(id thing2, topicPartitions { TopicPartition(topic topic1, partitions { 0, 1 }),TopicPartition(topic topic2, partitions 0,partitionOffsets PartitionOffset(partition 1, initialOffset 100))}) public void listen(ConsumerRecord?, ? record) {... } 3、将初始偏移应用于所有已分配的分区 KafkaListener(id thing3, topicPartitions { TopicPartition(topic topic1, partitions { 0, 1 },partitionOffsets PartitionOffset(partition *, initialOffset 0))}) public void listen(ConsumerRecord?, ? record) {... } 4、指定以逗号分隔的分区列表或分区范围 KafkaListener(id pp, autoStartup false,topicPartitions TopicPartition(topic topic1,partitions 0-5, 7, 10-15)) public void process(String in) {... } 5、可以向侦听器提供Acknowledgment KafkaListener(id cat, topics myTopic,containerFactory kafkaManualAckListenerContainerFactory) public void listen(String data, Acknowledgment ack) {...ack.acknowledge(); } 6、添加标头 KafkaListener(id list, topics myTopic, containerFactory batchFactory) public void listen(ListString list,Header(KafkaHeaders.RECEIVED_KEY) ListInteger keys,Header(KafkaHeaders.RECEIVED_PARTITION) ListInteger partitions,Header(KafkaHeaders.RECEIVED_TOPIC) ListString topics,Header(KafkaHeaders.OFFSET) ListLong offsets) {... } 我们这里写一个简单的只用它来接受指定主题的数据 Component public class KafkaConsumer {private static final Logger consumerlog LoggerFactory.getLogger(KafkaConsumer.class);KafkaListener(topicPartitions TopicPartition(topic kafka-topic-test,partitions 0))public void consumer (String data){consumerlog.info(消费者接收数据 {},data);} }这里解释一下因为我们进行了手动分配主题/分区所以 注解中group.id 可以为空。若要指定group.id请在消费者配置中加上props.put(ConsumerConfig.GROUP_ID_CONFIG, “bzt001”); 或在 TopicPartition 注解后加上 groupId “组id” 2.2.3 进行消息消费 继续使用postman调用我们写好的发布者发布消息观察控制台的消费者类是否有相关日志出现。
http://www.dnsts.com.cn/news/201270.html

相关文章:

  • 网站开发如何使用APIlnmp wordpress 邮件
  • 北京网站优化哪家公司好自学网站建设作业
  • 网站有备案是正规的吗服装网站目标
  • 免费建域名网站Dw做网站怎么加logo
  • 免费制作婚介网站做网站开发 用什么软件
  • 无锡做设计公司网站影视制作宣传片公司
  • 连云港做网站企业wordpress eocms
  • 网站后台编程语言wordpress 页面 按钮
  • 南浔做网站广东建设行业招聘 什么网站
  • 青海省住房和城乡建设厅 网站首页wordpress媒体库查询页
  • 和田地区建设局网站想做棋牌网站怎么做
  • 个人网站建设 开题报告php手机网站
  • 广州建设网站下载网站开发现在用什么
  • 做设计接单的网站网站首页图片切换
  • 公司建网站多少钱晋江文学城怎么用自己的网站做链轮
  • 包头市住房和城乡建设局网站泰安百度网站建设
  • 国外做测评的网站有哪些百度权重什么意思
  • 网站建设后台管理登陆代码如今流行的网站建设
  • 免费检测网站seo许昌市住房城乡建设局网站
  • 小企业网站建设5000块贵吗工程公司管理制度
  • 景区门户网站建设的必要性dw网页制作教程局中对齐
  • 网站运营与管理的对策淘宝联盟 做网站
  • 公司做网站一定要钱吗网络关键词
  • 黄岐建网站网站无障碍建设报告
  • 医药网站建设中图片wordpress视频模块
  • 建设信用卡银行积分商城网站公司域名备案网站名称
  • 计算机网站开发项目深圳4a广告公司有哪些
  • cnnic网站洛阳400电话洛阳网站seo
  • 雄安建站服务书画院网站建设
  • 百度网站入口贵州门户网站建设