任务调度线程池介绍

前言

JDK 1.5开始提供ScheduledThreadPoolExecutor类,ScheduledThreadPoolExecutor类继承ThreadPoolExecutor类重用线程池实现了任务的周期性调度功能。

在JDK 1.5之前,实现任务的周期性调度主要使用的是Timer类和TimerTask类。

本文将简单介绍ScheduledThreadPoolExecutor类与Timer类的区别,ScheduledThreadPoolExecutor类相比于Timer类来说,究竟有哪些优势,以及二者分别实现任务调度的简单示例。

一、Timer与TimerTask概述

  • 使用 Timer 实现任务调度的核心类是 TimerTimerTask
    • 其中 Timer 负责设定 TimerTask 的起始与间隔执行时间。
    • 使用者只需要创建一个 TimerTask 的继承类,实现自己的 run 方法,然后将其丢给 Timer 去执行即可。
  • Timer 的设计核心是一个 TaskList 和一个 TaskThread
    • Timer 将接收到的任务丢到自己的 TaskList 中,TaskList 按照 Task 的最初执行时间进行排序。
    • TimerThread 在创建 Timer 时会启动成为一个守护线程。这个线程会轮询所有任务,找到一个最近要执行的任务,然后休眠,当到达最近要执行任务的开始时间点,TimerThread 被唤醒并执行该任务。
    • 之后 TimerThread 更新最近一个要执行的任务,继续休眠。

优缺点

Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务

二、Timer与TimerTask方法介绍

  • Timer类

Timer方法 用途
Timer() 创建一个计时器并启动该计时器
cancel() 停止该计时器,并放弃所有已安排的任务,对当前正在执行的任务没有影响
purge() 将所有已经取消的任务移除,用于释放内存空间
schedule(TimerTask task,Date time) 安排一个任务在指定的时间执行,如果已经超过该时间,则立即执行
schedule(TimerTask task,Date firstTime,long period) 安排一个任务在指定的时间执行,然后以固定的频率重复执行
scheduleAtFixedRate(TimerTask task,Date firstTime,long period) 安排一个任务在指定的时间执行,然后以近似固定的频率重复执行
schedule(TimerTask task,long delay) 安排一个任务在一段时间后执行
schedule(TimerTask task,long delay,long period) 安排一个任务在一段时间后执行,然后以固定的频率重复执行
scheduleAtFixedRate(TimerTask task,long delay,long period) 安排一个任务在一段时间后执行,然后以近似固定的频率重复执行
  • TimerTask抽象类

TimerTask方法 用途
cancel() 用于终止此任务。如果该任务只执行一次且还没有执行,则永远不会再执行;如果为重复执行的任务,则之后不会再执行;如果该任务正在执行,则执行完后不会再执行
run() 该任务所要执行的具体操作
ExecutionTime() 返回最近一次要执行该任务的时间,如果正在执行,则返回此任务的执行安排时间,一般在run()方法中调用,用于判断当前是否有足够的时间来执行

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import lombok.extern.slf4j.Slf4j;

import java.util.Timer;
import java.util.TimerTask;

import static site.weiyikai.concurrent.utils.Sleeper.sleep;

/**
* Created by xiaowei
* Date 2022/11/22
* Description 1.0
*/
@Slf4j(topic = "c.ThreadPoolDemo05")
public class ThreadPoolDemo05 {
public static void main(String[] args) {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
log.debug("task 1");
// int i = 1 / 0;
sleep(2);
}
};

TimerTask task2 = new TimerTask() {
@Override
public void run() {
log.debug("task 2");
}
};

timer.schedule(task1, 1000);
timer.schedule(task2, 1000);
}
}

运行结果

1
2
20:52:25.363 [Timer-0] DEBUG c.ThreadPoolDemo05 - task 1
20:52:27.371 [Timer-0] DEBUG c.ThreadPoolDemo05 - task 2

结果分析

  • 使用 timer 添加两个任务,希望它们都在 1s 后执行;
  • 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行。

修改上述代码,去掉int i = 1 / 0;中的注释,使task1任务在运行使报错,在执行代码查看运行结果。

1
2
3
4
5
20:58:17.594 [Timer-0] DEBUG c.ThreadPoolDemo05 - task 1
Exception in thread "Timer-0" java.lang.ArithmeticException: / by zero
at com.lilinchao.concurrent.demo_05.ThreadPoolDemo05$1.run(ThreadPoolDemo05.java:23)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)

从结果可以看出,当task1运行报错时,将不在接着向下执行task2任务,而是直接退出程序。

三、ScheduledThreadPoolExecutor

3.1 调度方法ScheduleAtFixedRate和ScheduleWithFixedDelay概述

  • ScheduleAtFixedRate 每次执行时间为上一次任务开始起向后推一个时间间隔,即每次执行时间为 initialDelay, initialDelay+period, initialDelay+2*period, …;
  • ScheduleWithFixedDelay 每次执行时间为上一次任务结束起向后推一个时间间隔,即每次执行时间为:initialDelay, initialDelay+executeTime+delay, initialDelay+2*executeTime+2*delay

ScheduleAtFixedRate 是基于固定时间间隔进行任务调度,ScheduleWithFixedDelay 取决于每次任务执行的时间长短,是基于不固定时间间隔进行任务调度。

3.2 延时执行

使用 Executors 提供的带任务调度线程池对象来创建延时线程

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Created by xiaowei
* Date 2022/11/22
* Description 1.0
*/
@Slf4j(topic = "c.ThreadPoolDemo06")
public class ThreadPoolDemo06 {
public static void main(String[] args) throws IOException {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

// 添加两个任务,希望它们都在 1s 后执行
executor.schedule(() -> {
log.debug("task1 execute time {}" , new Date());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
}, 1, TimeUnit.SECONDS);

executor.schedule(() -> {
log.debug("task2 execute time {}" , new Date());
}, 1, TimeUnit.SECONDS);

// 阻塞当前线程
System.in.read();
}
}

运行结果

1
2
21:15:42.462 [pool-1-thread-1] DEBUG c.ThreadPoolDemo06 - task1 execute time Tue Nov 22 21:15:42 CST 2022
21:15:42.462 [pool-1-thread-2] DEBUG c.ThreadPoolDemo06 - task2 execute time Tue Nov 22 21:15:42 CST 2022

可以看到两个任务是同一时间执行的, 并不会因为某个任务执行时间慢影响其他的任务。

3.3 定时执行

  • scheduleAtFixedRate

任务执行时间小于周期(周期不变)

示例

在1s后执行任务,之后每2s执行一次任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Created by xiaowei
* Date 2022/11/22
* Description 任务执行时间小于周期(周期不变)
*/
@Slf4j(topic = "c.ThreadPoolDemo07")
public class ThreadPoolDemo07 {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
// 在1s后执行任务,之后每2s执行一次任务
pool.scheduleAtFixedRate(() -> {
log.debug("running...");
}, 1, 2, TimeUnit.SECONDS);
}
}

运行结果

1
2
3
4
5
6
7
21:27:37.856 [main] DEBUG c.ThreadPoolDemo07 - start...
21:27:38.928 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running...
21:27:40.925 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running...
21:27:42.927 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running...
21:27:44.925 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running...
21:27:46.926 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running...
.........

任务执行时间大于周期(此时周期为任务的执行时间)

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Created by xiaowei
* Date 2022/11/22
* Description 任务执行时间大于周期(此时周期为任务的执行时间)
*/
@Slf4j(topic = "c.ThreadPoolDemo07")
public class ThreadPoolDemo07 {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
// 在1s后执行任务 之后每2s执行一次任务
pool.scheduleAtFixedRate(() -> {
log.debug("running...");
try {
// 任务休眠时间(3s)大于执行周期(2s)
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, 2, TimeUnit.SECONDS);
}
}

运行结果

1
2
3
4
5
6
7
21:34:12.647 [main] DEBUG c.ThreadPoolDemo07 - start...
21:34:13.710 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running...
21:34:16.710 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running...
21:34:19.711 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running...
21:34:22.711 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running...
21:34:25.711 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running...
...........

结果分析

一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 3s。

  • scheduleWithFixedDelay

scheduleWithFixedDelay中任务真正的执行周期为: 任务的执行时间 + 执行周期

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Created by xiaowei
* Date 2022/11/22
* Description 1.0
*/
@Slf4j(topic = "c.ThreadPoolDemo08")
public class ThreadPoolDemo08 {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleWithFixedDelay(() -> {
log.debug("running...");
try {
// 任务睡眠5s
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 任务在1s后执行,执行周期为2s
}, 1, 2, TimeUnit.SECONDS);
}
}

运行结果

1
2
3
4
5
6
7
8
21:44:22.093 [main] DEBUG c.ThreadPoolDemo08 - start...
21:44:23.150 [pool-1-thread-1] DEBUG c.ThreadPoolDemo08 - running...
21:44:30.152 [pool-1-thread-1] DEBUG c.ThreadPoolDemo08 - running...
21:44:37.152 [pool-1-thread-1] DEBUG c.ThreadPoolDemo08 - running...
21:44:44.153 [pool-1-thread-1] DEBUG c.ThreadPoolDemo08 - running...
21:44:51.156 [pool-1-thread-1] DEBUG c.ThreadPoolDemo08 - running...
21:44:58.157 [pool-1-thread-1] DEBUG c.ThreadPoolDemo08 - running...
.......

从结果可以看出,除了开始运行延迟1s以外,剩下的每次执行时间间隔为7s,即:任务睡眠时间5s+执行周期2s

评价

整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放。用来执行延迟或反复执行的任务

3.4 正确处理线程中的异常

现象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Created by xiaowei
* Date 2022/11/22
* Description 不进行异常处理
*/
@Slf4j(topic = "c.ThreadPoolDemo07")
public class ThreadPoolDemo07 {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
// 在1s后执行任务,之后每2s执行一次任务
pool.scheduleAtFixedRate(() -> {
log.debug("running...");
//此处程序会出现异常
int number = 1 / 0;
}, 1, 2, TimeUnit.SECONDS);
}
}

运行结果

1
2
21:50:53.877 [main] DEBUG c.ThreadPoolDemo07 - start...
21:50:54.936 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running...

结果分析

当任务执行遇到异常时,程序会一直停留在异常这里不在向下运行。那么我们该怎么正确的处理异常呢?

主动捕捉异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Created by xiaowei
* Date 2022/11/22
* Description 主动捕捉异常
*/
@Slf4j(topic = "c.ThreadPoolDemo09")
public class ThreadPoolDemo09 {
public static void main(String[] args) throws IOException {
// 主动捕捉异常
ExecutorService pool = Executors.newFixedThreadPool(2);

pool.execute(() -> {
log.debug("task1 ...");
try {
int i = 1 / 0;
} catch (Exception e) {
log.error("error : {}" , e);
}
});

System.in.read();
}
}

运行结果

1
2
3
4
5
6
7
21:55:20.752 [pool-1-thread-1] DEBUG c.ThreadPoolDemo09 - task1 ...
21:55:20.759 [pool-1-thread-1] ERROR c.ThreadPoolDemo09 - error : {}
java.lang.ArithmeticException: / by zero
at com.lilinchao.concurrent.demo_05.ThreadPoolDemo09.lambda$main$0(ThreadPoolDemo09.java:23)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
使用Future获取异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* Created by xiaowei
* Date 2022/11/22
* Description 使用Future获取异常
*/
@Slf4j(topic = "c.ThreadPoolDemo09")
public class ThreadPoolDemo09 {
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);

Future<Boolean> future = pool.submit(() -> {
log.debug("task1 ...");
int i = 1 / 0;
return true;
});

// 通过 future.get() 可以获取异常的结果
log.debug("result : {}" , future.get());

System.in.read();
}
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
21:57:48.509 [pool-1-thread-1] DEBUG c.ThreadPoolDemo09 - task1 ...
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.lilinchao.concurrent.demo_05.ThreadPoolDemo09.main(ThreadPoolDemo09.java:42)
Caused by: java.lang.ArithmeticException: / by zero
at com.lilinchao.concurrent.demo_05.ThreadPoolDemo09.lambda$main$0(ThreadPoolDemo09.java:38)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

四、线程池定时任务

需求:如何让每周四 18:00:00 定时执行任务?

基本思路

  • 计算 initialDelay : 计算当前时间和周四的时间差 : now.with(DayOfWeek.THURSDAY).withHour(18).withMinute(0).withSecond(0).withNano(0);
  • 如果当前时间已经超过本周四 18:00:00.000, 那么找下周四 18:00:00.000
  • 计算时间 : 1000 3600 24 * 7 ;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import lombok.extern.slf4j.Slf4j;
import java.time.DayOfWeek;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@SuppressWarnings("all")
@Slf4j(topic = "c.ThreadPoolScheduleTest")
public class ThreadPoolScheduleTest {


public static void main(String[] args) {

// 获取当前的时间
LocalDateTime now = LocalDateTime.now();
// 获取本周四 18:00:00.00
LocalDateTime thursday = now.with(DayOfWeek.THURSDAY).withHour(18).withMinute(0).withSecond(0).withNano(0);

// 如果当前时间已经超过 本周四 18:00:00.000, 那么找下周四 18:00:00.000
if (now.compareTo(thursday) > 0) {
thursday = thursday.plusWeeks(1);
}

// 计算延迟执行的时间差
long initialDelay = Duration.between(now, thursday).toMillis();
// 计算时间间隔 : 1周的毫秒值
long oneWeek = 1000 * 3600 * 24 * 7 ;

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);

log.debug("开始时间 : {}" , new Date());

executorService.scheduleAtFixedRate(() ->{
log.debug("开始执行时间 : {} " , new Date());
}, initialDelay, oneWeek, TimeUnit.MILLISECONDS);

}

}

总结

线程角度

  • Timer是单线程模式,如果某个TimerTask任务的执行时间比较久,会影响到其他任务的调度执行。
  • ScheduledThreadPoolExecutor是多线程模式,并且重用线程池,某个ScheduledFutureTask任务执行的时间比较久,不会影响到其他任务的调度执行。

系统时间敏感度

  • Timer调度是基于操作系统的绝对时间的,对操作系统的时间敏感,一旦操作系统的时间改变,则Timer的调度不再精确。
  • ScheduledThreadPoolExecutor调度是基于相对时间的,不受操作系统时间改变的影响。

是否捕获异常

  • Timer不会捕获TimerTask抛出的异常,加上Timer又是单线程的。一旦某个调度任务出现异常,则整个线程就会终止,其他需要调度的任务也不再执行。
  • ScheduledThreadPoolExecutor基于线程池来实现调度功能,某个任务抛出异常后,其他任务仍能正常执行。

是否支持对任务排序

  • Timer不支持对任务的排序。
  • ScheduledThreadPoolExecutor类中定义了一个静态内部类DelayedWorkQueue,DelayedWorkQueue类本质上是一个有序队列,为需要调度的每个任务按照距离下次执行时间间隔的大小来排序

能否获取返回的结果

  • Timer中执行的TimerTask类只是实现了java.lang.Runnable接口,无法从TimerTask中获取返回的结果。
  • ScheduledThreadPoolExecutor中执行的ScheduledFutureTask类继承了FutureTask类,能够通过Future来获取返回的结果。

任务调度线程池介绍
http://example.com/2023/03/30/任务调度线程池介绍/
作者
程序员小魏
发布于
2023年3月30日
许可协议