hi,你好!欢迎访问本站!登录
本站由网站地图腾讯云宝塔系统阿里云强势驱动
当前位置:首页 - 教程 - 杂谈 - 正文 君子好学,自强不息!

死磕 java线程系列之线程池深切剖析——一般使命实行流程

2019-11-18杂谈搜奇网31°c
A+ A-

(手机横屏看源码更轻易)

注:java源码剖析部份如无特别申明均基于 java8 版本。

注:线程池源码部份如无特别申明均指ThreadPoolExecutor类。

简介

前面我们一同进修了Java中线程池的体系结构、组织要领和生命周期,本章我们一同来进修线程池中平常使命究竟是怎样实行的。

发起进修本章前先去看看彤哥之前写的《死磕 java线程系列之自身动手写一个线程池》那两章,有助于明白本章的内容,且那里的代码比较短小,学起来相对轻易一些。

题目

(1)线程池中的平常使命是怎样实行的?

(2)使命又是在那里被实行的?

(3)线程池中有哪些主要的要领?

(4)怎样运用Debug形式一步一步调试线程池?

运用案例

我们建立一个线程池,它的中心数目为5,最大数目为10,余暇时刻为1秒,行列长度为5,谢绝战略打印一句话。

假如运用它运转20个使命,会是什么效果呢?

public class ThreadPoolTest01 {
    public static void main(String[] args) {
        // 新建一个线程池
        // 中心数目为5,最大数目为10,余暇时刻为1秒,行列长度为5,谢绝战略打印一句话
        ExecutorService threadPool = new ThreadPoolExecutor(5, 10,
                1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),
                Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(currentThreadName() + ", discard task");
            }
        });

        // 提交20个使命,注重视察num
        for (int i = 0; i < 20; i++) {
            int num = i;
            threadPool.execute(()->{
                try {
                    System.out.println(currentThreadName() + ", "+ num + " running, " + System.currentTimeMillis());
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

    }

    private static String currentThreadName() {
        return Thread.currentThread().getName();
    }
}

组织要领的7个参数我们就不细致诠释了,有兴致的可以看看《死磕 java线程系列之线程池深切剖析——组织要领》那章。

我们一同来看看一次运转的效果:

pool-1-thread-1, 0 running, 1572678434411
pool-1-thread-3, 2 running, 1572678434411
pool-1-thread-2, 1 running, 1572678434411
pool-1-thread-4, 3 running, 1572678434411
pool-1-thread-5, 4 running, 1572678434411
pool-1-thread-6, 10 running, 1572678434412
pool-1-thread-7, 11 running, 1572678434412
pool-1-thread-8, 12 running, 1572678434412
main, discard task
main, discard task
main, discard task
main, discard task
main, discard task
// 【本文由公从号“彤哥读源码”原创】
pool-1-thread-9, 13 running, 1572678434412
pool-1-thread-10, 14 running, 1572678434412
pool-1-thread-3, 5 running, 1572678436411
pool-1-thread-1, 6 running, 1572678436411
pool-1-thread-6, 7 running, 1572678436412
pool-1-thread-2, 8 running, 1572678436412
pool-1-thread-7, 9 running, 1572678436412

注重,视察num值的打印信息,先是打印了0~4,再打印了10~14,末了打印了5~9,居然不是按递次打印的,为何呢?

让我们一步一步debug进去检察。

execute()要领

execute()要领是线程池提交使命的要领之一,也是最中心的要领。

// 提交使命,使命并不是马上实行,所以翻译成实行使命好像不太适宜
public void execute(Runnable command) {
    // 使命不能为空
    if (command == null)
        throw new NullPointerException();
        
    // 掌握变量(高3位存储状况,低29位存储事情线程的数目)
    int c = ctl.get();
    // 1. 假如事情线程数目小于中心数目
    if (workerCountOf(c) < corePoolSize) {
        // 就增加一个事情线程(中心)
        if (addWorker(command, true))
            return;
        // 从新猎取下掌握变量
        c = ctl.get();
    }
    // 2. 假如到达了中心数目且线程池是运转状况,使命入行列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次搜检线程池状况,假如不是运转状况,就移除使命并实行谢绝战略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 容错搜检事情线程数目是不是为0,假如为0就建立一个
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 使命入行列失利,尝试建立非中心事情线程
    else if (!addWorker(command, false))
        // 非中心事情线程建立失利,实行谢绝战略
        reject(command);
}

关于线程池状况的内容,我们这里不拿出来细讲了,有兴致的可以看看《死磕 java线程系列之线程池深切剖析——生命周期》那章。

提交使命的历程大抵以下:

(1)事情线程数目小于中心数目,建立中心线程;

(2)到达中心数目,进入使命行列;

(3)使命行列满了,建立非中心线程;

(4)到达最大数目,实行谢绝战略;

实在,就是三道坎——中心数目、使命行列、最大数目,如许就比较好记了。

流程图大抵以下:

使命流转的历程我们知道了,然则使命是在那里实行的呢?继续往下看。

addWorker()要领

这个要领主要用来建立一个事情线程,并启动之,个中会做线程池状况、事情线程数目等种种检测。

private boolean addWorker(Runnable firstTask, boolean core) {
    // 推断有无资历建立新的事情线程
    // 重如果一些状况/数目的搜检等等
    // 这段代码比较复杂,可以先跳过
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池状况搜检
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        // 事情线程数目搜检
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 数目加1并跳出轮回
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    
    // 假如上面的前提满足,则会把事情线程数目加1,然后实行下面建立线程的行动

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 建立事情线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 再次搜检线程池的状况
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    
                    // 增加到事情线程行列
                    workers.add(w);
                    // 还在池子中的线程数目(只能在mainLock中运用)
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    
                    // 标记线程增加胜利
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 线程增加胜利以后启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 线程启动失利,实行失利要领(线程数目减1,实行tryTerminate()要领等)
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

这里实在还没到使命实行的处所,上面我们可以看到线程是包括在Worker这个类中的,那末,我们就跟踪到这个类中看看。

Worker内部类

Worker内部类可以看做是对事情线程的包装,平常地,我们说事情线程就是指Worker,但现实上是指其保护的Thread实例。

// Worker继续自AQS,自带锁的属性
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    // 真正事情的线程
    final Thread thread;
    // 第一个使命,从组织要领传进来
    Runnable firstTask;
    // 完成使命数
    volatile long completedTasks;

    // 组织要领// 【本文由公从号“彤哥读源码”原创】
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 运用线程工场生成一个线程
        // 注重,这里把Worker自身作为Runnable传给线程
        this.thread = getThreadFactory().newThread(this);
    }

    // 完成Runnable的run()要领
    public void run() {
        // 挪用ThreadPoolExecutor的runWorker()要领
        runWorker(this);
    }
    
    // 省略锁的部份
}

这里要可以看出来事情线程Thread启动的时刻现实是挪用的Worker的run()要领,进而挪用的是ThreadPoolExecutor的runWorker()要领。

runWorker()要领

runWorker()要领是真正实行使命的处所。

final void runWorker(Worker w) {
    // 事情线程
    Thread wt = Thread.currentThread();
    // 使命
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 强迫开释锁(shutdown()内里有加锁)
    // 这里相当于疏忽那里的中断标记
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 取使命,假如有第一个使命,这里先实行第一个使命
        // 只需能取到使命,这就是个死轮回
        // 一般来讲getTask()返回的使命是不能够为空的,由于前面execute()要领是有空推断的
        // 那末,getTask()什么时刻才会返回空使命呢?
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 搜检线程池的状况
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            
            try {
                // 钩子要领,轻易子类在使命实行前做一些处置惩罚
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 真正使命实行的处所
                    task.run();
                    // 非常处置惩罚
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 钩子要领,轻易子类在使命实行后做一些处置惩罚
                    afterExecute(task, thrown);
                }
            } finally {
                // task置为空,从新从行列中取
                task = null;
                // 完成使命数加1
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 到这里肯定是上面的while轮回退出了
        processWorkerExit(w, completedAbruptly);
    }
}

这个要领比较简单,疏忽状况检测和锁的内容,假如有第一个使命,就先实行之,以后再从使命行列中取使命来实行,猎取使命是经由过程getTask()来举行的。

getTask()

从行列中猎取使命的要领,内里包括了对线程池状况、余暇时刻等的掌握。

private Runnable getTask() {
    // 是不是超时
    boolean timedOut = false;
    
    // 死轮回
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池状况是SHUTDOWN的时刻会把行列中的使命实行完直到行列为空
        // 线程池状况是STOP时马上退出
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        
        // 事情线程数目// 【本文由公从号“彤哥读源码”原创】
        int wc = workerCountOf(c);

        // 是不是许可超时,有两种状况:
        // 1. 是许可中心线程数超时,这类就是说一切的线程都能够超时
        // 2. 是事情线程数大于了中心数目,这类肯定是许可超时的
        // 注重,非中心线程是肯定许可超时的,这里的超时现实上是指取使命超时
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 超时推断(还包括一些容错推断)
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 超时了,削减事情线程数目,并返回null
            if (compareAndDecrementWorkerCount(c))
                return null;
            // 削减事情线程数目失利,则重试
            continue;
        }

        try {
            // 真正取使命的处所
            // 默许状况下,只要当事情线程数目大于中心线程数目时,才会挪用poll()要领触发超时挪用
            
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            
            // 取到使命了就一般返回
            if (r != null)
                return r;
            // 没取到使命表明超时了,回到continue谁人if中返回null
            timedOut = true;
        } catch (InterruptedException retry) {
            // 捕捉到了中断非常
            // 中断标记是在挪用shutDown()或许shutDownNow()的时刻设置进去的
            // 此时,会回到for轮回的第一个if处推断状况是不是要返回null
            timedOut = false;
        }
    }
}

注重,这里取使命会依据事情线程的数目推断是运用BlockingQueue的poll(timeout, unit)要领照样take()要领。

poll(timeout, unit)要领会在超常常返回null,假如timeout<=0,行列为空时直接返回null。

take()要领会一向壅塞直到取到使命或抛出中断非常。

所以,假如keepAliveTime设置为0,当使命行列为空时,非中心线程取不出来使命,会马上完毕其生命周期。

默许状况下,是不许可中心线程超时的,然则可以经由过程下面这个要领设置使中心线程也可超时。

public void allowCoreThreadTimeOut(boolean value) {
    if (value && keepAliveTime <= 0)
        throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    if (value != allowCoreThreadTimeOut) {
        allowCoreThreadTimeOut = value;
        if (value)
            interruptIdleWorkers();
    }
}

至此,线程池中使命的实行流程就完毕了。

再看开篇题目

视察num值的打印信息,先是打印了0~4,再打印了10~14,末了打印了5~9,居然不是按递次打印的,为何呢?

线程池的参数:中心数目5个,最大数目10个,使命行列5个。

答:实行前5个使命实行时,恰好还不到中心数目,所以新建中心线程并实行了他们;

实行中心的5个使命时,已到达中心数目,所以他们先入行列;

实行背面5个使命时,已达中心数目且行列已满,所以新建非中心线程并实行了他们;

再实行末了5个使命时,线程池已到达满负荷状况,所以实行了谢绝战略。

总结

本章经由过程一个例子并连系线程池的主要要领我们一同剖析了线程池中平常使命实行的流程。

(1)execute(),提交使命的要领,依据中心数目、使命行列大小、最大数目,分红四种状况推断使命应该往哪去;

(2)addWorker(),增加事情线程的要领,经由过程Worker内部类封装一个Thread实例保护事情线程的实行;

(3)runWorker(),真正实行使命的处所,先实行第一个使命,再络绎不绝从使命行列中取使命来实行;

(4)getTask(),真正从行列取使命的处所,默许状况下,依据事情线程数目与中心数目的关联推断运用行列的poll()照样take()要领,keepAliveTime参数也是在这里运用的。

彩蛋

中心线程和非中心线程有什么区分?

答:现实上并没有什么区分,重如果依据corePoolSize来推断使命该去那里,二者在实行使命的历程当中并没有任何区分。有能够新建的时刻是中心线程,而keepAliveTime时刻到了完毕了的也多是刚开始建立的中心线程。

Worker继续自AQS有何意义?

前面我们看了Worker内部类的定义,它继续自AQS,天生自带锁的特征,那末,它的锁是用来干什么的呢?跟使命的实行有关联吗?

答:既然是跟锁(同步)有关,申明Worker类跨线程运用了,此时我们检察它的lock()要领发明只在runWorker()要领中运用了,然则其tryLock()倒是在interruptIdleWorkers()要领中运用的。

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

interruptIdleWorkers()要领的意义是中断余暇线程的意义,它只会中断BlockingQueue的poll()或take()要领,而不会中断正在实行的使命。

平常来讲,interruptIdleWorkers()要领的挪用不是在本事情线程,而是在主线程中挪用的,还记得《死磕 java线程系列之线程池深切剖析——生命周期》中说过的shutdown()和shutdownNow()要领吗?

视察两个要领中中断线程的要领,shutdown()中就是挪用了interruptIdleWorkers()要领,这里tryLock()猎取到锁了再中断,假如没有猎取到锁则不中断,没猎取到锁只要一种状况,也就是lock()地点的处所,也就是有使命正在实行。

而shutdownNow()中中断线程则很暴力,并没有tryLock(),而是直接中断了线程,所以挪用shutdownNow()能够会中断正在实行的使命。

所以,Worker继续自AQS现实是要运用其锁的才能,这个锁重如果用来掌握shutdown()时不要中断正在实行使命的线程。

迎接关注我的民众号“彤哥读源码”,检察更多源码系列文章, 与彤哥一同畅游源码的海洋。

  选择打赏方式
微信赞助

打赏

QQ钱包

打赏

支付宝赞助

打赏

  移步手机端
死磕 java线程系列之线程池深切剖析——一般使命实行流程

1、打开你手机的二维码扫描APP
2、扫描左则的二维码
3、点击扫描获得的网址
4、可以在手机端阅读此文章
未定义标签

本文来源:搜奇网

本文地址:https://www.sou7.cn/282166.html

关注我们:微信搜索“搜奇网”添加我为好友

版权声明: 本文仅代表作者个人观点,与本站无关。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。请记住本站网址https://www.sou7.cn/搜奇网。

发表评论

选填

必填

必填

选填

请拖动滑块解锁
>>