hi,你好!欢迎访问本站!登录
本站由简数采集腾讯云宝塔系统阿里云强势驱动
当前位置:首页 - 教程 - 杂谈 - 正文 君子好学,自强不息!

死磕 java线程系列之线程池深切剖析——定时使命实行流程

2019-11-18杂谈搜奇网25°c
A+ A-

(手机横屏看源码更轻易)

注:java源码剖析部份如无特别申明均基于 java8 版本。

注:本文基于ScheduledThreadPoolExecutor定时线程池类。

简介

前面我们一同进修了一般使命、将来使命的实行流程,本日我们再来进修一种新的使命——定时使命。

定时使命是我们常常会用到的一种使命,它示意在将来某个时刻实行,或许将来根据某种划定规矩反复实行的使命。

题目

(1)怎样保证使命是在将来某个时刻才被实行?

(2)怎样保证使命根据某种划定规矩反复实行?

来个栗子

建立一个定时线程池,用它来跑四种差别的定时使命。

public class ThreadPoolTest03 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 建立一个定时线程池
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(5);

        System.out.println("start: " + System.currentTimeMillis());

        // 实行一个无返回值使命,5秒后实行,只实行一次
        scheduledThreadPoolExecutor.schedule(() -> {
            System.out.println("spring: " + System.currentTimeMillis());
        }, 5, TimeUnit.SECONDS);

        // 实行一个有返回值使命,5秒后实行,只实行一次
        ScheduledFuture<String> future = scheduledThreadPoolExecutor.schedule(() -> {
            System.out.println("inner summer: " + System.currentTimeMillis());
            return "outer summer: ";
        }, 5, TimeUnit.SECONDS);
        // 猎取返回值
        System.out.println(future.get() + System.currentTimeMillis());

        // 按牢固频次实行一个使命,每2秒实行一次,1秒后实行
        // 使命开始时的2秒后
        scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
            System.out.println("autumn: " + System.currentTimeMillis());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        }, 1, 2, TimeUnit.SECONDS);

        // 按牢固延时实行一个使命,每延时2秒实行一次,1秒实行
        // 使命终了时的2秒后,本文由公从号“彤哥读源码”原创
        scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
            System.out.println("winter: " + System.currentTimeMillis());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        }, 1, 2, TimeUnit.SECONDS);
    }
}

定时使命团体分为四种:

(1)将来实行一次的使命,无返回值;

(2)将来实行一次的使命,有返回值;

(3)将来按牢固频次反复实行的使命;

(4)将来按牢固延时反复实行的使命;

本文重要以第三种为例举行源码剖析。

scheduleAtFixedRate()要领

提交一个按牢固频次实行的使命。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    // 参数推断
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
        
    // 将一般使命装潢成ScheduledFutureTask
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    // 钩子要领,给子类用来替代装潢task,这里以为t==sft
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // 延时实行
    delayedExecute(t);
    return t;
}

能够看到,这里的处置惩罚跟将来使命相似,都是装潢成另一个使命,再拿去实行,差别的是这里交给了delayedExecute()要领去实行,这个要领是干吗的呢?

delayedExecute()要领

延时实行。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 假如线程池封闭了,实行谢绝战略
    if (isShutdown())
        reject(task);
    else {
        // 先把使命扔到行列中去
        super.getQueue().add(task);
        // 再次搜检线程池状况
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 保证有充足有线程实行使命
            ensurePrestart();
    }
}
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    // 建立事情线程
    // 注重,这里没有传入firstTask参数,由于上面先把使命扔到行列中去了
    // 别的,没用上maxPoolSize参数,所以最大线程数目在定时线程池中现实是没有用的
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

到这里就终了了?!

现实上,这里只是掌握使命能不能被实行,真正实行使命的处所在使命的run()要领中。

还记得上面的使命被装潢成了ScheduledFutureTask类的实例吗?所以,我们只要看ScheduledFutureTask的run()要领就能够了。

ScheduledFutureTask类的run()要领

定时使命实行的处所。

public void run() {
    // 是不是反复实行
    boolean periodic = isPeriodic();
    // 线程池状况推断
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 一次性使命,直接挪用父类的run()要领,这个父类现实上是FutureTask
    // 这里我们不再解说,有兴致的同砚看看上一章的内容
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 反复性使命,先挪用父类的runAndReset()要领,这个父类也是FutureTask
    // 本文重要剖析下面的部份
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 设置下次实行的时刻
        setNextRunTime();
        // 反复实行,本文由公从号“彤哥读源码”原创
        reExecutePeriodic(outerTask);
    }
}

能够看到,关于反复性使命,先挪用FutureTask的runAndReset()要领,再设置下次实行的时刻,末了再挪用reExecutePeriodic()要领。

FutureTask的runAndReset()要领与run()要领相似,只是其使命运转终了后不会把状况修改成NORMAL,有兴致的同砚点进源码看看。

再来看看reExecutePeriodic()要领。

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    // 线程池状况搜检
    if (canRunInCurrentRunState(true)) {
        // 再次把使命扔到使命行列中
        super.getQueue().add(task);
        // 再次搜检线程池状况
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            // 保证事情线程充足
            ensurePrestart();
    }
}

到这里是不是是恍然大悟了,本来定时线程池实行反复使命是在使命实行终了后,又把使命扔回了使命行列中。

反复性的题目处理了,那末,它是怎样掌握使命在某个时刻实行的呢?

OK,这就轮到我们的延时行列上台了。

DelayedWorkQueue内部类

我们晓得,线程池实行使命时须要从使命行列中拿使命,而一般的使命行列,假如内里有使命就直接拿出来了,然则延时行列不一样,它内里的使命,假如没有到时刻也是拿不出来的,这也是前面剖析中一上来就把使命扔进行列且建立Worker没有传入firstTask的缘由。

说了这么多,它究竟是怎样完成的呢?

实在,延时行列我们在前面都详细剖析过,想看完全源码剖析的能够看看之前的《死磕 java鸠合之DelayQueue源码剖析》。

延时行列内部是运用“堆”这类数据结构来完成的,有兴致的同砚能够看看之前的《托付,口试别再问我堆(排序)了!》。

我们这里只拿一个take()要领出来剖析。

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 堆顶使命
            RunnableScheduledFuture<?> first = queue[0];
            // 假如行列为空,则守候
            if (first == null)
                available.await();
            else {
                // 另有多久到时刻
                long delay = first.getDelay(NANOSECONDS);
                // 假如小于即是0,申明这个使命到时刻了,能够从行列中出队了
                if (delay <= 0)
                    // 出队,然后堆化
                    return finishPoll(first);
                // 还没到时刻
                first = null;
                // 假如前面有线程在守候,直接进入守候
                if (leader != null)
                    available.await();
                else {
                    // 当前线程作为leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 守候上面盘算的延时时刻,再自动叫醒
                        available.awaitNanos(delay);
                    } finally {
                        // 叫醒后再次取得锁后把leader再置空
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            // 相当于叫醒下一个守候的使命
            available.signal();
        // 解锁,本文由公从号“彤哥读源码”原创
        lock.unlock();
    }
}

大抵的道理是,应用堆的特征猎取最快到时刻的使命,即堆顶的使命:

(1)假如堆顶的使命到时刻了,就让它从行列中了队;

(2)假如堆顶的使命还没到时刻,就看它另有多久到时刻,应用前提锁守候这段时刻,待时刻到了后从新走(1)的推断;

如许就处理了能够在指定时刻后实行使命。

别的

实在,ScheduledThreadPoolExecutor也是能够运用execute()或许submit()提交使命的,只不过它们会被当做0延时的使命来实行一次。

public void execute(Runnable command) {
    schedule(command, 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
    return schedule(task, 0, NANOSECONDS);
}

总结

完成定时使命有两个题目要处理,分别是指定将来某个时刻实行使命、反复实行。

(1)指定某个时刻实行使命,是经由过程延时行列的特征来处理的;

(2)反复实行,是经由过程在使命实行后再次把使命加入到行列中来处理的。

彩蛋

到这里基本上一般的线程池的源码剖析就终了了,这类线程池是比较典范的完成体式格局,团体上来讲,效力相对不是特别高,由于一切的事情线程共用同一个行列,每次从行列中取使命都要加锁解锁操纵。

那末,能不能给每一个事情线程装备一个使命行列呢,在提交使命的时刻就把使命分配给指定的事情线程,如许在取使命的时刻就不须要频仍的加锁解锁了。

答案是一定的,下一章我们一同来看看这类基于“事情盗取”理论的线程池——ForkJoinPool。

迎接关注我的民众号“彤哥读源码”,检察更多源码系列文章, 与彤哥一同畅游源码的海洋。

  选择打赏方式
微信赞助

打赏

QQ钱包

打赏

支付宝赞助

打赏

  移步手机端
死磕 java线程系列之线程池深切剖析——定时使命实行流程

1、打开你手机的二维码扫描APP
2、扫描左则的二维码
3、点击扫描获得的网址
4、可以在手机端阅读此文章
未定义标签

本文来源:搜奇网

本文地址:https://www.sou7.cn/282347.html

关注我们:微信搜索“搜奇网”添加我为好友

版权声明: 本文仅代表作者个人观点,与本站无关。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。请记住本站网址https://www.sou7.cn/搜奇网。

发表评论

选填

必填

必填

选填

请拖动滑块解锁
>>