Spring SimpleAsyncTaskExecutor学习

一. 简介

  1. SimpleAsyncTaskExecutor,不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程,没有最大线程数设置;并发大的时候会产生严重的性能问题;
  2. 在 Java 中创建线程并不便宜,线程对象占用大量内存,在大型应用程序中,分配和取消分配许多线程对象会产生大量内存管理开销;
  3. 它会为每个人物启动一个新任务,异步执行它;
  4. 支持通过 concurrencyLimit 属性限制并发线程,也就是进行流控;默认情况下,不会进行流控,也就是说并发线程数是无限的;

目前了解的,Spring Kafka 的 KafkaListener 会使用 SimpleAsyncTaskExecutor,其他场景没有见使用;

二. 使用

1. 不进行并发限流

不进行并发限流,每次执行 SimpleAsyncTaskExecutor.execute(runnable) 都会创建一个新线程去异步执行任务;

/**
 * 不带并发限流控制的 SimpleAsyncTaskExecutor
 */
public static void main(String[] args) {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("my-test-");

    Runnable runnable = new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(1000L);
            System.out.println("当前线程: " + Thread.currentThread().getName());
        }
    };

    for (int i = 0; i < 10; i++) {
        executor.execute(runnable);
    }
}

打印如下:

当前线程: my-test-4
当前线程: my-test-10
当前线程: my-test-5
当前线程: my-test-3
当前线程: my-test-9
当前线程: my-test-7
当前线程: my-test-2
当前线程: my-test-6
当前线程: my-test-8
当前线程: my-test-1

2. 进行并发限流

进行并发限流,每次执行 SimpleAsyncTaskExecutor.execute(runnable) 也会创建一个新线程去异步执行任务;

那并发限流和不并发限流的区别在哪呢?

  • 不并发限流:10 个线程并发执行;
  • 并发限流:concurrencyThrottle 并发线程设置为 3 的话,某一时刻只能有 3 个线程并发执行;
/**
 * 带并发限流控制的 SimpleAsyncTaskExecutor
 */
public static void main(String[] args) {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("my-test-");

    // 并发线程限制
    executor.setConcurrencyLimit(3);

    Runnable runnable = new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(1000L);
            System.out.println("当前线程: " + Thread.currentThread().getName());
        }
    };

    for (int i = 0; i < 10; i++) {
        executor.execute(runnable);
    }
    
    // 会发现主线程被卡住,因为在 SimpleAsyncTaskExecutor 中会阻塞等待
    System.out.println("主线程执行完成");
}

打印如下:

当前线程: my-test-3
当前线程: my-test-1
当前线程: my-test-2
------------------------------------------
当前线程: my-test-6
当前线程: my-test-5
当前线程: my-test-4
------------------------------------------
当前线程: my-test-8
当前线程: my-test-7
当前线程: my-test-9
------------------------------------------
主线程执行完成
当前线程: my-test-10

三. 源码分析

SimpleAsyncTaskExecutor 的源码比较少,我们直接看这个类;

public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
    implements AsyncListenableTaskExecutor, Serializable {

    /**
    * -1 表示不进行并发限流
    */
    public static final int UNBOUNDED_CONCURRENCY = -1;

    /**
    * 0 表示其他线程等待其他线程执行完
    */
    public static final int NO_CONCURRENCY = 0;


    // 并发限流的实现对象
    // 并发限流就是靠这个类实现的
    private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();

    @Nullable
    private ThreadFactory threadFactory;

    @Nullable
    private TaskDecorator taskDecorator;

    public SimpleAsyncTaskExecutor() {
        super();
    }

    public SimpleAsyncTaskExecutor(String threadNamePrefix) {
        super(threadNamePrefix);
    }

    /**
    * 设置并发线程数
    * 给 concurrencyThrottle 的 concurrencyLimit 字段设值
    * 默认 concurrencyThrottle 的 concurrencyLimit 的值为 -1,表示不进行并发限流
    */
    public void setConcurrencyLimit(int concurrencyLimit) {
        this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit);
    }

    /**
    * 当前是否是并发限流的
    * 其实就是看 concurrencyThrottle 的 concurrencyLimit >= 0 ?
    */
    public final boolean isThrottleActive() {
        return this.concurrencyThrottle.isThrottleActive();
    }


    /**
    * 常用的 execute()
    */
    @SuppressWarnings("deprecation")
    @Override
    public void execute(Runnable task) {
        execute(task, TIMEOUT_INDEFINITE);
    }

    /**
    * 执行给定的 task
    */
    @Deprecated
    @Override
    public void execute(Runnable task, long startTimeout) {
        Assert.notNull(task, "Runnable must not be null");
        Runnable taskToUse = (this.taskDecorator != null ? 
                              this.taskDecorator.decorate(task) : task);
        
        // 1. 如果需要进行并发限流,走下面的逻辑
        if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
            this.concurrencyThrottle.beforeAccess();
            doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
        }
        else {
            // 2. 不需要并发限流,直接执行 doExecute(task)
            doExecute(taskToUse);
        }
    }


    protected void doExecute(Runnable task) {
        // 1. 直接创建一个新的线程!!!
        // 这就是为什么 SimpleAsyncTaskExecutor 每次执行 execute() 都会创建一个新线程的原因
        Thread thread = (this.threadFactory != null ? 
                         this.threadFactory.newThread(task) : createThread(task));
        
        // 2. 调用 thread.start() 异步执行任务
        thread.start();
    }


    /**
    * ConcurrencyThrottleAdapter 只有两个方法,都是由父类实现的
    */
    private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {

        @Override
        protected void beforeAccess() {
            super.beforeAccess();
        }

        @Override
        protected void afterAccess() {
            super.afterAccess();
        }
    }


    /**
    * 包装了 Runnable 对象,并且本身也是 Runnable 对象
    * 装饰器模式
    */
    private class ConcurrencyThrottlingRunnable implements Runnable {

        private final Runnable target;

        public ConcurrencyThrottlingRunnable(Runnable target) {
            this.target = target;
        }

        @Override
        public void run() {
            try {
                this.target.run();
            }
            finally {
                // 主要是在 finally 块中执行 concurrencyThrottle.afterAccess()
                concurrencyThrottle.afterAccess();
            }
        }
    }

}

通过上面的描述,我们对 SimpleAsyncTaskExecutor 有了简单的认识;

不进行并发流控的情况下,很好理解,每次执行 SimpleAsyncTaskExecutor.execute() 都会创建一个新的线程;

我们主要看下进行并发流控的情况下,它是怎么进行流控的;

// ---------------------- SimpleAsyncTaskExecutor ------------------------
public void execute(Runnable task, long startTimeout) {
    Runnable taskToUse = (this.taskDecorator != null ? 
                          this.taskDecorator.decorate(task) : task);
    
    // 1. 如果需要进行并发限流,走下面的逻辑
    if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
        
        // 1.1 执行 this.concurrencyThrottle.beforeAccess()
        // 如果被并发限流的话会阻塞等待
        this.concurrencyThrottle.beforeAccess();
        
        // 1.2 此时没有被限流住
        // 将 task 包装为 ConcurrencyThrottlingRunnable
        // ConcurrencyThrottlingRunnable 的 run() 的 finally 块会释放资源
        // 使其他线程能通过限流
        doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
    }
    else {
        // 2. 不需要并发限流,直接执行 doExecute(task)
        doExecute(taskToUse);
    }
}

this.concurrencyThrottle.beforeAccess() 和 ConcurrencyThrottlingRunnable.run() 都是关键点,下面我们分开分析;

1. concurrencyThrottle.beforeAccess()

可以看到它是通过 synchronized 和 concurrencyLimit 来控制并发限流的;

// ---------------------- ConcurrencyThrottleSupport ------------------------
protected void beforeAccess() {
    if (this.concurrencyLimit == 0) {
        // 不允许 concurrencyLimit == 0
        throw new IllegalStateException();
    }
    
    // 1. 存在并发限流的场景,this.concurrencyLimit > 0
    if (this.concurrencyLimit > 0) {
        
        // 2. 尝试获取 monitor 对象锁,获取不到的话在这里阻塞,等其他线程释放锁
        synchronized (this.monitor) {
            
            // 3. 如果当前并发线程 >= this.concurrencyLimit
            // 当前线程 wait 等待,直到其他线程唤醒它
            while (this.concurrencyCount >= this.concurrencyLimit) {
                this.monitor.wait();
            }
            
            // 4. 当前并发线程数 concurrencyCount++
            this.concurrencyCount++;
        }
    }
}

2. ConcurrencyThrottlingRunnable

我们看下这个类的 run();

// --------------------- ConcurrencyThrottlingRunnable -----------------------
private class ConcurrencyThrottlingRunnable implements Runnable {

    private final Runnable target;

    public ConcurrencyThrottlingRunnable(Runnable target) {
        this.target = target;
    }

    @Override
    public void run() {
        try {
            // 1. 执行目标 target.run()
            this.target.run();
        }
        finally {
            
            // 2. 执行 concurrencyThrottle.afterAccess()
            concurrencyThrottle.afterAccess();
        }
    }
}



// ---------------------- ConcurrencyThrottleSupport ------------------------
protected void afterAccess() {
    // 并发限流场景下
    // 先获取 monitor 对象锁,执行 concurrencyCount--,再唤醒 wait 中的线程
    if (this.concurrencyLimit >= 0) {
        synchronized (this.monitor) {
            this.concurrencyCount--;
            this.monitor.notify();
        }
    }
}

至此,SimpleAsyncTaskExecutor 分析完毕;

实际开发中,我们不要使用 SimpleAsyncTaskExecutor,避免发生灾难性的问题;

相关推荐

  1. 学习 学习

    2024-07-11 16:18:04       61 阅读
  2. 学期学习计划

    2024-07-11 16:18:04       40 阅读
  3. 学习笔记:机器学习

    2024-07-11 16:18:04       76 阅读
  4. C++学习-List学习

    2024-07-11 16:18:04       46 阅读
  5. opencv学习 机器学习

    2024-07-11 16:18:04       56 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-11 16:18:04       66 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-11 16:18:04       70 阅读
  3. 在Django里面运行非项目文件

    2024-07-11 16:18:04       57 阅读
  4. Python语言-面向对象

    2024-07-11 16:18:04       68 阅读

热门阅读

  1. PostgreSQL的pg_bulkload工具

    2024-07-11 16:18:04       22 阅读
  2. python积的最大分解

    2024-07-11 16:18:04       21 阅读
  3. 遇到NotOfficeXmlFileException

    2024-07-11 16:18:04       20 阅读
  4. Android 获取当前电池状态

    2024-07-11 16:18:04       21 阅读
  5. Perl 语言入门学习

    2024-07-11 16:18:04       24 阅读
  6. 容器按↑还是不显示上一个命令

    2024-07-11 16:18:04       23 阅读
  7. 59、Flink 的项目配置 Connector 和 Format 详解

    2024-07-11 16:18:04       21 阅读
  8. 基于ArcGIS污染物浓度及风险的时空分布

    2024-07-11 16:18:04       20 阅读