怎样做软件开发,seo排名诊断,网站备案更换主体,wordpress用户的区别目录 一、Kafka是什么#xff1f;消息系统#xff1a;Publish/subscribe#xff08;发布/订阅者#xff09;模式相关术语 二、初步使用1.yml文件配置2.生产者类3.消费者类4.发送消息 三、减少分区数量1.停止业务服务进程2.停止kafka服务进程3.重新启动kafka服务4.重新启动业… 目录 一、Kafka是什么消息系统Publish/subscribe发布/订阅者模式相关术语 二、初步使用1.yml文件配置2.生产者类3.消费者类4.发送消息 三、减少分区数量1.停止业务服务进程2.停止kafka服务进程3.重新启动kafka服务4.重新启动业务服务 参考文章 一、Kafka是什么
Kafka是一种高吞吐量、分布式、基于发布/订阅的消息系统。可满足每秒百万级的消息生产和消费有一套完善的消息存储机制确保数据高效安全且持久化Kafka作为一个集群运行在一个或多个服务器上可以跨多个机房当某台故障时生产者和消费者转而使用其他的Kafka。
消息系统Publish/subscribe发布/订阅者模式
1.消息发布者发布消息到主题中有多个订阅者消费该消息。 2.当发布者发布消息时不管是否有订阅者都不会报错。 3.一定要先有消息发布者后有消息订阅者。
相关术语
1.BrokerKafka服务器负责创建topic、消息存储和转发。 2.Topic消息类别主题用于区分消息。 3.Partition分区真正的存储数据单元。每个Topic包含一个或多个分区用于保存消息和维护偏移量。(一般为kafka节点数CPU的总核心数量) 4.offset分区消息此时被消费的位置。分区中消息的唯一id。 5.Producer消息生产者。 6.Consumer消息消费者。 7.Consumer Group消费者组。由消费不同的分区的多个消费者实例组成共用同一个Group-id。 8.Message消息由offset分区上的消息id、MessageSize消息内容data大小、data消息具体内容组成。
二、初步使用
1.yml文件配置
spring:kafka:bootstrap-servers: http://127.0.0.1:9002properties:security:protocol: SASL_PLAINTEXTsasl:mechanism: PLAINjaas:config: org.apache.kafka.common.security.plain.PlainLoginModule required usernamekafka password123456;producer:# 发生错误后消息重发的次数。retries: 0#当有多个消息需要被发送到同一个分区时生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks0 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks1 只要集群的首领节点收到消息生产者就会收到一个来自服务器成功响应。# acksall 只有当所有参与复制的节点全部收到消息时生产者才会收到一个来自服务器的成功响应。acks: 1consumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理# latest默认值在偏移量无效的情况下消费者将从最新的记录开始读取数据在消费者启动之后生成的记录# earliest 在偏移量无效的情况下消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量默认值是true,为了避免出现重复数据和数据丢失可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 消费者超时时间 6秒properties:max:poll:interval:ms: 6000listener:# 在侦听器容器中运行的线程数。消费者组中的实例数量。 【本次重点】concurrency: 5#listner负责ack每调用一次就立即commitack-mode: manual_immediatemissing-topics-fatal: false2.生产者类
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;Component
Slf4j
public class KafkaProducer {// 消费者组public static final String TOPIC_GROUP2 topic.group2;Autowiredprivate KafkaTemplateString, Object kafkaTemplate;public void send(String topic,Object obj) {String obj2String JSONObject.toJSONString(obj);log.info(准备发送消息为{}, obj2String);//发送消息ListenableFutureSendResultString, Object future kafkaTemplate.send(topic, obj);future.addCallback(new ListenableFutureCallbackSendResultString, Object() {Overridepublic void onFailure(Throwable throwable) {//发送失败的处理log.info(topic - 生产者 发送消息失败 throwable.getMessage());}Overridepublic void onSuccess(SendResultString, Object stringObjectSendResult) {//成功的处理log.info(topic - 生产者 发送消息成功 stringObjectSendResult.toString());}});}
}
3.消费者类
使用注解的方式来创建主题和分区。
package com.lezhi.szxy.oa.core.kafka;import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ServiceException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.poi.ss.formula.functions.T;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.RetryingBatchErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;Component
Slf4j
public class KafkaConsumer {Resourceprivate addService addService;Resourceprivate RedisLockUtil redisLockUtil;ResourceRedissonClient redissonClient;ResourceRedisTemplateString,String redisTemplate;private static final String ADD_LOCK_PREFIX ADD_LOCK_PREFIX;ObjectMapper objectMapper new ObjectMapper();/*** 初始化主题分区* return*/Beanpublic NewTopic batchTopic() {log.info(初始化主题分区batchTopic : add_topic分区5副本数1 );return new NewTopic(add_topic, 5, (short) 1);}/*** 添加消息* param ack*/KafkaListener(topics add_topicC,groupId KafkaProducer.TOPIC_GROUP2)public void handleAddMessage(ConsumerRecord?, ? record, Acknowledgment ack, Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {log.info(add_topic-队列消费端 topic:{}, 收到消息, topic);Optional message Optional.ofNullable(record.value());if (message.isPresent()) {Object msg message.get();try {ParamImport param objectMapper.readValue(String.valueOf(msg) , ParamImport .class);String fullKey redisLockUtil.getFullKey(ADD_LOCK_PREFIX , String.valueOf(msg));if(redisLockUtil.getLock(fullKey , 10000)){// 业务代码...log.info(add_topic 消费了 Topic: topic ,Message: String.valueOf(msg));}else {log.info(add_topic 已经被消费 Topic: topic ,Message: String.valueOf(msg));}ack.acknowledge();} catch (Exception e) {e.printStackTrace();log.error(解析 OaConstant.SALARY_SEND_MESSAGE_KAFKA_TOPIC 数据异常);}}}
}配置消费端主题分区启动后查看kafkaadd_topic主题生成五个分区实例 注意一个消费线程可以对应若干分区。但是为了保证数据的一致性同一个分区同时只能备一个消费者实例消费所以超过分区数量的消费者实例个数是多余的会被闲置。
将消费者实例消费线程比为一个人分区消息相当于一个办公位。办公位数人数时哪个办公位有消息待消费人就到哪一个工位处理消息。当办公位数人数时后面的人数需要排队等待前面的人离开才可以进入办公位消费。 当人再多时只有一个办公位人也得排队办公属于同步消费当办公位有多个时才能实现多人同时操作。
单机kafka分区最好不超过5。默认使用轮询策略。
4.发送消息
public void addTopicMsg(ParamImport param) throws ServiceException {String json;try {json objectMapper.writeValueAsString(param);} catch (JsonProcessingException e) {log.error(addTopicMsg-发送消息kafka消息转换失败{}, e);throw new ServiceException(发送失败);}log.info(addTopicMsg-发送消息发送kafka请求);kafkaTemplate.send(add_topic, json);}三、减少分区数量
上文中我们使用了new NewTopic()的方式创建分区分区数量只能动态增加不能减少。所以我们需要根据以下步骤来重新生成分区达成减少分区的目的。
1.停止业务服务进程
停止业务服务进程使得不会重复生成分区。修改代码内配置的new NewTopic()配置分区数。
2.停止kafka服务进程
停止kafka服务进程清空分区、主题等数据。
3.重新启动kafka服务
4.重新启动业务服务
此时就会根据修改后的分区设置重新生成分区。
参考文章
【SpringBoot】在Springboot中怎么设置Kafka自动创建Topic SpringBootKafka之如何优雅的创建topic 想弄明白Kafka到底是什么吗看完这篇你就知道了(概念、数据存储、生产者、消费者) 图解Kafka看本篇就足够啦