2025-09-24
微服务与分布式
0

目录

可靠消息投递的核心思想
可靠消息投递的实现方式
RocketMQ 事务消息
工作流程
关键特性
代码示例
优缺点分析
本地消息表
工作流程
关键机制
代码示例
优缺点分析
多服务链式调用处理策略
链式调用(A → B → C)
处理方案:逐级消息传递 + 状态回传
关键核心
并行调用(A → B 且 A → C)
处理方案:多消息记录 + 聚合确认
调用失败的补偿处理
RocketMQ 事务消息处理方案:Saga模式
本地消息处理方案:Saga模式 + 补偿事务
总结
方案对比与选择建议
最佳实践
总结

在分布式系统中,保证数据一致性是一个核心挑战。特别是在跨服务调用时,如何确保多个服务的操作要么全部成功,要么全部失败,这就是分布式事务要解决的问题。本文将深入探讨可靠消息投递的分布式事务原理,重点分析RocketMQ事务消息本地消息表 两种实现方式。

可靠消息投递的核心思想

基本思路:将分布式事务拆解为多个本地事务,通过消息队列保证各个本地事务的最终一致性

核心思想:避免分布式事务

要实现可靠消息投递,必须解决两个核心问题:

  • 消息不丢失:确保消息从生产者到消费者的可靠传递
  • 消息不重复:在保证不丢失的前提下,避免重复消费

其基本原理是:

  • 最终一致性:允许一段时间内的数据不一致,但保证在一定时间内系统会达到一致状态
  • 消息可靠性:确保消息不会丢失,且能被正确处理
  • 事务解耦:将业务操作与消息发送解耦,通过异步方式完成

在可靠消息投递中,关键在于保证"业务操作"与"消息发送"的原子性。如果业务操作成功但消息发送失败,或业务操作失败但消息已发送,系统就会出现数据不一致。可靠消息投递机制通过特定的实现方式,解决了这一问题。

可靠消息投递的实现方式

可靠消息投递是实现分布式事务最终一致性的有效方案,而 RocketMQ事务消息本地消息表 是两种主流的实现方式。

RocketMQ 事务消息

RocketMQ 事务消息基于 两阶段提交(2PC) 思想,通过"半消息"机制实现业务与消息的原子性。

提示

RocketMQ 天然的支持事务消息,可查看其官方文档 《事务消息》

工作流程

  • 发送半消息:生产者发送一条消息(Half Message),此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息
  • ack 确认: RocketMQ 服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,执行本地业务操作
  • 执行本地事务:生产者执行本地业务操作(如数据库更新)
  • 提交事务:生产者根据本地事务执行结果向服务端提交二次确认结果
    • 本地事务成功:生产者向Broker发送"提交"请求,Broker 将半消息变为可消费状态
    • 本地事务失败:生产者向Broker发送"回滚"请求,Broker 删除该半消息
  • 事务回查:如果 Broker 未收到二次确认,会定期回查生产者,确认事务状态,检查对应消息的本地事务执行的最终结果

image.png

关键特性

  • 半消息机制:Broker 存储半消息于特殊Topic(RMQ_SYS_TRANS_HALF_TOPIC)
  • 事务状态回查:默认每60秒检查一次,最多15次重试
  • 消息状态:Prepared(半消息)、Commit(已提交)、Rollback(已回滚)

代码示例

生产者实现

java
@Slf4j @Component public class MQTXProducerService { public static final String TOPIC = "RLT_TEST_TOPIC"; public static final String TAG = "charge"; @Autowired RocketMQTemplate rocketMQTemplate; /** * 先向MQ Server发送半消息 * @param userCharge 用户充值信息 */ public TransactionSendResult sendHalfMsg(UserCharge userCharge) { /* 执行顺序: 1:发送半消息 2:执行本地事务(实现了 RocketMQLocalTransactionListener 接口的类) 3:发送半消息 4:MQ消费 */ // 生成事务id,唯一,可用业务标识 String transactionId = UUID.randomUUID().toString().replace("-", ""); log.info("1、【发送半消息】transactionId={}", transactionId); // 发送事务消息(参1:生产者所在事务组,参2:topic+tag,参3:消息体(可以传参),参4:发送参数) TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TOPIC + ":" + TAG, MessageBuilder.withPayload(userCharge).setHeader(RocketMQHeaders.MESSAGE_ID, transactionId).build(), userCharge); log.info("【发送半消息】sendResult={}", JSON.toJSONString(sendResult)); return sendResult; } }

执行本地事务 + 回查方法

java
@Slf4j @RocketMQTransactionListener public class MQTXLocalService implements RocketMQLocalTransactionListener { @Autowired private ITUserService userService; @Autowired private TMqTransactionLogMapper mqTransactionLogMapper; /* 这里代码是主要关键的地方,本地事务是给用户增加余额后再插入mq事务日志,这两个操作只有成功了,才返回COMMIT,异常失败就返回ROLLBACK 回查方法不一定会执行,但是得有,回查就是根据我们之前生成穿过来的那个事务id(transactionId)来查询事务日志表, 这样的好处是业务牵涉的表再多无所谓,我这个日志表也与你本地事务绑定,我只需查询这一张事务表就够了,能找到就代表本地事务执行成功了 */ /** * 用于执行本地事务的方法 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object obj) { // 获取消息体里参数 MessageHeaders messageHeaders = message.getHeaders(); String transactionId = (String) messageHeaders.get(RocketMQHeaders.TRANSACTION_ID); log.info("2、【执行本地事务】消息体参数:transactionId={}", transactionId); // 执行带有事务注解的本地方法:增加用户余额+保存mq日志 try { UserCharge userCharge = (UserCharge) obj; userService.addBalance(userCharge, transactionId); log.info("3、【执行本地事务】提交commit:transactionId={}", transactionId); // 正常:向MQ Server发送commit消息 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error("【执行本地事务】发生异常,消息将被回滚", e); // 异常:向MQ Server发送rollback消息 return RocketMQLocalTransactionState.ROLLBACK; } } /** * 用于回查本地事务执行结果的方法 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { MessageHeaders headers = message.getHeaders(); String transactionId = headers.get(RocketMQHeaders.TRANSACTION_ID, String.class); log.info("【回查本地事务】transactionId={}", transactionId); // 根据事务id查询事务日志表 TMqTransactionLog mqTransactionLog = mqTransactionLogMapper.selectById(transactionId); if (null == mqTransactionLog) { // 没查到表明本地事务执行失败,通知回滚 return RocketMQLocalTransactionState.ROLLBACK; } // 查到表明本地事务执行成功,提交 return RocketMQLocalTransactionState.COMMIT; } }

消费者实现

java
@Slf4j @Component @RocketMQMessageListener(topic = MQTXProducerService.TOPIC, selectorExpression = MQTXProducerService.TAG, consumerGroup = "Con_Group_Four") public class MQTXConsumerService implements RocketMQListener<UserCharge> { @Autowired private ITCreditService creditService; @Override public void onMessage(UserCharge userCharge) { // 一般真实环境这里消费前,得做幂等性判断,防止重复消费 // 方法一:如果你的业务中有某个字段是唯一的,有标识性,如订单号,那就可以用此字段来判断 // 方法二:新建一张消费记录表t_mq_consumer_log,字段consumer_key是唯一性,能插入则表明该消息还未消费,往下走,否则停止消费 // 我个人建议用方法二,根据你的项目业务来定义key,这里我就不做幂等判断了,因为此案例只是模拟,重在分布式事务 // 给用户增加积分 TCredit tCredit = creditService.getOne(creditService.baseWrapper(new TCreditDTO().setUserId(userCharge.getUserId()))); boolean i = creditService.updateById(tCredit.setIntegration(tCredit.getIntegration() + userCharge.getChargeAmount())); if (i) { log.info("【MQ消费】用户增加积分成功,userCharge={}", JSONObject.toJSONString(userCharge)); } else { log.error("【MQ消费】用户充值增加积分消费失败,userCharge={}", JSONObject.toJSONString(userCharge)); } } }

优缺点分析

优点

  • 实现相对简单:RocketMQ提供了完整的事务消息机制
  • 无需额外存储:消息状态由Broker管理,不需要额外的本地消息表
  • 事务回查机制:自动处理异常情况,保证最终一致性
  • 与业务解耦:业务代码只需关注本地事务和事务状态确认
  • 性能较好:消息异步处理

缺点

  • 依赖MQ特性:需要MQ支持事务消息(如RocketMQ)
  • 事务执行时间限制:半消息默认超时4小时,事务状态检查需要业务方实现
  • 无法保证严格实时一致性:只能保证最终一致性

本地消息表

本地消息表是一种基于BASE理论的最终一致性方案,核心思路是将分布式事务拆分成本地事务进行处理,将消息数据与业务数据保存在同一个数据库中,保证本地数据库事务保证两者的原子性,利用中间件查询其他服务的事务消息状态。

工作流程

  • 事务主动方:在同一个本地事务中处理业务和写消息表
  • 消息发送:通过消息中间件通知事务被动方
  • 消息处理:事务被动方消费消息并处理业务
  • 状态更新:事务主动方接收消息,更新消息表状态

image.png

关键机制

  • 本地事务保证:业务操作与消息记录在同一个事务中
  • 定时任务兜底:轮询未处理的消息,重试发送
  • 消息幂等:处理重复消息,确保业务正确性

代码示例

数据库表设计

sql
-- 业务订单表 CREATE TABLE orders ( id BIGINT PRIMARY KEY AUTO_INCREMENT, order_no VARCHAR(64) NOT NULL UNIQUE, user_id BIGINT NOT NULL, product_id BIGINT NOT NULL, quantity INT NOT NULL, amount DECIMAL(10,2) NOT NULL, status TINYINT NOT NULL DEFAULT 0, create_time DATETIME DEFAULT CURRENT_TIMESTAMP, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); -- 本地消息表 CREATE TABLE local_message ( id BIGINT PRIMARY KEY AUTO_INCREMENT, message_id VARCHAR(64) NOT NULL UNIQUE, topic VARCHAR(128) NOT NULL, tags VARCHAR(128), body TEXT NOT NULL, status TINYINT NOT NULL DEFAULT 0 COMMENT '0:待发送,1:已发送,2:发送失败', retry_count INT NOT NULL DEFAULT 0, next_retry_time DATETIME, create_time DATETIME DEFAULT CURRENT_TIMESTAMP, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_status_next_retry (status, next_retry_time) );

本地事务+消息处理

java
@Service @Transactional public class OrderService { @Autowired private OrderMapper orderMapper; @Autowired private LocalMessageMapper localMessageMapper; @Autowired private MessageSender messageSender; public void createOrderWithMessage(Order order) { // 1. 保存订单 orderMapper.insert(order); // 2. 构建消息 LocalMessage message = new LocalMessage(); message.setMessageId(UUID.randomUUID().toString()); message.setTopic("OrderTopic"); message.setTags("CREATE_ORDER"); Map<String, Object> messageBody = new HashMap<>(); messageBody.put("orderId", order.getId()); messageBody.put("productId", order.getProductId()); messageBody.put("quantity", order.getQuantity()); message.setBody(JSON.toJSONString(messageBody)); message.setStatus(0); // 待发送 // 3. 保存消息(与订单在同一个事务中) localMessageMapper.insert(message); // 事务提交后,异步发送消息 TransactionSynchronizationManager.registerSynchronization( new TransactionSynchronization() { @Override public void afterCommit() { messageSender.asyncSendMessage(message); } } ); } }

发送事务消息

java
@Component public class MessageSender { @Autowired private LocalMessageMapper localMessageMapper; @Autowired private RocketMQTemplate rocketMQTemplate; @Async public void asyncSendMessage(LocalMessage message) { try { Message<String> mqMessage = MessageBuilder.withPayload(message.getBody()) .setHeader(RocketMQHeaders.KEYS, message.getMessageId()) .build(); rocketMQTemplate.send(message.getTopic() + ":" + message.getTags(), mqMessage); // 更新消息状态为已发送 message.setStatus(1); localMessageMapper.updateStatus(message); } catch (Exception e) { // 发送失败,更新重试信息 message.setStatus(2); message.setRetryCount(message.getRetryCount() + 1); message.setNextRetryTime(calculateNextRetryTime(message.getRetryCount())); localMessageMapper.updateStatus(message); } } private Date calculateNextRetryTime(int retryCount) { // 指数退避策略 long delay = Math.min(1000 * (long) Math.pow(2, retryCount), 3600000); // 最大1小时 return new Date(System.currentTimeMillis() + delay); } }

消息补偿机制

java
@Component @Slf4j public class MessageCompensateTask { @Autowired private LocalMessageMapper localMessageMapper; @Autowired private MessageSender messageSender; @Scheduled(fixedDelay = 30000) // 每30秒执行一次 public void compensateFailedMessages() { List<LocalMessage> failedMessages = localMessageMapper .selectByStatusAndTime(2, new Date()); // 状态为发送失败且到达重试时间 for (LocalMessage message : failedMessages) { if (message.getRetryCount() >= 10) { // 最大重试次数 log.warn("消息达到最大重试次数,需要人工干预: {}", message.getMessageId()); continue; } try { messageSender.asyncSendMessage(message); } catch (Exception e) { log.error("消息补偿发送失败: {}", message.getMessageId(), e); } } } }

消费者实现

java
@Component @RocketMQMessageListener( topic = "OrderTopic", selectorExpression = "CREATE_ORDER", consumerGroup = "Order_Consumer_Group" ) public class OrderConsumer implements RocketMQListener<MessageExt> { @Autowired private InventoryService inventoryService; @Autowired private ConsumedMessageMapper consumedMessageMapper; @Override public void onMessage(MessageExt message) { String messageId = message.getMsgId(); // 检查是否已经消费过(幂等性保障) if (consumedMessageMapper.exists(messageId)) { log.info("消息已消费,跳过处理: {}", messageId); return; } try { String body = new String(message.getBody(), StandardCharsets.UTF_8); Map<String, Object> data = JSON.parseObject(body, Map.class); Long productId = Long.valueOf(data.get("productId").toString()); Integer quantity = Integer.valueOf(data.get("quantity").toString()); // 执行库存扣减 inventoryService.deductInventory(productId, quantity); // 记录消费成功的消息 consumedMessageMapper.insert(messageId, new Date()); } catch (Exception e) { log.error("消息消费失败: {}", messageId, e); throw new RuntimeException("消费失败,触发重试", e); } } }

优缺点分析

优点

  • 与MQ解耦:不依赖MQ的特定特性,可适配任何MQ
  • 轻量级实现:不需要复杂配置,容易实现
  • 数据可靠性不依赖MQ:消息可靠性由应用层保证

缺点

  • 与业务强耦合:需要在业务系统中实现消息表
  • 资源占用:消息表与业务表同库,增加数据库负担
  • 事务处理复杂:需要处理幂等性和重试机制

多服务链式调用处理策略

接下来我们考虑下在实际项目中遇到了 多服务调用的协调问题,这是非常典型的分布式事务复杂场景问题。

比如电商系统中的订单、库存、支付服务,需要考虑链式服务之间的调用关系,同时需要考虑如何确保多个步骤要么全部成功,要么全部回滚,在实际编码中实现事务的逆向操作。

提示

以下代码示例中将以本地消息表方案作为示例,在 RocketMQ 事务消息方案中的思路是一样的。

链式调用(A → B → C)

服务A调用服务B,服务B调用服务C,同时每个服务都有自己的本地事务

处理方案:逐级消息传递 + 状态回传

java
// 服务A:订单服务 @Service public class OrderService { @Transactional public void createOrder(OrderDTO order) { // 1. 创建订单 orderRepository.save(order); // 2. 创建消息:通知库存服务扣减库存 MessageRecord message = new MessageRecord(); message.setMessageId(UUID.randomUUID().toString()); message.setBusinessId(order.getId()); message.setMessageContent(JSON.toJSONString(order)); message.setStatus("PENDING"); message.setTargetService("inventory-service"); // 目标服务 messageRecordRepository.save(message); } } // 服务B:库存服务 @Component public class InventoryMessageConsumer { @Autowired private InventoryService inventoryService; public void handleInventoryMessage(MessageRecord message) { try { // 1. 扣减库存(本地事务) inventoryService.decreaseInventory(message.getBusinessId()); // 2. 创建下游消息:通知物流服务创建配送单 MessageRecord downstreamMessage = new MessageRecord(); downstreamMessage.setMessageId(UUID.randomUUID().toString()); downstreamMessage.setBusinessId(message.getBusinessId()); downstreamMessage.setMessageContent(JSON.toJSONString(message)); downstreamMessage.setStatus("PENDING"); downstreamMessage.setTargetService("logistics-service"); // 注意:这里是在库存服务的数据库中创建消息 messageRecordRepository.save(downstreamMessage); // 3. 更新当前消息状态 message.setStatus("SUCCESS"); messageRecordRepository.save(message); } catch (Exception e) { // 处理失败,更新状态为FAILED,后续重试 message.setStatus("FAILED"); messageRecordRepository.save(message); throw e; } } } // 服务C:物流服务 @Component public class LogisticsMessageConsumer { public void handleLogisticsMessage(MessageRecord message) { try { // 创建配送单 logisticsService.createDeliveryOrder(message.getBusinessId()); message.setStatus("SUCCESS"); messageRecordRepository.save(message); // 可选:发送最终确认消息给订单服务 sendFinalConfirmation(message.getBusinessId()); } catch (Exception e) { message.setStatus("FAILED"); messageRecordRepository.save(message); throw e; } } }

关键核心

  • 每个服务都是独立的消息生产者和消费者
  • 服务B在处理完自己的业务后,会创建新的消息给服务C
  • 状态回传机制:服务C处理完成后,可以发送确认消息给服务A

并行调用(A → B 且 A → C)

服务A需要同时调用服务B和服务C,两个调用是并行的,互不影响

处理方案:多消息记录 + 聚合确认

java
// 服务A:订单服务 @Service public class OrderService { @Transactional public void createOrder(OrderDTO order) { // 1. 创建订单 orderRepository.save(order); // 2. 创建两条独立的消息 MessageRecord inventoryMessage = createMessage(order, "inventory-service"); MessageRecord paymentMessage = createMessage(order, "payment-service"); messageRecordRepository.save(inventoryMessage); messageRecordRepository.save(paymentMessage); // 3. 可选:创建聚合状态记录 OrderStatus status = new OrderStatus(); status.setOrderId(order.getId()); status.setInventoryStatus("PENDING"); status.setPaymentStatus("PENDING"); status.setOverallStatus("PROCESSING"); orderStatusRepository.save(status); } private MessageRecord createMessage(OrderDTO order, String targetService) { MessageRecord message = new MessageRecord(); message.setMessageId(UUID.randomUUID().toString()); message.setBusinessId(order.getId()); message.setMessageContent(JSON.toJSONString(order)); message.setStatus("PENDING"); message.setTargetService(targetService); return message; } } // 服务B和C的处理逻辑类似,处理完成后发送确认消息 @Component public class InventoryMessageConsumer { public void handleInventoryMessage(MessageRecord message) { try { inventoryService.decreaseInventory(message.getBusinessId()); message.setStatus("SUCCESS"); messageRecordRepository.save(message); // 发送确认消息给订单服务 sendConfirmation("inventory-service", message.getBusinessId(), "SUCCESS"); } catch (Exception e) { message.setStatus("FAILED"); messageRecordRepository.save(message); sendConfirmation("inventory-service", message.getBusinessId(), "FAILED"); throw e; } } } // 服务A:处理确认消息 @Component public class OrderConfirmationConsumer { public void handleConfirmation(ConfirmationMessage confirmation) { // 更新聚合状态 OrderStatus status = orderStatusRepository.findByOrderId(confirmation.getBusinessId()); if ("inventory-service".equals(confirmation.getServiceName())) { status.setInventoryStatus(confirmation.getStatus()); } else if ("payment-service".equals(confirmation.getServiceName())) { status.setPaymentStatus(confirmation.getStatus()); } // 检查整体状态 if ("SUCCESS".equals(status.getInventoryStatus()) && "SUCCESS".equals(status.getPaymentStatus())) { status.setOverallStatus("COMPLETED"); } else if ("FAILED".equals(status.getInventoryStatus()) || "FAILED".equals(status.getPaymentStatus())) { status.setOverallStatus("FAILED"); // 触发补偿逻辑 triggerCompensation(status.getOrderId()); } orderStatusRepository.save(status); } }

调用失败的补偿处理

服务A和B的本地事务已经提交,服务C处理失败,需要进行补偿(回滚)操作

以上可靠消息投递的两种基本流程已经理解,其本身只保证消息生产和本地事务的一致性,并不直接管理多个服务的回滚,主要解决的是 生产者本地事务消息发送的原子性 问题,但它本身并不直接管理多个服务之间的分布式事务协调。当涉及多个服务调用时,我们需要额外的机制来处理。

这时候需要引入 Saga模式 的概念,通过 补偿事务 来实现回滚。

RocketMQ 事务消息处理方案:Saga模式

  1. 事务状态表设计,首先需要扩展本地消息表,增加事务状态跟踪:
sql
CREATE TABLE distributed_transaction ( id BIGINT PRIMARY KEY AUTO_INCREMENT, transaction_id VARCHAR(64) NOT NULL UNIQUE COMMENT '全局事务ID', current_service VARCHAR(100) NOT NULL COMMENT '当前服务名', next_service VARCHAR(100) COMMENT '下一个服务名', transaction_status TINYINT NOT NULL COMMENT '0:进行中, 1:已完成, 2:已回滚, 3:失败', business_data TEXT COMMENT '业务数据', compensation_data TEXT COMMENT '补偿需要的数据', retry_count INT DEFAULT 0, create_time DATETIME DEFAULT CURRENT_TIMESTAMP, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_transaction_id (transaction_id), INDEX idx_status (transaction_status) );
  1. 事务协调器实现
java
@Component @Slf4j public class TransactionCoordinator { @Autowired private DistributedTransactionMapper transactionMapper; @Autowired private RocketMQTemplate rocketMQTemplate; /** * 开始分布式事务 */ public String beginTransaction(String currentService, String businessData) { String transactionId = generateTransactionId(); DistributedTransaction transaction = new DistributedTransaction(); transaction.setTransactionId(transactionId); transaction.setCurrentService(currentService); transaction.setTransactionStatus(0); // 进行中 transaction.setBusinessData(businessData); transactionMapper.insert(transaction); return transactionId; } /** * 执行下一个服务调用 */ public void invokeNextService(String transactionId, String nextService, Object messageBody) { try { // 更新事务状态 DistributedTransaction transaction = transactionMapper.selectByTransactionId(transactionId); transaction.setNextService(nextService); transactionMapper.update(transaction); // 发送事务消息到下一个服务 Map<String, Object> message = new HashMap<>(); message.put("transactionId", transactionId); message.put("businessData", transaction.getBusinessData()); message.put("messageBody", messageBody); Message<String> mqMessage = MessageBuilder.withPayload(JSON.toJSONString(message)) .setHeader(RocketMQHeaders.KEYS, transactionId) .build(); rocketMQTemplate.sendMessageInTransaction( "TransactionTopic", mqMessage, null ); } catch (Exception e) { log.error("调用下一个服务失败: transactionId={}, nextService={}", transactionId, nextService, e); // 触发补偿流程 triggerCompensation(transactionId); } } /** * 标记事务步骤完成 */ public void completeStep(String transactionId) { DistributedTransaction transaction = transactionMapper.selectByTransactionId(transactionId); transaction.setTransactionStatus(1); // 已完成 transactionMapper.update(transaction); } /** * 触发补偿流程 */ public void triggerCompensation(String transactionId) { log.info("开始补偿流程: transactionId={}", transactionId); // 发送补偿消息(按照服务调用逆序) Message<String> compensateMessage = MessageBuilder.withPayload(transactionId) .setHeader("COMPENSATION", "true") .build(); rocketMQTemplate.syncSend("CompensationTopic", compensateMessage); } }
  1. 订单服务实现(事务起点)
java
@Service @Slf4j public class OrderService { @Autowired private TransactionCoordinator transactionCoordinator; @Autowired private InventoryService inventoryService; @Transactional public String createOrder(OrderRequest request) { // 1. 开始分布式事务 String transactionId = transactionCoordinator.beginTransaction( "order-service", JSON.toJSONString(request) ); try { // 2. 执行本地事务(创建订单) Order order = createOrderLocal(request); // 3. 保存补偿需要的数据 String compensationData = buildCompensationData(order); // 4. 调用下一个服务(库存服务) InventoryDeductRequest inventoryRequest = buildInventoryRequest(order); transactionCoordinator.invokeNextService( transactionId, "inventory-service", inventoryRequest ); return transactionId; } catch (Exception e) { log.error("创建订单失败: transactionId={}", transactionId, e); transactionCoordinator.triggerCompensation(transactionId); throw new RuntimeException("创建订单失败", e); } } /** * 订单服务补偿操作 */ @Transactional public void compensateOrder(String transactionId, String compensationData) { log.info("执行订单服务补偿: transactionId={}", transactionId); try { // 解析补偿数据 OrderCompensationData data = JSON.parseObject(compensationData, OrderCompensationData.class); // 取消订单(软删除或状态更新) orderMapper.updateStatus(data.getOrderId(), OrderStatus.CANCELLED); // 记录补偿日志 log.info("订单补偿完成: orderId={}", data.getOrderId()); } catch (Exception e) { log.error("订单补偿失败: transactionId={}", transactionId, e); throw new RuntimeException("订单补偿失败", e); } } }
  1. 库存服务实现(中间服务)
java
@Service @Slf4j public class InventoryService { @Autowired private TransactionCoordinator transactionCoordinator; @Autowired private PointService pointService; /** * 库存扣减(事务消息消费者) */ @RocketMQMessageListener( topic = "TransactionTopic", selectorExpression = "inventory-service", consumerGroup = "inventory-consumer-group" ) public class InventoryConsumer implements RocketMQListener<MessageExt> { @Override @Transactional public void onMessage(MessageExt message) { try { String body = new String(message.getBody(), StandardCharsets.UTF_8); Map<String, Object> data = JSON.parseObject(body, Map.class); String transactionId = (String) data.get("transactionId"); InventoryDeductRequest request = JSON.parseObject( JSON.toJSONString(data.get("messageBody")), InventoryDeductRequest.class ); // 检查是否补偿消息 if (isCompensationMessage(message)) { compensateInventory(transactionId, request); return; } // 执行库存扣减 boolean success = deductInventory(request); if (success) { // 标记当前步骤完成 transactionCoordinator.completeStep(transactionId); // 调用下一个服务(积分服务) PointAddRequest pointRequest = buildPointRequest(request); transactionCoordinator.invokeNextService( transactionId, "point-service", pointRequest ); } else { // 库存不足,触发补偿 transactionCoordinator.triggerCompensation(transactionId); } } catch (Exception e) { log.error("库存服务处理失败", e); // 触发补偿 String transactionId = extractTransactionId(message); transactionCoordinator.triggerCompensation(transactionId); } } /** * 库存服务补偿操作 */ @Transactional public void compensateInventory(String transactionId, InventoryDeductRequest request) { log.info("执行库存服务补偿: transactionId={}", transactionId); try { // 恢复库存 restoreInventory(request.getProductId(), request.getQuantity()); log.info("库存补偿完成: productId={}, quantity={}", request.getProductId(), request.getQuantity()); } catch (Exception e) { log.error("库存补偿失败: transactionId={}", transactionId, e); throw new RuntimeException("库存补偿失败", e); } } } }
  1. 积分服务实现(最终服务)
java
@Service @Slf4j public class PointService { @RocketMQMessageListener( topic = "TransactionTopic", selectorExpression = "point-service", consumerGroup = "point-consumer-group" ) public class PointConsumer implements RocketMQListener<MessageExt> { @Override @Transactional public void onMessage(MessageExt message) { try { String body = new String(message.getBody(), StandardCharsets.UTF_8); Map<String, Object> data = JSON.parseObject(body, Map.class); String transactionId = (String) data.get("transactionId"); PointAddRequest request = JSON.parseObject( JSON.toJSONString(data.get("messageBody")), PointAddRequest.class ); // 检查是否补偿消息 if (isCompensationMessage(message)) { compensatePoints(transactionId, request); return; } // 执行积分增加 boolean success = addPoints(request); if (success) { // 标记整个事务完成 completeTransaction(transactionId); log.info("分布式事务完成: transactionId={}", transactionId); } else { // 积分增加失败,触发补偿 transactionCoordinator.triggerCompensation(transactionId); } } catch (Exception e) { log.error("积分服务处理失败", e); String transactionId = extractTransactionId(message); transactionCoordinator.triggerCompensation(transactionId); } } /** * 积分服务补偿操作 */ @Transactional public void compensatePoints(String transactionId, PointAddRequest request) { log.info("执行积分服务补偿: transactionId={}", transactionId); try { // 扣减积分(恢复原状) deductPoints(request.getUserId(), request.getPoints()); log.info("积分补偿完成: userId={}, points={}", request.getUserId(), request.getPoints()); } catch (Exception e) { log.error("积分补偿失败: transactionId={}", transactionId, e); throw new RuntimeException("积分补偿失败", e); } } } }
  1. 补偿消息消费者
java
@Component @Slf4j public class CompensationConsumer { @Autowired private OrderService orderService; @Autowired private InventoryService inventoryService; @Autowired private PointService pointService; @RocketMQMessageListener( topic = "CompensationTopic", consumerGroup = "compensation-consumer-group" ) public void handleCompensation(String transactionId) { log.info("处理补偿事务: transactionId={}", transactionId); try { // 按照调用链的逆序执行补偿 // 1. 补偿积分服务 pointService.compensatePoints(transactionId, getPointCompensationData(transactionId)); // 2. 补偿库存服务 inventoryService.compensateInventory(transactionId, getInventoryCompensationData(transactionId)); // 3. 补偿订单服务 orderService.compensateOrder(transactionId, getOrderCompensationData(transactionId)); log.info("补偿流程完成: transactionId={}", transactionId); } catch (Exception e) { log.error("补偿流程执行失败: transactionId={}", transactionId, e); // 记录失败日志,需要人工干预 } } }

核心要点,

  • 每个补偿操作必须保证幂等性,防止重复补偿:

  • 要明确事务的状态机

    java
    public enum TransactionStatus { INITIATED(0, "已初始化"), IN_PROGRESS(1, "进行中"), ORDER_COMPLETED(2, "订单服务完成"), INVENTORY_COMPLETED(3, "库存服务完成"), POINT_COMPLETED(4, "积分服务完成"), SUCCESS(5, "事务成功"), COMPENSATING(6, "补偿中"), COMPENSATED(7, "已补偿"), FAILED(8, "事务失败"); // ... getters and constructors }
  • 超时控制,防止事务长时间挂起

本地消息处理方案:Saga模式 + 补偿事务

java
// 1. 定义补偿消息结构 @Data public class CompensationMessage { private String originalMessageId; // 原始消息ID private String businessId; private String serviceName; // 需要补偿的服务 private String compensationType; // 补偿类型 } // 2. 服务C失败时的处理 @Component public class LogisticsMessageConsumer { public void handleLogisticsMessage(MessageRecord message) { try { logisticsService.createDeliveryOrder(message.getBusinessId()); message.setStatus("SUCCESS"); messageRecordRepository.save(message); } catch (Exception e) { message.setStatus("FAILED"); messageRecordRepository.save(message); // 发送补偿消息 CompensationMessage compensation = new CompensationMessage(); compensation.setOriginalMessageId(message.getMessageId()); compensation.setBusinessId(message.getBusinessId()); compensation.setServiceName("inventory-service"); // 需要补偿的上游服务 compensation.setCompensationType("ROLLBACK_INVENTORY"); rocketMQTemplate.convertAndSend("compensation-topic", compensation); throw e; } } } // 3. 服务B:处理补偿消息 @Component public class InventoryCompensationConsumer { public void handleCompensation(CompensationMessage compensation) { try { // 执行补偿操作:增加库存(回滚扣减操作) inventoryService.increaseInventory(compensation.getBusinessId()); // 发送进一步的补偿消息给服务A CompensationMessage upstreamCompensation = new CompensationMessage(); upstreamCompensation.setBusinessId(compensation.getBusinessId()); upstreamCompensation.setServiceName("order-service"); upstreamCompensation.setCompensationType("ROLLBACK_ORDER_STATUS"); rocketMQTemplate.convertAndSend("compensation-topic", upstreamCompensation); } catch (Exception e) { // 补偿失败需要人工干预或重试 log.error("补偿操作失败", e); // 可以发送告警或记录到人工处理队列 } } } // 4. 服务A:处理最终补偿 @Component public class OrderCompensationConsumer { public void handleCompensation(CompensationMessage compensation) { // 更新订单状态为失败 orderService.updateOrderStatus(compensation.getBusinessId(), "FAILED"); // 可能还需要通知用户或其他业务处理 notificationService.notifyOrderFailed(compensation.getBusinessId()); } }

补偿消息的可靠性保证

java
// 补偿消息表(每个服务都需要) @Data public class CompensationRecord { private String compensationId; private String originalBusinessId; private String serviceName; private String compensationType; private String status; // PENDING, SUCCESS, FAILED private int retryCount; private Date createTime; } // 补偿服务实现 @Service public class CompensationService { @Transactional public void createCompensation(String businessId, String serviceName, String type) { CompensationRecord record = new CompensationRecord(); record.setCompensationId(UUID.randomUUID().toString()); record.setOriginalBusinessId(businessId); record.setServiceName(serviceName); record.setCompensationType(type); record.setStatus("PENDING"); compensationRecordRepository.save(record); // 发送补偿消息 rocketMQTemplate.convertAndSend("compensation-topic", record); } // 定时任务:重试失败的补偿 @Scheduled(fixedRate = 30000) public void retryFailedCompensations() { List<CompensationRecord> failedRecords = compensationRecordRepository.findByStatus("FAILED"); for (CompensationRecord record : failedRecords) { if (record.getRetryCount() < 5) { rocketMQTemplate.convertAndSend("compensation-topic", record); record.setRetryCount(record.getRetryCount() + 1); compensationRecordRepository.save(record); } else { // 超过重试次数,需要人工处理 alertService.sendAlert("补偿失败需要人工处理: " + record.getCompensationId()); } } } }

幂等性保证

java
// 补偿操作的幂等性处理 @Service public class InventoryService { public void increaseInventory(String orderId) { // 检查是否已经执行过补偿 if (compensationRecordRepository.existsByBusinessIdAndType(orderId, "INCREASE_INVENTORY")) { return; // 已经执行过,直接返回 } // 执行补偿逻辑 inventoryRepository.increaseStock(orderId); // 记录补偿执行 CompensationExecution execution = new CompensationExecution(); execution.setBusinessId(orderId); execution.setCompensationType("INCREASE_INVENTORY"); execution.setExecuteTime(new Date()); compensationExecutionRepository.save(execution); } }

总结

通过Saga模式 + RocketMQ事务消息 + 补偿事务的组合方案,我们可以有效处理多服务之间的分布式事务:

  • 正向流程:通过事务消息依次调用各个服务

  • 异常处理:任何服务失败时触发补偿流程

  • 补偿机制:按照调用链的逆序执行补偿操作

  • 可靠性保障:通过幂等性、状态跟踪、超时控制等机制确保数据一致性

  • 监控和运维:建立完整的链路追踪,监控消息处理延迟和失败率,提供人工干预的入口

这种方案虽然实现相对复杂,但能够很好地解决多服务调用的事务一致性问题,是分布式系统中常用的成熟模式。

方案对比与选择建议

特性RocketMQ事务消息本地消息表
实现复杂度中等中等
性能中等
数据一致性
中间件依赖强(RocketMQ)
通用性
运维成本中等中等

选择建议:

  • 如果已经使用RocketMQ:优先考虑事务消息方案,实现更简洁

  • 如果需要跨多种消息中间件:选择本地消息表,更具通用性

  • 对性能要求极高:RocketMQ事务消息性能更优

  • 技术团队熟悉度:选择团队更熟悉的技术方案

最佳实践

幂等性设计:无论使用哪种方案,消费者都必须实现幂等性处理,防止重复消费

  • 使用唯一消息ID进行去重
  • 在业务层面检查处理状态

监控与告警

  • 监控消息积压情况

  • 设置事务失败告警

  • 定期检查补偿任务运行状态

重试策略

  • 实现指数退避重试机制

  • 设置最大重试次数

  • 超过重试次数后人工干预

数据对账:定期执行数据对账任务,发现并修复不一致的数据

java
@Component @Slf4j public class DataReconciliationTask { @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行 public void reconcileData() { // 检查订单与库存的一致性 List<Order> orders = orderService.findInconsistentOrders(); for (Order order : orders) { log.warn("发现数据不一致订单: {}", order.getId()); // 触发补偿流程或通知人工处理 } } }

总结

可靠消息投递是实现分布式事务最终一致性的有效方案,而 RocketMQ事务消息本地消息表 是两种主流的实现方式。RocketMQ事务消息通过MQ的内置事务机制提供简洁的实现,而本地消息表则通过应用层实现提供更高的灵活性。

在实际项目中,应根据业务特点、系统架构和现有技术栈选择合适的方案。对于事务操作时间短、已使用RocketMQ的系统,RocketMQ事务消息是理想选择;对于需要与MQ解耦、事务操作时间长的系统,本地消息表更为合适。

无论选择哪种方式,都需注意以下关键点:

  • 消息处理的幂等性设计
  • 合理的事务超时和重试机制
  • 业务与消息的解耦设计

通过合理运用这些可靠消息投递的实现方式,我们可以在分布式系统中构建出高可靠、高可用的事务处理机制,为业务系统的稳定运行提供有力保障。

本文作者:柳始恭

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!