淮安哪里做网站,平面设计培训学校一年学费,宠物用品wordpress模板,飞翔时代网站建设前言
在实际开发过程中#xff0c;如果使用Kafka处理超大数据量(千万级、亿级)的场景#xff0c;Kafka消费者的消费速度可能决定系统性能瓶颈。
实现方案
为了提高消费者的消费速度#xff0c;我们可以采取以下措施#xff1a;
将主题的分区数量增大#xff0c;如 20如果使用Kafka处理超大数据量(千万级、亿级)的场景Kafka消费者的消费速度可能决定系统性能瓶颈。
实现方案
为了提高消费者的消费速度我们可以采取以下措施
将主题的分区数量增大如 20通过concurrency将消费者的消费线程数增大到 102个pod)提高消息处理的并发能力。将每次批量拉取消息的数量max.poll.records增大到 500提高单次处理消息的数量。将消息切分成批次将单个批次的数据处理业务逻辑放进线程池中异步进行提高并发处理消息的速度。将异步线程池的拒绝模式调整为 CallerRunsPolicy这个配置非常重要。当线程池的任务队列已满且所有线程都在忙碌时新的任务将由提交任务的线程即调用者线程来执行。否则在消息量特别大的情况下很可能会因为线程池任务队列满了而丢失数据。将异步线程池的队列容量设置为 0这样意味着所有任务必须立即由线程池中的线程来处理减少在队列中的等待时间。在数据上报的时候进行幂等性验证防止重复上报数据。
Component
public class OrderConsumer {Resource(name execThreadPool)private ThreadPoolTaskExecutor execThreadPool;KafkaListener(id record_consumer,topics record,groupId g_record_consumer,concurrency 10,properties {max.poll.interval.ms:300000, max.poll.records:500})public void consume(ConsumerRecordsString, String records, Acknowledgment ack) {execThreadPool.submit(()- {// 业务逻辑});ack.acknowledge();}}ThreadPoolTaskExecutor 是 Spring 框架提供的一个线程池实现用于管理和执行多线程任务。它是 TaskExecutor 接口的实现提供了在 Spring 应用程序中创建和配置线程池的便捷方式。
ThreadPoolTaskExecutor主要特点 线程池配置 ThreadPoolTaskExecutor 允许你配置核心线程数、最大线程数、队列容量等线程池属性。 线程创建和销毁 它会根据任务的需求自动创建和销毁线程避免不必要的线程创建和销毁开销。 线程复用 线程池中的线程可以被复用从而减少线程创建的开销。 队列管理 当线程池达到最大线程数时新任务会被放入队列中等待执行。 拒绝策略 当线程池已满并且队列也已满时可以配置拒绝策略来处理新任务的方式。 RejectedExecutionHandler 是 Java 线程池的一个重要接口用于定义当线程池已满并且无法接受新任务时如何处理被拒绝的任务。当线程池的队列和线程都已满新任务就会被拒绝执行这时就会使用 RejectedExecutionHandler 来处理这些被拒绝的任务。 在 Java 中有几种内置的 RejectedExecutionHandler 实现可供选择每种实现都有不同的拒绝策略 AbortPolicy默认策略 这是默认的拒绝策略它会抛出一个 RejectedExecutionException 异常表示任务被拒绝执行。 CallerRunsPolicy 当线程池已满时将任务返回给提交任务的调用者Caller。这意味着提交任务的线程会尝试执行被拒绝的任务。 DiscardPolicy 这个策略会默默地丢弃被拒绝的任务不会产生任何异常。 DiscardOldestPolicy 这个策略会丢弃队列中最老的任务然后尝试将新任务添加到队列中。 除了这些内置的策略你还可以实现自定义的 RejectedExecutionHandler 接口以定义特定于你应用程序需求的拒绝策略。你可以根据业务需求来决定拒绝策略比如记录日志、通知管理员、重试等。
Configuration
public class ThreadPoolConfig {Beanprivate ThreadPoolTaskExecutor execThreadPool() {ThreadPoolTaskExecutor pool new ThreadPoolTaskExecutor();pool.setCorePoolSize(50); // 核心线程数pool.setMaxPoolSize(10000); // 最大线程数pool.setQueueCapacity(0); // 等待队列sizepool.setKeepAliveSeconds(60); // 线程最大空闲存活时间pool.setWaitForTasksToCompleteOnShutdown(true);pool.setAwaitTerminationSeconds(60); // 程序shutdown时最多等60秒钟让现存任务结束pool.setRejectedExecutionHandler(new CallerRunsPolicy()); // 拒绝策略return pool;}
}通过以上方案我们可以提高消费侧的TPS同时杜绝重复上报的现象极大提高数据准确性和用户体验。