深圳市做网站知名公司有哪些,wordpress文章图片不居中,网络公司 建站 官方网站,客户管理crm系统哪家好一、业务场景 我们在工作中经常会到往数据库里插入大量数据的工作#xff0c;但是既需要保证数据的一致性#xff0c;又要保证程序执行的效率。因此需要在多线程中使用事务#xff0c;这样既可以保证数据的一致性#xff0c;又能保证程序的执行效率。但是spring自带的Trans…一、业务场景 我们在工作中经常会到往数据库里插入大量数据的工作但是既需要保证数据的一致性又要保证程序执行的效率。因此需要在多线程中使用事务这样既可以保证数据的一致性又能保证程序的执行效率。但是spring自带的Transactional注解无法满足多线程间的事务一致性因为这几个事务执行的线程不同无法保持数据的一致性。
二、解决方案 我的解决方案参考分布式事务2PCTwo-phase commit protocol各个线程需要等待所有的线程执行完成后才能进行下一步操作在使用线程池执行任务时如果线程池的最大线程数小于任务列表的数量就会发生“死锁”即获取到线程的任务阻塞等待没有获取线程的任务执行完成而没有获取线程的任务会在阻塞队列中等待空闲线程的调用。这种情况需要使用一阶段的超时机制来“解开”超时机制会发送回滚命令线程池收到后进行回滚但这种情况任务始终无法提交再次提交结果依然是等到超时再回滚。再使用中需要结合具体业务来对线程池参数以及数据库连接池参数进行合理的设置。如果这里听的优点迷可以先看下面具体代码实现再来结合这段文字思考。 1、工具类代码
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;/*** author poxiao* create 2023-01-05 22:22* p* 多线程事务管理器* 基于分布式事务思想采用2PCTwo-phase commit protocol协议* 解决基于线程池的多线程事务一致性问题*/
Slf4j
public class MultiThreadingTransactionManager {/*** 事务管理器*/private final PlatformTransactionManager transactionManager;/*** 超时时间*/private final long timeout;/*** 时间单位*/private final TimeUnit unit;/*** 一阶段门闩(第一阶段的准备阶段)当所有子线程准备完成时除“提交/回滚”操作以外的工作都完成countDownLatch的值为0*/private CountDownLatch oneStageLatch null;/*** 二阶段门闩(第二阶段的执行执行)主线程将不再等待子线程执行直接判定总的任务执行失败执行第二阶段让等待确认的线程进行回滚*/private final CountDownLatch twoStageLatch new CountDownLatch(1);/*** 是否提交事务默认是true当任一线程发生异常时isSubmit会被设置为false即回滚事务*/private final AtomicBoolean isSubmit new AtomicBoolean(true);/*** 构造方法* param transactionManager 事务管理器* param timeout 超时时间* param unit 时间单位*/public MultiThreadingTransactionManager(PlatformTransactionManager transactionManager, long timeout, TimeUnit unit) {this.transactionManager transactionManager;this.timeout timeout;this.unit unit;}/*** 线程池方式执行任务可保证线程间的事务一致性* param runnableList 任务列表* param executor 线程池* return*/public boolean execute(ListRunnable runnableList, Executor executor) {// 排除null值runnableList.removeAll(Collections.singleton(null));// 属性初始化innit(runnableList.size());// 遍历任务列表并放入线程池for (Runnable runnable : runnableList) {// 创建线程Thread thread new Thread() {Overridepublic void run() {// 如果别的线程执行失败则该任务就不需要再执行了if (!isSubmit.get()) {log.info(当前子线程执行中止因为线程事务中有子线程执行失败);oneStageLatch.countDown();return;}// 开启事务TransactionStatus transactionStatus transactionManager.getTransaction(new DefaultTransactionDefinition());try {// 执行业务逻辑runnable.run();} catch (Exception e) {// 执行体发生异常设置回滚isSubmit.set(false);log.error(线程{}:业务发生异常,执行体:{}, Thread.currentThread().getName(), runnable);}// 计数器减一oneStageLatch.countDown();try {//等待所有线程任务完成监控是否有异常有则统一回滚twoStageLatch.await();// 根据isSubmit值判断事务是否提交可能是子线程出现异常也有可能是子线程执行超时if (isSubmit.get()) {// 提交transactionManager.commit(transactionStatus);log.info(线程{}:事务提交成功,执行体:{}, Thread.currentThread().getName(), runnable);} else {// 回滚transactionManager.rollback(transactionStatus);log.info(线程{}:事务回滚成功,执行体:{}, Thread.currentThread().getName(), runnable);}} catch (InterruptedException e) {e.printStackTrace();}}};executor.execute(thread);}/*** 主线程担任协调者当第一阶段所有参与者准备完成oneStageLatch的计数为0* 主线程发起第二阶段执行阶段提交或回滚,根据*/try {// 主线程等待所有线程执行完成超时时间设置为五秒oneStageLatch.await(timeout, unit);long count oneStageLatch.getCount();System.out.println(countDownLatch值 count);// 主线程等待超时子线程可能发生长时间阻塞死锁if (count 0) {// 设置为回滚isSubmit.set(false);log.info(主线线程等待超时,任务即将全部回滚);}twoStageLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}// 返回结果是否执行成功事务提交即为执行成功事务回滚即为执行失败return isSubmit.get();}/*** 初始化属性* param size 任务数量*/private void innit(int size) {oneStageLatch new CountDownLatch(size);}
}
2、业务代码
1线程池参数
我这里采用自定义线程池线程池参数如下
Configuration
public class ThreadPoolConfig {// 获取服务器的cpu个数private static final int CPU_COUNT Runtime.getRuntime().availableProcessors();// 获取cpu个数private static final int COUR_SIZE CPU_COUNT * 4;private static final int MAX_COUR_SIZE CPU_COUNT * 8;// 接下来配置一个bean配置线程池。Beanpublic Executor threadPoolTaskExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor new ThreadPoolTaskExecutor();threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE);// 设置核心线程数threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE);// 配置最大线程数threadPoolTaskExecutor.setQueueCapacity(MAX_COUR_SIZE * 4);// 配置队列容量这里设置成最大线程数的四倍threadPoolTaskExecutor.setThreadNamePrefix(thirdParty-thread);// 给线程池设置名称threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 设置任务的拒绝策略return threadPoolTaskExecutor;}}2任务业务正常无异常抛出时正常提交事务情况
public Result? testTransaction() throws SQLException {ListUser users new LinkedList();User user new User();user.setName(1111);users.add(user);User user1 new User();user1.setName(2222);users.add(user1);MultiThreadingTransactionManager multiThreadingTransactionManage new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);ListRunnable runnableList new ArrayList();users.forEach((x) - {runnableList.add(new Runnable() {Overridepublic void run() {System.out.println(当前线程 Thread.currentThread().getName() 插入数据 x);secondUserMapper.insertUser(x);}});});multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);return Result.success(1);}
执行时的日志 执行成功后数据库多次了两条数据 3展示出现异常任务时回滚事务情况 public Result? testTransaction() throws SQLException {ListUser users new LinkedList();User user new User();user.setName(1111);users.add(user);User user1 new User();user1.setName(2222);users.add(user1);MultiThreadingTransactionManager multiThreadingTransactionManage new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);ListRunnable runnableList new ArrayList();//模拟任务出现异常runnableList.add(() - {int a 10 / 0;});users.forEach((x) - {runnableList.add(new Runnable() {Overridepublic void run() {System.out.println(当前线程 Thread.currentThread().getName() 插入数据 x);secondUserMapper.insertUser(x);}});});multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);return Result.success(1);}
执行时的日志 数据库没有新增的数据 参考文章
Spring多线程事务解决方案-CSDN博客 两阶段VS三阶段提交协议_两阶段提交-CSDN博客
详解Spring多线程下如何保证事务的一致性-51CTO.COM
多线程结合sprongboot事务(完善)_springboot多线程事务-CSDN博客