您当前的位置: 首页 >  spring

科技D人生

暂无认证

  • 0浏览

    0关注

    1550博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Spring学习总结(29)——Spring异步处理@Async的使用以及原理、源码分析(@EnableAsync)

科技D人生 发布时间:2019-12-02 11:43:16 ,浏览量:0

在开发过程中,我们会遇到很多使用线程池的业务场景,例如异步短信通知、异步记录操作日志。大多数使用线程池的场景,就是会将一些可以进行异步操作的业务放在线程池中去完成。例如在生成订单的时候给用户发送短信,生成订单的结果不应该被发送短信的成功与否所左右,也就是说生成订单这个主操作是不依赖于发送短信这个操作,所以我们就可以把发送短信这个操作置为异步操作。那么本文就是来看看Spring中提供的优雅的异步处理方案:在Spring3中,Spring中引入了一个新的注解@Async,这个注解让我们在使用Spring完成异步操作变得非常方便。需要注意的是这些功能都是Spring Framework提供的,而非SpringBoot。因此下文的讲解都是基于Spring Framework的工程。Spring中用@Async注解标记的方法,称为异步方法,它会在调用方的当前线程之外的独立的线程中执行,其实就相当于我们自己new Thread(()-> System.out.println("hello world !"))这样在另一个线程中去执行相应的业务逻辑。

Demo

// @Async 若把注解放在类上或者接口上,那么他所有的方法都会异步执行了~~~~(包括私有方法)
public interface HelloService {
    Object hello();
}

@Service
public class HelloServiceImpl implements HelloService {

    // 注意此处加上了此注解
    @Async
    @Override
    public Object hello() {
        System.out.println("当前线程:" + Thread.currentThread().getName());
        return "service hello";
    }
}
然后只需要在配置里,开启对异步的支持即可:
// 开启异步注解的支持
@Configuration
@EnableAsync 
public class RootConfig {

}

输出如下(当前线程名):

当前线程:SimpleAsyncTaskExecutor-1

可以很明显的发现,它使用的是线程池SimpleAsyncTaskExecutor,这也是Spring默认给我们提供的线程池(其实它不是一个真正的线程池,后面会有讲述)。下面原理部分讲解后,你就能知道怎么让它使用我们自定义的线程池了~~~

@Async注解使用细节

  1. @Async注解一般用在方法上,如果用在类上,那么这个类所有的方法都是异步执行的;
  2. @Async可以放在任何方法上,哪怕你是private的(若是同类调用,请务必注意注解失效的情况~~~)
  3. 所使用的@Async注解方法的类对象应该是Spring容器管理的bean对象
  4. @Async可以放在接口处(或者接口方法上)。但是只有使用的是JDK的动态代理时才有效,CGLIB会失效。因此建议:统一写在实现类的方法上
  5. 需要注解@EnableAsync来开启异步注解的支持
  6. 若你希望得到异步调用的返回值,请你的返回值用Futrue变量包装起来

需要额外导入哪些Jar包?

它的依赖包非常简单,只依赖一些Spring的核心包外加spring-aop,但是如果你已经导入了spring-webmvc这个jar,那就什么不需要额外导入了,因为都有了:

备注:它虽然依赖于Spring AOP,但是它并不需要导入aspectjweaver,因为它和AspectJ没有半毛钱关系

原理、源码解析

@EnableAsync

它位于的包名为org.springframework.scheduling.annotation,jar名为:spring-context

@EnableXXX这种设计模式之前有分析过多次,这个注解就是它的入口,因此本文也一样,从入口处一层一层的剖析:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

	//默认情况下,要开启异步操作,要在相应的方法或者类上加上@Async注解或者EJB3.1规范下        
    @Asynchronous注解。
	//这个属性使得开发人员可以自己设置开启异步操作的注解(可谓非常的人性化了,但是大多情况下用Spring的就足够了)
	Class annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
		Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");
		
		// 根据类型,拿到该类型的这个注解,然后转换为AnnotationAttributes
		AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
		if (attributes == null) {
			throw new IllegalArgumentException(String.format( "@%s is not present annType.getSimpleName(), importingClassMetadata.getClassName()));
		}

		// 拿到AdviceMode,最终交给子类,让她自己去实现 决定导入哪个Bean吧
		AdviceMode adviceMode = attributes.getEnum(this.getAdviceModeAttributeName());
		String[] imports = selectImports(adviceMode);
		if (imports == null) {
			throw new IllegalArgumentException(String.format("Unknown AdviceMode: '%s'", adviceMode));
		}
		return imports;
	}

	// 子类去实现 具体导入哪个Bean
	@Nullable
	protected abstract String[] selectImports(AdviceMode adviceMode);
}

改抽象提供了支持AdviceMode的较为通用的实现,若我们自己想自定义,可以考虑实现此类。

由此可议看出,@EnableAsync最终是向容器内注入了ProxyAsyncConfiguration这个Bean。由名字可议看出,它是一个配置类。

ProxyAsyncConfiguration

// 它是一个配置类,角色为ROLE_INFRASTRUCTURE 框架自用的Bean类型
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

	// 它的作用就是诸如了一个AsyncAnnotationBeanPostProcessor,它是个BeanPostProcessor
	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
		
		// customAsyncAnnotation:自定义的注解类型
		// AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation") 为拿到该注解该字段的默认值
		Class, Boolean> eligibleBeans = new ConcurrentHashMap(256);
 
	// 当遇到一个pre-object的时候,是否把该processor所持有得advisor放在现有的增强器们之前执行
	// 默认是false,会放在最后一个位置上的
	public void setBeforeExistingAdvisors(boolean beforeExistingAdvisors) {
		this.beforeExistingAdvisors = beforeExistingAdvisors;
	}
	
	// 不处理
	@Override
	public Object postProcessBeforeInitialization(Object bean, String beanName) {
		return bean;
	}

	// Bean已经实例化、初始化完成之后执行。
	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) {
		// 忽略AopInfrastructureBean的Bean,并且如果没有advisor也会忽略不处理~~~~~
		if (bean instanceof AopInfrastructureBean || this.advisor == null) {
			// Ignore AOP infrastructure such as scoped proxies.
			return bean;
		}
		
		// 如果这个Bean已经被代理过了(比如已经被AOP切中了),那本处就无需再重复创建代理了嘛
		// 直接向里面添加advisor就成了
		if (bean instanceof Advised) {
			Advised advised = (Advised) bean;
			// 注意此advised不能是已经被冻结了的。且源对象必须是Eligible合格的
			if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
				// Add our local Advisor to the existing proxy's Advisor chain...
				// 把自己持有的这个advisor放在首位(如果beforeExistingAdvisors=true)
				if (this.beforeExistingAdvisors) {
					advised.addAdvisor(0, this.advisor);
				}
				// 否则就是尾部位置
				else {
					advised.addAdvisor(this.advisor);
				}
				// 最终直接返回即可,因为已经没有必要再创建一次代理对象了
				return bean;
			}
		}

		// 如果这个Bean事合格的(此方法下面有解释) 这个时候是没有被代理过的
		if (isEligible(bean, beanName)) {
			// 以当前的配置,创建一个ProxyFactory 
			ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
			// 如果不是使用CGLIB常见代理,那就去分析出它所实现的接口们 然后放进ProxyFactory 里去
			if (!proxyFactory.isProxyTargetClass()) {
				evaluateProxyInterfaces(bean.getClass(), proxyFactory);
			}
			
			// 切面就是当前持有得advisor
			proxyFactory.addAdvisor(this.advisor);
			// 留给子类,自己还可以对proxyFactory进行自定义~~~~~
			customizeProxyFactory(proxyFactory);
			// 最终返回这个代理对象~~~~~
			return proxyFactory.getProxy(getProxyClassLoader());
		}

		// No async proxy needed.(相当于没有做任何的代理处理,返回原对象)
		return bean;
	}
	
	// 检查这个Bean是否是合格的
	protected boolean isEligible(Object bean, String beanName) {
		return isEligible(bean.getClass());
	}
	protected boolean isEligible(Class targetClass) {
		// 如果已经被缓存着了,那肯定靠谱啊
		Boolean eligible = this.eligibleBeans.get(targetClass);
		if (eligible != null) {
			return eligible;
		}
		// 如果没有切面(就相当于没有给配置增强器,那铁定是不合格的)
		if (this.advisor == null) {
			return false;
		}
	
		// 这个重要了:看看这个advisor是否能够切入进targetClass这个类,能够切入进取的也是合格的
		eligible = AopUtils.canApply(this.advisor, targetClass);
		this.eligibleBeans.put(targetClass, eligible);
		return eligible;
	}

	// 子类可以复写。比如`AbstractBeanFactoryAwareAdvisingPostProcessor`就复写了这个方法~~~
	protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
		ProxyFactory proxyFactory = new ProxyFactory();
		proxyFactory.copyFrom(this);
		proxyFactory.setTarget(bean);
		return proxyFactory;
	}

	// 子类复写~
	protected void customizeProxyFactory(ProxyFactory proxyFactory) {
	}

}

看看它的继承图:

MethodValidationPostProcessor属于JSR-303校验方面的范畴,不是本文的内容,因此不会讲述

AbstractBeanFactoryAwareAdvisingPostProcessor

从名字可以看出,它相较于父类,就和BeanFactory有关了,也就是和Bean容器相关了~~~

public abstract class AbstractBeanFactoryAwareAdvisingPostProcessor extends AbstractAdvisingBeanPostProcessor
		implements BeanFactoryAware {

	// Bean工厂
	@Nullable
	private ConfigurableListableBeanFactory beanFactory;

	// 如果这个Bean工厂不是ConfigurableListableBeanFactory ,那就set一个null
	// 我们的`DefaultListableBeanFactory`显然就是它的子类~~~~~
	@Override
	public void setBeanFactory(BeanFactory beanFactory) {
		this.beanFactory = (beanFactory instanceof ConfigurableListableBeanFactory ?
				(ConfigurableListableBeanFactory) beanFactory : null);
	}

	
	@Override
	protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
		// 如果Bean工厂是正常的,那就把这个Bean expose一个特殊的Bean,记录下它的类型
		if (this.beanFactory != null) {
			AutoProxyUtils.exposeTargetClass(this.beanFactory, beanName, bean.getClass());
		}

		ProxyFactory proxyFactory = super.prepareProxyFactory(bean, beanName);
		
		// 这里创建代理也是和`AbstractAutoProxyCreator`差不多的逻辑。
		// 如果没有显示的设置为CGLIB,并且toProxyUtils.shouldProxyTargetClass还被暴露过时一个特殊的Bean,那就强制使用CGLIB代理吧 这里一般和Scope无关的话,都返回false了
		if (!proxyFactory.isProxyTargetClass() && this.beanFactory != null &&
				AutoProxyUtils.shouldProxyTargetClass(this.beanFactory, beanName)) {
			proxyFactory.setProxyTargetClass(true);
		}
		return proxyFactory;
	}

}

下面就可以看看具体的两个实现类了:

AsyncAnnotationBeanPostProcessor

该实现类就是具体和@Async相关的一个类了~~~

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {

	// 建议换成AsyncExecutionAspectSupport.DEFAULT_TASK_EXECUTOR_BEAN_NAME 这样语意更加的清晰些
	public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;

	// 注解类型
	@Nullable
	private Class targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
		// 注意:此处是getMostSpecificMethod 拿到最终要执行的那个方法
		Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
		// 桥接方法~~~~~~~~~~~~~~
		final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

		// determine一个用于执行此方法的异步执行器
		AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
		if (executor == null) {
			throw new IllegalStateException(
					"No executor specified and no default executor set on AsyncExecutionInterceptor either");
		}

		// 构造一个任务,Callable(此处不采用Runable,因为需要返回值)
		Callable task = () -> {
			try {
				// result就是返回值
				Object result = invocation.proceed();
				// 注意此处的处理~~~~ 相当于如果不是Future类型,就返回null了
				if (result instanceof Future) {
					return ((Future) result).get();
				}
			}
			// 处理执行时可能产生的异常~~~~~~
			catch (ExecutionException ex) {
				handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
			}
			catch (Throwable ex) {
				handleError(ex, userDeclaredMethod, invocation.getArguments());
			}
			return null;
		};
		
		// 提交任务~~~~invocation.getMethod().getReturnType()为返回值类型
		return doSubmit(task, executor, invocation.getMethod().getReturnType());
	}
	
}

SimpleAsyncTaskExecutor:异步执行用户任务的SimpleAsyncTaskExecutor。每次执行客户提交给它的任务时,它会启动新的线程,并允许开发者控制并发线程的上限(concurrencyLimit),从而起到一定的资源节流作用。默认时,concurrencyLimit取值为-1,即**不启用**资源节流 所以它不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程(因此建议我们在使用@Aysnc的时候,自己配置一个线程池,节约资源)

AnnotationAsyncExecutionInterceptor

很显然,他是在AsyncExecutionInterceptor的基础上,提供了对@Async注解的支持。所以它是继承它的。

// 它继承自AsyncExecutionInterceptor ,只复写了一个方法
public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {
	...
	// 由此可以见它就是去拿到@Async的value值。以方法的为准,其次是类上面的
	// 备注:发现这里是不支持EJB的@Asynchronous注解的,它是不能指定执行器的
	@Override
	@Nullable
	protected String getExecutorQualifier(Method method) {
		// Maintainer's note: changes made here should also be made in
		// AnnotationAsyncExecutionAspect#getExecutorQualifier
		Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
		if (async == null) {
			async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
		}
		return (async != null ? async.value() : null);
	}
}

@Async注解失效得原因

如下:若我是这么写的

@Service
public class HelloServiceImpl implements HelloService {

 @Override
 public Object hello() {
 System.out.println("当前线程:" + Thread.currentThread().getName());
 helloPrivate(); // 同类内部方法调用
 return "service hello";
 }

 @Async
 @Override
 public Object hello2() {
 System.out.println("当前线程:" + Thread.currentThread().getName());
 return null;
 }
}

最终输出为;

当前线程:http-nio-8080-exec-3
当前线程:http-nio-8080-exec-3

发现都是tomcat的线程。what?@Async异步线程木有生效???? 这个也不是什么新问题了。在之前一篇博文:【小家java】Spring事务不生效的原因大解读 原因也是一样的,都是所谓的:代理失效问题。如何解决?这里就说一种方案吧:

@Service
public class HelloServiceImpl implements HelloService {

 @Autowired
 private ApplicationContext applicationContext;

 @Override
 public Object hello() {
 System.out.println("当前线程:" + Thread.currentThread().getName());
 // 从容器里拿到该类型的实例~~~~(注意:若是JDK代理,这里的类型只能传接口,而不能是实现类,否则NoSuchBean...)
 HelloService helloService = applicationContext.getBean(HelloService.class);
 // 然后用本实例去调用 而不是用默认的this去调用
 helloService.hello2();
 return "service hello";
 }

 @Async
 @Override
 public Object hello2() {
 System.out.println("当前线程:" + Thread.currentThread().getName());
 return null;
 }
}

输出:

当前线程:http-nio-8080-exec-4
当前线程:SimpleAsyncTaskExecutor-2

显然这才是我们想要的结果。因此在同类调用的时候,一定要特别的小心,很有可能是不生效的。

各位可以看看你们同事写的代码,据我推测,肯定有同事写了这种不生效的代码~~

使用推荐配置

@EnableAsync //对应的@Enable注解,最好写在属于自己的配置文件上,保持内聚性
@Configuration
public class AsyncConfig implements AsyncConfigurer {

 @Override
 public Executor getAsyncExecutor() {
 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 executor.setCorePoolSize(10); //核心线程数
 executor.setMaxPoolSize(20); //最大线程数
 executor.setQueueCapacity(1000); //队列大小
 executor.setKeepAliveSeconds(300); //线程最大空闲时间
 executor.setThreadNamePrefix("fsx-Executor-"); 指定用于新创建的线程名称的前缀。
 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略(一共四种,此处省略)

	 // 这一步千万不能忘了,否则报错: java.lang.IllegalStateException: ThreadPoolTaskExecutor not initialized
 executor.initialize();
 return executor;
 }

 // 异常处理器:当然你也可以自定义的,这里我就这么简单写了~~~
 @Override
 public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
 return new SimpleAsyncUncaughtExceptionHandler();
 }
}

使用Spring的异步,我个人十分建议配置上自己自定义的配置器(如上配置仅供参考),这样能够更好的掌握(比如异常处理AsyncUncaughtExceptionHandler~~~)

总结

有了Spring AOP整体运行原理作为基础,再看这些基于AOP的应用,简直行云流水。因此还是应正了那句话:你能走多远,就看你的根基有多稳。最后,使用的有两个建议:

  1. 异步方法建议尽量处理耗时的任务,或者是处理不希望阻断主流程的任务(比如异步记录操作日志)
  2. 尽量为@Async准备一个专门的线程池来提高效率减少开销,而不要用Spring默认的SimpleAsyncTaskExecutor,它不是一个真正的线程池~
关注
打赏
1662604032
查看更多评论
立即登录/注册

微信扫码登录

0.0629s