国家住房城乡建设部网站,我是做废品回收,最近有个变宝网主动联系我说是再生资源网站的,可信吗?,网页设计素材背景图片,绍兴建设局网站首页文章目录前提Lettuce简介连接Redis定制的连接URI语法基本使用API同步API异步API反应式API发布和订阅事务和批量命令执行Lua脚本执行高可用和分片普通主从模式哨兵模式集群模式动态命令和自定义命令高阶特性配置客户端资源使用连接池几个常见的渐进式删除例子在SpringBoot中使用…
文章目录前提Lettuce简介连接Redis定制的连接URI语法基本使用API同步API异步API反应式API发布和订阅事务和批量命令执行Lua脚本执行高可用和分片普通主从模式哨兵模式集群模式动态命令和自定义命令高阶特性配置客户端资源使用连接池几个常见的渐进式删除例子在SpringBoot中使用Lettuce小结链接前提
Lettuce,读音[ˈletɪs]是一个Redis的Java驱动包初识她的时候是使用RedisTemplate的时候遇到点问题Debug到底层的一些源码发现spring-data-redis的驱动包在某个版本之后替换为Lettuce。Lettuce翻译为生菜没错就是吃的那种生菜所以它的Logo长这样 既然能被Spring生态所认可Lettuce想必有过人之处于是笔者花时间阅读她的官方文档整理测试示例写下这篇文章。编写本文时所使用的版本为Lettuce 5.1.8.RELEASESpringBoot 2.1.8.RELEASEJDK [8,11]。超长警告这篇文章断断续续花了两周完成超过4万字…
Lettuce简介
Lettuce是一个高性能基于Java编写的Redis驱动框架底层集成了Project Reactor提供天然的反应式编程通信框架集成了Netty使用了非阻塞IO5.x版本之后融合了JDK1.8的异步编程特性在保证高性能的同时提供了十分丰富易用的API5.1版本的新特性如下
支持Redis的新增命令ZPOPMIN, ZPOPMAX, BZPOPMIN, BZPOPMAX。支持通过Brave模块跟踪Redis命令执行。支持Redis Streams。支持异步的主从连接。支持异步连接池。新增命令最多执行一次模式禁止自动重连。全局命令超时设置对异步和反应式命令也有效。…等等
注意一点Redis的版本至少需要2.6当然越高越好API的兼容性比较强大。
只需要引入单个依赖就可以开始愉快地使用Lettuce
Maven
dependencygroupIdio.lettuce/groupIdartifactIdlettuce-core/artifactIdversion5.1.8.RELEASE/version
/dependencyGradle
dependencies {compile io.lettuce:lettuce-core:5.1.8.RELEASE
}连接Redis
单机、哨兵、集群模式下连接Redis需要一个统一的标准去表示连接的细节信息在Lettuce中这个统一的标准是RedisURI。可以通过三种方式构造一个RedisURI实例
定制的字符串URI语法
RedisURI uri RedisURI.create(redis://localhost/);使用建造器RedisURI.Builder
RedisURI uri RedisURI.builder().withHost(localhost).withPort(6379).build();直接通过构造函数实例化
RedisURI uri new RedisURI(localhost, 6379, 60, TimeUnit.SECONDS);定制的连接URI语法
单机前缀为redis://
格式redis://[password]host[:port][/databaseNumber][?[timeouttimeout[d|h|m|s|ms|us|ns]]
完整redis://mypassword127.0.0.1:6379/0?timeout10s
简单redis://localhost单机并且使用SSL前缀为rediss:// 注意后面多了个s
格式rediss://[password]host[:port][/databaseNumber][?[timeouttimeout[d|h|m|s|ms|us|ns]]
完整rediss://mypassword127.0.0.1:6379/0?timeout10s
简单rediss://localhost单机Unix Domain Sockets模式前缀为redis-socket://
格式redis-socket://path[?[timeouttimeout[d|h|m|s|ms|us|ns]][_databasedatabase_]]
完整redis-socket:///tmp/redis?timeout10s_database0哨兵前缀为redis-sentinel://
格式redis-sentinel://[password]host[:port][,host2[:port2]][/databaseNumber][?[timeouttimeout[d|h|m|s|ms|us|ns]]#sentinelMasterId
完整redis-sentinel://mypassword127.0.0.1:6379,127.0.0.1:6380/0?timeout10s#mymaster超时时间单位
d 天h 小时m 分钟s 秒钟ms 毫秒us 微秒ns 纳秒
个人建议使用RedisURI提供的建造器毕竟定制的URI虽然简洁但是比较容易出现人为错误。鉴于笔者没有SSL和Unix Domain Socket的使用场景下面不对这两种连接方式进行列举。
基本使用
Lettuce使用的时候依赖于四个主要组件
RedisURI连接信息。RedisClientRedis客户端特殊地集群连接有一个定制的RedisClusterClient。ConnectionRedis连接主要是StatefulConnection或者StatefulRedisConnection的子类连接的类型主要由连接的具体方式单机、哨兵、集群、订阅发布等等选定比较重要。RedisCommandsRedis命令API接口基本上覆盖了Redis发行版本的所有命令提供了同步sync、异步async、反应式reative的调用方式对于使用者而言会经常跟RedisCommands系列接口打交道。
一个基本使用例子如下
Test
public void testSetGet() throws Exception {RedisURI redisUri RedisURI.builder() // 1 创建单机连接的连接信息.withHost(localhost).withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient RedisClient.create(redisUri); // 2 创建客户端StatefulRedisConnectionString, String connection redisClient.connect(); // 3 创建线程安全的连接RedisCommandsString, String redisCommands connection.sync(); // 4 创建同步命令SetArgs setArgs SetArgs.Builder.nx().ex(5);String result redisCommands.set(name, throwable, setArgs);Assertions.assertThat(result).isEqualToIgnoringCase(OK);result redisCommands.get(name);Assertions.assertThat(result).isEqualTo(throwable);// ... 其他操作connection.close(); // 5 关闭连接redisClient.shutdown(); // 6 关闭客户端
}注意
5关闭连接一般在应用程序停止之前操作一个应用程序中的一个Redis驱动实例不需要太多的连接一般情况下只需要一个连接实例就可以如果有多个连接的需要可以考虑使用连接池其实Redis目前处理命令的模块是单线程在客户端多个连接多线程调用理论上没有效果。6关闭客户端一般应用程序停止之前操作如果条件允许的话基于后开先闭原则客户端关闭应该在连接关闭之后操作。
API
Lettuce主要提供三种API
同步syncRedisCommands。异步asyncRedisAsyncCommands。反应式reactiveRedisReactiveCommands。
先准备好一个单机Redis连接备用
private static StatefulRedisConnectionString, String CONNECTION;
private static RedisClient CLIENT;BeforeClass
public static void beforeClass() {RedisURI redisUri RedisURI.builder().withHost(localhost).withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();CLIENT RedisClient.create(redisUri);CONNECTION CLIENT.connect();
}AfterClass
public static void afterClass() throws Exception {CONNECTION.close();CLIENT.shutdown();
}Redis命令API的具体实现可以直接从StatefulRedisConnection实例获取见其接口定义
public interface StatefulRedisConnectionK, V extends StatefulConnectionK, V {boolean isMulti();RedisCommandsK, V sync();RedisAsyncCommandsK, V async();RedisReactiveCommandsK, V reactive();
} 值得注意的是在不指定编码解码器RedisCodec的前提下RedisClient创建的StatefulRedisConnection实例一般是泛型实例StatefulRedisConnectionString,String也就是所有命令API的KEY和VALUE都是String类型这种使用方式能满足大部分的使用场景。当然必要的时候可以定制编码解码器RedisCodecK,V。
同步API
先构建RedisCommands实例
private static RedisCommandsString, String COMMAND;BeforeClass
public static void beforeClass() {COMMAND CONNECTION.sync();
}基本使用
Test
public void testSyncPing() throws Exception {String pong COMMAND.ping();Assertions.assertThat(pong).isEqualToIgnoringCase(PONG);
}Test
public void testSyncSetAndGet() throws Exception {SetArgs setArgs SetArgs.Builder.nx().ex(5);COMMAND.set(name, throwable, setArgs);String value COMMAND.get(name);log.info(Get value: {}, value);
}// Get value: throwable同步API在所有命令调用之后会立即返回结果。如果熟悉Jedis的话RedisCommands的用法其实和它相差不大。
异步API
先构建RedisAsyncCommands实例
private static RedisAsyncCommandsString, String ASYNC_COMMAND;BeforeClass
public static void beforeClass() {ASYNC_COMMAND CONNECTION.async();
}基本使用
Test
public void testAsyncPing() throws Exception {RedisFutureString redisFuture ASYNC_COMMAND.ping();log.info(Ping result:{}, redisFuture.get());
}
// Ping result:PONGRedisAsyncCommands所有方法执行返回结果都是RedisFuture实例而RedisFuture接口的定义如下
public interface RedisFutureV extends CompletionStageV, FutureV {String getError();boolean await(long timeout, TimeUnit unit) throws InterruptedException;
} 也就是RedisFuture可以无缝使用Future或者JDK1.8中引入的CompletableFuture提供的方法。举个例子
Test
public void testAsyncSetAndGet1() throws Exception {SetArgs setArgs SetArgs.Builder.nx().ex(5);RedisFutureString future ASYNC_COMMAND.set(name, throwable, setArgs);// CompletableFuture#thenAccept()future.thenAccept(value - log.info(Set命令返回:{}, value));// Future#get()future.get();
}
// Set命令返回:OKTest
public void testAsyncSetAndGet2() throws Exception {SetArgs setArgs SetArgs.Builder.nx().ex(5);CompletableFutureVoid result (CompletableFutureVoid) ASYNC_COMMAND.set(name, throwable, setArgs).thenAcceptBoth(ASYNC_COMMAND.get(name),(s, g) - {log.info(Set命令返回:{}, s);log.info(Get命令返回:{}, g);});result.get();
}
// Set命令返回:OK
// Get命令返回:throwable如果能熟练使用CompletableFuture和函数式编程技巧可以组合多个RedisFuture完成一些列复杂的操作。
反应式API
Lettuce引入的反应式编程框架是Project Reactor如果没有反应式编程经验可以先自行了解一下Project Reactor。
构建RedisReactiveCommands实例
private static RedisReactiveCommandsString, String REACTIVE_COMMAND;BeforeClass
public static void beforeClass() {REACTIVE_COMMAND CONNECTION.reactive();
}根据Project ReactorRedisReactiveCommands的方法如果返回的结果只包含0或1个元素那么返回值类型是Mono如果返回的结果包含0到NN大于0个元素那么返回值是Flux。举个例子
Test
public void testReactivePing() throws Exception {MonoString ping REACTIVE_COMMAND.ping();ping.subscribe(v - log.info(Ping result:{}, v));Thread.sleep(1000);
}
// Ping result:PONGTest
public void testReactiveSetAndGet() throws Exception {SetArgs setArgs SetArgs.Builder.nx().ex(5);REACTIVE_COMMAND.set(name, throwable, setArgs).block();REACTIVE_COMMAND.get(name).subscribe(value - log.info(Get命令返回:{}, value));Thread.sleep(1000);
}
// Get命令返回:throwableTest
public void testReactiveSet() throws Exception {REACTIVE_COMMAND.sadd(food, bread, meat, fish).block();FluxString flux REACTIVE_COMMAND.smembers(food);flux.subscribe(log::info);REACTIVE_COMMAND.srem(food, bread, meat, fish).block();Thread.sleep(1000);
}
// meat
// bread
// fish举个更加复杂的例子包含了事务、函数转换等
Test
public void testReactiveFunctional() throws Exception {REACTIVE_COMMAND.multi().doOnSuccess(r - {REACTIVE_COMMAND.set(counter, 1).doOnNext(log::info).subscribe();REACTIVE_COMMAND.incr(counter).doOnNext(c - log.info(String.valueOf(c))).subscribe();}).flatMap(s - REACTIVE_COMMAND.exec()).doOnNext(transactionResult - log.info(Discarded:{}, transactionResult.wasDiscarded())).subscribe();Thread.sleep(1000);
}
// OK
// 2
// Discarded:false这个方法开启一个事务先把counter设置为1再将counter自增1。
发布和订阅
非集群模式下的发布订阅依赖于定制的连接StatefulRedisPubSubConnection集群模式下的发布订阅依赖于定制的连接StatefulRedisClusterPubSubConnection两者分别来源于RedisClient#connectPubSub()系列方法和RedisClusterClient#connectPubSub()
非集群模式
// 可能是单机、普通主从、哨兵等非集群模式的客户端
RedisClient client ...
StatefulRedisPubSubConnectionString, String connection client.connectPubSub();
connection.addListener(new RedisPubSubListenerString, String() { ... });// 同步命令
RedisPubSubCommandsString, String sync connection.sync();
sync.subscribe(channel);// 异步命令
RedisPubSubAsyncCommandsString, String async connection.async();
RedisFutureVoid future async.subscribe(channel);// 反应式命令
RedisPubSubReactiveCommandsString, String reactive connection.reactive();
reactive.subscribe(channel).subscribe();reactive.observeChannels().doOnNext(patternMessage - {...}).subscribe()集群模式 // 使用方式其实和非集群模式基本一致
RedisClusterClient clusterClient ...
StatefulRedisClusterPubSubConnectionString, String connection clusterClient.connectPubSub();
connection.addListener(new RedisPubSubListenerString, String() { ... });
RedisPubSubCommandsString, String sync connection.sync();
sync.subscribe(channel);
// ...这里用单机同步命令的模式举一个Redis键空间通知Redis Keyspace Notifications的例子
Test
public void testSyncKeyspaceNotification() throws Exception {RedisURI redisUri RedisURI.builder().withHost(localhost).withPort(6379)// 注意这里只能是0号库.withDatabase(0).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient RedisClient.create(redisUri);StatefulRedisConnectionString, String redisConnection redisClient.connect();RedisCommandsString, String redisCommands redisConnection.sync();// 只接收键过期的事件redisCommands.configSet(notify-keyspace-events, Ex);StatefulRedisPubSubConnectionString, String connection redisClient.connectPubSub();connection.addListener(new RedisPubSubAdapter() {Overridepublic void psubscribed(String pattern, long count) {log.info(pattern:{},count:{}, pattern, count);}Overridepublic void message(String pattern, String channel, String message) {log.info(pattern:{},channel:{},message:{}, pattern, channel, message);}});RedisPubSubCommandsString, String commands connection.sync();commands.psubscribe(__keyevent0__:expired);redisCommands.setex(name, 2, throwable);Thread.sleep(10000);redisConnection.close();connection.close();redisClient.shutdown();
}
// pattern:__keyevent0__:expired,count:1
// pattern:__keyevent0__:expired,channel:__keyevent0__:expired,message:name实际上在实现RedisPubSubListener的时候可以单独抽离尽量不要设计成匿名内部类的形式。
事务和批量命令执行
事务相关的命令就是WATCH、UNWATCH、EXEC、MULTI和DISCARD在RedisCommands系列接口中有对应的方法。举个例子
// 同步模式
Test
public void testSyncMulti() throws Exception {COMMAND.multi();COMMAND.setex(name-1, 2, throwable);COMMAND.setex(name-2, 2, doge);TransactionResult result COMMAND.exec();int index 0;for (Object r : result) {log.info(Result-{}:{}, index, r);index;}
}
// Result-0:OK
// Result-1:OKRedis的Pipeline也就是管道机制可以理解为把多个命令打包在一次请求发送到Redis服务端然后Redis服务端把所有的响应结果打包好一次性返回从而节省不必要的网络资源最主要是减少网络请求次数。Redis对于Pipeline机制如何实现并没有明确的规定也没有提供特殊的命令支持Pipeline机制。Jedis中底层采用BIO阻塞IO通讯所以它的做法是客户端缓存将要发送的命令最后需要触发然后同步发送一个巨大的命令列表包再接收和解析一个巨大的响应列表包。Pipeline在Lettuce中对使用者是透明的由于底层的通讯框架是Netty所以网络通讯层面的优化Lettuce不需要过多干预换言之可以这样理解Netty帮Lettuce从底层实现了Redis的Pipeline机制。但是Lettuce的异步API也提供了手动Flush的方法
Test
public void testAsyncManualFlush() {// 取消自动flushASYNC_COMMAND.setAutoFlushCommands(false);ListRedisFuture? redisFutures Lists.newArrayList();int count 5000;for (int i 0; i count; i) {String key key- (i 1);String value value- (i 1);redisFutures.add(ASYNC_COMMAND.set(key, value));redisFutures.add(ASYNC_COMMAND.expire(key, 2));}long start System.currentTimeMillis();ASYNC_COMMAND.flushCommands();boolean result LettuceFutures.awaitAll(10, TimeUnit.SECONDS, redisFutures.toArray(new RedisFuture[0]));Assertions.assertThat(result).isTrue();log.info(Lettuce cost:{} ms, System.currentTimeMillis() - start);
}
// Lettuce cost:1302 ms上面只是从文档看到的一些理论术语但是现实是骨感的对比了下Jedis的Pipeline提供的方法发现了Jedis的Pipeline执行耗时比较低
Test
public void testJedisPipeline() throws Exception {Jedis jedis new Jedis();Pipeline pipeline jedis.pipelined();int count 5000;for (int i 0; i count; i) {String key key- (i 1);String value value- (i 1);pipeline.set(key, value);pipeline.expire(key, 2);}long start System.currentTimeMillis();pipeline.syncAndReturnAll();log.info(Jedis cost:{} ms, System.currentTimeMillis() - start);
}
// Jedis cost:9 ms个人猜测Lettuce可能底层并非合并所有命令一次发送甚至可能是单条发送具体可能需要抓包才能定位。依此来看如果真的有大量执行Redis命令的场景不妨可以使用Jedis的Pipeline。
注意由上面的测试推断RedisTemplate的executePipelined()方法是假的Pipeline执行方法使用RedisTemplate的时候请务必注意这一点。
Lua脚本执行
Lettuce中执行Redis的Lua命令的同步接口如下
public interface RedisScriptingCommandsK, V {T T eval(String var1, ScriptOutputType var2, K... var3);T T eval(String var1, ScriptOutputType var2, K[] var3, V... var4);T T evalsha(String var1, ScriptOutputType var2, K... var3);T T evalsha(String var1, ScriptOutputType var2, K[] var3, V... var4);ListBoolean scriptExists(String... var1);String scriptFlush();String scriptKill();String scriptLoad(V var1);String digest(V var1);
}异步和反应式的接口方法定义差不多不同的地方就是返回值类型一般我们常用的是eval()、evalsha()和scriptLoad()方法。举个简单的例子
private static RedisCommandsString, String COMMANDS;
private static String RAW_LUA local key KEYS[1]\n local value ARGV[1]\n local timeout ARGV[2]\n redis.call(SETEX, key, tonumber(timeout), value)\n local result redis.call(GET, key)\n return result;;
private static AtomicReferenceString LUA_SHA new AtomicReference();Test
public void testLua() throws Exception {LUA_SHA.compareAndSet(null, COMMANDS.scriptLoad(RAW_LUA));String[] keys new String[]{name};String[] args new String[]{throwable, 5000};String result COMMANDS.evalsha(LUA_SHA.get(), ScriptOutputType.VALUE, keys, args);log.info(Get value:{}, result);
}
// Get value:throwable高可用和分片
为了Redis的高可用一般会采用普通主从Master/Replica这里笔者称为普通主从模式也就是仅仅做了主从复制故障需要手动切换、哨兵和集群。普通主从模式可以独立运行也可以配合哨兵运行只是哨兵提供自动故障转移和主节点提升功能。普通主从和哨兵都可以使用MasterSlave通过入参包括RedisClient、编码解码器以及一个或者多个RedisURI获取对应的Connection实例。
这里注意一点MasterSlave中提供的方法如果只要求传入一个RedisURI实例那么Lettuce会进行拓扑发现机制自动获取Redis主从节点信息如果要求传入一个RedisURI集合那么对于普通主从模式来说所有节点信息是静态的不会进行发现和更新。
拓扑发现的规则如下
对于普通主从Master/Replica模式不需要感知RedisURI指向从节点还是主节点只会进行一次性的拓扑查找所有节点信息此后节点信息会保存在静态缓存中不会更新。对于哨兵模式会订阅所有哨兵实例并侦听订阅/发布消息以触发拓扑刷新机制更新缓存的节点信息也就是哨兵天然就是动态发现节点信息不支持静态配置。
拓扑发现机制的提供API为TopologyProvider需要了解其原理的可以参考具体的实现。
对于集群Cluster模式Lettuce提供了一套独立的API。
另外如果Lettuce连接面向的是非单个Redis节点连接实例提供了数据读取节点偏好ReadFrom设置可选值有
MASTER只从Master节点中读取。MASTER_PREFERRED优先从Master节点中读取。SLAVE_PREFERRED优先从Slavor节点中读取。SLAVE只从Slavor节点中读取。NEAREST使用最近一次连接的Redis实例读取。
普通主从模式
假设现在有三个Redis服务形成树状主从关系如下
节点一localhost:6379角色为Master。节点二localhost:6380角色为Slavor节点一的从节点。节点三localhost:6381角色为Slavor节点二的从节点。
首次动态节点发现主从模式的节点信息需要如下构建连接
Test
public void testDynamicReplica() throws Exception {// 这里只需要配置一个节点的连接信息不一定需要是主节点的信息从节点也可以RedisURI uri RedisURI.builder().withHost(localhost).withPort(6379).build();RedisClient redisClient RedisClient.create(uri);StatefulRedisMasterSlaveConnectionString, String connection MasterSlave.connect(redisClient, new Utf8StringCodec(), uri);// 只从从节点读取数据connection.setReadFrom(ReadFrom.SLAVE);// 执行其他Redis命令connection.close();redisClient.shutdown();
}如果需要指定静态的Redis主从节点连接属性那么可以这样构建连接
Test
public void testStaticReplica() throws Exception {ListRedisURI uris new ArrayList();RedisURI uri1 RedisURI.builder().withHost(localhost).withPort(6379).build();RedisURI uri2 RedisURI.builder().withHost(localhost).withPort(6380).build();RedisURI uri3 RedisURI.builder().withHost(localhost).withPort(6381).build();uris.add(uri1);uris.add(uri2);uris.add(uri3);RedisClient redisClient RedisClient.create();StatefulRedisMasterSlaveConnectionString, String connection MasterSlave.connect(redisClient,new Utf8StringCodec(), uris);// 只从主节点读取数据connection.setReadFrom(ReadFrom.MASTER);// 执行其他Redis命令connection.close();redisClient.shutdown();
}哨兵模式
由于Lettuce自身提供了哨兵的拓扑发现机制所以只需要随便配置一个哨兵节点的RedisURI实例即可
Test
public void testDynamicSentinel() throws Exception {RedisURI redisUri RedisURI.builder().withPassword(你的密码).withSentinel(localhost, 26379).withSentinelMasterId(哨兵Master的ID).build();RedisClient redisClient RedisClient.create();StatefulRedisMasterSlaveConnectionString, String connection MasterSlave.connect(redisClient, new Utf8StringCodec(), redisUri);// 只允许从从节点读取数据connection.setReadFrom(ReadFrom.SLAVE);RedisCommandsString, String command connection.sync();SetArgs setArgs SetArgs.Builder.nx().ex(5);command.set(name, throwable, setArgs);String value command.get(name);log.info(Get value:{}, value);
}
// Get value:throwable集群模式
鉴于笔者对Redis集群模式并不熟悉Cluster模式下的API使用本身就有比较多的限制所以这里只简单介绍一下怎么用。先说几个特性
下面的API提供跨槽位Slot调用的功能
RedisAdvancedClusterCommands。RedisAdvancedClusterAsyncCommands。RedisAdvancedClusterReactiveCommands。
静态节点选择功能
masters选择所有主节点执行命令。slaves选择所有从节点执行命令其实就是只读模式。all nodes命令可以在所有节点执行。
集群拓扑视图动态更新功能
手动更新主动调用RedisClusterClient#reloadPartitions()。后台定时更新。自适应更新基于连接断开和MOVED/ASK命令重定向自动更新。
Redis集群搭建详细过程可以参考官方文档假设已经搭建好集群如下192.168.56.200是笔者的虚拟机Host
192.168.56.200:7001 主节点槽位0-5460。192.168.56.200:7002 主节点槽位5461-10922。192.168.56.200:7003 主节点槽位10923-16383。192.168.56.200:7004 7001的从节点。192.168.56.200:7005 7002的从节点。192.168.56.200:7006 7003的从节点。
简单的集群连接和使用方式如下
Test
public void testSyncCluster(){RedisURI uri RedisURI.builder().withHost(192.168.56.200).build();RedisClusterClient redisClusterClient RedisClusterClient.create(uri);StatefulRedisClusterConnectionString, String connection redisClusterClient.connect();RedisAdvancedClusterCommandsString, String commands connection.sync();commands.setex(name,10, throwable);String value commands.get(name);log.info(Get value:{}, value);
}
// Get value:throwable节点选择
Test
public void testSyncNodeSelection() {RedisURI uri RedisURI.builder().withHost(192.168.56.200).withPort(7001).build();RedisClusterClient redisClusterClient RedisClusterClient.create(uri);StatefulRedisClusterConnectionString, String connection redisClusterClient.connect();RedisAdvancedClusterCommandsString, String commands connection.sync();
// commands.all(); // 所有节点
// commands.masters(); // 主节点// 从节点只读NodeSelectionString, String replicas commands.slaves();NodeSelectionCommandsString, String nodeSelectionCommands replicas.commands();// 这里只是演示,一般应该禁用keys *命令ExecutionsListString keys nodeSelectionCommands.keys(*);keys.forEach(key - log.info(key: {}, key));connection.close();redisClusterClient.shutdown();
}定时更新集群拓扑视图每隔十分钟更新一次这个时间自行考量不能太频繁
Test
public void testPeriodicClusterTopology() throws Exception {RedisURI uri RedisURI.builder().withHost(192.168.56.200).withPort(7001).build();RedisClusterClient redisClusterClient RedisClusterClient.create(uri);ClusterTopologyRefreshOptions options ClusterTopologyRefreshOptions.builder().enablePeriodicRefresh(Duration.of(10, ChronoUnit.MINUTES)).build();redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());StatefulRedisClusterConnectionString, String connection redisClusterClient.connect();RedisAdvancedClusterCommandsString, String commands connection.sync();commands.setex(name, 10, throwable);String value commands.get(name);log.info(Get value:{}, value);Thread.sleep(Integer.MAX_VALUE);connection.close();redisClusterClient.shutdown();
}自适应更新集群拓扑视图
Test
public void testAdaptiveClusterTopology() throws Exception {RedisURI uri RedisURI.builder().withHost(192.168.56.200).withPort(7001).build();RedisClusterClient redisClusterClient RedisClusterClient.create(uri);ClusterTopologyRefreshOptions options ClusterTopologyRefreshOptions.builder().enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT,ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS).adaptiveRefreshTriggersTimeout(Duration.of(30, ChronoUnit.SECONDS)).build();redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());StatefulRedisClusterConnectionString, String connection redisClusterClient.connect();RedisAdvancedClusterCommandsString, String commands connection.sync();commands.setex(name, 10, throwable);String value commands.get(name);log.info(Get value:{}, value);Thread.sleep(Integer.MAX_VALUE);connection.close();redisClusterClient.shutdown();
}动态命令和自定义命令
自定义命令是Redis命令有限集不过可以更细粒度指定KEY、ARGV、命令类型、编码解码器和返回值类型依赖于dispatch()方法
// 自定义实现PING方法
Test
public void testCustomPing() throws Exception {RedisURI redisUri RedisURI.builder().withHost(localhost).withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient RedisClient.create(redisUri);StatefulRedisConnectionString, String connect redisClient.connect();RedisCommandsString, String sync connect.sync();RedisCodecString, String codec StringCodec.UTF8;String result sync.dispatch(CommandType.PING, new StatusOutput(codec));log.info(PING:{}, result);connect.close();redisClient.shutdown();
}
// PING:PONG// 自定义实现Set方法
Test
public void testCustomSet() throws Exception {RedisURI redisUri RedisURI.builder().withHost(localhost).withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient RedisClient.create(redisUri);StatefulRedisConnectionString, String connect redisClient.connect();RedisCommandsString, String sync connect.sync();RedisCodecString, String codec StringCodec.UTF8;sync.dispatch(CommandType.SETEX, new StatusOutput(codec),new CommandArgs(codec).addKey(name).add(5).addValue(throwable));String result sync.get(name);log.info(Get value:{}, result);connect.close();redisClient.shutdown();
}
// Get value:throwable动态命令是基于Redis命令有限集并且通过注解和动态代理完成一些复杂命令组合的实现。主要注解在io.lettuce.core.dynamic.annotation包路径下。简单举个例子
public interface CustomCommand extends Commands {// SET [key] [value]Command(SET ?0 ?1)String setKey(String key, String value);// SET [key] [value]Command(SET :key :value)String setKeyNamed(Param(key) String key, Param(value) String value);// MGET [key1] [key2]Command(MGET ?0 ?1)ListString mGet(String key1, String key2);/*** 方法名作为命令*/CommandNaming(strategy CommandNaming.Strategy.METHOD_NAME)String mSet(String key1, String value1, String key2, String value2);
}Test
public void testCustomDynamicSet() throws Exception {RedisURI redisUri RedisURI.builder().withHost(localhost).withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient RedisClient.create(redisUri);StatefulRedisConnectionString, String connect redisClient.connect();RedisCommandFactory commandFactory new RedisCommandFactory(connect);CustomCommand commands commandFactory.getCommands(CustomCommand.class);commands.setKey(name, throwable);commands.setKeyNamed(throwable, doge);log.info(MGET commands.mGet(name, throwable));commands.mSet(key1, value1,key2, value2);log.info(MGET commands.mGet(key1, key2));connect.close();redisClient.shutdown();
}
// MGET [throwable, doge]
// MGET [value1, value2]高阶特性
Lettuce有很多高阶使用特性这里只列举个人认为常用的两点
配置客户端资源。使用连接池。
更多其他特性可以自行参看官方文档。
配置客户端资源
客户端资源的设置与Lettuce的性能、并发和事件处理相关。线程池或者线程组相关配置占据客户端资源配置的大部分EventLoopGroups和EventExecutorGroup这些线程池或者线程组是连接程序的基础组件。一般情况下客户端资源应该在多个Redis客户端之间共享并且在不再使用的时候需要自行关闭。笔者认为客户端资源是面向Netty的。注意除非特别熟悉或者花长时间去测试调整下面提到的参数否则在没有经验的前提下凭直觉修改默认值有可能会踩坑。
客户端资源接口是ClientResources实现类是DefaultClientResources。
构建DefaultClientResources实例
// 默认
ClientResources resources DefaultClientResources.create();// 建造器
ClientResources resources DefaultClientResources.builder().ioThreadPoolSize(4).computationThreadPoolSize(4).build()使用
ClientResources resources DefaultClientResources.create();
// 非集群
RedisClient client RedisClient.create(resources, uri);
// 集群
RedisClusterClient clusterClient RedisClusterClient.create(resources, uris);
// ......
client.shutdown();
clusterClient.shutdown();
// 关闭资源
resources.shutdown();客户端资源基本配置
属性描述默认值ioThreadPoolSizeI/O线程数Runtime.getRuntime().availableProcessors()computationThreadPoolSize任务线程数Runtime.getRuntime().availableProcessors()
客户端资源高级配置
属性描述默认值eventLoopGroupProviderEventLoopGroup提供商-eventExecutorGroupProviderEventExecutorGroup提供商-eventBus事件总线DefaultEventBuscommandLatencyCollectorOptions命令延时收集器配置DefaultCommandLatencyCollectorOptionscommandLatencyCollector命令延时收集器DefaultCommandLatencyCollectorcommandLatencyPublisherOptions命令延时发布器配置DefaultEventPublisherOptionsdnsResolverDNS处理器JDK或者Netty提供reconnectDelay重连延时配置Delay.exponential()nettyCustomizerNetty自定义配置器-tracing轨迹记录器-
非集群客户端RedisClient的属性配置
Redis非集群客户端RedisClient本身提供了配置属性方法
RedisClient client RedisClient.create(uri);
client.setOptions(ClientOptions.builder().autoReconnect(false).pingBeforeActivateConnection(true).build());非集群客户端的配置属性列表
属性描述默认值pingBeforeActivateConnection连接激活之前是否执行PING命令falseautoReconnect是否自动重连truecancelCommandsOnReconnectFailure重连失败是否拒绝命令执行falsesuspendReconnectOnProtocolFailure底层协议失败是否挂起重连操作falserequestQueueSize请求队列容量2147483647(Integer#MAX_VALUE)disconnectedBehavior失去连接时候的行为DEFAULTsslOptionsSSL配置-socketOptionsSocket配置10 seconds Connection-Timeout, no keep-alive, no TCP noDelaytimeoutOptions超时配置-publishOnScheduler发布反应式信号数据的调度器使用I/O线程
集群客户端属性配置
Redis集群客户端RedisClusterClient本身提供了配置属性方法
RedisClusterClient client RedisClusterClient.create(uri);
ClusterTopologyRefreshOptions topologyRefreshOptions ClusterTopologyRefreshOptions.builder().enablePeriodicRefresh(refreshPeriod(10, TimeUnit.MINUTES)).enableAllAdaptiveRefreshTriggers().build();client.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(topologyRefreshOptions).build());集群客户端的配置属性列表
属性描述默认值enablePeriodicRefresh是否允许周期性更新集群拓扑视图falserefreshPeriod更新集群拓扑视图周期60秒enableAdaptiveRefreshTrigger设置自适应更新集群拓扑视图触发器RefreshTrigger-adaptiveRefreshTriggersTimeout自适应更新集群拓扑视图触发器超时设置30秒refreshTriggersReconnectAttempts 自适应更新集群拓扑视图触发重连次数5dynamicRefreshSources是否允许动态刷新拓扑资源truecloseStaleConnections是否允许关闭陈旧的连接truemaxRedirects集群重定向次数上限5validateClusterNodeMembership是否校验集群节点的成员关系true
使用连接池
引入连接池依赖commons-pool2
dependencygroupIdorg.apache.commons/groupIdartifactIdcommons-pool2/artifactIdversion2.7.0/version
/dependency基本使用如下
Test
public void testUseConnectionPool() throws Exception {RedisURI redisUri RedisURI.builder().withHost(localhost).withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient RedisClient.create(redisUri);GenericObjectPoolConfig poolConfig new GenericObjectPoolConfig();GenericObjectPoolStatefulRedisConnectionString, String pool ConnectionPoolSupport.createGenericObjectPool(redisClient::connect, poolConfig);try (StatefulRedisConnectionString, String connection pool.borrowObject()) {RedisCommandsString, String command connection.sync();SetArgs setArgs SetArgs.Builder.nx().ex(5);command.set(name, throwable, setArgs);String n command.get(name);log.info(Get value:{}, n);}pool.close();redisClient.shutdown();
}其中同步连接的池化支持需要用ConnectionPoolSupport异步连接的池化支持需要用AsyncConnectionPoolSupportLettuce5.1之后才支持。
几个常见的渐进式删除例子
渐进式删除Hash中的域-属性
Test
public void testDelBigHashKey() throws Exception {// SCAN参数ScanArgs scanArgs ScanArgs.Builder.limit(2);// TEMP游标ScanCursor cursor ScanCursor.INITIAL;// 目标KEYString key BIG_HASH_KEY;prepareHashTestData(key);log.info(开始渐进式删除Hash的元素...);int counter 0;do {MapScanCursorString, String result COMMAND.hscan(key, cursor, scanArgs);// 重置TEMP游标cursor ScanCursor.of(result.getCursor());cursor.setFinished(result.isFinished());CollectionString fields result.getMap().values();if (!fields.isEmpty()) {COMMAND.hdel(key, fields.toArray(new String[0]));}counter;} while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) ScanCursor.FINISHED.isFinished() cursor.isFinished()));log.info(渐进式删除Hash的元素完毕,迭代次数:{} ..., counter);
}private void prepareHashTestData(String key) throws Exception {COMMAND.hset(key, 1, 1);COMMAND.hset(key, 2, 2);COMMAND.hset(key, 3, 3);COMMAND.hset(key, 4, 4);COMMAND.hset(key, 5, 5);
}渐进式删除集合中的元素
Test
public void testDelBigSetKey() throws Exception {String key BIG_SET_KEY;prepareSetTestData(key);// SCAN参数ScanArgs scanArgs ScanArgs.Builder.limit(2);// TEMP游标ScanCursor cursor ScanCursor.INITIAL;log.info(开始渐进式删除Set的元素...);int counter 0;do {ValueScanCursorString result COMMAND.sscan(key, cursor, scanArgs);// 重置TEMP游标cursor ScanCursor.of(result.getCursor());cursor.setFinished(result.isFinished());ListString values result.getValues();if (!values.isEmpty()) {COMMAND.srem(key, values.toArray(new String[0]));}counter;} while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) ScanCursor.FINISHED.isFinished() cursor.isFinished()));log.info(渐进式删除Set的元素完毕,迭代次数:{} ..., counter);
}private void prepareSetTestData(String key) throws Exception {COMMAND.sadd(key, 1, 2, 3, 4, 5);
}渐进式删除有序集合中的元素
Test
public void testDelBigZSetKey() throws Exception {// SCAN参数ScanArgs scanArgs ScanArgs.Builder.limit(2);// TEMP游标ScanCursor cursor ScanCursor.INITIAL;// 目标KEYString key BIG_ZSET_KEY;prepareZSetTestData(key);log.info(开始渐进式删除ZSet的元素...);int counter 0;do {ScoredValueScanCursorString result COMMAND.zscan(key, cursor, scanArgs);// 重置TEMP游标cursor ScanCursor.of(result.getCursor());cursor.setFinished(result.isFinished());ListScoredValueString scoredValues result.getValues();if (!scoredValues.isEmpty()) {COMMAND.zrem(key, scoredValues.stream().map(ScoredValueString::getValue).toArray(String[]::new));}counter;} while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) ScanCursor.FINISHED.isFinished() cursor.isFinished()));log.info(渐进式删除ZSet的元素完毕,迭代次数:{} ..., counter);
}private void prepareZSetTestData(String key) throws Exception {COMMAND.zadd(key, 0, 1);COMMAND.zadd(key, 0, 2);COMMAND.zadd(key, 0, 3);COMMAND.zadd(key, 0, 4);COMMAND.zadd(key, 0, 5);
}在SpringBoot中使用Lettuce
个人认为spring-data-redis中的API封装并不是很优秀用起来比较重不够灵活这里结合前面的例子和代码在SpringBoot脚手架项目中配置和整合Lettuce。先引入依赖
dependencyManagementdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-dependencies/artifactIdversion2.1.8.RELEASE/versiontypepom/typescopeimport/scope/dependency/dependencies
/dependencyManagement
dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdio.lettuce/groupIdartifactIdlettuce-core/artifactIdversion5.1.8.RELEASE/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.10/versionscopeprovided/scope/dependency
/dependencies 一般情况下每个应用应该使用单个Redis客户端实例和单个连接实例这里设计一个脚手架适配单机、普通主从、哨兵和集群四种使用场景。对于客户端资源采用默认的实现即可。对于Redis的连接属性比较主要的有Host、Port和Password其他可以暂时忽略。基于约定大于配置的原则先定制一系列属性配置类其实有些配置是可以完全共用但是考虑到要清晰描述类之间的关系这里拆分多个配置属性类和多个配置方法
Data
ConfigurationProperties(prefix lettuce)
public class LettuceProperties {private LettuceSingleProperties single;private LettuceReplicaProperties replica;private LettuceSentinelProperties sentinel;private LettuceClusterProperties cluster;}Data
public class LettuceSingleProperties {private String host;private Integer port;private String password;
}EqualsAndHashCode(callSuper true)
Data
public class LettuceReplicaProperties extends LettuceSingleProperties {}EqualsAndHashCode(callSuper true)
Data
public class LettuceSentinelProperties extends LettuceSingleProperties {private String masterId;
}EqualsAndHashCode(callSuper true)
Data
public class LettuceClusterProperties extends LettuceSingleProperties {}配置类如下主要使用ConditionalOnProperty做隔离一般情况下很少有人会在一个应用使用一种以上的Redis连接场景
RequiredArgsConstructor
Configuration
ConditionalOnClass(name io.lettuce.core.RedisURI)
EnableConfigurationProperties(value LettuceProperties.class)
public class LettuceAutoConfiguration {private final LettuceProperties lettuceProperties;Bean(destroyMethod shutdown)public ClientResources clientResources() {return DefaultClientResources.create();}BeanConditionalOnProperty(name lettuce.single.host)public RedisURI singleRedisUri() {LettuceSingleProperties singleProperties lettuceProperties.getSingle();return RedisURI.builder().withHost(singleProperties.getHost()).withPort(singleProperties.getPort()).withPassword(singleProperties.getPassword()).build();}Bean(destroyMethod shutdown)ConditionalOnProperty(name lettuce.single.host)public RedisClient singleRedisClient(ClientResources clientResources, Qualifier(singleRedisUri) RedisURI redisUri) {return RedisClient.create(clientResources, redisUri);}Bean(destroyMethod close)ConditionalOnProperty(name lettuce.single.host)public StatefulRedisConnectionString, String singleRedisConnection(Qualifier(singleRedisClient) RedisClient singleRedisClient) {return singleRedisClient.connect();}BeanConditionalOnProperty(name lettuce.replica.host)public RedisURI replicaRedisUri() {LettuceReplicaProperties replicaProperties lettuceProperties.getReplica();return RedisURI.builder().withHost(replicaProperties.getHost()).withPort(replicaProperties.getPort()).withPassword(replicaProperties.getPassword()).build();}Bean(destroyMethod shutdown)ConditionalOnProperty(name lettuce.replica.host)public RedisClient replicaRedisClient(ClientResources clientResources, Qualifier(replicaRedisUri) RedisURI redisUri) {return RedisClient.create(clientResources, redisUri);}Bean(destroyMethod close)ConditionalOnProperty(name lettuce.replica.host)public StatefulRedisMasterSlaveConnectionString, String replicaRedisConnection(Qualifier(replicaRedisClient) RedisClient replicaRedisClient,Qualifier(replicaRedisUri) RedisURI redisUri) {return MasterSlave.connect(replicaRedisClient, new Utf8StringCodec(), redisUri);}BeanConditionalOnProperty(name lettuce.sentinel.host)public RedisURI sentinelRedisUri() {LettuceSentinelProperties sentinelProperties lettuceProperties.getSentinel();return RedisURI.builder().withPassword(sentinelProperties.getPassword()).withSentinel(sentinelProperties.getHost(), sentinelProperties.getPort()).withSentinelMasterId(sentinelProperties.getMasterId()).build();}Bean(destroyMethod shutdown)ConditionalOnProperty(name lettuce.sentinel.host)public RedisClient sentinelRedisClient(ClientResources clientResources, Qualifier(sentinelRedisUri) RedisURI redisUri) {return RedisClient.create(clientResources, redisUri);}Bean(destroyMethod close)ConditionalOnProperty(name lettuce.sentinel.host)public StatefulRedisMasterSlaveConnectionString, String sentinelRedisConnection(Qualifier(sentinelRedisClient) RedisClient sentinelRedisClient,Qualifier(sentinelRedisUri) RedisURI redisUri) {return MasterSlave.connect(sentinelRedisClient, new Utf8StringCodec(), redisUri);}BeanConditionalOnProperty(name lettuce.cluster.host)public RedisURI clusterRedisUri() {LettuceClusterProperties clusterProperties lettuceProperties.getCluster();return RedisURI.builder().withHost(clusterProperties.getHost()).withPort(clusterProperties.getPort()).withPassword(clusterProperties.getPassword()).build();}Bean(destroyMethod shutdown)ConditionalOnProperty(name lettuce.cluster.host)public RedisClusterClient redisClusterClient(ClientResources clientResources, Qualifier(clusterRedisUri) RedisURI redisUri) {return RedisClusterClient.create(clientResources, redisUri);}Bean(destroyMethod close)ConditionalOnProperty(name lettuce.cluster)public StatefulRedisClusterConnectionString, String clusterConnection(RedisClusterClient clusterClient) {return clusterClient.connect();}
}最后为了让IDE识别我们的配置可以添加IDE亲缘性/META-INF文件夹下新增一个文件spring-configuration-metadata.json内容如下
{properties: [{name: lettuce.single,type: club.throwable.spring.lettuce.LettuceSingleProperties,description: 单机配置,sourceType: club.throwable.spring.lettuce.LettuceProperties},{name: lettuce.replica,type: club.throwable.spring.lettuce.LettuceReplicaProperties,description: 主从配置,sourceType: club.throwable.spring.lettuce.LettuceProperties},{name: lettuce.sentinel,type: club.throwable.spring.lettuce.LettuceSentinelProperties,description: 哨兵配置,sourceType: club.throwable.spring.lettuce.LettuceProperties},{name: lettuce.single,type: club.throwable.spring.lettuce.LettuceClusterProperties,description: 集群配置,sourceType: club.throwable.spring.lettuce.LettuceProperties}]
}如果想IDE亲缘性做得更好可以添加/META-INF/additional-spring-configuration-metadata.json进行更多细节定义。简单使用如下
Slf4j
Component
public class RedisCommandLineRunner implements CommandLineRunner {AutowiredQualifier(singleRedisConnection)private StatefulRedisConnectionString, String connection;Overridepublic void run(String... args) throws Exception {RedisCommandsString, String redisCommands connection.sync();redisCommands.setex(name, 5, throwable);log.info(Get value:{}, redisCommands.get(name));}
}
// Get value:throwable小结
本文算是基于Lettuce的官方文档对它的使用进行全方位的分析包括主要功能、配置都做了一些示例限于篇幅部分特性和配置细节没有分析。Lettuce已经被spring-data-redis接纳作为官方的Redis客户端驱动所以值得信赖它的一些API设计确实比较合理扩展性高的同时灵活性也高。个人建议基于Lettuce包自行添加配置到SpringBoot应用用起来会得心应手毕竟RedisTemplate实在太笨重而且还屏蔽了Lettuce一些高级特性和灵活的API。
参考资料
Lettuce Reference Guide
链接
Github Pagehttp://www.throwable.club/2019/09/28/redis-client-driver-lettuce-usage Coding Pagehttp://throwable.coding.me/2019/09/28/redis-client-driver-lettuce-usage