分布式事务
什么是分布式事务?
在微服务架构中,一个业务操作涉及多个服务,需要保证数据一致性。
订单服务 → 创建订单
↓
库存服务 → 扣减库存
↓
账户服务 → 扣减余额
任意一步失败,都需要全部回滚解决方案
| 方案 | 一致性 | 性能 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| 2PC | 强一致 | 低 | 中 | 传统分布式事务 |
| TCC | 最终一致 | 高 | 高 | 高性能要求 |
| SAGA | 最终一致 | 中 | 高 | 长事务 |
| 本地消息表 | 最终一致 | 高 | 低 | 简单场景 |
| 事务消息 | 最终一致 | 高 | 中 | 异步场景 |
2PC(两阶段提交)
原理
第一阶段:准备阶段
┌─────────────────────────────────────────────┐
│ 协调者(TM) │
│ │ │
│ ┌─────────────┼─────────────┐ │
│ ▼ ▼ ▼ │
│ 参与者A 参与者B 参与者C │
│ 准备成功 准备成功 准备成功 │
└─────────────────────────────────────────────┘
第二阶段:提交阶段
┌─────────────────────────────────────────────┐
│ 协调者(TM) │
│ 发送提交指令 │
│ │ │
│ ┌─────────────┼─────────────┐ │
│ ▼ ▼ ▼ │
│ 参与者A 参与者B 参与者C │
│ 提交 提交 提交 │
└─────────────────────────────────────────────┘问题
| 问题 | 说明 |
|---|---|
| 阻塞 | 参与者等待协调者指令期间锁定资源 |
| 单点故障 | 协调者宕机,参与者一直阻塞 |
| 数据不一致 | 第二阶段部分参与者收不到提交指令 |
TCC(Try-Confirm-Cancel)
原理
Try:预留资源
┌─────────────────────────────────────────────┐
│ 账户余额:100 元 │
│ 冻结金额:20 元(Try) │
│ 可用余额:80 元 │
└─────────────────────────────────────────────┘
Confirm:确认提交
┌─────────────────────────────────────────────┐
│ 账户余额:80 元(扣除冻结) │
│ 冻结金额:0 元 │
└─────────────────────────────────────────────┘
Cancel:取消预留
┌─────────────────────────────────────────────┐
│ 账户余额:100 元(解冻) │
│ 冻结金额:0 元 │
└─────────────────────────────────────────────┘代码示例
public interface StockTccService {
// Try:预留库存
boolean tryDeduct(String productId, int count);
// Confirm:确认扣减
boolean confirmDeduct(String productId, int count);
// Cancel:取消预留
boolean cancelDeduct(String productId, int count);
}
@Service
public class StockTccServiceImpl implements StockTccService {
@Autowired
private StockMapper stockMapper;
@Override
public boolean tryDeduct(String productId, int count) {
// 检查库存
Stock stock = stockMapper.selectById(productId);
if (stock.getAvailable() < count) {
throw new RuntimeException("库存不足");
}
// 预留库存(冻结)
stockMapper.freeze(productId, count);
return true;
}
@Override
public boolean confirmDeduct(String productId, int count) {
// 确认扣减(清除冻结)
stockMapper.confirmFreeze(productId, count);
return true;
}
@Override
public boolean cancelDeduct(String productId, int count) {
// 取消预留(解冻)
stockMapper.unfreeze(productId, count);
return true;
}
}问题
| 问题 | 说明 | 解决方案 |
|---|---|---|
| 空回滚 | Try 未执行,Cancel 却执行了 | 记录事务状态 |
| 悬挂 | Cancel 比 Try 先执行 | 记录事务状态 |
| 幂等 | 重复执行 Confirm/Cancel | 幂等设计 |
SAGA
原理
正向流程:
T1(订单) → T2(库存) → T3(账户) → 完成
补偿流程:
T1(订单) → T2(库存) → T3(账户-失败)
↓
C3(账户补偿)
↓
C2(库存补偿)
↓
C1(订单补偿)实现方式
// 定义补偿动作
public class OrderSaga {
public void createOrder(Order order) {
// T1: 创建订单
orderService.create(order);
}
public void compensateOrder(Order order) {
// C1: 取消订单
orderService.cancel(order.getId());
}
}
// 编排 SAGA
public class OrderSagaOrchestrator {
public void execute(Order order) {
try {
// T1
orderService.create(order);
// T2
stockService.deduct(order.getProductId(), order.getCount());
// T3
accountService.debit(order.getUserId(), order.getAmount());
} catch (Exception e) {
// 补偿
compensate(order);
}
}
private void compensate(Order order) {
// 按相反顺序执行补偿
accountService.credit(order.getUserId(), order.getAmount());
stockService.add(order.getProductId(), order.getCount());
orderService.cancel(order.getId());
}
}本地消息表
原理
1. 本地事务 + 写入消息表
┌─────────────────────────────────────────────┐
│ 本地事务: │
│ - 扣减库存 │
│ - 写入消息表(待发送) │
└─────────────────────────────────────────────┘
↓
2. 定时任务扫描消息表
┌─────────────────────────────────────────────┐
│ 定时任务: │
│ - 扫描待发送消息 │
│ - 发送到 MQ │
│ - 更新消息状态为已发送 │
└─────────────────────────────────────────────┘
↓
3. 消费者消费消息
┌─────────────────────────────────────────────┐
│ 消费者: │
│ - 接收消息 │
│ - 执行业务逻辑 │
│ - ACK 确认 │
└─────────────────────────────────────────────┘代码示例
@Service
public class OrderService {
@Transactional
public void createOrder(Order order) {
// 1. 创建订单
orderMapper.insert(order);
// 2. 写入消息表(同一事务)
Message message = new Message();
message.setTopic("stock_deduct");
message.setContent(JSON.toJSONString(order));
message.setStatus("PENDING");
messageMapper.insert(message);
}
}
@Component
public class MessageSender {
@Scheduled(fixedDelay = 1000)
public void sendPendingMessages() {
// 查询待发送消息
List<Message> messages = messageMapper.selectPending();
for (Message message : messages) {
try {
// 发送到 MQ
mqProducer.send(message.getTopic(), message.getContent());
// 更新状态
messageMapper.updateStatus(message.getId(), "SENT");
} catch (Exception e) {
// 发送失败,下次重试
}
}
}
}事务消息(RocketMQ)
// 发送事务消息
public void sendTransactionMessage(Order order) {
Message msg = new Message("order_topic", JSON.toJSONString(order).getBytes());
transactionMQProducer.sendMessageInTransaction(msg, null);
}
// 本地事务执行器
public class OrderTransactionListener implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地事务
orderService.createOrder(JSON.parseObject(new String(msg.getBody()), Order.class));
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 事务回查
Order order = JSON.parseObject(new String(msg.getBody()), Order.class);
if (orderService.exists(order.getId())) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}面试高频问题
Q1: 分布式事务有哪些方案?
| 方案 | 特点 |
|---|---|
| 2PC | 强一致,阻塞,有单点问题 |
| TCC | 最终一致,高性能,复杂 |
| SAGA | 最终一致,适合长事务 |
| 本地消息表 | 简单,最终一致 |
| 事务消息 | 异步,高性能 |
Q2: TCC 的问题如何解决?
- 空回滚:记录事务状态,Try 未执行时拒绝 Cancel
- 悬挂:记录事务状态,Cancel 先执行时拒绝 Try
- 幂等:每个操作记录执行状态
Q3: SAGA 和 TCC 的区别?
| 对比 | TCC | SAGA |
|---|---|---|
| 性能 | 高 | 中 |
| 复杂度 | 高 | 中 |
| 适用场景 | 短事务 | 长事务 |
| 补偿方式 | Cancel | 反向操作 |
总结
分布式事务核心要点:
1. 2PC:两阶段提交,强一致但阻塞
2. TCC:Try-Confirm-Cancel,高性能
3. SAGA:长事务,补偿机制
4. 本地消息表:简单可靠
5. 事务消息:异步高性能