线程池吞掉异常的case:源码阅读与解决方法

1. 问题背景

有一天给同事CR,看到一段这样的代码

try {
    for (param : params) {
        //并发处理,func无返回值
        ThreadPool.submit(func(param));
    }
} catch (Exception e) {
    log.info("func抛异常啦,参数是:{}", param)
}

我:你这段代码是利用并发降低RT对吧,如果func内部抛异常,你确定可以catch到吗

同事:可以啊, 为什么不可以(...

我:不如你run一把,在func mock一个异常出来试试

同事:我靠还真是

我:你可以用execute,改动比较小

同事:那么是为什么呢

2. 同事的例子

import java.util.concurrent.*;

public class ThreadPoolTest {

        public static void main(String[] args) throws Exception {
            ExecutorService executorService = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
            testExecute(executorService);
            Thread.sleep(2000);
            testSubmit1(executorService);
            Thread.sleep(2000);
            testSubmit2(executorService);
        }

        private static void testExecute(ExecutorService executorService) {
            executorService.execute(() -> {
                System.out.println("执行线程池execute方法");
                throw new RuntimeException("execute方法抛出异常");
            });
        }

        private static void testSubmit1(ExecutorService executorService) {
            executorService.submit(() -> {
                System.out.println("执行线程池submit方法1");
                throw new RuntimeException("submit方法1抛出异常");
            });
        }

        private static void testSubmit2(ExecutorService executorService) throws Exception {
            Future<Object> feature = executorService.submit(() -> {
                System.out.println("执行线程池submit方法2");
                throw new RuntimeException("submit方法2抛出异常");
            });
            feature.get();
        }
}

执行结果:

执行线程池execute方法
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: execute方法抛出异常
	at ThreadPoolTest.lambda$testExecute$0(ThreadPoolTest.java:23)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
执行线程池submit方法1
执行线程池submit方法2
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: submit方法2抛出异常
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at ThreadPoolTest.testSubmit2(ThreadPoolTest.java:39)
	at ThreadPoolTest.main(ThreadPoolTest.java:17)
Caused by: java.lang.RuntimeException: submit方法2抛出异常
	at ThreadPoolTest.lambda$testSubmit2$2(ThreadPoolTest.java:37)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

3. 原理分析

3.1 线程池包的继承结构

3.2 submit和execute方法的差异

3.2.1 execute

方法定义在最顶层的Executor接口,并且Executor接口有且仅有这一个方法

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

方法实现在ThreadPoolExecutor:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

实际执行的过程,在worker(是runnable的实现类)的run方法,run方法实际执行的是runWorker方法

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

可以看到执行过程中,如果task.run();发生异常,没有catch处理,异常会层层向外抛出;最终进入finally块,执行processWorkerExit;

3.2.2 submit

submit方法定义在ExecutorService

public interface ExecutorService extends Executor {

    /**
     * Submits a value-returning task for execution and returns a
     * Future representing the pending results of the task. The
     * Future's {@code get} method will return the task's result upon
     * successful completion.
     *
     * <p>
     * If you would like to immediately block waiting
     * for a task, you can use constructions of the form
     * {@code result = exec.submit(aCallable).get();}
     *
     * <p>Note: The {@link Executors} class includes a set of methods
     * that can convert some other common closure-like objects,
     * for example, {@link java.security.PrivilegedAction} to
     * {@link Callable} form so they can be submitted.
     *
     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return a Future representing pending completion of the task
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if the task is null
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. The Future's {@code get} method will
     * return the given result upon successful completion.
     *
     * @param task the task to submit
     * @param result the result to return
     * @param <T> the type of the result
     * @return a Future representing pending completion of the task
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if the task is null
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. The Future's {@code get} method will
     * return {@code null} upon <em>successful</em> completion.
     *
     * @param task the task to submit
     * @return a Future representing pending completion of the task
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if the task is null
     */
    Future<?> submit(Runnable task);
}

实现在AbstractExecutorService

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

可以看到这里创建了RunnableFuture(而不是基础的worker),顾名思义,RunnableFuture同时实现了Runnable和Future接口,也就意味着可以对该任务执行get操作,看看RunnableFuture的run方法:

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

catch块对方法异常做了处理,与执行结果一同在Future中暂存起来;submit()执行完毕后返回Future对象,执行future.get()会触发异常的抛出;

当然了,如果你只是执行了submit,没有获取future,异常就会“神奇地”消失。

参考:

Java线程池实现原理及其在美团业务中的实践 - 美团技术团队

https://zhuanlan.zhihu.com/p/651997713

相关推荐

  1. 线,我异常呢?

    2024-06-17 23:36:02       44 阅读
  2. Nginx线剖析

    2024-06-17 23:36:02       36 阅读
  3. C# 线线使用方法、注意事项

    2024-06-17 23:36:02       53 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-06-17 23:36:02       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-06-17 23:36:02       106 阅读
  3. 在Django里面运行非项目文件

    2024-06-17 23:36:02       87 阅读
  4. Python语言-面向对象

    2024-06-17 23:36:02       96 阅读

热门阅读

  1. 内网穿透的原理:实现远程访问的技术揭秘

    2024-06-17 23:36:02       31 阅读
  2. 佐助题库1228答案

    2024-06-17 23:36:02       31 阅读
  3. Spring Boot 面试热点(二)

    2024-06-17 23:36:02       34 阅读
  4. SQLite 日期 & 时间

    2024-06-17 23:36:02       29 阅读
  5. Linux安装docker

    2024-06-17 23:36:02       28 阅读
  6. xss-lab靶场的level15-level20

    2024-06-17 23:36:02       25 阅读
  7. 知识库的创建(1) - KnowledgeFile文件加载和分割

    2024-06-17 23:36:02       27 阅读
  8. Flink集群架构

    2024-06-17 23:36:02       20 阅读
  9. PCA 在图像分析上的应用

    2024-06-17 23:36:02       28 阅读
  10. 第二章 - 第1节- 逻辑运算 -课后习题

    2024-06-17 23:36:02       32 阅读