在现代分布式系统中,我们常常需要将数据同时写入多个存储系统(如数据库+缓存),这种操作被称为"双写"。双写场景下最大的挑战是如何保证不同存储系统间的数据一致性。本文将深入探讨双写一致性问题,并通过Java实战演示解决方案。
常见问题场景:
基于消息队列的最终一致性方案的实现思路为: 应用服务 → 更新数据库 ↓ 同时发送消息 → 消息队列 → 缓存服务 → Redis
数据库操作与消息发送(原子操作)
java @Service @RequiredArgsConstructor public class UserService { private final UserRepository userRepository; private final MessageSender messageSender; @Transactional public void updateUser(User user) { // 1. 更新数据库 userRepository.save(user); // 2. 发送缓存更新消息 messageSender.sendCacheUpdate(new CacheUpdateEvent( "user:" + user.getId(), user )); } }
消息发送器(保证可靠性)
java @Component @RequiredArgsConstructor public class MessageSender { private final RabbitTemplate rabbitTemplate; private final TransactionTemplate transactionTemplate; public void sendCacheUpdate(CacheUpdateEvent event) { transactionTemplate.execute(status -> { // 将消息存入本地事务表(略) saveToLocalTxTable(event); // 发送到MQ rabbitTemplate.convertAndSend( "cache.update.exchange", "cache.update.key", event ); return null; }); } }
缓存消费者(幂等处理)
java @Component @RequiredArgsConstructor public class CacheConsumer { private final RedisTemplate<String, Object> redisTemplate; private final CacheUpdateRecordRepository recordRepository; @RabbitListener(queues = "cache.update.queue") public void handleMessage(CacheUpdateEvent event) { // 幂等检查:防止重复消费 if (recordRepository.existsByMessageId(event.getMessageId())) { return; } try { // 更新缓存 redisTemplate.opsForValue().set( event.getKey(), event.getData(), Duration.ofMinutes(30) ); // 记录消费日志 recordRepository.save(new CacheUpdateRecord(event.getMessageId())); } catch (Exception e) { // 告警并加入重试队列 sendToRetryQueue(event); } } }
在实体中添加版本号字段,每次更新完版本号叠加,发送消息的同时存储事务消息记录(缓存更新的快照记录),到缓存消费者中进行幂等校验处理,校验后将缓存进行更新,如果更新失败,可抛出异常进行重试,也可进入自定义重试队列中,当重试达到一定次数后,进入死信队列人工处理。
java @Entity public class User { @Id private Long id; @Version private Long version; // 乐观锁版本号 // 其他字段... }
注意
以上仅为demo实现,具体可根据思路自行扩展,例如可以将存储的本地事务消息添加状态,在消费者中可以先查询缓存版本号与当前消息的版本号进行对比,对比不通过则更新缓存,缓存处理成功时,查询一次最新缓存的版本号与事务版本号对比,如果对比成功则对本地事务消息的状态进行更新。
这个时候,定时任务只需要扫描缓存未更新的情况即可,而无需全量扫描。
添加双重保障机制,消息队列保证主流程,通过定时任务兜底检查,如果版本不一致也可以添加补偿缓存数据,保证数据的最终一致性。同时查询方法添加降级策略,保证服务的高可用。
补偿任务(兜底机制)
java @Scheduled(fixedDelay = 30000) public void checkInconsistency() { // 查询最近更新的数据库记录 List<User> recentUsers = userRepository.findUpdatedLast5Minutes(); recentUsers.forEach(user -> { String key = "user:" + user.getId(); User cachedUser = (User) redisTemplate.opsForValue().get(key); // 比较数据库与缓存的数据版本 if (cachedUser == null || cachedUser.getVersion() < user.getVersion()) { // 触发缓存更新 redisTemplate.opsForValue().set(key, user); } }); }
熔断降级策略
java // 当缓存更新失败时降级 public User getUser(Long id) { try { return cacheService.getUser(id); } catch (Exception e) { log.warn("缓存降级,查询数据库"); return userRepository.findById(id).orElse(null); } }
方案的整体效果上,通过高性能的 RocketMQ 可以保障最终一致性达到秒级。不过会对性能产生一些影响:由于添加了消息发送,会给业务流程操作增加部分耗时。同时依赖 MQ 中间件,要注意消息堆积,消息幂等,消息的可靠性处理
此方案的核心原则:容忍短暂不一致,保证最终一致;优先保障核心存储,非核心系统可降级;始终设计补偿机制。
补偿机制的核心思想是:当主操作成功但辅助操作失败时,通过后续的补偿操作来达到数据一致的状态。在双写场景中,补偿机制通常用于在数据库更新成功但缓存更新失败时,通过定时任务或后台线程来修复缓存的不一致状态。
方案本质就是双写,不成功就重试,重试多次不成功,则告警通知,主体流程如下:
定义基础的补偿架构支撑
java @Slf4j @Component public class CompensableTransaction<T> { /** * 最大重试次数 */ private static final int MAX_RETRY = 3; /** * 静态线程池 */ static class CompensableThreadPool { private static final AtomicInteger number = new AtomicInteger(1); public static final ThreadPoolExecutor executor = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), r -> new Thread(r, "CompensableTransaction-thread-" + number.getAndIncrement()), new ThreadPoolExecutor.CallerRunsPolicy()); } /** * 执行 */ @SneakyThrows public void execute(String operation, Callable<T> doPrimaryAction, Consumer<T> doSecondaryAction, Consumer<T> doCompensate) { // 执行主操作 T call = doPrimaryAction.call(); try { // 执行辅助操作 doSecondaryAction.accept(call); } catch (Exception e) { // 异步补偿 CompensableThreadPool.executor.execute(() -> { // 补偿操作 int retryCount = 0; while (retryCount < MAX_RETRY) { try { // 自定义补偿逻辑,成功则返回 doCompensate.accept(call); return; } catch (Exception ex) { retryCount++; log.warn("补偿操作失败({}/{}): {}", retryCount, MAX_RETRY, ex.getMessage()); // 指数 try { TimeUnit.MILLISECONDS.sleep(exponentialBackoff(retryCount)); } catch (InterruptedException exc) { log.error("{}补偿操作线程被中断", operation); return; } } } // 最终补偿失败 log.error("{}补偿操作最终失败", operation); alertAdmin(operation); }); } } private void alertAdmin(String operation) { // 发送告警邮件、短信、钉钉消息 log.info("发送告警邮件、短信、钉钉消息"); } /** * 根据重试次数计算指数停顿时间 */ private long exponentialBackoff(int retryCount) { // 100ms, 200ms, 400ms... return (long) (100 * Math.pow(2, retryCount)); } }
数据库 + 缓存双写实现
java @Service public class ImUsersServiceImpl extends ServiceImpl<ImUsersMapper, ImUsers> implements ImUsersService { @Resource private CompensableTransaction<ImUsers> compensableTransaction; /** * 初始化用户信息 * * @param userInfo 微信用户信息 */ @Override public void initUser(WxOAuth2UserInfo userInfo) { // 主操作 Callable<ImUsers> primaryAction = () -> { // 保存用户信息 ImUsers users = BeanUtil.toBean(userInfo, ImUsers.class); this.save(users); // 登录 StpUtil.login(users.getId()); return users; }; // 辅助操作 Consumer<ImUsers> secondaryAction = users -> { // 缓存用户信息 UserInfo info = BeanUtil.toBean(users, UserInfo.class); RedisCacheUtil.hPutAll(String.format(RedisKeyDefine.USER_INFO, users.getId()), BeanUtil.beanToMap(info)); }; // 补偿操作 Consumer<ImUsers> compensateAction = users -> { String key = String.format(RedisKeyDefine.USER_INFO, users.getId()); if (!RedisCacheUtil.hasKey(key)) { RedisCacheUtil.hPutAll(String.format(RedisKeyDefine.USER_INFO, users.getId()), BeanUtil.beanToMap(users)); } }; compensableTransaction.execute("初始化用户信息", primaryAction, secondaryAction, compensateAction); } }
此处定义了3个步骤操作:
在实现以上补偿机制的同时,还要注意补偿失败情下的补偿,做好补偿的监控体系,关注补偿的失败率统计与失败的问题定位,进行人工干预补偿,也可定时扫描补偿,就不在过多陈述
其实补偿方案实现相对简单,不依赖其他中间件,补偿操作异步执行对性能影响小,同时保证最终一致性,可靠性比较高。
但也存在一些缺点,在高并发流量下,会存在一定的脏读,就是数据库更新了,缓存还没更新,存在短暂数据不一致的情况,这也是不可避免的;
在一个就是如果 Redis 缓存挂掉了,会导致线程池阻塞,存在性能瓶颈,同时补偿操作本身可能失败,极端情况下需要人工干预。
提示
我个人更加推荐同步双写,失败补偿的方案,适用于大多数并发不高,可接受短暂脏数据的系统。
延迟双删是一种在双写场景中用于保证缓存与数据库一致性的策略。它主要用于解决在更新数据库和缓存时,由于并发操作导致的脏数据问题。
延迟双删是解决缓存与数据库双写一致性的经典策略,其核心流程为:
先删除缓存 -> 更新数据库 -> 延迟一段时间(比如几百毫秒) -> 再次删除缓存

考虑以下并发场景:
这样,线程B写入的旧数据缓存会被第二次删除,后续读取会重新加载最新数据。
业务代码的基础实现
java @Service @RequiredArgsConstructor public class UserService { private final UserRepository userRepository; private final RedisTemplate<String, Object> redisTemplate; private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4); // 更新操作入口 @Transactional public void updateUser(User user) { String cacheKey = "user:" + user.getId(); // 1. 先删除缓存 deleteCache(cacheKey); // 2. 更新数据库 userRepository.save(user); // 3. 提交事务后发起延迟双删 scheduler.schedule(() -> { deleteCacheWithRetry(cacheKey, 3); // 带重试的二次删除 }, 500, TimeUnit.MILLISECONDS); // 延迟500ms } // 带重试机制的缓存删除 private void deleteCacheWithRetry(String key, int maxRetry) { int retryCount = 0; while (retryCount < maxRetry) { try { redisTemplate.delete(key); log.info("延迟双删成功: {}", key); return; } catch (Exception e) { retryCount++; log.warn("缓存删除失败({}/{}): {}", retryCount, maxRetry, key); try { Thread.sleep(100 * retryCount); // 指数退避 } catch (InterruptedException ignored) {} } } log.error("缓存删除最终失败: {}", key); } }
优化版 - 结合版本号校验
java // 实体类增加版本控制 @Entity public class User { @Id private Long id; @Version private Long version; // 乐观锁版本号 // 其他字段... } // 服务层增强校验 public void updateUserWithVersion(User user) { String cacheKey = "user:" + user.getId(); // 1. 先删除缓存 redisTemplate.delete(cacheKey); // 2. 更新数据库(带版本校验) try { userRepository.save(user); } catch (OptimisticLockingFailureException ex) { // 版本冲突时重新加载数据 User freshUser = userRepository.findById(user.getId()).orElseThrow(); throw new ConcurrentUpdateException("数据已被修改,请刷新重试"); } // 3. 延迟双删(携带版本信息) scheduler.schedule(() -> { Object cached = redisTemplate.opsForValue().get(cacheKey); if (cached instanceof User) { User cachedUser = (User) cached; // 仅当缓存版本较旧时删除 if (cachedUser.getVersion() < user.getVersion()) { redisTemplate.delete(cacheKey); } } }, 600, TimeUnit.MILLISECONDS); }
生产级增强方案
java @Component public class CacheDoubleDeleteManager { private final RedisTemplate<String, Object> redisTemplate; private final RedissonClient redissonClient; private final DelayQueue<DeleteTask> delayQueue = new DelayQueue<>(); @PostConstruct public void init() { // 启动处理线程 new Thread(this::processTasks).start(); } // 添加延迟双删任务 public void scheduleDelete(String key, long delayMs) { delayQueue.put(new DeleteTask(key, delayMs)); } // 任务处理核心 private void processTasks() { while (!Thread.currentThread().isInterrupted()) { try { DeleteTask task = delayQueue.take(); RLock lock = redissonClient.getLock(task.key + ":lock"); try { if (lock.tryLock(1, 5, TimeUnit.SECONDS)) { redisTemplate.delete(task.key); } } finally { lock.unlock(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } // 延迟任务对象 private static class DeleteTask implements Delayed { final String key; final long executeTime; DeleteTask(String key, long delayMs) { this.key = key; this.executeTime = System.currentTimeMillis() + delayMs; } @Override public long getDelay(TimeUnit unit) { return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return Long.compare(executeTime, ((DeleteTask)o).executeTime); } } }
延迟双删是平衡性能与一致性的有效方案,以可控延迟换取更高吞吐,适用于写少读多、可接受短暂不一致的业务
优势:
局限性:
注意
没有完美的解决方案,只有最适合场景的方案。延迟双删在电商库存、用户配置等场景表现优异,但对金融交易等强一致性场景需慎重使用。
| 方案 | 原理 | 优点 | 缺点 |
|---|---|---|---|
| 基于消息队列 | 通过消息队列保证最终一致性 | 高可靠性,解耦系统 | 实现复杂,依赖中间件 |
| 补偿机制 | 失败后执行反向操作 | 简单易实现,适用性高 | 高并发下数据短暂不一致性 |
| 延迟双删策略 | 更新前后删除缓存 | 缓解脏读问题 | 不能完全避免不一致 |
保障双写一致性没有银弹,需要根据业务场景选择合适方案。注意每种方案完善重试机制,最好结合版本控制,每种方案的监控+告警+兜底三位一体必不可少。
本文展示的基于主流的解决方案进行讲述。在实际应用中还需结合监控(如缓存命中率、不一致告警)和压力测试持续优化。
本文作者:张豪
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!