上一章我们讲解了AT模式原理及适用场景,本期我们来讲讲TCC模式
1. TCC模式TCC模式,全称Try-Confirm-Cancel,通过名称也能看出来其流程主要有三个步骤:
- 预处理 Try:实现业务检查和资源预留
- 确认/提交 Confirm:业务确认和提交
- 撤销/回滚 Cancel:业务回滚
如果看过上一章的同学,看到这里是不是有点熟悉了,这不也是二阶段提交嘛。TCC模式本身就是二阶段提交的一种改进,不一样的是,这次就没有AT模式那么方便了,因为他需要我们自己写代码来实现了。
1.2 案例理解TCC模式的关键在于理解Try-Confirm阶段,其中Try用来实现业务检查和资源预留,这个概念比较抽象,我们举个例子来看看:
现在我们有一个下订单的操作,订单创建后需要扣减商品库存
1.2.1 Try阶段实现-
1、所需的资源是什么? 我们上面说了,Try要实现业务检查和资源预留,那么这个业务的业务检查和资源预留是什么呢?其实判断标准很简单,就是发生了变化的主体,比如这个业务中订单会新增,库存会扣减,那么订单和商品库存都是所需的资源
-
2、业务检查是什么? 因为订单是新增的,所以不涉及和其他业务的交流,但是库存是基于之前的基础上进行扣减的,那么我们所需做的业务检查就是查看扣减的库存是否足够,也就是在我们进行这个业务的瞬间,是不是有别的业务或者另一个线程已经占用了这个资源。
-
3、资源预留如何操作? 检查库存足够后,再来看资源预留,首先我们需要把订单创建出来,但是并不提交,一个简单的做法就是订单新增后,添加一个状态标注为未提交,未提交状态下的订单是不会被用户查看到的。 另外库存也需要进行预留,做法就是将订单所需的库存进行一个标注冻结,冻结之后的库存不能被其他业务所占用
以微服务视角来看的话,添加未提交的订单和冻结商品库存两个操作都是订单服务和商品服务的本地操作,我们还需要将两个本地操作,或者我们理解为本地事务,当然这里并不需要像AT模式那样,要求数据库支持事务,这里的事务更多通过我们自己来实现的,比如上述的订单状态和库存冻结状态
同时我们需要将两个本地事务提交给一个服务端,由服务端来记录协调。这个服务端就是我们上一章讲的TC(事务协调者)
如上我们就完成Try的过程,然后再来看Confirm的过程,Confirm是个业务确认和提交的过程,当我们的订单新增和库存扣减两个子业务操作都完成且无报错(只是还没有最终提交),那么就可以通知TC,让TC来告诉各个本地事务可以进行提交操作了:
(1)将订单状态更新为已提交,已提交状态的订单数据可以被用户访问到。 (2)冻结的库存取消冻结,正式扣减
1.2.3 Cancel阶段实现如果订单新增操作或者库存冻结操作,或者这两个操作之后的某行代码报错,那么就会通知TC:有问题,赶紧撤退!!
于是乎,TC就会给注册在他上面的RM(本地资源管理者)下达回退的指令。
RM收到后会将操作回退: (1)删除未提交的订单 (2)取消冻结的库存
这里所谓的RM,是我们自己实现的,也就是实现一个cancel方法,该方法实现的操作就是上述的回退操作。
好了,通过这样的一个案例,大家理解TCC模式的概念了吗?
案例总结关键在于体会Try-Confirm-Cancel这三个操作所需要执行的逻辑。当然上述的案例我们实现的操作是非常基础的,更优实现方式还要大家进阶的去做学习了解。
还有一点大家应该注意到:
咱们整个实现TCC的过程,实际上是没有用到全局锁的,这是和AT模式另外一个大的区别。TCC模式更多的是利用本地行锁或者乐观锁、状态区分的形式来实现资源隔离
这产生了一个结论: 那就是TCC模式比AT模式性能高得多
,因为没有全局锁的限制,所以其速度飞快提升。因此呢TCC模式也就适用于对性能有较高的分布式事务场景
TCC模式的实现关键在于拆分二阶段,也就是如何把一步操作拆分为两步,比如上述的库存扣减,本身就是一个update语句,但是TCC下却需要我们拆分为先冻结库存,然后再扣减这部分库存
那么下面我们就结合seata,来看看TCC模式的代码具体实现,我们还是以上述的案例为例:
seata的安装及AT模式的seata实现之前已经讲过了,这里不再重复,如果不清楚的可以翻翻微服务专栏中之前的文章
1、订单服务
注意需要在接口上添加@LocalTCC
注解
订单新增方法上要添加@TwoPhaseBusinessAction
注解,并且声明confirm,cancel方法
import com.baomidou.mybatisplus.extension.service.IService;
import com.example.orderserver.entity.Order;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
/**
* @author whx
* @date 2022/4/30
*/
@LocalTCC
public interface IOrderTccService extends IService {
/**
* @TwoPhaseBusinessAction 描述⼆阶段提交
* name: 为 tcc⽅法的 bean 名称,需要全局唯⼀,⼀般写⽅法名即可
* commitMethod: Commit/Confirm⽅法的⽅法名
* rollbackMethod:Rollback/Cancel⽅法的⽅法名
* @BusinessActionContextParamete 该注解可以将参数传递给声明的commitMethod和rollbackMethod,通过BusinessActionContext 获取。
*/
@TwoPhaseBusinessAction(name = "addOrder", commitMethod = "addCommit", rollbackMethod = "addRollBack")
void addOrder(@BusinessActionContextParameter(paramName = "order") Order order);
boolean addCommit(BusinessActionContext context);
boolean addRollBack(BusinessActionContext context);
}
实现类
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.orderserver.entity.Order;
import com.example.orderserver.feign.ProductApi;
import com.example.orderserver.mapper.OrderMapper;
import com.example.orderserver.service.IOrderTccService;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @author whx
* @date 2022/4/30
*/
@Slf4j
@Service
@AllArgsConstructor
public class IOrderTccServiceImpl extends ServiceImpl implements IOrderTccService {
// 商品feign调用接口
private final ProductApi productApi;
@Override
public void addOrder(Order order) {
// 设置状态为未提交
order.setStatus(0);
// 新增订单
this.save(order);
// 扣减库存
productApi.reduceInventory(order.getProduct().getId(),order.getProduct().getFrozenInventory());
}
@Override
public boolean addCommit(BusinessActionContext context) {
Order order = JSON.parseObject(context.getActionContext("order").toString(), Order.class);
int count = this.count(Wrappers.lambdaQuery().eq(Order::getId,order.getId()));
if(count > 0){
order.setStatus(1);
baseMapper.updateById(order);
}
log.info("事务 xid="+context.getXid()+" 提交成功");
return true;
}
@Override
public boolean addRollBack(BusinessActionContext context) {
Order order = JSON.parseObject(context.getActionContext("order").toString(), Order.class);
int count = this.count(Wrappers.lambdaQuery().eq(Order::getId,order.getId()));
if(count > 0){
// 删除数据
this.removeById(order.getId());
}
log.info("事务 xid="+context.getXid()+" 回滚成功");
return true;
}
}
2、商品服务
import com.baomidou.mybatisplus.extension.service.IService;
import com.example.productserver.entity.Product;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
/**
* @author whx
* @date 2022/4/30
*/
@LocalTCC
public interface IProductTccService extends IService{
/**
* @TwoPhaseBusinessAction 描述⼆阶段提交
* name: 为 tcc⽅法的 bean 名称,需要全局唯⼀,⼀般写⽅法名即可
* commitMethod: Commit/Confirm⽅法的⽅法名
* rollbackMethod: Rollback/Cancel⽅法的⽅法名
* @BusinessActionContextParamete 该注解可以将参数传递给声明的commitMethod和rollbackMethod,通过BusinessActionContext 获取。
*/
@TwoPhaseBusinessAction(name = "reduceInventory", commitMethod = "reduceCommit", rollbackMethod = "reduceRollBack")
String reduceInventory(@BusinessActionContextParameter(paramName = "productId") Long id,
@BusinessActionContextParameter(paramName = "number") Integer number);
boolean reduceCommit(BusinessActionContext context);
boolean reduceRollBack(BusinessActionContext context);
}
实现类
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.productserver.entity.Product;
import com.example.productserver.mapper.ProductMapper;
import com.example.productserver.service.IProductTccService;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;
/**
* @author whx
* @date 2022/4/30
*/
@Slf4j
@Service
public class IProductTccServiceImpl extends ServiceImpl implements IProductTccService {
/**
* Transactional 添加行锁,防止多线程下更新库存时被其他线程读取到脏数据,这样能保证第一次获取时FrozenInventory为0
* @param productId
* @param number
* @return
*/
@Override
@Transactional(isolation = Isolation.READ_COMMITTED)
public String reduceInventory(Long productId,Integer number) {
Product product = this.getOne(Wrappers.lambdaQuery()
.select(Product::getId,Product::getInventory,Product::getFrozenInventory)
.eq(Product::getId,productId));
// 当前可用库存需要排除历史冻结库存
if(product.getInventory() >= number){
// 更新冻结库存
product.setFrozenInventory(number);
this.updateById(product);
return "库存更新成功";
}
return "商品库存不足";
}
@Override
public boolean reduceCommit(BusinessActionContext context) {
Product product = this.getOne(Wrappers.lambdaQuery()
.select(Product::getId,Product::getInventory,Product::getFrozenInventory)
.eq(Product::getId,context.getActionContext("goodsId")));
Integer number = Integer.parseInt(context.getActionContext("number").toString());
if (product != null) {
//扣减库存
product.setInventory(product.getInventory() - number);
product.setFrozenInventory(0);
this.saveOrUpdate(product);
}
log.info("事务 xid="+context.getXid()+" 提交成功");
return true;
}
@Override
public boolean reduceRollBack(BusinessActionContext context) {
Product product = this.getOne(Wrappers.lambdaQuery()
.select(Product::getId,Product::getFrozenInventory)
.eq(Product::getId,context.getActionContext("goodsId")));
Integer number = Integer.parseInt(context.getActionContext("number").toString());
if (product != null) {
// 恢复冻结库存
product.setFrozenInventory(0);
this.saveOrUpdate(product);
}
log.info("事务 xid="+context.getXid()+" 回滚成功");
return true;
}
}
4. TCC模式的补偿措施
4.1 重试机制
我们上述的代码中在confirm,cancel方法中实现了事务的提交和回滚,但是因为是我们自己通过代码实现的,所以还需要考虑一个问题:执行失败后的重试机制
当我们的confirm或者cancel方法也出现报错时,为了保证事务的最终一致性,我们应当做好重试机制处理:
比如将数据发送到MQ,然后再进行接收处理。
4.2 幂等性问题所谓幂等就是操作一次和操作多次的执行效果是一样的。
想象一下,我们的库存扣除操作,如果因为某一步操作报错,导致需要回滚重试,结果每次重试都会重复扣减库存,那这样肯定是不对的。
所以为了保证我们在confirm,cancel中进行的重试机制不会使得我们的资源发生重复消耗,那么需要我们对方法做好幂等性处理:
比如说通过添加状态字段来判断是否执行过
4.3 悬挂问题所谓悬挂问题,就是二阶段模式中,cancel比try先执行
这是怎么导致的呢?
就拿我们上述的案例来假设,在订单服务中调用商品服务的扣减库存方法reduceInventory时,因为是通过RPC(feign)的方式来调用的,那么如果调用时刚好网络堵塞,或者商品服务出现问题,导致调用失败,出现报错,TM会通知TC出现错误,TC会通知所有的RM进行本地事务回滚,也就是执行cancel方法。
当cancel方法执行完成后,try方法偏偏连通了,又执行了,那么就出现了问题,订单会被更新为未提交,但因为事务已经被cancel过了,就不会再执行confirm,也就没有谁再来将资源状态从预处理更新为已处理了。资源就会导致浪费。
这时进行的回滚操作其实并没有真实回滚业务,这个现象我们称之为空回滚
。
所有我们需要针对悬挂问题进行防悬挂处理,方案呢就是限制如果二阶段执行完成,一阶段就不能再执行。
比如执行cancel方法时会判断是否是空回滚,出现空回滚注册事务ID,try方法执行前先检查事务ID是否存在,如果存在则不允许执行
当然这些处理呢,seata已经帮我们实现了,这也是使用现成的分布式事务框架的好处。省心!但是我们自己要知道这些问题和原理。
seata中的解决方案是增加一个事务记录表,在cancel阶段最后往事务记录表中插入一条记录(xid-status)标记cancel阶段已经执行过。此时try阶段进入时发现已经执行过回滚操作,则放弃try阶段的执行
5. TCC模式应用场景其实在上述我们也已经提到了TCC模式相比于AT模式的两大特性:
- 没有了全局锁,性能高
- 自己实现代码,无需依赖于数据库事务性
因此TCC模式适用于
- 对性能有较高要求的业务场景
- 存储库不支持事务的场景
- 支持跨服务:这一点之前没有提到,seata的AT模式是不能跨服务的,但是因为TCC的二阶段代码都是我们自己实现的,所以跨服务跨平台都可以自主实现
以上操作的源码见seata 实现TCC分布式事务源码中的
springcloud2/order-server-feign
springcloud/product-server-feign
注:代码未经严格测试,仅提供思路及方法示例。有兴趣的可以自行验证测试哦
如果这篇文章帮助到你,不妨点个赞支持一下吧