网店网站建设规划方案,网站数据迁移教程,手机网站开发 宽度,网站专题模板背景
某项目封装了 Kafka 消费者 API#xff0c;根据传递的消费者线程数#xff0c;创建 N 个消费者线程同时消费对应 topic 的数据#xff0c;并在线程启动后收集到全局列表中#xff0c;方便在程序调用 stop 流程时逐个停止。
主控类在创建 Kafka 消费线程时使用了 Cou…背景
某项目封装了 Kafka 消费者 API根据传递的消费者线程数创建 N 个消费者线程同时消费对应 topic 的数据并在线程启动后收集到全局列表中方便在程序调用 stop 流程时逐个停止。
主控类在创建 Kafka 消费线程时使用了 CountDownLatch 将启动的线程收集到全局列表并阻塞等待所有线程初始化完成消费者线程指定 Kafka 订阅方法后对计数器减一然后轮询消费 Kafka 的数据。
近日因某场景下不想消费某类 topic 数据而将 topic 设置为空预想其他几类的 topic 数据应该正常消费结果发现第一个 topic 设置为空后其他几类消费线程都没有正常启动。
封装逻辑
程序封装了一个 KafkaConsumerThread 类根据配置的线程数启动 N 个线程消费目标 topic 数据基本代码如下 用 CountDownLatch 控制消费者线程的初始化本意是在 run 方法执行的时候就对计数器减一标识本消费线程初始化完成的。
根据线程数创建 CountDownLatch 计数器。订阅 Kafka topic。计数器减一。记录启动的线程对象。主程序阻塞等待消费线程 run 方法执行到计数器减一。
问题排查
有一个 topic 设置为空后对应的消费者线程启动报异常了
java.lang.IllegalArgumentException:
Topic collection to subscribe to
cannot contain null or empty topic一个消费异常但其他消费者没有启动为什么呢理论上它们并不相干才对。
打印程序堆栈信息发现程序阻塞了
封装的 Kafka API 是顺次启动几类 topic 消费线程的因为启动第一个 topic 消费线程时因 topic 设置为空consumer.subscribe(config.getTopics()) 这句代码异常了其后面的 countDown 未执行而引发阻塞。
第一个 topic 消费启动异常后程序因调用了 countDownLatch.await() 而阻塞了因此后面代码就不执行了继而程序呈现异常状态。
基础巩固
CountDownLatch 是 JUC 包同步工具类用于协调多个线程。它允许一个或多个线程等待直到其他线程中执行的一组操作完成。CountDownLatch 通过一个计数器来实现该计数器由线程递减计数器值到达零后所有调用过 await 方法的线程将解除阻塞状态。
创建new CountDownLatch 对象时指定计数器的初始值。阻塞一个或多个线程调用 await 方法进入阻塞等待状态直到计数器的值变为零。倒计数其他线程在完成各自任务后调用 countDown 方法将计数器的值减一。当计数器的值减到零时所有在 await 上等待的线程会被唤醒继续执行。
启示录
同步锁使用不当容易引发死锁问题阿里开发者规范在 countDown() 方法处有一个提示 这个提示也不准确因为这个是一个 Kafka 消费线程它以线程中断状态为标识循环从 Kafka 中 poll 数据处理的所以不能在 finally 中调用。但是也不能在 subscribe 之后调用因为该语句会异常。
到底应该在哪里对计数器减一才能保证即使异常也能正常减一呢有两个方法
简化处理在线程的 run 方法第一行调用。稍微复杂一点添加一个开关在 countDown 后面设置为 true然后再 finally 里面判断如果这个开关的代码没有走到说明后面异常了就在对计数器再补充减一 其实这个问题产生的根源是没有对 topic 进行判空如果源头控制了就不会出现这种异常了。
PS真心再推荐一下 utools 工具整理本文时堆栈信息是从七天前的剪切板里面找出来的 对我这种一天不知道复制粘贴多少次的人来说这个工具真的很好用啊