并发编程之线程池的设计和原理

一、线程池

提前创建一系列的线程,保存在这个线程池中,有任务要执行的时候,从线程池中取出线程来执行。没有任务的时候,线程池放回去。

二、为什么要使用线程池

线程使用上的问题:

  • 线程的频繁创建 和 销毁

  • 线程的数量过多,会造成CPU资源的开销

    • 上下文切换(消耗CPU资源)

那么如何实现线程的复用呢? 池化技术

三、线程池的设计猜想

3.1线程池的设计思考?

需求: 实现线程的重复使用

分解:

  • 如何使用线程的复用?

    • 让线程实现复用的唯一方法,就是让线程不结束

      • 那如何让线程执行新的任务呢?也就是说,任务怎么给他执行?

        • [共享内存]-> List.add()

      • 线程一直处于运行状态,合理吗?

        • 有任务来的时候,执行,没有任务的时候,阻塞

结论: 通过阻塞队列的方式,来实现线程池中线程的复用

思考: 通过阻塞队列的方式,如果队列满了,可以阻塞主线程/生产者线程吗?显然不能,那么怎么办:

  • 1.增加消费者线程的数量(扩容) 也就是增加线程去执行任务,消费者多了,阻塞队列自然被消费的也快了,就不容易阻塞主线程了

  • 2.如果扩容解决不了问题,那只能采用拒绝策略

    • 报错

    • 直接丢弃这个任务

    • 直接普通线程调用task.run(直接重新开启一个线程)

    • 队列中头部的等待最久的任务丢弃,然后把当前任务添加到阻塞队列

    • 存储起来,后续等待空闲之后重试(自定义去完成)

结束的方法:让线程执行结束(run方法执行结束),也就是跳出while循环

3.2 线程池的核心参数

a.线程数量(核心线程数)  [初始化时创建]

b. 最大线程数 [还能够扩容多少个线程 与核心线程数的差]

c. 存活时间 [扩容的线程要有一个存活时间]

d. 存活时间单位

e.阻塞队列(使用哪一种阻塞队列)

f.线程工厂(生产线程的工厂)(有默认值)

h.阻绝策略(有默认值 默认抛出异常)

四、Java中提供的线程池

Exectors

  • newScheduledThreadPool 提供周期的线程池

  • newFixedThreadPool 固定线程数量

  • newSingleThreadExecutor 只有一个线程的线程池

  • newCachedThreadPool 可以缓存的线程池->理论上来说,有多少情况,就构建多少个线程

五、自定义线程池源码分析

线程池中的核心线程是延迟初始化的

  • 先初始化核心线程

  • 调用阻塞队列的方法,把task存进去

    • 如果true,说明当前的请求量不大,核心线程就可以搞定

    • false,增加工作线程(非核心线程)

      • 如果添加失败,说明当前的工作线程数量达到了最大的线程数,直接调用拒绝策略

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 前3位记录运行状态  后29位记录线程数
        int c = ctl.get();
        // 1.判断当前工作线程数是否小于核心线程数(延迟初始化)
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))  // 添加工作线程,并执行任务
                return;
            c = ctl.get();
        }
        // 2.添加到阻塞队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 3. 增加工作线程
        else if (!addWorker(command, false))
            reject(command);   // 4. 增加失败,则执行拒绝策略
    }

5.1 addWorker

    private boolean addWorker(Runnable firstTask, boolean core) {
        // 1. 修改工作线程记录数
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))   //1.1 CAS操作进行修改
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 2. 创建线程并启动
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 3. 添加失败 则回滚
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

分析: addWorker主要做3件事,.a.使用CAS操作修改原子值中的工作线程数  b.将任务放到Worker对象中,并启动线程 c.如果线程启动失败,则回滚。线程启动后,则自动调用Worker对象中的run方法。

5.2 runWorker

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 1. while循环保证当前线程不结束,直到task为空
            while (task != null || (task = getTask()) != null) {
                // 2.这里是因为表示当前线程正在运行一个任务,其它地方要执行shutdown 你要等我执行结束
                w.lock(); // worker继承了AQS  实现了互斥锁
             
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();  // 是否应该中断 在gettask中处理
                try {
                    // 空实现,方便扩展
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 空实现,方便扩展
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

5.3 getTask

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //1. 判断如果线程池已经结束,直接返回Null,需要清理掉所有的工作线程
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 2. 是否允许超时   allowCoreThreadTimeOut 为true 也就是说设置为true 核心线程也 
              // 可以被销毁
              // 或者工作线程数大于核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))   // cas操作减少工作线程
                    return null;
                continue;
            }

            try {
                //* 执行任务
                Runnable r = timed ?
                    // 超时阻塞
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    // 如果阻塞队列为空,则会阻塞在这个地方
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

分析: getTask中从队列中取出任务并执行。注意:allowCoreThreadTimeOut 设置为true,则核心线程也可以被销毁

5.4 拒绝策略

a. AbortPolicy(抛出异常)

        // 报错 抛出异常
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }

b. CallerRunsPolicy (创建一个普通线程,并执行)

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }

c. DiscardOldestPolicy (从队列头部抛弃一个,执行当前的)

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }

d. DiscardPolicy (什么也不做)

   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }

相关推荐

  1. 线原理基本使用~

    2024-05-03 07:32:05       42 阅读
  2. 线运行原理使用案例

    2024-05-03 07:32:05       40 阅读
  3. 探索并发编程:深入理解线

    2024-05-03 07:32:05       18 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-05-03 07:32:05       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-05-03 07:32:05       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-05-03 07:32:05       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-05-03 07:32:05       20 阅读

热门阅读

  1. 数论7-同余

    2024-05-03 07:32:05       10 阅读
  2. 周报 | 24.4.22-24.4.28文章汇总

    2024-05-03 07:32:05       14 阅读
  3. 计算机视觉(CV)是什么以及应用场景

    2024-05-03 07:32:05       11 阅读
  4. Mac 电脑 vscode 终端提示 zsh: command not found

    2024-05-03 07:32:05       11 阅读
  5. 【C++ 问题总结】

    2024-05-03 07:32:05       9 阅读