苏州建站,seo诊断书,泗水网站建设,信阳seo优化先说下业务场景#xff0c;业务1#xff1a;基于实时轨迹数据打卡#xff0c;业务2#xff1a;基于非实时轨迹的时间差#xff0c;计算累计时长。 简单点说就是从websocket获取到的实时数据#xff0c;既要兼容不耗时操作#xff0c;又要兼容耗时操作。
单线程做的话业务1基于实时轨迹数据打卡业务2基于非实时轨迹的时间差计算累计时长。 简单点说就是从websocket获取到的实时数据既要兼容不耗时操作又要兼容耗时操作。
单线程做的话一两个用户的数据没问题用户多了就处理不过来。
实现思路是用TaskExecutor来做一个task接收从redis lPop的数据并放入BlockingQueue另外的task从BlockingQueue获取数据。
Autowiredprivate TaskExecutor taskExecutor1;Autowiredprivate TaskExecutor taskExecutor2;Autowiredprivate TaskExecutor taskExecutor3;static BlockingQueueTrackHistory dataQueue new ArrayBlockingQueue(1 12);static BlockingQueueTrackHistory keepWatchQueue new ArrayBlockingQueue(1 12);public static final String M :;Bean(redisReadThread)public String service() {taskExecutor1.execute(() - {while (true) {try {lPop();} catch (Exception e) {e.printStackTrace();}}});return null;}Bean(calculationsBusinessData)public void calculationsService() {taskExecutor2.execute(() - {while (true) {try {if (dataQueue.size() ! 0) {TrackHistory trackRealTime dataQueue.poll();if (trackRealTime null) {Thread.sleep(100L);} else {//耗时方法doSomething();//存储当前日期人员的最新位置坐标saveTrackToRedis(trackRealTime);}} else {Thread.sleep(100L);}} catch (Exception e) {log.error(业务1数据计算异常-{}, e.getMessage());}}});}Bean(calculationsKeepWatch)public void keepWatchService() {taskExecutor3.execute(() - {while (true) {try {if (keepWatchQueue.size() ! 0) {TrackHistory trackRealTime keepWatchQueue.poll();if (trackRealTime null) {Thread.sleep(100L);} else {doSomething2(trackRealTime);}} else {Thread.sleep(100L);}} catch (Exception e) {log.error(业务2数据计算异常-{}, e.getMessage());}}});}/*** 从队列中读取数据** return*/private synchronized void lPop() {Object o redisTemplate.opsForList().leftPop(RedisKeyCons.COORDINATE);if (!org.springframework.util.StringUtils.isEmpty(o)) {TrackHistory trackRealTime (TrackHistory) o;log.info(leftPop trackHistory {}, trackRealTime);if (null ! trackRealTime) {if (checkMemberExist(trackRealTime)) {return;}//存储当前日期人员的最新位置坐标saveTrackToRedisForKeepWatch(trackRealTime);dataQueue.add(trackRealTime);keepWatchQueue.add(trackRealTime);}}}
配置线程池
/*** 线程池配置、启用异步***/
EnableAsync
Configuration
public class AsycTaskExecutorConfig {Bean(nametaskExecutor1)public TaskExecutor taskExecutor1() {ThreadPoolTaskExecutor taskExecutor new ThreadPoolTaskExecutor();taskExecutor.setCorePoolSize(1);taskExecutor.setMaxPoolSize(1);return taskExecutor;}Bean(nametaskExecutor2)public TaskExecutor taskExecutor2() {ThreadPoolTaskExecutor taskExecutor new ThreadPoolTaskExecutor();//最大线程数taskExecutor.setMaxPoolSize(5);//核心线程数taskExecutor.setCorePoolSize(5);//任务队列的大小taskExecutor.setQueueCapacity(5);//线程前缀名
// executor.setThreadNamePrefix();//线程存活时间taskExecutor.setKeepAliveSeconds(60);taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());//线程初始化return taskExecutor;}Bean(nametaskExecutor3)public TaskExecutor taskExecutor3() {ThreadPoolTaskExecutor taskExecutor new ThreadPoolTaskExecutor();//最大线程数taskExecutor.setMaxPoolSize(5);//核心线程数taskExecutor.setCorePoolSize(5);//任务队列的大小taskExecutor.setQueueCapacity(5);//线程前缀名
// executor.setThreadNamePrefix();//线程存活时间taskExecutor.setKeepAliveSeconds(60);taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());//线程初始化return taskExecutor;}Bean(name asyncPoolTaskExecutor)public ThreadPoolTaskExecutor executor() {ThreadPoolTaskExecutor taskExecutor new ThreadPoolTaskExecutor();//核心线程数taskExecutor.setCorePoolSize(10);//线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程taskExecutor.setMaxPoolSize(10);//缓存队列taskExecutor.setQueueCapacity(15);//设置线程的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁taskExecutor.setKeepAliveSeconds(60);//异步方法内部线程名称taskExecutor.setThreadNamePrefix(async-);/*** 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize如果还有任务到来就会采取任务拒绝策略* 通常有以下四种策略* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。* ThreadPoolExecutor.DiscardPolicy也是丢弃任务但是不抛出异常。* ThreadPoolExecutor.DiscardOldestPolicy丢弃队列最前面的任务然后重新尝试执行任务重复此过程* ThreadPoolExecutor.CallerRunsPolicy重试添加当前的任务自动重复调用 execute() 方法直到成功*/taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());taskExecutor.initialize();return taskExecutor;}}