介绍一下分布式事务
介绍一下分布式事务
一句话定义:分布式事务就是保证多个独立微服务 / 数据库操作的原子性,要么全部成功,要么全部回滚,最终达到数据一致性的机制 🔒
为什么会有分布式事务问题?🤔
单体应用:所有操作在一个数据库里,靠本地事务 ACID 就能保证一致性
微服务架构:一个业务操作可能跨多个服务、多个数据库,本地事务失效
经典例子:下单流程 = 扣减库存 + 创建订单 + 扣减余额
- 如果扣减库存成功,创建订单失败 → 库存就少了
- 如果创建订单成功,扣减余额失败 → 用户白嫖了商品
分布式事务的核心理论基础 📚
CAP 定理(分布式系统基石)
- CP:牺牲可用性,保证强一致性(如 ZooKeeper)
- AP:牺牲强一致性,保证可用性(如 Eureka)
BASE 理论(CAP 的妥协方案)
- Basically Available:基本可用
- Soft State:软状态
- Eventually Consistent:最终一致性
👉 绝大多数互联网公司采用最终一致性方案,而不是强一致性
主流分布式事务方案对比 📊
| 方案 | 核心思想 | 一致性级别 | 性能 | 适用场景 | 代表框架 |
|---|---|---|---|---|---|
| 2PC(两阶段提交) | 准备阶段 + 提交阶段 | 强一致性 | 很差 | 传统数据库跨库 | MySQL XA |
| 3PC(三阶段提交) | 2PC 改进版,增加超时机制 | 强一致性 | 差 | 理论多,实际少 | 无主流框架 |
| TCC(补偿事务) | Try-Confirm-Cancel 三阶段 | 最终一致性 | 好 | 核心业务、高并发 | Seata-TCC |
| SAGA(长事务) | 正向操作 + 逆向补偿 | 最终一致性 | 很好 | 长流程、非核心业务 | Seata-SAGA |
| 本地消息表 | 本地事务 + 消息队列 | 最终一致性 | 很好 | 异步场景、对一致性要求不高 | RocketMQ 事务消息 |
| 最大努力通知 | 尽最大努力通知对方,失败则重试 | 最终一致性 | 最好 | 非核心业务、可容忍不一致 | 短信通知、邮件通知 |
大厂最常用的三种方案详解 ✨
1. TCC 模式(Try-Confirm-Cancel)
- 优点:性能高,灵活性强,没有锁阻塞
- 缺点:侵入性强,每个服务都要写三个方法,开发量大
2. 本地消息表 + 消息队列
- 优点:实现简单,性能好,没有分布式锁
- 缺点:消息表和业务表耦合,只适合异步场景
3. Seata AT 模式(阿里开源,最主流)
核心原理:基于本地事务 + 两阶段提交,自动生成回滚日志,对业务代码零侵入
- 优点:开发成本极低,性能好,阿里大厂背书
- 缺点:只能支持关系型数据库,有一定的锁竞争
我在项目中的实际使用经验 💡
- 核心交易链路:使用 Seata AT 模式,保证下单、支付、库存的一致性
- 非核心异步场景:使用 RocketMQ 事务消息,比如积分发放、短信通知
- 高并发秒杀场景:不使用分布式事务,采用 "最终一致性 + 补偿" 方案,先扣减库存,再异步创建订单,失败则库存回滚
总结 📝
分布式事务没有银弹,需要根据业务场景和一致性要求来选择合适的方案:
- 对一致性要求极高 → 用 TCC
- 追求开发效率 → 用 Seata AT
- 异步非核心场景 → 用本地消息表
- 能容忍最终一致 → 优先用最终一致性方案
👉 记住:能用本地事务解决的,绝对不要用分布式事务!分布式事务是万不得已的选择 😅
核心代码实现 💻
1 Seata AT 模式(零侵入首选 | 阿里大厂标准)
技术亮点:仅需一个注解实现分布式事务,业务代码零侵入,自动生成回滚日志
/**
* 订单服务 - 全局事务入口
* 核心亮点:@GlobalTransactional 注解开启全局事务
* 无需修改任何业务逻辑,Seata自动代理所有数据库操作
*/
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private StockFeignClient stockFeignClient;
@Autowired
private AccountFeignClient accountFeignClient;
@Override
@GlobalTransactional(rollbackFor = Exception.class) // 🔥 核心:全局事务注解
public Long createOrder(Long productId, Integer count, Long userId) {
// 1. 远程调用库存服务扣减库存
stockFeignClient.deductStock(productId, count);
// 2. 远程调用账户服务扣减余额
accountFeignClient.deductBalance(userId, productId.getPrice() * count);
// 3. 本地创建订单
Order order = new Order();
order.setProductId(productId);
order.setCount(count);
order.setUserId(userId);
order.setStatus(OrderStatus.PENDING);
orderMapper.insert(order);
return order.getId();
}
}2 TCC 模式(高并发核心业务 | 性能最优)
技术亮点:完全自定义事务逻辑,无数据库锁阻塞,支持非关系型数据库
/**
* 库存服务TCC接口定义
* 核心亮点:@LocalTCC + @TwoPhaseBusinessAction 自动生成TCC代理
*/
@LocalTCC
public interface StockTccService {
/**
* Try阶段:冻结库存(资源预留)
*/
@TwoPhaseBusinessAction(name = "stockTccService", commitMethod = "confirm", rollbackMethod = "cancel")
boolean tryDeductStock(@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "count") Integer count);
/**
* Confirm阶段:真正扣减库存(确认执行)
*/
boolean confirm(BusinessActionContext context);
/**
* Cancel阶段:解冻库存(回滚补偿)
*/
boolean cancel(BusinessActionContext context);
}/**
* TCC实现类(核心逻辑)
* 技术亮点:内置事务状态表解决空回滚、悬挂、幂等性三大经典问题
*/
@Service
public class StockTccServiceImpl implements StockTccService {
@Autowired
private StockMapper stockMapper;
@Autowired
private TccTransactionLogMapper tccLogMapper;
@Override
@Transactional
public boolean tryDeductStock(Long productId, Integer count) {
String xid = RootContext.getXID();
// 🔥 防悬挂:先检查是否已经执行过Cancel
if (tccLogMapper.exists(xid, "CANCEL")) {
return false;
}
// 🔥 幂等性:检查Try是否已经执行过
if (tccLogMapper.exists(xid, "TRY")) {
return true;
}
// 冻结库存
int rows = stockMapper.freezeStock(productId, count);
if (rows == 0) {
throw new RuntimeException("库存不足");
}
// 记录事务状态
tccLogMapper.insert(xid, "TRY", "stock_service");
return true;
}
@Override
@Transactional
public boolean confirm(BusinessActionContext context) {
String xid = context.getXid();
// 幂等性检查
if (tccLogMapper.exists(xid, "CONFIRM")) {
return true;
}
// 真正扣减冻结的库存
Long productId = (Long) context.getActionContext("productId");
Integer count = (Integer) context.getActionContext("count");
stockMapper.confirmDeduct(productId, count);
// 更新事务状态
tccLogMapper.updateStatus(xid, "CONFIRM");
return true;
}
@Override
@Transactional
public boolean cancel(BusinessActionContext context) {
String xid = context.getXid();
// 🔥 防空回滚:检查Try是否执行过
if (!tccLogMapper.exists(xid, "TRY")) {
// 记录Cancel状态,防止后续Try执行(悬挂)
tccLogMapper.insert(xid, "CANCEL", "stock_service");
return true;
}
// 幂等性检查
if (tccLogMapper.exists(xid, "CANCEL")) {
return true;
}
// 解冻库存
Long productId = (Long) context.getActionContext("productId");
Integer count = (Integer) context.getActionContext("count");
stockMapper.unfreezeStock(productId, count);
// 更新事务状态
tccLogMapper.updateStatus(xid, "CANCEL");
return true;
}
}3 RocketMQ 事务消息(异步非核心 | 性能最好)
技术亮点:RocketMQ 原生支持事务消息,保证本地事务与消息发送的原子性
/**
* 订单服务发送事务消息
* 核心亮点:sendMessageInTransaction 方法原子性执行本地事务和消息发送
*/
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private OrderMapper orderMapper;
@Override
public Long createOrder(Long productId, Integer count, Long userId) {
Order order = new Order();
order.setProductId(productId);
order.setCount(count);
order.setUserId(userId);
order.setStatus(OrderStatus.PENDING);
// 发送事务消息
rocketMQTemplate.sendMessageInTransaction(
"order_topic",
MessageBuilder.withPayload(order).build(),
order // 传递给事务监听器的参数
);
return order.getId();
}
}/**
* 事务消息监听器(核心逻辑)
* 技术亮点:自动回查机制,解决本地事务执行状态未知的问题
*/
@Component
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderMapper orderMapper;
@Autowired
private TransactionLogMapper transactionLogMapper;
@Override
@Transactional
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
Order order = (Order) arg;
String transactionId = msg.getTransactionId();
// 1. 执行本地事务:创建订单
orderMapper.insert(order);
// 2. 记录事务日志(用于回查)
transactionLogMapper.insert(transactionId, "SUCCESS");
// 提交消息,让消费者消费
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
// 回滚本地事务,丢弃消息
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transactionId = msg.getTransactionId();
// 检查本地事务执行状态
if (transactionLogMapper.exists(transactionId, "SUCCESS")) {
return RocketMQLocalTransactionState.COMMIT;
} else {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}分布式事务核心技术难点及解决方案 ⚠️
| 技术难点 | 问题描述 | 解决方案 ✅ |
|---|---|---|
| 空回滚 | Cancel 阶段执行时,Try 阶段还没执行或执行失败 (如网络超时导致 Try 请求丢失) | 1. 每个分支事务维护事务状态表 2. Cancel 执行前先检查 Try 是否执行成功 3. 未执行则直接返回,并记录 Cancel 状态 |
| 悬挂问题 | Try 阶段超时,全局事务回滚执行 Cancel 之后 Try 请求才到达,导致资源被永久冻结 | 1. Cancel 执行时记录事务状态 2. Try 执行前先检查是否已经执行过 Cancel 3. 如果是则拒绝执行 Try |
| 幂等性问题 | 网络重试导致同一个事务被多次执行 (如 TC 重试调用 Confirm/Cancel) | 1. 使用全局事务 ID (XID) 作为唯一幂等键 2. 执行前先检查事务状态 3. 已执行过则直接返回成功 |
| 数据一致性与性能平衡 | 强一致性方案(2PC)性能极差 最终一致性方案存在数据不一致窗口 | 1. 核心业务用 Seata AT/TCC 2. 非核心业务用本地消息表 / 最大努力通知 3. 采用定时对账 + 人工补偿兜底 |
| 长事务问题 | 全局事务持有数据库锁时间过长 导致并发性能急剧下降 | 1. 拆分长事务为多个短事务 2. 避免在事务中执行远程调用和耗时操作 3. 非核心逻辑异步化 |
| 分布式锁竞争 | 多个事务同时修改同一行数据 导致锁等待超时甚至死锁 | 1. 优化数据库索引,使用行级锁 2. 控制事务粒度,缩短锁持有时间 3. 采用乐观锁代替悲观锁 |
| TC 单点故障 | Seata 的 TC (事务协调器) 挂了 所有全局事务都无法执行 | 1. TC 采用集群部署,使用 Nacos 注册中心 2. 采用数据库存储模式保存事务日志 3. 配置自动故障转移 |
| 数据隔离性问题 | 全局事务未提交时,其他事务能看到中间状态 导致脏读问题 | 1. Seata AT 模式默认使用全局锁 2. 写隔离:全局事务提交前,其他事务不能修改该行数据 3. 读隔离:默认读未提交,可通过配置提升为读已提交 |
项目实战经验总结 📝
- 优先级原则:能用本地事务解决的绝对不要用分布式事务;能用最终一致性解决的绝对不要用强一致性
- 方案选择:
- 90% 的场景用Seata AT 模式(开发效率最高)
- 高并发核心交易用TCC 模式(性能最好)
- 异步非核心场景用RocketMQ 事务消息(吞吐量最高)
- 兜底机制:所有分布式事务方案都必须配合定时对账和人工补偿机制,这是数据一致性的最后一道防线
真实面试模拟
真实面试模拟
🧑💼 面试官:
来,咱们聊个场景设计题——你先给我介绍一下分布式事务吧。
🧑💻 候选人:
好的面试官。分布式事务说白了就是:一次业务操作跨了多个独立的数据库或服务,得保证这些操作要么全成功,要么全回滚,让数据从一种正确状态变到另一种正确状态。
举个典型例子 📦
用户下单 → 订单服务写库 → 库存服务扣库存 → 积分服务加积分。
如果扣库存失败,但订单已经落库了,数据就对不齐,这时候就得靠分布式事务来兜底。
说到这必须提一嘴 CAP 理论 🧠
分布式系统里,一致性、可用性、分区容错性最多同时满足俩。网络分区(P)在生产环境必然存在,所以大部分系统选 AP 或 CP。于是分布式事务通常不追求强一致,而是追求最终一致性(BASE)——基本可用、软状态、最终一致。
落地的话,常见方案我整理了个小表 📊
| 方案 | 一致性 | 吞吐 | 复杂度 | 适用 |
|---|---|---|---|---|
| 2PC/3PC | 强一致 | 低 | 中 | 老系统跨库XA |
| TCC | 最终(业务控) | 高 | 高 | 资金、红包 |
| 可靠消息最终一致 | 最终 | 高 | 中 | 下单、积分链路 |
| Saga | 最终 | 高 | 中高 | 长流程、异构 |
| Seata AT | 最终 | 较高 | 低 | 无侵入式改造 |
时间关系我挑两个最经典的细说下。
① 2PC(两阶段提交)
协调者先问所有参与者:“能提交吗?” → 准备阶段。
全Yes就发提交命令,有一个No就全体回滚。
我画个简图 👇
✅ 强一致,但 ❌ 同步阻塞,协调者单点,性能差,现在互联网场景用得少。
② 可靠消息最终一致(本地消息表 + MQ)
核心思路:上游先落地业务和消息,再用MQ异步通知下游。
典型就是 RocketMQ 的事务消息,我画下流程 👇
- 先发半消息(对下游不可见)
- 执行本地事务,成功则提交半消息,失败则回滚
- 万一本地事务提交成功但确认消息丢了呢?RocketMQ有回查机制,主动反查上游补偿。
- 下游靠唯一业务ID做幂等,重复消费也不会出错。
✅ 吞吐高,解耦,❌ 短暂不一致,但业务上完全能接受。
🧑💼 面试官:
挺好。那你们项目中实际用的是哪种?为什么没用 TCC?
🧑💻 候选人:
我们订单-库存-积分场景用的就是 RocketMQ 事务消息 + 本地消息表。
因为业务允许秒级延迟,对实时一致性要求没那么高,所以最终一致完全够。
TCC 虽然能主动回滚,一致性更强,但每个接口都要实现 Try/Confirm/Cancel,业务侵入太大,开发成本高,而且还要处理空回滚、悬挂这些头疼问题。
权衡下来,消息方案性价比最高,线上跑了两年没出过账不平问题 ✅。
🧑💼 面试官:
那我再问个细节——如果消息已经发出去,但上游本地事务回滚了,这咋办?
🧑💻 候选人:
这个 RocketMQ 事务消息已经规避了 😄
因为流程是先发半消息,这时候消息对下游不可见。本地事务提交了,才把半消息标记为可投递;本地事务回滚了,半消息也跟着回滚,下游根本收不到。
所以不存在“消息发出去了但本地事务回滚”的情况。
🧑💼 面试官:
最后问下,除了这些方案,你还有什么想补充的?
🧑💻 候选人:
再亮三个加分点吧 💡
- 幂等性设计:下游必须用业务唯一键(比如订单号)做数据库唯一索引,防重复消费。
- 兜底扫描:万一消息中间件有问题,定时任务扫描本地消息表补偿,做到万无一失。
- Seata AT 模式:如果不想写太多补偿代码,可以考虑,无侵入自动回滚,但要接受全局锁带来的性能损耗,适合低并发场景。
最后一句心得:分布式事务没有银弹,能最终一致就别强一致,根据业务容忍度选型,这才是落地的关键 👍
🧑💼 面试官:
你刚提到了 RocketMQ 事务消息这个方案,讲得不错。那你能不能写一下核心代码,把技术亮点体现出来?然后再说说这个场景下有哪些技术难点,你们怎么解决的?
🧑💻 候选人:
好的,我直接写关键部分,保证有亮点 ✨
💻 核心代码(可靠消息最终一致)
场景:用户下单,订单服务发送事务消息,库存服务消费扣减。
1. 订单服务:发送事务消息(RocketMQ 事务消息)
// 订单服务 - 发送半消息并执行本地事务
@Service
public class OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Transactional // 本地事务:订单表 + 消息表
public void createOrder(OrderDTO orderDTO) {
// 1. 生成唯一业务键(幂等关键)
String orderId = generateOrderId();
// 2. 构造消息
Message<String> message = MessageBuilder.withPayload(orderId).build();
// 3. 发送事务消息(亮点:半消息 + 本地事务 + 回查)
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"order-topic", // 主题
message, // 消息体
orderDTO // 业务参数,会透传给执行本地事务方法
);
if (result.getSendStatus() != SendStatus.SEND_OK) {
throw new BusinessException("订单创建失败");
}
}
// 执行本地事务(RocketMQ 回调)
@RocketMQTransactionListener(txProducerGroup = "order-group")
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Override
@Transactional
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
OrderDTO orderDTO = (OrderDTO) arg;
try {
// 1. 插入订单表
orderMapper.insert(orderDTO.toOrder());
// 2. 插入本地消息表(后续兜底扫描用)
localMessageMapper.insert(buildLocalMessage(msg));
// 3. 可在此预扣库存、冻结积分等...
return RocketMQLocalTransactionState.COMMIT; // 本地事务成功,提交半消息
} catch (Exception e) {
log.error("本地事务执行失败", e);
return RocketMQLocalTransactionState.ROLLBACK; // 回滚半消息
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 回查:根据消息体里的业务ID查订单表,判断是否落库成功
String orderId = new String((byte[]) msg.getPayload());
Order order = orderMapper.selectByOrderId(orderId);
if (order != null) {
return RocketMQLocalTransactionState.COMMIT; // 已落库,补提交
}
return RocketMQLocalTransactionState.ROLLBACK; // 未查到,补回滚
}
}
}✨ 技术亮点:
- 利用
sendMessageInTransaction实现半消息机制,确保本地事务和消息投递原子性。 - 实现
RocketMQLocalTransactionListener,包含executeLocalTransaction和checkLocalTransaction,完美应对网络闪断、回调丢失。 - 本地事务中同时写订单表 + 本地消息表,给定时兜底扫描留下痕迹。
2. 库存服务:消费消息,扣减库存(幂等设计)
// 库存服务 - 消费订单消息,扣减库存
@Component
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "stock-group")
public class StockDeductConsumer implements RocketMQListener<String> {
@Autowired
private StockService stockService;
@Override
public void onMessage(String orderId) {
// 幂等性保证:根据orderId判断是否已处理
if (stockService.isAlreadyDeducted(orderId)) {
log.warn("重复消息,已跳过处理: {}", orderId);
return;
}
// 开始扣库存(本地事务)
try {
stockService.deductStock(orderId);
// 内部实现:使用唯一索引避免重复扣减
// INSERT INTO stock_deduct_log (order_id) VALUES (#{orderId})
// 如果插入重复,唯一索引报错,捕获异常后跳过即可
} catch (DuplicateKeyException e) {
log.info("幂等拦截,重复扣库存请求: {}", orderId);
}
}
}✨ 技术亮点:
- 唯一业务键幂等:通过
order_id写扣减流水表,数据库加唯一索引,天然的防重手段。 - 异常处理不吞错,打印日志后继续消费,保证不阻塞后续消息。
3. 兜底扫描:定时任务补偿
// 定时任务 - 扫描本地消息表,补偿未成功的消息
@Component
public class LocalMessageCompensateJob {
@Autowired
private LocalMessageMapper localMessageMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Scheduled(cron = "0 0/5 * * * ?") // 每5分钟扫描
public void compensate() {
// 查询状态为“待发送”且超过2分钟未更新的消息
List<LocalMessage> messages = localMessageMapper.selectPendingMessages(2);
for (LocalMessage msg : messages) {
try {
// 重新发送到MQ
Message<String> mqMsg = MessageBuilder.withPayload(msg.getOrderId()).build();
rocketMQTemplate.syncSend("order-topic", mqMsg);
// 更新本地消息状态为已发送
localMessageMapper.updateStatus(msg.getId(), MessageStatus.SENT);
} catch (Exception e) {
log.error("补偿发送失败: {}", msg.getOrderId(), e);
}
}
}
}✨ 技术亮点:
- 双保险:即使 RocketMQ 回查机制全部失效,定时扫描也能保证消息最终投递。
- 生产环境可加分布式锁,避免多实例重复发送。
⚠️ 场景对应的技术难点 & 解决方案
这个下单 → 库存场景,我们踩过不少坑,我归纳成 四个难点 🚧
| 技术难点 | 详细描述 | 解决方案 |
|---|---|---|
| 1. 消息可靠投递 | 本地事务提交了,消息发丢了;或者消息发了,本地事务回滚。怎么保证一个成功另一个必定成功? | 采用 RocketMQ 事务消息(半消息+回查)。先发对下游不可见的半消息,本地事务提交后才标为可投递。如果本地回滚,半消息也回滚,下游根本收不到。回查机制解决网络超时导致的半消息悬挂问题。 |
| 2. 下游消费幂等 | MQ 保证至少一次投递,重复消费会导致重复扣库存,超卖。 | 为每个业务操作建立唯一标识(订单号),下游写入扣减流水表时用 order_id 做唯一索引。插入重复直接捕获 DuplicateKeyException 跳过。 |
| 3. 回查逻辑设计 | checkLocalTransaction 被调时,怎么知道本地事务到底成功了没?查不到数据是没执行?还是执行了但还没来得及落库? | 回查时反查订单表(或本地消息表),根据业务键查数据是否存在。如果存在说明事务成功 → 返回 COMMIT;不存在则 ROLLBACK。为了防“执行中”的中间态,回查方法要加适当重试或等待。 |
| 4. 分布式事务超时 | 订单服务本地事务成功,但库存服务消费消息后,由于自身故障(比如数据库连接满)长时间未处理,导致用户看订单已生成但库存没变。 | ① 设置合理的消费超时和重试机制;② 在库存服务做好死信队列处理,防止阻塞主队列;③ 配合定时扫描本地消息表,对长时间未消费成功的业务进行人工或自动补偿(如通知用户、退款)。 |
| 5. 空回滚(TCC特有) | 如果直接使用 TCC,Cancel 比 Try 先到怎么办?或者 Cancel 被重复调用? | 在 TCC 场景需允许空回滚:Cancel 执行时若没有 Try 对应的预留资源,直接返回成功。同时 Cancel 操作也要幂等。我们之前的架构没用 TCC,但此点易被追问,要能答出。 |
🧑💼 面试官:
不错,代码写得重点突出,难点分析也到位,尤其是把幂等和回查讲透了 👍
🧑💻 候选人:
谢谢面试官,我再补充个实际生产上的小经验:我们当初上线后,凌晨时段数据库慢查询导致本地事务执行超过 MQ 默认回查时间,结果产生了大量回查调用。后来我们把本地事务执行超时阈值调高,并把回查方法改成只查数据不写操作,尽量轻量,完美解决 😄
🧑💼 面试官:
好,这个经验很有价值,面试到这儿,你回去等通知吧。
