沈阳网 沈阳网站,上海临平路网站建设,国际进出口贸易网站,网站开发与网站设计区别kafka的处理的一些问题 消费者客户端不但没有背压而且内存充足#xff0c;但产生的消费延迟越来越大在Kafka的Leader副本宕机时 消费者客户端不但没有背压而且内存充足#xff0c;但产生的消费延迟越来越大 比如我们这个kakfa集群一共有3个Broker节点
TOp1有5个分区#xf… kafka的处理的一些问题 消费者客户端不但没有背压而且内存充足但产生的消费延迟越来越大在Kafka的Leader副本宕机时 消费者客户端不但没有背压而且内存充足但产生的消费延迟越来越大 比如我们这个kakfa集群一共有3个Broker节点
TOp1有5个分区P0、P1、P2、P3、P4这些分区分布在3个不同Broker节点上而我们创建了包含两个消费者的消费者组。
消费者1同时消费P0、P1和P4分区的数据。 消费者2消费P2和P3分区的数据 看到消费延迟大家想去就是增加消费者数量和分区数量让我消费者数量增加到和Partition的数量一样多这样每个消费者就可以仅仅消费一个分区的数据可以达到消费能力1最大化 。
了解消费者背后的执行原理。该如何优化消费者消费数据的吞吐量。 消费者在调用poll()方法到远端的Broker节点拉去数据时。优先从nextInLineFetch中获取数据这个nextInLineFetch就是数据接收缓冲区 如果数据接收缓冲区中没有待消费的数据这个时候才会调用SendFetches方法到Broker端拉去数据
kafka是向响应的Broker节点发送拉取数据的网络请求我们都知道网路请求对于内存请求是比较慢的因此这些拉取数据的网络请求是由Broker端异步执行的异步执行拉取数据请求就必须通过future监听数据是否已经准备好当数据准备好之后会异步将数放到数据接收缓存completedFetches中 这是因为IO请求比较耗时所以尽量一次批量拉取更多的数据放到缓存中这样就可以降低发起网络的IO次数进而提升消费能力现在缓冲区completedFetches中已经有数据了就会把completedFetches中队头的数据解析到nextInLineFetch中 解析成消费者可以消费的数据格式然后清除completedFetches中队头的元素。 随后如果有消费调用poll()方法拉取数就会优先从nextInLineFetch中获取数据注意消费者客户端每次获取的数据量是由参数 max.poll.records控制的默认值是500。 相当于每次从nextInLineFetch获取500条数据并返回给消费者。 当消费者消费完500条数据之后会再次调用poll()方法 再拉取500条数据 当消费者把nextlnLineFetch缓存的数据都消费完之后相当于再调用poll()方式时nextInLineFetch已经咩有待消费的数据了这个时候就会把completedFetch的新的队头元素解析解析成nextInLineFetch。可以适当的将该参数增加到16KB或者32KB
而参数fetch.max.bytes标识每次poll操作从Broker端最多拉取数据量默认值时50MB如果我们内存资源充足建议增大fetch.max.bytes增加到200MB以上.参数max.partition.fetch.bytes的默认值是1MB。表示每次poll返回的每个Broker节点上每个分区的最大字节数。因此我们再回头看这个例子。
那么每次从Broker-102上最多能拉取到的数据也就是1MB。数据量未免太小了有的时候刚消费完1MB,就得再次经过一次网络IO拉取下一批数据这可能是造成消费延迟的主要原因。大家可以根据自己的Topic的实际分区数来合理设置每个分区每次拉取数据的大小因此建议可以将每个分区每次拉取数据的大小设置成10MB以上。 max.partition.fetch.bytes增加到10MB以上
但有的时候只是提高每个分区每次最大拉取到的数量也是不够的因为每个Broker最多返回的最大字节数由参数fetch.max.bytes控制这个参数的默认值是50MB有时候也可以适当的提升这个参数的默认值比如增加到200MB。 这样就能再本地尽量缓存更多的数据以提升消费者消费数据的能力降低消费延迟主要适用于内存充足你消费能力不足的场景
消费客户端根本不能修改啦这个参数因为设置了静态的
在Kafka的Leader副本宕机时