线程池以及场景题
1. 说一下线程池的核心参数(线程池的执行原理知道嘛)
线程池核心参数主要参考ThreadPoolExecutor这个类的7个参数的构造函数
name | desc |
---|---|
corePoolSize | 核心线程数目 |
maximumPoolSize | 最大线程数目 = (核心线程+救急线程的最大数目) |
keepAliveTime | 生存时间 - 救急线程的生存时间,生存时间内没有新任务,此线程资源会释放 |
unit | 时间单位 - 救急线程的生存时间单位,如秒、毫秒等 |
workQueue | 当没有空闲核心线程时,新来任务会加入到此队列排队,队列满会创建救急线程执行任务 |
threadFactory | 线程工厂 - 可以定制线程对象的创建,例如设置线程名字、是否是守护线程等 |
handler | 拒绝策略 - 当所有线程都在繁忙,workQueue 也放满时,会触发拒绝策略 |
工作流程:
任务在提交的时候,首先判断核心线程数是否已满,如果没有满则直接添加到工作线程执行
如果核心线程数满了,则判断阻塞队列是否已满,如果没有满,当前任务存入阻塞队列(出队列的时候也是按照)
如果阻塞队列也满了,则判断线程数是否小于最大线程数,如果没到最大线程数,则使用临时线程执行满了进不了队列的任务
- 在非核心线程失效之前如果阻塞队列里还有需要执行的任务,也会辅助核心线程执行任务;
如果到了最大线程数,即所有线程都在忙着,则走拒绝策略
拒绝策略:
- AbortPolicy:直接抛出异常,默认策略;
- CallerRunsPolicy:用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务;
2. 线程池中有哪些常见的阻塞队列
workQueue - 当没有空闲核心线程时,新来任务会加入到此队列排队,队列满会创建救急线程执行任务
比较常见的有 4 个,用的最多是 ArrayBlockingQueue 和 LinkedBlockingQueue
- ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。
- LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO。
- DelayedWorkQueue :是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的(一般用于定时任务)
- SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作。
ArrayBlockingQueue的LinkedBlockingQueue区别
LinkedBlockingQueue | ArrayBlockingQueue |
---|---|
默认无界,支持有界 | 强制有界 |
底层是链表 | 底层是数组 |
是懒惰的,创建节点的时候添加数据 | 提前初始化 Node 数组 |
入队会生成新 Node | Node需要是提前创建好的 |
两把锁(头尾) | 一把锁 |
左边是 LinkedBlockingQueue 加锁的方式,右边是 ArrayBlockingQueue 加锁的方式
- LinkedBlockingQueue 读和写各有一把锁,性能相对较好
- ArrayBlockingQueue 只有一把锁,读和写公用,性能相对于LinkedBlockingQueue差一些
3. 如何确定核心线程数
在设置核心线程数之前,需要先熟悉一些执行线程池执行任务的类型
- IO密集型任务
一般来说:文件读写、DB读写、网络请求等
推荐:核心线程数大小设置为 2N+1 (N为计算机的CPU核数)
- CPU密集型任务
一般来说:计算型代码、Bitmap转换、Gson转换等
推荐:核心线程数大小设置为 N+1 (N为计算机的CPU核数)
Java代码查看CPU核数
private static final int SYSTEM_CORE_SIZE = Runtime.getRuntime().availableProcessors();
参考回答:
① 高并发、任务执行时间短 ==> CPU核数+1,减少线程上下文的切换
② 并发不高、任务执行时间长
IO密集型的任务 ==> CPU核数 * 2 + 1
计算密集型任务 ==> CPU核数+1
③ 并发高、业务执行时间长,解决这种类型任务的关键不在于线程池而在于整体架构的设计
- 看看这些业务里面某些数据是否能做缓存是第一步
- 增加服务器是第二步
- 至于线程池的设置,设置参考前面说的
4. 线程池的种类有哪些?
在 java.util.concurrent.Executors 类中提供了大量创建连接池的静态方法,常见就有四种
- 创建使用定线程数的线程池
核心线程数与最大线程数一样,没有救急线程
阻塞队列是LinkedBlockingQueue,最大容量为Integer.MAX_VALUE
适用场景:适用于任务量已知,相对耗时的任务
- 单线程化的线程池,它只会用唯一的工作线程来执行任 务,保证所有任务按照指定顺序(FIFO)执行
核心线程数和最大线程数都是1
阻塞队列是LinkedBlockingQueue,最大容量为Integer.MAX_VALUE
适用场景:适用于按照顺序执行的任务
- 可缓存线程池
核心线程数为0
最大线程数是Integer.MAX_VALUE
阻塞队列为SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作。
适用场景:适合任务数比较密集,但每个任务执行时间较短的情况
- 提供了“延迟”和“周期执行”功能的ScheduledThreadPoolExecutor
适用场景:有定时和延迟执行的任务
5. 为什么不建议用 Executors 创建线程池
参考阿里开发手册《Java开发手册-嵩山版》
对于普通线程池,用 ThreadPoolExecutor 丰富的构造方法即可
(CPU型):
public class CPUThreadPool {
private static final AtomicLong THEAD_ID = new AtomicLong(1);
private static final int SYSTEM_CORE_SIZE = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = SYSTEM_CORE_SIZE + 1;
private static final int MAXIMUM_POOL_SIZE = SYSTEM_CORE_SIZE * 2;
private static final int KEEP_ALIVE_TIME = 2;
private static final TimeUnit KEEP_ALIVE_TIMEUNIT = TimeUnit.SECONDS;
private static final BlockingDeque<Runnable> BLOCKING_DEQUE = new LinkedBlockingDeque<>(MAXIMUM_POOL_SIZE);
private static final ThreadFactory THREAD_FACTORY = r -> new Thread(r, "System-Thread-CPU" + THEAD_ID.getAndIncrement());
private static final RejectedExecutionHandler REJECTED_EXECUTION_HANDLER = new ThreadPoolExecutor.CallerRunsPolicy();
private static final ExecutorService THREAD_POOL;
private static final int AWAIT_TIME = 5;
private static final TimeUnit AWAIT_TIMEUNIT = TimeUnit.SECONDS;
static {
THREAD_POOL = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME,
KEEP_ALIVE_TIMEUNIT,
BLOCKING_DEQUE,
THREAD_FACTORY,
REJECTED_EXECUTION_HANDLER
);
}
public static void submit(Runnable... tasks) {
// 提交任务
for (int i = 0; i < tasks.length; i++) {
THREAD_POOL.submit(tasks[i]);
}
}
public static void submit(Runnable runnable) {
THREAD_POOL.submit(runnable);
}
public static void shutdown() {
THREAD_POOL.shutdown();
try {
if (!THREAD_POOL.awaitTermination(AWAIT_TIME, AWAIT_TIMEUNIT)) {
THREAD_POOL.shutdownNow();
}
} catch (InterruptedException e) {
THREAD_POOL.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
(IO 型):
public class IOThreadPool {
private static final AtomicLong THEAD_ID = new AtomicLong(1);
private static final int SYSTEM_CORE_SIZE = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = SYSTEM_CORE_SIZE * 2 + 1;
private static final int MAXIMUM_POOL_SIZE = SYSTEM_CORE_SIZE * 3;
private static final int KEEP_ALIVE_TIME = 3;
private static final TimeUnit KEEP_ALIVE_TIMEUNIT = TimeUnit.SECONDS;
private static final BlockingDeque<Runnable> BLOCKING_DEQUE = new LinkedBlockingDeque<>(MAXIMUM_POOL_SIZE);
private static final ThreadFactory THREAD_FACTORY = r -> new Thread(r, "System-Thread-IO" + THEAD_ID.getAndIncrement());
private static final RejectedExecutionHandler REJECTED_EXECUTION_HANDLER = new ThreadPoolExecutor.CallerRunsPolicy();
private static final ExecutorService THREAD_POOL;
private static final int AWAIT_TIME = 5;
private static final TimeUnit AWAIT_TIMEUNIT = TimeUnit.SECONDS;
static {
THREAD_POOL = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME,
KEEP_ALIVE_TIMEUNIT,
BLOCKING_DEQUE,
THREAD_FACTORY,
REJECTED_EXECUTION_HANDLER
);
}
public static void submit(Runnable... tasks) {
// 提交任务
for (int i = 0; i < tasks.length; i++) {
THREAD_POOL.submit(tasks[i]);
}
}
public static void submit(Runnable runnable) {
THREAD_POOL.submit(runnable);
}
public static void shutdown() {
THREAD_POOL.shutdown();
try {
if (!THREAD_POOL.awaitTermination(AWAIT_TIME, AWAIT_TIMEUNIT)) {
THREAD_POOL.shutdownNow();
}
} catch (InterruptedException e) {
THREAD_POOL.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
定时线程池(用 ScheduledThreadPoolExecutor 构造):
public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
private final BlockingQueue<Runnable> customQueue;
public CustomScheduledExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, threadFactory, handler);
this.setMaximumPoolSize(maximumPoolSize);
this.setKeepAliveTime(keepAliveTime, unit);
this.customQueue = Objects.isNull(workQueue) ? super.getQueue() : workQueue;
}
@Override
public BlockingQueue<Runnable> getQueue() {
return customQueue;
}
}
@Slf4j
public class SchedulerThreadPool {
private static final AtomicLong THEAD_ID = new AtomicLong(1);
private static final int SYSTEM_CORE_SIZE = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = SYSTEM_CORE_SIZE * 2 + 1;
private static final int MAXIMUM_POOL_SIZE = SYSTEM_CORE_SIZE * 3;
private static final int KEEP_ALIVE_TIME = 3;
private static final TimeUnit KEEP_ALIVE_TIMEUNIT = TimeUnit.SECONDS;
private static final BlockingDeque<Runnable> BLOCKING_DEQUE = null; // null 代表使用默认的阻塞队列
private static final ThreadFactory THREAD_FACTORY = r -> new Thread(r, "System-Thread-Scheduler-IO" + THEAD_ID.getAndIncrement());
private static final RejectedExecutionHandler REJECTED_EXECUTION_HANDLER = new ThreadPoolExecutor.CallerRunsPolicy();
private static final ScheduledExecutorService THREAD_POOL;
private static final int AWAIT_TIME = 5;
private static final TimeUnit AWAIT_TIMEUNIT = TimeUnit.SECONDS;
static {
THREAD_POOL = new CustomScheduledExecutor(
CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME,
KEEP_ALIVE_TIMEUNIT,
BLOCKING_DEQUE,
THREAD_FACTORY,
REJECTED_EXECUTION_HANDLER
);
}
// 添加普通定时任务
public static void schedule(Runnable task, long delay, TimeUnit unit) {
TimerUtil.log(delay, unit);
THREAD_POOL.schedule(task, delay, unit);
}
// 添加周期定时任务
public static void scheduleCircle(Runnable task, long initialDelay, long period, TimeUnit unit) {
TimerUtil.log(initialDelay, unit);
THREAD_POOL.scheduleAtFixedRate(() -> {
task.run();
TimerUtil.log(period, unit);
}, initialDelay, period, unit);
}
// 添加下个周期运行的定时任务
public static void scheduleCircle(Runnable task, long delay, TimeUnit unit) {
TimerUtil.log(delay, unit);
THREAD_POOL.scheduleAtFixedRate(() -> {
task.run();
TimerUtil.log(delay, unit);
}, delay, delay, unit);
}
/**
* 函数式接口: Runnable run、Consumer accept、Supplier get、Function apply
*/
// 添加下个周期运行的定时任务
public static void scheduleCircle(Consumer<Map<String, Object>> task, Map<String, Object> session, long delay, TimeUnit unit) {
task.accept(session);
schedule(() -> {
task.accept(session);
scheduleCircle(task, session, delay, unit);
}, delay, unit);
}
// 添加下个周期运行的定时任务
public static void scheduleCircle(Consumer<Map<String, Object>> task, Map<String, Object> session, long initialDelay, long period, TimeUnit unit) {
schedule(() -> {
scheduleCircle(task, session, period, unit);
}, initialDelay, unit);
}
// 添加下个周期运行的定时任务
public static <T> void scheduleCircle(Consumer<T> task, T object, long delay, TimeUnit unit) {
task.accept(object);
schedule(() -> {
task.accept(object);
scheduleCircle(task, object, delay, unit);
}, delay, unit);
}
// 添加下个周期运行的定时任务
public static <T> void scheduleCircle(Consumer<T> task, T object, long initialDelay, long period, TimeUnit unit) {
schedule(() -> {
scheduleCircle(task, object, period, unit);
}, initialDelay, unit);
}
// 添加下个周期运行的定时任务
public static void scheduleCircle(Supplier<Boolean> task, long delay, TimeUnit unit) {
if(Boolean.TRUE.equals(task.get())) {
schedule(() -> {
if(Boolean.TRUE.equals(task.get())) {
scheduleCircle(task, delay, unit);
}
}, delay, unit);
}
}
// 添加下个周期运行的定时任务
public static void scheduleCircle(Supplier<Boolean> task, long initialDelay, long period, TimeUnit unit) {
schedule(() -> {
scheduleCircle(task, period, unit);
}, initialDelay, unit);
}
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(1);
scheduleCircle(() -> {
if(atomicInteger.get() < 5) {
atomicInteger.incrementAndGet();
return Boolean.TRUE;
}else {
return Boolean.FALSE;
}
}, 2, TimeUnit.SECONDS);
}
// 关闭线程池
public static void shutdown() {
THREAD_POOL.shutdown();
try {
if (!THREAD_POOL.awaitTermination(AWAIT_TIME, AWAIT_TIMEUNIT)) {
THREAD_POOL.shutdownNow();
}
} catch (InterruptedException e) {
THREAD_POOL.shutdownNow();
Thread.currentThread().interrupt();
}
}
public static void remove(Runnable task) {
((ScheduledThreadPoolExecutor) THREAD_POOL).remove(task);
}
}
6. 多线程使用场景问题
6.1 线程池使用场景CountDownLatch、Future(你们项目哪里用到了多线程)
CountDownLatch
(闭锁/倒计时锁)用来进行线程同步协作,等待所有线程完成倒计时(一个或者多个线程,等待其他多个线程完成某件事情之后才能执行)
其中构造参数用来初始化等待计数值
await() 用来等待计数归零
countDown() 用来让计数减一
适合一些没有返回值的任务,如分批次新增、修改、删除;
案例一(ES 数据批量导入)
在我们项目上线之前,我们需要把数据库中的数据一次性的同步到es索引库中,但是当时的数据好像是1000万左右,一次性读取数据肯定不行(oom异常),当时我就想到可以使用线程池的方式导入,利用CountDownLatch 来控制,就能避免一次性加载过多,防止内存溢出
整体流程就是通过CountDownLatch+线程池配合去执行,提高成功率,速率、防止系统崩溃;
而 Future 的话更适合一些有返回的任务,例如一个页面的数据量/统计量很大;
案例二(数据汇总)
在一个电商网站中,用户下单之后,需要查询数据,数据包含了三部分:订单信息、包含的商品、物流信息;这三块信息都在不同的微服务中进行实现的,我们如何完成这个业务呢?
- 在实际开发的过程中,难免需要调用多个接口来汇总数据,如果所有接口(或部分接口)的没有依赖关系,就可以使用线程池+future来提升性能
- 报表汇总
哪怕是普通线程也有妙用:
案例三(异步调用)
在进行搜索的时候,需要保存用户的搜索记录,而搜索记录不能影响用户的正常搜索,我们通常会开启一个线程去执行历史记录的保存,在新开启的线程在执行的过程中,可以利用线程提交任务
还有一个场景就是邮箱验证, 构造邮件并发送是需要一定时间的,可能会让用户感受到卡顿,我们检查没问题就可以返回成功,然后开一个异步线程去发邮件;
6.2 如何控制某个方法允许并发访问线程的数量?
Semaphore [ˈsɛməˌfɔr] 信号量,是JUC包下的一个工具类,我们可以通过其限制执行的线程数量,达到限流的效果
当一个线程执行时先通过其方法进行获取许可操作,获取到许可的线程继续执行业务逻辑,当线程执行完成后进行释放许可操作,未获取达到许可的线程进行等待或者直接结束。
Semaphore 两个重要的方法
lsemaphore.acquire(): 请求一个信号量,这时候的信号量个数 -1(一旦没有可使用的信号量,也即信号量个数变为负数时,再次请求的时候就会阻塞,直到其他线程释放了信号量)
lsemaphore.release():释放一个信号量,此时信号量个数 +1
线程任务类:
public class SemaphoreCase {
public static void main(String[] args) {
// 1. 创建 semaphore 对象
Semaphore semaphore = new Semaphore(3);
// 2. 10个线程同时运行
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
// 3. 获取许可
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println("running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end...");
} finally {
// 4. 释放许可
semaphore.release();
}
}).start();
}
}
}
7. 谈谈对 ThreadLocal 的理解
7.1 基本认识与基本使用
ThreadLocal
- 是多线程中对于解决线程安全的一个操作类,它会为每个线程都分配一个独立的线程副本从而解决了变量并发访问冲突的问题
- ThreadLocal 同时也实现了线程内的资源共享,单个线程内资源共享不影响别的线程,类似于“会话”
案例:使用 JDBC 操作数据库时,会将每一个线程的 Connection 放入各自的 ThreadLocal 中,从而保证每个线程都在各自的 Connection 上进行数据库的操作,避免 A 线程关闭了 B 线程的连接。
基本使用
三个主要方法:
set(value) 设置值
get() 获取值
remove() 清除值
public class ThreadLocalTest {
static ThreadLocal<String> threadLocal = new ThreadLocal<>();
public static void main(String[] args) {
new Thread(() -> {
String name = Thread.currentThread().getName();
threadLocal.set("itcast");
print(name);
System.out.println(name + "-after remove : " + threadLocal.get());
}, "t1").start();
new Thread(() -> {
String name = Thread.currentThread().getName();
threadLocal.set("itheima");
print(name);
System.out.println(name + "-after remove : " + threadLocal.get());
}, "t2").start();
}
static void print(String str) {
//打印当前线程中本地内存中本地变量的值
System.out.println(str + " :" + threadLocal.get());
//清除本地内存中的本地变量
threadLocal.remove();
}
}
7.2 ThreadLocal的实现原理&源码解析
ThreadLocal本质来说就是一个线程内部存储类,从而让多个线程只操作自己内部的值,从而实现线程数据隔离
在 ThreadLocal 中有一个内部类叫做 ThreadLocalMap,类似于 HashMap
ThreadLocalMap 中有一个属性 table 数组,这个是真正存储数据的位置
set方法
get方法/remove方法
7.3 ThreadLocal 内存泄露问题
Java对象中的四种引用类型:强引用、软引用、弱引用、虚引用
- 强引用:最为普通的引用方式,表示一个对象处于有用且必须的状态,如果一个对象具有强引用,则GC并不会回收它。即便堆中内存不足了,宁可出现OOM,也不会对其进行回收
- 弱引用:表示一个对象处于可能有用且非必须的状态。在GC线程扫描内存区域时,一旦发现弱引用,就会回收到弱引用相关联的对象。对于弱引用的回收,无关内存区域是否足够,一旦发现则会被回收
- 将实例交给 JDK 提供的 WeakReference 去引用;
每一个 Thread 维护一个 ThreadLocalMap,在T hreadLocalMap 中的 Entry 对象继承了 WeakReference。
其中 key 为使用弱引用的 ThreadLocal 实例,value 为线程变量的副本
在使用 ThreadLocal 的时候,强烈建议:务必手动remove