西客站网站建设,搭建游戏,wordpress建站怎么样,网站建设实训文章目录引入相关资料环境准备分页查询处理#xff0c;减少单次批量处理的数据量级补充亿点点日志#xff0c;更易观察多线程优化查询_切数据版多线程_每个线程都分页处理引入
都说后端开发能顶半个运维#xff0c;我们经常需要对大量输出进行需求调整#xff0c;很多时候…
文章目录引入相关资料环境准备分页查询处理减少单次批量处理的数据量级补充亿点点日志更易观察多线程优化查询_切数据版多线程_每个线程都分页处理引入
都说后端开发能顶半个运维我们经常需要对大量输出进行需求调整很多时候sql语句已经无法吗满足我们的需求此时就需要使用我们熟悉的 java语言结合单元测试写一些脚本进行批量处理。
相关资料
案例代码获取 视频讲解
利用分页处理数据量较大的情况补充亿点点日志
环境准备
可直接使用我分享的工程 案例代码获取 我这里准备了一个10000条数据的的user表和对应的一个springboot工程:
Slf4j
SpringBootTest(classes MyWebDemoApplication.class,// 配置端口启动否则获取失败webEnvironment SpringBootTest.WebEnvironment.RANDOM_PORT)
public class BatchDemo {Autowiredprivate UserMapper userMapper;}分页查询处理减少单次批量处理的数据量级
当我们的数据量很大并且单个对象也很大时如果一次查出所有待处理的数据往往会把我们的对象给撑爆这时我们可以利用分页的思想将数据拆分分页去处理
已知数据量总数的分页批处理模板
/*** 分页查询处理减少单次批量处理的数据量级* 当前已知数据量总数*/
Test
public void test1() {// 预定义参数int page 0;int pageSize 5000;// 获取总数Integer total userMapper.selectCount(null);// 计算页数int pages total / pageSize;if (total % pageSize 0) {pages;}// 开始遍历处理数据for (; page pages; page) {ListUser users userMapper.selectList(Wrappers.UserlambdaQuery().last(String.format(LIMIT %s,%s, page * pageSize, pageSize)));users.forEach(user - {/// 进行一些数据组装操作});/// 批量 修改/插入 操作User lastUser users.get(users.size() - 1);log.info(最后一个要处理的用户的ID为{}名字{}, lastUser.getId(), lastUser.getNickName());}}上面展示的是已知数据量总数的情况有时候我们是未知总量的此时可以采用如下写法
未知数据量总数的分页批处理模板
/*** 未知总数的写法*/
Test
public void test2() {// 预定义参数int page 0;int pageSize 500;// 开始遍历处理数据for (; ; ) {ListUser users userMapper.selectList(Wrappers.UserlambdaQuery().last(String.format(LIMIT %s,%s, (page) * pageSize, pageSize)));users.forEach(user - {/// 进行一些数据组装操作});/// 批量 修改/插入 操作if (CollUtil.isNotEmpty(users)) {User lastUser users.get(users.size() - 1);log.info(最后一个要处理的用户的ID为{}名字{}, lastUser.getId(), lastUser.getNickName());}if (users.size() pageSize) {break;}}
}这里每次输出循环的最后一条数据帮助我们验证结果
补充亿点点日志更易观察
良好的日志输出能够帮助我们实时了解脚本的运行情况很多时候每次循环内部都会处理一个耗时操作这里用已知总数的情况添加日志如下
起始展示待处理数据总量总页数每页条数每页开始展示当前进度每页结束暂时耗时已处理条数失败数最后一条数据信息等循环内部每分钟输出一次日志处理完毕输出总耗时总条数失败数失败数据id集合等
/*** 补充亿点点日志*/
Test
public void test3() {// 预定义参数int page 1;int pageSize 500;// 获取总数Integer total userMapper.selectCount(null);// 计算页数int pages total / pageSize;if (total % pageSize 0) {pages;}// 总处理条数int count 0;// 成功处理数int countOk 0;// 处理失败记录ListInteger wrongIds new ArrayList();// 已过分钟数int countMinute 1;long start System.currentTimeMillis();// 开始遍历处理数据log.info( 开始批量处理数据 );log.info(待处理条数{}, total);log.info(总页数{}, pages);log.info(每页条数{}, pageSize);for (; page pages; page) {log.info( 当前进度{}/{} , page, pages);ListUser users userMapper.selectList(Wrappers.UserlambdaQuery().last(String.format(LIMIT %s,%s, (page - 1) * pageSize, pageSize)));for (User user : users) {/// 进行一些数据组装操作if (user.getId() % 99 0) {wrongIds.add(user.getId());} else {countOk;}count;/// 模拟耗时操作try {Thread.sleep(10);} catch (InterruptedException e) {throw new RuntimeException(e);}// 每分钟输出一次日志if ((System.currentTimeMillis() - start) / 1000 / 60 countMinute) {log.info(已耗时{} s, (System.currentTimeMillis() - start) / 1000);log.info(当前总条数{}, count);log.info(处理成功数{}, countOk);log.info(处理失败数{}, wrongIds.size());log.info(当前处理用户信息{} : {}, user.getId(), user.getNickName());countMinute;}}/// 批量 修改/插入 操作log.info(已耗时{} s, (System.currentTimeMillis() - start) / 1000);log.info(当前总条数{}, count);log.info(处理成功数{}, countOk);log.info(处理失败数{}, wrongIds.size());if (CollUtil.isNotEmpty(users)) {User user users.get(users.size() - 1);log.info({} : {}, user.getNickName(), user.getId());}}log.info( 运行完毕 );log.info(总耗时{} s, (System.currentTimeMillis() - start) / 1000);log.info(总处理条数{}, count);log.info(处理成功数{}, countOk);log.info(处理失败数{}, wrongIds.size());log.info(处理失败数据id集合{}, wrongIds);
}效果如下
多线程优化查询_切数据版
多核CPU才能真正意义上的并行不然就是宏观并行微观串行 o(╥﹏╥)o大家得看下自己的cpu当然如果有很多阻塞IO单核进行切换线程也是能够提高性能的
这里开5个线程将数据按线程数进行拆分代码如下
/*** 多线程优化查询【切数据版 按线程数量切割数据直接处理】* 需要程序进行大量计算* 数据库能承受较大并发* 多核CPU才能真正意义上的并行不然就是宏观并行微观串行 o(╥﹏╥)o*/
Test
public void test4() {// 预定义参数int threadNum 5;long start System.currentTimeMillis();// 获取总数Integer total userMapper.selectCount(null);// 创建线程池这里为了简便操作直接用Executors创建推荐自行集成配置线程池ExecutorService executorService Executors.newFixedThreadPool(threadNum);// 设置信号标用于等待所有线程执行完CountDownLatch countDownLatch new CountDownLatch(threadNum);// 计算线程需要处理的数据量的递增步长int threadTotalStep total / threadNum;// 判断是否有余数如果有多出的数据补给最后一个线程int more total % threadNum;// 开启 threadNum 个线程处理数据for (int i 0; i threadNum; i) {int finalI i;executorService.execute(() - {int current threadTotalStep * finalI;/// 如果有余数最后一次计算得补充余数if (more 0 finalI threadNum - 1) {current more;}ListUser users userMapper.selectList(Wrappers.UserlambdaQuery().last(String.format(LIMIT %s,%s, current, threadTotalStep)));users.forEach(user - {/// 进行一些数据组装操作/// 进行一些耗时操作try {Thread.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}});/// 批量 修改/插入 操作User user users.get(users.size() - 1);log.info(线程-{} 处理的最后一个数据的id为{}, finalI 1, user.getId());countDownLatch.countDown();});}try {countDownLatch.await();executorService.shutdown();log.info(总耗时{} s, (System.currentTimeMillis() - start) / 1000);} catch (InterruptedException e) {throw new RuntimeException(e);}
}执行结果如下
多线程_每个线程都分页处理
如果单个线程处理数据量也很大此时每个线程都可补充分页进行处理如下
/*** 多线程优化查询【分页版先按数量切数据再在每个线程中分页处理数据】* 需要程序进行大量计算* 数据库能承受较大并发* 多核CPU才能真正意义上的并行不然就是宏观并行微观串行 o(╥﹏╥)o*/
Test
public void test5() {// 预定义参数int threadNum 5; // 线程数int pageSize 500; // 每页处理条数long start System.currentTimeMillis();// 获取总数Integer total userMapper.selectCount(null);// 创建线程池这里为了简便操作直接用Executors创建推荐自行集成配置线程池ExecutorService executorService Executors.newFixedThreadPool(threadNum);// 设置信号标用于等待所有线程执行完CountDownLatch countDownLatch new CountDownLatch(threadNum);// 计算线程需要处理的数据量的递增步长int threadTotalStep total / threadNum;// 判断是否有余数如果有多出的数据补给最后一个线程int more total % threadNum;// 开启 threadNum 个线程处理数据for (int i 0; i threadNum; i) {int finalI i;executorService.execute(() - {/// 数据总数就是 数据总数步长int threadTotal threadTotalStep;// 获取上一个线程最终行数int oldThreadCurrent threadTotalStep * finalI;/// 如果有余数最后一次计算得补充余数if (more 0 finalI threadNum - 1) {threadTotal more;}log.info(线程-{} 要处理的数据总数为{}, finalI 1, threadTotal);// 计算页数int pages threadTotal / pageSize;if (threadTotal % pageSize 0) {pages;}// 统计数量当等于线程总总数时退出循环避免重复计数int handleCount 0;// 获取最后一个userUser lastUser new User();// 开始遍历处理数据for (int page 0; page pages; page) {ListUser users userMapper.selectList(Wrappers.UserlambdaQuery().last(String.format(LIMIT %s,%s, page * pageSize oldThreadCurrent, pageSize)));for (User user : users) {handleCount;if (handleCount threadTotal) {break;}/// 模拟真正的逻辑处理耗时操作try {Thread.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}}/// 批量 修改/插入 操作if (CollUtil.isNotEmpty(users)) {lastUser users.get(users.size() - 1);}}log.info(线程-{} 处理的最后一个数据的id为{}, finalI 1, lastUser.getId());countDownLatch.countDown();});}try {countDownLatch.await();executorService.shutdown();log.info(总耗时{} s, (System.currentTimeMillis() - start) / 1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}