并发学习25--多线程 ThreadPoolExecutor

类图:定义了一些重要的接口和实现类

线程池的几种状态:

ThreadPoolExecutor构造方法

1.救急线程

线程池中会有核心线程和救急线程;救急线程数=最大线程数-核心线程数。而救急线程会在阻塞队列已经占满的情况下,执行下一个即将要被拒绝策略执行任务。且救急线程在执行完任务后的KeepAliveTime时间内,如果没有执行新的任务,那么就会从线程池remove这个线程。不同于核心线程是一直存在于线程池中的。

救急线程是懒惰创建的,只有当有界阻塞队列满的时候才会创建救急线程执行任务。如果阻塞队列是无界的,那么永远不会创建救急线程。

2.核心线程

会优先使用核心线程来执行任务,且核心线程是懒惰创建。当核心线程执行完任务后,并不会主动结束自己,而是还在运行中。需要特殊处理使之结束。

3.拒绝策略

当救急线程被占用完后再来新的任务会由拒绝策略完成。

4.JDK提供线程池执行过程

4.JDK提供的拒绝策略

NewFixedThreadPool 固定大小线程池

1.介绍

2.代码示例
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j(topic = "TC41")
public class TC41 {
    public static void main(String[] args) throws InterruptedException{
        ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
            private AtomicInteger threadPoolNumber = new AtomicInteger(1);
            
            //使用ThreadFactory自定义线程名
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r,"Pool_t"+threadPoolNumber.getAndIncrement());
            }
        });

        pool.execute(()->{
            log.debug("1");
        });
        pool.execute(()->{
            log.debug("2");
        });
        pool.execute(()->{
            log.debug("3");
        });
        //result: 且程序不会结束,因为核心线程只是执行完任务,但并没有结束
        //11:10:02.144 [Pool_t2] DEBUG TC41 - 2
        //11:10:02.144 [Pool_t1] DEBUG TC41 - 1
        //11:10:02.147 [Pool_t2] DEBUG TC41 - 3
    }
}

NewCachedThreadPool 带缓冲的线程池

1.介绍

若每个任务执行时间都很长的话会创建太多线程,消耗CPU影响性能。

2.SynchronousQueue

SynchronousQueue指t1线程在Queue里放任务时放不进去,只能阻塞在外面,当t2线程来取的时候,就可以把任务取走了。意味着:该队列就是为了阻塞一个任务且不用拒绝策略,只等待线程来执行。当SynchronousQueue配合NewCachedThreadPool执行时,就是每当阻塞一个任务就创建一个救急线程。

SynchronousQueue使用代码示例
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.SynchronousQueue;

@Slf4j(topic = "TC42")
public class TC42 {
    public static void main(String[] args) {
        SynchronousQueue queue = new SynchronousQueue();
        new Thread(()->{
            try {
                log.debug("put 1....");
                //当执行put()后被阻塞住,当take()执行完后,put()才执行完毕
                queue.put(1);
                log.debug("putted 1....");

                log.debug("put 2....");
                queue.put(2);
                log.debug("putted 2....");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t1").start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            log.debug("get 1...");
            try {
                queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t2").start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            log.debug("get 2...");
            try {
                queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t3").start();
    }
}

 NewSingleThreadExecutor 单线程执行器

线程异常后,线程池会重新创建新的线程
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j(topic = "TC43")
public class TC43 {
    public static void main(String[] args) throws InterruptedException {
        test1();
        
        //result: 线程1抛出异常停止后,线程池又创建了一个线程2去执行后面的codes 
        //14:15:44.489 [pool-1-thread-1] DEBUG TC43 - 1
        //Exception in thread "pool-1-thread-1" 14:15:44.493 [pool-1-thread-2] DEBUG TC43 - 2
        //14:15:44.493 [pool-1-thread-2] DEBUG TC43 - 3
    }

    public static void test1() throws InterruptedException{
        ExecutorService service = Executors.newSingleThreadExecutor();
        service.execute(()->{
            log.debug("1");
            int i=1/0;
        });

        service.execute(()->{
            log.debug("2");
        });

        service.execute(()->{
            log.debug("3");
        });
    }
}

提交任务

 Future<T> Submit

有一个Future类型的返回值

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

@Slf4j(topic = "TC44")
public class TC44 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        Future<String> future = pool.submit(()->{
            log.debug("running");
            Thread.sleep(1000);
            return "ok";
        });

        log.debug("{}",future.get());
    }
}
List<Future<T>> invokeAll()

执行集合中所有线程,并返回所有返回值到一个集合里。

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

@Slf4j(topic = "TC45")
public class TC45 {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        List<Future<Object>> futures = pool.invokeAll(Arrays.asList(
                ()->{
                    log.debug("begin...");
                    Thread.sleep(1000);
                    return "1";
                },
                ()->{
                    log.debug("begin...");
                    Thread.sleep(500);
                    return "2";
                },
                ()->{
                    log.debug("begin...");
                    Thread.sleep(2000);
                    return "3";
                }
        ));

        futures.forEach(f->{
            try {
                log.debug("{}",f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
}
Object invokeAny()

集合中哪个线程结束最早返回哪个线程,其余线程停止执行任务。

0.5S后打印结果,因为第二个callable线程结束最早只等待了0.5S.

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j(topic = "TC46")
public class TC46 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        Object o = pool.invokeAny(Arrays.asList(
                ()->{
                  log.debug("begin 1....");
                  Thread.sleep(1000);
                    log.debug("end 1....");
                    return "1";
                },
                ()->{
                    log.debug("begin 2....");
                    Thread.sleep(500);
                    log.debug("end 2....");
                    return "2";
                },
                ()->{
                    log.debug("begin 3....");
                    Thread.sleep(800);
                    log.debug("end 3....");
                    return "3";
                }
        ));

        log.debug("{}",o.toString());
    }
}

关闭线程池

shutdown()

 不会等待正在运行的线程,等它们运行结束,就会自己停止

shutdownNow()

其他终结方法

 

相关推荐

  1. C++线并发

    2024-03-30 15:50:03       20 阅读
  2. 进程并发服务器与线并发服务器

    2024-03-30 15:50:03       51 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-30 15:50:03       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-30 15:50:03       106 阅读
  3. 在Django里面运行非项目文件

    2024-03-30 15:50:03       87 阅读
  4. Python语言-面向对象

    2024-03-30 15:50:03       96 阅读

热门阅读

  1. 每日一题:C语言经典例题之鸡兔同笼

    2024-03-30 15:50:03       45 阅读
  2. Grok - X AI 314B大模型

    2024-03-30 15:50:03       49 阅读
  3. 【SQL】COUNT()函数 用法详解

    2024-03-30 15:50:03       46 阅读
  4. C#面:简述抽象函数(方法)

    2024-03-30 15:50:03       42 阅读
  5. 【PostgreSQL】- 1.2 PostgreSQL 配置单独的数据库存储

    2024-03-30 15:50:03       46 阅读
  6. 【EBS】ORACLE EBS R12财务月结基础

    2024-03-30 15:50:03       36 阅读
  7. python常用的语法

    2024-03-30 15:50:03       48 阅读
  8. Windows MySQL通过data 文件夹恢复数据

    2024-03-30 15:50:03       48 阅读