在分布式系统中,保证数据一致性是一个核心挑战。特别是在跨服务调用时,如何确保多个服务的操作要么全部成功,要么全部失败,这就是分布式事务要解决的问题。本文将深入探讨可靠消息投递的分布式事务原理,重点分析RocketMQ事务消息 和 本地消息表 两种实现方式。
基本思路:将分布式事务拆解为多个本地事务,通过消息队列保证各个本地事务的最终一致性。
核心思想:避免分布式事务。
要实现可靠消息投递,必须解决两个核心问题:
其基本原理是:
在可靠消息投递中,关键在于保证"业务操作"与"消息发送"的原子性。如果业务操作成功但消息发送失败,或业务操作失败但消息已发送,系统就会出现数据不一致。可靠消息投递机制通过特定的实现方式,解决了这一问题。
可靠消息投递是实现分布式事务最终一致性的有效方案,而 RocketMQ事务消息 和 本地消息表 是两种主流的实现方式。
RocketMQ 事务消息基于 两阶段提交(2PC) 思想,通过"半消息"机制实现业务与消息的原子性。
提示
RocketMQ 天然的支持事务消息,可查看其官方文档 《事务消息》

生产者实现
java @Slf4j @Component public class MQTXProducerService { public static final String TOPIC = "RLT_TEST_TOPIC"; public static final String TAG = "charge"; @Autowired RocketMQTemplate rocketMQTemplate; /** * 先向MQ Server发送半消息 * @param userCharge 用户充值信息 */ public TransactionSendResult sendHalfMsg(UserCharge userCharge) { /* 执行顺序: 1:发送半消息 2:执行本地事务(实现了 RocketMQLocalTransactionListener 接口的类) 3:发送半消息 4:MQ消费 */ // 生成事务id,唯一,可用业务标识 String transactionId = UUID.randomUUID().toString().replace("-", ""); log.info("1、【发送半消息】transactionId={}", transactionId); // 发送事务消息(参1:生产者所在事务组,参2:topic+tag,参3:消息体(可以传参),参4:发送参数) TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TOPIC + ":" + TAG, MessageBuilder.withPayload(userCharge).setHeader(RocketMQHeaders.MESSAGE_ID, transactionId).build(), userCharge); log.info("【发送半消息】sendResult={}", JSON.toJSONString(sendResult)); return sendResult; } }
执行本地事务 + 回查方法
java @Slf4j @RocketMQTransactionListener public class MQTXLocalService implements RocketMQLocalTransactionListener { @Autowired private ITUserService userService; @Autowired private TMqTransactionLogMapper mqTransactionLogMapper; /* 这里代码是主要关键的地方,本地事务是给用户增加余额后再插入mq事务日志,这两个操作只有成功了,才返回COMMIT,异常失败就返回ROLLBACK 回查方法不一定会执行,但是得有,回查就是根据我们之前生成穿过来的那个事务id(transactionId)来查询事务日志表, 这样的好处是业务牵涉的表再多无所谓,我这个日志表也与你本地事务绑定,我只需查询这一张事务表就够了,能找到就代表本地事务执行成功了 */ /** * 用于执行本地事务的方法 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object obj) { // 获取消息体里参数 MessageHeaders messageHeaders = message.getHeaders(); String transactionId = (String) messageHeaders.get(RocketMQHeaders.TRANSACTION_ID); log.info("2、【执行本地事务】消息体参数:transactionId={}", transactionId); // 执行带有事务注解的本地方法:增加用户余额+保存mq日志 try { UserCharge userCharge = (UserCharge) obj; userService.addBalance(userCharge, transactionId); log.info("3、【执行本地事务】提交commit:transactionId={}", transactionId); // 正常:向MQ Server发送commit消息 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error("【执行本地事务】发生异常,消息将被回滚", e); // 异常:向MQ Server发送rollback消息 return RocketMQLocalTransactionState.ROLLBACK; } } /** * 用于回查本地事务执行结果的方法 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { MessageHeaders headers = message.getHeaders(); String transactionId = headers.get(RocketMQHeaders.TRANSACTION_ID, String.class); log.info("【回查本地事务】transactionId={}", transactionId); // 根据事务id查询事务日志表 TMqTransactionLog mqTransactionLog = mqTransactionLogMapper.selectById(transactionId); if (null == mqTransactionLog) { // 没查到表明本地事务执行失败,通知回滚 return RocketMQLocalTransactionState.ROLLBACK; } // 查到表明本地事务执行成功,提交 return RocketMQLocalTransactionState.COMMIT; } }
消费者实现
java @Slf4j @Component @RocketMQMessageListener(topic = MQTXProducerService.TOPIC, selectorExpression = MQTXProducerService.TAG, consumerGroup = "Con_Group_Four") public class MQTXConsumerService implements RocketMQListener<UserCharge> { @Autowired private ITCreditService creditService; @Override public void onMessage(UserCharge userCharge) { // 一般真实环境这里消费前,得做幂等性判断,防止重复消费 // 方法一:如果你的业务中有某个字段是唯一的,有标识性,如订单号,那就可以用此字段来判断 // 方法二:新建一张消费记录表t_mq_consumer_log,字段consumer_key是唯一性,能插入则表明该消息还未消费,往下走,否则停止消费 // 我个人建议用方法二,根据你的项目业务来定义key,这里我就不做幂等判断了,因为此案例只是模拟,重在分布式事务 // 给用户增加积分 TCredit tCredit = creditService.getOne(creditService.baseWrapper(new TCreditDTO().setUserId(userCharge.getUserId()))); boolean i = creditService.updateById(tCredit.setIntegration(tCredit.getIntegration() + userCharge.getChargeAmount())); if (i) { log.info("【MQ消费】用户增加积分成功,userCharge={}", JSONObject.toJSONString(userCharge)); } else { log.error("【MQ消费】用户充值增加积分消费失败,userCharge={}", JSONObject.toJSONString(userCharge)); } } }
优点:
缺点:
本地消息表是一种基于BASE理论的最终一致性方案,核心思路是将分布式事务拆分成本地事务进行处理,将消息数据与业务数据保存在同一个数据库中,保证本地数据库事务保证两者的原子性,利用中间件查询其他服务的事务消息状态。

数据库表设计
sql -- 业务订单表 CREATE TABLE orders ( id BIGINT PRIMARY KEY AUTO_INCREMENT, order_no VARCHAR(64) NOT NULL UNIQUE, user_id BIGINT NOT NULL, product_id BIGINT NOT NULL, quantity INT NOT NULL, amount DECIMAL(10,2) NOT NULL, status TINYINT NOT NULL DEFAULT 0, create_time DATETIME DEFAULT CURRENT_TIMESTAMP, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); -- 本地消息表 CREATE TABLE local_message ( id BIGINT PRIMARY KEY AUTO_INCREMENT, message_id VARCHAR(64) NOT NULL UNIQUE, topic VARCHAR(128) NOT NULL, tags VARCHAR(128), body TEXT NOT NULL, status TINYINT NOT NULL DEFAULT 0 COMMENT '0:待发送,1:已发送,2:发送失败', retry_count INT NOT NULL DEFAULT 0, next_retry_time DATETIME, create_time DATETIME DEFAULT CURRENT_TIMESTAMP, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_status_next_retry (status, next_retry_time) );
本地事务+消息处理
java @Service @Transactional public class OrderService { @Autowired private OrderMapper orderMapper; @Autowired private LocalMessageMapper localMessageMapper; @Autowired private MessageSender messageSender; public void createOrderWithMessage(Order order) { // 1. 保存订单 orderMapper.insert(order); // 2. 构建消息 LocalMessage message = new LocalMessage(); message.setMessageId(UUID.randomUUID().toString()); message.setTopic("OrderTopic"); message.setTags("CREATE_ORDER"); Map<String, Object> messageBody = new HashMap<>(); messageBody.put("orderId", order.getId()); messageBody.put("productId", order.getProductId()); messageBody.put("quantity", order.getQuantity()); message.setBody(JSON.toJSONString(messageBody)); message.setStatus(0); // 待发送 // 3. 保存消息(与订单在同一个事务中) localMessageMapper.insert(message); // 事务提交后,异步发送消息 TransactionSynchronizationManager.registerSynchronization( new TransactionSynchronization() { @Override public void afterCommit() { messageSender.asyncSendMessage(message); } } ); } }
发送事务消息
java @Component public class MessageSender { @Autowired private LocalMessageMapper localMessageMapper; @Autowired private RocketMQTemplate rocketMQTemplate; @Async public void asyncSendMessage(LocalMessage message) { try { Message<String> mqMessage = MessageBuilder.withPayload(message.getBody()) .setHeader(RocketMQHeaders.KEYS, message.getMessageId()) .build(); rocketMQTemplate.send(message.getTopic() + ":" + message.getTags(), mqMessage); // 更新消息状态为已发送 message.setStatus(1); localMessageMapper.updateStatus(message); } catch (Exception e) { // 发送失败,更新重试信息 message.setStatus(2); message.setRetryCount(message.getRetryCount() + 1); message.setNextRetryTime(calculateNextRetryTime(message.getRetryCount())); localMessageMapper.updateStatus(message); } } private Date calculateNextRetryTime(int retryCount) { // 指数退避策略 long delay = Math.min(1000 * (long) Math.pow(2, retryCount), 3600000); // 最大1小时 return new Date(System.currentTimeMillis() + delay); } }
消息补偿机制
java @Component @Slf4j public class MessageCompensateTask { @Autowired private LocalMessageMapper localMessageMapper; @Autowired private MessageSender messageSender; @Scheduled(fixedDelay = 30000) // 每30秒执行一次 public void compensateFailedMessages() { List<LocalMessage> failedMessages = localMessageMapper .selectByStatusAndTime(2, new Date()); // 状态为发送失败且到达重试时间 for (LocalMessage message : failedMessages) { if (message.getRetryCount() >= 10) { // 最大重试次数 log.warn("消息达到最大重试次数,需要人工干预: {}", message.getMessageId()); continue; } try { messageSender.asyncSendMessage(message); } catch (Exception e) { log.error("消息补偿发送失败: {}", message.getMessageId(), e); } } } }
消费者实现
java @Component @RocketMQMessageListener( topic = "OrderTopic", selectorExpression = "CREATE_ORDER", consumerGroup = "Order_Consumer_Group" ) public class OrderConsumer implements RocketMQListener<MessageExt> { @Autowired private InventoryService inventoryService; @Autowired private ConsumedMessageMapper consumedMessageMapper; @Override public void onMessage(MessageExt message) { String messageId = message.getMsgId(); // 检查是否已经消费过(幂等性保障) if (consumedMessageMapper.exists(messageId)) { log.info("消息已消费,跳过处理: {}", messageId); return; } try { String body = new String(message.getBody(), StandardCharsets.UTF_8); Map<String, Object> data = JSON.parseObject(body, Map.class); Long productId = Long.valueOf(data.get("productId").toString()); Integer quantity = Integer.valueOf(data.get("quantity").toString()); // 执行库存扣减 inventoryService.deductInventory(productId, quantity); // 记录消费成功的消息 consumedMessageMapper.insert(messageId, new Date()); } catch (Exception e) { log.error("消息消费失败: {}", messageId, e); throw new RuntimeException("消费失败,触发重试", e); } } }
优点:
缺点:
接下来我们考虑下在实际项目中遇到了 多服务调用的协调问题,这是非常典型的分布式事务复杂场景问题。
比如电商系统中的订单、库存、支付服务,需要考虑链式服务之间的调用关系,同时需要考虑如何确保多个步骤要么全部成功,要么全部回滚,在实际编码中实现事务的逆向操作。
提示
以下代码示例中将以本地消息表方案作为示例,在 RocketMQ 事务消息方案中的思路是一样的。
服务A调用服务B,服务B调用服务C,同时每个服务都有自己的本地事务
java // 服务A:订单服务 @Service public class OrderService { @Transactional public void createOrder(OrderDTO order) { // 1. 创建订单 orderRepository.save(order); // 2. 创建消息:通知库存服务扣减库存 MessageRecord message = new MessageRecord(); message.setMessageId(UUID.randomUUID().toString()); message.setBusinessId(order.getId()); message.setMessageContent(JSON.toJSONString(order)); message.setStatus("PENDING"); message.setTargetService("inventory-service"); // 目标服务 messageRecordRepository.save(message); } } // 服务B:库存服务 @Component public class InventoryMessageConsumer { @Autowired private InventoryService inventoryService; public void handleInventoryMessage(MessageRecord message) { try { // 1. 扣减库存(本地事务) inventoryService.decreaseInventory(message.getBusinessId()); // 2. 创建下游消息:通知物流服务创建配送单 MessageRecord downstreamMessage = new MessageRecord(); downstreamMessage.setMessageId(UUID.randomUUID().toString()); downstreamMessage.setBusinessId(message.getBusinessId()); downstreamMessage.setMessageContent(JSON.toJSONString(message)); downstreamMessage.setStatus("PENDING"); downstreamMessage.setTargetService("logistics-service"); // 注意:这里是在库存服务的数据库中创建消息 messageRecordRepository.save(downstreamMessage); // 3. 更新当前消息状态 message.setStatus("SUCCESS"); messageRecordRepository.save(message); } catch (Exception e) { // 处理失败,更新状态为FAILED,后续重试 message.setStatus("FAILED"); messageRecordRepository.save(message); throw e; } } } // 服务C:物流服务 @Component public class LogisticsMessageConsumer { public void handleLogisticsMessage(MessageRecord message) { try { // 创建配送单 logisticsService.createDeliveryOrder(message.getBusinessId()); message.setStatus("SUCCESS"); messageRecordRepository.save(message); // 可选:发送最终确认消息给订单服务 sendFinalConfirmation(message.getBusinessId()); } catch (Exception e) { message.setStatus("FAILED"); messageRecordRepository.save(message); throw e; } } }
服务A需要同时调用服务B和服务C,两个调用是并行的,互不影响
java // 服务A:订单服务 @Service public class OrderService { @Transactional public void createOrder(OrderDTO order) { // 1. 创建订单 orderRepository.save(order); // 2. 创建两条独立的消息 MessageRecord inventoryMessage = createMessage(order, "inventory-service"); MessageRecord paymentMessage = createMessage(order, "payment-service"); messageRecordRepository.save(inventoryMessage); messageRecordRepository.save(paymentMessage); // 3. 可选:创建聚合状态记录 OrderStatus status = new OrderStatus(); status.setOrderId(order.getId()); status.setInventoryStatus("PENDING"); status.setPaymentStatus("PENDING"); status.setOverallStatus("PROCESSING"); orderStatusRepository.save(status); } private MessageRecord createMessage(OrderDTO order, String targetService) { MessageRecord message = new MessageRecord(); message.setMessageId(UUID.randomUUID().toString()); message.setBusinessId(order.getId()); message.setMessageContent(JSON.toJSONString(order)); message.setStatus("PENDING"); message.setTargetService(targetService); return message; } } // 服务B和C的处理逻辑类似,处理完成后发送确认消息 @Component public class InventoryMessageConsumer { public void handleInventoryMessage(MessageRecord message) { try { inventoryService.decreaseInventory(message.getBusinessId()); message.setStatus("SUCCESS"); messageRecordRepository.save(message); // 发送确认消息给订单服务 sendConfirmation("inventory-service", message.getBusinessId(), "SUCCESS"); } catch (Exception e) { message.setStatus("FAILED"); messageRecordRepository.save(message); sendConfirmation("inventory-service", message.getBusinessId(), "FAILED"); throw e; } } } // 服务A:处理确认消息 @Component public class OrderConfirmationConsumer { public void handleConfirmation(ConfirmationMessage confirmation) { // 更新聚合状态 OrderStatus status = orderStatusRepository.findByOrderId(confirmation.getBusinessId()); if ("inventory-service".equals(confirmation.getServiceName())) { status.setInventoryStatus(confirmation.getStatus()); } else if ("payment-service".equals(confirmation.getServiceName())) { status.setPaymentStatus(confirmation.getStatus()); } // 检查整体状态 if ("SUCCESS".equals(status.getInventoryStatus()) && "SUCCESS".equals(status.getPaymentStatus())) { status.setOverallStatus("COMPLETED"); } else if ("FAILED".equals(status.getInventoryStatus()) || "FAILED".equals(status.getPaymentStatus())) { status.setOverallStatus("FAILED"); // 触发补偿逻辑 triggerCompensation(status.getOrderId()); } orderStatusRepository.save(status); } }
服务A和B的本地事务已经提交,服务C处理失败,需要进行补偿(回滚)操作
以上可靠消息投递的两种基本流程已经理解,其本身只保证消息生产和本地事务的一致性,并不直接管理多个服务的回滚,主要解决的是 生产者本地事务 与 消息发送的原子性 问题,但它本身并不直接管理多个服务之间的分布式事务协调。当涉及多个服务调用时,我们需要额外的机制来处理。
这时候需要引入 Saga模式 的概念,通过 补偿事务 来实现回滚。
sql CREATE TABLE distributed_transaction ( id BIGINT PRIMARY KEY AUTO_INCREMENT, transaction_id VARCHAR(64) NOT NULL UNIQUE COMMENT '全局事务ID', current_service VARCHAR(100) NOT NULL COMMENT '当前服务名', next_service VARCHAR(100) COMMENT '下一个服务名', transaction_status TINYINT NOT NULL COMMENT '0:进行中, 1:已完成, 2:已回滚, 3:失败', business_data TEXT COMMENT '业务数据', compensation_data TEXT COMMENT '补偿需要的数据', retry_count INT DEFAULT 0, create_time DATETIME DEFAULT CURRENT_TIMESTAMP, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_transaction_id (transaction_id), INDEX idx_status (transaction_status) );
java @Component @Slf4j public class TransactionCoordinator { @Autowired private DistributedTransactionMapper transactionMapper; @Autowired private RocketMQTemplate rocketMQTemplate; /** * 开始分布式事务 */ public String beginTransaction(String currentService, String businessData) { String transactionId = generateTransactionId(); DistributedTransaction transaction = new DistributedTransaction(); transaction.setTransactionId(transactionId); transaction.setCurrentService(currentService); transaction.setTransactionStatus(0); // 进行中 transaction.setBusinessData(businessData); transactionMapper.insert(transaction); return transactionId; } /** * 执行下一个服务调用 */ public void invokeNextService(String transactionId, String nextService, Object messageBody) { try { // 更新事务状态 DistributedTransaction transaction = transactionMapper.selectByTransactionId(transactionId); transaction.setNextService(nextService); transactionMapper.update(transaction); // 发送事务消息到下一个服务 Map<String, Object> message = new HashMap<>(); message.put("transactionId", transactionId); message.put("businessData", transaction.getBusinessData()); message.put("messageBody", messageBody); Message<String> mqMessage = MessageBuilder.withPayload(JSON.toJSONString(message)) .setHeader(RocketMQHeaders.KEYS, transactionId) .build(); rocketMQTemplate.sendMessageInTransaction( "TransactionTopic", mqMessage, null ); } catch (Exception e) { log.error("调用下一个服务失败: transactionId={}, nextService={}", transactionId, nextService, e); // 触发补偿流程 triggerCompensation(transactionId); } } /** * 标记事务步骤完成 */ public void completeStep(String transactionId) { DistributedTransaction transaction = transactionMapper.selectByTransactionId(transactionId); transaction.setTransactionStatus(1); // 已完成 transactionMapper.update(transaction); } /** * 触发补偿流程 */ public void triggerCompensation(String transactionId) { log.info("开始补偿流程: transactionId={}", transactionId); // 发送补偿消息(按照服务调用逆序) Message<String> compensateMessage = MessageBuilder.withPayload(transactionId) .setHeader("COMPENSATION", "true") .build(); rocketMQTemplate.syncSend("CompensationTopic", compensateMessage); } }
java @Service @Slf4j public class OrderService { @Autowired private TransactionCoordinator transactionCoordinator; @Autowired private InventoryService inventoryService; @Transactional public String createOrder(OrderRequest request) { // 1. 开始分布式事务 String transactionId = transactionCoordinator.beginTransaction( "order-service", JSON.toJSONString(request) ); try { // 2. 执行本地事务(创建订单) Order order = createOrderLocal(request); // 3. 保存补偿需要的数据 String compensationData = buildCompensationData(order); // 4. 调用下一个服务(库存服务) InventoryDeductRequest inventoryRequest = buildInventoryRequest(order); transactionCoordinator.invokeNextService( transactionId, "inventory-service", inventoryRequest ); return transactionId; } catch (Exception e) { log.error("创建订单失败: transactionId={}", transactionId, e); transactionCoordinator.triggerCompensation(transactionId); throw new RuntimeException("创建订单失败", e); } } /** * 订单服务补偿操作 */ @Transactional public void compensateOrder(String transactionId, String compensationData) { log.info("执行订单服务补偿: transactionId={}", transactionId); try { // 解析补偿数据 OrderCompensationData data = JSON.parseObject(compensationData, OrderCompensationData.class); // 取消订单(软删除或状态更新) orderMapper.updateStatus(data.getOrderId(), OrderStatus.CANCELLED); // 记录补偿日志 log.info("订单补偿完成: orderId={}", data.getOrderId()); } catch (Exception e) { log.error("订单补偿失败: transactionId={}", transactionId, e); throw new RuntimeException("订单补偿失败", e); } } }
java @Service @Slf4j public class InventoryService { @Autowired private TransactionCoordinator transactionCoordinator; @Autowired private PointService pointService; /** * 库存扣减(事务消息消费者) */ @RocketMQMessageListener( topic = "TransactionTopic", selectorExpression = "inventory-service", consumerGroup = "inventory-consumer-group" ) public class InventoryConsumer implements RocketMQListener<MessageExt> { @Override @Transactional public void onMessage(MessageExt message) { try { String body = new String(message.getBody(), StandardCharsets.UTF_8); Map<String, Object> data = JSON.parseObject(body, Map.class); String transactionId = (String) data.get("transactionId"); InventoryDeductRequest request = JSON.parseObject( JSON.toJSONString(data.get("messageBody")), InventoryDeductRequest.class ); // 检查是否补偿消息 if (isCompensationMessage(message)) { compensateInventory(transactionId, request); return; } // 执行库存扣减 boolean success = deductInventory(request); if (success) { // 标记当前步骤完成 transactionCoordinator.completeStep(transactionId); // 调用下一个服务(积分服务) PointAddRequest pointRequest = buildPointRequest(request); transactionCoordinator.invokeNextService( transactionId, "point-service", pointRequest ); } else { // 库存不足,触发补偿 transactionCoordinator.triggerCompensation(transactionId); } } catch (Exception e) { log.error("库存服务处理失败", e); // 触发补偿 String transactionId = extractTransactionId(message); transactionCoordinator.triggerCompensation(transactionId); } } /** * 库存服务补偿操作 */ @Transactional public void compensateInventory(String transactionId, InventoryDeductRequest request) { log.info("执行库存服务补偿: transactionId={}", transactionId); try { // 恢复库存 restoreInventory(request.getProductId(), request.getQuantity()); log.info("库存补偿完成: productId={}, quantity={}", request.getProductId(), request.getQuantity()); } catch (Exception e) { log.error("库存补偿失败: transactionId={}", transactionId, e); throw new RuntimeException("库存补偿失败", e); } } } }
java @Service @Slf4j public class PointService { @RocketMQMessageListener( topic = "TransactionTopic", selectorExpression = "point-service", consumerGroup = "point-consumer-group" ) public class PointConsumer implements RocketMQListener<MessageExt> { @Override @Transactional public void onMessage(MessageExt message) { try { String body = new String(message.getBody(), StandardCharsets.UTF_8); Map<String, Object> data = JSON.parseObject(body, Map.class); String transactionId = (String) data.get("transactionId"); PointAddRequest request = JSON.parseObject( JSON.toJSONString(data.get("messageBody")), PointAddRequest.class ); // 检查是否补偿消息 if (isCompensationMessage(message)) { compensatePoints(transactionId, request); return; } // 执行积分增加 boolean success = addPoints(request); if (success) { // 标记整个事务完成 completeTransaction(transactionId); log.info("分布式事务完成: transactionId={}", transactionId); } else { // 积分增加失败,触发补偿 transactionCoordinator.triggerCompensation(transactionId); } } catch (Exception e) { log.error("积分服务处理失败", e); String transactionId = extractTransactionId(message); transactionCoordinator.triggerCompensation(transactionId); } } /** * 积分服务补偿操作 */ @Transactional public void compensatePoints(String transactionId, PointAddRequest request) { log.info("执行积分服务补偿: transactionId={}", transactionId); try { // 扣减积分(恢复原状) deductPoints(request.getUserId(), request.getPoints()); log.info("积分补偿完成: userId={}, points={}", request.getUserId(), request.getPoints()); } catch (Exception e) { log.error("积分补偿失败: transactionId={}", transactionId, e); throw new RuntimeException("积分补偿失败", e); } } } }
java @Component @Slf4j public class CompensationConsumer { @Autowired private OrderService orderService; @Autowired private InventoryService inventoryService; @Autowired private PointService pointService; @RocketMQMessageListener( topic = "CompensationTopic", consumerGroup = "compensation-consumer-group" ) public void handleCompensation(String transactionId) { log.info("处理补偿事务: transactionId={}", transactionId); try { // 按照调用链的逆序执行补偿 // 1. 补偿积分服务 pointService.compensatePoints(transactionId, getPointCompensationData(transactionId)); // 2. 补偿库存服务 inventoryService.compensateInventory(transactionId, getInventoryCompensationData(transactionId)); // 3. 补偿订单服务 orderService.compensateOrder(transactionId, getOrderCompensationData(transactionId)); log.info("补偿流程完成: transactionId={}", transactionId); } catch (Exception e) { log.error("补偿流程执行失败: transactionId={}", transactionId, e); // 记录失败日志,需要人工干预 } } }
核心要点,
每个补偿操作必须保证幂等性,防止重复补偿:
要明确事务的状态机
java public enum TransactionStatus { INITIATED(0, "已初始化"), IN_PROGRESS(1, "进行中"), ORDER_COMPLETED(2, "订单服务完成"), INVENTORY_COMPLETED(3, "库存服务完成"), POINT_COMPLETED(4, "积分服务完成"), SUCCESS(5, "事务成功"), COMPENSATING(6, "补偿中"), COMPENSATED(7, "已补偿"), FAILED(8, "事务失败"); // ... getters and constructors }
超时控制,防止事务长时间挂起
java // 1. 定义补偿消息结构 @Data public class CompensationMessage { private String originalMessageId; // 原始消息ID private String businessId; private String serviceName; // 需要补偿的服务 private String compensationType; // 补偿类型 } // 2. 服务C失败时的处理 @Component public class LogisticsMessageConsumer { public void handleLogisticsMessage(MessageRecord message) { try { logisticsService.createDeliveryOrder(message.getBusinessId()); message.setStatus("SUCCESS"); messageRecordRepository.save(message); } catch (Exception e) { message.setStatus("FAILED"); messageRecordRepository.save(message); // 发送补偿消息 CompensationMessage compensation = new CompensationMessage(); compensation.setOriginalMessageId(message.getMessageId()); compensation.setBusinessId(message.getBusinessId()); compensation.setServiceName("inventory-service"); // 需要补偿的上游服务 compensation.setCompensationType("ROLLBACK_INVENTORY"); rocketMQTemplate.convertAndSend("compensation-topic", compensation); throw e; } } } // 3. 服务B:处理补偿消息 @Component public class InventoryCompensationConsumer { public void handleCompensation(CompensationMessage compensation) { try { // 执行补偿操作:增加库存(回滚扣减操作) inventoryService.increaseInventory(compensation.getBusinessId()); // 发送进一步的补偿消息给服务A CompensationMessage upstreamCompensation = new CompensationMessage(); upstreamCompensation.setBusinessId(compensation.getBusinessId()); upstreamCompensation.setServiceName("order-service"); upstreamCompensation.setCompensationType("ROLLBACK_ORDER_STATUS"); rocketMQTemplate.convertAndSend("compensation-topic", upstreamCompensation); } catch (Exception e) { // 补偿失败需要人工干预或重试 log.error("补偿操作失败", e); // 可以发送告警或记录到人工处理队列 } } } // 4. 服务A:处理最终补偿 @Component public class OrderCompensationConsumer { public void handleCompensation(CompensationMessage compensation) { // 更新订单状态为失败 orderService.updateOrderStatus(compensation.getBusinessId(), "FAILED"); // 可能还需要通知用户或其他业务处理 notificationService.notifyOrderFailed(compensation.getBusinessId()); } }
补偿消息的可靠性保证
java // 补偿消息表(每个服务都需要) @Data public class CompensationRecord { private String compensationId; private String originalBusinessId; private String serviceName; private String compensationType; private String status; // PENDING, SUCCESS, FAILED private int retryCount; private Date createTime; } // 补偿服务实现 @Service public class CompensationService { @Transactional public void createCompensation(String businessId, String serviceName, String type) { CompensationRecord record = new CompensationRecord(); record.setCompensationId(UUID.randomUUID().toString()); record.setOriginalBusinessId(businessId); record.setServiceName(serviceName); record.setCompensationType(type); record.setStatus("PENDING"); compensationRecordRepository.save(record); // 发送补偿消息 rocketMQTemplate.convertAndSend("compensation-topic", record); } // 定时任务:重试失败的补偿 @Scheduled(fixedRate = 30000) public void retryFailedCompensations() { List<CompensationRecord> failedRecords = compensationRecordRepository.findByStatus("FAILED"); for (CompensationRecord record : failedRecords) { if (record.getRetryCount() < 5) { rocketMQTemplate.convertAndSend("compensation-topic", record); record.setRetryCount(record.getRetryCount() + 1); compensationRecordRepository.save(record); } else { // 超过重试次数,需要人工处理 alertService.sendAlert("补偿失败需要人工处理: " + record.getCompensationId()); } } } }
幂等性保证
java // 补偿操作的幂等性处理 @Service public class InventoryService { public void increaseInventory(String orderId) { // 检查是否已经执行过补偿 if (compensationRecordRepository.existsByBusinessIdAndType(orderId, "INCREASE_INVENTORY")) { return; // 已经执行过,直接返回 } // 执行补偿逻辑 inventoryRepository.increaseStock(orderId); // 记录补偿执行 CompensationExecution execution = new CompensationExecution(); execution.setBusinessId(orderId); execution.setCompensationType("INCREASE_INVENTORY"); execution.setExecuteTime(new Date()); compensationExecutionRepository.save(execution); } }
通过Saga模式 + RocketMQ事务消息 + 补偿事务的组合方案,我们可以有效处理多服务之间的分布式事务:
这种方案虽然实现相对复杂,但能够很好地解决多服务调用的事务一致性问题,是分布式系统中常用的成熟模式。
| 特性 | RocketMQ事务消息 | 本地消息表 |
|---|---|---|
| 实现复杂度 | 中等 | 中等 |
| 性能 | 高 | 中等 |
| 数据一致性 | 强 | 强 |
| 中间件依赖 | 强(RocketMQ) | 弱 |
| 通用性 | 低 | 高 |
| 运维成本 | 中等 | 中等 |
选择建议:
幂等性设计:无论使用哪种方案,消费者都必须实现幂等性处理,防止重复消费
监控与告警
重试策略
数据对账:定期执行数据对账任务,发现并修复不一致的数据
java @Component @Slf4j public class DataReconciliationTask { @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行 public void reconcileData() { // 检查订单与库存的一致性 List<Order> orders = orderService.findInconsistentOrders(); for (Order order : orders) { log.warn("发现数据不一致订单: {}", order.getId()); // 触发补偿流程或通知人工处理 } } }
可靠消息投递是实现分布式事务最终一致性的有效方案,而 RocketMQ事务消息 和 本地消息表 是两种主流的实现方式。RocketMQ事务消息通过MQ的内置事务机制提供简洁的实现,而本地消息表则通过应用层实现提供更高的灵活性。
在实际项目中,应根据业务特点、系统架构和现有技术栈选择合适的方案。对于事务操作时间短、已使用RocketMQ的系统,RocketMQ事务消息是理想选择;对于需要与MQ解耦、事务操作时间长的系统,本地消息表更为合适。
无论选择哪种方式,都需注意以下关键点:
通过合理运用这些可靠消息投递的实现方式,我们可以在分布式系统中构建出高可靠、高可用的事务处理机制,为业务系统的稳定运行提供有力保障。
本文作者:张豪
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!