wordpress 微信 插件开发,防疫优化措施,室内设计网站都有哪些公司,物联网平台开发1.Kafka中主题和分区的概念
1.主题Topic
主题-topic在kafka中是一个逻辑的概念#xff0c;kafka通过topic将消息进行分类。不同的topic会被订阅该topic的消费者消费
但是有一个问题#xff0c;如果说这个topic中的消息非常非常多#xff0c;多到需要几T来存#xff0c;因…1.Kafka中主题和分区的概念
1.主题Topic
主题-topic在kafka中是一个逻辑的概念kafka通过topic将消息进行分类。不同的topic会被订阅该topic的消费者消费
但是有一个问题如果说这个topic中的消息非常非常多多到需要几T来存因为消息是会被保存到10g日志文件中的。为了解决这个文件过大的问题kafka提出了Partition分区的概念
2.分区Partition
1分区的概念
通过partition将一个topic中的消息分区来存储。这样的好处有多个:
分区存储可以解决统一存储文件过大的问题提供了读写的吞吐量:读和写可以同时在多个分区中进行
2创建多分区的主题
[rootk8s-master bin]# kafka-topics.sh --create --bootstrap-server 10.0.8.2:9092 --replication-factor 1 --partitions 2 --topic test1分区的作用:
可以分布式存储可以并行写
实际上是存在data/kafka-logs/test-0 和 test-1中的0000000.log文件中,且消费者定期将自己消费分区的ofset提交给kafka内部 topic
小细节:
00000.og:这个文件中保存的就是消息__consumer_offsets-49: kafka内部自己创建了_consumer_offsets主题包含了50个分区。这个主题用来存放消费者消费某个主题的偏移量。因为每个消费者都会自己维护着消费的主题的偏移量也就是说每个消费者会把消费的主题的偏移量自主上报给kafka中的默认主题:__consumer_offsets。因此kafka为了提升这个主题的并发性默认设置了50个分区。 提交到哪个分区:通过hash函数:hash(consumerGroupld)%consumer offsets主题的分区数。提交到该主题中的内容是:key是consumerGroupldtopic分区号value就是当前offset的值文件中保存的消息默认保存7天。七天到后消息会被删除。
2.kafka集群操作
kafka集群搭建之前博客有介绍且部署文档百度搜索很多不过多赘述
1.搭建kafka集群(三个broker)
创建三个server.properties文件.
#0 1 2
broker.id2
// 9092 9093 9094
listenerSPLAINTEXT://192.168.65.60:9094//kafka-logs kafka-logs-l kafka-logs-2log.dir/usr/local/data/kafka-logs-2
通过命令来启动三台broker.
kafka-server-start.sh-daemon ../config/server.properties
kafka-server-start.sh-daemon ../config/serverl.properties
kafka-server-start.sh-daemon ../config/server2.properties校验是否启动成功
进入到zk中查看/brokers/ids中过是否有三个znode(012)2. 副本的概念
在创建主题时除了指明了主题的分区数以外还指明了副本数那么副本是一个什么概念呢? 副本是为了为主题中的分区创建多个备份多个副本在kafka集群的多个broker中会有一个副本作为leader其他是follower。
leader: kafka的写和读的操作都发生在leader上。leader负责把数据同步给folower。当leader挂了经过主从选举从多个follower中选举产生一个新的leaderfollower 接收leader的同步的数据isr: 可以同步和已同步的节点会被存入到isr集合中。这里有一个细节:如果isr中的节点性能较差会被提出isr集合
此时broker、主题、分区、副本 这些概念就全部展现了; 集群中有多个broker创建主题时可以指明主题有多个分区(把消息拆分到不同的分区中存储)可以为分区创建多个副本不同的副本存放在不同的broker里。
3.关于集群消费
向集群发送消息:
kafka-console-consumer.sh--bootstrap-server 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --from-beginning --consumer-property group.idtestGroupl --topic my-replicated-topic从集群中消费消息
kafka-console-producer.sh--broker-list 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --topicmy-replicated-topic指定消费组来消费消息
kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --from-beginning --consumer-property group.idtestGroup1 --topicmy-replicated-topic分区分消费组的集群消费中的细节
一个partition只能被一个消费组中的一个消费者消费目的是为了保证消费的顺序性但是多个partion的多个消费者消费的总的顺序性是得不到保证的那怎么做到消费的总顺序性呢?partition的数量决定了消费组中消费者的数量建议同一个消费组中消费者的数量不要超过partition的数量否则多的消费者消费不到消息如果消费者挂了那么会触发rebalance机制(后面介绍)会让其他消费者来消费该分区
3.kafka的java客户端-生产者的实现
1.生产者的基本实现
引入依赖
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdAversion2.4.1/version
/dependency具体实现
package com.qf.kafka;
import org.apache.kafka.clients.producer**
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionExceptioon;public class MySimpleProducer {private final static String TOPIC_NAME my-reeplicated-topic;public static void main(String[] args) throws ExecutiionException, InterruptedException//1.设置参数Properties propsnew Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG172.16.253.38:9092,172.16.253.38:9093,1722.16.253.38:9094);//把发送的key从字符串序列化为字节数组props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())//把发送消息value从字符串序列化为字节数组props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//2.创建生产消息的客户端,传入参数ProducerString, String producer new KafkaProducerString, String(props)//3.创建消息//key:作用是决定了往哪个分区上发,value:具体要发送的消息内容ProducerRecordString, String producerRecord new ProducerRecord(TOPIC_NAME,mykeyvalue,hellokafka);//4.发送消息,得到消息发送的元数据并输出 --同步发送RecordMetadata metadata producer.send(producerRecord).get()System.out.println(同步方式发送消息结果: topiic- metadata.topic() | partition- metadata.partition() | offset- metadata.offset())}2.生产者同步发消息 如果生产者发送消息没有收到ack,生产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进行重试。重试的次数3次。
RecordMetadata metadata producer.send(producerRecord).get()
System.out.println(同步方式发送消息结果: topiic- metadata.topic() |partition- metadata.partition() |offset- metadata.offset())3.生产者的异步发送消息
异步发送,生产者发送完消息后就可以执行之后的业务,broker在收到消息后异步调用生产者提供的callback回调方法。
//5.异步发送消息
producer.send(producerRecord, new Callback() {public void onCompletion(RecordMetadata metadata,Exception exception)if (exception ! null) {System.err.println(发送消息失败:exception.getsStackTrace())}if (metadata ! null) {System.out.println(异步方式发送消息结果: topic- metadata.topic() | partition-metadata.partition() | offset- metadata.offset());}}
});4.生产者中的ack的配置
在同步发送的前提下,生产者在获得集群返回的ack之前会一直阻塞。那么集群什么时候返回ack呢?此时ack有3个配置:
ack0kafka-cluster不需要任何的broker收到消息,就立即返回ack给生产者,最容易丢消息的,效率是最高的ack1(默认):多副本之间的leader已经收到消息,并把消息写/入到本地的log中,才会返回ack给生产者,性能和安全性是最均衡的ack-1/all。里面有默认的配置min.insync.replicas2(默认为1,推荐配置大于等于2),此时就需要leader和一个follower同步完后, 才会返回ack给生产者(此时集群中有2个broker已完成数据的接收),这种方式最安全,但性能最差。
下面是关于ack和重试(如果没有收到ack,就开启重试)的配置 props.put(ProducerConfig.ACKS_CONFIG, 1);/* 发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在
接收者那边做好消息接收的幂等性处理 */props.put(ProducerConfig.RETRIES_CONFIG, 3);//重试间隔设置props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);5.关于消息发送的缓冲区 kafka默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区是32m
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);kafka本地线程会去缓冲区中一次拉16k的数据,发送到broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 161384)如果线程拉不到16k的数据,间隔10ms也会将已拉到的数据发到到broker
props.put(ProducerConfig.LINGER_MS_CONFIG,10);3.Java客户端消费者的实现细节
1.消费者的基本实现
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class MySimpleConsumer {
private final static String TOPIC_NAME my-replicated-topic;
private final static String CONSUMER_GROUP_NAME testGroup;public static void main(String[] args){Properties propsnew Properties();props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 172.16.253.38:9092,172.16.253.38:9093,172.16.253.388:9094);//消费分组名props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONPIG, SStringDeserializer.class.getName())//创建一个消费者的客户端KafkaConsumerString, String consumer new KafkaconsumerString,String(props)//消费者订阅主题列表consumer.subscribe (Arrays.asList(TOPIC_NAME)) ;while (true) {/**poll()API是拉取消息的长轮询*/ConsumerRecordsString, String records coonsumer.poll(Duration.ofMillis(1000))for (ConsumerRecordString,Stringrecord:records){System.out.printf(收到消息:partition %d, offset %d, key is, value %s%n, record.partition(),record.offset(),record.key(), record.value())}}
}2.关于消费者自动提交和手动提交offset
1) 提交的内容
消费者无论是自动提交还是手动提交,都需要把所属的消费组消费的某个主题消费的某个分区及消费的偏移量,这样的信息是交到集群的_consumer_offsets主题里面。
2) 自动提交
消费者poll消息下来以后就会自动提交offset
// 是否自动提交offset,默认就是true
props.put (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自动提交offset的间隔时间
props.put (ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);3) 手动提交
需要把自动提交的配置改成false
props.put (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)手动提交又分成了两种:
手动同步提交 在消费完消息后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示提交成功,执行之后的逻辑
while(true) {/**poll()API是拉取消息的长轮询*/ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000))for (ConsumerRecordString, String record : records) {System.out.printf(收到消息:partition1d,offsettd,key ts,value tsn,record.partition(),record.offset(),record.key(),record.value())}//所有的消息已消费完if(records.count()0){//有消息//手动同步提交offset,当前线程会阻塞直到offset提交房成功//一般使用同步提交,因为提交之后一般也没有什么逻辑代码了consumer.commitSync();//提交成功}}
}异步提交
while (true) {/**poll()API是拉取消息的长轮询*/ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000))for (ConsumerRecordString, String record : records) {System.out.printf(收到消息:partition %d,offset %d,key %s,value %s%n,record.partition(),record.offset(),record.key(),record.value())}//所有的消息已消费完if(records.count()0){//手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑consumer.commitAsync(new OffsetCommitCallback(){Overridepublic void onComplete(MapTopicPartition,OffsetAndMetadata offsets, Exception exception){if(exception!null){System.err.println(Commit failed for offsets);System.err.println(Commit failed exception: exception.getStackTrace())}}
}3.长轮询poll消息
默认情况下,消费者一次会poll500条消息。 //一次pol1最大拉取消息的条数,可以根据消费速度的快慢来设置
props.put (ConsumerConfig.MAX_POLL_RECORDS_COMNFIG, 500);代码中设置了长轮询的时间是1000毫秒
while(true){* poll()API是拉取消息的长轮询ConsumerRecordsString, String records conisumer.poll(Duration.ofMillis(1000))for (ConsumerRecordString, String record : records)System.out.printf(收到消息:partition %d,offset %d, key %s, value %s%n, record.partition(),record.offset(),record.key(), record.vaalue())}意味着: 如果一次poll到500条,就直接执行for循环如果这一次没有poll到500条。且时间在1秒内,那么长轮询继续poll,要么到500条,要么到1s如果多次poll都没达到500条,且1秒时间到了,那么直接执行for循环 如果两次poll的间隔超过30s,集群会认为该消费者的消费能力过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制 会造成性能开销。可以通过设置这个参数,让一次poll的消息条数少一点
//一次pol1最大拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CODNFIG,500)
//如果两次pol1的时间如果超出了30s的时间间隔,kafka会认为其消费能力过弱,将其踢出消费组。将分区分配给其他消费者。-rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MIS_CONFIG, 30 * 1000);