Springboot项目中,异步编程底层实现原理详解(三)

本系列文章简介:

        在现代的开发中,异步编程已经成为了必备的技能。随着计算机性能的提升和多核处理器的普及,异步编程可以充分利用系统资源,提高程序的性能和响应速度。在Spring Boot项目中,异步编程也得到了广泛的应用。

        异步编程可以让我们在执行耗时操作时,不需要等待其完成,而是直接返回结果,然后在合适的时候再处理这个结果。这样可以提高程序的并发性能,提升用户体验。

        Spring Boot提供了很多异步编程的支持,包括使用注解方式、使用线程池、使用消息队列等等。底层实现原理主要是通过线程池来实现的。

        在使用异步编程时,我们需要注意一些使用方式。首先,要明确哪些操作是可以异步执行的,比如数据库访问、网络请求等。其次,要合理配置线程池的大小和其他参数,避免出现线程池资源耗尽的情况。此外,还需要考虑异步操作的异常处理和结果回调等问题。

        在本系列文章中,我们将介绍Spring Boot异步编程的底层实现原理,并给出一些正确的使用方式和注意事项。通过学习和掌握异步编程的知识,我们可以更好地利用系统资源,提高程序的性能和效率。让我们一起来探索异步编程的奥秘吧!

        欢迎大家订阅《Java技术栈高级攻略》专栏,一起学习,一起涨分!

目录

1、前言

2、Springboot项目之基于@Async实现异步编码

2.1 开启异步注解

2.2 异步调用底层实现原理

2.3 异步线程池如何选择

3、结语


1、前言

        在现代软件开发中,异步编程已经变得越来越重要。随着计算机性能的提升和用户对高效响应的需求,异步编程可以提供更好的用户体验和更高的系统性能。

        异步编程的原理是将耗时的操作放在后台线程中执行,不阻塞主线程的执行。这样可以使程序能够同时处理多个任务,并在任务完成后立即响应用户,提高程序的并发能力和响应速度。

        本文将跟随《Springboot项目中,异步编程底层实现原理详解(二)》的进度,继续介绍异步编程。希望通过本系列文章的学习,您将能够更好地理解异步编程的内部工作原理,掌握异步编程的使用技巧,以及通过合理的设计完成最佳实践,充分发挥优化异步编程的潜力,为系统的高效运行提供有力保障。

2、Springboot项目之基于@Async实现异步编码

2.1 开启异步注解

        详见《Springboot项目中,异步编程底层实现原理详解(一)

2.2 异步调用底层实现原理

        详见 《Springboot项目中,异步编程底层实现原理详解(二)

2.3 异步线程池如何选择

上文中提到不要使用默认线程池,是因为@Async以前版本使用的默认线程池是SimpleAsyncTaskExecutor,这个线程池有诸多问题,推荐使用的线程池是ThreadPoolTaskExecutor。实际上,SpringBoot官方已经考虑到这个问题了,在新的版本中(2.0.9以上),@Async默认使用的线程池就是ThreadPoolTaskExecutor,这里注意看下你使用的版本。

SpringBoot 2.0.9以及之前的版本,使用的线程池默认是SimpleAsyncTaskExecutor,之后默认使用的是ThreadPoolTaskExecutor。

那么,是否就可以直接使用默认的线程池了呢?接下来就来研究下默认线程池的源码。

上文中有讲到AnnotationAsyncExecutionInterceptor这个类,它是一个拦截器,继承了AsyncExecutionInterceptor类,每次执行异步方法都会执行AsyncExecutionInterceptor方法。

该方法中入参的Excutor参数可以是null,如果是null,就使用默认的Excutor,如果不是null,就使用指定的Excutor。

看一下AsyncExecutionInterceptor继承的父类AsyncExecutionAspectSupport中的AsyncExecutionAspectSupport方法

该方法调用了 getDefaultExecutor方法,源码如下:      

如果入参是null,按照上述方法逻辑,第一行通过beanFactory.getBean获取TaskExcutor类型的Bean。但是这里是通过类型获取的,实现Excutor的如果有多个类,就会报错,被catch捕获,进入第二行代码获取Bean,通过名字获取的Bean,名字是taskExcutor,

该taskExcutor是SpringBoot默认配置好的,接着看SpringBoot的自动配置相关源码TaskExecutionAutoConfiguration类。

TaskExecutionAutoConfiguration类源码如下:

@ConditionalOnClass(ThreadPoolTaskExecutor.class)
@AutoConfiguration
@EnableConfigurationProperties(TaskExecutionProperties.class)
public class TaskExecutionAutoConfiguration {


	public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";

	@Bean
	@ConditionalOnMissingBean
	public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,
			ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
			ObjectProvider<TaskDecorator> taskDecorator) {
		TaskExecutionProperties.Pool pool = properties.getPool();
		TaskExecutorBuilder builder = new TaskExecutorBuilder();
		builder = builder.queueCapacity(pool.getQueueCapacity());
		builder = builder.corePoolSize(pool.getCoreSize());
		builder = builder.maxPoolSize(pool.getMaxSize());
		builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
		builder = builder.keepAlive(pool.getKeepAlive());
		Shutdown shutdown = properties.getShutdown();
		builder = builder.awaitTermination(shutdown.isAwaitTermination());
		builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
		builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
		builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
		builder = builder.taskDecorator(taskDecorator.getIfUnique());
		return builder;
	}

	@Lazy
	@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
			AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
	@ConditionalOnMissingBean(Executor.class)
	public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
		return builder.build();
	}

}

源码中,需要注意到@ConditionalOnMissingBean注解,只在没有Excutor实现类的时候,才会默认配置。和上面通过beanFactory.getBean获取TaskExcutor类型的Bean代码遥相呼应。如果我们项目没有注入Excutor的话,SpringBoot就会默认注入ThreadPoolTaskExecutor。如果我们指定了,这样使用@Async(“myExcutor”)则使用我们指定的。如果没指定,还有多个实现Bean冲突了,则使用Bean名字是taskExcutor的。

再来看一下类上的注解@EnableConfigurationProperties(TaskExecutionProperties.class),默认线程池配置,创建线程池时使用的都是TaskExecutionProperties这个配置文件。

TaskExecutionProperties类源码如下:

@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {

	private final Pool pool = new Pool();

	private final Shutdown shutdown = new Shutdown();

	private String threadNamePrefix = "task-";

	public Pool getPool() {
		return this.pool;
	}

	public Shutdown getShutdown() {
		return this.shutdown;
	}

	public String getThreadNamePrefix() {
		return this.threadNamePrefix;
	}

	public void setThreadNamePrefix(String threadNamePrefix) {
		this.threadNamePrefix = threadNamePrefix;
	}

	public static class Pool {

		private int queueCapacity = Integer.MAX_VALUE;

		private int coreSize = 8;

		private int maxSize = Integer.MAX_VALUE;

		private boolean allowCoreThreadTimeout = true;

		private Duration keepAlive = Duration.ofSeconds(60);

		public int getQueueCapacity() {
			return this.queueCapacity;
		}

		public void setQueueCapacity(int queueCapacity) {
			this.queueCapacity = queueCapacity;
		}

		public int getCoreSize() {
			return this.coreSize;
		}

		public void setCoreSize(int coreSize) {
			this.coreSize = coreSize;
		}

		public int getMaxSize() {
			return this.maxSize;
		}

		public void setMaxSize(int maxSize) {
			this.maxSize = maxSize;
		}

		public boolean isAllowCoreThreadTimeout() {
			return this.allowCoreThreadTimeout;
		}

		public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
			this.allowCoreThreadTimeout = allowCoreThreadTimeout;
		}

		public Duration getKeepAlive() {
			return this.keepAlive;
		}

		public void setKeepAlive(Duration keepAlive) {
			this.keepAlive = keepAlive;
		}

	}

	public static class Shutdown {

		private boolean awaitTermination;

		private Duration awaitTerminationPeriod;

		public boolean isAwaitTermination() {
			return this.awaitTermination;
		}

		public void setAwaitTermination(boolean awaitTermination) {
			this.awaitTermination = awaitTermination;
		}

		public Duration getAwaitTerminationPeriod() {
			return this.awaitTerminationPeriod;
		}

		public void setAwaitTerminationPeriod(Duration awaitTerminationPeriod) {
			this.awaitTerminationPeriod = awaitTerminationPeriod;
		}

	}

}

上文中的线程池的配置参数,有几项明显不合理,比如最大等待队列的参数配置,最大线程池的参数配置等,这些需要我们根据业务场景进行配置,不然存在隐患,会导致服务出现OOM错误。

下面简单讲一下线程池的工作原理,线程池有几个重要的基本参数,核心线程数,等待队列,最大线程数量,线程存活时间等。有新的任务过来时,先启用核心线程,核心线程满了后,再加到等待队列,等待队列也满了后,如果小于最大线程数,则继续开启新的线程,直到达到最大线程数量。系统默认配置的核心线程数是8,但是等待队列最大值无上限,这里就不太合适。需要按照业务实际使用场景来配置。同时最大线程数也需要配置,不然资源不够,起了无限多的线程,就OOM了。存活时间默认60s,也是根据自己的任务响应时间来配置。allowCoreThreadTimeout是配置核心线程数是否设置存活时间的参数,如果配置true,那么核心线程数存活时间也会生效,空闲时不会一直存在。

看到这里,是不是已经了然于胸了,默认的线程池不可直接用,那么我们就需要自定义线程池,才能使用异步注解。

自定义异步配置类如下:

@Data
@Configuration
public class AsyncThreadPoolConfiguration {
    /**
     * 核心线程数
     */
    private Integer coreNum;

    /**
     * 最大线程数
     */
    private Integer maxNum;

    /**
     * 最大空闲时间
     */
    private Integer maxTimeoutSec = 300;

    /**
     * 是否允许核心线程数超时
     */
    private boolean allowCoreThreadTimeOut = false;

    /**
     * 线程名称的前缀
     */
    private String threadNamePrefix = "Ken-Executor-";

    /**
     * 阻塞队列的最大容量
     */
    private Integer queueCapacity = 1000;

    /**
     * 线程拒绝的方式
     */
    private RejectEnum rejectHandler = RejectEnum.CallerRunsPolicy;

    @PostConstruct
    public void init(){
        //获得cpu的核心数量
        int core = Runtime.getRuntime().availableProcessors();

        //配置核心线程数 - cpu核心数 - CPU密集型的应用
        if (coreNum == null || coreNum <= 0) {
            coreNum = core;
        }

        //配置最大线程数 - cpu核心数 * 2 - IO密集型的应用
        if (maxNum == null || maxNum <= 0) {
            maxNum = core * 2;
        }
    }

}
public enum RejectEnum {
    //抛出异常的方式
    AbortPolicy(new ThreadPoolExecutor.AbortPolicy()),
    //提交任务的线程自行执行该任务
    CallerRunsPolicy(new ThreadPoolExecutor.CallerRunsPolicy()),
    //线程池会放弃当前等待队列中,最久的任务,当前被拒绝的任何放入队列
    DiscardOldestPolicy(new ThreadPoolExecutor.DiscardOldestPolicy()),
    //直接丢弃当前拒绝的任务
    DiscardPolicy(new ThreadPoolExecutor.DiscardPolicy());

    /**
     * 拒绝策略的处理器
     */
    private RejectedExecutionHandler rejectedExecutionHandler;

    RejectEnum(RejectedExecutionHandler rejectedExecutionHandler) {
        this.rejectedExecutionHandler = rejectedExecutionHandler;
    }

    public RejectedExecutionHandler getRejectedExecutionHandler() {
        return rejectedExecutionHandler;
    }
}

@Configuration
public class AsyncThreadPoolConfig {

    @Autowired
    AsyncThreadPoolConfiguration asyncThreadPoolConfiguration;

    @Bean
    @Primary
    @ConditionalOnBean(annotation = EnableAsync.class)
    public Executor parkAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(asyncThreadPoolConfiguration.getCoreNum());
        taskExecutor.setMaxPoolSize(asyncThreadPoolConfiguration.getMaxNum());
        taskExecutor.setKeepAliveSeconds(asyncThreadPoolConfiguration.getMaxTimeoutSec());
        taskExecutor.setAllowCoreThreadTimeOut(asyncThreadPoolConfiguration.isAllowCoreThreadTimeOut());
        taskExecutor.setRejectedExecutionHandler(asyncThreadPoolConfiguration.getRejectHandler().getRejectedExecutionHandler());
        taskExecutor.setThreadNamePrefix(asyncThreadPoolConfiguration.getThreadNamePrefix());
        taskExecutor.setQueueCapacity(asyncThreadPoolConfiguration.getQueueCapacity());
        taskExecutor.initialize();
        return taskExecutor;
    }
}

上述步骤完成之后,在需要使用异步注解的地方,指定线程池名称@Async("parkAsyncExecutor")即可

@Async("parkAsyncExecutor")
    public CompletableFuture<String> B() {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        System.out.println("B start");
        try {
            Thread.sleep(5000); // 模拟耗时操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("B end");
        completableFuture.complete("result from B");
        return completableFuture;
    }

至此,大功告成!!!

大家可以通过,代码调试,断点DEG的方式,看一下程序的执行顺序,更加深刻的了解整个异步执行的流程!!!

3、结语

        文章至此,已接近尾声!希望此文能够对大家有所启发和帮助。同时,感谢大家的耐心阅读和对本文档的信任。在未来的技术学习和工作中,期待与各位大佬共同进步,共同探索新的技术前沿。最后,再次感谢各位的支持和关注。您的支持是作者创作的最大动力,如果您觉得这篇文章对您有所帮助,请分享给身边的朋友和同事!

相关推荐

  1. 并发编程笔记2--volatile底层实现原理

    2024-04-21 11:30:06       31 阅读
  2. SpringBoot 异步编程

    2024-04-21 11:30:06       52 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-21 11:30:06       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-21 11:30:06       101 阅读
  3. 在Django里面运行非项目文件

    2024-04-21 11:30:06       82 阅读
  4. Python语言-面向对象

    2024-04-21 11:30:06       91 阅读

热门阅读

  1. npm 常用命令详解

    2024-04-21 11:30:06       41 阅读
  2. C# 匿名方法与扩展方法详解

    2024-04-21 11:30:06       38 阅读
  3. axios的跨越封装

    2024-04-21 11:30:06       33 阅读
  4. 贝叶斯逻辑回归案例和使用场景

    2024-04-21 11:30:06       39 阅读
  5. 事务的隔离级别

    2024-04-21 11:30:06       33 阅读
  6. asp.net core rabbitmq的基本使用

    2024-04-21 11:30:06       37 阅读
  7. jvm概述

    jvm概述

    2024-04-21 11:30:06      36 阅读