在开发过程中,我们会遇到很多使用线程池的业务场景,例如异步短信通知、异步记录操作日志。大多数使用线程池的场景,就是会将一些可以进行异步操作的业务放在线程池中去完成。例如在生成订单的时候给用户发送短信,生成订单的结果不应该被发送短信的成功与否所左右,也就是说生成订单这个主操作是不依赖于发送短信这个操作,所以我们就可以把发送短信这个操作置为异步操作。那么本文就是来看看Spring中提供的优雅的异步处理方案:在Spring3中,Spring中引入了一个新的注解@Async,这个注解让我们在使用Spring完成异步操作变得非常方便。需要注意的是这些功能都是Spring Framework提供的,而非SpringBoot。因此下文的讲解都是基于Spring Framework的工程。Spring中用@Async注解标记的方法,称为异步方法,它会在调用方的当前线程之外的独立的线程中执行,其实就相当于我们自己new Thread(()-> System.out.println("hello world !"))这样在另一个线程中去执行相应的业务逻辑。
Demo
// @Async 若把注解放在类上或者接口上,那么他所有的方法都会异步执行了~~~~(包括私有方法)
public interface HelloService {
Object hello();
}
@Service
public class HelloServiceImpl implements HelloService {
// 注意此处加上了此注解
@Async
@Override
public Object hello() {
System.out.println("当前线程:" + Thread.currentThread().getName());
return "service hello";
}
}
然后只需要在配置里,开启对异步的支持即可:
// 开启异步注解的支持
@Configuration
@EnableAsync
public class RootConfig {
}
输出如下(当前线程名):
当前线程:SimpleAsyncTaskExecutor-1
可以很明显的发现,它使用的是线程池SimpleAsyncTaskExecutor,这也是Spring默认给我们提供的线程池(其实它不是一个真正的线程池,后面会有讲述)。下面原理部分讲解后,你就能知道怎么让它使用我们自定义的线程池了~~~
@Async注解使用细节
- @Async注解一般用在方法上,如果用在类上,那么这个类所有的方法都是异步执行的;
- @Async可以放在任何方法上,哪怕你是private的(若是同类调用,请务必注意注解失效的情况~~~)
- 所使用的@Async注解方法的类对象应该是Spring容器管理的bean对象
- @Async可以放在接口处(或者接口方法上)。但是只有使用的是JDK的动态代理时才有效,CGLIB会失效。因此建议:统一写在实现类的方法上
- 需要注解@EnableAsync来开启异步注解的支持
- 若你希望得到异步调用的返回值,请你的返回值用Futrue变量包装起来
需要额外导入哪些Jar包?
它的依赖包非常简单,只依赖一些Spring的核心包外加spring-aop,但是如果你已经导入了spring-webmvc这个jar,那就什么不需要额外导入了,因为都有了:
备注:它虽然依赖于Spring AOP,但是它并不需要导入aspectjweaver,因为它和AspectJ没有半毛钱关系
原理、源码解析
@EnableAsync
它位于的包名为org.springframework.scheduling.annotation,jar名为:spring-context
@EnableXXX这种设计模式之前有分析过多次,这个注解就是它的入口,因此本文也一样,从入口处一层一层的剖析:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
//默认情况下,要开启异步操作,要在相应的方法或者类上加上@Async注解或者EJB3.1规范下
@Asynchronous注解。
//这个属性使得开发人员可以自己设置开启异步操作的注解(可谓非常的人性化了,但是大多情况下用Spring的就足够了)
Class annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");
// 根据类型,拿到该类型的这个注解,然后转换为AnnotationAttributes
AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
if (attributes == null) {
throw new IllegalArgumentException(String.format( "@%s is not present annType.getSimpleName(), importingClassMetadata.getClassName()));
}
// 拿到AdviceMode,最终交给子类,让她自己去实现 决定导入哪个Bean吧
AdviceMode adviceMode = attributes.getEnum(this.getAdviceModeAttributeName());
String[] imports = selectImports(adviceMode);
if (imports == null) {
throw new IllegalArgumentException(String.format("Unknown AdviceMode: '%s'", adviceMode));
}
return imports;
}
// 子类去实现 具体导入哪个Bean
@Nullable
protected abstract String[] selectImports(AdviceMode adviceMode);
}
改抽象提供了支持AdviceMode的较为通用的实现,若我们自己想自定义,可以考虑实现此类。
由此可议看出,@EnableAsync最终是向容器内注入了ProxyAsyncConfiguration这个Bean。由名字可议看出,它是一个配置类。
ProxyAsyncConfiguration
// 它是一个配置类,角色为ROLE_INFRASTRUCTURE 框架自用的Bean类型
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
// 它的作用就是诸如了一个AsyncAnnotationBeanPostProcessor,它是个BeanPostProcessor
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
// customAsyncAnnotation:自定义的注解类型
// AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation") 为拿到该注解该字段的默认值
Class, Boolean> eligibleBeans = new ConcurrentHashMap(256);
// 当遇到一个pre-object的时候,是否把该processor所持有得advisor放在现有的增强器们之前执行
// 默认是false,会放在最后一个位置上的
public void setBeforeExistingAdvisors(boolean beforeExistingAdvisors) {
this.beforeExistingAdvisors = beforeExistingAdvisors;
}
// 不处理
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) {
return bean;
}
// Bean已经实例化、初始化完成之后执行。
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
// 忽略AopInfrastructureBean的Bean,并且如果没有advisor也会忽略不处理~~~~~
if (bean instanceof AopInfrastructureBean || this.advisor == null) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
// 如果这个Bean已经被代理过了(比如已经被AOP切中了),那本处就无需再重复创建代理了嘛
// 直接向里面添加advisor就成了
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
// 注意此advised不能是已经被冻结了的。且源对象必须是Eligible合格的
if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
// Add our local Advisor to the existing proxy's Advisor chain...
// 把自己持有的这个advisor放在首位(如果beforeExistingAdvisors=true)
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
}
// 否则就是尾部位置
else {
advised.addAdvisor(this.advisor);
}
// 最终直接返回即可,因为已经没有必要再创建一次代理对象了
return bean;
}
}
// 如果这个Bean事合格的(此方法下面有解释) 这个时候是没有被代理过的
if (isEligible(bean, beanName)) {
// 以当前的配置,创建一个ProxyFactory
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
// 如果不是使用CGLIB常见代理,那就去分析出它所实现的接口们 然后放进ProxyFactory 里去
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
// 切面就是当前持有得advisor
proxyFactory.addAdvisor(this.advisor);
// 留给子类,自己还可以对proxyFactory进行自定义~~~~~
customizeProxyFactory(proxyFactory);
// 最终返回这个代理对象~~~~~
return proxyFactory.getProxy(getProxyClassLoader());
}
// No async proxy needed.(相当于没有做任何的代理处理,返回原对象)
return bean;
}
// 检查这个Bean是否是合格的
protected boolean isEligible(Object bean, String beanName) {
return isEligible(bean.getClass());
}
protected boolean isEligible(Class targetClass) {
// 如果已经被缓存着了,那肯定靠谱啊
Boolean eligible = this.eligibleBeans.get(targetClass);
if (eligible != null) {
return eligible;
}
// 如果没有切面(就相当于没有给配置增强器,那铁定是不合格的)
if (this.advisor == null) {
return false;
}
// 这个重要了:看看这个advisor是否能够切入进targetClass这个类,能够切入进取的也是合格的
eligible = AopUtils.canApply(this.advisor, targetClass);
this.eligibleBeans.put(targetClass, eligible);
return eligible;
}
// 子类可以复写。比如`AbstractBeanFactoryAwareAdvisingPostProcessor`就复写了这个方法~~~
protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
ProxyFactory proxyFactory = new ProxyFactory();
proxyFactory.copyFrom(this);
proxyFactory.setTarget(bean);
return proxyFactory;
}
// 子类复写~
protected void customizeProxyFactory(ProxyFactory proxyFactory) {
}
}
看看它的继承图:
MethodValidationPostProcessor属于JSR-303校验方面的范畴,不是本文的内容,因此不会讲述
AbstractBeanFactoryAwareAdvisingPostProcessor
从名字可以看出,它相较于父类,就和BeanFactory有关了,也就是和Bean容器相关了~~~
public abstract class AbstractBeanFactoryAwareAdvisingPostProcessor extends AbstractAdvisingBeanPostProcessor
implements BeanFactoryAware {
// Bean工厂
@Nullable
private ConfigurableListableBeanFactory beanFactory;
// 如果这个Bean工厂不是ConfigurableListableBeanFactory ,那就set一个null
// 我们的`DefaultListableBeanFactory`显然就是它的子类~~~~~
@Override
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = (beanFactory instanceof ConfigurableListableBeanFactory ?
(ConfigurableListableBeanFactory) beanFactory : null);
}
@Override
protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
// 如果Bean工厂是正常的,那就把这个Bean expose一个特殊的Bean,记录下它的类型
if (this.beanFactory != null) {
AutoProxyUtils.exposeTargetClass(this.beanFactory, beanName, bean.getClass());
}
ProxyFactory proxyFactory = super.prepareProxyFactory(bean, beanName);
// 这里创建代理也是和`AbstractAutoProxyCreator`差不多的逻辑。
// 如果没有显示的设置为CGLIB,并且toProxyUtils.shouldProxyTargetClass还被暴露过时一个特殊的Bean,那就强制使用CGLIB代理吧 这里一般和Scope无关的话,都返回false了
if (!proxyFactory.isProxyTargetClass() && this.beanFactory != null &&
AutoProxyUtils.shouldProxyTargetClass(this.beanFactory, beanName)) {
proxyFactory.setProxyTargetClass(true);
}
return proxyFactory;
}
}
下面就可以看看具体的两个实现类了:
AsyncAnnotationBeanPostProcessor
该实现类就是具体和@Async相关的一个类了~~~
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
// 建议换成AsyncExecutionAspectSupport.DEFAULT_TASK_EXECUTOR_BEAN_NAME 这样语意更加的清晰些
public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;
// 注解类型
@Nullable
private Class targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// 注意:此处是getMostSpecificMethod 拿到最终要执行的那个方法
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
// 桥接方法~~~~~~~~~~~~~~
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
// determine一个用于执行此方法的异步执行器
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
// 构造一个任务,Callable(此处不采用Runable,因为需要返回值)
Callable task = () -> {
try {
// result就是返回值
Object result = invocation.proceed();
// 注意此处的处理~~~~ 相当于如果不是Future类型,就返回null了
if (result instanceof Future) {
return ((Future) result).get();
}
}
// 处理执行时可能产生的异常~~~~~~
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
// 提交任务~~~~invocation.getMethod().getReturnType()为返回值类型
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
}
SimpleAsyncTaskExecutor:异步执行用户任务的SimpleAsyncTaskExecutor。每次执行客户提交给它的任务时,它会启动新的线程,并允许开发者控制并发线程的上限(concurrencyLimit),从而起到一定的资源节流作用。默认时,concurrencyLimit取值为-1,即**不启用**资源节流 所以它不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程(因此建议我们在使用@Aysnc的时候,自己配置一个线程池,节约资源)
AnnotationAsyncExecutionInterceptor
很显然,他是在AsyncExecutionInterceptor的基础上,提供了对@Async注解的支持。所以它是继承它的。
// 它继承自AsyncExecutionInterceptor ,只复写了一个方法
public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {
...
// 由此可以见它就是去拿到@Async的value值。以方法的为准,其次是类上面的
// 备注:发现这里是不支持EJB的@Asynchronous注解的,它是不能指定执行器的
@Override
@Nullable
protected String getExecutorQualifier(Method method) {
// Maintainer's note: changes made here should also be made in
// AnnotationAsyncExecutionAspect#getExecutorQualifier
Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
if (async == null) {
async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
}
return (async != null ? async.value() : null);
}
}
@Async注解失效得原因
如下:若我是这么写的
@Service
public class HelloServiceImpl implements HelloService {
@Override
public Object hello() {
System.out.println("当前线程:" + Thread.currentThread().getName());
helloPrivate(); // 同类内部方法调用
return "service hello";
}
@Async
@Override
public Object hello2() {
System.out.println("当前线程:" + Thread.currentThread().getName());
return null;
}
}
最终输出为;
当前线程:http-nio-8080-exec-3 当前线程:http-nio-8080-exec-3
发现都是tomcat的线程。what?@Async异步线程木有生效???? 这个也不是什么新问题了。在之前一篇博文:【小家java】Spring事务不生效的原因大解读 原因也是一样的,都是所谓的:代理失效问题。如何解决?这里就说一种方案吧:
@Service
public class HelloServiceImpl implements HelloService {
@Autowired
private ApplicationContext applicationContext;
@Override
public Object hello() {
System.out.println("当前线程:" + Thread.currentThread().getName());
// 从容器里拿到该类型的实例~~~~(注意:若是JDK代理,这里的类型只能传接口,而不能是实现类,否则NoSuchBean...)
HelloService helloService = applicationContext.getBean(HelloService.class);
// 然后用本实例去调用 而不是用默认的this去调用
helloService.hello2();
return "service hello";
}
@Async
@Override
public Object hello2() {
System.out.println("当前线程:" + Thread.currentThread().getName());
return null;
}
}
输出:
当前线程:http-nio-8080-exec-4 当前线程:SimpleAsyncTaskExecutor-2
显然这才是我们想要的结果。因此在同类调用的时候,一定要特别的小心,很有可能是不生效的。
各位可以看看你们同事写的代码,据我推测,肯定有同事写了这种不生效的代码~~
使用推荐配置
@EnableAsync //对应的@Enable注解,最好写在属于自己的配置文件上,保持内聚性
@Configuration
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); //核心线程数
executor.setMaxPoolSize(20); //最大线程数
executor.setQueueCapacity(1000); //队列大小
executor.setKeepAliveSeconds(300); //线程最大空闲时间
executor.setThreadNamePrefix("fsx-Executor-"); 指定用于新创建的线程名称的前缀。
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略(一共四种,此处省略)
// 这一步千万不能忘了,否则报错: java.lang.IllegalStateException: ThreadPoolTaskExecutor not initialized
executor.initialize();
return executor;
}
// 异常处理器:当然你也可以自定义的,这里我就这么简单写了~~~
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
}
使用Spring的异步,我个人十分建议配置上自己自定义的配置器(如上配置仅供参考),这样能够更好的掌握(比如异常处理AsyncUncaughtExceptionHandler~~~)
总结有了Spring AOP整体运行原理作为基础,再看这些基于AOP的应用,简直行云流水。因此还是应正了那句话:你能走多远,就看你的根基有多稳。最后,使用的有两个建议:
- 异步方法建议尽量处理耗时的任务,或者是处理不希望阻断主流程的任务(比如异步记录操作日志)
- 尽量为@Async准备一个专门的线程池来提高效率减少开销,而不要用Spring默认的SimpleAsyncTaskExecutor,它不是一个真正的线程池~