原创:微信公众号
码农参上
,欢迎分享,转载请保留出处。
哈喽大家好啊,我是Hydra。
在前面的文章中,我们介绍了Redis6.0中的新特性客户端缓存client-side caching
,通过telnet连接模拟客户端,测试了三种客户端缓存的工作模式,这篇文章我们就来点硬核实战,看看客户端缓存在java项目中应该如何落地。
铺垫
首先介绍一下今天要使用到的工具Lettuce
,它是一个可伸缩线程安全的redis客户端。多个线程可以共享同一个RedisConnection
,利用nio框架Netty
来高效地管理多个连接。
放眼望向现在常用的redis客户端开发工具包,虽然能用的不少,但是目前率先拥抱redis6.0,支持客户端缓存功能的却不多,而lettuce就是其中的领跑者。
我们先在项目中引入最新版本的依赖,下面正式开始实战环节:
<dependency> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> <version>6.1.8.RELEASE</version> </dependency>
实战
在项目中应用lettuce,开启并使用客户端缓存功能,只需要下面这一段代码:
public static void main(String[] args) throws InterruptedException { // 创建 RedisClient 连接信息 RedisURI redisURI= RedisURI.builder() .withHost("127.0.0.1") .withPort(6379) .build(); RedisClient client = RedisClient.create(redisURI); StatefulRedisConnection<String, String> connect = client.connect(); Map<String, String> map = new HashMap<>(); CacheFrontend<String,String> frontend=ClientSideCaching.enable(CacheAccessor.forMap(map), connect, TrackingArgs.Builder.enabled().noloop()); String key="user"; while (true){ String value = frontend.get(key); System.out.println(value); TimeUnit.SECONDS.sleep(10); } }
上面的代码主要完成了几项工作:
- 通过
RedisURI
配置redis连接的标准信息,并建立连接 - 创建用于充当本地缓存的
Map
,开启客户端缓存功能,建立一个缓存访问器CacheFrontend
- 在循环中使用
CacheFrontend
,不断查询同一个key对应的值并打印
启动上面的程序,控制台会不断的打印user
对应的缓存,在启动一段时间后,我们在其他的客户端修改user
对应的值,运行的结果如下:
可以看到,在其他客户端修改了key所对应的值后,打印结果也发生了变化。但是到这里,我们也不知道lettuce
是不是真的使用了客户端缓存,虽然结果正确,但是说不定是它每次都重新执行了get
命令呢?
所以我们下面来看看源码,分析一下具体的代码执行流程。
分析
在上面的代码中,最关键的类就是CacheFrontend
了,我们再来仔细看一下上面具体实例化时的语句:
CacheFrontend<String,String> frontend=ClientSideCaching.enable( CacheAccessor.forMap(map), connect, TrackingArgs.Builder.enabled().noloop() );
首先调用了ClientSideCaching
的enable()
方法,我们看一下它的源码:
解释一下传入的3个参数:
CacheAccessor
:一个定义对客户端缓存进行访问接口,上面调用它的forMap
方法返回的是一个MapCacheAccessor
,它的底层使用的我们自定义的Map
来存放本地缓存,并且提供了get
、put
、evict
等方法操作Map
StatefulRedisConnection
:使用到的redis连接TrackingArgs
:客户端缓存的参数配置,使用noloop
后不会接收当前连接修改key后的通知
向redis服务端发送开启tracking
的命令后,继续向下调用create()
方法:
这个过程中实例化了一个重要对象,它就是实现了RedisCache
接口的DefaultRedisCache
对象,实际向redis执行查询时的get
请求、写入的put
请求,都是由它来完成。
实例化完成后,继续向下调用同名的create()
方法:
在这个方法中,实例化了ClientSideCaching
对象,注意一下传入的两个参数,通过前面的介绍也很好理解它们的分工:
- 当本地缓存存在时,直接从
CacheAccessor
中读取 - 当本地缓存不存在时,使用
RedisCache
从服务端读取
需要额外注意一下的是返回前的两行代码,先看第一句(行号114的那行)。
这里向RedisCache
添加了一个监听,当监听到类型为invalidate
的作废消息时,拿到要作废的key,传递给消费者。一般情况下,keys
中只会有一个元素。
消费时会遍历当前ClientSideCaching
的消费者列表invalidationListeners
:
而这个列表中的所有,就是在上面的第二行代码中(行号115的那行)添加的,看一下方法的定义:
而实际传入的方法引用则是下面MapCacheAccessor
的evict()
方法,也就是说,当收到key作废的消息后,会移除掉本地缓存Map
中缓存的这个数据。
客户端缓存的作废逻辑我们梳理清楚了,再来看看它是何时写入的,直接看ClientSideCaching
的get()
方法:
可以看到,get
方法会先从本地缓存MapCacheAccessor
中尝试获取,如果取到则直接返回,如果没有再使用RedisCache
读取redis中的缓存,并将返回的结果存入到MapCacheAccessor
中。
图解
源码看到这里,是不是基本逻辑就串联起来了,我们再画两张图来梳理一下这个流程。先看get
的过程:
再来看一下通知客户端缓存失效的过程:
怎么样,配合这两张图再理解一下,是不是很完美?
其实也不是…回忆一下我们之前使用两级缓存Caffeine+Redis
时,当时使用的通知机制,会在修改redis缓存后通知所有主机修改本地缓存,修改成为最新的值。目前的lettuce看来,显然不满足这一功能,只能做到作废删除缓存但是不会主动更新。
扩展
那么,如果想实现本地客户端缓存的实时更新,我们应该如何在现在的基础上进行扩展呢?仔细想一下的话,思路也很简单:
- 首先,移除掉
lettuce
的客户端缓存本身自带的作废消息监听器 - 然后,添加我们自己的作废消息监听器
回顾一下上面源码分析的图,在调用DefaultRedisCache
的addInvalidationListener()
方法时,其实是调用的是StatefulRedisConnection
的addListener()
方法,也就是说,这个监听器其实是添加在redis连接上的。
如果我们再看一下这个方法源码的话,就会发现,在它的附近还有一个对应的removeListener()
方法,一看就是我们要找的东西,准备用它来移除消息监听。
不过再仔细看看,这个方法是要传参数的啊,我们明显不知道现在里面已经存在的PushListener
有什么,所以没法直接使用,那么无奈只能再接着往下看看这个pushHandler
是什么玩意…
通过注释可以知道,这个PushHandler
就是一个用来操作PushListener
的处理工具,虽然我们不知道具体要移除的PushListener
是哪一个,但是惊喜的是,它提供了一个getPushListeners()
方法,可以获取当前所有的监听器。
这样一来就简单了,我上来直接清除掉这个集合中的所有监听器,问题就迎刃而解了~
不过,在StatefulRedisConnectionImpl
中的pushHandler
是一个私有对象,也没有对外进行暴露,想要操作起来还是需要费上一点功夫的。下面,我们就在分析的结果上进行代码的修改。
魔改
首先,我们需要自定义一个工具类,它的主要功能是操作监听器,所以就命名为ListenerChanger
好了。它要完成的功能主要有三个:
- 移除原有的全部消息监听
- 添加新的自定义消息监听
- 更新本地缓存
MapCacheAccessor
中的数据
首先定义构造方法,需要传入StatefulRedisConnection
和CacheAccessor
作为参数,在后面的方法中会用到,并且创建一个RedisCommands
,用于后面向redis服务端发送get
命令请求。
public class ListenerChanger<K, V> { private StatefulRedisConnection<K, V> connection; private CacheAccessor<K, V> mapCacheAccessor; private RedisCommands<K, V> command; public ListenerChanger(StatefulRedisConnection<K, V> connection, CacheAccessor<K, V> mapCacheAccessor) { this.connection = connection; this.mapCacheAccessor = mapCacheAccessor; this.command = connection.sync(); } //其他方法先省略…… }
移除监听
前面说过,pushHandler
是一个私有对象,我们无法直接获取和操作,所以只能先使用反射获得。PushHandler
中的监听器列表存储在一个CopyOnWriteArrayList
中,我们直接使用迭代器移除掉所有内容即可。
public void removeAllListeners() { try { Class connectionClass = StatefulRedisConnectionImpl.class; Field pushHandlerField = connectionClass.getDeclaredField("pushHandler"); pushHandlerField.setAccessible(true); PushHandler pushHandler = (PushHandler) pushHandlerField.get(this.connection); CopyOnWriteArrayList<PushListener> pushListeners = (CopyOnWriteArrayList) pushHandler.getPushListeners(); Iterator<PushListener> it = pushListeners.iterator(); while (it.hasNext()) { PushListener listener = it.next(); pushListeners.remove(listener); } } catch (NoSuchFieldException | IllegalAccessException e) { e.printStackTrace(); } }
添加监听
这里我们模仿DefaultRedisCache
中addInvalidationListener()
方法的写法,添加一个监听器,除了最后处理的代码基本一致。对于监听到的要作废的keys
集合,另外启动一个线程更新本地数据。
public void addNewListener() { this.connection.addListener(new PushListener() { @Override public void onPushMessage(PushMessage message) { if (message.getType().equals("invalidate")) { List<Object> content = message.getContent(StringCodec.UTF8::decodeKey); List<K> keys = (List<K>) content.get(1); System.out.println("modifyKeys:"+keys); // start a new thread to update cacheAccessor new Thread(()-> updateMap(keys)).start(); } } }); }
本地更新
使用RedisCommands
重新从redis服务端获取最新的数据,并更新本地缓存mapCacheAccessor
中的数据。
private void updateMap(List<K> keys){ for (K key : keys) { V newValue = this.command.get(key); System.out.println("newValue:"+newValue); mapCacheAccessor.put(key, newValue); } }
至于为什么执行这个方法时额外启动了一个新线程,是因为我在测试中发现,当在PushListener
的onPushMessage
方法中执行RedisCommands
的get()
方法时,会一直取不到值,但是像这样新启动一个线程就没有问题。
测试
下面,我们来写一段测试代码,来测试上面的改动。
public static void main(String[] args) throws InterruptedException { // 省略之前创建连接代码…… Map<String, String> map = new HashMap<>(); CacheAccessor<String, String> mapCacheAccessor = CacheAccessor.forMap(map); CacheFrontend<String, String> frontend = ClientSideCaching.enable(mapCacheAccessor, connect, TrackingArgs.Builder.enabled().noloop()); ListenerChanger<String, String> listenerChanger = new ListenerChanger<>(connect, mapCacheAccessor); // 移除原有的listeners listenerChanger.removeAllListeners(); // 添加新的监听器 listenerChanger.addNewListener(); String key = "user"; while (true) { String value = frontend.get(key); System.out.println(value); TimeUnit.SECONDS.sleep(30); } }
可以看到,代码基本上在之前的基础上没有做什么改动,只是在创建完ClientSideCaching
后,执行了我们自己实现的ListenerChanger
的两个方法。先移除所有监听器、再添加新的监听器。下面我们以debug模式启动测试代码,简单看一下代码的执行逻辑。
首先,在未执行移除操作前,pushHandler
中的监听器列表中有一个监听器:
移除后,监听器列表为空:
在添加完自定义监听器、并且执行完第一次查询操作后,在另外一个redis客户端中修改user
的值,这时PushListener
会收到作废类型的消息监听:
启动一个新线程,查询redis中user
对应的最新值,并放入cacheAccessor
中:
当循环中CacheFrontend
的get()
方法再被执行时,会直接从cacheAccessor
中取到刷新后的值,不需要再次去访问redis服务端了:
总结
到这里,我们基于lettuce
的客户端缓存的基本使用、以及在这个基础上进行的魔改就基本完成了。可以看到,lettuce
客户端已经在底层封装了一套比较成熟的API,能让我们在将redis升级到6.0以后,开箱即用式地使用客户端缓存这一新特性。在使用中,不需要我们关注底层原理,也不用做什么业务逻辑的改造,总的来说,使用起来还是挺香的。
那么,这次的分享就到这里,我是Hydra,下篇文章再见。
推荐阅读
作者简介,
码农参上
,一个热爱分享的公众号,有趣、深入、直接,与你聊聊技术。欢迎添加好友,进一步交流。