手写一个线程池

自己手动写一个线程池的必要条件需要先了解我们使用的线程池的功能。为什么会有线程池?这是为了减少线程创建和销毁的开销。复用线程的目的。为了达到这个目的。预计方案是:需要一个存放任务的队列,主线程相当于生产者,在这个队列里面push任务,启动几个核心线程一直消费这个队列,然后执行其中的任务。

1、线程池的几个重要参数

(1)corePoolSize 核心线程数 含义是一直存活的线程数

(2)maxPoolSize 最大线程数 含义是最大运行的线程个数

(3)keepaliveTime 存活的时间 超过核心线程数的线程存活时间

(4)unit 单位

(5)blockingQueue 阻塞队列 放置任务使用

(6)threadFactory 线程工厂

(7)拒绝策略:abort策略表示拒绝任务,但是会抛异常,discard策略表示拒绝任务,不会抛异常。CallerRunPolicy策略表示会使用调用者去执行任务。DiscardoldPolicy策略表示抛弃老的任务,将新的任务添加进队列

上述几种参数决定了线程池要对外暴露什么样的接口,精选1-5必要参数手动写一个线程池。

2、代码逻辑

我这边编写的步骤是:

1、先将参数定义好,构造方法定义出来,除了上述必要的1-5的参数外,还需要一个消费者工作集合。为什么要保存成集合呢?主要用于在取消的时候遍历其集合,然后进行中断。

2、编写submit方法逻辑,首先参数肯定是Runable任务,判断逻辑:

(1)当运行的工作线程小于核心线程,则直接启动一个工作线程然后加入线程集合中。

(2)当1不成立后,那就需要加入阻塞队列等待了,如果阻塞队列是数组类型阻塞队列,这时有范围限制,如果不超过,则加入队列。

(3)当阻塞队列超了,则查看当前线程个数是否小于最大线程个数。如果不超过,则启动一个工作线程消费其任务,并且加入其工作集合中

(4)如果超过最大线程数,实际上走拒绝策略逻辑,这里我们就直接报错。

3、上述工作线程内部怎么消费呢?因为是阻塞队列,自然就是循环队列中取任务然后执行。而这里的取任务,分成了poll和take。这是为什么?从上面步骤可知核心线程数超过之后也会存在启动工作线程的情况,那这些线程有一个保活时间,时间一到,只要此线程空闲,线程则会跳出循环结束执行。而这里阻塞队列的poll可以实现上述逻辑。take则在没有任务时就阻塞。

下面是按照其上述逻辑的代码,大家可以参考。

package org.example.Thread1;

import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class SelfThreadPoolExecutor {


    //一直存活的核心线程数
    private int corePoolSize;
    //运行存在的最大的线程数
    private int maxPoolSize;


    private AtomicInteger status = new AtomicInteger();

    private AtomicInteger workCount = new AtomicInteger();

    private final static Integer RUNNING = 0;

    private final static Integer STOP = 1;

    private int keepAliveTime;

    private BlockingQueue<Runnable> blockingQueue;


    private HashSet<Worker> hashSet = new HashSet<>();

    private ReentrantLock mainLock = new ReentrantLock();


    public SelfThreadPoolExecutor(int corePoolSize, int maxPoolSize, int keepAliveTime, BlockingQueue<Runnable> blockingQueue) {
        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maxPoolSize;
        this.keepAliveTime = keepAliveTime;
        this.blockingQueue = blockingQueue;
    }
    public void submit(Runnable runnable) {
        if (status.get() == STOP) {
            throw new RuntimeException("不能添加新的任务了");
        }

        if (workCount.get() < corePoolSize && addWork(runnable, true)) {
            return;
        }
        if (blockingQueue.offer(runnable)) {
            return;
        }
        if (workCount.get() < maxPoolSize && addWork(runnable, false)) {
            return;
        }
        throw new RuntimeException("拒绝此任务");

    }
    public boolean addWork(Runnable runnable, boolean core) {
        if (status.get() == STOP) {
            return false;
        }
        while (true) {
            if (workCount.get() > (core ? corePoolSize : maxPoolSize)) {
                return false;
            }
            boolean inc = workCount.compareAndSet(workCount.get(), workCount.get()+1);
            if (!inc) {
                continue;
            }
            break;
        }
        mainLock.lock();
        try {
            Worker worker = new Worker(runnable);
            worker.thread.start();
            hashSet.add(worker);
        } finally {
            while (true) {
                boolean inc = workCount.compareAndSet(workCount.get(), workCount.get()-1);
                if (!inc) {
                    continue;
                }
                break;
            }
            mainLock.unlock();
        }
        return true;
    }
    public void shutdown() {
        mainLock.lock();
        try {
            while (true) {
                if (status.get() == STOP) {
                    break;
                }
                boolean stop = status.compareAndSet(status.get(), STOP);
                if (stop) {
                    break;
                }
            }

            for (Worker worker : hashSet) {
                if (!worker.thread.isInterrupted()) {
                    worker.thread.interrupt();
                }
            }
        }finally {
            mainLock.unlock();
        }
    }
    private final class Worker implements Runnable{
        Runnable firstTask;

        final Thread thread;
        public Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            thread = new Thread(this);
        }
        @Override
        public void run() {
            runWork(this);
        }

        public void runWork(Worker w) {
            Thread wt = w.thread;
            Runnable task = w.firstTask;
            w.firstTask = null;
            try {
                while (task != null || (task = getTask()) != null) {
                    try {
                        if (wt.isInterrupted()) {
                            break;
                        }
                        if (status.get() == STOP) {
                            break;
                        }
                        task.run();
                    } finally {
                        task = null;
                    }
                }
            } finally {
                mainLock.lock();
                try {
                    hashSet.remove(w);
                } finally {
                    mainLock.unlock();
                }
            }

        }
        public Runnable getTask() {
            boolean timedOut = false;
            while (true) {
                try {
                    if (status.get() == STOP) {
                        return null;
                    }
                    if (timedOut) {
                        return null;
                    }
                    Runnable task = null;
                    if (workCount.get() > corePoolSize) {
                        task = blockingQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                    } else {
                        task = blockingQueue.take();
                    }
                    if (task != null) {
                        return task;
                    }
                    timedOut = true;
                } catch (InterruptedException e) {
                    timedOut = true;
                }

            }
        }
    }
}

 

 

相关推荐

  1. 一个线

    2024-03-15 14:02:02       44 阅读
  2. 一步一步线之十一线应用内存

    2024-03-15 14:02:02       33 阅读
  3. 面:go能不能一个简单的协

    2024-03-15 14:02:02       40 阅读
  4. 信号量&线&读者者模型

    2024-03-15 14:02:02       24 阅读
  5. 一步一步线之四简单线

    2024-03-15 14:02:02       40 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-15 14:02:02       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-15 14:02:02       106 阅读
  3. 在Django里面运行非项目文件

    2024-03-15 14:02:02       87 阅读
  4. Python语言-面向对象

    2024-03-15 14:02:02       96 阅读

热门阅读

  1. selinux

    selinux

    2024-03-15 14:02:02      37 阅读
  2. vue和js常识

    2024-03-15 14:02:02       37 阅读
  3. LeetCode hot100-11

    2024-03-15 14:02:02       37 阅读
  4. C#学习汇总

    2024-03-15 14:02:02       44 阅读
  5. DP-力扣 120.三角形最小路径和

    2024-03-15 14:02:02       37 阅读
  6. C++由动态链接库dll生成lib文件

    2024-03-15 14:02:02       40 阅读
  7. Filebeat rpm方式安装及配置

    2024-03-15 14:02:02       45 阅读
  8. ESP32 域名解析得到IP地址

    2024-03-15 14:02:02       45 阅读