0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

FutureTask是如何通过阻塞来获取到异步线程执行结果的呢?

OSC开源社区 来源:OSCHINA 社区 2023-08-12 14:37 次阅读

1、FutureTask 对象介绍

Future 对象大家都不陌生,是 JDK1.5 提供的接口,是用来以阻塞的方式获取线程异步执行完的结果。 在 Java 中想要通过线程执行一个任务,离不开 Runnable 与 Callable 这两个接口。 Runnable 与 Callable 的区别在于,Runnable 接口只有一个 run 方法,该方法用来执行逻辑,但是并没有返回值;而 Callable 的 call 方法,同样用来执行业务逻辑,但是是有一个返回值的。

Callable 执行任务过程中可以通过 FutureTask 获得任务的执行状态,并且可以在执行完成后通过 Future.get () 方式获取执行结果。 Future 是一个接口,而 FutureTask 就是 Future 的实现类。并且 FutureTask 实现了 RunnableFuture(Runnable + Future),说明我们可以创建一个 FutureTask 并直接把它放到线程池执行,然后获取 FutureTask 的执行结果。

2、FutureTask 源码解析

2.1 主要方法和属性

那么 FutureTask 是如何通过阻塞的方式来获取到异步线程执行的结果的呢?我们看下 FutureTask 中的属性。

// FutureTask的状态及其常量
privatevolatileint state;
    privatestaticfinalint NEW          =0;
    privatestaticfinalint COMPLETING   =1;
    privatestaticfinalint NORMAL       =2;
    privatestaticfinalint EXCEPTIONAL  =3;
    privatestaticfinalint CANCELLED    =4;
    privatestaticfinalint INTERRUPTING =5;
    privatestaticfinalint INTERRUPTED  =6;
    
    // callable对象,执行完后置空
    privateCallable callable;
    // 要返回的结果或要引发的异常来自 get() 方法
    privateObject outcome;// non-volatile, protected by state reads/writes
    // 执行Callable的线程
    privatevolatileThread runner;
    // 等待线程的一个链表结构
    privatevolatileWaitNode waiters;

FutureTask 中几个比较重要的方法。

// 取消任务的执行
booleancancel(boolean mayInterruptIfRunning);
// 返回任务是否已经被取消
booleanisCancelled();
// 返回任务是否已经完成,任务状态不为NEW即为完成
booleanisDone();
// 通过get方法获取任务的执行结果
Vget()throwsInterruptedException,ExecutionException;
// 通过get方法获取任务的执行结果,带有超时,如果超过给定时间则抛出异常
Vget(long timeout,TimeUnit unit)
        throwsInterruptedException,ExecutionException,TimeoutException;

2.2 FutureTask 执行

当我们在线程池中执行一个 Callable 方法时,其实是将 Callable 任务封装成一个 RunnableFuture 对象去执行,同时将这个 RunnableFuture 对象返回,这样我们就拿到了 FutureTask 的引用,可以随时获取到任务执行的状态,并且可以在任务执行完成后通过该对象获取执行结果。 以下为 ThreadPoolExecutor 线程池提交一个 callable 方法的源码。

public Future submit(Callable task){
        if(task ==null)thrownewNullPointerException();
        RunnableFuture ftask =newTaskFor(task);
        execute(ftask);
        return ftask;
    }

protected RunnableFuture newTaskFor(Callable callable){
        returnnewFutureTask(callable);
    }

2.3 run 方法介绍

RunnableFuture 其实也是一个可以执行的 runnable,我们看下他的 run 方法。其主要流程就是执行 call 方法,正常执行完毕后将 result 结果赋值到 outcome 属性上。

publicvoidrun(){
        if(state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null,Thread.currentThread()))
            return;
        try{
            // 将callable赋值到本地变量
            Callable c = callable;
            // 判断callable不为空并且FutureTask的状态必须为新创建
            if(c !=null&& state == NEW){
                V result;
                boolean ran;
                try{
                    // 执行call方法(用户自己实现的call逻辑),并获取到result结果
                    result = c.call();
                    ran =true;
                }catch(Throwable ex){
                    result =null;
                    ran =false;
                    // 如果执行过程出现异常,则将异常对象赋值到outcome上
                    setException(ex);
                }
                // 如果正常执行完毕,则将result赋值到outcome属性上
                if(ran)
                    set(result);
            }
        }finally{
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner =null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if(s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

以下逻辑为正常执行完成后赋值的逻辑。

// 如果任务没有被取消,将future执行完的返回值赋值给result结果
// FutureTask任务的执行状态是通过CAS的方式进行赋值的,并且由此可知,COMPLETING其实是一个瞬时状态
// 当将线程执行结果赋值给outcome后,状态会修改为对应的NORMAL,即正常结束
protectedvoidset(V v){
        if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);// final state
            finishCompletion();
        }
    }

以下为执行异常时赋值逻辑,直接将 Throwable 对象赋值到 outcome 属性上。

protectedvoidsetException(Throwable t){
        if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);// final state
            finishCompletion();
        }
    }

无论是正常执行还是异常执行,最终都会调用一个 finishCompletion 方法,用来做工作的收尾工作。

2.4 get 方法介绍

Future 的 get 方法有两个重载的方法,一个是 get () 获取结果,一个是 get (long, TimeUnit) 带有超时时间的获取结果,我们看下 FutureTask 中的这两个方法是如何实现的。

// 不带有超时时间,一直阻塞直到获取结果
publicVget()throwsInterruptedException,ExecutionException{
        int s = state;
        if(s <= COMPLETING)
            // 等待结果完成,带有超时的get方法也是调用的awaitDone方法
            s =awaitDone(false,0L);
        // 返回结果
        returnreport(s);
    }

// 带有超时时间的获取结果,如果超过时间还没有获取到结果则抛出异常
publicVget(long timeout,TimeUnit unit)
        throwsInterruptedException,ExecutionException,TimeoutException{
        if(unit ==null)
            thrownewNullPointerException();
        int s = state;
        // 如果任务未中断,调用awaitDone方法等待任务结果
        if(s <= COMPLETING &&
            (s =awaitDone(true, unit.toNanos(timeout)))<= COMPLETING)
            thrownewTimeoutException();
        // 返回结果
        returnreport(s);
    }

我们主要看下 awaitDone 方法的执行逻辑。此方法会通过 for 循环的方式一直阻塞等待任务执行完成。如果带有超时时间,则超过截止时间后会直接返回。

// timed:是否需要超时获取
// nanos:超时时间单位纳秒
privateintawaitDone(boolean timed,long nanos)
        throwsInterruptedException{
        finallong deadline = timed ?System.nanoTime()+ nanos :0L;
        WaitNode q =null;
        boolean queued =false;
        // 此方法会一直for循环判断任务状态是否已经完成,是Future.get阻塞的原因
        for(;;){
            if(Thread.interrupted()){
                removeWaiter(q);
                thrownewInterruptedException();
            }

            int s = state;
            // 任务状态大于COMPLETING,则表明任务结束,直接返回
            if(s > COMPLETING){
                if(q !=null)
                    q.thread =null;
                return s;
            }
            elseif(s == COMPLETING)// cannot time out yet
                // Thread.yield() 方法,使当前线程由执行状态,变成为就绪状态,让出cpu时间,在下一个线程执行时候,此线程有可能被执行,也有可能没有被执行。
                // COMPLETING状态为瞬时状态,任务执行完成,要么是正常结束,要么异常结束,后续会被置为NORMAL或者EXCEPTIONAL
                Thread.yield();
            elseif(q ==null)
                // 每调用一次get方法,都会创建一个WaitNode等待节点
                q =newWaitNode();
            elseif(!queued)
                // 将该等待节点添加到链表结构waiters中,q.next = waiters 即在waiters的头部插入
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 如果方法带有超时判断,则判断当前时间是否已经超过了截止时间,如果超过了及截止日期,则退出循环直接返回当前状态,此时任务状态一定是NEW
            elseif(timed){
                nanos = deadline -System.nanoTime();
                if(nanos <=0L){
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

我们在看下 report 方法,在调用 get 方法时是如何返回结果的。

这里首先获取 outcome 的值,并判断任务是否已经执行完成,如果执行完成,则将 outcome 对象强转成泛型指定的类型;如果任务被取消了,则抛出一个 CancellationException 异常;如果都不是,则说明任务在执行过程中发生了异常,此时任务状态位 EXCEPTIONAL,此时的 outcome 即为 Throwable 对象,所以将 outcome 强转为 Throwable 并抛出异常。

由此可以知道,我们将一个 FutureTask 任务 submit 到线程池中执行的时候,如果发生了异常,是会在调用 get 方法的时候抛出的。
privateVreport(int s)throwsExecutionException{
        Object x = outcome;
        if(s == NORMAL)
            return(V)x;
        if(s >= CANCELLED)
            thrownewCancellationException();
        thrownewExecutionException((Throwable)x);
    }

2.5 cancel 方法介绍

cancel 方法用于取消正在运行的任务,如果任务取消成功,则返回 TRUE,如果取消失败则返回 FALSE。

// mayInterruptIfRunning:允许中断正在运行的任务
publicbooleancancel(boolean mayInterruptIfRunning){
        // mayInterruptIfRunning如果为true则将状态置为INTERRUPTING,如果未false则将状态置为CANCELLED
        if(!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            returnfalse;
        // 如果状态修改成功后,判断是否允许中断线程,如果允许,则调用Thread的interrupt方法中断
        try{// in case call to interrupt throws exception
            if(mayInterruptIfRunning){
                try{
                    Thread t = runner;
                    if(t !=null)
                        t.interrupt();
                }finally{// final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        }finally{
            // 取消后的收尾工作
            finishCompletion();
        }
        returntrue;
    }

2.6 isDone/isCancelled 方法介绍

isDone 方法用于判断 FutureTask 是否已经完成;isCancelled 方法用来判断 FutureTask 是否已经取消,这两个方法都是通过状态位来判断的。

publicbooleanisCancelled(){
        return state >= CANCELLED;
    }

    publicbooleanisDone(){
        return state != NEW;
    }

2.7 finishCompletion 方法介绍

我们看下 finishCompletion 方法都做了哪些工作。

// 删除所有等待线程并发出信号,最后执行done方法
privatevoidfinishCompletion(){
        // assert state > COMPLETING;
        for(WaitNode q;(q = waiters)!=null;){
            if(UNSAFE.compareAndSwapObject(this, waitersOffset, q,null)){
                for(;;){
                    Thread t = q.thread;
                    if(t !=null){
                        q.thread =null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if(next ==null)
                        break;
                    q.next =null;// unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable =null;// to reduce footprint
    }

我们看到 done 方法是一个受保护的空方法,此处没有任何逻辑,由其子类去根据自己的业务去实现相应的逻辑。例如:java.util.concurrent.ExecutorCompletionService.QueueingFuture。
protectedvoiddone(){}

3、总结

通过源码解读可以了解到 Future 的原理:

第一步:主线程将任务封装成一个 Callable 对象,通过 submit 方法提交到线程池去执行。

第二步:线程池执行任务的 run 方法,主线程则可以继续执行其他逻辑。

第三步:线程池中方法执行完成后将结果赋值到 outcome 属性上,并修改任务状态。

第四步:主线程在需要拿到异步任务结果的时候,主动调用 fugure.get () 方法来获取结果。

第五步:如果异步线程在执行过程中发生异常,则会在调用 future.get () 方法的时候抛出来。 以上就是对于 FutureTask 的分析,我们可以了解 FutureTask 任务执行的方式以及 Future.get 已阻塞的方式获取线程执行的结果原理,并且从代码中可以了解 FutureTask 的任务执行状态以及状态的变化过程。





审核编辑:刘清

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 状态机
    +关注

    关注

    2

    文章

    486

    浏览量

    27182
  • 线程池
    +关注

    关注

    0

    文章

    53

    浏览量

    6768
  • for循环
    +关注

    关注

    0

    文章

    61

    浏览量

    2420

原文标题:并发编程 - FutureTask 解析

文章出处:【微信号:OSC开源社区,微信公众号:OSC开源社区】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    鸿蒙原生应用开发-ArkTS语言基础类库多线程I/O密集型任务开发

    使用异步并发可以解决单次I/O任务阻塞的问题,但是如果遇到I/O密集型任务,同样会阻塞线程中其它任务的执行,这时需要使用多
    发表于 03-21 14:57

    Java线程阻塞方法大全

    如果线程是因为调用了wait()、sleep()或者join()方法而导致的阻塞,可以中断线程,并且通过抛出InterruptedException
    发表于 04-02 15:42

    Java的线程唤醒与阻塞规则

    如果线程是因为调用了wait()、sleep()或者join()方法而导致的阻塞,可以中断线程,并且通过抛出InterruptedException
    发表于 07-06 15:11

    同步与异步阻塞与非阻塞的区别是什么

    同步与异步阻塞与非阻塞的区别
    发表于 01-26 06:12

    labview怎么终止‘执行系统命令’并获取所有结果

    现在有一个应用要一边运动一边通过执行系统命令’获取一系列数据,获取数据的指令是一直以0.1秒的速率读取数据,我可以在运动开始的时候开启‘
    发表于 04-07 10:31

    获取不到互斥量,线程为何还能执行操作共享资源?

    );结果发现共享资源操作结果是正确的。注:rt_mutex_t ble_mutex = RT_NULL;/ 全局变量 /我的疑问:1、获取不到互斥量,线程不应该挂起吗,为何还能
    发表于 04-28 09:58

    如何使用多线程异步操作等并发设计方法最大化程序的性能

    很多朋友往往会使用线程执行耗时较长的I/O操作。这样在只有少数几个并发操作的时候还无伤大雅,如果需要处理大量的并发操作时就不合适了。  异步调用与多
    发表于 08-23 16:31

    A线程如何在线程本身识别变量是否改变

    阻塞获取可以解决但是这个B线程是别人代码写的。不好修改不想再增加一个线程去循环读取变量X是否改变,再释放信号量需求A线程如何在
    发表于 11-02 11:02

    异步调用子vi问题

    我试了异步调用子vi,现在的问题是子vi是一个循环,但是我在主程序获取子vi的结果时,只有子vi结束了才能获取且只能获取到循环最后一次的
    发表于 11-11 10:34

    是否有函数或者功能可以实现A线程阻塞变量的值

    阻塞获取可以解决但是这个B线程是别人代码写的。不好修改不想再增加一个线程去循环读取变量X是否改变,再释放信号量需求A线程如何在
    发表于 02-01 16:25

    详解同步异步阻塞阻塞

    同步、异步分别指的是一种通讯方式,当 cpu 不需要执行线程上下文切换就能完成任务,此时便认为这种通讯方式是同步的,相对的如果存在cpu 上下文切换,这种方式便是异步
    的头像 发表于 05-03 17:53 4653次阅读
    详解同步<b class='flag-5'>异步</b>和<b class='flag-5'>阻塞</b>非<b class='flag-5'>阻塞</b>

    使用匿名管道技术获取CMD命令的执行结果

    远程 CMD 是指恶意程序接收到控制端发送的 CMD 指令后,在本地执行 CMD 命令,并将执行结果回传至控制端。本文将演示使用匿名管道技术获取 CMD 命令的
    的头像 发表于 04-03 18:04 2361次阅读

    CompletableFuture异步线程是真的优雅

    虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果获取却是很不方便,我们必须使用Future.get()的方式阻塞
    的头像 发表于 08-07 15:40 357次阅读
    CompletableFuture<b class='flag-5'>异步</b>多<b class='flag-5'>线程</b>是真的优雅

    阻塞态可以直接到运行态吗

    的过渡。当一个进程或线程处于阻塞态时,实际上是在等待某种事件或资源的状态。只有在这些事件或资源可用并且满足执行条件时,进程或线程才能够从阻塞
    的头像 发表于 11-17 11:43 1040次阅读

    verilog同步和异步的区别 verilog阻塞赋值和非阻塞赋值的区别

    Verilog中同步和异步的区别,以及阻塞赋值和非阻塞赋值的区别。 一、Verilog中同步和异步的区别 同步传输和异步传输是指数据在电路中
    的头像 发表于 02-22 15:33 378次阅读