当前位置: 首页 > news >正文

江西南昌电子商务网站建设公司wordpress本地上传图片

江西南昌电子商务网站建设公司,wordpress本地上传图片,房间装修风格,高邑网站建设标题 #x1f600;#x1f600;#x1f600;创作不易#xff0c;各位看官点赞收藏. 文章目录 标题Redis 基础笔记1、安装及环境搭建2、Redis 数据类型2.1、String2.2、List2.3、Hash2.4、Set2.5、Zset2.6、BitMap2.7、HyperLogLog2.8、Geospatial2.9、Stream 3、Redis 持久…标题 创作不易各位看官点赞收藏. 文章目录 标题Redis 基础笔记1、安装及环境搭建2、Redis 数据类型2.1、String2.2、List2.3、Hash2.4、Set2.5、Zset2.6、BitMap2.7、HyperLogLog2.8、Geospatial2.9、Stream 3、Redis 持久化3.1、RDB3.2、AOF 4、Redis 事务5、Redis 管道6、Redis 发布与订阅7、Redis 主从复制7.1、Redis 主从复制7.2、Redis 哨兵模式 8、Redis 集群8.1、Redis 集群理论8.2、Redis 集群搭建8.2.1、Redis 三主三从集群搭建8.2.2、Redis 集群扩容缩容 9、 Spring Boot 整合 Redis9.1、整合单机版9.2、整合集群版 10、应用问题11、分布式锁 Redis 基础笔记 redis是一个 NoSql (Not Only Sql)非关系型数据库不依赖业务逻辑方式储存以简单的key-value方式进行存储。可以配合关系型数据库进行缓存数据数据操作主要是在内存中而且一些特殊场景比关系型数据库优越也需要相互依赖。 1、安装及环境搭建 redis 官网https://redis.io/ redis 命令文档http://doc.redisfans.com/ c语言环境redis编译需要c语言环境。 yum install gcc # 安装gcc环境 gcc --version # 检查gcc的版本 yum install -y gcc-c 解压编译 redis tar -zxvf redis-7.0.4.tar.gz # 解压 # 解压好进入redis目录 make make install # 编译安装成功后默认在/usr/local/bin安装 redis 的相关内容 redis-benchmark性能测试工具。redis-check-aof修复有问题的 AOF 文件。redis-check-dump修复有问题的 dump.rdb 文件。redis-sentinelredis 集群搭建。redis-serverredis 服务启动命令。redis-cli客户端操作入口。 进入目录/usr/local/bin执行 redis-server 启动 redis 修改redis配置文件(在redis解压目录下) 注释掉 bind 127.0.0.1 这一行解决只能特定网段连接的限制将 protected-mode 属性改为 no 关闭保护模式不然会阻止远程访问将 daemonize 属性改为 yes 这样启动时就在后台启动添加 requirepass 123456即可 当然这步完全看心情将 port XXX 修改成自己的端口(默认是6379) 启动redis redis-server /opt/redis/redis-7.0.4/redis.conf[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-N5wZ6Hqy-1690513503273)(https://s2.loli.net/2022/08/11/KFrWXSNiMjQmEza.png)] 开启访问端口 firewall-cmd --zonepublic --add-port6379/tcp --permanent # 开放端口 firewall-cmd --reload # 重启防火墙 firewall-cmd --list-ports # 查看开放的端口号远程连接 redis配置好上面的信息然后可以下载一个Another Redis Desktop Manager可视化的 redis 管理工具。 redis-cli 客户端连接 redis-cli -h 连接的ip地址 -p 端口号 -a 连接密码 # 获取修改 redis 配置文件信息命令 config get 配置名称 # 例如 config get port config set 配置名称 value # 但是这个 redis 重启后这个配置就会失效2、Redis 数据类型 注意在 redis 中命令是不区分大小写的但是 key 名称是区分大小写的。 key 操作命令 keys 正则表达 # 查看当前库中满足正则表达式的所有key exists key # 判断某个key是否存在 type key # 查看 key 的类型 del key # 删除 key unlink key # 非阻塞删除现将 key 从 keyspace 中删除后续异步再执行真正的删除 expire key time(单位秒) # 设置key的过期时间 ttl key # 查看可以会有多少秒过期-1表示永久不会过期-2表示已过期 get key # 获取某个key的值 move key dbIndex[0-15] # 将 key 移动到指定数据库中# redis默认16个数据库下标从0开始(默认初始使用0号数据库)。 select dbid # 切换数据库例如select 10这些库的密码都是同一个密码 dbsize # 查询当前数据库的key数量 flushdb # 清空当前库 flushall # 清空所有库 help 类型名称 # 获取对应类型的命令的帮助2.1、String String是 redis 最基本的数据类型一个 key 对应一个 value。String 是二进制安全的简单字符串、复杂的 xml/json 的字符串、二进制图像或者音频的字符串、以及可以是数字的字符串但是一个 key 的 value 值最大是 512MB。 set key value # 常用参数 set key value ex 秒 px 毫秒 [nx|xx] [get] exat 以unix时间戳单位秒 pxat 以unix时间戳单位毫秒 keepttl(保留过期时间) # nx如果这个 key 不存在值设置值如果存在就不设置。 # xxkey 存在还是会重新设置值 # get如果key值存在设置值后返回旧的值 # keepttl重新设置key值需要保存之前的过期时间get key # 获取 key 对应的值 append key value # 如果key存在就向 key 的 value 追加如果 key 不存在就新添加一个 key strlen key # 获取value的长度 setnx key value # 如果key不存在就新增一个key如果存在不会覆盖也不会新增key# 对于纯数字类型的自增和自减在redis中自增和自减是一个原子操作不会被线程调度打断。 incr key # 自增1 decr key # 自减1 increby key step # 自增step decrby key step # 自减step# 其它命令 mset k1 v1 k2 v2 k3 v3。。。。 # 设置多个 key mget k1 k2 k3。。。 # 获取多个 key 的值 # 设置多个key和 setnx 类似所有的 key 都不存在是才会设置成功有一个 key 存在所有的 key 都会设置失败。(有点原子性效果) msetnx k1 v1 k2 v2 k3 v3 getrange key start end # 截取 value 的值start、end是下标类似substring。前后都包含 setrange key index value # 设置 key 从 index 下标使用 value 开始覆盖对应下标的值 setex key time(秒) value # 在设置key的时候设置过期时间 getset key value # 设置key新值的同时返回旧值String 类型应用场景 缓存数据例如缓存用户登录信息数据。计数器记录网站访问量、点赞数量、api 调用次数等等。验证码存储用户登录验证码设置一个过期时间。限流设置记录每个 API 对应人访问量以此来做限定。分布式锁可以使用 setnx expire 来实现分布式锁。分布式 session对应分布式系统可以使用 redis 对 session 进行一个集中管理。 String 底层数据结构 ​ 底层是一个动态的字符串可以修改的字符串。类似 Java 的 ArrayList采用分配冗余空间减少内存的频繁分配。有一个 len 的阈值当操作这个阈值就会进行扩容容量小于 1MB 时会双倍扩容大于 1MB 之后每次扩容只会增加 1MB最大容量是 512MB。 2.2、List Listredis 列表是简单的字符串列表按照插入顺序进行排序可以在头和尾部添加数据。它底层就是一个循环链表在两端操作数据性能较好但是通过下标检索元素性能较差。 # 常见命令 lpush/rpush key v1 v2 ... # 向 key 的左边/右边添加多个值如果key不存在就新增key lpop/rpop key [count] # 从左边/右边去除元素没有指定count就是一个指定 count 是几个就移除几个 lrange key start end # 获取列表中的元素从start开始到end结束0 -1表示获取全部元素 rpoplpush key1 key2 # 从 key1 的右边去除一个元素然后加入到 key2 的左边 blpop key [timeout] # 阻塞行为在移除元素时其它客户端会进行阻塞如果 list 没有元素了就会一直阻塞timeout 就是设置阻塞时间# 其它命令 lindex key index # 获取 key 对应 index 下标的元素 llen key # 获取 list 的长度 lrem key n value # 从左边删除 n 个值为 value 的元素 ltrim key start end # 截取下标中 start 到 end 的元素然后重新给列表赋值 lset key index value # 将下标 index 的元素设置为新的元素 linsert key before/after v1 newvalue # 在元素 v1 的前面/后面新增一个 newvalue 元素如果存在多个 v1 会在第一个左右进行操作List 实际应用 lpush lpop可以作为一个栈先进后出。lpush rpop可以作为一个队列先进先出。lpush brpop可以用于消息队列但是存在消息数据丢失问题。(服务把消息取出来但是在处理时服务出现服务失败这样消息就会丢失)订阅号功能例如微信订阅号功能可以使用 list 去存储用户关注订阅号发布的文章 id。 底层数据结构 ​ 底层使用的是快速链表。在元素比较少的情况下会使用一段连续空间存储ziplist(压缩链表)。在元素较多的情况下才会改成快速链表普通链表需要一部分内存空间去存放指针redis 将压缩链表和快速链表结合起来了将多个压缩链表使用快速链表的方式链连接起来这样就不会出来空间冗余。 2.3、Hash hash是一个键值对集合一个 String 类型的 filed 和 value 的映射表适合存储对象类似 java 中的 map。用户的 id 作为 key存储 value 就是用户的信息。 hset key filed value [filed value] # 添加 hash 集合filed - value 一一对应 hget key filed # 获取 filed 对应的 value hexists key filed # 判断 key 的 filed 是否存在1存在0不存在 hkeys key # 查看 key 中所有的 filed hvals key # 查看 key 中 filed 对应的所有value hincrby key filed value # 给filed值加上value(都要为数字) hsetnx key filed value # 添加一个key中不存在的filed如果存在就添加不成功hgetall key # 获取 hash 的所有 filed 和 value hdel key filed # 删除指定 key 的 filed 属性 hlen key # 获取 key 中 filed 的数量hash 应用场景 购物车场景可以将用户购物车信息以 hash 存储。 抢购优惠卷。 2.4、Set Set是一个元素不可重复的列表是一个 String 类型的无序集合底层是一个 value 为 null 的 hash 表查询、添加、删除的复杂度都是 O(1)。 # 常用命令 sadd key v1 v2 v3 。。。 # 添加一个或多个元素元素不能重复 smembers key # 查看集合的所有元素 sismember key v # 判断v元素是否存在1表示存在0表示不存在 srem key v1 v2 ... # 删除集合中的元素 scard key # 获取集合元素的个数# 其它命令 srandmember key n # 从集合中随机取出n个值但是不会从集合中删除 spop key n # 随机从集合中移除n个元素并返回移除的元素 smove key1 key2 v1 # 将集合 key1 中的v1元素移到 key2 集合中原集合中元素会移除# 集合运算 sinter key1 [key2 key3..] # 返回key1集合与后面集合的 交集 sunion key1 [key2 key3..] # key1与其他集合的 并集 sdiff key2 [key2 key3..] # key1与其他集合的 差集 (在key1中有在其他集合中没有) sintercard num k1 k2 k3... [limit] # 统计 num 个集合的交集集合中不重复个数limit 显示多少个Set 应用场景 抽奖小程序spop 随机抽取中将号码。朋友圈点赞功能key 消息记录id然后每个人点赞都把对应用户 id 加到 set 中去。共同联系人可以采用 set 集合的集合运算找出两个人的共同好友也可以找出两个人之间可能认识的人。 2.5、Zset Zset对 set 进行一个增强每一个元素都关联了一个 score (评分)。集合根据评分来排序集合集合中成员是唯一但是评分可以不唯一。 # 常用命令 zadd key score1 value1 score2 value2... # 新增集合中元素并携带score zrange key start end [withscores] # 获取集合中start到end下标的元素如果后面添加withscores会将元素的score也会返回到集合中默认返回升序排列 zrevrange key start end [withscores] # 相当于是倒叙输出元素 # 如果有 () 表示不包含边界值 zrangebyscore key [(]min max[)] [limit] i1 i2 # 获取到评分在min和max之间的元素limit 相当于是从下标 i1 开始向后移动i2个元素 zrevrangebyscore key [(]min max[)] [limit] i1 i2 # 将评分在这个区间的元素按照从大到小进行排序 zsocre key 元素 # 获取元素对应分数 zcount key min max # 统计在这个区间元素的个数 zrem key v1 v2 ... # 从集合中删除元素 zincrby key incr value # 把 value 值对应的评分增加 incr zrank key value # 返回value在集合中排名从0开始 zrevrank key value # 分数倒序排列value值排第几# 7.0以后版本有的命令 zmpop num1 [key...] min count num2 # 把num1个set集合中每个集合移除num2个最小分数的元素ZSet 应用场景 延时队列将 socre 设置成需要执行的时间戳按照时间戳进行排序后在循环去消费第一个消息可以达到延时执行的效果。没有ACK机制可能出现消息丢失排行榜将排列时间作为 key排行作品 id 作为 member数量作为分数进行插入然后通过 zrevrange 和 zrange 取对应的值。限流可以使用一种滑动窗口策略将用户 id 作为 key访问时间戳作为 member 和 score我们只需要统计用户 id 在指定时间戳中的个数就可以得到对应访问频率然后与最大数进行比较。 2.6、BitMap Bitmap可以实现对位的操作底层是字符串本质是个数组数组由多个二进制位组成。可以看做成一个以 bit 为单位的数组只能存储 0 和 1数组的下标称为偏移量。 setbit key offset 0|1 # 设置一个key的bitmapoffset是偏移量值只能是0或1(如果偏移量过大整个初始化过程会比较慢所以一般在初 一个数字)下标从0开始 getbit key offset # 获取位对应偏移量的值没有设置对应偏移量的值就返回0 strlen key # 返回位图占用的字节数每8位一个字节例如占用了8为就返回1占用了9位就返回2 bitcount key [s,e] # 统计key中值为1的个数可以指定一个偏移量范围0 -1表示全部 bitop and|or|not|xor dest key1 key2 .... # 将这些集合进行求 和|或|非|异或 操作然后将结果放在dest集合中返回记录条数应用场景 签到场景偏移量今天是一年的第几天 % 今年天数key年份:用户id。统计用户活跃数将用户 id 作为偏移量来记录今天是否登录系统。实现布隆过滤器。 2.7、HyperLogLog HyperLogLog用来做基数统计(集合中不重复元素个数)的算法再输入元素数量很大时计算基数时所需要的的空间总是固定的并且很小只需要 12KB 左右它只计算出集合中元素不重复个数并不会记录这些元素存在 0.81% 的计算误差。 pfadd key e1 e2 .... # 添加元素如果基数发生变化返回1否则返回0 pfcount key # 计算集合中不重复元素个数即集合基数 pfmerge newkey k1 k2 ... # 将k1 k2 ... 的 HyperLogLog 合并成新的一个 HyperLogLog应用场景 统计网站实际访问数同一个访问 IP 记录成一个。统计网站在线人数实时 UV 数。统计用户每天搜索关键词个数只能记录个数不能记录具体关键词。 2.8、Geospatial Geospatial提供经纬度设置、查询、范围查询、距离查询经纬度 hash 等操作它是 zset 类型。 geoadd key 经度1 纬度1 name1 经度2 纬度2 name2 。。。 # 添加地理位置name是地理名称 # 经度范围-180到180纬度范围-85到85如果超出给定范围会返回一个错误重复名称不能添加 geopos key name # 返回name对应的经纬度信息 geodist key name1 name2 [m|km|ft|mi] # 返回这两个地理位置的直线距离默认单位是米也可以指定单位千米、英尺、英里 # withcoord把经纬度也返回withhash经纬度已hash编码形式返回withdist返回距离count num返回num条记录 georadius key 经度 纬度 radius [m|km|ft|mi] [withdist] [withcoord] [withhash] [count num] # 查询在给定经纬度为中心radius为半径中的元素 geohash key name # 将地点的经纬度值使用hash编码返回一个映射 georadiusbymembers key name 。。。。# 与georadius类似只不过是以指定name为中心2.9、Stream 实现消息队列支持消息持久化、生成全局唯一消息 ID、支持 ack 确认消息模式、支持消费组模模式是一种用于消息队列的数据类型。 名称解释Message Content消息内容它底层是一个消息链表将所有需要消费的消息串联起来每个消息都有一个唯一的 ID 与之对应。consumer group消费组通过 xgroup create 创建同一个消费组可以有多个消费者last_delivered_id游标每个消费组有一个游标任意一个消费者读取了消息都会使游标往前移动。consumer消费者pending_ids[ ]是一个数组用于存放消费者读取了消息但是没有返回 ack 确认码的消息 ID保证消息不被丢失 # 在消息队列尾部添加一条消息* 表示由redis生成i消息id也可以自己指定id但是必须比前一个消息id大后面的消息内容是hash结构 xadd key *|id filed1 value1 filed2 value2 .... # 获取消息- 表示最小表示最大count num表示返回多少条消息 xrange key start(-) end() [count num] # 反向获取消息注意下标位置反转 xrevrange key end() start(-) [count num] # 删除一条消息根据消息id删除 xdel key id # 统计消息条数 xlen key # 限制队列中消息的个数将id小的删除最多只保留num个消息 xtrim key maxlen num # 限制消息最小id比指定id小的消息全部删除 xtrim key minid id # 读取消息(默认非阻塞读取)从指定消息队列key1、key2.。中读取num条消息$表示当前最新消息00表示最旧的数据也可以根据指定消息id获取数据 xread count num streams key1 key2.. [$] [00] [id1 id2 ...] # 阻塞读取block表示阻塞读取millisecond表示阻塞时间如果是0表示一直阻塞 xread count num [block millisecode] streams key1 key2.. [$] [00] [id1 id2 ...] # 创建消费组指定消费哪一个队列以及指定消费组名称$表示从头开始消费0表示从尾部开始消费 xgroup create key groupName $|0 # 创建消费者并读取消息使用groupName组中的consumerName读取num条消息从key1、key2.。。消息队列中读取表示从尚未被消费的消息开始读取并且移动到下一位 xreadgroup group groupName consumerName [count num] streams key1 key2... 注意在 Stream 中的消息队列如果被任意消费组中的任意一个消费者消费后这个组的其它消费者都不能再对这条 消息进行消费游标位置会移动到下一位但是其它消费组中的消费者依然可以读取消息。 消费者 ACK 机制每个消费组读取消息后都会把消息 id 备份到 pending_ids[ ] 数组中以防止客户端消息处理失败导致消息丢失只有当客户端执行 xack 命令时才会将消息进行擦除。 # 查看消费组中消息读取但是未 ack 确认的消息可以指定查看的条数以及指定消费组未 ack 的消息 id xpending key groupName [start end count consumerName] # 向消息队列发送消息被消费的 ack指定消费组中信息id对应消息被消费 xack key groupName id1 id2.。。。 # 打印stream的信息 xinfo stream key3、Redis 持久化 Redis 的数据是保存在内存中一旦服务器宕机数据就会全部丢失这就需要将数据进行持久化到磁盘中。在 Redis 中有两种持久化方式RDB (Redis DataBase)、AOF (Append Of File)。 3.1、RDB RDB在指定的时间间隔内将内存数据的快照写到磁盘中恢复时将磁盘中的文件读取到内存可以在配置文件中修改 RDB 的持久化策略在 Redis 中默认使用 RDB 持久化方式 # 以下是 Redis 默认的 RDB 持久化策略也可以自定义# Redis6.2之前 save 900 1 # 900秒之内发生了一次key变化进行持久化 save 300 10 # 300秒之内发生了10次key变化进行持久化 save 60 10000 # 60秒之内发生了10000次key变化进行持久化# Redis6.2之后将持久化频率做了改变 save 3600 1 300 100 60 10000 # 1小时、5分钟、1分钟# 禁用 rdb 持久化只需要配置成空串即可 save 自动触发 RBD主要是通过配置文件去修改持久化策略以及 rdb 文件名称和保存路径修改配置文件后重启服务。 # 设置频率 save 5 1 # 指定 rdb 文件保存路径这个路径需要提前创建 dir /opt/data # 指定文件名称 dbfilename 名称# 其它配置 stop-writes-on-basave-error yes # 当持久化出现错误时redis就停止写操作 rdbcompression yes # 对于持久化的文件是否进行使用LZF算法进行压缩 rdbchecksun yes # 使用CRC64算法进行数据检查手动触发 RDB使用 save 和 bgsave 命令来手动进行数据持久化。 # 这个命令会阻塞 redis 线程只有当持久化成功后才会继续缓存数据(生产中不能使用) save # 会异步进行持久化操作 这个就会 fork 子进程去持久化数据 bgsave # 获取上一次持久化时间返回时间戳 lastsave持久化流程Redis 会单独创建一个子线程 (fork) 进行持久化先将数据写到一个临时文件等持久化完成以后用这个临时文件去替换上次持久化完成的文件。 优点 适合数据备份以及一些大规模数据恢复工作。RDB 文件在内存中加载速度比 AOF 快很多。 缺点 在一定时间时间做一次备份这时 Redis 意外宕机就可能出现丢失当前时间与最新一次持久化期间的数据。如果 redis 中数据量大在 fork 子进程时会导致占用的 CPU 资源过多内存数据会拷贝一份需要2倍数据膨胀可能导致服务性能降低。 修复 RDB 文件在 REB 文件迁移的过程中可能存在文件损坏可以通过 redis 提供的修复命令去修复对应文件。 # 在 /usr/local/bin 目录下使用 redis-check-rdb 修复文件路径3.2、AOF AOF以日志的形式记录每一个写操作将 Redis 执行过程中的所有写指令记录下来只许追加到文件上不能修改文件Redis重启会读取日志文件重新构建数据就是将日志中的执行从前到后执行一次来恢复数据工作。 # aof默认是不开启的需要在配置文件中手动开启而 RDB 默认开启的 appendonly yes # 开启aof持久化 appendfilename appendonly.aof # 持久化保存文件保存路劲在启动目录下注意如果 RDB 和 AOP 同时开启redis 默认使用 AO F持久化启动 redis 就会默认使用 appendonly.aof 进行数据初始化。 AOF 工作流程 当执行写命令时并不是直接写入 AOF 文件而是先写到 AOF 命令缓冲区当命令达到一定数量然后在 IO 到磁盘文件上这样避免了频繁 IO 操作。AOF 缓冲区会根据同步文件三种写回策略将命令同步到磁盘 AOF 文件。避免写入的 AOF 文件过大AOF 又会根据规则将命令进行合并(AOF 重写)从而达到 AOF 文件压缩目的。 三种写回策略 Always同步写回每个写命令执行完后立刻将日志写到磁盘文件上。(命令不丢失IO 频繁)everysec每秒写回写命令执行完会把日志写到 AOF 缓存区然后每隔 1s 将缓存区的命令写到磁盘。默认no操作系统写回写命令执行完后会将日志写到 AOF 缓存区由操作系统决定何时将缓冲区内容写到磁盘。 优点更好保护数据不丢失、性能高、可做紧急恢复。 缺点相比较 RDB 数据集文件要大恢复速度比 RDB 慢一些。 Redis 7 Muti Part AOF 设计在 Redis7 之前的 AOF 文件有且只有一个但是在 Redis7 之后底层做了改变采用多个文件来进行 AOF 持久化Redis 重写后文件名称就可能发生改变。 Base表示基础 AOF由子进程重写产生这个文件只有一个。Incr表示增量 AOF在进行重写时被创建可能存在多个这个文件用来记录数据。manifest清单文件管理、跟踪 AOF 文件。 AOF 文件异常修复在进行 AOF 持久化时可能存在AOF文件损坏这样备份文件在初始化数据时就会出错。通过 Redis 提供的redis-check-aof --fix进行文件修复在 /usr/local/bin 目录下使用 # 重启redis就可以修复文件了 redis-check-aof --fix aof文件保存路径 AOF 压缩重写AOF 采用文件追加方式文件可能会越来越大这时就增加了压缩重写当文件大小达到一个阈值时就会将文件进行重写只保留恢复数据的最小指令集。 # 重写就是将之前多个操作重写成一个操作但是最后的结果是一样的 set k1 v1 set k2 v2# 就会将上面的重写成一个操作 set k1 v1 k2 v2 # redis 配置文件 no-appendfsync-on-rewrite yes # (不阻塞同步)不写入 aof 文件而是写入缓存相当于不重写请求不会阻塞但是在这期间服务器宕机缓存数据会丢失 no-appendfsync-on-rewrite no # 进行重写重写时请求可能出现阻塞(阻塞同步)auto-aof-rewrite-min-size 64mb # 阈值为 64mb auto-aof-rewrite-percentage 100 # 当操作阈值的100%就会进行重写相当于是文件大小操作128mb就会重写# AOF 没有到达重写阈值可以手动进行重写操作 bgrewriteaof 4、Redis 事务 Redis事务是一个单独的隔离操作事务中的所有命令都会序列化、按照顺序执行事务执行过程中不会被其他客户端发来的命令打断。串联多个命令防止别的命令插队。在 Redis 中事务有三个基本命令multi、exec、discard。 multi开始事务进行命令组队接下来的所有命令都会按照顺序进行排队。exec执行命令将排队的命令按照排队顺序执行。discard回滚操作将刚才执行的命令取消并恢复原来的数据在exec执行前执行。 事务处理几种情况 所有命令按照排列顺序正常执行。放弃事务执行在 exec 命令之前使用 discard 命令会取消所有队列中的命令。命令在排队时 (exec 之前) 执行失败 (语法错误)在执行 exec 时会提示错误所有命令都不会执行成功。在使用 exec 命令执行时可能出现命令执行错误对于错误命令会执行不成功但是其它可以执行成功的命令可以成功。部分原子性 watch 命令在执行multi命令之前先执行watch key1 key2 ... 如果在事务执行前这些key的值被其它操作修改那么这个事务就会被打断。在事务中如果几个操作去操作同一条数据可能会发生数据不同步的问题Redis 使用乐观锁机制来防止事务冲突问题。 注意unwatch命令可以取消对所有key的监视客户端断开连接客户端对 Redis 的所有 key 的监控也会取消。 5、Redis 管道 Redis 是一种 C/S 架构每次请求都会建立 TCP 连接为了减少客户端与服务器端的连接Redis 管道可以将命令进行打包进行批量操作只请求一次并且只会获取一次返回结果这样减少连接资源消耗提高性能。pipeline 管道实现原理就是一个队列先进入命令先执行。 注意 pipeline 管道是非原子性的支持不同数据类型的命令。pipeline 组装命令不能太多太多命令可能使客户端阻塞太久而导致服务端被迫发送一个队列等待的响应应该将多个命令拆分成小段进行执行。pipeline 与 事务都是一些列命令但是事务是将命令一条一条发送执行 exec 时才会真正执行命令而管道是一次将所有命令发送。事务会阻塞其它命令执行而管道不会阻塞。 6、Redis 发布与订阅 Redis 的发布与订阅是一种消息的通讯模式发送者发送消息订阅者接收消息。Redis 客户端可以订阅任意数量的频道。如果发布者发布的频道被其他客户端订阅了频道那么在发布的时候订阅的客户端就会接收到消息。 subscribe 频道名1 频道名2 。。。。 # 订阅者订阅多个频道 publish 频道名 message # 发布者通过频道发布消息7、Redis 主从复制 主从复制主机更新数据后会根据配置和策略自动同步到备机上(master/slaver)。master 以写为主slaver 以读为主。(只能一主多从不能多主但是可以配置集群) 读写分离这样应用可以将读和写操作分开减少redis服务压力提高性能。容灾快速恢复如果主机或者某一个从机发生故障可以根据策略从其它从机上读取数据 7.1、Redis 主从复制 info replication # 查看 redis 服务信息# 手动配置主从关系重启 redis 失效一般配置到 redis.conf 中 replicaof 主库ip 主库端口 # 在从机上使用命令指定从机对应的主机 slaveof 主库ip 主库端口 # 修改主机对应的从机 slaveof no one # 去除从机性质改成主机配置文件(配从机不配主机)只需要配置从机配置文件它就会去连接主机然后同步主机数据。 slaveof 主机ip port # 从机配置文件上连接主机 masterauth xxxx # 主机的密码注意主机上可以进行读写操作但是从机上只能读取不能写入写入就会报错。当从机宕机后主机会感知到会移除这个从机。如果主机宕机后从机会感知但是不会移除主机只是主机状态变成下线状态当主机重新上线从机依然会连接到主机。 薪火相传一个主机下可以有多个从机一个从机下也可以有多个从机。主机每次同步数据时只会同步给主机下的从机然后再由从机同步给它下面的从机。(中间服务器也不能写操作) 反客为主当主机服务宕机以后在这个主机下的所有从机都会升级为主机而从机下的从机不会发生变化。( ) # 当主机宕机需要手动使用命令将从机变成主机 slaveof no one主从复制执行流程 从机启动后会向主机发送一个同步数据命令同步主机全部数据(全量复制)从机自身的数据全部会被覆盖掉。主机收到从机同步命令时会触发 RDB 持久化操作在持久化完成后将 RDB 快照文件发送给从机从机就会根据文件进行初始化。主机会默认 10s 给从机发送心跳检测保持通信。在主机上进行写命令是会自动同步给从机。从机下线后重连重连都会全量进行复制主机数据。 注意主从架构有致命缺点主机同步数据到从机上可能出现网络延迟情况。如果主机宕机那么这个缓存系统就不能进行写操作只能从机读取数据。 7.2、Redis 哨兵模式 哨兵模式反客为主的自动版能够自动监控主机是否故障如果故障了就会根据投票数高的从机转换成主机。哨兵机器不存放数据只是一个监控者。 主从监控监控主从 Redis 是否正常运行。消息通知哨兵可以将故障转移的结果发送给客户端。故障转移如果主机宕机可以根据投票策略将主机下票数高的从机升级成主机。配置中心客户端通过连接哨兵获取当前 Redis 服务的主节点地址。 配置哨兵在监控主机上也需要配置密码不然故障转移旧主机不能连接到新主机。 # 配置哨兵在安装目录下配置 sentinel.conf 文件中配置哨兵 daemonize yes # 后台启动 port 26379 # 端口 # mymaster是给监控对象起的名称x是确认客观下线至少有几个哨兵同意迁移才能迁移(一个主机可以有多个哨兵一个哨兵可以监听多个主机) sentinel monitor mymaster 监视的主机ip 端口 x sentinel auth-pass 监视名称 xxxx(监视对象的密码) # 主节点在 milliseconds 没有回答哨兵哨兵认为主机下线默认 30s sentinel down-after-milliseconds master-name milliseconds # 故障转移超时时间操作这个时间转移失败默认3分钟 sentinel failover-timeout mymaster 180000# 启动哨兵 redis-sentinel 哨兵配置文件路径 # 或者 redis-server 哨兵配置文件路径 --sentinel主观下线(SD)是指某个哨兵认为 Redis 服务不再正常工作但是没有与其它哨兵达成共识。 客观下线(OD)在 sentinel 集群中可能出现网络抖动导致某个哨兵不能及时收到心跳包可能误以为主机下线。X 参数就是指定在指定集群中认为主机下线的哨兵数至少达到 X 才会认为主机下线才会进行故障转移。 注意启动了 Sentinel 哨兵后在哨兵的配置文件中会自动生成对应的配置。 主机下线从机上位模拟主机下线从机是否能够升级成主机。 主机下线通过投票从从机选取新的主机并且新主机和从机数据不会丢失。启用 sentinel 模式后主机记录其它从机信息旧主机重新上线后会作为从机跟在新主机的后面(旧主机需要配置密码)新主机的配置文件也会发生一定变化。发生主机切换哨兵的配置文件被重写对应配置。 哨兵运行流程 先某个 sentinel 哨兵对主机进行了主观下线任务主机异常。然后与 Sentinel 集群中的其它哨兵进行达成共识判断是否达到客观下线的值。如果达到客观下线的值在 Sentinel 集群中根据 RSTF 选举算法进行故障转移的哨兵的选举。选举出来的哨兵领导者从 slaver 中选举出行的主机并进行故障转移。在故障转移的过程中可能出现数据丢失的情况。 新 master 选举原理 配置从机优先级在从机的 redis.conf 文件中。 # 设置从机优先级数字越小优先级越高默认是100 replica-priority 100 选择偏移量大的从机数据上如果和主机同步率越高越先被选择。在每次启动 Redis 服务都会产生一个 UUID (40位)UUID 越小越先被选择。 8、Redis 集群 ​ 在实际开发中Redis可能出现内存容量不够以及在并发操作下性能提高的需求另外在薪火相传、反客为主、主机宕机哨兵选择新主机情况下导致 ip 地址的变化。在 Redis3.0 之前是通过代理主机的方式解决3.0之后通过无中心化的集群配置来解决提到的问题。 代理主机所有的应用请求都请求到代理主机上然后代理主机来分发请求到各个 Redis 主机上然后得到的数据返回给应用。 无中心化集群每一个主机都可以作为请求的入口当请求来了后会到请求主机上查询数据如果有这条数据就返回没有就会去其他主机上查询主机与主机之间是相通的。 ​ Redis 集群实现了 Redis 的水平扩容启动 N 个 Redis 节点那么整个数据都会分布存储存在这 N 个节点中每一个节点储存总数据的 1/N 。即使某个 Redis 节点失效并无法进行通讯集群中的其它节点依然可以处理请求多个 Redis 节点之间共享数据的程序集。 8.1、Redis 集群理论 ​ 在Redis集群中应该至少有三个节点并且按照每一个主机在不同的服务器(不同ip)上每个从机不和自己主机在同一服务器上这样就保证了当主机宕机以后从机能快速接替主机继续进行工作。 slot (插槽)在集群搭建成功后集群中会有16384个插槽集群中每一个节点会负责一部分插槽。在每次向集群中插入 key 时会根据 CRC16(key) mod 16384 去计算 key 对应的插槽然后找到对应处理主机进行处理理论上 Redis 集群节点可以达到16384个每一个节点管理一个槽位但是官方建议集群节点不超过1000个。 分片使用 Redis 集群时我们会把数据分散储存到多台 Redis 机器上集群中每一个节点都是一个片区。 写操作根据 key 进行CRC16(key) mod 16384算出存储在哪个节点上(哪个分片)。读操作会根据 key 的 确定性哈希函数找到 key 在写操作时写入的片区然后去这个节点上去找。 分片算法 哈希取余算法hash(key) mod 节点数量直接通过 key 去取余节点数量这种方式简单粗暴但是存在一定问题。 如果某一个节点宕机那么取余的节点数量需要手动去修改不然后映射到宕机主机上。对于节点扩容、缩容不方便节点增加和删除可能出现 key 与原来的映射不一致问题取余结果会重新洗牌。 一致性哈希算法将集群中各个节点 IP 映射到 hash 环上每个节点就能确定在 hash 环上的位置。在 key 映射时根据同一个 hash 算法也确定在 hash 环上的位置然后再顺时针查找在 hash 环上的节点位置然后就近落在这个节点上。 一致性问题解决了节点宕机导致需要重新计算 hash 值即使某个节点出现问题它也可以继续顺时针向下找下一个节点。当节点比较少时可能出现数据倾斜问题当两个节点相距很近数据可能大量会在某一个节点上处理。 哈希槽分区算法在数据与节点之间添加一层槽用于管理数据与节点之间的关系相当于多个 key 存放在槽里面多个槽对应一个具体节点。先使用 CRC16(key) mod 16384查找到对应的槽然后再根据槽位找到对应节点。 为什么插槽数数量是16384个 因为在集群节点心跳包传输中插槽如果是16位(65535)传输插槽需要的数据为8kb(16384)而14位需要的只有2kb。Redis 集群节点数量基本不可能超过1000个16384个槽位够用。 8.2、Redis 集群搭建 8.2.1、Redis 三主三从集群搭建 redis 配置文件 # 开启集群 cluster-enabled yes # 节点配置名 cluster-config-file nodes-6379.conf # 节点失联时间超过这个时间自动主从切换 cluster-node-timeout 15000注意在 redis 配置文件中需要去掉主从以及哨兵的配置信息。 启动所有 redis 服务进行 redis 安装路径下的 src 目录下使用命令 # 确保 redis 的 nodes-6379.conf 文件都生成了 # 将所有的 redis 合成一个集群只需要使用下面命令 redis-cli --cluster create --cluster-replicas 1 各个主机的真实ip地址(使用空格隔开127.0.0.1:6379) -a 密码 # redis-cli --cluster create --cluster-replicas 1 192.168.32.135:6379 192.168.32.136:6379 192.168.32.137:6379 192.168.32.138:6379 192.168.32.139:6379 192.168.32.140:6379 -a liuhongjun # 1表示一台主机有一台从机它会自动分配注意事项 集群配置了密码需要在启动命令后添加 -a 参数指定连接密码。集群启动需要清除 Redis 中所有数据包括 RDB、AOF、node配置文件。 测试集群 # 因为是集群所以可以通过任何一个主机连接到服务上-c是集群连接 redis-cli -c -h ip -p port -a password # 连接后使用命令 cluster nodes # 查看集群中所有节点信息有myself字样的服务是当前连接 集群常用操作 # 在集群中不能使用一次添加多个key的命令例如mset k1 v1 k2 v2....需要使用分组方式来进行批量添加 mset k1{name} v1 k2{name} v2 # name是分组名会根据name计算slot值然后进行处理cluster keyslot key # 计算这个key的插槽值不存在的key也可以计算 cluster countkeysinslot 插槽值 # 查看当前主机范围slot下key的数量 cluster getkeysinslot 插槽值 n # 返回插槽值下n个key也只能返回自己主机上的keys# 配置文件中集群某一节点不可用系统是否还继续使用yes:不可用no:可用 cluster-require-full-coverage yes|no 故障恢复集群中某个主机节点宕机那么它的从机马上替换主机成为新的主机继续处理请求就主机修复后启动会作为新主机的从机。(15秒超时自动断开连接)如果需要继续保持之前的从属关系需要在旧主机上执行 cluster failover命令。 ​ 如果集群中某一段插槽节点的主机从机都宕机了 可以通过在 redis.conf 配置cluster-require-full-coverage来设置如果值是yes 表示某一节点宕机那么整个集群将不可用如果值为 no表示宕机的节点的插槽不可用不可写入但是其它插槽依然可以继续使用。 8.2.2、Redis 集群扩容缩容 扩容在某些高并发峰值时需要增加节点保证系统可用但是新加入节点没有对应插槽这就需要将之前的槽位重新计算。 新建一个 Redis 节点并且启动服务。节点加入到集群中。 # 需要一个集群中存在节点作为介绍人 redis-cli -a 密码 --cluster add-node 新节点ip:端口 介绍人ip:端口 # redis-cli -a liuhongjun --cluster add-node 192.168.32.141:6379 192.168.32.135:6379当前新加入的节点还没有分配插槽需要其它主节点余一点插槽给新节点并重新计算。 # 由一个已分配槽位节点重新分配槽位 redis-cli -a 密码 --cluster reshard 已分配插槽节点ip:端口 # redis-cli -a liuhongjun --cluster reshard 192.168.32.136:6379新增从机节点到集群中。 # 新增从机节点到集群中并同步某个主机数据 redis-cli -a 密码 --cluster add-node 新节点ip:端口 介绍人ip:端口 --cluster-slave --cluster-master-id 同步主机节点id # redis-cli -a liuhongjun --cluster add-node 192.168.32.142:6379 192.168.32.135:6379 --cluster-slave --cluster-master-id 12131b05f5014a88e1199c1c6ef60b30ea3375c0缩容当流量顶峰过去可以减少机器节约成本。 先删除主机下的从节点。 redis-cli -a 密码 --cluster del-node 节点ip:端口 节点ID # redis-cli -a liuhongjun --cluster del-node 192.168.32.142:6379 25a9accc9cf8d24265382db3e3ef102ae3268332 # 检查 redis 集群状态 redis-cli -a liuhongjun --cluster check 某一节点ip:端口 # redis-cli -a liuhongjun --cluster check 192.168.32.135:6379将主机插槽重新分配插槽重新分配后数据也会迁移移除插槽后这个主机会变成目标主机的从机然后需要再执行删除从机操作。 redis-cli -a 密码 --cluster reshard 已分配插槽节点ip:端口 # redis-cli -a liuhongjun --cluster reshard 192.168.32.136:6379# 删除从机 redis-cli -a 密码 --cluster del-node 节点ip:端口 节点ID # redis-cli -a liuhgonjun --cluster del-node 192.168.32.141:6379 12131b05f5014a88e1199c1c6ef60b30ea3375c09、 Spring Boot 整合 Redis dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactIdversion2.6.8/version /dependency dependencygroupIdorg.apache.commons/groupIdartifactIdcommons-pool2/artifactIdversion2.11.1/version /dependency9.1、整合单机版 修改 yaml 配置文件 spring:redis:host: 101.43.29.221port: 6001password: XXXXdatabase: 0 # 库下标0-15timeout: 1800 # 连接超时时间lettuce:pool:max-active: 20 # 最大连接数max-wait: -1 # 最大阻塞时间-1没有限制max-idle: 5 # 最大空闲连接min-idle: 0 # 最小空闲连接创建 Redis 序列化配置类配置这个类后会对 key 和 value 进行序列化如果不序列化可能出现中文乱码问题。 Configuration EnableCaching // 开启缓存 public class RedisConfig extends CachingConfigurerSupport {Beanpublic RedisTemplateString,Object redisTemplate(RedisConnectionFactory factory){RedisTemplateString, Object redisTemplate new RedisTemplate();RedisSerializerString stringRedisSerializer new StringRedisSerializer();Jackson2JsonRedisSerializer jackson2JsonRedisSerializer new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(objectMapper);redisTemplate.setConnectionFactory(factory);// key序列化redisTemplate.setKeySerializer(stringRedisSerializer);// value序列化redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// hashmap序列化redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);return redisTemplate;}Beanpublic CacheManager cacheManager(RedisConnectionFactory factory){RedisSerializerString stringRedisSerializer new StringRedisSerializer();Jackson2JsonRedisSerializer jackson2JsonRedisSerializer new Jackson2JsonRedisSerializer(Object.class);// 解决缓存转换异常ObjectMapper objectMapper new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(objectMapper);// 配置序列化解决乱码问题过期时间600秒RedisCacheConfiguration cacheConfiguration RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofSeconds(600)).serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(stringRedisSerializer)).serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer)).disableCachingNullValues();return RedisCacheManager.builder(factory).cacheDefaults(cacheConfiguration).build();} }测试通过 RedisTemplate 就可以对 Redis 进行操作 RestController public class RedisController {Autowiredprivate RedisTemplateString,Object redisTemplate;GetMapping(/)public String test1(){redisTemplate.opsForValue().set(user:10001,10001);String login (String) redisTemplate.opsForValue().get(user:10001);System.out.println(login);return login;} }9.2、整合集群版 编写 yaml 配置 spring:cache:redis:time-to-live: 10000redis:timeout: 5000database: 0cluster:nodes: 各个节点ip:port (127.0.0.1::6379)多个主机使用英文逗号隔开max-redirects: 3lettuce:pool:max-active: 20 # 最大连接数max-wait: -1 # 最大阻塞时间-1没有限制max-idle: 5 # 最大空闲连接min-idle: 0 # 最小空闲连接password: XXXX主机宕机当 key 对应插槽节点出现宕机然后从机上位充当主机但是 java 客户端不能动态感知集群中节点变化就出现节点连接失败问题需要在配置中打开 lettuce 的动态感知配置。 lettuce:cluster:refresh:# 支持集群动态感应刷新adaptive: true# 定时刷新period: 2000封装 RedisTemplateService 工具类 Service Slf4j public class RedisTemplateService {Autowiredprivate RedisTemplateString, Object redisTemplate;/*** 指定缓存失效时间** param key 键* param time 时间(秒)*/public boolean expire(NonNull String key, long time) {try {if (time 0) {redisTemplate.expire(key, time, TimeUnit.SECONDS);}return true;} catch (Exception e) {log.error(exception when expire key {}. , key, e);return false;}}/*** 根据key获取过期时间** param key 键 不能为 null* return 时间(秒) -1 代表为永久有效 -2 代表已失效*/public long getExpire(NonNull String key) {Long expire redisTemplate.getExpire(key, TimeUnit.SECONDS);OptionalLong optional Optional.ofNullable(expire);return optional.orElse(-2L);}/*** 判断key是否存在** param key 键* return true 存在 false不存在*/public boolean hasKey(String key) {return Boolean.TRUE.equals(redisTemplate.hasKey(key));}/*** 删除缓存** param key 可以传一个值 或多个*/SuppressWarnings(unchecked)public void del(String... key) {if (key ! null key.length 0) {if (key.length 1) {redisTemplate.delete(key[0]);} else {redisTemplate.delete((CollectionString) CollectionUtils.arrayToList(key));}}}/*** 普通缓存获取** param key 键* return 值*/public Object get(String key) {return key null ? null : redisTemplate.opsForValue().get(key);}/*** 普通缓存放入** param key 键* param value 值* return true成功 false失败*/public boolean set(String key, Object value) {try {redisTemplate.opsForValue().set(key, value);return true;} catch (Exception e) {log.error(exception when set key {}. , key, e);return false;}}/*** 普通缓存放入并设置时间** param key 键* param value 值* param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期* return true成功 false 失败*/public boolean set(String key, Object value, long time) {try {if (time 0) {redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);} else {set(key, value);}return true;} catch (Exception e) {log.error(exception when set key {}. , key, e);return false;}}/*** 递增** param key 键* param delta 要增加几(大于0)*/public long incr(String key, long delta) {if (delta 0) {throw new RuntimeException(递增因子必须大于0);}return redisTemplate.opsForValue().increment(key, delta);}/*** 递减** param key 键* param delta 要减少几(小于0)*/public long decr(String key, long delta) {if (delta 0) {throw new RuntimeException(递减因子必须大于0);}return redisTemplate.opsForValue().increment(key, -delta);}/*** HashGet** param key 键 不能为null* param item 项 不能为null* return 值*/public Object hGet(String key, String item) {return redisTemplate.opsForHash().get(key, item);}/*** 获取hashKey对应的所有键值** param key 键* return 对应的多个键值*/public MapObject, Object hMGet(String key) {return redisTemplate.opsForHash().entries(key);}/*** HashSet** param key 键* param map 对应多个键值* return true 成功 false 失败*/public boolean hMSet(String key, MapString, Object map) {try {redisTemplate.opsForHash().putAll(key, map);return true;} catch (Exception e) {log.error(exception when hash set key {}. , key, e);return false;}}/*** HashSet 并设置时间** param key 键* param map 对应多个键值* param time 时间(秒)* return true成功 false失败*/public boolean hMSet(String key, MapString, Object map, long time) {try {redisTemplate.opsForHash().putAll(key, map);if (time 0) {expire(key, time);}return true;} catch (Exception e) {log.error(exception when hash set key {}. , key, e);return false;}}/*** 向一张hash表中放入数据,如果不存在将创建** param key 键* param item 项* param value 值* return true 成功 false失败*/public boolean hSet(String key, String item, Object value) {try {redisTemplate.opsForHash().put(key, item, value);return true;} catch (Exception e) {log.error(exception when hash set key {}, item {} , key, item, e);return false;}}/*** 向一张hash表中放入数据,如果不存在将创建** param key 键* param item 项* param value 值* param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间* return true 成功 false失败*/public boolean hSet(String key, String item, Object value, long time) {try {redisTemplate.opsForHash().put(key, item, value);if (time 0) {expire(key, time);}return true;} catch (Exception e) {log.error(exception when hash set key {}, item {} , key, item, e);return false;}}/*** 删除hash表中的值** param key 键 不能为null* param item 项 可以使多个 不能为null*/public void hDel(String key, Object... item) {redisTemplate.opsForHash().delete(key, item);}/*** 判断hash表中是否有该项的值** param key 键 不能为null* param item 项 不能为null* return true 存在 false不存在*/public boolean hHasKey(String key, String item) {return redisTemplate.opsForHash().hasKey(key, item);}/*** hash递增 如果不存在,就会创建一个 并把新增后的值返回** param key 键* param item 项* param by 要增加几(大于0)*/public double hincr(String key, String item, double by) {return redisTemplate.opsForHash().increment(key, item, by);}/*** hash递减** param key 键* param item 项* param by 要减少记(小于0)*/public double hdecr(String key, String item, double by) {return redisTemplate.opsForHash().increment(key, item, -by);}/*** 根据key获取Set中的所有值** param key 键*/public SetObject sGet(String key) {try {return redisTemplate.opsForSet().members(key);} catch (Exception e) {return null;}}/*** 根据value从一个set中查询,是否存在** param key 键* param value 值* return true 存在 false不存在*/public boolean sHasKey(String key, Object value) {try {return redisTemplate.opsForSet().isMember(key, value);} catch (Exception e) {return false;}}/*** 将数据放入set缓存** param key 键* param values 值 可以是多个* return 成功个数*/public long sSet(String key, Object... values) {try {return redisTemplate.opsForSet().add(key, values);} catch (Exception e) {return 0;}}/*** 将set数据放入缓存** param key 键* param time 时间(秒)* param values 值 可以是多个* return 成功个数*/public long sSetAndTime(String key, long time, Object... values) {try {Long count redisTemplate.opsForSet().add(key, values);if (time 0)expire(key, time);return count;} catch (Exception e) {return 0;}}/*** 获取set缓存的长度** param key 键*/public long sGetSetSize(String key) {try {return redisTemplate.opsForSet().size(key);} catch (Exception e) {return 0;}}/*** 移除值为value的** param key 键* param values 值 可以是多个* return 移除的个数*/public long setRemove(String key, Object... values) {try {return redisTemplate.opsForSet().remove(key, values);} catch (Exception e) {return 0;}}/*** 获取list缓存的内容** param key 键* param start 开始* param end 结束 0 到 -1代表所有值*/public ListObject lGet(String key, long start, long end) {try {return redisTemplate.opsForList().range(key, start, end);} catch (Exception e) {return null;}}/*** 获取list缓存的长度** param key 键*/public long lGetListSize(String key) {try {return redisTemplate.opsForList().size(key);} catch (Exception e) {return 0;}}/*** 通过索引 获取list中的值** param key 键* param index 索引 index0时 0 表头1 第二个元素依次类推index0时-1表尾-2倒数第二个元素依次类推*/public Object lGetIndex(String key, long index) {try {return redisTemplate.opsForList().index(key, index);} catch (Exception e) {return null;}}/*** 将list放入缓存** param key 键* param value 值*/public boolean lSet(String key, Object value) {try {redisTemplate.opsForList().rightPush(key, value);return true;} catch (Exception e) {return false;}}/*** 将list放入缓存** param key 键* param value 值* param time 时间(秒)*/public boolean lSet(String key, Object value, long time) {try {redisTemplate.opsForList().rightPush(key, value);if (time 0)expire(key, time);return true;} catch (Exception e) {return false;}}/*** 将list放入缓存** param key 键* param value 值*/public boolean lSet(String key, ListObject value) {try {redisTemplate.opsForList().rightPushAll(key, value);return true;} catch (Exception e) {return false;}}/*** 将list放入缓存** param key 键* param value 值* param time 时间(秒)*/public boolean lSet(String key, ListObject value, long time) {try {redisTemplate.opsForList().rightPushAll(key, value);if (time 0)expire(key, time);return true;} catch (Exception e) {return false;}}/*** 根据索引修改list中的某条数据** param key 键* param index 索引* param value 值*/public boolean lUpdateIndex(String key, long index, Object value) {try {redisTemplate.opsForList().set(key, index, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 移除N个值为value** param key 键* param count 移除多少个* param value 值* return 移除的个数*/public long lRemove(String key, long count, Object value) {try {return redisTemplate.opsForList().remove(key, count, value);} catch (Exception e) {return 0;} }10、应用问题 缓存穿透是用户查询一条在redis中不存在的数据这是就会向数据库中去查询然而也没有查询到就会一直请求一直请求导致数据库服务压力变大。缓存穿透一般是遭受黑客攻击黑客会一直去查询一条不存在的数据导致我们的服务不可用。 解决方案 对空值进行缓存对于返回空值的数据也进行缓存将空值的过期时间设置很短。设置课访问的白名单使用bitmap类型定义一个白名单名单id作为bitmap的偏移量先查询是否在bitmap中如果在就将请求放行如果不在就将请求进行拦截。布隆过滤器它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多缺点是有一定的误识别率和删除困难。 缓存击穿一个经常被请求的key在缓存时间过期时后端数据库没有及时将缓存更新这时突然大量的请求去请求这个key导致在redis中无法获取到数据会直接去请求数据库数据库压力增大性能下降。 解决方案 使用锁在缓存失效时(取出来的值为空)不要立即去数据库中查询。 缓存雪崩redis中的大量key集中过期或者redis宕机导致大量请求直接从数据库中请求数据数据库服务压力增大。 解决方案 给key设置过期时间设置成间隔过期。构建多级缓存结构。搭建redis集群保证redis的高可用。在请求层面对缓存业务添加限流和服务降级。使用锁和队列保证没有大量的线程对数据库进行读写同时避免大量请求落在底层的存储系统上。 11、分布式锁 ​ 随着业务的发展从单一的服务架构演变成分布式的服务架构。由于分布式的多线程、多进程分布在不同的服务器上这使单机下的锁策略失效。为了解决这个问题就需要一个跨JVM的互斥锁来控制资源的访问。 基于数据库实现分布式锁。基于缓存实现分布式锁。基于Zookeeper实现分布式锁。 基于Redis缓存实现分布式锁 # 分布式锁基于下面的命令 setnx key_lock value # setnx增加key时如果key存在就不会添加成功不存在才能添加成功key_lock是锁的名称# 在这个过程中可能存在业务逻辑中一直没有把锁删除所以需要给锁加一个过期时间 set key_lock value nx ex time # nx 表示不能重复设置ex设置过期时间time单位秒Java实现分布式锁 GetMapping(/lock) public String lock(){// 获取锁boolean lock redisTemplateService.setNx(lock, 1, 10);// 如果是true就获取了锁if (lock){// 键num1,redisTemplateService.incr(num,1);// 删除缓存中的锁redisTemplateService.del(lock);System.out.println(增加成功);return 增加成功;}else {// 没有获取到锁等待并重新请求锁this.lock();}return 增加成功; }UUID防止lock误删在获取到锁以后但是在业务逻辑代码中如果处理的时间超过了lock设置的过期时间那么lock就会自动过期其他请求就会重新设置锁并进入请求。当之前的业务逻辑代码处理完后就会去删除lock就会把其他请求的lock删除导致另外的请求也会进入这样就可以使用UUID作为key值再删除之前判断是否是自己的key如果是自己的key就删除不是就不删除。 GetMapping(/lock) public String lock(){String uuid UUID.randomUUID().toString();// 获取锁boolean lock redisTemplateService.setNx(lock, uuid, 10);// 如果是true就获取了锁if (lock){// 键num1,redisTemplateService.incr(num,1);String value (String) redisTemplateService.get(lock);if (uuid.equals(value)){// 删除缓存中的锁redisTemplateService.del(lock);}return 增加成功;}else {// 没有获取到锁等待并重新请求锁this.lock();}return 增加成功; }控制资源的访问。 基于数据库实现分布式锁。基于缓存实现分布式锁。基于Zookeeper实现分布式锁。
http://www.dnsts.com.cn/news/47080.html

相关文章:

  • wordpress商务套餐网站排名优化教程
  • 耒阳在那做网站购物网站设计的意义
  • 哪家网站推广做的好代做网站
  • 做网站简历怎么写百度竞价关键词价格查询工具
  • 那些网站是伪静态wordpress 404页面
  • 学校品牌建设太原seo代理商
  • 网站后期维修问题资金盘app开发要多少钱
  • 之梦一个系统做多个网站微网站需要备案吗
  • 导购网站的seo怎么做网站建设属于什么类目
  • 岳阳网站开发公司wordpress批量修改
  • 锐仕方达猎头公司seo含义
  • 网站为什么打不开百度网站排名
  • 做视频采集网站犯法怎么将自己的视频推广出去
  • 临汾做网站电话如何用微信小程序开店
  • 网站备案容易吗上海网站seo公司
  • 南山网站建设公网站联系方式连接怎么做
  • 网站安全证书出错怎么做软件开发流程报告
  • 怎么做 niche网站垫江网站建设djrckj
  • 能免费创建网站吗衡水外贸网站建设
  • 阜新建设网站辽阳做网站
  • 做一个简单的网站网站建设是怎么挣钱的
  • 万网网站备案系统织梦网站定时
  • 中国联通 网站备案珠海网站建设制作
  • 嘉兴门户网站建设delphi7 网站开发
  • 网站推广一般多少钱百度搜索引擎优化的推广计划
  • 网站seo外包公司有哪些微信小程序低代码开发平台
  • 查看网站是否wordpresswordpress专题模板
  • 企业专业网站建设哪家好wordpress 课程激活
  • 做阿里巴巴1688网站程序flashfxp怎么上传对应网站空间
  • 旅游网站制作代码如何进wordpress后台