php网站生成静态页面,做阀门网站电话号码,珠海网页模板建站,做网站湘潭文章目录 redis单线程VS多线程面试题**redis是多线程还是单线程,为什么是单线程****聊聊redis的多线程特性和IO多路复用****io多路复用模型****redis如此快的原因** BigKey大批量插入数据测试数据key面试题海量数据里查询某一固定前缀的key如果生产上限值keys * #xff0c;fl… 文章目录 redis单线程VS多线程面试题**redis是多线程还是单线程,为什么是单线程****聊聊redis的多线程特性和IO多路复用****io多路复用模型****redis如此快的原因** BigKey大批量插入数据测试数据key面试题海量数据里查询某一固定前缀的key如果生产上限值keys * flushdbflushall等危险命令以防止误操作bigkey如何处理memory usage 命令使用过吗如何发现bigkey**删除bigkey**bigkey调优惰性释放lazyfree了解过吗 缓存与数据库双写一致理论先删除缓存再更新数据库会出现什么问题双检加锁策略 实操canal是什么作用canal工作原理 官网地址mysql查看mysql版本当前主机二进制日志查看show variables like log_bin开启mysql的binlog写入功能重启mysql再次查看log_bin授权canal连接mysql账号 canal服务端下载解压配置启动查看 canal客户端java编写业务程序sql脚本建module改pom配置文件启动类业务类 案例实战bitmap/hyperloglog/GEO统计的类型有哪些聚合统计排序统计二值统计基数统计 hyperloglog案例实战去重统计的方案淘宝网站首页亿级UV的redis统计方案 GEO案例实战美团地图位置附近的酒店推送 bitmap案例实战布隆过滤器BloomFilter手写一个布隆过滤器京东签到送京豆 缓存预热缓存雪崩缓存击穿缓存穿透缓存预热缓存雪崩预防和解决 缓存击穿是什么解决方案 缓存穿透是什么解决方案 手写redis分布式锁面试题分布式锁Lua脚本可重入设计模式自动续期 RedLock算法及原理设计理念解决方案redisson实战案例单机redisson实战案例多机多重锁 reids的缓存过期淘汰策略redis的缓存内容大小以及配置reids的默认内容是多少在哪里配置以及如何修改 往redis里写的数据是怎么没得他是如何删除的lru算法和lfu算法的区别八种淘汰策略 微信抢红包功能实现需求分析代码 redis单线程VS多线程
面试题
redis是多线程还是单线程,为什么是单线程
这种问法其实并不严谨为啥这么说呢?
Redis的版本很多3.x、4.x、6.x版本不同架构也是不同的不限定版本问是否单线程也不太严谨。 版本3.x 最早版本也就是大家口口相传的redis是单线程阳哥2016年讲解的redis就是3.X的版本。 版本4.x严格意义来说也不是单线程而是负责处理客户端请求的线程是单线程但是开始加了点多线程的东西(异步删除)。—貌似 2020年5月版本的6.0.x后及2022年出的7.0版本后告别了大家印象中的单线程用一种全新的多线程来解决问题。—实锤 redis是单线程 主要是指redis的网络io和键值对的读写是由一个线程来完成的redis在处理客户端的请求的时候包括获取socket读解析执行内容返回socket写等都是有一个顺序串行的主线程处理这就是所谓的“单线程”这也是redis对外提供键值存储服务的主要流程 但redis的其他功能比如持久化RDBAOF异步删除集群数据同步等其实是由额外的线程执行的 redis命令工作线程是单线程的但是整个redis来说是多线程的
其实纯单线程也会有问题
正常情况下使用 del 指令可以很快的删除数据而当被删除的 key 是一个非常大的对象时例如时包含了成千上万个元素的 hash 集合时那么 del 指令就会造成 Redis 主线程卡顿。
这就是redis3.x单线程时代最经典的故障大key删除的头疼问题
由于redis是单线程的del bigKey …
等待很久这个线程才会释放类似加了一个synchronized锁你可以想象高并发下程序堵成什么样子
解决方案 使用惰性删除可以有效避免redis卡顿问题 比如当我Redis需要删除一个很大的数据时因为是单线程原子命令操作这就会导致 Redis 服务卡顿 于是在 Redis 4.0 中就新增了多线程的模块当然此版本中的多线程主要是为了解决删除数据效率比较低的问题的。
unlink keyflushdb asyncflushall async把删除工作交给了后台的小弟子线程异步来删除数据了。
聊聊redis的多线程特性和IO多路复用
对于redis主要的性能瓶颈是内存或者网络带宽并非CPU
在redis6/7中非常受关注的第一个新特性就是多线程
这是因为redis一直被大家熟知的就是他的单线程架构虽然有些命令可以用后台线程或子进程执行比如数据删除快照生成aof重写但是从网络io处理到实际的读写命令处理都是由单个线程完成的为了应对这个问题采用多个io线程来处理网络请求提高网络请求处理的并行度redis6/7就是采用这种方法但是reids的多io线程只是用来处理网络请求的对于读写命令操作仍然是使用单线程来处理这是因为redis处理请求时网络处理经常是瓶颈通过多个io线程并行处理网络操作可以提升实例的整体处理性能而继续使用单线程执行命令操作就不用为了保证Lua脚本事务的原子性额外开发多线程互斥加锁机制了不管加锁操作处理这样一来redis线程模型实现就简单了
IO多路复用是什么
一种同步的IO模型实现一个线程监视多个文件句柄一旦某个文件句柄就绪就能够通知到对应的应用程序进行相应的对鞋操作没有文件句柄时会阻塞应用程序从而释放CPU资源I/O网络I/O尤其在操作系统层面指数据在内核态和用户态之间的读写操作多路多个客户端连接即socket复用复用一个或几个线程IO多路复用也就是说一个或一组线程处理多个TCP连接使用单进程就能实现同时处理多个客户端的连接无需创建或者维护过多的进程/线程
io多路复用模型
将用户socket对应的文件描述符(FileDescriptor)注册进epoll然后epoll帮你监听哪些socket上有消息到达这样就避免了大量的无用操作。此时的socket应该采用非阻塞模式。这样整个过程只在调用select、poll、epoll这些调用的时候才会阻塞收发客户消息是不会阻塞的整个进程或者线程就被充分利用起来这就是事件驱动所谓的reactor反应模式。 在单个线程通过记录跟踪每一个Sockek(I/O流)的状态来同时管理多个I/O流. 一个服务端进程可以同时处理多个套接字描述符。
目的是尽量多的提高服务器的吞吐能力。
大家都用过nginxnginx使用epoll接收请求ngnix会有很多链接进来 epoll会把他们都监视起来然后像拨开关一样谁有数据就拨向谁然后调用相应的代码处理。redis类似同理这就是IO多路复用原理有请求就响应没请求不打扰。
redis如此快的原因
IO多路复用epoll函数使用才是redis为什么这么快的直接原因而不是仅仅单线程命令redis安装在内存中。
BigKey
大批量插入数据测试数据key
linux bash执行
生成100W条redis批量设置kv的语句(keykn,valuevn)
写入到/tmp目录下的redisTest.txt文件中for((i1;i100*10000;i)); do echo set k$i v$i /tmp/redisTest.txt ;done;
[rootiZuf60dna3h67gfyijgutrZ ~]# for((i1;i100*10000;i)); do echo set k$i v$i /tmp/redisTest.txt ;done;通过redis提供的管道–pipe命令插入100w的数据 请结合自己的机器地址
cat /tmp/redisTest.txt | /opt/redis-7.0.0/src/redis-cli -h 127.0.0.1 -p 6379 -a 密码 --pipe
[rootiZuf60dna3h67gfyijgutrZ tmp]# cat /tmp/redisTest.txt | /opt/redis-7.0.0/src/redis-cli -h 127.0.0.1 -p 6379 -a sunxin --pipe
Warning: Using a password with -a or -u option on the command line interface may not be safe.
All data transferred. Waiting for the last reply...
Last reply received from server.
errors: 0, replies: 2000000
[rootiZuf60dna3h67gfyijgutrZ tmp]# redis-cli -a sunxin
Warning: Using a password with -a or -u option on the command line interface may not be safe.
127.0.0.1:6379 ping
PONG
127.0.0.1:6379 DBSIZE
(integer) 1000002插入成功
面试题
海量数据里查询某一固定前缀的key
使用scan命令 SCAN 命令是一个基于游标的迭代器每次被调用之后 都会向用户返回一个新的游标 用户在下次迭代时需要使用这个新游标作为 SCAN 命令的游标参数 以此来延续之前的迭代过程。
SCAN 返回一个包含两个元素的数组
第一个元素是用于进行下一次迭代的新游标
第二个元素则是一个数组 这个数组中包含了所有被迭代的元素。如果新游标返回零表示迭代已结束。
SCAN的遍历顺序
非常特别它不是从第一维数组的第零位一直遍历到末尾而是采用了高位进位加法来遍历。之所以使用这样特殊的方式进行遍历是考虑到字典的扩容和缩容时避免槽位的遍历重复和遗漏。
如果生产上限值keys * flushdbflushall等危险命令以防止误操作
通过配置设置禁用这些命令redis.conf在security这一项中
bigkey如何处理
多大算big 参考《阿里云redis开发手册》 bigkey的危害
内存不均集群迁移困难超时删除大key删除阻塞主线程网络流量阻塞
memory usage 命令使用过吗如何发现bigkey
第一种
redis-cli --bigkeys好处见最下面总结
给出每种数据结构Top 1 bigkey同时给出每种数据类型的键值个数平均大小
不足
想查询大于10kb的所有key–bigkeys参数就无能为力了需要用到memory usage来计算每个键值的字节数
redis-cli --bigkeys -a 111111
redis-cli -h 127.0.0.1 -p 6379 -a 111111 --bigkeys
每隔 100 条 scan 指令就会休眠 0.1sops 就不会剧烈抬升但是扫描的时间会变长
redis-cli -h 127.0.0.1 -p 7001 –-bigkeys -i 0.1第二种 官网说明
删除bigkey
官网 string一般用del如果过于庞大用unlink hash 使用hscan每次获取少量field-value再使用hdel删除每个field list 使用trim渐进式逐步删除直到全部删除完成 set 使用sscan每次获取部分元素再使用srem命令删除每个元素 zset 使用zscan每次获取部分元素再使用ZREMRANGEBYRANK命令删除每个元素
bigkey调优惰性释放lazyfree了解过吗
redis.conf配置文件LAZY FREEING相关说明 缓存与数据库双写一致 理论
先更新数据库再删除缓存 无法实时一致只能最终一致
可以把要删除的缓存值或者是要更新的数据库值暂存到消息队列中例如使用Kafka/RabbitMQ等。当程序没有能够成功地删除缓存值或者是更新数据库值时可以从消息队列中重新读取这些值然后再次进行删除或更新。如果能够成功地删除或更新我们就要把这些值从消息队列中去除以免重复操作此时我们也可以保证数据库和缓存的数据一致了否则还需要再次进行重试如果重试超过的一定次数后还是没有成功我们就需要向业务层发送报错信息了通知运维人员。
先删除缓存再更新数据库会出现什么问题
**问题**当A线程来更新数据此时缓存已经被干掉了准备更新数据库的时候这个时候还没更新完此时B线程来进行查询他发现缓存读不到就去查数据库这个时候是旧值就将这个旧值又写回了redis了。A这个时候懵了我删的怎么又变回去了这个时候A更新完数据库就发现数据库和redis数据不一致了
解决方案 延迟双删
双检加锁策略
多个线程同时去查询数据库的这条数据那么我们可以在第一个查询数据的请求上使用一个 互斥锁来锁住它。
其他的线程走到这一步拿不到锁就等着等第一个线程查询到了数据然后做缓存。
后面的线程进来发现已经有缓存了就直接走缓存。
实操
canal
下载地址
是什么
canal [kə’næl]中文翻译为 水道/管道/沟渠/运河主要用途是用于 MySQL 数据库增量日志数据的订阅、消费和解析是阿里巴巴开发并开源的采用Java语言开发
历史背景是早期阿里巴巴因为杭州和美国双机房部署存在跨机房数据同步的业务需求实现方式主要是基于业务 trigger触发器 获取增量变更。从2010年开始阿里巴巴逐步尝试采用解析数据库日志获取增量变更进行同步由此衍生出了canal项目
作用
数据库镜像数据库实时备份索引构建和实时维护拆分异构索引、倒排索引业务cache刷新带业务逻辑的增量数据处理
canal工作原理
传统mysql主从复制工作原理
MySQL的主从复制将经过如下步骤
1、当 master 主服务器上的数据发生改变时则将其改变写入二进制事件日志文件中
2、salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测探测其是否发生过改变
如果探测到 master 主服务器的二进制事件日志发生了改变则开始一个 I/O Thread 请求 master 二进制事件日志
3、同时 master 主服务器为每个 I/O Thread 启动一个dump Thread用于向其发送二进制事件日志
4、slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中
5、salve 从服务器将启动 SQL Thread 从中继日志中读取二进制日志在本地重放使得其数据和主服务器保持一致
6、最后 I/O Thread 和 SQL Thread 将进入睡眠状态等待下一次被唤醒
canal工作原理
官网地址
官网
mysql
查看mysql版本
select version();
mysql5.7.28当前主机二进制日志
show master status;查看show variables like ‘log_bin’ 开启mysql的binlog写入功能
编辑my.ini请事先备份文件
log-binmysql-bin #开启 binlog
binlog-formatROW #选择 ROW 模式
server_id1 #配置MySQL replaction需要定义不要和canal的 slaveId重复ROW模式 除了记录sql语句之外还会记录每个字段的变化情况能够清楚的记录每行数据的变化历史但会占用较多的空间。 STATEMENT模式只记录了sql语句但是没有记录上下文信息在进行数据恢复的时候可能会导致数据的丢失情况 MIX模式比较灵活的记录理论上说当遇到了表结构变更的时候就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式 window系统下是my.ini linux系统下是my.conf
重启mysql
再次查看log_bin 授权canal连接mysql账号
默认是没有canal账户此处新建并授权 DROP USER IF EXISTS canal%;
CREATE USER canal% IDENTIFIED BY canal;
GRANT ALL PRIVILEGES ON *.* TO canal% IDENTIFIED BY canal;
FLUSH PRIVILEGES;SELECT * FROM mysql.user;canal服务端
注意默认linux服务器已经有java8的环境因为canal是java写的需要java运行环境
下载
下载地址https://github.com/alibaba/canal/releases/tag/canal-1.1.6
注意发布时间版本2022.8.11后发布的才用
解压
解压放在/mycanal路径下
配置
修改/mycanal/conf/example路径下instance.properties文件 换成自己的mysql主机master的IP地址 换成自己的在mysql新建的canal账户
启动
在/opt/mycanal/bin路径下执行
./startup.sh查看
判断canal是否启动成功 查看server日志 查看样例example的日志
canal客户端java编写业务程序
sql脚本
1 随便选个数据库以你自己为主本例bigdata按照下面建表CREATE TABLE t_user (id bigint(20) NOT NULL AUTO_INCREMENT,userName varchar(100) NOT NULL,PRIMARY KEY (id)) ENGINEInnoDB AUTO_INCREMENT10 DEFAULT CHARSETutf8mb4建module
canal_demo02
改pom
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.atguigu.canal/groupIdartifactIdcanal_demo02/artifactIdversion1.0-SNAPSHOT/versionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.5.14/versionrelativePath//parentpropertiesproject.build.sourceEncodingUTF-8/project.build.sourceEncodingmaven.compiler.source1.8/maven.compiler.sourcemaven.compiler.target1.8/maven.compiler.targetjunit.version4.12/junit.versionlog4j.version1.2.17/log4j.versionlombok.version1.16.18/lombok.versionmysql.version5.1.47/mysql.versiondruid.version1.1.16/druid.versionmapper.version4.1.5/mapper.versionmybatis.spring.boot.version1.3.0/mybatis.spring.boot.version/propertiesdependencies!--canal--dependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.client/artifactIdversion1.1.0/version/dependency!--SpringBoot通用依赖模块--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-actuator/artifactId/dependency!--swagger2--dependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger2/artifactIdversion2.9.2/version/dependencydependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger-ui/artifactIdversion2.9.2/version/dependency!--SpringBoot与Redis整合依赖--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId/dependencydependencygroupIdorg.apache.commons/groupIdartifactIdcommons-pool2/artifactId/dependency!--SpringBoot与AOP--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-aop/artifactId/dependencydependencygroupIdorg.aspectj/groupIdartifactIdaspectjweaver/artifactId/dependency!--Mysql数据库驱动--dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.47/version/dependency!--SpringBoot集成druid连接池--dependencygroupIdcom.alibaba/groupIdartifactIddruid-spring-boot-starter/artifactIdversion1.1.10/version/dependencydependencygroupIdcom.alibaba/groupIdartifactIddruid/artifactIdversion${druid.version}/version/dependency!--mybatis和springboot整合--dependencygroupIdorg.mybatis.spring.boot/groupIdartifactIdmybatis-spring-boot-starter/artifactIdversion${mybatis.spring.boot.version}/version/dependency!--通用基础配置junit/devtools/test/log4j/lombok/hutool--!--hutool--dependencygroupIdcn.hutool/groupIdartifactIdhutool-all/artifactIdversion5.2.3/version/dependencydependencygroupIdjunit/groupIdartifactIdjunit/artifactIdversion${junit.version}/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdlog4j/groupIdartifactIdlog4j/artifactIdversion${log4j.version}/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion${lombok.version}/versionoptionaltrue/optional/dependency!--persistence--dependencygroupIdjavax.persistence/groupIdartifactIdpersistence-api/artifactIdversion1.0.2/version/dependency!--通用Mapper--dependencygroupIdtk.mybatis/groupIdartifactIdmapper/artifactIdversion${mapper.version}/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-autoconfigure/artifactId/dependencydependencygroupIdredis.clients/groupIdartifactIdjedis/artifactIdversion3.8.0/version/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build/project配置文件
注意使用自己的mysql密码
server.port5555# alibaba.druid
spring.datasource.typecom.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-namecom.mysql.jdbc.Driver
spring.datasource.urljdbc:mysql://localhost:3306/bigdata?useUnicodetruecharacterEncodingutf-8useSSLfalse
spring.datasource.usernameroot
spring.datasource.password123456
spring.datasource.druid.test-while-idlefalse启动类
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication
public class CanalDemo02App
{//本例不要启动CanalDemo02App实例
}
业务类
redisUtils
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;public class RedisUtils
{public static final String REDIS_IP_ADDR 192.168.111.185;public static final String REDIS_pwd 111111;public static JedisPool jedisPool;static {JedisPoolConfig jedisPoolConfignew JedisPoolConfig();jedisPoolConfig.setMaxTotal(20);jedisPoolConfig.setMaxIdle(10);jedisPoolnew JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000,REDIS_pwd);}public static Jedis getJedis() throws Exception {if(null!jedisPool){return jedisPool.getResource();}throw new Exception(Jedispool is not ok);}}RedisCanalClientExample import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.atguigu.canal.util.RedisUtils;
import redis.clients.jedis.Jedis;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;public class RedisCanalClientExample
{public static final Integer _60SECONDS 60;public static final String REDIS_IP_ADDR 192.168.111.185;private static void redisInsert(ListColumn columns){JSONObject jsonObject new JSONObject();for (Column column : columns){System.out.println(column.getName() : column.getValue() update column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size() 0){try(Jedis jedis RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());}catch (Exception e){e.printStackTrace();}}}private static void redisDelete(ListColumn columns){JSONObject jsonObject new JSONObject();for (Column column : columns){jsonObject.put(column.getName(),column.getValue());}if(columns.size() 0){try(Jedis jedis RedisUtils.getJedis()){jedis.del(columns.get(0).getValue());}catch (Exception e){e.printStackTrace();}}}private static void redisUpdate(ListColumn columns){JSONObject jsonObject new JSONObject();for (Column column : columns){System.out.println(column.getName() : column.getValue() update column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size() 0){try(Jedis jedis RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());System.out.println(---------update after: jedis.get(columns.get(0).getValue()));}catch (Exception e){e.printStackTrace();}}}public static void printEntry(ListEntry entrys) {for (Entry entry : entrys) {if (entry.getEntryType() EntryType.TRANSACTIONBEGIN || entry.getEntryType() EntryType.TRANSACTIONEND) {continue;}RowChange rowChage null;try {//获取变更的row数据rowChage RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException(ERROR ## parser of eromanga-event has an error,data: entry.toString(),e);}//获取变动类型EventType eventType rowChage.getEventType();System.out.println(String.format(gt; binlog[%s:%s] , name[%s,%s] , eventType : %s,entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType EventType.INSERT) {redisInsert(rowData.getAfterColumnsList());} else if (eventType EventType.DELETE) {redisDelete(rowData.getBeforeColumnsList());} else {//EventType.UPDATEredisUpdate(rowData.getAfterColumnsList());}}}}public static void main(String[] args){System.out.println(---------O(∩_∩)O哈哈~ initCanal() main方法-----------);//// 创建链接canal服务端CanalConnector connector CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,11111), example, , );int batchSize 1000;//空闲空转计数器int emptyCount 0;System.out.println(---------------------canal init OK开始监听mysql变化------);try {connector.connect();//connector.subscribe(.*\\..*);指的是全库全表一般不推荐这样用影响canal性能可能导致数据不一致connector.subscribe(bigdata.t_user);connector.rollback();int totalEmptyCount 10 * _60SECONDS;while (emptyCount totalEmptyCount) {System.out.println(我是canal每秒一次正在监听: UUID.randomUUID().toString());Message message connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId message.getId();int size message.getEntries().size();if (batchId -1 || size 0) {emptyCount;try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }} else {//计数器重新置零emptyCount 0;printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println(已经监听了totalEmptyCount秒无任何消息请重启重试......);} finally {connector.disconnect();}}
} 为什么jedis在代码中不进行释放资源的操作呢 自动释放了因为jedis继承了BinaryJedis接口这个接口实现了closeable所以就自动释放了相当于一个语法糖
案例实战bitmap/hyperloglog/GEO
统计的类型有哪些
亿级系统中常见的四种统计
聚合统计 排序统计 解决方案 v1,v2…代表文本内容
二值统计 基数统计 hyperloglog案例实战
行业术语 UV
PV
DAU
MAU 需求 很多计数类场景比如 每日注册 IP 数、每日访问 IP 数、页面实时访问数 PV、访问用户数 UV等。
因为主要的目标高效、巨量地进行计数所以对存储的数据的内容并不太关心。
也就是说它只能用于统计巨量数量不太涉及具体的统计对象的内容和精准性。
统计单日一个页面的访问量(PV)单次访问就算一次。
统计单日一个页面的用户访问量(UV)即按照用户为维度计算单个用户一天内多次访问也只算一次。
多个key的合并统计某个门户网站的所有模块的PV聚合统计就是整个网站的总PV。
去重统计的方案
hashsetbitmap 如果数据显较大亿级统计,使用bitmaps同样会有这个问题。
bitmap是通过用位bit数组来表示各元素是否出现每个元素对应一位所需的总内存为N个bit。
基数计数则将每一个元素对应到bit数组中的其中一位比如bit数组010010101(按照从零开始下标有的就是1、4、6、8)。
新进入的元素只需要将已经有的bit数组和新加入的元素进行按位或计算就行。这个方式能大大减少内存占用且位操作迅速。
But假设一个样本案例就是一亿个基数位值数据一个样本就是一亿
如果要统计1亿个数据的基数位值,大约需要内存100000000/8/1024/1024约等于12M,内存减少占用的效果显著。
这样得到统计一个对象样本的基数值需要12M。
如果统计10000个对象样本(1w个亿级),就需要117.1875G将近120G可见使用bitmaps还是不适用大数据量下(亿级)的基数计数场景
hyperloglog
淘宝网站首页亿级UV的redis统计方案
需求 方案
mysql直接死mysql最大300w用redis的hash结构存储
redis——hash keyDay,ip,1
按照ipv4的结构来说明每个ipv4的地址最多是15个字节(ip “192.168.111.1”最多xxx.xxx.xxx.xxx)
某一天的1.5亿 * 15个字节 2G一个月60Gredis死定了。
hyperloglog 为什么是只需要花费12Kb service
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Random;
import java.util.concurrent.TimeUnit;Service
Slf4j
public class HyperLogLogService
{Resourceprivate RedisTemplate redisTemplate;/*** 模拟后台有用户点击首页每个用户来自不同ip地址*/PostConstructpublic void init(){log.info(------模拟后台有用户点击首页每个用户来自不同ip地址);new Thread(() - {String ip null;for (int i 1; i 200; i) {Random r new Random();ip r.nextInt(256) . r.nextInt(256) . r.nextInt(256) . r.nextInt(256);Long hll redisTemplate.opsForHyperLogLog().add(hll, ip);log.info(ip{},该ip地址访问首页的次数{},ip,hll);//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }}},t1).start();}}
controller
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;Api(description 淘宝亿级UV的Redis统计方案)
RestController
Slf4j
public class HyperLogLogController
{Resourceprivate RedisTemplate redisTemplate;ApiOperation(获得IP去重后的首页访问量)RequestMapping(value /uv,method RequestMethod.GET)public long uv(){//pfcountreturn redisTemplate.opsForHyperLogLog().size(hll);}}
GEO案例实战
命令复习
GEOADD city 116.403963 39.915119 天安门 116.403414 39.924091 故宫 116.024067 40.362639 长城GEOPOS city 天安门 故宫GEOHASH city 天安门 故宫 长城GEODIST city 天安门 长城 km以半径为中心查找附近的xxx
georadius 以给定的经纬度为中心 返回键包含的位置元素当中 与中心的距离不超过给定最大距离的所有位置元素。
GEORADIUS city 116.418017 39.914402 10 km withdist withcoord count 10 withhash desc
GEORADIUS city 116.418017 39.914402 10 km withdist withcoord count 10 desc
WITHDIST: 在返回位置元素的同时 将位置元素与中心之间的距离也一并返回。 距离的单位和用户给定的范围单位保持一致。
WITHCOORD: 将位置元素的经度和维度也一并返回。
WITHHASH: 以 52 位有符号整数的形式 返回位置元素经过原始 geohash 编码的有序集合分值。 这个选项主要用于底层应用或者调试 实际中的作用并不大
COUNT 限定返回的记录数。 美团地图位置附近的酒店推送 controller
import com.atguigu.redis7.service.GeoService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.geo.*;
import org.springframework.data.redis.connection.RedisGeoCommands;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;Api(tags 美团地图位置附近的酒店推送GEO)
RestController
Slf4j
public class GeoController
{Resourceprivate GeoService geoService;ApiOperation(添加坐标geoadd)RequestMapping(value /geoadd,method RequestMethod.GET)public String geoAdd(){return geoService.geoAdd();}ApiOperation(获取经纬度坐标geopos)RequestMapping(value /geopos,method RequestMethod.GET)public Point position(String member){return geoService.position(member);}ApiOperation(获取经纬度生成的base32编码值geohash)RequestMapping(value /geohash,method RequestMethod.GET)public String hash(String member){return geoService.hash(member);}ApiOperation(获取两个给定位置之间的距离)RequestMapping(value /geodist,method RequestMethod.GET)public Distance distance(String member1, String member2){return geoService.distance(member1,member2);}ApiOperation(通过经度纬度查找北京王府井附近的)RequestMapping(value /georadius,method RequestMethod.GET)public GeoResults radiusByxy(){return geoService.radiusByxy();}ApiOperation(通过地方查找附近,本例写死天安门作为地址)RequestMapping(value /georadiusByMember,method RequestMethod.GET)public GeoResults radiusByMember(){return geoService.radiusByMember();}}service
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Metrics;
import org.springframework.data.geo.Point;
import org.springframework.data.geo.Circle;
import org.springframework.data.redis.connection.RedisGeoCommands;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;import java.util.HashMap;
import java.util.List;
import java.util.Map;Service
Slf4j
public class GeoService
{public static final String CITY city;Autowiredprivate RedisTemplate redisTemplate;public String geoAdd(){MapString, Point map new HashMap();map.put(天安门,new Point(116.403963,39.915119));map.put(故宫,new Point(116.403414 ,39.924091));map.put(长城 ,new Point(116.024067,40.362639));redisTemplate.opsForGeo().add(CITY,map);return map.toString();}public Point position(String member) {//获取经纬度坐标ListPoint list this.redisTemplate.opsForGeo().position(CITY,member);return list.get(0);}public String hash(String member) {//geohash算法生成的base32编码值ListString list this.redisTemplate.opsForGeo().hash(CITY,member);return list.get(0);}public Distance distance(String member1, String member2) {//获取两个给定位置之间的距离Distance distance this.redisTemplate.opsForGeo().distance(CITY,member1,member2, RedisGeoCommands.DistanceUnit.KILOMETERS);return distance;}public GeoResults radiusByxy() {//通过经度纬度查找附近的,北京王府井位置116.418017,39.914402Circle circle new Circle(116.418017, 39.914402, Metrics.KILOMETERS.getMultiplier());//返回50条RedisGeoCommands.GeoRadiusCommandArgs args RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().includeCoordinates().sortAscending().limit(50);GeoResultsRedisGeoCommands.GeoLocationString geoResults this.redisTemplate.opsForGeo().radius(CITY,circle, args);return geoResults;}public GeoResults radiusByMember() {//通过地方查找附近String member天安门;//返回50条RedisGeoCommands.GeoRadiusCommandArgs args RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().includeCoordinates().sortAscending().limit(50);//半径10公里内Distance distancenew Distance(10, Metrics.KILOMETERS);GeoResultsRedisGeoCommands.GeoLocationString geoResults this.redisTemplate.opsForGeo().radius(CITY,member, distance,args);return geoResults;}
}bitmap案例实战
日活统计 连续签到打卡 最近一周的活跃用户 统计指定用户一年之中的登陆天数
布隆过滤器BloomFilter
是什么 由一个初值都为0的bit数组和多个哈希函数构成用来快速判断集合中是否存在某个元素 布隆过滤器英语Bloom Filter是 1970 年由布隆提出的。
它实际上是一个很长的二进制数组(00000000)一系列随机hash算法映射函数主要用于判断一个元素是否在集合中。
通常我们会遇到很多要判断一个元素是否在某个集合中的业务场景一般想到的是将集合中所有元素保存起来然后通过比较确定。
链表、树、哈希表等等数据结构都是这种思路。但是随着集合中元素的增加我们需要的存储空间也会呈现线性增长最终达到瓶颈。同时检索速度也越来越慢上述三种结构的检索时间复杂度分别为O(n),O(logn),O(1)。这个时候布隆过滤器Bloom Filter就应运而生 特点考点 一个元素如果判断结果存在时元素不一定存在但判断结果为不存在时则一定不存在 布隆过滤器可以添加元素但不能删除元素由于涉及hashcode判断依据删掉元素会导致误判率增加 原理 hash冲突案例 使用场景 解决缓存穿透问题哥redis结合bitmap使用 黑名单校验识别垃圾邮件 安全链接网址全球上10亿的网址判断
手写一个布隆过滤器
结合bitmap类型手写一个简单的布隆过滤器 整体架构
白名单初始化
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/*** 布隆过滤器白名单初始化工具类一开始就设置一部分数据为白名单所有* 白名单业务默认规定布隆过滤器有redis也有。*/
Component
Slf4j
public class BloomFilterInit
{Resourceprivate RedisTemplate redisTemplate;PostConstruct//初始化白名单数据故意差异化数据演示效果......public void init(){//白名单客户预加载到布隆过滤器String uid customer:12;//1 计算hashcode由于可能有负数直接取绝对值int hashValue Math.abs(uid.hashCode());//2 通过hashValue和2的32次方取余后获得对应的下标坑位long index (long) (hashValue % Math.pow(2, 32));log.info(uid 对应------坑位index:{},index);//3 设置redis里面bitmap对应坑位该有值设置为1redisTemplate.opsForValue().setBit(whitelistCustomer,index,true);}
}checkUtils
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;Component
Slf4j
public class CheckUtils
{Resourceprivate RedisTemplate redisTemplate;public boolean checkWithBloomFilter(String checkItem,String key){int hashValue Math.abs(key.hashCode());long index (long) (hashValue % Math.pow(2, 32));boolean existOK redisTemplate.opsForValue().getBit(checkItem, index);log.info(-----key:key\t对应坑位index:index\t是否存在:existOK);return existOK;}
}
service
Resourceprivate CheckUtils checkUtils;public Customer findCustomerByIdWithBloomFilter (Integer customerId){Customer customer null;//缓存key的名称String key CACHE_KEY_CUSTOMER customerId;//布隆过滤器check无是绝对无有是可能有//if(!checkUtils.checkWithBloomFilter(whitelistCustomer,key)){log.info(白名单无此顾客信息:{},key);return null;}////1 查询rediscustomer (Customer) redisTemplate.opsForValue().get(key);//redis无进一步查询mysqlif (customer null) {//2 从mysql查出来customercustomer customerMapper.selectByPrimaryKey(customerId);// mysql有redis无if (customer ! null) {//3 把mysql捞到的数据写入redis方便下次查询能redis命中。redisTemplate.opsForValue().set(key, customer);}}return customer;}controller ApiOperation(BloomFilter案例讲解)RequestMapping(value /customerbloomfilter/{id}, method RequestMethod.GET)public Customer findCustomerByIdWithBloomFilter(PathVariable int id) throws ExecutionException, InterruptedException{return customerSerivce.findCustomerByIdWithBloomFilter(id);}总结主要看service中的代码让过滤器去拦住那些没有的key数据来解决缓存击穿的问题相当于再加了一层拦截保护。
京东签到送京豆 签到日历仅展示当月签到数据 签到日历需展示最近连续签到天数 假设当前日期是20210618且20210616未签到 若20210617已签到且0618未签到则连续签到天数为1 若20210617已签到且0618已签到则连续签到天数为2 连续签到天数越多奖励越大 所有用户均可签到 截至2020年3月31日的12个月京东年度活跃用户数3.87亿同比增长24.8%环比增长超2500万此外2020年3月移动端日均活跃用户数同比增长46%假设10%左右的用户参与签到签到用户也高达3千万
小厂方法
CREATE TABLE user_sign
(keyid BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT,user_key VARCHAR(200),#京东用户IDsign_date DATETIME,#签到日期(20210618)sign_count INT #连续签到天数
)INSERT INTO user_sign(user_key,sign_date,sign_count)
VALUES (20210618-xxxx-xxxx-xxxx-xxxxxxxxxxxx,2020-06-18 15:11:12,1);SELECTsign_count
FROMuser_sign
WHEREuser_key 20210618-xxxx-xxxx-xxxx-xxxxxxxxxxxxAND sign_date BETWEEN 2020-06-17 00:00:00 AND 2020-06-18 23:59:59
ORDER BYsign_date DESCLIMIT 1;
方法正确但是难以落地实现o(╥﹏╥)o。
签到用户量较小时这么设计能行但京东这个体量的用户估算3000W签到用户一天一条数据一个月就是9亿数据
对于京东这样的体量如果一条签到记录对应着当日用记录那会很恐怖…
如何解决这个痛点 一条签到记录对应一条记录会占据越来越大的空间。 一个月最多31天刚好我们的int类型是32位那这样一个int类型就可以搞定一个月32位大于31天当天来了位是1没来就是0。 一条数据直接存储一个月的签到记录不再是存储一天的签到记录。
大厂方法 基于redis的bitmaps实现签到日历建表按位-redis bitmap 在签到统计时每个用户一天的签到用1个bit位就能表示
一个月假设是31天的签到情况用31个bit位就可以一年的签到也只需要用365个bit位根本不用太复杂的集合类型
缓存预热缓存雪崩缓存击穿缓存穿透 缓存预热 预热初始化程序可以参考布隆过滤器 PostConstruct
缓存雪崩 预防和解决
redis中key设置为永不过期或者过期时间错开redis缓存集群来实现高可用多缓存结合预防雪崩ehcache本地缓存redis缓存服务降级 人民币玩家上阿里云的redis缓存服务器
缓存击穿
是什么
击穿和穿透不是一样的
解决方案
方案一 差异失效时间对于访问频繁的热点key干脆就不设置过期时间 方案二 互斥更新采用双检加锁策略 案例 天猫聚划算功能实现防止缓存击穿 实体类
import io.swagger.annotations.ApiModel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;Data
AllArgsConstructor
NoArgsConstructor
ApiModel(value 聚划算活动producet信息)
public class Product
{//产品IDprivate Long id;//产品名称private String name;//产品价格private Integer price;//产品详情private String detail;
}采用定时器将参与聚划算活动的特价商品新增进入redis中
import cn.hutool.core.date.DateUtil;
import com.atguigu.redis7.entities.Product;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;Service
Slf4j
public class JHSTaskService
{public static final String JHS_KEYjhs;public static final String JHS_KEY_Ajhs:a;public static final String JHS_KEY_Bjhs:b;Autowiredprivate RedisTemplate redisTemplate;/*** 偷个懒不加mybatis了模拟从数据库读取100件特价商品用于加载到聚划算的页面中* return*/private ListProduct getProductsFromMysql() {ListProduct listnew ArrayList();for (int i 1; i 20; i) {Random rand new Random();int id rand.nextInt(10000);Product objnew Product((long) id,producti,i,detail);list.add(obj);}return list;}PostConstructpublic void initJHS(){log.info(启动定时器淘宝聚划算功能模拟.......... DateUtil.now());new Thread(() - {//模拟定时器一个后台任务定时把数据库的特价商品刷新到redis中while (true){//模拟从数据库读取100件特价商品用于加载到聚划算的页面中ListProduct listthis.getProductsFromMysql();//采用redis list数据结构的lpush来实现存储this.redisTemplate.delete(JHS_KEY);//lpush命令this.redisTemplate.opsForList().leftPushAll(JHS_KEY,list);//间隔一分钟 执行一遍模拟聚划算每3天刷新一批次参加活动try { TimeUnit.MINUTES.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }log.info(runJhs定时刷新..............);}},t1).start();}
}
控制类
import com.atguigu.redis7.entities.Product;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;import java.util.List;RestController
Slf4j
Api(tags 聚划算商品列表接口)
public class JHSProductController
{public static final String JHS_KEYjhs;public static final String JHS_KEY_Ajhs:a;public static final String JHS_KEY_Bjhs:b;Autowiredprivate RedisTemplate redisTemplate;/*** 分页查询在高并发的情况下只能走redis查询走db的话必定会把db打垮* param page* param size* return*/RequestMapping(value /pruduct/find,method RequestMethod.GET)ApiOperation(按照分页和每页显示容量点击查看)public ListProduct find(int page, int size) {ListProduct listnull;long start (page - 1) * size;long end start size - 1;try {//采用redis list数据结构的lrange命令实现分页查询list this.redisTemplate.opsForList().range(JHS_KEY, start, end);if (CollectionUtils.isEmpty(list)) {//TODO 走DB查询}log.info(查询结果{}, list);} catch (Exception ex) {//这里的异常一般是redis瘫痪 或 redis网络timeoutlog.error(exception:, ex);//TODO 走DB查询}return list;}
}bug和隐患说明 在删除热点key 的一秒内有高并发请求了聚划算的商品列表此时商品还未上新导致mysql被暴击 解决隐患
采用双检加锁策略 多个线程同时去查询数据库的这条数据那么我们可以在第一个查询数据的请求上使用一个 互斥锁来锁住它。
其他的线程走到这一步拿不到锁就等着等第一个线程查询到了数据然后做缓存。后面的线程进来发现已经有缓存了就直接走缓存。
差异失效时间 service
PostConstructpublic void initJHSAB(){log.info(启动AB定时器计划任务淘宝聚划算功能模拟..........DateUtil.now());new Thread(() - {//模拟定时器定时把数据库的特价商品刷新到redis中while (true){//模拟从数据库读取100件特价商品用于加载到聚划算的页面中ListProduct listthis.getProductsFromMysql();//先更新B缓存this.redisTemplate.delete(JHS_KEY_B);this.redisTemplate.opsForList().leftPushAll(JHS_KEY_B,list);this.redisTemplate.expire(JHS_KEY_B,20L,TimeUnit.DAYS);//再更新A缓存this.redisTemplate.delete(JHS_KEY_A);this.redisTemplate.opsForList().leftPushAll(JHS_KEY_A,list);this.redisTemplate.expire(JHS_KEY_A,15L,TimeUnit.DAYS);//间隔一分钟 执行一遍try { TimeUnit.MINUTES.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }log.info(runJhs定时刷新双缓存AB两层..............);}},t1).start();}控制类
RequestMapping(value /pruduct/findab,method RequestMethod.GET)ApiOperation(防止热点key突然失效AB双缓存架构)public ListProduct findAB(int page, int size) {ListProduct listnull;long start (page - 1) * size;long end start size - 1;try {//采用redis list数据结构的lrange命令实现分页查询list this.redisTemplate.opsForList().range(JHS_KEY_A, start, end);if (CollectionUtils.isEmpty(list)) {log.info(A缓存已经失效了记得人工修补B缓存自动延续5天);//用户先查询缓存A(上面的代码)如果缓存A查询不到例如更新缓存的时候删除了再查询缓存Bthis.redisTemplate.opsForList().range(JHS_KEY_B, start, end);//TODO 走DB查询}log.info(查询结果{}, list);} catch (Exception ex) {//这里的异常一般是redis瘫痪 或 redis网络timeoutlog.error(exception:, ex);//TODO 走DB查询}return list;}缓存穿透
是什么 解决方案
方案一 空对象缓存或者缺省值 如果发生了缓存穿透我们可以针对要查询的数据在Redis里存一个和业务部门商量后确定的缺省值(比如零、负数、defaultNull等)。
比如键uid:abcdxxx值defaultNull作为案例的key和value
先去redis查键uid:abcdxxx没有再去mysql查没有获得 这就发生了一次穿透现象。
but可以增强回写机制
mysql也查不到的话也让redis存入刚刚查不到的key并保护mysql。
第一次来查询uid:abcdxxxredis和mysql都没有返回null给调用者但是增强回写后第二次来查uid:abcdxxx此时redis就有值了。
可以直接从Redis中读取default缺省值返回给业务应用程序避免了把大量请求发送给mysql处理打爆mysql。
但是此方法架不住黑客的恶意攻击有缺陷…只能解决key相同的情况
方案二
google布隆过滤器Guava解决缓存穿透 Guava中布隆过滤器的实现算是比较权威的所以实际项目中我们可以直接使用guava布隆过滤器
依赖添加 !--guava Google 开源的 Guava 中自带的布隆过滤器--dependencygroupIdcom.google.guava/groupIdartifactIdguava/artifactIdversion23.0/version/dependency
入门 误判率
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.List;Service
Slf4j
public class GuavaBloomFilterService{public static final int _1W 10000;//布隆过滤器里预计要插入多少数据public static int size 100 * _1W;//误判率,它越小误判的个数也就越少(思考是不是可以设置的无限小没有误判岂不更好)//fpp the desired false positive probabilitypublic static double fpp 0.03;// 构建布隆过滤器private static BloomFilterInteger bloomFilter BloomFilter.create(Funnels.integerFunnel(), size,fpp);public void guavaBloomFilter(){//1 先往布隆过滤器里面插入100万的样本数据for (int i 1; i size; i) {bloomFilter.put(i);}//故意取10万个不在过滤器里的值看看有多少个会被认为在过滤器里ListInteger list new ArrayList(10 * _1W);for (int i size1; i size (10 *_1W); i) {if (bloomFilter.mightContain(i)) {log.info(被误判了:{},i);list.add(i);}}log.info(误判的总数量:{},list.size());}
}手写redis分布式锁
面试题 分布式锁
一个靠谱的分布式锁需要具备的条件和刚需 解决超卖防止缓存击穿
Lua脚本 可重入设计模式
工厂类
Component
public class DistributedLockFactory
{Autowiredprivate StringRedisTemplate stringRedisTemplate;private String lockName;private String uuidValue;public DistributedLockFactory(){this.uuidValue IdUtil.simpleUUID();//UUID}public Lock getDistributedLock(String lockType){if(lockType null) return null;if(lockType.equalsIgnoreCase(REDIS)){lockName zzyyRedisLock;return new RedisDistributedLock(stringRedisTemplate,lockName,uuidValue);} else if(lockType.equalsIgnoreCase(ZOOKEEPER)){//TODO zookeeper版本的分布式锁实现return new ZookeeperDistributedLock();} else if(lockType.equalsIgnoreCase(MYSQL)){//TODO mysql版本的分布式锁实现return null;}return null;}
}redis锁
public class RedisDistributedLock implements Lock
{private StringRedisTemplate stringRedisTemplate;private String lockName;private String uuidValue;private long expireTime;public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName,String uuidValue){this.stringRedisTemplate stringRedisTemplate;this.lockName lockName;this.uuidValue uuidValue:Thread.currentThread().getId();this.expireTime 30L;}Overridepublic void lock(){this.tryLock();}Overridepublic boolean tryLock(){try{return this.tryLock(-1L,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}return false;}Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException{if(time ! -1L){expireTime unit.toSeconds(time);}String script if redis.call(exists,KEYS[1]) 0 or redis.call(hexists,KEYS[1],ARGV[1]) 1 then redis.call(hincrby,KEYS[1],ARGV[1],1) redis.call(expire,KEYS[1],ARGV[2]) return 1 else return 0 end;System.out.println(lockName: lockName\tuuidValue: uuidValue);while (!stringRedisTemplate.execute(new DefaultRedisScript(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))){try { TimeUnit.MILLISECONDS.sleep(60); } catch (InterruptedException e) { e.printStackTrace(); }}return true;}Overridepublic void unlock(){String script if redis.call(HEXISTS,KEYS[1],ARGV[1]) 0 then return nil elseif redis.call(HINCRBY,KEYS[1],ARGV[1],-1) 0 then return redis.call(del,KEYS[1]) else return 0 end;System.out.println(lockName: lockName\tuuidValue: uuidValue);Long flag stringRedisTemplate.execute(new DefaultRedisScript(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));if(flag null){throw new RuntimeException(没有这个锁HEXISTS查询无);}}//Overridepublic void lockInterruptibly() throws InterruptedException{}Overridepublic Condition newCondition(){return null;}
}业务类 Service
Slf4j
public class InventoryService
{Autowiredprivate StringRedisTemplate stringRedisTemplate;Value(${server.port})private String port;Autowiredprivate DistributedLockFactory distributedLockFactory;public String sale(){String retMessage ;Lock redisLock distributedLockFactory.getDistributedLock(redis);redisLock.lock();try{//1 查询库存信息String result stringRedisTemplate.opsForValue().get(inventory001);//2 判断库存是否足够Integer inventoryNumber result null ? 0 : Integer.parseInt(result);//3 扣减库存if(inventoryNumber 0) {stringRedisTemplate.opsForValue().set(inventory001,String.valueOf(--inventoryNumber));retMessage 成功卖出一个商品库存剩余: inventoryNumber;System.out.println(retMessage);this.testReEnter();}else{retMessage 商品卖完了o(╥﹏╥)o;}}catch (Exception e){e.printStackTrace();}finally {redisLock.unlock();}return retMessage\t服务端口号port;}private void testReEnter(){Lock redisLock distributedLockFactory.getDistributedLock(redis);redisLock.lock();try{System.out.println(################测试可重入锁####################################);}finally {redisLock.unlock();}}
}自动续期
public class RedisDistributedLock implements Lock
{private StringRedisTemplate stringRedisTemplate;private String lockName;//KEYS[1]private String uuidValue;//ARGV[1]private long expireTime;//ARGV[2]public RedisDistributedLock(StringRedisTemplate stringRedisTemplate,String lockName,String uuidValue){this.stringRedisTemplate stringRedisTemplate;this.lockName lockName;this.uuidValue uuidValue:Thread.currentThread().getId();this.expireTime 30L;}Overridepublic void lock(){tryLock();}Overridepublic boolean tryLock(){try {tryLock(-1L,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}return false;}/*** 干活的实现加锁功能实现这一个干活的就OK全盘通用* param time* param unit* return* throws InterruptedException*/Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException{if(time ! -1L){this.expireTime unit.toSeconds(time);}String script if redis.call(exists,KEYS[1]) 0 or redis.call(hexists,KEYS[1],ARGV[1]) 1 then redis.call(hincrby,KEYS[1],ARGV[1],1) redis.call(expire,KEYS[1],ARGV[2]) return 1 else return 0 end;System.out.println(script: script);System.out.println(lockName: lockName);System.out.println(uuidValue: uuidValue);System.out.println(expireTime: expireTime);while (!stringRedisTemplate.execute(new DefaultRedisScript(script,Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) {TimeUnit.MILLISECONDS.sleep(50);}this.renewExpire();return true;}/***干活的实现解锁功能*/Overridepublic void unlock(){String script if redis.call(HEXISTS,KEYS[1],ARGV[1]) 0 then return nil elseif redis.call(HINCRBY,KEYS[1],ARGV[1],-1) 0 then return redis.call(del,KEYS[1]) else return 0 end;// nil false 1 true 0 falseSystem.out.println(lockName: lockName);System.out.println(uuidValue: uuidValue);System.out.println(expireTime: expireTime);Long flag stringRedisTemplate.execute(new DefaultRedisScript(script, Long.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime));if(flag null){throw new RuntimeException(This lock doesnt EXIST);}}private void renewExpire(){String script if redis.call(HEXISTS,KEYS[1],ARGV[1]) 1 then return redis.call(expire,KEYS[1],ARGV[2]) else return 0 end;new Timer().schedule(new TimerTask(){Overridepublic void run(){if (stringRedisTemplate.execute(new DefaultRedisScript(script, Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) {renewExpire();}}},(this.expireTime * 1000)/3);}//下面的redis分布式锁暂时用不到//下面的redis分布式锁暂时用不到//下面的redis分布式锁暂时用不到Overridepublic void lockInterruptibly() throws InterruptedException{}Overridepublic Condition newCondition(){return null;}
}业务层
Service
Slf4j
public class InventoryService
{Autowiredprivate StringRedisTemplate stringRedisTemplate;Value(${server.port})private String port;Autowiredprivate DistributedLockFactory distributedLockFactory;public String sale(){String retMessage ;Lock redisLock distributedLockFactory.getDistributedLock(redis);redisLock.lock();try{//1 查询库存信息String result stringRedisTemplate.opsForValue().get(inventory001);//2 判断库存是否足够Integer inventoryNumber result null ? 0 : Integer.parseInt(result);//3 扣减库存if(inventoryNumber 0) {stringRedisTemplate.opsForValue().set(inventory001,String.valueOf(--inventoryNumber));retMessage 成功卖出一个商品库存剩余: inventoryNumber;System.out.println(retMessage);//暂停几秒钟线程,为了测试自动续期try { TimeUnit.SECONDS.sleep(120); } catch (InterruptedException e) { e.printStackTrace(); }}else{retMessage 商品卖完了o(╥﹏╥)o;}}catch (Exception e){e.printStackTrace();}finally {redisLock.unlock();}return retMessage\t服务端口号port;}private void testReEnter(){Lock redisLock distributedLockFactory.getDistributedLock(redis);redisLock.lock();try{System.out.println(################测试可重入锁####################################);}finally {redisLock.unlock();}}
}
RedLock算法及原理
官网https://redis.io/docs/manual/patterns/distributed-locks/
设计理念 解决方案 redissonhttps://redisson.org/
redisson实战案例单机
pom
!--redisson--
dependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion3.13.4/version
/dependency
redisconfig
Configuration
public class RedisConfig
{Beanpublic RedisTemplateString, Object redisTemplate(LettuceConnectionFactory lettuceConnectionFactory){RedisTemplateString,Object redisTemplate new RedisTemplate();redisTemplate.setConnectionFactory(lettuceConnectionFactory);//设置key序列化方式stringredisTemplate.setKeySerializer(new StringRedisSerializer());//设置value的序列化方式jsonredisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());redisTemplate.afterPropertiesSet();return redisTemplate;}//单Redis节点模式Beanpublic Redisson redisson(){Config config new Config();config.useSingleServer().setAddress(redis://192.168.111.175:6379).setDatabase(0).setPassword(111111);return (Redisson) Redisson.create(config);}
}contoller ApiOperation(扣减库存saleByRedisson一次卖一个)GetMapping(value /inventory/saleByRedisson)public String saleByRedisson(){return inventoryService.saleByRedisson();}service
Service
Slf4j
public class InventoryService
{Autowiredprivate StringRedisTemplate stringRedisTemplate;Value(${server.port})private String port;Autowiredprivate DistributedLockFactory distributedLockFactory;Autowiredprivate Redisson redisson;public String saleByRedisson(){String retMessage ;String key RedisLock;RLock redissonLock redisson.getLock(key);redissonLock.lock();try{//1 查询库存信息String result stringRedisTemplate.opsForValue().get(inventory001);//2 判断库存是否足够Integer inventoryNumber result null ? 0 : Integer.parseInt(result);//3 扣减库存if(inventoryNumber 0) {stringRedisTemplate.opsForValue().set(inventory001,String.valueOf(--inventoryNumber));retMessage 成功卖出一个商品库存剩余: inventoryNumber;System.out.println(retMessage);}else{retMessage 商品卖完了o(╥﹏╥)o;}}finally {if(redissonLock.isLocked() redissonLock.isHeldByCurrentThread()){redissonLock.unlock();}}return retMessage\t服务端口号port;}
}redisson实战案例多机多重锁
启动3台docker容器的redis的master主机pom
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.3.10.RELEASE/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.atguigu.redis.redlock/groupIdartifactIdredis_redlock/artifactIdversion0.0.1-SNAPSHOT/versionpropertiesjava.version1.8/java.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion3.19.1/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.8/version/dependency!--swagger--dependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger2/artifactIdversion2.9.2/version/dependency!--swagger-ui--dependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger-ui/artifactIdversion2.9.2/version/dependencydependencygroupIdorg.apache.commons/groupIdartifactIdcommons-lang3/artifactIdversion3.4/versionscopecompile/scope/dependencydependencygroupIdcn.hutool/groupIdartifactIdhutool-all/artifactIdversion5.8.11/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-configuration-processor/artifactIdoptionaltrue/optional/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdconfigurationexcludesexcludegroupIdorg.springframework.boot/groupIdartifactIdspring-boot-configuration-processor/artifactId/exclude/excludes/configuration/plugin/plugins/build/project
配置文件
server.port9090
spring.application.nameredlockspring.swagger2.enabledtruespring.redis.database0
spring.redis.password
spring.redis.timeout3000
spring.redis.modesinglespring.redis.pool.conn-timeout3000
spring.redis.pool.so-timeout3000
spring.redis.pool.size10spring.redis.single.address1192.168.111.185:6381
spring.redis.single.address2192.168.111.185:6382
spring.redis.single.address3192.168.111.185:6383启动类
SpringBootApplication
public class RedisRedlockApplication
{public static void main(String[] args){SpringApplication.run(RedisRedlockApplication.class, args);}}业务类 cacheConfiguration
Configuration
EnableConfigurationProperties(RedisProperties.class)
public class CacheConfiguration {AutowiredRedisProperties redisProperties;BeanRedissonClient redissonClient1() {Config config new Config();String node redisProperties.getSingle().getAddress1();node node.startsWith(redis://) ? node : redis:// node;SingleServerConfig serverConfig config.useSingleServer().setAddress(node).setTimeout(redisProperties.getPool().getConnTimeout()).setConnectionPoolSize(redisProperties.getPool().getSize()).setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());if (StringUtils.isNotBlank(redisProperties.getPassword())) {serverConfig.setPassword(redisProperties.getPassword());}return Redisson.create(config);}BeanRedissonClient redissonClient2() {Config config new Config();String node redisProperties.getSingle().getAddress2();node node.startsWith(redis://) ? node : redis:// node;SingleServerConfig serverConfig config.useSingleServer().setAddress(node).setTimeout(redisProperties.getPool().getConnTimeout()).setConnectionPoolSize(redisProperties.getPool().getSize()).setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());if (StringUtils.isNotBlank(redisProperties.getPassword())) {serverConfig.setPassword(redisProperties.getPassword());}return Redisson.create(config);}BeanRedissonClient redissonClient3() {Config config new Config();String node redisProperties.getSingle().getAddress3();node node.startsWith(redis://) ? node : redis:// node;SingleServerConfig serverConfig config.useSingleServer().setAddress(node).setTimeout(redisProperties.getPool().getConnTimeout()).setConnectionPoolSize(redisProperties.getPool().getSize()).setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());if (StringUtils.isNotBlank(redisProperties.getPassword())) {serverConfig.setPassword(redisProperties.getPassword());}return Redisson.create(config);}/*** 单机* return*//*Beanpublic Redisson redisson(){Config config new Config();config.useSingleServer().setAddress(redis://192.168.111.147:6379).setDatabase(0);return (Redisson) Redisson.create(config);}*/}redispoolproperties
import lombok.Data;Data
public class RedisPoolProperties {private int maxIdle;private int minIdle;private int maxActive;private int maxWait;private int connTimeout;private int soTimeout;/*** 池大小*/private int size;}redisproperties
import lombok.Data;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;ConfigurationProperties(prefix spring.redis, ignoreUnknownFields false)
Data
public class RedisProperties {private int database;/*** 等待节点回复命令的时间。该时间从命令发送成功时开始计时*/private int timeout;private String password;private String mode;/*** 池配置*/private RedisPoolProperties pool;/*** 单机信息配置*/private RedisSingleProperties single;}redisRingleProperties
import lombok.Data;Data
public class RedisSingleProperties {private String address1;private String address2;private String address3;
}controller
RestController
Slf4j
public class RedLockController {public static final String CACHE_KEY_REDLOCK ATGUIGU_REDLOCK;AutowiredRedissonClient redissonClient1;AutowiredRedissonClient redissonClient2;AutowiredRedissonClient redissonClient3;boolean isLockBoolean;GetMapping(value /multiLock)//多重锁 public String getMultiLock() throws InterruptedException{String uuid IdUtil.simpleUUID();String uuidValue uuid:Thread.currentThread().getId();RLock lock1 redissonClient1.getLock(CACHE_KEY_REDLOCK);RLock lock2 redissonClient2.getLock(CACHE_KEY_REDLOCK);RLock lock3 redissonClient3.getLock(CACHE_KEY_REDLOCK);RedissonMultiLock redLock new RedissonMultiLock(lock1, lock2, lock3);redLock.lock();try{System.out.println(uuidValue\t---come in biz multiLock);try { TimeUnit.SECONDS.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); }System.out.println(uuidValue\t---task is over multiLock);} catch (Exception e) {e.printStackTrace();log.error(multiLock exception ,e);} finally {redLock.unlock();log.info(释放分布式锁成功key:{}, CACHE_KEY_REDLOCK);}return multiLock task is over uuidValue;}}reids的缓存过期淘汰策略
redis的缓存内容大小以及配置
reids的默认内容是多少在哪里配置以及如何修改
查看redis的最大占用内存
info memoryconfig get maxmemory打开redis配置文件设置maxmemory参数maxmemory是bytes字节类型注意转换。
在64位系统下设置为0表示的是不限制redis 的内存使用
在生产环境下一般推荐redis设置内存为最大物理内存的四分之三
超出内存使用上线会报OOM错误所以需要内存淘汰
修改redis内存设置 配置文件 or 命令方式
往redis里写的数据是怎么没得他是如何删除的
使用redis的八种淘汰策略 【MEMORY MANAGEMENT】 lru算法和lfu算法的区别
区别 LRU最近最少使用页面置换算法淘汰最长时间未被使用的页面看页面最后一次被使用到发生调度的时间长短首先淘汰最长时间未被使用的页面
LFU最近最不常用页面置换算法淘汰一定时期内被访问次数最少的页看一定时间段内页面被使用的频率淘汰一定时期内被访问次数最少的页
例子 某次时期Time为10分钟,如果每分钟进行一次调页,主存块为3,若所需页面走向为2 1 2 1 2 3 4
假设到页面4时会发生缺页中断
若按LRU算法,应换页面1(1页面最久未被使用因为第二个块中1在最前面)但按LFU算法应换页面3(十分钟内,页面3只使用了一次) 可见LRU关键是看页面最后一次被使用到发生调度的时间长短,而LFU关键是看一定时间段内页面被使用的频率!
八种淘汰策略
两个维度四个方面 维度所有key和有过期时间的key 四个方面lrulfu随机randomttl
配置建议避免存储bigkey 开启惰性淘汰lazyfree-lazy-evictionyes
微信抢红包功能实现
需求分析 各种节假日发红包抢红包100%高并发业务要求不能用mysql来做, 一个总的大红包会有可能拆分成多个小红包总金额 分金额1分金额2分金额3…分金额N 每个人只能抢一次你需要有记录比如100块钱被拆分成10个红包发出去 总计有10个红包抢一个少一个总数显示(10/6)直到完需要记录那些人抢到了红包重复抢作弊不可以。 有可能还需要你计时完整抢完从发出到全部over耗时多少 红包过期或者群主人品差没人抢红包原封不动退回。 红包过期剩余金额可能需要回退到发红包主账户下。
关键点
发红包 list抢红包 抢不加锁且原子性还需要能支持高并发 lpop 出list即可记红包 记录每个人抢了多少 hash 同一个用户不可以抢夺2次红包拆红包 算法所有人抢到金额之和等于红包金额不能超过也不能少于每个人至少抢到一分钱保证所有人抢到金额的几率相等
抢红包业务通用算法-二倍均值法
剩余红包金额为M剩余人数为N那么有如下公式
每次抢到的金额 随机区间 0 (剩余红包金额M ÷ 剩余人数N ) X 2
这个公式保证了每次随机金额的平均值是相等的不会因为抢红包的先后顺序而造成不公平。
举个栗子
假设有10个人红包总额100元。
第1次
100÷10 X2 20, 所以第一个人的随机范围是020 )平均可以抢到10元。假设第一个人随机到10元那么剩余金额是100-10 90 元。
第2次
90÷9 X2 20, 所以第二个人的随机范围同样是020 )平均可以抢到10元。假设第二个人随机到10元那么剩余金额是90-10 80 元。
第3次
80÷8 X2 20, 所以第三个人的随机范围同样是020 )平均可以抢到10元。 以此类推每一次随机范围的均值是相等的。
代码
不考虑小数
RestController
public class RedPackageController
{public static final String RED_PACKAGE_KEY redpackage:;public static final String RED_PACKAGE_CONSUME_KEY redpackage:consume:;Resourceprivate RedisTemplate redisTemplate;/*** 拆分发送红包* http://localhost:5555/send?totalMoney100redPackageNumber5* param totalMoney* param redPackageNumber* return*/RequestMapping(/send)public String sendRedPackage(int totalMoney,int redPackageNumber){//1 拆红包总金额拆分成多少个红包每个小红包里面包多少钱Integer[] splitRedPackages splitRedPackage(totalMoney, redPackageNumber);//2 红包的全局IDString key RED_PACKAGE_KEYIdUtil.simpleUUID();//3 采用list存储红包并设置过期时间redisTemplate.opsForList().leftPushAll(key,splitRedPackages);redisTemplate.expire(key,1,TimeUnit.DAYS);return key\t\t Ints.asList(Arrays.stream(splitRedPackages).mapToInt(Integer::valueOf).toArray());}/*** http://localhost:5555/rob?redPackageKey上一步的红包UUIDuserId1* param redPackageKey* param userId* return*/RequestMapping(/rob)public String rodRedPackage(String redPackageKey,String userId){//1 验证某个用户是否抢过红包Object redPackage redisTemplate.opsForHash().get(RED_PACKAGE_CONSUME_KEY redPackageKey, userId);//2 没有抢过就开抢否则返回-2表示抢过if (redPackage null) {// 2.1 从list里面出队一个红包抢到了一个Object partRedPackage redisTemplate.opsForList().leftPop(RED_PACKAGE_KEY redPackageKey);if (partRedPackage ! null) {//2.2 抢到手后记录进去hash表示谁抢到了多少钱的某一个红包redisTemplate.opsForHash().put(RED_PACKAGE_CONSUME_KEY redPackageKey,userId,partRedPackage);System.out.println(用户: userId\t 抢到多少钱红包: partRedPackage);//TODO 后续异步进mysql或者RabbitMQ进一步处理return String.valueOf(partRedPackage);}//抢完return errorCode:-1,红包抢完了;}//3 某个用户抢过了不可以作弊重新抢return errorCode:-2, message: \tuserId 用户你已经抢过红包了;}/*** 1 拆完红包总金额每个小红包金额别太离谱* param totalMoney* param redPackageNumber* return*/private Integer[] splitRedPackage(int totalMoney, int redPackageNumber){int useMoney 0;Integer[] redPackageNumbers new Integer[redPackageNumber];Random random new Random();for (int i 0; i redPackageNumber; i){if(i redPackageNumber - 1){redPackageNumbers[i] totalMoney - useMoney;}else{int avgMoney (totalMoney - useMoney) * 2 / (redPackageNumber - i);redPackageNumbers[i] 1 random.nextInt(avgMoney - 1);}useMoney useMoney redPackageNumbers[i];}return redPackageNumbers;}
}批量删除key