前言:
在之前的博客中 https://blog.csdn.net/qq_26323323/article/details/81908955 我们介绍了Spring事务的使用及其传播机制的使用。
本文就要从源码的角度来分析下Spring事务的实现方式
1.SpringTransaction的使用
请参考笔者另一篇博客 https://blog.csdn.net/qq_26323323/article/details/81908955 ,里面有详细用法
2.写在源码分析之前
如果是我们,我们应该如何来实现事务?
什么是事务?事务用在什么地方?如何创建?事务有哪些基本操作?
1)什么是事务?
事务是一系列数据库操作的集合,在一个事务里,所有有关的数据库操作一起提交或一起回滚
2)事务用在什么地方?
如果多个数据库操作需要一起生效或一起失效,那么这些操作需要放在一个事务里面
3)事务如何创建?
用户创建了针对数据库操作的连接(java.sql.Connection)之后,就可以针对Connection进行事务的操作,事务依赖于连接
4)事务的基本操作?
开启事务:Connection.setAutoCommit(false);关闭自动提交则就开启了事务
提交事务:Connection.commit();
回滚事务:Connection.rollback();
那么,Spring帮我们做的就应该是这些基本操作,在我们方法开始的时候,关闭自动提交;在方法正常结束的时候,提交事务;在方法异常的时候,回滚事务;
问题来了:Spring是怎么帮我们做的呢?
按照Spring的特性,应该是代理了当前方法,然后在方法开始、方法结束、方法异常的时候调用事务的操作,下面我们就来看下是否这个方式来做的
3.SpringTransaction源码结构分析
我们在beans.xml中开启事务的应用,需要添加
我们就从这句注解上来分析其功能
1)注解的分析
但凡这种注解,都有对应的解析器,从之前分析AOP功能的源码可知,解析器都实现了NamespaceHandlerSupport类,我们来获取下NamespaceHandlerSupport的实现类都有哪些
看名字就是TxNamespaceHandler类,我们来看下这个类有哪些内容
2)TxNamespaceHandler
public class TxNamespaceHandler extends NamespaceHandlerSupport {
...
@Override
public void init() {
registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
// 这句代码负责解析annotation-driven
registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
}
}
Spring会默认调用其init()方法,annotation-driven对应的是AnnotationDrivenBeanDefinitionParser解析器,我们来看下这个解析器的作用
3)AnnotationDrivenBeanDefinitionParser的作用分析
public BeanDefinition parse(Element element, ParserContext parserContext) {
registerTransactionalEventListenerFactory(parserContext);
String mode = element.getAttribute("mode");
if ("aspectj".equals(mode)) {
// mode="aspectj"
registerTransactionAspect(element, parserContext);
}
else {
// 默认为proxy模式
// 所以要执行该句
AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
}
return null;
}
4)AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
public static void configureAutoProxyCreator(Element element, ParserContext parserContext) {
// 在5)中单独分析该句代码的功能
AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);
// txAdvisorBeanName值为 org.springframework.transaction.config.internalTransactionAdvisor
String txAdvisorBeanName = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME;
if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) {
Object eleSource = parserContext.extractSource(element);
// 1.注册类AnnotationTransactionAttributeSource到Spring中
RootBeanDefinition sourceDef = new RootBeanDefinition(
"org.springframework.transaction.annotation.AnnotationTransactionAttributeSource");
sourceDef.setSource(eleSource);
sourceDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
String sourceName = parserContext.getReaderContext().registerWithGeneratedName(sourceDef);
// 2.注册类TransactionInterceptor到Spring中
RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);
interceptorDef.setSource(eleSource);
interceptorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
registerTransactionManager(element, interceptorDef);
interceptorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
String interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef);
// 2.注册类BeanFactoryTransactionAttributeSourceAdvisor到Spring中
RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
advisorDef.setSource(eleSource);
advisorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);
if (element.hasAttribute("order")) {
advisorDef.getPropertyValues().add("order", element.getAttribute("order"));
}
parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);
CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource);
compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName));
compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName));
compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName));
parserContext.registerComponent(compositeDef);
}
}
5)AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);
public static void registerAutoProxyCreatorIfNecessary(
ParserContext parserContext, Element sourceElement) {
// 主要是这句话
BeanDefinition beanDefinition = AopConfigUtils.registerAutoProxyCreatorIfNecessary(
parserContext.getRegistry(), parserContext.extractSource(sourceElement));
useClassProxyingIfNecessary(parserContext.getRegistry(), sourceElement);
registerComponentIfNecessary(beanDefinition, parserContext);
}
public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry, Object source) {
// 重点在这里,主要是将InfrastructureAdvisorAutoProxyCreator注册进来
return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
}
总结:通过以上的分析可知,的主要功能就是将以下四个类注册到Spring容器中
* AnnotationTransactionAttributeSource
* TransactionInterceptor(主要的拦截功能都在这里实现)
* BeanFactoryTransactionAttributeSourceAdvisor(创建bean的代理类的时候该Advisor会被用上)
* InfrastructureAdvisorAutoProxyCreator
读过笔者关于SpringCache源码解析 https://blog.csdn.net/qq_26323323/article/details/81700626 的读者,应该可以看到,这里的分析与SpringCache的分析特别类似,也确实,Spring的实现方式都是类似的,分析越多的源码,越会有这种感觉。
4.InfrastructureAdvisorAutoProxyCreator功能分析
其实现了BeanPostProcessor接口,则则Spring在创建bean的时候,会默认调用InfrastructureAdvisorAutoProxyCreator的postProcessAfterInitialization()方法,就是在这个方法中创建代理类的,下面我们来看下这个方法
1)AbstractAutoProxyCreator.postProcessAfterInitialization()
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean != null) {
Object cacheKey = getCacheKey(bean.getClass(), beanName);
if (!this.earlyProxyReferences.contains(cacheKey)) {
// 重要方法
return wrapIfNecessary(bean, beanName, cacheKey);
}
}
return bean;
}
// wrapIfNecessary()
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
...
// 1.获取当前类的所有切面拦截类,在2)中详细分析
Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
// 2.如果拦截类不为空,则需要创建当前类的代理类
if (specificInterceptors != DO_NOT_PROXY) {
this.advisedBeans.put(cacheKey, Boolean.TRUE);
// 3.创建代理类,在3)中详细分析
Object proxy = createProxy(
bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
this.proxyTypes.put(cacheKey, proxy.getClass());
return proxy;
}
this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}
以上逻辑类似于之前分析的AOP源码,读者也可以先看下 https://blog.csdn.net/qq_26323323/article/details/81012855
2)getAdvicesAndAdvisorsForBean()获取当前类的所有切面拦截器
本方法为抽象方法,实现由子类AbstractAdvisorAutoProxyCreator实现
protected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName, TargetSource targetSource) {
List advisors = findEligibleAdvisors(beanClass, beanName);//重点在这里
if (advisors.isEmpty()) {
return DO_NOT_PROXY;
}
return advisors.toArray();
}
// findEligibleAdvisors()
protected List findEligibleAdvisors(Class beanClass, String beanName) {
// 1.获取所有的Advisor
List candidateAdvisors = findCandidateAdvisors();
// 2.获取适合当前类的Advisor
List eligibleAdvisors = findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName);
extendAdvisors(eligibleAdvisors);
if (!eligibleAdvisors.isEmpty()) {
eligibleAdvisors = sortAdvisors(eligibleAdvisors);
}
return eligibleAdvisors;
}
* findCandidateAdvisors()获取所有的Advisor
// findCandidateAdvisors()
protected List findCandidateAdvisors() {
return this.advisorRetrievalHelper.findAdvisorBeans();
}
//findAdvisorBeans()
public List findAdvisorBeans() {
// Determine list of advisor bean names, if not cached already.
String[] advisorNames = null;
synchronized (this) {
// 1.cachedAdvisorBeanNames=org.springframework.transaction.config.internalTransactionAdvisor
advisorNames = this.cachedAdvisorBeanNames;
if (advisorNames == null) {
// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let the auto-proxy creator apply to them!
advisorNames = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(
this.beanFactory, Advisor.class, true, false);
this.cachedAdvisorBeanNames = advisorNames;
}
}
if (advisorNames.length == 0) {
return new LinkedList();
}
List advisors = new LinkedList();
for (String name : advisorNames) {
if (isEligibleBean(name)) {
if (this.beanFactory.isCurrentlyInCreation(name)) {
if (logger.isDebugEnabled()) {
logger.debug("Skipping currently created advisor '" + name + "'");
}
}
else {
try {
// 2.从工厂中获取cachedAdvisorBeanNames对应的bean
// 在上述代码,可知,
// org.springframework.transaction.config.internalTransactionAdvisor这个名称对应bean为
// BeanFactoryTransactionAttributeSourceAdvisor类
advisors.add(this.beanFactory.getBean(name, Advisor.class));
}
catch (BeanCreationException ex) {
Throwable rootCause = ex.getMostSpecificCause();
if (rootCause instanceof BeanCurrentlyInCreationException) {
BeanCreationException bce = (BeanCreationException) rootCause;
if (this.beanFactory.isCurrentlyInCreation(bce.getBeanName())) {
if (logger.isDebugEnabled()) {
logger.debug("Skipping advisor '" + name +
"' with dependency on currently created bean: " + ex.getMessage());
}
// Ignore: indicates a reference back to the bean we're trying to advise.
// We want to find advisors other than the currently created bean itself.
continue;
}
}
throw ex;
}
}
}
}
return advisors;
}
总结2):适合当前类的Advisor最终为BeanFactoryTransactionAttributeSourceAdvisor类,也就是我们之前在分析中注册的BeanFactoryTransactionAttributeSourceAdvisor bean
3)createProxy( Class beanClass, String beanName, Object[] specificInterceptors, TargetSource targetSource)
创建代理类
这里笔者就不再继续分析了,具体读者可参考 https://blog.csdn.net/qq_26323323/article/details/81012855 博文中创建proxy过程分析一节
总结4:InfrastructureAdvisorAutoProxyCreator的主要作用就是实现了BeanPostProcessor接口,那么Spring的每个bean在创建的过程中,都需要调用其postProcessAfterInitialization()方法,在这个方法中查询出所有适合当前类的Advisor,然后创建当前类的代理类,并将Advisor封装进来,在以后调用当前类的方法时使用
5.代理类invoke()方法调用
通过以上分析可知,Spring为我们创建的BlogServiceImpl bean实际是一个关于BlogServiceImpl的代理类,在调用BlogServiceImpl的相关方法时,实际调用的是代理类的invoke()方法,下面我们就来分析下,invoke()方法被调用的过程,具体了解下我们的缓存是如何工作的
由于本例是JDKProxy创建的方式,而非CGLIBProxy的创建方式,所以BlogServiceImpl的代理类为JdkDynamicAopProxy,下面看下其invoke()方法
1)JdkDynamicAopProxy.invoke()
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
MethodInvocation invocation;
Object oldProxy = null;
boolean setProxyContext = false;
TargetSource targetSource = this.advised.targetSource;
Class targetClass = null;
Object target = null;
try {
...
Object retVal;
if (this.advised.exposeProxy) {
// Make invocation available if necessary.
oldProxy = AopContext.setCurrentProxy(proxy);
setProxyContext = true;
}
target = targetSource.getTarget();
if (target != null) {
targetClass = target.getClass();
}
// 1.获取当前方法的拦截器链,也就是Advisor列表
// 最终返回的是当前Advisor的拦截器MethodInterceptor列表
// 在2)中详细分析
List chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
// 2.如果拦截器链为空,说明当前方法没有缓存注解,直接调用方法即可
if (chain.isEmpty()) {
Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
}
// 3.说明当前方法有缓存注解,则需要先调用拦截器链的方法
else {
invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
// 真正的调用在这里
// 在3)中详细分析
retVal = invocation.proceed();
}
...
return retVal;
}
...
}
2)AdvisedSupport.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);获取当前方法的拦截器链
public List getInterceptorsAndDynamicInterceptionAdvice(Method method, Class targetClass) {
MethodCacheKey cacheKey = new MethodCacheKey(method);
List cached = this.methodCache.get(cacheKey);
if (cached == null) {
// 真正的实现在这里
cached = this.advisorChainFactory.getInterceptorsAndDynamicInterceptionAdvice(
this, method, targetClass);
this.methodCache.put(cacheKey, cached);
}
return cached;
}
//DefaultAdvisorChainFactory.getInterceptorsAndDynamicInterceptionAdvice()
public List getInterceptorsAndDynamicInterceptionAdvice(
Advised config, Method method, Class targetClass) {
// This is somewhat tricky... We have to process introductions first,
// but we need to preserve order in the ultimate list.
List interceptorList = new ArrayList(config.getAdvisors().length);
Class actualClass = (targetClass != null ? targetClass : method.getDeclaringClass());
boolean hasIntroductions = hasMatchingIntroductions(config, actualClass);
AdvisorAdapterRegistry registry = GlobalAdvisorAdapterRegistry.getInstance();
// 1.遍历当前bean的所有Advisor
// 就当前示例而言,只有一个Advisor,就是之前创建的BeanFactoryTransactionAttributeSourceAdvisor
for (Advisor advisor : config.getAdvisors()) {
if (advisor instanceof PointcutAdvisor) {
// Add it conditionally.
PointcutAdvisor pointcutAdvisor = (PointcutAdvisor) advisor;
if (config.isPreFiltered() || pointcutAdvisor.getPointcut().getClassFilter().matches(actualClass)) {
// 2.获取Advisor的Interceptor,也就是在分析时
// 被添加到BeanFactoryTransactionAttributeSourceAdvisor类的TransactionInterceptor类
MethodInterceptor[] interceptors = registry.getInterceptors(advisor);
MethodMatcher mm = pointcutAdvisor.getPointcut().getMethodMatcher();
if (MethodMatchers.matches(mm, method, actualClass, hasIntroductions)) {
if (mm.isRuntime()) {
// Creating a new object instance in the getInterceptors() method
// isn't a problem as we normally cache created chains.
for (MethodInterceptor interceptor : interceptors) {
interceptorList.add(new InterceptorAndDynamicMethodMatcher(interceptor, mm));
}
}
else {
interceptorList.addAll(Arrays.asList(interceptors));
}
}
}
}
else if (advisor instanceof IntroductionAdvisor) {
IntroductionAdvisor ia = (IntroductionAdvisor) advisor;
if (config.isPreFiltered() || ia.getClassFilter().matches(actualClass)) {
Interceptor[] interceptors = registry.getInterceptors(advisor);
interceptorList.addAll(Arrays.asList(interceptors));
}
}
else {
Interceptor[] interceptors = registry.getInterceptors(advisor);
interceptorList.addAll(Arrays.asList(interceptors));
}
}
return interceptorList;
}
所以,拦截器链最终返回的是我们之前分析的TransactionInterceptor类
3)ReflectiveMethodInvocation.proceed()拦截器链的调用分析
// InterceptedMethodInvocation.proceed()
public Object proceed() throws Throwable {
try {
// 主要就是遍历调用Interceptor的invoke方法
return index == interceptors.length
? methodProxy.invokeSuper(proxy, arguments)
: interceptors[index].invoke(
new InterceptedMethodInvocation(proxy, methodProxy, arguments, index + 1));
} catch (Throwable t) {
pruneStacktrace(t);
throw t;
}
}
那我们看下TransactionInterceptor.invoke()方法
6.TransactionInterceptor.invoke()
经上分析,代理类首先调用TransactionInterceptor.invoke(),然后再执行业务方法
public Object invoke(final MethodInvocation invocation) throws Throwable {
// 1.获取被代理类
Class targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// 2.真正方法执行
return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
@Override
// 3.回调方法
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
});
}
// invokeWithinTransaction()
protected Object invokeWithinTransaction(Method method, Class targetClass, final InvocationCallback invocation)
throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 1.声明式事务
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
commitTransactionAfterReturning(txInfo);
return retVal;
}
// 2.编程式事务(这个我们暂不分析,实际逻辑与声明式一致)
else {
...
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
}
}
我们先暂时分析到这,关于TransactionInterceptor.invoke()方法的具体内容我们在下一篇博客中继续详细分析
总结:
事务功能的实现实际也是通过Spring代理来实现的。生成当前类的代理类,调用代理类的invoke()方法,在invoke()方法中调用TransactionInterceptor拦截器的invoke()方法
重要操作流程如下:
1)解析,将InfrastructureAdvisorAutoProxyCreator注入到Spring容器中,该类的作用是在Spring创建bean实例的时候,会执行其postProcessAfterInitialization()方法,生成bean实例的代理类
2)解析,将BeanFactoryTransactionAttributeSourceAdvisor类注入到Spring容器中,该类的主要作用是作为一个Advisor添加到上述代理类中
3)BeanFactoryTransactionAttributeSourceAdvisor类拥有对TransactionInterceptor的依赖,TransactionInterceptor作为一个方法拦截器,负责对执行方法的拦截
4)当前类方法调用被拦截到TransactionInterceptor后,TransactionInterceptor会调用invoke方法,来真正实现事务功能