网站建设入门教程视频,wordpress标题添加连载中,东莞手工活外发加工网,深圳网站建设 易通鼎文章目录高并发缓存架构设计架构设计思路完整代码开发规范与优化建议键值设计命令使用客户端的使用扩展布隆过滤器redis的过期键的清除策略高并发缓存架构设计
架构设计思路
首先是一个基础的缓存架构#xff0c;对于新增、修改操作set会对缓存更新#xff0c;对于查询操作…
文章目录高并发缓存架构设计架构设计思路完整代码开发规范与优化建议键值设计命令使用客户端的使用扩展布隆过滤器redis的过期键的清除策略高并发缓存架构设计
架构设计思路
首先是一个基础的缓存架构对于新增、修改操作set会对缓存更新对于查询操作先去查询redis缓存没有再去查询DB然后把数据在Redis中也缓存一份 但如果数据量很大redis中不能存储所有的数据而且很大一部分数据都是冷门数据可能就插入时用了一次。对于这种情况我们就可以在新增或修改时对缓存set时加过期时间在查询时将查询到的数据在缓存中更新过期时间保证热点数据不过期这样就简单的实现了冷热分离 考虑解决缓存击穿的情况大量的数据同时间过期导致大量请求直接落到了DB上。在业务上肯定会有一些批量操作那么对于批量新增的场景多个数据设置了同一时间过期那么就可能会出现缓存击穿的情况解决方法是再加个随机方法在原有过期时间上再增加一段随机时间 考虑解决缓存穿透的情况管理员误删热点数据或恶意攻击一直访问缓存和数据库中都不存在的数据进而拖垮数据库。
我们可以选择往缓存中存一个空值过期时间或者是布隆过滤器。
这里最好是要区分一下写一个方法区查询缓存如果查询缓存没有查询到数据返回null如果查询到了数据但是这个数据是我们上一步存的空值那么这里返回什么最好是和前一步返回的null区分开 考虑解决热点数据缓存重建问题热点数据过期或者是冷门数据忽然变成热点数据这时缓存中没数据DB中有数据一瞬间大量的请求要查询这条数据就都落到了DB上。
解决方案是加锁对查询DB操作加锁这里需要加分布式锁锁的粒度要小比如电商中使用锁前缀商品id。之所以要使用分布式锁而不是synchronized也是因为锁的粒度使用synchronized(this)方式那么查询其他商品的操作也会被阻塞住使用synchronized(锁前缀 product.getId()) 这种方式生成的锁对象又不是同一个对象。为了性能同时还需要参考单例模式的双重检测机制将DCL(Double-Checked-Locking)机制也加上。
伪代码如下
// 查询缓存
Object o queryCache(keyStr);
if(o ! null){return o;
}// 热点数据缓存重建 缓存没有查询到就加锁
lock.lock();
try{// 再查一遍Object o queryCache(keyStr);if(o ! null){return o;}// 还没有查询到就查询DB并往Redis中写queryDatabase(...);}finally {lock.unlock();
}考虑解决缓存与DB双写不一致问题
双写不一致问题 读写不一致问题假如更新操作时我不更新缓存而是删除缓存嘞如下所示也是有问题的。 要解决上面这些问题的方法还是加锁在查DB更新缓存、更新DB更新缓存这两个地方加同一把分布式锁。此时就会有人提问了 这么写一个简单的增删改查逻辑代码会变得很复杂 如果并发不高或者能容忍这些问题的话那么你可以不加这些校验代码如果你想要解决这些小概率问题那么就必须要加相应的代码。 这么写都是串行执行效率不会很低吗? 在上一步中我们对查询方法就加了DCL机制绝大部分请求在两次校验时就从缓存中拿到数据返回了根本不会进入到加锁的位置
Object o queryCache(keyStr);
if(o ! null){return o;
}// 解决热点数据缓存重建问题 缓存没有查询到就加锁
lock.lock();
try{Object o queryCache(keyStr);if(o ! null){return o;}// 这个方法中才是查询DB更新缓存这里面才会加锁但是大部分请求在上面两次查缓存中就已经拿到数据并返回了queryDatabase(...);}finally {lock.unlock();
}接下来尝试对上面两种分布式锁做优化
首先是为了解决热点数据缓存重建问题而加的分布式锁其实现在这种方式也能用了因为只会有一个请求去查询DB然后写入缓存其他阻塞的线程就会通过第二次验证从缓存中拿到数据返回。但是可能会有几万个请求都阻塞在了加锁的那一行中我们可以使用trylock()方法来优化指定一个最大等待时间这样所有等待的线程一到时间就直接去执行第二个从缓存中取数据的代码了。
接下来是为了解决缓存与DB数据不一致问题而加的分布式锁大部分的情况下我们业务都是读多写少我们可以在这里使用读写锁来提高性能更新DB更新缓存的地方使用写锁缓存无数据 查询DB更新缓存的地方用读锁。 接下来考虑解决缓存雪崩的问题我们都是请求–web服务–redis如果并发量很大多个web服务的请求到发到了一个redis节点上redis处理不过来一秒十几万或更多的请求那么redis就可能宕机或者是用户线程一直等待redis响应
如果redis宕机那么业务代码try{}catch后一般就去查数据库了大量的请求就会让数据库挂掉接着再整个服务都不能访问了。
如果redis没宕机用户线程一直等待redis数据返回此时并发很高web服务中线程得不到释放又不断有新的请求进来又导致微服务可能宕机进而影响到整个系统宕机。
解决方法 限流但万一中间件部分限流没有拦住我们在代码层面也要做相应的处理使用下面这种解决方案 多级缓存我们可以在JVM层面也加一层缓存在访问Redis前先访问JVM层面的缓存。因为JVM是本机内存并发访问可以达到每秒百万 我们还需要一个单独的系统去维护JVM层面的缓存。确定什么样的数据能放缓存中某个微服务节点更新了数据其他节点的JVM层面的缓存也要更新等等情况。 完整代码
package com.hs.distributlock.service;import com.alibaba.fastjson.JSON;
import com.hs.distributlock.entity.ProductEntity;
import com.hs.distributlock.mapper.ProductMapper;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RReadWriteLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;/*** Description: 高并发缓存架构* Author 胡尚* Date: 2023/3/25 20:10*/
Service
public class ProductService {AutowiredStringRedisTemplate redisTemplate;AutowiredRedisson redisson;AutowiredProductMapper productMapper;/*** 缓存穿透默认值*/public static final String PRODUCT_PENETRATE_DEFAULT {};/*** 突发性热点缓存重建时加的锁*/public static final String LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX lock:product:hot_cache_create:;/*** 突发性热点缓存重建时加的锁*/public static final String LOCK_CACHE_DB_UNLIKE_PREFIX lock:cache_db_unlike:;public void updateProduct(ProductEntity entity){// 解决缓存与数据库双写 数据不一致问题 而加写锁RReadWriteLock readWriteLock redisson.getReadWriteLock(LOCK_CACHE_DB_UNLIKE_PREFIX entity.getId());RLock wLock readWriteLock.writeLock();wLock.lock();try {// 更新DB 更新缓存productMapper.update(entity);String json JSON.toJSONString(entity);redisTemplate.opsForValue().set(product:id: entity.getId(), json, 12*60*60getRandomTime(), TimeUnit.SECONDS);}finally {wLock.unlock();}}public void insertProduct(ProductEntity entity){// 解决缓存与数据库双写 数据不一致问题 而加写锁RReadWriteLock readWriteLock redisson.getReadWriteLock(LOCK_CACHE_DB_UNLIKE_PREFIX entity.getId());RLock wLock readWriteLock.writeLock();wLock.lock();try {// 更新DB 更新缓存productMapper.insert(entity);String json JSON.toJSONString(entity);redisTemplate.opsForValue().set(product:id: entity.getId(), json, 12*60*60getRandomTime(), TimeUnit.SECONDS);}finally {wLock.unlock();}}/*** 重点方法这其中使用了双重检测去查询缓存还加了读锁去解决缓存和数据库双写导致的数据不一致问题*/public ProductEntity queryProduct(Long id) throws InterruptedException {String productKey product:id: id;// 从缓存取数据ProductEntity entity queryCache(productKey);if (entity ! null){return entity;}// 突发性热点缓存重建问题避免大量请求直接去请求DB进而加锁拦截RLock lock redisson.getLock(LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX id);lock.tryLock(3, 30, TimeUnit.SECONDS);try {// 第二次验证 从缓存取数据entity queryCache(productKey);if (entity ! null){return entity;}// 缓存与DB数据双写不一致问题 加锁RReadWriteLock readWriteLock redisson.getReadWriteLock(LOCK_CACHE_DB_UNLIKE_PREFIX id);RLock rLock readWriteLock.readLock();rLock.lock();try {// 查询数据库 更新缓存entity queryDatabase(productKey, id);}finally {rLock.unlock();}}finally {lock.unlock();}return entity;}/*** 从缓存中查询*/private ProductEntity queryCache(String key){ProductEntity entity null;String json redisTemplate.opsForValue().get(key);if (StringUtils.hasLength(json)){// 判断是否为解决缓存穿透而手动存储的值如果是则直接返回一个新对象并和前端约定好错误提示if (Objects.equals(PRODUCT_PENETRATE_DEFAULT, json)){return new ProductEntity();}entity JSON.parseObject(json, ProductEntity.class);// 延期redisTemplate.expire(key, 12*60*60getRandomTime(), TimeUnit.SECONDS);}return entity;}/*** 从数据库中查询如果查询到了就将数据在缓存中保存一份如果没有查询到则往缓存中存一个默认值来解决缓存击穿问题*/private ProductEntity queryDatabase(String productKey, long id){ProductEntity entity productMapper.get(id);// 如果数据库中也没有查询到那么就往缓存中存一个默认值去解决缓存击穿问题if (entity null){redisTemplate.opsForValue().set(productKey, PRODUCT_PENETRATE_DEFAULT, 60*1000, TimeUnit.SECONDS);} else {redisTemplate.opsForValue().set(productKey, JSON.toJSONString(entity), 12*60*60getRandomTime(), TimeUnit.SECONDS);}return entity;}private Integer getRandomTime(){return new Random().nextInt(5) * 60 * 60;}
} 开发规范与优化建议
键值设计
Key的设计 可读性和可管理性。以业务名为前缀用冒号分割。避免直接使用id作为key导致数据覆盖的情况。 简洁性。保证语义的前提下控制key的长度不要太长 不要包含空格、换行、单双引号以及其他特殊字符
value的设计 避免bigkey 字符串类型最大512MB但是一般超过10KB就认为是bigkey 非字符串类型hash、list、set、zset一般它们的元素超过5000就任务是bigkey bigkey的危害导致redis阻塞、网络拥堵 一般产生bigkey的原因按天统计某些功能或用户集合、将数据库查询到的数据序列化后存入缓存我们要判断是否是所有字段都需要缓存 优化bigkey将一个bigkey拆分为多个数据、如果不可拆分则不要每次取所有的数据比如hmget key field [field ...] 只读取其中某些元素 选择合适的数据类型 反例 set user:1:name tom
set user:1:age 19
set user:1:favor football正例: hmset user:1 name tom age 19 favor football控制key的生命周期添加过期时间 命令使用 O(N)命令关注N的数量 例如hgeall、lrange、smembers、zrange等并非不能用但需要关注N的数量。 有遍历的需求可以使用hscan、sscan、zscan代替。 禁用命令keys * 、flushall 、flushdb等 使用批量操作提高效率比如pipeline管道等 redis的事务不要过多使用可以使用lua脚本来代替 客户端的使用 避免所有业务使用同一个redis实例 可以搭建多个redis集群不同的微服务调用不同的redis集群不相干的业务拆分公共数据做服务化。 使用带有连接池有效控制连接提高效率 JedisPoolConfig jedisPoolConfig new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(5);
jedisPoolConfig.setMaxIdle(2);
jedisPoolConfig.setTestOnBorrow(true);JedisPool jedisPool new JedisPool(jedisPoolConfig, 127.0.0.1, 6379, 3000, null);Jedis jedis null;
try {jedis jedisPool.getResource();//具体的命令jedis.executeCommand()
} catch (Exception e) {logger.error(op key {} error: e.getMessage(), key, e);
} finally {//注意这里不是关闭连接在JedisPool模式下Jedis会被归还给资源池。if (jedis ! null) jedis.close();
}连接池参数含义 序号参数名含义默认值使用建议1maxTotal资源池中最大连接数8设置建议见下面2maxIdle资源池允许最大空闲的连接数8设置建议见下面3minIdle资源池确保最少空闲的连接数0设置建议见下面4blockWhenExhausted当资源池用尽后调用者是否要等待。只有当为true时下面的maxWaitMillis才会生效true建议使用默认值5maxWaitMillis当资源池连接用尽后调用者的最大等待时间(单位为毫秒)-1表示永不超时不建议使用默认值6testOnBorrow向资源池借用连接时是否做连接有效性检测(ping)无效连接会被移除false业务量很大时候建议设置为false(多一次ping的开销)。7testOnReturn向资源池归还连接时是否做连接有效性检测(ping)无效连接会被移除false业务量很大时候建议设置为false(多一次ping的开销)。8jmxEnabled是否开启jmx监控可用于监控true建议开启但应用本身也要开启一般我们控制最大连接数和资源池允许的最大空闲连接数这两个配置就行了 Redis连接池的连接对象是懒加载的连接池对象刚创建时不会初始化创建连接对象是真正使用时才会去创建我们可以做连接池预热 // 先将创建的连接对象保存起来然后再统一调用close()方法放回连接池中
ListJedis minIdleJedisList new ArrayListJedis(jedisPoolConfig.getMinIdle());for (int i 0; i jedisPoolConfig.getMinIdle(); i) {Jedis jedis null;try {jedis pool.getResource();minIdleJedisList.add(jedis);jedis.ping();} catch (Exception e) {logger.error(e.getMessage(), e);} finally {//注意这里不能马上close将连接还回连接池否则最后连接池里只会建立1个连接//jedis.close();}
}
//统一将预热的连接还回连接池
for (int i 0; i jedisPoolConfig.getMinIdle(); i) {Jedis jedis null;try {jedis minIdleJedisList.get(i);//将连接归还回连接池jedis.close();} catch (Exception e) {logger.error(e.getMessage(), e);} finally {}
}扩展
布隆过滤器
当布隆过滤器说某个值存在时这个值可能不存在当它说不存在时那就肯定不存在。
布隆过滤器不能删除数据如果要删除得重新初始化数据。
底层原理如下图所示 使用Redisson实现一个简单的案例
package com.redisson;import org.redisson.Redisson;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;public class RedissonBloomFilter {public static void main(String[] args) {// 构建一个redisson对象Config config new Config();config.useSingleServer().setAddress(redis://localhost:6379);RedissonClient redisson Redisson.create(config);RBloomFilterString bloomFilter redisson.getBloomFilter(nameList);//初始化布隆过滤器预计元素为100000000L,误差率为3%,根据这两个参数会计算出底层的bit数组大小bloomFilter.tryInit(100000000L,0.03);//将zhuge插入到布隆过滤器中bloomFilter.add(hushang);//判断下面号码是否在布隆过滤器中System.out.println(bloomFilter.contains(aabbcc));//falseSystem.out.println(bloomFilter.contains(eeeee));//falseSystem.out.println(bloomFilter.contains(hushang));//true}
}实际开发中的伪代码如下
//初始化布隆过滤器
RBloomFilterString bloomFilter redisson.getBloomFilter(nameList);
//初始化布隆过滤器预计元素为100000000L,误差率为3%
bloomFilter.tryInit(100000000L,0.03);//把所有数据存入布隆过滤器
void init(){for (String key: keys) {bloomFilter.add(key);}
}String get(String key) {// 从布隆过滤器这一级缓存判断下key是否存在Boolean exist bloomFilter.contains(key);if(!exist){return ;}// 从缓存中获取数据String cacheValue cache.get(key);// 缓存为空if (StringUtils.isBlank(cacheValue)) {// 从存储中获取String storageValue storage.get(key);cache.set(key, storageValue);// 如果存储数据为空往缓存中存一个空对象 需要设置一个过期时间(300秒)if (storageValue null) {cache.expire(key, 60 * 5);}return storageValue;} else {// 缓存非空return cacheValue;}
}redis的过期键的清除策略 被动删除 当调用get命令时去查询key是否过期如果过期再删除 主动删除 Redis会定期(默认每100ms)主动淘汰一批已过期的key 主动清理策略 我们在redis.conf配置文件中配置maxmemory指定redis的最大使用内存如果当前使用内存超过了这里配置的值则触发主动清理策略
主动清理策略共有8种一般默认是不处理。
对于设置了过期时间的key volatile-ttl根据过期时间的先后进行删除越早过期的越先被删除。volatile-random在设置了过期时间的键值对中进行随机删除。volatile-lru会使用 LRU 算法筛选设置了过期时间的键值对删除。volatile-lfu会使用 LFU 算法筛选设置了过期时间的键值对删除。 对于所有的key allkeys-random从所有键值对中随机选择并删除数据。allkeys-lru使用 LRU 算法在所有数据中进行筛选删除。allkeys-lfu使用 LFU 算法在所有数据中进行筛选删除。 不处理 noeviction不会剔除任何数据拒绝所有写入操作并返回客户端错误信息(error) OOM command not allowed when used memory此时Redis只响应读操作。 LRU 算法Least Recently Used最近最少使用)
淘汰很久没被访问过的数据以最近一次访问时间作为参考。
LFU 算法Least Frequently Used最不经常使用)
淘汰最近一段时间被访问次数最少的数据以次数作为参考。 一般对于存在热点数据的访问场景下选择LFU算法这样避免了短时间新增了一批冷数据进而删除热点数据但是LRU的效率要高
根据自身业务类型配置好maxmemory-policy(默认是noeviction)推荐使用volatile-lru。
如果不设置maxmemory最大使用内存的话redis将服务器的所有内存用完后那么就会开始频繁的和磁盘进行交互了会很慢。
对于主从模式主节点删除过期的key后会把del key命令同步到从节点执行