什么是线程池,这个池字是什么
线程池,主要利用池化思想,线程池,字符串常量池等
为什么要有一个线程池?
正常线程的创建:1,手动创建一个线程
2.给该线程分配任务,线程执行任务
3.执行完成,手动释放线程
有了线程池后,默认线程数为3,最大线程数为5,等待队列为6,在正常状态下,我们只有三个默认线程去处理任务,流程是这样的:
1.等待队列按照顺序分出任务给空闲线程,直到任务为空或者无空闲线程
2.线程执行完任务后不会销毁,而是继续执行任务,如果此时没有任务,那么线程进入空闲状态,不会被销毁
如果任务变多,等待队列满呢
此时只有三个线程,但是任务队列已经满了,证明在这个状态下只有三个线程工作是不够的,所以我们开启最大线程数,全部开始处理任务,如果此时仍然想有任务进入队列,拒绝。
在将任务处理完毕后,队列处于空闲或者任务很少的状态,销毁线程,使当前的线程数为默认线程数
优点:
提高线程利用率
提高响应速度(省去创建线程的时间了)
控制并发数(控制最大线程数量)
便于统一管理(都位于线程池中)
代码实现:
package org.my;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
* 创建了num个线程
* 每个线程做woker这个任务
* 而woker这个任务是不断取jobs中的一个任务进行执行
* 直到jobs为空,然后进入等待状态
* 如果jobs又重新exe添加了新job
* jobs唤醒worker可以开始运行
* 线程就可以继续开始工作。
*/
public class ThreadPoolLearn313<Job extends Runnable> implements ThreadPool<Job> {
//最大线程数量
private static final int MAX_WORKER_NUMBERS = 10;
//默认线程数
private static final int DEFAULT_WORKER_NUMBERS = 5;
//最小线程数
private static final int MIN_WORKER_NUMBERS = 1;
//任务队列
private final LinkedList<Job> jobs =new LinkedList<>();
private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
//AtomicLong可理解为加了synchronized的long
private AtomicLong threadNum = new AtomicLong();
private int workerNum = DEFAULT_WORKER_NUMBERS;
public ThreadPoolLearn313() {
initializeWorkers(DEFAULT_WORKER_NUMBERS);
}
private void initializeWorkers(int num) {
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker,"ThreadPool-Worker-"+threadNum.incrementAndGet());
thread.start();
}
}
public ThreadPoolLearn313(int num) {
this.workerNum = num>MAX_WORKER_NUMBERS?MAX_WORKER_NUMBERS:num<MIN_WORKER_NUMBERS?MIN_WORKER_NUMBERS:num;
initializeWorkers(workerNum);
}
@Override
public void execute(Job job) {
if (job!=null){
synchronized (jobs){
jobs.addLast(job);
jobs.notify();
}
}
}
//关闭线程池
@Override
public void shutdown() {
for (Worker worker : workers) {
worker.shutdown();
}
}
@Override
public void addWorks(int num) {
synchronized (jobs){
if(num+workerNum>MAX_WORKER_NUMBERS){
num =MAX_WORKER_NUMBERS-workerNum;
}
initializeWorkers(num);
workerNum+=num;
}
}
@Override
public void removeWorks(int num) {
synchronized (jobs){
if(num>=workerNum){
throw new IllegalArgumentException("beyond workNum");
}
int count=0;
while (count<num){
Worker worker = workers.get(count);
if(workers.remove(worker)){
worker.shutdown();
count++;
}
}
workerNum-=count;
}
}
@Override
public int getJobSize() {
return jobs.size();
}
class Worker implements Runnable{
private volatile boolean running = true;
@Override
public void run() {
while (running){
Job job =null;
synchronized (jobs){
while (jobs.isEmpty()){
try {
jobs.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
job = jobs.removeFirst();
}
if(job!=null){
job.run();
}
}
}
public void shutdown(){
running = false;
}
}
}