FutureTask源码阅读


本人的源码阅读主要聚焦于类的使用场景,一般只在java层面进行分析,没有深入到一些native方法的实现。并且由于知识储备不完整,很可能出现疏漏甚至是谬误,欢迎指出共同学习

本文基于corretto-17.0.9源码,参考本文时请打开相应的源码对照,否则你会不知道我在说什么

简介

FutureTask是Future的一个实现,提供了可取消的异步任务执行。FutureTask具体实现了RunnableTask这个接口:

public interface RunnableFuture<V> extends Runnable, Future<V> {
   
  void run();
}

作为Future,为什么实现了Runnable而不是Callable呢,因为实现Runnable接口可以方便地将这个任务交给Executor执行,然后任务内部封装了一个Callable对象,并且在run方法中最终会调用Callable,并将结果保存在内部,当调用get的时候获取结果。

例子

Callable<String> task = ...; // 实际执行的任务
FutureTask<String> future = new FutureTask<>(task);
executor.execute(future); // 执行任务
future.get(); // 获取结果

代码分析

成员变量

FutureTask的核心是状态机,即内部将Waiter Thread抽象出各种状态,底层通过Thread和LockSupport检测线程状态并改变线程的实际状态(阻塞、中断等),而状态机的状态转换是通过CAS进行状态转换的。先来看看FutureTask为任务定义的几种状态

public class FutureTask<V> implements RunnableFuture<V> {
   
  // 表示任务的状态。状态初始化为NEW,所有可能的状态转换如下:
  // NEW -> COMPLETING -> NORMAL
  // NEW -> COMPLETING -> EXCEPTIONAL
  // NEW -> CANCELLED
  // NEW -> INTERRUPTING -> INTERRUPTED
  private volatile int state;
  
  // 刚创建FutureTask到任务执行中这段时间内都处于NEW状态
  private static final int NEW          = 0;
  // 任务结束(包括正常结束和异常)之前的过渡状态
  private static final int COMPLETING   = 1;
  // 任务正常结束后的状态
  private static final int NORMAL       = 2;
  // 任务抛出异常后的状态
  private static final int EXCEPTIONAL  = 3;
  // 任务被取消后的状态
  private static final int CANCELLED    = 4;
  // 状态中断前的过渡状态
  private static final int INTERRUPTING = 5;
  // 任务被中断后的状态
  private static final int INTERRUPTED  = 6;

一定要注意,这里的状态是任务的状态,而不是worker thread的状态,也不是waiter thread的状态

分析代码逻辑前,先明确一下这些状态的具体含义。

首先两个过渡状态COMPLETING、INTERRUPTING怎么理解呢?拿COMPLETING举例子:

// 任务运行结束后保存返回结果
protected void set(V v) {
   
  // 设置为COMPLETING,防止其它线程设置为其他状态
  if (STATE.compareAndSet(this, NEW, COMPLETING)) {
   
    // 保存结果
    outcome = v;
    // 设置为NORMAL
    STATE.setRelease(this, NORMAL);
    finishCompletion();
  }
}

这下知道了,“保存结果”属于临界区操作,并且从逻辑上来说只能在保存结果后,才能去设NORMAL,因此就只能先设置一个过渡状态COMPLETING(完成中),防止其他线程同时进入临界区。

至此,我们只需要关注NEW、NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED这几个状态,因为COMPLETING只是用来保护NORMAL/EXCEPTIONAL的,同理INTERRUPTING保护INTERRUPTED,没有实际的作用。

其次,NORMAL和EXCEPTIONAL也可以归为一类,两者都表示任务结束,前者对应任务正常返回,后者对应任务抛出异常。

综上,我们最终只需要关注NEW、NORMAL/EXCEPTIONAL(视为一种状态)、CANCELLED、INTERRUPTED四种状态,其中NORMAL/EXCEPTIONAL、CANCELLED、INTERRUPTED这几个状态都属于最终状态(terminal),也就是状态一旦变成他们其中的任何一种,都不会再变成其他状态。

最后简单看看其他成员变量

// 实际需要运行的任务,通过构造函数传入
private Callable<V> callable;
// 保存执行结果或执行抛出的异常
private Object outcome;
// Worker Thread
private volatile Thread runner;
// 保存所有的Waiter Thread,相当于阻塞队列(实际上是Treiber stack)
private volatile WaitNode waiters;

最后一个waiters实际上是阻塞栈(称为Treiber stack,一种lock-free的栈),即FILO。之所以不用队列,我认为是因为这些阻塞者之间并没有竞争锁的关系,一旦任务完成或取消,他们一下子全部被释放,因此没必要将他们公平地释放,不差那几纳秒。其次,相比队列得要两个指针分别指向头和尾,栈只需要一个栈顶指针就能方便地操作,主打一个简单。waiters并不是重点,知道是用来干什么的就可以。

方法

构造函数就不看了,比较简单。

根据先有鸡后有蛋的原则,先分析任务的执行(run),再分析结果的获取(get)。先看看被Woker Thread真正用于执行任务的run方法:

public void run() {
   
  // 如果state不是NEW了,或者已经存在runner,说明任务正在/已经 被 完成/取消,直接返回
  if (state != NEW ||
    !RUNNER.compareAndSet(this, null, Thread.currentThread()))
    return;
  try {
   
    Callable<V> c = callable;
    if (c != null && state == NEW) {
   
      V result;
      boolean ran;
      try {
   
        // 执行任务
        result = c.call();
        ran = true;
      } catch (Throwable ex) {
   
        // 任务自身抛出异常,setException设置结果为异常
        result = null;
        ran = false;
        setException(ex);
      }
      // 任务正常返回,保存结果
      if (ran)
        set(result);
    }
  } finally {
   
    // 至此任务已经完成,做收尾工作
    runner = null;
    int s = state;
    if (s >= INTERRUPTING)
      handlePossibleCancellationInterrupt(s);
  }
}

流程主要就是,状态非NEW或者runner不为空的话就不用运行任务,因为它可能已经在运行或者已经运行结束。然后运行任务后通过set或者setException设置结果。最后还有一个非常细节的handlePossibleCancellationInterrupt方法,不过得放在文末说,因为还涉及到其他的方法,目前就先当他没有任何用途。

接下来再看get方法:

public V get() throws InterruptedException, ExecutionException {
   
  int s = state;
  // 如果任务还没有变成终态,则进行阻塞等待
  if (s <= COMPLETING)
    s = awaitDone(false, 0L);
  // 阻塞结束,返回结果
  return report(s);
}

get比较简单,看一下awaitDone是怎么阻塞等待的:

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
   
  long startTime = 0L;    // Special value 0L means not yet parked
  FutureTask.WaitNode q = null;
  boolean queued = false;
  for ( ; ; ) {
   
    int s = state;
    // 分支1:如果任务已经是终态,则直接返回
    if (s > COMPLETING) {
   
      if (q != null)
        q.thread = null;
      return s;
    }
    // 分支2:如果任务正处于过渡态,则自旋等待任务进入终态
    else if (s == COMPLETING)
      Thread.yield();
    // 分支3:如果线程被中断,那么将其从等待栈删除,并抛中断异常
    else if (Thread.interrupted()) {
   
      removeWaiter(q);
      throw new InterruptedException();
    }
    // 分支4:如果没有被阻塞的其他线程
    else if (q == null) {
   
      // 如果不需要等待,直接返回
      if (timed && nanos <= 0L)
        return s;
      // 创建节点准备入栈
      q = new FutureTask.WaitNode();
    }
    // 分支5:如果还没入栈,将其入栈
    else if (!queued)
      queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
    // 分支6:如果为有限等待,计算等待时长并且parkNanos
    else if (timed) {
   
      final long parkNanos;
      // 计算应该等待的时长
      if (startTime == 0L) {
   
        startTime = System.nanoTime();
        if (startTime == 0L)
          startTime = 1L;
        parkNanos = nanos;
      } else {
   
        long elapsed = System.nanoTime() - startTime;
        // 如果已经超时,则交给上层负责抛出超时异常
        if (elapsed >= nanos) {
   
          removeWaiter(q);
          return state;
        }
        parkNanos = nanos - elapsed;
      }
      // 上面nanoTime函数可能耗时长,此时任务状态可能已经变成了终态,进行二次检查
      if (state < COMPLETING)
        LockSupport.parkNanos(this, parkNanos);
    }
    // 分支7:如果为无限等待,直接park
    else
      LockSupport.park(this);
  }
}

awaitDone使用了一个for( ; ; )无限循环,里面有多个分支处理不同的情况,这是想干嘛。看过Doug Lea大神写过的其他一些代码就知道,比如AQS中的acuqire、enqueue函数等,for( ; ; ) { if else }这种写法其实也构成了一个FSM(有限状态机),只不过没有像FutureTask那样显式地抽象成几个状态常量,而是根据当前某些变量的值选择进入对应的分支,然后进入下一个状态或终态…注意要与FutureTask任务的状态区分开来,这里只是将awaitDone这块代码逻辑用状态机的方式理解而已。

7个分支分别对应7个状态,有的状态是过渡态(分支1,3,分支4中不需要等待的情况),有的是终态,具体的逻辑写在注释里。

看下removeWaiter如何实现:

private void removeWaiter(FutureTask.WaitNode node) {
   
  if (node != null) {
   
    node.thread = null;
    retry:
    for (;;) {
   
      // pred为q的前驱,q为被移除节点,s为q的后继
      for (FutureTask.WaitNode pred = null, q = waiters, s; q != null; q = s) {
   
        s = q.next;
        if (q.thread != null)
          pred = q;
        // 让pred.next指向s,即移除q
        else if (pred != null) {
   
          pred.next = s;
          if (pred.thread == null)
            continue retry;
        }
        else if (!WAITERS.compareAndSet(this, q, s))
          continue retry;
      }
      break;
    }
  }
}

removeWaiter其实不只移除node,还移除其他所有无效节点(thread==null的节点,因为不再与线程绑定,节点已经没用了)。注意最外层循环,这个循环是用来当发生race的时候重新遍历栈的,尽最大的努力去移除所有的无效节点。之所以是“尽最大努力”,是因为它只检测本次遍历的前驱节点或栈顶,如果是中间某个已经遍历过的节点成为了无效节点,只能下一次removeWaiter的时候才能检测出来。

removeWaiter主要还是针对单个节点的移除,如果任务已经结束,还需要一次性移除所有节点,也就是finishCompletion函数完成的功能:

private void finishCompletion() {
   
  for (FutureTask.WaitNode q; (q = waiters) != null;) {
   
		// 直接通过null out the waiters删除栈,保证只有一个线程在释放栈上的节点
    if (WAITERS.weakCompareAndSet(this, q, null)) {
   
      for (;;) {
   
        Thread t = q.thread;
        // 唤醒线程
        if (t != null) {
   
          q.thread = null;
          LockSupport.unpark(t);
        }
        // 遍历下一个节点
        FutureTask.WaitNode next = q.next;
        if (next == null)
          break;
        q.next = null;
        q = next;
      }
      break;
    }
  }

  done();

  callable = null;        // to reduce footprint
}

最外层的循环是为了避免有节点waiter意外入队,进行二次检查。

最后再回头看看之前介绍run方法时遗留的handlePossibleCancellationInterrupt

private void handlePossibleCancellationInterrupt(int s) {
   
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
  if (s == INTERRUPTING)
    while (state == INTERRUPTING)
      Thread.yield();
}

看起来他好像啥也没做,但其实是为了解决一个隐蔽的bug:在cancel(true)的内部,会尝试对runner.interrupt:

Thread t = runner;
if (t != null) // 第一行
  t.interrupt(); // 第二行

如果在第一行执行完后,发生线程调度,调度到工作线程,然后run方法刚好跑完,然后该线程在线程池的安排下去执行了其他的任务,此时线程调度回执行cancel(true)的线程,它继续运行第二行,诶,它本来要中断的是之前的那个任务,但interrupt是针对线程而不是针对任务进行中断的,此时就错误地中断了第二个任务,造成bug。

因此handlePossibleCancellationInterrupt的目的是让中断的发生在run还没退出前发生,这样就不会错误地中断该工作线程执行的下一个任务。

补充

注意到waiters的设置有时候是通过WAITERS.weakCompareAndSet,那么weakCompareAndSet与compareAndSet有什么区别呢?可以参考这篇回答,结合这个回答,单纯从使用的角度上来说,如果cas的重试的成本不高,那么可以考虑使用weakCompareAndSet,而如果java代码没有循环cas,比如只是简单的if(cas),或者重试成本比较高,比如removeWaiter中CAS失败后的重试意味着要重新遍历栈,那么可以考虑使用compareAndSet。总而言之是权衡了软件或硬件来实现功能的成本。

参考链接

「简书」【细谈Java并发】谈谈FutureTask

相关推荐

  1. FutureTask阅读

    2024-01-22 15:40:01       41 阅读
  2. **FutureTask应用&分析**(二)

    2024-01-22 15:40:01       33 阅读
  3. 阅读】evmⅡ

    2024-01-22 15:40:01       19 阅读
  4. kubelet阅读

    2024-01-22 15:40:01       20 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-22 15:40:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-22 15:40:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-22 15:40:01       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-22 15:40:01       20 阅读

热门阅读

  1. ES-同词义配置

    2024-01-22 15:40:01       31 阅读
  2. 蓝桥杯求解回文数(栈求解版本)

    2024-01-22 15:40:01       39 阅读
  3. 使用sklearn严格计算AUROC和AUPRC

    2024-01-22 15:40:01       35 阅读
  4. idea实用快捷键

    2024-01-22 15:40:01       34 阅读
  5. rnn相关

    rnn相关

    2024-01-22 15:40:01      36 阅读
  6. 事务复习1-理论基础

    2024-01-22 15:40:01       34 阅读
  7. 问题解决:django模型查询报错,找不到数据库表

    2024-01-22 15:40:01       37 阅读
  8. 集齐用 channel 把 Go 程序写崩的三种姿势~

    2024-01-22 15:40:01       25 阅读
  9. 【webrtc】跟webrtc学时间戳、序号类型转换

    2024-01-22 15:40:01       27 阅读
  10. lc142.环形链表Ⅱ

    2024-01-22 15:40:01       37 阅读