死磕 java线程系列之线程池深切剖析——定时使命实行流程
2019-11-18杂谈搜奇网43°c
A+ A-(手机横屏看源码更轻易)
注:java源码剖析部份如无特别申明均基于 java8 版本。
注:本文基于ScheduledThreadPoolExecutor定时线程池类。
简介
前面我们一同进修了一般使命、将来使命的实行流程,本日我们再来进修一种新的使命——定时使命。
定时使命是我们常常会用到的一种使命,它示意在将来某个时刻实行,或许将来根据某种划定规矩反复实行的使命。
题目
(1)怎样保证使命是在将来某个时刻才被实行?
(2)怎样保证使命根据某种划定规矩反复实行?
来个栗子
建立一个定时线程池,用它来跑四种差别的定时使命。
public class ThreadPoolTest03 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 建立一个定时线程池
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(5);
System.out.println("start: " + System.currentTimeMillis());
// 实行一个无返回值使命,5秒后实行,只实行一次
scheduledThreadPoolExecutor.schedule(() -> {
System.out.println("spring: " + System.currentTimeMillis());
}, 5, TimeUnit.SECONDS);
// 实行一个有返回值使命,5秒后实行,只实行一次
ScheduledFuture<String> future = scheduledThreadPoolExecutor.schedule(() -> {
System.out.println("inner summer: " + System.currentTimeMillis());
return "outer summer: ";
}, 5, TimeUnit.SECONDS);
// 猎取返回值
System.out.println(future.get() + System.currentTimeMillis());
// 按牢固频次实行一个使命,每2秒实行一次,1秒后实行
// 使命开始时的2秒后
scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
System.out.println("autumn: " + System.currentTimeMillis());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
}, 1, 2, TimeUnit.SECONDS);
// 按牢固延时实行一个使命,每延时2秒实行一次,1秒实行
// 使命终了时的2秒后,本文由公从号“彤哥读源码”原创
scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
System.out.println("winter: " + System.currentTimeMillis());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
}, 1, 2, TimeUnit.SECONDS);
}
}
定时使命团体分为四种:
(1)将来实行一次的使命,无返回值;
(2)将来实行一次的使命,有返回值;
(3)将来按牢固频次反复实行的使命;
(4)将来按牢固延时反复实行的使命;
本文重要以第三种为例举行源码剖析。
scheduleAtFixedRate()要领
提交一个按牢固频次实行的使命。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
// 参数推断
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 将一般使命装潢成ScheduledFutureTask
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
// 钩子要领,给子类用来替代装潢task,这里以为t==sft
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 延时实行
delayedExecute(t);
return t;
}
能够看到,这里的处置惩罚跟将来使命相似,都是装潢成另一个使命,再拿去实行,差别的是这里交给了delayedExecute()要领去实行,这个要领是干吗的呢?
delayedExecute()要领
延时实行。
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 假如线程池封闭了,实行谢绝战略
if (isShutdown())
reject(task);
else {
// 先把使命扔到行列中去
super.getQueue().add(task);
// 再次搜检线程池状况
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 保证有充足有线程实行使命
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// 建立事情线程
// 注重,这里没有传入firstTask参数,由于上面先把使命扔到行列中去了
// 别的,没用上maxPoolSize参数,所以最大线程数目在定时线程池中现实是没有用的
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
到这里就终了了?!
现实上,这里只是掌握使命能不能被实行,真正实行使命的处所在使命的run()要领中。
还记得上面的使命被装潢成了ScheduledFutureTask类的实例吗?所以,我们只要看ScheduledFutureTask的run()要领就能够了。
ScheduledFutureTask类的run()要领
定时使命实行的处所。
public void run() {
// 是不是反复实行
boolean periodic = isPeriodic();
// 线程池状况推断
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 一次性使命,直接挪用父类的run()要领,这个父类现实上是FutureTask
// 这里我们不再解说,有兴致的同砚看看上一章的内容
else if (!periodic)
ScheduledFutureTask.super.run();
// 反复性使命,先挪用父类的runAndReset()要领,这个父类也是FutureTask
// 本文重要剖析下面的部份
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下次实行的时刻
setNextRunTime();
// 反复实行,本文由公从号“彤哥读源码”原创
reExecutePeriodic(outerTask);
}
}
能够看到,关于反复性使命,先挪用FutureTask的runAndReset()要领,再设置下次实行的时刻,末了再挪用reExecutePeriodic()要领。
FutureTask的runAndReset()要领与run()要领相似,只是其使命运转终了后不会把状况修改成NORMAL,有兴致的同砚点进源码看看。
再来看看reExecutePeriodic()要领。
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 线程池状况搜检
if (canRunInCurrentRunState(true)) {
// 再次把使命扔到使命行列中
super.getQueue().add(task);
// 再次搜检线程池状况
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 保证事情线程充足
ensurePrestart();
}
}
到这里是不是是恍然大悟了,本来定时线程池实行反复使命是在使命实行终了后,又把使命扔回了使命行列中。
反复性的题目处理了,那末,它是怎样掌握使命在某个时刻实行的呢?
OK,这就轮到我们的延时行列上台了。
DelayedWorkQueue内部类
我们晓得,线程池实行使命时须要从使命行列中拿使命,而一般的使命行列,假如内里有使命就直接拿出来了,然则延时行列不一样,它内里的使命,假如没有到时刻也是拿不出来的,这也是前面剖析中一上来就把使命扔进行列且建立Worker没有传入firstTask的缘由。
说了这么多,它究竟是怎样完成的呢?
实在,延时行列我们在前面都详细剖析过,想看完全源码剖析的能够看看之前的《死磕 java鸠合之DelayQueue源码剖析》。
延时行列内部是运用“堆”这类数据结构来完成的,有兴致的同砚能够看看之前的《托付,口试别再问我堆(排序)了!》。
我们这里只拿一个take()要领出来剖析。
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
try {
for (;;) {
// 堆顶使命
RunnableScheduledFuture<?> first = queue[0];
// 假如行列为空,则守候
if (first == null)
available.await();
else {
// 另有多久到时刻
long delay = first.getDelay(NANOSECONDS);
// 假如小于即是0,申明这个使命到时刻了,能够从行列中出队了
if (delay <= 0)
// 出队,然后堆化
return finishPoll(first);
// 还没到时刻
first = null;
// 假如前面有线程在守候,直接进入守候
if (leader != null)
available.await();
else {
// 当前线程作为leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 守候上面盘算的延时时刻,再自动叫醒
available.awaitNanos(delay);
} finally {
// 叫醒后再次取得锁后把leader再置空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
// 相当于叫醒下一个守候的使命
available.signal();
// 解锁,本文由公从号“彤哥读源码”原创
lock.unlock();
}
}
大抵的道理是,应用堆的特征猎取最快到时刻的使命,即堆顶的使命:
(1)假如堆顶的使命到时刻了,就让它从行列中了队;
(2)假如堆顶的使命还没到时刻,就看它另有多久到时刻,应用前提锁守候这段时刻,待时刻到了后从新走(1)的推断;
如许就处理了能够在指定时刻后实行使命。
别的
实在,ScheduledThreadPoolExecutor也是能够运用execute()或许submit()提交使命的,只不过它们会被当做0延时的使命来实行一次。
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
总结
完成定时使命有两个题目要处理,分别是指定将来某个时刻实行使命、反复实行。
(1)指定某个时刻实行使命,是经由过程延时行列的特征来处理的;
(2)反复实行,是经由过程在使命实行后再次把使命加入到行列中来处理的。
彩蛋
到这里基本上一般的线程池的源码剖析就终了了,这类线程池是比较典范的完成体式格局,团体上来讲,效力相对不是特别高,由于一切的事情线程共用同一个行列,每次从行列中取使命都要加锁解锁操纵。
那末,能不能给每一个事情线程装备一个使命行列呢,在提交使命的时刻就把使命分配给指定的事情线程,如许在取使命的时刻就不须要频仍的加锁解锁了。
答案是一定的,下一章我们一同来看看这类基于“事情盗取”理论的线程池——ForkJoinPool。
迎接关注我的民众号“彤哥读源码”,检察更多源码系列文章, 与彤哥一同畅游源码的海洋。