hi,你好!欢迎访问本站!登录
本站由网站地图腾讯云宝塔系统阿里云强势驱动
当前位置:首页 - 教程 - 杂谈 - 正文 君子好学,自强不息!

死磕 java线程系列之线程池深切剖析——将来使命实行流程

2019-11-18杂谈搜奇网30°c
A+ A-

(手机横屏看源码更轻易)

注:java源码剖析部份如无特别申明均基于 java8 版本。

注:线程池源码部份如无特别申明均指ThreadPoolExecutor类。

简介

前面我们一同进修了线程池中平常使命的实行流程,但实在线程池中另有一种使命,叫作将来使命(future task),应用它您可以猎取使命实行的效果,它是怎样完成的呢?

发起进修本章前先去看看彤哥之前写的《死磕 java线程系列之自身动手写一个线程池(续)》,有助于明白本章的内容,且那里的代码比较短小,学起来相对轻易一些。

题目

(1)线程池中的将来使命是怎样实行的?

(2)我们能学到哪些比较好的设想形式?

(3)对我们将来进修别的框架有什么协助?

来个栗子

我们照样从一个例子入手,来解说来章的内容。

我们定义一个线程池,并应用它提交5个使命,这5个使命离别返回0、1、2、3、4,在将来的某一时刻,我们再取用它们的返回值,做一个累加操纵。

public class ThreadPoolTest02 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 新建一个牢固5个线程的线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        List<Future<Integer>> futureList = new ArrayList<>();
        // 提交5个使命,离别返回0、1、2、3、4
        for (int i = 0; i < 5; i++) {
            int num = i;

            // 使命实行的效果用Future包装
            Future<Integer> future = threadPool.submit(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("return: " + num);
                // 返回值
                return num;
            });

            // 把future添加到list中
            futureList.add(future);
        }

        // 使命悉数提交完再从future中get返回值,并做累加
        int sum = 0;
        for (Future<Integer> future : futureList) {
            sum += future.get();
        }

        System.out.println("sum=" + sum);
    }
}

这里我们思索两个题目:

(1)假如这里应用平常使命,要怎样写,时刻也许是多少?

假如应用平常使命,那末就要把累加操纵放到使命内里,而且并非那末好写(final的题目),总时刻也许是1秒多一点。然则,如许有一个瑕玷,就是累加操纵跟使命自身的内容耦合到一同了,背面假如改成累乘,还要修正使命的内容。

(2)假如这里把future.get()放到for轮回内里,时刻也许是多少?

这个题目我们先不回覆,先来看源码剖析。

submit()要领

submit要领,它是提交有返回值使命的一种体式格局,内部应用将来使命(FutureTask)包装,再交给execute()去实行,末了返回将来使命自身。

public <T> Future<T> submit(Callable<T> task) {
    // 非空检测
    if (task == null) throw new NullPointerException();
    // 包装成FutureTask
    RunnableFuture<T> ftask = newTaskFor(task);
    // 交给execute()要领去实行
    execute(ftask);
    // 返回futureTask
    return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    // 将平常使命包装成FutureTask
    return new FutureTask<T>(callable);
}

这里的设想很奇妙,实际上这两个要领都是在AbstractExecutorService这个抽象类中完成的,这是模板要领的一种应用。

我们来看看FutureTask的继续系统:

FutureTask完成了RunnableFuture接口,而RunnableFuture接口组合了Runnable接口和Future接口的才能,而Future接口供应了get使命返回值的才能。

题目:submit()要领返回的为何是Future接口而不是RunnableFuture接口或许FutureTask类呢?

答:这是由于submit()返回的效果,对外部挪用者只想暴露其get()的才能(Future接口),而不想暴露其run()的才能(Runaable接口)。

FutureTask类的run()要领

经过上一章的进修,我们晓得execute()要领末了挪用的是task的run()要领,上面我们传进去的使命,末了被包装成了FutureTask,也就是说execute()要领末了会挪用到FutureTask的run()要领,所以我们直接看这个要领就可以了。

public void run() {
    // 状况不为NEW,或许修正为当前线程来运转这个使命失利,则直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    
    try {
        // 真正的使命
        Callable<V> c = callable;
        // state必需为NEW时才运转
        if (c != null && state == NEW) {
            // 运转的效果
            V result;
            boolean ran;
            try {
                // 使命实行的处所【本文由公从号“彤哥读源码”原创】
                result = c.call();
                // 已实行终了
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // 处置惩罚异常
                setException(ex);
            }
            if (ran)
                // 处置惩罚效果
                set(result);
        }
    } finally {
        // 置空runner
        runner = null;
        // 处置惩罚中断
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

可以看到代码也比较简单,先做状况的检测,再实行使命,末了处置惩罚效果或异常。

实行使命这里没啥题目,让我们看看处置惩罚效果或异常的代码。

protected void setException(Throwable t) {
    // 将状况从NEW置为COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 返回值置为传进来的异常(outcome为挪用get()要领时返回的)
        outcome = t;
        // 终究的状况设置为EXCEPTIONAL
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        // 挪用完成要领
        finishCompletion();
    }
}
protected void set(V v) {
    // 将状况从NEW置为COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 返回值置为传进来的效果(outcome为挪用get()要领时返回的)
        outcome = v;
        // 终究的状况设置为NORMAL
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        // 挪用完成要领
        finishCompletion();
    }
}

咋一看,这两个要领好像差不多,差别的是出去的效果不一样且状况不一样,末了都挪用了finishCompletion()要领。

private void finishCompletion() {
    // 假如行列不为空(这个行列实际上为挪用者线程)
    for (WaitNode q; (q = waiters) != null;) {
        // 置空行列
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                // 挪用者线程
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    // 假如挪用者线程不为空,则叫醒它
                    // 【本文由公从号“彤哥读源码”原创】
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    // 钩子要领,子类重写
    done();
    // 置空使命
    callable = null;        // to reduce footprint
}

全部run()要领总结下来:

(1)FutureTask有一个状况state掌握使命的运转历程,平常运转完毕state从NEW->COMPLETING->NORMAL,异常运转完毕state从NEW->COMPLETING->EXCEPTIONAL;

(2)FutureTask保留了运转使命的线程runner,它是线程池中的某个线程;

(3)挪用者线程是保留在waiters行列中的,它是什么时刻设置进去的呢?

(4)使命实行终了,除了设置状况state变化之外,还要叫醒挪用者线程。

挪用者线程是什么时刻保留在FutureTask中(waiters)的呢?检察组织要领:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

发明并没有相干信息,我们再试想一下,假如挪用者不挪用get()要领,那末这类将来使命是否是跟平常使命没有什么区别?确切是的哈,所以只要挪用get()要领了才有必要保留挪用者线程到FutureTask中。

所以,我们来看看get()要领中是什么鬼。

FutureTask类的get()要领

get()要领挪用时假如使命未实行终了,会壅塞直到使命完毕。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 假如状况小于即是COMPLETING,则进入行列守候
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    // 返回效果(异常)
    return report(s);
}

是否是很清晰,假如使命状况小于即是COMPLETING,则进入行列守候。

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    // 我们这里假定不带超时
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        // 处置惩罚中断
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        // 4. 假如状况大于COMPLETING了,则跳出轮回并返回
        // 这是自旋的出口
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        // 假如状况即是COMPLETING,申明使命快完成了,就差设置状况到NORMAL或EXCEPTIONAL和设置效果了
        // 这时刻就让出CPU,优先完成使命
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 1. 假如行列为空
        else if (q == null)
            // 初始化行列(WaitNode中记录了挪用者线程)
            q = new WaitNode();
        // 2. 未进入行列
        else if (!queued)
            // 尝试入队
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 超时处置惩罚
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        // 3. 壅塞当前线程(挪用者线程)
        else
            // 【本文由公从号“彤哥读源码”原创】
            LockSupport.park(this);
    }
}

这里我们假定挪用get()时使命还未实行,也就是其状况为NEW,我们试着按上面标示的1、2、3、4走一遍逻辑:

(1)第一次轮回,状况为NEW,直接到1处,初始化行列并把挪用者线程封装在WaitNode中;

(2)第二次轮回,状况为NEW,行列不为空,到2处,让包括挪用者线程的WaitNode入队;

(3)第三次轮回,状况为NEW,行列不为空,且已入队,到3处,壅塞挪用者线程;

(4)假定过了一会使命实行终了了,依据run()要领的剖析末了会unpark挪用者线程,也就是3处会被叫醒;

(5)第四次轮回,状况一定大于COMPLETING了,退出轮回并返回;

题目:为何要在for轮回中掌握全部流程呢,把这里的每一步零丁拿出来写行不行?

答:由于每一次行动都须要从新搜检状况state有无变化,假如拿出去写也是可以的,只是代码会异常冗杂。这里只剖析了get()时状况为NEW,别的的状况也可以自行考证,都是可以保证准确的,以至两个线程交织运转(断点的技能)。

OK,这里返回以后,再看看是怎样处置惩罚终究的效果的。

private V report(int s) throws ExecutionException {
    Object x = outcome;
    // 使命平常完毕
    if (s == NORMAL)
        return (V)x;
    // 被作废了
    if (s >= CANCELLED)
        throw new CancellationException();
    // 实行异常
    throw new ExecutionException((Throwable)x);
}

还记得前面剖析run的时刻吗,使命实行异常时是把异常放在outcome内里的,这里就用到了。

(1)假如平常实行完毕,则返回使命的返回值;

(2)假如异常完毕,则包装成ExecutionException异常抛出;

经由过程这类体式格局,线程中涌现的异常也可以返回给挪用者线程了,不会像实行平常使命那样挪用者是不晓得使命实行究竟有无胜利的。

别的

FutureTask除了可以猎取使命的返回值之外,还可以作废使命的实行。

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

这里作废使命是经由过程中断实行线程来处置惩罚的,有兴致的同砚可以自身剖析一下。

回覆开篇

假如这里把future.get()放到for轮回内里,时刻也许是多少?

答:也许会是5秒多一点,由于每提交一个使命,都要壅塞挪用者线程直到使命实行终了,每一个使命实行都是1秒多,所以总时刻就是5秒多点。

总结

(1)将来使命是经由过程把平常使命包装成FutureTask来完成的。

(2)经由过程FutureTask不仅可以猎取使命实行的效果,另有感知到使命实行的异常,以至还可以作废使命;

(3)AbstractExecutorService中定义了许多模板要领,这是一种很主要的设想形式;

(4)FutureTask实在就是典范的异常挪用的完成体式格局,背面我们进修到Netty、Dubbo的时刻还会见到这类设想头脑的。

彩蛋

RPC框架中异步挪用是怎样完成的?

答:RPC框架经常使用的挪用体式格局有同步挪用、异步挪用,实在它们本质上都是异步挪用,它们就是用FutureTask的体式格局来完成的。

平常地,经由过程一个线程(我们叫作长途线程)去挪用长途接口,假如是同步挪用,则直接让挪用者线程壅塞着守候长途线程挪用的效果,待效果返回了再返回;假如是异步挪用,则先返回一个将来可以猎取到长途效果的东西FutureXxx,固然,假如这个FutureXxx在长途效果返回之前挪用了get()要领一样会壅塞着挪用者线程。

有兴致的同砚可以先去预习一下dubbo的异步挪用(它是把Future扔到RpcContext中的)。

迎接关注我的民众号“彤哥读源码”,检察更多源码系列文章, 与彤哥一同畅游源码的海洋。

  选择打赏方式
微信赞助

打赏

QQ钱包

打赏

支付宝赞助

打赏

  移步手机端
死磕 java线程系列之线程池深切剖析——将来使命实行流程

1、打开你手机的二维码扫描APP
2、扫描左则的二维码
3、点击扫描获得的网址
4、可以在手机端阅读此文章
未定义标签

本文来源:搜奇网

本文地址:https://www.sou7.cn/282280.html

关注我们:微信搜索“搜奇网”添加我为好友

版权声明: 本文仅代表作者个人观点,与本站无关。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。请记住本站网址https://www.sou7.cn/搜奇网。

发表评论

选填

必填

必填

选填

请拖动滑块解锁
>>