面试题 mq的,业务系统还没执行完,但是mq已经发了,mq需要业务系统的数据要做计算 Published on Apr 22, 2026 in 随笔 with 0 comment 解决方案 核心思想 优点 缺点 适用场景 1. 事务提交后发送 将send操作移到@Transactional方法外 简单、无侵入 发送失败无法回滚业务 一致性要求不高的场景 2. 本地消息表 业务操作和消息插入在同一本地事务,轮询发送 可靠、不依赖MQ特性 增加数据库负担、轮询有延迟 通用可靠方案 3. RocketMQ事务消息 预发送(半消息) → 执行本地事务 → 提交/回滚 高性能、最终一致性、无需轮询 依赖RocketMQ RocketMQ场景的首选 4. 消费端重试+幂等 消费者如果查不到数据,延迟重试 简单兜底 增加消费延迟 配合上述方案使用 三、RocketMQ事务消息最佳实践(你的加分项) 既然你提到用的是RocketMQ,这个方案是最优解,也是面试中的亮点。它的核心是两阶段提交 + 事务状态回查机制。 1. 工作流程 发送半消息:生产者发送一条“半消息”到Broker,此时消息对消费者不可见。 执行本地事务:生产者执行本地业务操作(如插入订单)。 提交/回滚:根据本地事务执行结果,向Broker发送commit或rollback指令。如果是commit,消息才对消费者可见。 事务回查:如果长时间未收到二次确认,Broker会主动回调生产者,检查本地事务状态,然后决定提交还是回滚 为什么必须设置 TransactionListener? RocketMQ 的事务消息基于两阶段提交,Broker 需要回调你的业务代码来完成两个关键动作: 执行本地事务:半消息发送成功后,RocketMQ 会回调 executeLocalTransaction 方法,让你执行数据库操作 回查事务状态:如果网络超时或生产者宕机,Broker 会回调 checkLocalTransaction 方法,确认本地事务的最终状态 1. 实现 TransactionListener @Component public class OrderTransactionListener implements TransactionListener { @Autowired private OrderMapper orderMapper; @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { // 从消息属性获取业务ID String orderId = msg.getUserProperty("orderId"); // 执行数据库操作 orderMapper.updateStatus(orderId, "SUCCESS"); // 成功 → 提交消息,消费者可见 return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { // 失败 → 回滚消息 return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String orderId = msg.getUserProperty("orderId"); Order order = orderMapper.selectById(orderId); // 根据数据库状态决定最终结果 return order != null ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE; } } 2. 设置监听器并发送事务消息 @Service public class OrderService { @Autowired private OrderTransactionListener transactionListener; public void createOrderWithTransaction(Order order) throws Exception { // 1. 创建事务消息生产者 TransactionMQProducer producer = new TransactionMQProducer("tx-producer-group"); producer.setNamesrvAddr("localhost:9876"); // 2. 【关键】设置事务监听器 producer.setTransactionListener(transactionListener); producer.start(); // 3. 构建消息 Message msg = new Message("OrderTopic", JSON.toJSONBytes(order)); msg.putUserProperty("orderId", String.valueOf(order.getId())); // 4. 发送事务消息 producer.sendMessageInTransaction(msg, null); } } 3. Spring Boot 中的简化方式 @Component @RocketMQTransactionListener(txProducerGroup = "tx-producer-group") public class OrderTransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 执行本地事务... return RocketMQLocalTransactionState.COMMIT; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 回查逻辑... return RocketMQLocalTransactionState.COMMIT; } } 然后在业务代码中直接使用 RocketMQTemplate: @Autowired private RocketMQTemplate rocketMQTemplate; public void send() { rocketMQTemplate.sendMessageInTransaction("tx-producer-group", "OrderTopic", message, null); } 本文由 admin 创作,采用 知识共享署名4.0 国际许可协议进行许可。本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名。