沈阳做网站的互联网公司,seo资讯,家教网站建设模板,wordpress随机背景图片1 基于Rabbitmq延迟消息实现
支付时间设置为30#xff0c;未支付的消息会积压在mq中#xff0c;给mq带来巨大压力。我们可以利用Rabbitmq的延迟队列插件实现消息前一分钟尽快处理
1.1定义延迟消息实体
由于我们要多次发送延迟消息#xff0c;因此需要先定义一个记录消息…1 基于Rabbitmq延迟消息实现
支付时间设置为30未支付的消息会积压在mq中给mq带来巨大压力。我们可以利用Rabbitmq的延迟队列插件实现消息前一分钟尽快处理
1.1定义延迟消息实体
由于我们要多次发送延迟消息因此需要先定义一个记录消息延迟时间的消息体
Data
public class MultiDelayMessageT {/*** 消息体*/private T data;/*** 记录延迟时间的集合*/private ListLong delayMillis;public MultiDelayMessage(T data, ListLong delayMillis) {this.data data;this.delayMillis delayMillis;}public static T MultiDelayMessageT of(T data, Long ... delayMillis){return new MultiDelayMessage(data, CollUtils.newArrayList(delayMillis));}/*** 获取并移除下一个延迟时间* return 队列中的第一个延迟时间*/public Long removeNextDelay(){return delayMillis.remove(0);}/*** 是否还有下一个延迟时间*/public boolean hasNextDelay(){return !delayMillis.isEmpty();}
}
1.2 定义常量用于记录交换机、队列、RoutingKey等常量
package com.hmall.trade.constants;public interface MqConstants {String DELAY_EXCHANGE trade.delay.topic;String DELAY_ORDER_QUEUE trade.order.delay.queue;String DELAY_ORDER_ROUTING_KEY order.query;
}1.3 抽取mq配置到nacos中
spring:rabbitmq:host: ${hm.mq.host:192.168.150.101} # 主机名port: ${hm.mq.port:5672} # 端口virtual-host: ${hm.mq.vhost:/hmall} # 虚拟主机username: ${hm.mq.un:hmall} # 用户名password: ${hm.mq.pw:123} # 密码listener:simple:prefetch: 1 # 每次只能获取一条消息处理完成才能获取下一个消息1.4 定义消息处理器
使用延迟消息处理器发送消息
1.5 消息监听与延迟消息再次发送 2 延迟消息实现
DelayQueue:基于JVM保存在内存中会出现消息丢失
Rabbitmq的延迟任务基于TTL和死信交换机
2.1 redis的延迟任务基于zset的去重和排序功能 1.为什么任务需要存储在数据库中? 延迟任务是一个通用的服务任何有延迟需求的任务都可以调用该服务内存数据库的存储是有限的需要考虑数据持久化的问题存储数据库中是一种数据安全的考虑
2.为什么使用redis中的两种数据类型list和zset?
原因一: list存储立即执行的任务zset存储未来的数据原因二:任务量过大以后zset的性能会下降
时间复杂度:执行时间(次数) 随着数据规模增长的变化趋势
操作redis中的list命令LPUSH: 时间复杂度: O(1)操作redis中的zset命令zadd: 时间复杂度: (Mlog(n))
2.2 设计mybatis映射实体类
/*** 版本号,用乐观锁*/Versionprivate Integer version;乐观锁支持
/*** mybatis-plus乐观锁支持* return*/
Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){MybatisPlusInterceptor interceptor new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());return interceptor;
}2.3 创建task类用于接收添加任务的参数
Data
public class Task implements Serializable {/*** 任务id*/private Long taskId;/*** 类型*/private Integer taskType;/*** 优先级*/private Integer priority;/*** 执行id*/private long executeTime;/*** task参数*/private byte[] parameters;}2.4 添加任务
2.4.1 添加任务到数据库中
addTaskToDb(task);修改任务表和日志表
Autowiredprivate TaskinfoMapper taskinfoMapper;Autowiredprivate TaskinfoLogsMapper taskinfoLogsMapper;/*** 添加任务到数据库中** param task* return*/private boolean addTaskToDb(Task task) {boolean flag false;try {//保存任务表Taskinfo taskinfo new Taskinfo();BeanUtils.copyProperties(task, taskinfo);taskinfo.setExecuteTime(new Date(task.getExecuteTime()));taskinfoMapper.insert(taskinfo);//设置taskIDtask.setTaskId(taskinfo.getTaskId());//保存任务日志数据TaskinfoLogs taskinfoLogs new TaskinfoLogs();BeanUtils.copyProperties(taskinfo, taskinfoLogs);taskinfoLogs.setVersion(1);taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);taskinfoLogsMapper.insert(taskinfoLogs);flag true;} catch (Exception e) {e.printStackTrace();}return flag;}2.4.2 添加任务到redis
addTaskToCache(task);判断任务执行之间是否在现在还是未来五分钟内
Autowiredprivate CacheService cacheService;/*** 把任务添加到redis中** param task*/private void addTaskToCache(Task task) {String key task.getTaskType() _ task.getPriority();//获取5分钟之后的时间 毫秒值Calendar calendar Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);long nextScheduleTime calendar.getTimeInMillis();//2.1 如果任务的执行时间小于等于当前时间存入listif (task.getExecuteTime() System.currentTimeMillis()) {cacheService.lLeftPush(ScheduleConstants.TOPIC key, JSON.toJSONString(task));} else if (task.getExecuteTime() nextScheduleTime) {//2.2 如果任务的执行时间大于当前时间 小于等于预设时间未来5分钟 存入zset中cacheService.zAdd(ScheduleConstants.FUTURE key, JSON.toJSONString(task), task.getExecuteTime());}}2.5 删除任务
1、删除数据库任务表更改日志表任务状态 2、删除list或者zset中的任务
在TaskService中添加方法
/*** 取消任务* param taskId 任务id* return 取消结果*/
public boolean cancelTask(long taskId);
/*** 取消任务* param taskId* return*/
Override
public boolean cancelTask(long taskId) {boolean flag false;//删除任务更新日志Task task updateDb(taskId,ScheduleConstants.EXECUTED);//删除redis的数据if(task ! null){removeTaskFromCache(task);flag true;}return false;
}/*** 删除redis中的任务数据* param task*/
private void removeTaskFromCache(Task task) {String key task.getTaskType()_task.getPriority();if(task.getExecuteTime()System.currentTimeMillis()){cacheService.lRemove(ScheduleConstants.TOPICkey,0,JSON.toJSONString(task));}else {cacheService.zRemove(ScheduleConstants.FUTUREkey, JSON.toJSONString(task));}
}/*** 删除任务更新任务日志状态* param taskId* param status* return*/
private Task updateDb(long taskId, int status) {Task task null;try {//删除任务taskinfoMapper.deleteById(taskId);TaskinfoLogs taskinfoLogs taskinfoLogsMapper.selectById(taskId);taskinfoLogs.setStatus(status);taskinfoLogsMapper.updateById(taskinfoLogs);task new Task();BeanUtils.copyProperties(taskinfoLogs,task);task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());}catch (Exception e){log.error(task cancel exception taskid{},taskId);}return task;}2.6 消费任务
1、删除list中的数据 2、使用updateDB删除任务表、跟新日志表
在TaskService中添加方法
/*** 按照类型和优先级来拉取任务* param type* param priority* return*/
public Task poll(int type,int priority);实现
/*** 按照类型和优先级拉取任务* return*/
Override
public Task poll(int type,int priority) {Task task null;try {String key type_priority;String task_json cacheService.lRightPop(ScheduleConstants.TOPIC key);if(StringUtils.isNotBlank(task_json)){task JSON.parseObject(task_json, Task.class);//更新数据库信息updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);}}catch (Exception e){e.printStackTrace();log.error(poll task exception);}return task;
}2.7 未来定时任务更新-reids管道
减少与redis的交互次数 1、在引导类中添加开启任务调度注解EnableScheduling 2、在service中添加定时任务 Scheduled(cron “0 */1 * * * ?”)每分钟一次
Scheduled(cron 0 */1 * * * ?)
public void refresh() {System.out.println(System.currentTimeMillis() / 1000 执行了定时任务);// 获取所有未来数据集合的key值SetString futureKeys cacheService.scan(ScheduleConstants.FUTURE *);// future_*for (String futureKey : futureKeys) { // future_250_250String topicKey ScheduleConstants.TOPIC futureKey.split(ScheduleConstants.FUTURE)[1];//获取该组key下当前需要消费的任务数据SetString tasks cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());if (!tasks.isEmpty()) {//将这些任务数据添加到消费者队列中cacheService.refreshWithPipeline(futureKey, topicKey, tasks);System.out.println(成功的将 futureKey 下的当前需要执行的任务数据刷新到 topicKey 下);}}
}public ListObject refreshWithPipeline(String future_key,String topic_key,CollectionString values){ListObject objects stringRedisTemplate.executePipelined(new RedisCallbackObject() {NullableOverridepublic Object doInRedis(RedisConnection redisConnection) throws DataAccessException {StringRedisConnection stringRedisConnection (StringRedisConnection)redisConnection;String[] strings values.toArray(new String[values.size()]);stringRedisConnection.rPush(topic_key,strings);stringRedisConnection.zRem(future_key,strings);return null;}});return objects;}总结
1、使用rebbitmq使用的场景是在支付和订单微服务中用于实现消息可以延迟30分钟付款的功能。并借用该中间件的插件实现支付的异步下单功能并可以快速处理前几分钟防止消息堆积 2、使用redis是基于zset的去重和排序功能相当于将一定数据的保存在数据库使用定时任务同步数据库符合五分钟的任务到zset中然后在在zest中定时更新可以运行的任务到list集合中相当于实现了延迟功能和缓存功能。 3、第二种还可以扩展为将rabbitmq中等待时间较长的数据存到redis中然后定时的去同步redis中的数据到数据库中防止消息堆积。