<返回更多

springboot 缓存一致性常用解决方案

2022-09-04  今日头条  小心程序猿QAQ
加入收藏

多级缓存在微服务的架构设计中可谓随处可见,多级缓存作为提升系统高并发的常规手段,在各类大中小型的系统设计中都有体现;

下图是一张简单的服务端多级缓存设计示意图,多级缓存的常用解决方案,像ehcache + redis,或caffeine + springcache等,即利用JVM内存缓存 + redis缓存配合;


 

一、缓存一致性问题

多级缓存带来的好处是显著的,一定程度上可以应对较高的并发,但随之带来了一个比较大的问题就是缓存一致性问题;

我们知道,JVM缓存属于进程级的缓存,和当前服务实例是绑定的,而redis缓存可以作为分布式缓存,通常JVM缓存的是那些生命周期较短的热点查询数据,即过期时间不会太久,而redis缓存相对来说,过期时间相对长一点,JVM缓存通常作为服务端扛压的第一道屏障,如果设置的过期时间太长,将会对JVM内存的开销非常大,所以一般作为短频使用;

设想这么一个场景,服务A采用多实例部署,这里假设部署了两个节点,首次根据ID查询一个用户信息的对象数据将会同时被JVM缓存,同时也会被redis缓存,下一次过来同样参数的请求时,首先走JVM缓存,查到了直接返回,否则走redis缓存;

上面是一个正常的关于缓存存取的过程,问题是,JVM缓存是同进程绑定的,如果第一个节点的数据发生了变更,比如删除了,对于redis缓存来说,可以做到动态刷缓存的效果,但是redis缓存和本地缓存之间并没有一种强同步的机制确保两者的缓存保持一致;

甚至来说,第一个节点与第二个节点之间,两者是无状态的,当第一个节点上面的数据被删除时,假如此刻并发的查询请求到达第二个节点,JVM缓存查询到必然是上一次缓存的数据;

于是,我们的问题就是,在多级缓存模式下,如何解决缓存一致性的问题呢?

二、一个简单的案例

基于之前的一篇springcache 详细使用和spring boot 二级缓存案例基础上我们进行案例演示和改造;

在案例中,我们提供了几个核心的接口:

 

1、缓存核心配置类import com.fasterxml.jackson.annotation.JsonAutoDetect;import com.fasterxml.jackson.annotation.JsonInclude;import com.fasterxml.jackson.annotation.JsonTypeInfo;import com.fasterxml.jackson.annotation.PropertyAccessor;import com.fasterxml.jackson.databind.MApperFeature;import com.fasterxml.jackson.databind.ObjectMapper;import com.fasterxml.jackson.databind.SerializationFeature;import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;import com.fasterxml.jackson.datatype.jsr310.JAVATimeModule;import com.github.benmanes.caffeine.cache.Caffeine;import org.springframework.cache.CacheManager;import org.springframework.cache.annotation.CachingConfigurerSupport;import org.springframework.cache.caffeine.CaffeineCacheManager;import org.springframework.cache.interceptor.KeyGenerator;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;import org.springframework.data.redis.cache.RedisCacheConfiguration;import org.springframework.data.redis.cache.RedisCacheManager;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;import org.springframework.data.redis.serializer.RedisSerializationContext;import org.springframework.data.redis.serializer.StringRedisSerializer;import org.springframework.util.StringUtils;import java.lang.reflect.Method;import java.time.Duration;import java.util.concurrent.TimeUnit;@Configurationpublic class RedisConfig extends CachingConfigurerSupport {@Beanpublic RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) {RedisTemplate template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory);//使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper mapper = new ObjectMapper();mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(mapper);template.setValueSerializer(jackson2JsonRedisSerializer);//使用StringRedisSerializer来序列化和反序列化redis的key值template.setKeySerializer(new StringRedisSerializer());template.afterPropertiesSet();return template;* 分钟级别* @param connectionFactory* @return@Bean("cacheManagerMinutes")public RedisCacheManager cacheManagerMinutes(RedisConnectionFactory connectionFactory){RedisCacheConfiguration configuration = instanceConfig(3 * 60L);return RedisCacheManager.builder(connectionFactory).cacheDefaults(configuration).transactionAware().build();* 小时级别* @param connectionFactory* @return@Bean("cacheManagerHour")@Primarypublic RedisCacheManager cacheManagerHour(RedisConnectionFactory connectionFactory){RedisCacheConfiguration configuration = instanceConfig(3600L);return RedisCacheManager.builder(connectionFactory).cacheDefaults(configuration).transactionAware().build();* 天级别* @param connectionFactory* @return@Bean("cacheManagerDay")public RedisCacheManager cacheManagerDay(RedisConnectionFactory connectionFactory){RedisCacheConfiguration configuration = instanceConfig(3600 * 24L);;return RedisCacheManager.builder(connectionFactory).cacheDefaults(configuration).transactionAware().build();* 正常时间的本地缓存@Bean("caffeineCacheManager")public CacheManager caffeineCacheManager() {CaffeineCacheManager cacheManager = new CaffeineCacheManager();cacheManager.setCaffeine(Caffeine.newBuilder().expireAfterWrite(50, TimeUnit.SECONDS).initialCapacity(256).maximumSize(10000));return cacheManager;private RedisCacheConfiguration instanceConfig(long ttl){Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);objectMapper.registerModule(new JavaTimeModule());objectMapper.configure(MapperFeature.USE_ANNOTATIONS,false);//只针对非空的值进行序列化objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);//将类型序列化到属性的json字符串objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,ObjectMapper.DefaultTyping.NON_FINAL,JsonTypeInfo.As.PROPERTY);jackson2JsonRedisSerializer.setObjectMapper(objectMapper);return RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofSeconds(ttl)).disableCachingNullValues().serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer));* 自定义key生成策略* @return@Bean("defaultSpringKeyGenerator")public KeyGenerator defaultSpringKeyGenerator(){return new KeyGenerator() {@Overridepublic Object generate(Object o, Method method, Object... objects) {String key = o.getClass().getSimpleName() + "_"+ method.getName() +"_"+ StringUtils.arrayToDelimitedString(objects,"_");System.out.println("key :" + key);return key;}2、配置文件开启使用 springcachespring:redis:host: localhostport: 6379cache:type: redis3、几个核心接口1)根据用户ID获取用户@GetMapping("/getById")public DbUser getById(String id){return dbUserService.getById(id);@Override@Cacheable(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public DbUser getById(String id) {System.out.println("首次查询走数据库");DbUser dbUser = dbUserMapper.getByUserId(id);return dbUser;2)根据用户ID查询用户,并缓存到JVM;@GetMapping("/getByIdFromCaffeine")public DbUser getByIdFromCaffeine(String id){return dbUserService.getByIdFromCaffeine(id);@Override@Cacheable(value = {"dbUser"},key = "#root.args[0]",cacheManager = "caffeineCacheManager")public DbUser getByIdFromCaffeine(String id) {System.out.println("查询数据库");DbUser dbUser = dbUserMapper.getByUserId(id);System.out.println("第一次走缓存");return dbUser;3)根据用户ID删除用户;@GetMapping("/deleteById")public String deleteById(String id){return dbUserService.deleteById(id);@Override@CacheEvict(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public String deleteById(String id) {dbUserMapper.deleteByUserId(id);return "delete success";4、功能测试

 

首先在数据库的 db_user 表准备一条测试数据

分别调用查询用户接口

1、 http://localhost:8083/getById?id=1 ;

多次刷新接口,sql只输出了一次


 

2、 http://localhost:8083/getByIdFromCaffeine?id=1


 

多次刷新接口,sql只输出了一次


 

从上面的结果可以看到,我们模拟了查询数据分别缓存到了JVM内存和redis的效果,接下来,删除当前这条数据,执行下面的接口

3、 http://localhost:8083/deleteById?id=1


 


 

再次调用第一个查询用户的接口,无返回数据,表明redis中缓存的结果被清理了,这是我们使用了springcache后,通过 CacheEvict 这个注解,会自动帮我们管理redis中的缓存;

但这时,再次调用查询JVM缓存的接口,发现仍然可以从本地缓存中得到数据

4、 http://localhost:8083/getByIdFromCaffeine?id=1


 

基于上面的测试结果,可以看到,缓存一致性的问题就产生了,这里我故意将本地缓存的时间调整的长了一点,实际开发过程中,建议本地缓存的时间一般不要超过1分钟;

三、解决方案一:清理redis缓存,同步清理本地缓存1、增加一个本地缓存操作的工具类import com.congge.config.SpringContextHolder;import org.springframework.cache.Cache;import org.springframework.cache.CacheManager;import java.util.Objects;public class CaffeineCacheUtils {private static CacheManager cm;static {cm = SpringContextHolder.getBean("caffeineCacheManager");* 添加缓存* @param cacheName 缓存名称* @param key 缓存key* @param value 缓存值public static void put(String cacheName, String key, Object value) {Cache cache = cm.getCache(cacheName);cache.put(key, value);* 获取缓存* @param cacheName 缓存名称* @param key 缓存key* @returnpublic static Object get(String cacheName, String key) {Cache cache = cm.getCache(cacheName);if (cache == null) {return null;return Objects.requireNonNull(cache.get(key)).get();* 获取缓存(字符串)* @param cacheName 缓存名称* @param key 缓存key* @returnpublic static String getString(String cacheName, String key) {Cache cache = cm.getCache(cacheName);if (cache == null) {return null;Cache.ValueWrapper wrapper = cache.get(key);if (wrapper == null) {return null;return Objects.requireNonNull(wrapper.get()).toString();* 获取缓存(泛型)* @param cacheName 缓存名称* @param key 缓存key* @param clazz 缓存类* @param 返回值泛型* @returnpublic static T get(String cacheName, String key, Class clazz) {Cache cache = cm.getCache(cacheName);if (cache == null) {return null;Cache.ValueWrapper wrapper = cache.get(key);if (wrapper == null) {return null;return (T) wrapper.get();* 清理缓存* @param cacheName 缓存名称* @param key 缓存keypublic static void evict(String cacheName, String key) {Cache cache = cm.getCache(cacheName);System.out.println(cache.getName());if (cache != null) {cache.evict(key);2、删除用户接口中同步清理本地缓存

只需改造下删除接口的服务实现方法即可

private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils();* 删除,同时需要删除相关的key* @param id* @return@Override@CacheEvict(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public String deleteById(String id) {dbUserMapper.deleteByUserId(id);caffeineCacheUtils.evict("dbUser",id);return "delete success";3、方案优缺点优点

缺点 四、解决方案二:使用zookeeper 实现缓存同步

 

对zookeeper有所了解和使用的同学,应该对zk的节点管理不陌生,zk作为一款分布式协调中间件,在很多分布式场景都有着广泛的使用,比如实现集群选举,分布式锁,节点管理等等,利用zk的节点属性,可以很好的解决这个问题;

使用zk的解决思路

1、zk客户端依赖com.101teczkclient0.10org.slf4jslf4j-log4j122、提供一个zk节点操作工具类import org.I0Itec.zkclient.IZkDataListener;import org.I0Itec.zkclient.ZkClient;import org.I0Itec.zkclient.serialize.SerializableSerializer;import org.Apache.zookeeper.CreateMode;public class ZkUtils {private ZkClient zkClient = null;private String node;/*public static void main(String[] args) {ZkUtils zkUtils = new ZkUtils();zkUtils.createNode(node);zkUtils.nodeExist(node);zkUtils.deleteNode(node);public ZkUtils(String node) {zkClient = new ZkClient("localhost:2181", 60000 * 30, 60000, new SerializableSerializer());//监听节点变化//需要通过java修改zookeeper数据,才能监听到zkClient.subscribeDataChanges("/" + node, new IZkDataListener() {//节点数据变化时触发@Overridepublic void handleDataChange(String s, Object o) throws Exception {System.out.println("change Node: " + s);System.out.println("change data: " + o);//节点数据删除时触发@Overridepublic void handleDataDeleted(String s) throws Exception {System.out.println("delete Node: " + s);* 创建zk节点* @param nodepublic void createNode(String node) {//创建持久节点String node1 = zkClient.create("/" + node, node, CreateMode.PERSISTENT);System.out.println(node1);* 修改zk节点数据* @param node* @param datapublic void writeNodeData(String node, String data) {zkClient.writeData("/" + node, 233);* 查询zk节点* @param nodepublic boolean nodeExist(String node) {boolean exists = zkClient.exists("/" + node);return exists;* 查询节点数据* @param node* @returnpublic String findNodeData(String node) {Object data = zkClient.readData("/" + node);System.out.println(data);return data.toString();* 删除节点* @param nodepublic void deleteNode(String node) {boolean b2 = zkClient.deleteRecursive("/" + node);System.out.println(b2);3、查询用户接口,注册缓存的key对应的z-node节点@Override@Cacheable(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public DbUser getById(String id) {System.out.println("首次查询走数据库");DbUser dbUser = dbUserMapper.getByUserId(id);//FIXME 将缓存注册到节点registerCacheNode(id);return dbUser;public void registerCacheNode(String id){String node = "user:" + id;ZkUtils zkUtils = new ZkUtils(node);zkUtils.createNode(node);4、删除用户接口添加删除zk节点逻辑@Override@CacheEvict(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public String deleteById(String id) {dbUserMapper.deleteByUserId(id);//删除 z-node 节点String node = "user:" + id;ZkUtils zkUtils = new ZkUtils(node);zkUtils.deleteNode(node);return "delete success";5、改造zk监听逻辑,同步移除本地缓存private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils();public ZkUtils(String node) {zkClient = new ZkClient("localhost:2181", 60000 * 30, 60000, new SerializableSerializer());//监听节点变化//需要通过java修改zookeeper数据,才能监听到zkClient.subscribeDataChanges("/" + node, new IZkDataListener() {//节点数据变化时触发@Overridepublic void handleDataChange(String s, Object o) throws Exception {System.out.println("change Node: " + s);System.out.println("change data: " + o);//节点数据删除时触发@Overridepublic void handleDataDeleted(String s) throws Exception {System.out.println("delete Node: " + s);caffeineCacheUtils.evict("dbUser","1");6、测试1、启动服务后,按照上面的测试步骤,分别调用2个查询接口

 

通过控制台输出结果,可以看到节点数据注册到zk中


 


 

2、调用删除接口

此时zk的监听逻辑中监听到了节点数据变更的事件,在变更的逻辑中,我们将同步删除本地缓存的数据;


 

再次调用时发现缓存已经被清理


 

通过上面的操作演示,实现了基于zk的节点注册与事件监听机制实现缓存一致性的问题处理;

五、解决方案三:使用redis事件订阅与发布机制实现缓存同步

Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。

这种模式很像消息队列的实现机制,服务端发布消息到topic,客户端监听topic的消息,并做自身的业务处理;

只不过在redis这里,不叫topic,而是channel,下面来看一个简单的redis实现的发布订阅使用

1、导入依赖org.springframework.bootspring-boot-starter-data-redis2、自定义 RedisMessageListener

该类的功能和消息中间件中的监听逻辑很相似

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.connection.Message;import org.springframework.data.redis.connection.MessageListener;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Component;@Componentpublic class RedisMessageListener implements MessageListener {@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {// 获取消息byte[] messageBody = message.getBody();// 使用值序列化器转换Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);// 获取监听的频道byte[] channelByte = message.getChannel();// 使用字符串序列化器转换Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);// 渠道名称转换String patternStr = new String(pattern);System.out.println(patternStr);System.out.println("---频道---: " + channel);System.out.println("---消息内容---: " + msg);3、自定义 RedisSubConfig

该类用于配置特定的channel,即监听来自哪些channel的消息

import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.listener.ChannelTopic;import org.springframework.data.redis.listener.RedisMessageListenerContainer;@Configurationpublic class RedisSubConfig {@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory factory, RedisMessageListener listener) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(factory);//订阅频道redis.news 和 redis.life 这个container 可以添加多个 messageListenercontainer.addMessageListener(listener, new ChannelTopic("redis.life"));container.addMessageListener(listener, new ChannelTopic("redis.news"));return container;4、最后编写一个接口做测试@GetMapping("/testPublish")public void testPublish(){dbUserService.testPublish();@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void testPublish() {redisTemplate.convertAndSend("redis.life", "aaa");redisTemplate.convertAndSend("redis.news", "bbb");

调用下接口,可以看到控制台输出如下信息


 

通过上面的演示,快速了解了一下redis的这种发布订阅模式的功能使用,下面就来使用这种方式来解决缓存一致性问题;

5、删除用户接口中向redis channel 推送消息@Override@CacheEvict(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public String deleteById(String id) {dbUserMapper.deleteByUserId(id);redisTemplate.convertAndSend("redis.user", id);return "delete success";6、RedisMessageListener 改造

添加删除本地缓存逻辑

@Overridepublic void onMessage(Message message, byte[] pattern) {CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils();// 获取消息byte[] messageBody = message.getBody();// 使用值序列化器转换Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);// 获取监听的频道byte[] channelByte = message.getChannel();// 使用字符串序列化器转换Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);// 渠道名称转换String patternStr = new String(pattern);System.out.println(patternStr);System.out.println("---频道---: " + channel);System.out.println("---消息内容---: " + msg);caffeineCacheUtils.evict("dbUser",patternStr);7、模拟测试

启动服务后,直接调用删除用户接口,可以看到,监听逻辑中收到了一条消息,然后调用本地缓存工具类删除本地缓存即可


 


 

六、解决方案四:使用消息队列实现缓存同步

了解了redis发布订阅这种方式实现原理后,如果再更换为消息中间件来实现就不难理解了,其实现的大致思路如下:

 

  1. 调用删除接口删除用户;
  2. 向特定的队列推送一条删除消息;
  3. 在消息监听逻辑中接收消息,并清理本地缓存

 

以rabbitmq为例,其核心实现如下:

@RabbitHandlerpublic void process(String msg) {System.out.println("topicMessageReceiver 接收到了消息 : " +msg);//执行本地缓存的删除操作

关于rabbitmq的相关实现感兴趣的同学可以参考:rabbbitmq 技术全解

七、总结

关于后3三种的实现,不仅可以解决缓存一致性问题,同时适用于分布式应用的场景,算是比较通用的解决方案,但这样一来,引入了第三方组件,也增加了系统整体的复杂性,这一点需要在架构设计中进行综合考量,结合小编本人的一些实践经验,比较推荐使用redis的发布订阅模式,这种方式简单高效,同时兼顾了避免引入更多的外部组件,可酌情参考。

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>