当前位置: 首页 > news >正文

欣赏别人做的网站电影网站规划

欣赏别人做的网站,电影网站规划,沧州网站建设方案咨询,网站挂黑链工具2023黑马头条.微服务项目.跟学笔记 五 延迟任务精准发布文章 1.文章定时发布2.延迟任务概述 2.1 什么是延迟任务2.2 技术对比 2.2.1 DelayQueue2.2.2 RabbitMQ实现延迟任务2.2.3 redis实现3.redis实现延迟任务4.延迟任务服务实现 4.1 搭建heima-leadnews-schedule模块4.2 数据库…2023黑马头条.微服务项目.跟学笔记 五 延迟任务精准发布文章 1.文章定时发布2.延迟任务概述 2.1 什么是延迟任务2.2 技术对比 2.2.1 DelayQueue2.2.2 RabbitMQ实现延迟任务2.2.3 redis实现3.redis实现延迟任务4.延迟任务服务实现 4.1 搭建heima-leadnews-schedule模块4.2 数据库准备 4.2.1 数据库准备-数据库自身解决并发两种策略4.2.2 数据库准备-mybatis-plus集成乐观锁的使用4.3 安装redis4.4 项目集成redis4.5 添加任务4.6 取消任务4.7 消费任务4.8 未来数据定时刷新 4.8.1 redis key值匹配4.8.2 redis管道4.8.3 未来数据定时刷新-功能完成4.9 分布式锁解决集群下的方法抢占执行 4.9.1 问题描述4.9.2 分布式锁4.9.3 redis分布式锁4.9.4 在工具类CacheService中添加方法4.10 数据库同步到redis5.延迟队列解决精准时间发布文章 5.1 延迟队列服务提供对外接口5.2 发布文章集成添加延迟队列接口5.3 消费任务进行审核文章 延迟任务精准发布文章 1.文章定时发布 2.延迟任务概述 2.1 什么是延迟任务 定时任务有固定周期的有明确的触发时间。延迟队列没有固定的开始时间它常常是由一个事件触发的而在这个事件触发之后的一段时间内触发另一个事件任务可以立即执行也可以延迟。 应用场景 场景一订单下单之后30分钟后如果用户没有付钱则系统自动取消订单如果期间下单成功任务取消。 场景二接口对接出现网络问题1分钟后重试如果失败2分钟重试直到出现阈值终止。 文章定时任务 2.2 技术对比 2.2.1 DelayQueue JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列 内部采用优先队列 PriorityQueue 存储元素同时元素必须实现 Delayed 接口在创建元素时可以指定多久才可以从队列中获取当前元素只有在延迟期满时才能从队列中提取元素 DelayQueue属于排序队列它的特殊之处在于队列的元素必须实现Delayed接口该接口需要实现compareTo和getDelay方法 getDelay方法获取元素在队列中的剩余时间只有当剩余时间为0时元素才可以出队列。 compareTo方法用于排序确定元素出队列的顺序。 实现 1在测试包jdk下创建延迟任务元素对象DelayedTask实现compareTo和getDelay方法 2在main方法中创建DelayQueue并向延迟队列中添加三个延迟任务 3循环的从延迟队列中拉取任务 public class DelayedTask implements Delayed{// 任务的执行时间private int executeTime 0;public DelayedTask(int delay){Calendar calendar Calendar.getInstance();calendar.add(Calendar.SECOND,delay);this.executeTime (int)(calendar.getTimeInMillis() /1000 );}/*** 元素在队列中的剩余时间* param unit* return*/Overridepublic long getDelay(TimeUnit unit) {Calendar calendar Calendar.getInstance();return executeTime - (calendar.getTimeInMillis()/1000);}/*** 元素排序* param o* return*/Overridepublic int compareTo(Delayed o) {long val this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);return val 0 ? 0 : ( val 0 ? -1: 1 );}public static void main(String[] args) {DelayQueueDelayedTask queue new DelayQueueDelayedTask();queue.add(new DelayedTask(5));queue.add(new DelayedTask(10));queue.add(new DelayedTask(15));System.out.println(System.currentTimeMillis()/1000 start consume );while(queue.size() ! 0){DelayedTask delayedTask queue.poll();if(delayedTask !null ){System.out.println(System.currentTimeMillis()/1000 cosume task);}//每隔一秒消费一次try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}} } }DelayQueue实现完成之后思考一个问题 使用线程池或者原生DelayQueue程序挂掉之后任务都是放在内存需要考虑未处理消息的丢失带来的影响如何保证数据不丢失需要持久化磁盘 2.2.2 RabbitMQ实现延迟任务 TTLTime To Live (消息存活时间) 死信队列Dead Letter Exchange(死信交换机)当消息成为Dead message后可以重新发送另一个交换机死信交换机 2.2.3 redis实现 zset数据类型的去重有序分数排序特点进行延迟。例如时间戳作为score进行排序 例如: 生产者添加到4个任务到延迟队列中时间亳秒值分别为97、98、 99、 100。 当前时间的亳秒值为90。 消费者端进行监听如果当前时间的毫秒值匹配到了延迟队列中的秒值就立即消费。 总结: 3.redis实现延迟任务 实现思路 问题思路 1.为什么任务需要存储在数据库中 延迟任务是一个通用的服务任何需要延迟得任务都可以调用该服务需要考虑数据持久化的问题存储数据库中是一种数据安全的考虑。 2.为什么redis中使用两种数据类型list和zset 原因一: list存储立即执行的任务zset存储未来的数据。 原因二:任务量过大以后zset的性能会下降。 时间复杂渡:执行时间(次数)随着数据规模增长的变化趋势 操作redis中的list命令LPUSH: 时间复杂度: 0(1)操作redis中的zset命令zadd: 时间复杂度: O(M*log(n)) 3.在添加zset数据的时候为什么需要预加载 任务模块是一个通用的模块项目中任何需要延迟队列的地方都可以调用这个接口要考虑到数据量的问题如果数据量特别大为了防止阻塞只需要把未来几分钟要执行的数据存入缓存即可是一种优化形式。 4.延迟任务服务实现 4.1 搭建heima-leadnews-schedule模块 leadnews-schedule是一个通用的服务单独创建模块来管理任何类型的延迟任务 ①导入资料文件夹下的heima-leadnews-schedule模块到heima-leadnews-service下如下图所示 在pom.xml中添加子模块 moduleheima-leadnews-schedule/module1 ②添加bootstrap.yml server:port: 51701 spring:application:name: leadnews-schedulecloud:nacos:discovery:server-addr: 192.168.200.130:8848config:server-addr: 192.168.200.130:8848file-extension: yml③在nacos中添加对应配置并添加数据库及mybatis-plus的配置 spring:datasource:driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicodetruecharacterEncodingUTF-8serverTimezoneUTCusername: rootpassword: root # 设置Mapper接口所对应的XML文件位置如果你在Mapper接口中有自定义方法需要进行该配置 mybatis-plus:mapper-locations: classpath*:mapper/*.xml# 设置别名包扫描路径通过该属性可以给包中的类注册别名type-aliases-package: com.heima.model.schedule.pojos4.2 数据库准备 导入资料中leadnews_schedule数据库 taskinfo 任务表 注意事项 MySQL中BLOB是一个二 进制大型对象是一个可以存储大量数据的容器。 LongBlob 最大存储4G 实体类 package com.heima.model.schedule.pojos;import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data;import java.io.Serializable; import java.util.Date;/*** p* * /p** author itheima*/ Data TableName(taskinfo) public class Taskinfo implements Serializable {private static final long serialVersionUID 1L;/*** 任务id*/TableId(type IdType.ID_WORKER)private Long taskId;/*** 执行时间*/TableField(execute_time)private Date executeTime;/*** 参数*/TableField(parameters)private byte[] parameters;/*** 优先级*/TableField(priority)private Integer priority;/*** 任务类型*/TableField(task_type)private Integer taskType;}taskinfo_logs 任务日志表 实体类 package com.heima.model.schedule.pojos;import com.baomidou.mybatisplus.annotation.*; import lombok.Data;import java.io.Serializable; import java.util.Date;/*** p* * /p** author itheima*/ Data TableName(taskinfo_logs) public class TaskinfoLogs implements Serializable {private static final long serialVersionUID 1L;/*** 任务id*/TableId(type IdType.ID_WORKER)private Long taskId;/*** 执行时间*/TableField(execute_time)private Date executeTime;/*** 参数*/TableField(parameters)private byte[] parameters;/*** 优先级*/TableField(priority)private Integer priority;/*** 任务类型*/TableField(task_type)private Integer taskType;/*** 版本号,用乐观锁*/Versionprivate Integer version;/*** 状态 0int 1EXECUTED 2CANCELLED*/TableField(status)private Integer status;}4.2.1 数据库准备-数据库自身解决并发两种策略 4.2.2 数据库准备-mybatis-plus集成乐观锁的使用 乐观锁支持 /*** mybatis-plus乐观锁支持* return*/ Bean public MybatisPlusInterceptor optimisticLockerInterceptor(){MybatisPlusInterceptor interceptor new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());return interceptor; }4.3 安装redis ①拉取镜像 docker pull redis1 ② 创建容器 docker run -d --name redis --restartalways -p 6379:6379 redis --requirepass leadnews1 ③链接测试 打开资料中的Redis Desktop Manager输入host、port、password链接测试 能链接成功即可 4.4 项目集成redis ① 在heima-leadnews-common项目导入redis相关依赖已经完成 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId /dependency !-- redis依赖commons-pool 这个依赖一定要添加 -- dependencygroupIdorg.apache.commons/groupIdartifactIdcommons-pool2/artifactId /dependency② 在heima-leadnews-schedule中集成redis,添加以下nacos配置链接上redis spring:redis:host: 192.168.200.130password: leadnewsport: 6379③ 拷贝资料文件夹下的类CacheService到heima-leadnews-common模块下并添加自动配置 com.heima.common.redis.CacheService1 ④测试 package com.heima.schedule.test;import com.heima.common.redis.CacheService; import com.heima.schedule.ScheduleApplication; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;import java.util.Set;SpringBootTest(classes ScheduleApplication.class) RunWith(SpringRunner.class) public class RedisTest {Autowiredprivate CacheService cacheService;Testpublic void testList(){//在list的左边添加元素 // cacheService.lLeftPush(list_001,hello,redis);//在list的右边获取元素并删除String list_001 cacheService.lRightPop(list_001);System.out.println(list_001);}Testpublic void testZset(){//添加数据到zset中 分值/*cacheService.zAdd(zset_key_001,hello zset 001,1000);cacheService.zAdd(zset_key_001,hello zset 002,8888);cacheService.zAdd(zset_key_001,hello zset 003,7777);cacheService.zAdd(zset_key_001,hello zset 004,999999);*///按照分值获取数据SetString zset_key_001 cacheService.zRangeByScore(zset_key_001, 0, 8888);System.out.println(zset_key_001);} }我们先测试一下list的存取情况: 我们打印到后台看下 rpop之后数据就没了 再测试一下zset存取成功 查询一下0~8888的数据 4.5 添加任务 ①拷贝mybatis-plus生成的文件mapper ②创建task类用于接收添加任务的参数 package com.heima.model.schedule.dtos;import lombok.Data;import java.io.Serializable;Data public class Task implements Serializable {/*** 任务id*/private Long taskId;/*** 类型*/private Integer taskType;/*** 优先级*/private Integer priority;/*** 执行id*/private long executeTime;/*** task参数*/private byte[] parameters;}③创建TaskService package com.heima.schedule.service;import com.heima.model.schedule.dtos.Task;/*** 对外访问接口*/ public interface TaskService {/*** 添加任务* param task 任务对象* return 任务id*/public long addTask(Task task) ;}实现 package com.heima.schedule.service.impl;import com.alibaba.fastjson.JSON; import com.heima.common.constants.ScheduleConstants; import com.heima.common.redis.CacheService; import com.heima.model.schedule.dtos.Task; import com.heima.model.schedule.pojos.Taskinfo; import com.heima.model.schedule.pojos.TaskinfoLogs; import com.heima.schedule.mapper.TaskinfoLogsMapper; import com.heima.schedule.mapper.TaskinfoMapper; import com.heima.schedule.service.TaskService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional;import java.util.Calendar; import java.util.Date;Service Transactional Slf4j public class TaskServiceImpl implements TaskService {/*** 添加延迟任务** param task* return*/Overridepublic long addTask(Task task) {//1.添加任务到数据库中boolean success addTaskToDb(task);if (success) {//2.添加任务到redisaddTaskToCache(task);}return task.getTaskId();}Autowiredprivate CacheService cacheService;/*** 把任务添加到redis中** param task*/private void addTaskToCache(Task task) {String key task.getTaskType() _ task.getPriority();//获取5分钟之后的时间 毫秒值Calendar calendar Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);long nextScheduleTime calendar.getTimeInMillis();//2.1 如果任务的执行时间小于等于当前时间存入listif (task.getExecuteTime() System.currentTimeMillis()) {cacheService.lLeftPush(ScheduleConstants.TOPIC key, JSON.toJSONString(task));} else if (task.getExecuteTime() nextScheduleTime) {//2.2 如果任务的执行时间大于当前时间 小于等于预设时间未来5分钟 存入zset中cacheService.zAdd(ScheduleConstants.FUTURE key, JSON.toJSONString(task), task.getExecuteTime());}}Autowiredprivate TaskinfoMapper taskinfoMapper;Autowiredprivate TaskinfoLogsMapper taskinfoLogsMapper;/*** 添加任务到数据库中** param task* return*/private boolean addTaskToDb(Task task) {boolean flag false;try {//保存任务表Taskinfo taskinfo new Taskinfo();BeanUtils.copyProperties(task, taskinfo);taskinfo.setExecuteTime(new Date(task.getExecuteTime()));taskinfoMapper.insert(taskinfo);//设置taskIDtask.setTaskId(taskinfo.getTaskId());//保存任务日志数据TaskinfoLogs taskinfoLogs new TaskinfoLogs();BeanUtils.copyProperties(taskinfo, taskinfoLogs);taskinfoLogs.setVersion(1);taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);taskinfoLogsMapper.insert(taskinfoLogs);flag true;} catch (Exception e) {e.printStackTrace();}return flag;} }ScheduleConstants常量类 package com.heima.common.constants;public class ScheduleConstants {//task状态public static final int SCHEDULED0; //初始化状态public static final int EXECUTED1; //已执行状态public static final int CANCELLED2; //已取消状态public static String FUTUREfuture_; //未来数据key前缀public static String TOPICtopic_; //当前数据key前缀 }④测试 对TaskService使用快捷键Alt Enter选择Create Test,勾选addTask TaskServiceImplTest.java内容如下: package com.heima.schedule.service.impl;import com.heima.model.schedule.dtos.Task; import com.heima.schedule.ScheduleApplication; import com.heima.schedule.service.TaskService; import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;import java.util.Date;import static org.junit.jupiter.api.Assertions.*;SpringBootTest(classes ScheduleApplication.class) RunWith(SpringRunner.class) class TaskServiceImplTest {Autowiredprivate TaskService taskService;Testvoid addTask() {Task task new Task();task.setTaskType(100);task.setPriority(50);task.setParameters(task test.getBytes());task.setExecuteTime(new Date().getTime());long taskId taskService.addTask(task);System.out.println(taskId);} }执行测试类之后有可能报com.heima.schedule.mapper找不到的错误这里我们可以通过Maven重新编译一下heima-leadnews-schedule 我们看到数据库 后台打印: 再看一下redis 那我们更改一下代码 后台如下: 数据库如下: 发现redis增加了未来的一条zset结构 我们把时间进一步增加超过5分钟发现数据库有存redis缓冲就没有添加了。 后台如下: 数据库:增加了数据 redis缓存:没有数据 4.6 取消任务 在TaskService中添加方法 /*** 取消任务* param taskId 任务id* return 取消结果*/ public boolean cancelTask(long taskId); 实现 /*** 取消任务* param taskId* return*/ Override public boolean cancelTask(long taskId) {boolean flag false;//删除任务更新日志Task task updateDb(taskId,ScheduleConstants.EXECUTED);//删除redis的数据if(task ! null){removeTaskFromCache(task);flag true;}return false; }/*** 删除redis中的任务数据* param task*/ private void removeTaskFromCache(Task task) {String key task.getTaskType()_task.getPriority();if(task.getExecuteTime()System.currentTimeMillis()){cacheService.lRemove(ScheduleConstants.TOPICkey,0,JSON.toJSONString(task));}else {cacheService.zRemove(ScheduleConstants.FUTUREkey, JSON.toJSONString(task));} }/*** 删除任务更新任务日志状态* param taskId* param status* return*/ private Task updateDb(long taskId, int status) {Task task null;try {//删除任务taskinfoMapper.deleteById(taskId);TaskinfoLogs taskinfoLogs taskinfoLogsMapper.selectById(taskId);taskinfoLogs.setStatus(status);taskinfoLogsMapper.updateById(taskinfoLogs);task new Task();BeanUtils.copyProperties(taskinfoLogs,task);task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());}catch (Exception e){log.error(task cancel exception taskid{},taskId);}return task;}测试修改TaskServiceImplTest.java Testpublic void removeTask() {taskService.cancelTask(1677946555950288898L);}运行前先看下数据库的数据 redis数据如下: 执行后:leadnews_schedule.taskinfo数据如下 leadnews_schedule.taskinfo_logs 数据如下: 4.7 消费任务 消费任务: 在TaskService中添加方法 /*** 按照类型和优先级来拉取任务* param type* param priority* return*/ public Task poll(int type,int priority);实现 /*** 按照类型和优先级拉取任务* return*/ Override public Task poll(int type,int priority) {Task task null;try {String key type_priority;String task_json cacheService.lRightPop(ScheduleConstants.TOPIC key);if(StringUtils.isNotBlank(task_json)){task JSON.parseObject(task_json, Task.class);//更新数据库信息updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);}}catch (Exception e){e.printStackTrace();log.error(poll task exception);}return task; }测试TaskServiceImplTest.java添加方法如下: Testpublic void popTask() {Task task taskService.popTask(100, 50);System.out.println(消息如下: task);}测试前数据库如下: 测试前redis如下: 消息如下: 再看下数据库: 再看下redis: 4.8 未来数据定时刷新 思考: 方案1: 方案2: 4.8.1 redis key值匹配 方案1keys 模糊匹配 keys的模糊匹配功能很方便也很强大但是在生产环境需要慎用开发中使用keys的模糊匹配却发现redis的CPU使用率极高所以公司的redis生产环境将keys命令禁用了redis是单线程会被堵塞 方案2scan SCAN 命令是一个基于游标的迭代器SCAN命令每次被调用之后 都会向用户返回一个新的游标 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数 以此来延续之前的迭代过程。 代码案例RedisTest.java中添加方法testKeys具体内容如下: Testpublic void testKeys() {SetString keys cacheService.keys(ScheduleConstants.FUTURE *);System.out.println(方式一:);System.out.println(keys);SetString scan cacheService.scan(ScheduleConstants.FUTURE *);System.out.println(方式二:);System.out.println(scan);}运行结果如下: 4.8.2 redis管道 普通redis客户端和服务器交互模式 Pipeline请求模型 官方测试结果数据对比 测试案例对比 //耗时6151 Test public void testPiple1(){long start System.currentTimeMillis();for (int i 0; i 10000 ; i) {Task task new Task();task.setTaskType(1001);task.setPriority(1);task.setExecuteTime(new Date().getTime());cacheService.lLeftPush(1001_1, JSON.toJSONString(task));}System.out.println(耗时(System.currentTimeMillis()- start)); }Test public void testPiple2(){long start System.currentTimeMillis();//使用管道技术ListObject objectList cacheService.getstringRedisTemplate().executePipelined(new RedisCallbackObject() {NullableOverridepublic Object doInRedis(RedisConnection redisConnection) throws DataAccessException {for (int i 0; i 10000 ; i) {Task task new Task();task.setTaskType(1001);task.setPriority(1);task.setExecuteTime(new Date().getTime());redisConnection.lPush(1001_1.getBytes(), JSON.toJSONString(task).getBytes());}return null;}});System.out.println(使用管道技术执行10000次自增操作共耗时:(System.currentTimeMillis()-start)毫秒); }测试后发现分别用时: 7521毫秒和1621毫秒 还是通过管道更加快捷。 4.8.3 未来数据定时刷新-功能完成 在TaskService中添加方法 Scheduled(cron 0 */1 * * * ?) public void refresh() {System.out.println(System.currentTimeMillis() / 1000 执行了定时任务);// 获取所有未来数据集合的key值SetString futureKeys cacheService.scan(ScheduleConstants.FUTURE *);// future_*for (String futureKey : futureKeys) { // future_250_250String topicKey ScheduleConstants.TOPIC futureKey.split(ScheduleConstants.FUTURE)[1];//获取该组key下当前需要消费的任务数据SetString tasks cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());if (!tasks.isEmpty()) {//将这些任务数据添加到消费者队列中cacheService.refreshWithPipeline(futureKey, topicKey, tasks);System.out.println(成功的将 futureKey 下的当前需要执行的任务数据刷新到 topicKey 下);}} }然后新建一个测试方法TaskServiceImplTest.java /*** param* return long* description // 添加延迟任务循环* param: task* date 2023/7/9 15:03* author wty**/public void addTaskNew() {for (int i 0; i 5; i) {Task task new Task();task.setTaskType(100 i);task.setPriority(50);task.setParameters(task test.getBytes());task.setExecuteTime(new Date().getTime() 500 * i);long taskId taskService.addTask(task);}} 先测试一下测试类运行完后redis增加了5条数据 在引导类中添加开启任务调度注解EnableScheduling 添加后启动ScheduleApplication.java 启动后刷新成功 再看redis前缀由future变为了topic 4.9 分布式锁解决集群下的方法抢占执行 4.9.1 问题描述 启动两台heima-leadnews-schedule服务每台服务都会去执行refresh定时任务方法 我们再开一个服务演示一下多个不同实例运行同一个服务的情况。 更改为 -Dserver.port517021 这个时候把两个端口都启动 4.9.2 分布式锁 分布式锁控制分布式系统有序的去对共享资源进行操作通过互斥来保证数据的一致性。 解决方案 4.9.3 redis分布式锁 sexnx SET if Not eXists 命令在指定的 key 不存在时为 key 设置指定的值。 这种加锁的思路是如果 key 不存在则为 key 设置 value如果 key 已存在则 SETNX 命令不做任何操作 客户端A请求服务器设置key的值如果设置成功就表示加锁成功客户端B也去请求服务器设置key的值如果返回失败那么就代表加锁失败客户端A执行代码完成删除锁客户端B在等待一段时间后再去请求设置key的值设置成功客户端B执行代码完成删除锁 4.9.4 在工具类CacheService中添加方法 在heima-leadnews-common中添加方法 /*** 加锁** param name* param expire* return*/ public String tryLock(String name, long expire) {name name _lock;String token UUID.randomUUID().toString();RedisConnectionFactory factory stringRedisTemplate.getConnectionFactory();RedisConnection conn factory.getConnection();try {//参考redis命令//set key value [EX seconds] [PX milliseconds] [NX|XX]Boolean result conn.set(name.getBytes(),token.getBytes(),Expiration.from(expire, TimeUnit.MILLISECONDS),RedisStringCommands.SetOption.SET_IF_ABSENT //NX);if (result ! null result)return token;} finally {RedisConnectionUtils.releaseConnection(conn, factory,false);}return null; }修改未来数据定时刷新的方法如下 /*** 未来数据定时刷新*/ Scheduled(cron 0 */1 * * * ?) public void refresh(){String token cacheService.tryLock(FUTURE_TASK_SYNC, 1000 * 30);if(StringUtils.isNotBlank(token)){log.info(未来数据定时刷新---定时任务);//获取所有未来数据的集合keySetString futureKeys cacheService.scan(ScheduleConstants.FUTURE *);for (String futureKey : futureKeys) {//future_100_50//获取当前数据的key topicString topicKey ScheduleConstants.TOPICfutureKey.split(ScheduleConstants.FUTURE)[1];//按照key和分值查询符合条件的数据SetString tasks cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());//同步数据if(!tasks.isEmpty()){cacheService.refreshWithPipeline(futureKey,topicKey,tasks);log.info(成功的将futureKey刷新到了topicKey);}}} }修改完之后直接重启项目 发现51702时20点25分执行的 发现51701时20点26分执行的 因为有分布式锁的存在即便是有多个端口也只会执行其中的一个。 4.10 数据库同步到redis 步骤如下: Scheduled(cron 0 */5 * * * ?) PostConstruct public void reloadData() {clearCache();log.info(数据库数据同步到缓存);Calendar calendar Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);//查看小于未来5分钟的所有任务ListTaskinfo allTasks taskinfoMapper.selectList(Wrappers.TaskinfolambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));if(allTasks ! null allTasks.size() 0){for (Taskinfo taskinfo : allTasks) {Task task new Task();BeanUtils.copyProperties(taskinfo,task);task.setExecuteTime(taskinfo.getExecuteTime().getTime());addTaskToCache(task);}} }private void clearCache(){// 删除缓存中未来数据集合和当前消费者队列的所有keySetString futurekeys cacheService.scan(ScheduleConstants.FUTURE *);// future_SetString topickeys cacheService.scan(ScheduleConstants.TOPIC *);// topic_cacheService.delete(futurekeys);cacheService.delete(topickeys); }具体测试的话先执行TaskServiceImplTest.java中的addTaskNew方法运行几个任务数据 Java后台如下: redis中数据如下 数据库中数据如下: 我们删除一些redis的数据 然后我们重启schedule微服务 再看一下redis如下: 5.延迟队列解决精准时间发布文章 5.1 延迟队列服务提供对外接口 提供远程的feign接口在heima-leadnews-feign-api编写类如下 package com.heima.apis.schedule;import com.heima.model.common.dtos.ResponseResult; import com.heima.model.schedule.dtos.Task; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody;FeignClient(leadnews-schedule) public interface IScheduleClient {/*** 添加任务* param task 任务对象* return 任务id*/PostMapping(/api/v1/task/add)public ResponseResult addTask(RequestBody Task task);/*** 取消任务* param taskId 任务id* return 取消结果*/GetMapping(/api/v1/task/cancel/{taskId})public ResponseResult cancelTask(PathVariable(taskId) long taskId);/*** 按照类型和优先级来拉取任务* param type* param priority* return*/GetMapping(/api/v1/task/poll/{type}/{priority})public ResponseResult poll(PathVariable(type) int type,PathVariable(priority) int priority); }注意这里的FeignClient要与服务一致 在heima-leadnews-schedule微服务下提供对应的实现 package com.heima.schedule.feign;import com.heima.apis.schedule.IScheduleClient; import com.heima.model.common.dtos.ResponseResult; import com.heima.model.schedule.dtos.Task; import com.heima.schedule.service.TaskService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*;RestController public class ScheduleClient implements IScheduleClient {Autowiredprivate TaskService taskService;/*** 添加任务* param task 任务对象* return 任务id*/PostMapping(/api/v1/task/add)Overridepublic ResponseResult addTask(RequestBody Task task) {return ResponseResult.okResult(taskService.addTask(task));}/*** 取消任务* param taskId 任务id* return 取消结果*/GetMapping(/api/v1/task/cancel/{taskId})Overridepublic ResponseResult cancelTask(PathVariable(taskId) long taskId) {return ResponseResult.okResult(taskService.cancelTask(taskId));}/*** 按照类型和优先级来拉取任务* param type* param priority* return*/GetMapping(/api/v1/task/poll/{type}/{priority})Overridepublic ResponseResult poll(PathVariable(type) int type, PathVariable(priority) int priority) {return ResponseResult.okResult(taskService.poll(type,priority));} }5.2 发布文章集成添加延迟队列接口 在创建WmNewsTaskService package com.heima.wemedia.service;import com.heima.model.wemedia.pojos.WmNews;public interface WmNewsTaskService {/*** 添加任务到延迟队列中* param id 文章的id* param publishTime 发布的时间 可以做为任务的执行时间*/public void addNewsToTask(Integer id, Date publishTime);}实现 package com.heima.wemedia.service.impl;import com.heima.apis.schedule.IScheduleClient; import com.heima.model.common.enums.TaskTypeEnum; import com.heima.model.schedule.dtos.Task; import com.heima.model.wemedia.pojos.WmNews; import com.heima.utils.common.ProtostuffUtil; import com.heima.wemedia.service.WmNewsTaskService; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service;Service Slf4j public class WmNewsTaskServiceImpl implements WmNewsTaskService {Autowiredprivate IScheduleClient scheduleClient;/*** 添加任务到延迟队列中* param id 文章的id* param publishTime 发布的时间 可以做为任务的执行时间*/OverrideAsyncpublic void addNewsToTask(Integer id, Date publishTime) {log.info(添加任务到延迟服务中----begin);Task task new Task();task.setExecuteTime(publishTime.getTime());task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());WmNews wmNews new WmNews();wmNews.setId(id);task.setParameters(ProtostuffUtil.serialize(wmNews));scheduleClient.addTask(task);log.info(添加任务到延迟服务中----end);}}枚举类 package com.heima.model.common.enums;import lombok.AllArgsConstructor; import lombok.Getter;Getter AllArgsConstructor public enum TaskTypeEnum {NEWS_SCAN_TIME(1001, 1,文章定时审核),REMOTEERROR(1002, 2,第三方接口调用失败重试);private final int taskType; //对应具体业务private final int priority; //业务不同级别private final String desc; //描述信息 }序列化工具对比 JdkSerializejava内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化 ObjectOutputStream的writeObject()方法可序列化对象生成字节数组Protostuffgoogle开源的protostuff采用更为紧凑的二进制数组表现更加优异然后使用protostuff的编译工具生成pojo类 拷贝资料中的两个类到heima-leadnews-utils下 Protostuff需要引导依赖 dependencygroupIdio.protostuff/groupIdartifactIdprotostuff-core/artifactIdversion1.6.0/version /dependencydependencygroupIdio.protostuff/groupIdartifactIdprotostuff-runtime/artifactIdversion1.6.0/version /dependency运行一下测试类 可以发现protostuff花费的时间是很少的 修改发布文章代码 把之前的异步调用修改为调用延迟任务 Autowired private WmNewsTaskService wmNewsTaskService;/*** 发布修改文章或保存为草稿* param dto* return*/ Override public ResponseResult submitNews(WmNewsDto dto) {//0.条件判断if(dto null || dto.getContent() null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//1.保存或修改文章WmNews wmNews new WmNews();//属性拷贝 属性名词和类型相同才能拷贝BeanUtils.copyProperties(dto,wmNews);//封面图片 list--- stringif(dto.getImages() ! null dto.getImages().size() 0){//[1dddfsd.jpg,sdlfjldk.jpg]-- 1dddfsd.jpg,sdlfjldk.jpgString imageStr StringUtils.join(dto.getImages(), ,);wmNews.setImages(imageStr);}//如果当前封面类型为自动 -1if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){wmNews.setType(null);}saveOrUpdateWmNews(wmNews);//2.判断是否为草稿 如果为草稿结束当前方法if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}//3.不是草稿保存文章内容图片与素材的关系//获取到文章内容中的图片信息ListString materials ectractUrlInfo(dto.getContent());saveRelativeInfoForContent(materials,wmNews.getId());//4.不是草稿保存文章封面图片与素材的关系如果当前布局是自动需要匹配封面图片saveRelativeInfoForCover(dto,wmNews,materials);//审核文章// wmNewsAutoScanService.autoScanWmNews(wmNews.getId());wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}启动三个微服务 启动nginx 登录http://localhost:8802/#/login 发布文章 把数据库中的表leadnews_schedule.taskinfo和leadnews_schedule.taskinfo_logs全部截断 redis的数据也全部清空 全部操作完后再点击提交审核 我们发现任务是待审核的状态因为还没有消费 数据库如下: redis如下 再次发布一个未来5分钟内的文章 发布后: 发布后数据库如下 redis如下: 5.3 消费任务进行审核文章 WmNewsTaskService中添加方法 /*** 消费延迟队列数据*/ public void scanNewsByTask();实现 Autowired private WmNewsAutoScanServiceImpl wmNewsAutoScanService;/*** 消费延迟队列数据*/ Scheduled(fixedRate 1000) Override SneakyThrows public void scanNewsByTask() {log.info(文章审核---消费任务执行---begin---);ResponseResult responseResult scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());if(responseResult.getCode().equals(200) responseResult.getData() ! null){String json_str JSON.toJSONString(responseResult.getData());Task task JSON.parseObject(json_str, Task.class);byte[] parameters task.getParameters();WmNews wmNews ProtostuffUtil.deserialize(parameters, WmNews.class);System.out.println(wmNews.getId()-----------);wmNewsAutoScanService.autoScanWmNews(wmNews.getId());}log.info(文章审核---消费任务执行---end---); }在WemediaApplication自媒体的引导类中添加开启任务调度注解EnableScheduling 启动4个微服务 登录 http://localhost:8802/ 发布文章 发布成功后审批成功 我们再发布一个5分钟内的延时任务 发表后并没有马上审核 1分钟后再次刷新发现审核了
http://www.dnsts.com.cn/news/47772.html

相关文章:

  • 手机wap网站开发建设干部学校网站首页
  • 毕业设计网站设计说明书网站文件大小
  • 做实体店优惠券的网站wordpress 软件下载
  • 怎么建立一个网站推广济宁外贸网站建设
  • 推荐个在广州做网站的网站建设属于资产
  • 站外seo是什么普通网站能不能用vue做几个小功能
  • 云存储做网站微信小程序云开发收费标准
  • 建设 云服务器 网站数据库跟网站内容
  • 网站优化seo辽宁建设银行企业银行官方网站
  • 做网站找个人还是找公司好免费seo排名软件
  • 做设计找素材那个网站最好用网站移动端优化工具
  • 网站区域名是什么意思电商拿货平台
  • 商家在网站做淘宝客会给佣金吗漯河seo推广
  • 广州公司制作网站wordpress 关闭评论
  • 有哪里可以做兼职翻译的网站做视频网站注意什么
  • 网站挂马解决建房城乡建设部网站
  • 博星卓越营销网站设计南通学校网站建设
  • 深圳罗湖医疗集团网站建设编程代码网站
  • wordpress多站点文章调用成都网页制作推广
  • 专业做网站广州营销型公司网站
  • 重庆外贸网站建设公司上海市企业登记网络服务平台
  • 微信投票网站怎么做怎么联系做网站公司
  • 网站管理系统制作商标设计思路
  • 新浪微博可以做网站吗win7优化大师好不好
  • 富阳设计网站黑帽seo怎么做网站排名
  • 河南国基建设集团--官方网站网站建设服务包括什么
  • 甘肃建设厅网站执业注册中心企业网络部署方案
  • 网站建设的一般步骤包含哪些邢台网站招聘员工123
  • 爱写作网站做网站学什么语言
  • 中国式现代化推进中华民族伟大复兴爱站网seo综合查询工具