线程池模型架构
- 自定义线程池包括:Thread Pool(线程池)+ Blocking Queue(阻塞队列)
图例分析
图中内容表示,三个消费线程或者说是核心线程 t1
、t2
、t3
通过poll方法从阻塞队列中执行任务,main线程不断地往阻塞队列中put任务task,如果核心线程处于忙碌状态,task就放进阻塞队列中。
实现步骤
步骤1:自定义拒绝策略接口
1 2 3 4 5 6 7 8 9 10
| package site.weiyikai.customthreadpool.ThreadPool;
@FunctionalInterface interface RejectPolicy<T> { void reject(BlockingQueue<T> queue, T task); }
|
作用
当核心线程都被占用,并且阻塞队列中的任务也满时,就会触发拒绝策略。
简单理解就是,任务实在太多了,分配给线程池的所有线程都参与进来处理任务,但还是处理不过来,同时存放任务的缓存队列空间也满了,剩下来的任务该如何进行处理呢,这个时候就涉及到拒绝策略。
拒绝策略的方式:
- 死等
- 带超时等待
- 让调用者放弃任务执行
- 让调用者抛出异常
- 让调用者自己执行任务等等
步骤2:自定义任务队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
| package site.weiyikai.ThreadPool;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; import java.util.Deque; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
class BlockingQueue<T> { Logger log = LoggerFactory.getLogger(BlockingQueue.class); private Deque<T> queue = new ArrayDeque<>(); private ReentrantLock lock = new ReentrantLock(); private Condition fullWaitSet = lock.newCondition(); private Condition emptyWaitSet = lock.newCondition(); private int capcity;
public BlockingQueue(int capcity) { this.capcity = capcity; }
public T poll(long timeout, TimeUnit unit){ lock.lock();
try { long nanos = unit.toNanos(timeout); while (queue.isEmpty()) { try { if (nanos <= 0){ return null; } nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } }
public T task() { lock.lock();
try { while (queue.isEmpty()) { try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } }
public void put(T task) { lock.lock();
try { while (queue.size() == capcity) { try { log.info("等待加入任务队列{}...",task); fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.info("加入任务队列{}",task); queue.addLast(task); emptyWaitSet.signal(); } finally { lock.unlock(); } }
public boolean offer(T task,long timeout, TimeUnit timeUnit) { lock.lock();
try { long nanos = timeUnit.toNanos(timeout); while (queue.size() == capcity) { try { if (nanos <= 0) { return false; } log.info("等待加入任务队列 {} ...", task); nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } log.info("加入任务队列{}",task); queue.addLast(task); emptyWaitSet.signal(); return true; } finally { lock.unlock(); } }
public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } }
public void tryPut(RejectPolicy<T> rejectPolicy,T task) { lock.lock();
try { if(queue.size() == capcity) { rejectPolicy.reject(this,task); } else { log.info("加入任务队列{}",task); queue.addLast(task); emptyWaitSet.signal(); } } finally { lock.unlock(); } } }
|
说明
**queue(队列)**:生产者创建的任务都放在queue中 ,Deque
是一个双向链表,比LinkedList
效率要高,当然这里也可以用LinkedList
。
**lock(锁)**:为了防止多个线程获取同一个任务,本次使用的是ReentrantLock
锁,目的是因为它可以提供两个条件变量(集合)fullWaitSet
和emptyWaitSet
。
**fullWaitSet(生产者条件变量)**:当任务队列queue满的时候,生产者线程将要进入fullWaitSet
阻塞状态,不能再进行生产。
**emptyWaitSet(消费者条件变量)**:当queue为空的时候,消费者线程(t1、t2、t3)就无任务可以进行消费任务,同理也应该阻塞,进入emptyWaitSet
。
**capcity(任务容量)**:初始化创建任务队列的容量,与上面两个不一样,这个是放任务的,上面两个是放线程的。
**BlockingQueue(int capcity)**:构造方法,用来初始化任务队列的容量。
**poll(long timeout, TimeUnit unit)**:核心线程带有超时的获取任务的方法
如果任务是空的就阻塞,而且是带有超时的阻塞,如果获取任务成功,说明queue获取后就不是满的状态了,所以应该唤醒fullWaitSet
中阻塞的生产者线程,让生产者线程继续生产任务。
offer(T task, long timeout, TimeUnit timeUnit) :生产者线程用于向队列queue中添加任务的方法
这个方法也是带有阻塞,如果queue是满的,就不应该添加任务,main线程就应该阻塞,这里也是使用了超时阻塞,原因和上面一样,不想让它阻塞太久,任务添加不进就不添加,一直阻塞势必消耗CPU资源。
**size()**:获取当前任务的数量
**tryPut(RejectPolicy rejectPolicy, T task)**:调用生产者提供的拒绝策略,它的调用时机是,核心线程用完了(t1、t2、t3都忙),如果任务队列满了,就执行拒绝策略,如果没满就放任务队列中。
步骤3:自定义线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| package site.weiyikai.customthreadpool.ThreadPool;
import javafx.concurrent.Worker; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import java.util.HashSet; import java.util.concurrent.TimeUnit;
@Slf4j class ThreadPool { private BlockingQueue<Runnable> taskQueue; private HashSet<Worker> workers = new HashSet<>(); private int coreSize; private long timeout; private TimeUnit timeUnit; private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) { this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(queueCapcity); this.rejectPolicy = rejectPolicy; }
public void execute(Runnable task) { synchronized (workers) { if(workers.size() < coreSize) { Worker worker = new Worker(task); log.info("新增 worker{}, {}", worker, task); workers.add(worker); worker.start(); } else { taskQueue.tryPut(rejectPolicy, task); } } }
class Worker extends Thread{ private Runnable task;
public Worker( Runnable task) { this.task = task; }
@Override public void run() { while(task != null || (task = (taskQueue.poll(timeout,timeUnit)))!= null){ try { log.info("正在执行...{}", task); task.run(); } catch (Exception e) { e.printStackTrace(); }finally { task = null; } }
synchronized (workers){ log.info("worker 被移除{}", this); workers.remove(this); }
}
} }
|
说明
taskQueue:任务队列,这个队列中有封装好的取任务和添加任务的方法,以及线程的阻塞队列等属性。
workers :存放工作线程,也就是核心线程。
coreSize:定义核心线程的数量。
timeout和timeUnit:任务的超时时间,下面调用之前方法传参用
rejectPolicy:传参用
execute:线程池向外提供的执行方法
ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,
RejectPolicy rejectPolicy)
:创建线程池的初始化方法
class Worker:线程实体,它的逻辑是先执行当前任务,如果当前任务执行结束后从任务队列中取任务执行。
步骤4:测试
按照定义的决策策略分别进行演示。
(1)死等
初始化核心线程数是1,超时取任务的时间是1秒,任务队列容量是1,拒绝策略是死等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Slf4j(topic = "c.CustomThreadPoolDemo") public class CustomThreadPoolDemo { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ queue.put(task); task.run(); }); for (int i = 0; i < 3; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } }
|
运行结果
1 2 3 4
| 16:15:56.232 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:15:56.236 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:15:56.236 [main] DEBUG c.BlockingQueue - 等待加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2... 16:15:56.236 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e
|
结果分析
- 首先创建了一个核心线程,开始执行第一个任务;
- 由于任务执行周期长,核心线程一直处于忙碌状态,因此第二个任务到来时,放入任务队列等待核心线程进入空闲状态;
- 当再来第三个任务时,此时任务队列已满,同时核心线程中的第一个任务也未执行完毕,此时主线程就进入到阻塞队列
fullWaitSet
一直死等,等待queue有位置。
(2)带超时等待
每个任务的执行周期是1秒,拒绝策略是main线程等待1.5秒,如果还添加不进去任务,不再进行添加
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Slf4j(topic = "c.CustomThreadPoolDemo") public class CustomThreadPoolDemo { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{
queue.offer(task, 1500, TimeUnit.MILLISECONDS); task.run(); }); for (int i = 0; i < 3; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } }
|
运行结果
1 2 3 4 5 6 7 8 9 10 11 12
| 16:25:30.918 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:25:30.922 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:25:30.922 [main] DEBUG c.BlockingQueue - 等待加入任务队列 com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 ... 16:25:30.922 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:25:31.923 [Thread-0] DEBUG c.CustomThreadPoolDemo - 0 16:25:31.923 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 16:25:31.923 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:25:32.924 [main] DEBUG c.CustomThreadPoolDemo - 2 16:25:32.925 [Thread-0] DEBUG c.CustomThreadPoolDemo - 1 16:25:32.925 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 16:25:33.925 [Thread-0] DEBUG c.CustomThreadPoolDemo - 2 16:25:34.927 [Thread-0] DEBUG c.ThreadPool - worker 被移除Thread[Thread-0,5,main]
|
可以看到三个任务都被执行了。
如果将主线程的拒绝策略改成0.5秒呢?
1 2 3 4 5 6 7 8 9
| 16:28:37.554 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:28:37.557 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:28:37.558 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:28:37.558 [main] DEBUG c.BlockingQueue - 等待加入任务队列 com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 ... 16:28:38.558 [Thread-0] DEBUG c.CustomThreadPoolDemo - 0 16:28:39.059 [main] DEBUG c.CustomThreadPoolDemo - 2 16:28:39.059 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:28:40.059 [Thread-0] DEBUG c.CustomThreadPoolDemo - 1 16:28:41.060 [Thread-0] DEBUG c.ThreadPool - worker 被移除Thread[Thread-0,5,main]
|
可以发现第三个任务没有执行。
(3)让调用者放弃任务执行
直接让main线程打印一下任务,不执行任何添加操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Slf4j(topic = "c.CustomThreadPoolDemo") public class CustomThreadPoolDemo { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ log.debug("放弃{}", task); }); for (int i = 0; i < 3; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } }
|
运行结果
1 2 3 4 5 6 7 8
| 16:34:53.012 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:34:53.016 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:34:53.016 [main] DEBUG c.CustomThreadPoolDemo - 放弃com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 16:34:53.016 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:34:54.018 [Thread-0] DEBUG c.CustomThreadPoolDemo - 0 16:34:54.018 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:34:55.018 [Thread-0] DEBUG c.CustomThreadPoolDemo - 1 16:34:56.019 [Thread-0] DEBUG c.ThreadPool - worker 被移除Thread[Thread-0,5,main]
|
(4)让调用者抛出异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Slf4j(topic = "c.CustomThreadPoolDemo") public class CustomThreadPoolDemo { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ throw new RuntimeException("任务执行失败 " + task); }); for (int i = 0; i < 4; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } }
|
运行结果
1 2 3 4 5 6 7 8 9 10 11 12
| 16:46:30.854 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:46:30.857 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f Exception in thread "main" java.lang.RuntimeException: 任务执行失败 com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@61e717c2 at com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo.lambda$main$0(CustomThreadPoolDemo.java:29) at com.lilinchao.concurrent.demo_05.BlockingQueue.tryPut(CustomThreadPoolDemo.java:270) at com.lilinchao.concurrent.demo_05.ThreadPool.execute(CustomThreadPoolDemo.java:78) at com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo.main(CustomThreadPoolDemo.java:35) 16:46:30.859 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:46:31.859 [Thread-0] DEBUG c.CustomThreadPoolDemo - 0 16:46:31.859 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:46:32.860 [Thread-0] DEBUG c.CustomThreadPoolDemo - 1 16:46:33.861 [Thread-0] DEBUG c.ThreadPool - worker 被移除Thread[Thread-0,5,main]
|
结果分析
从结果可以看出,核心线程执行第一到来的任务,将第二个任务加入到阻塞队列中,当第三个任务再来时,直接抛出异常,同时不再尝试添加之后的任务。
(5)让调用者自己执行任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Slf4j(topic = "c.CustomThreadPoolDemo") public class CustomThreadPoolDemo { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ task.run(); }); for (int i = 0; i < 4; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } }
|
运行结果
1 2 3 4 5 6 7 8 9
| 16:52:05.328 [main] DEBUG c.ThreadPool - 新增 workerThread[Thread-0,5,main], com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:52:05.331 [main] DEBUG c.BlockingQueue - 加入任务队列com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:52:05.332 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@f2a0b8e 16:52:06.332 [Thread-0] DEBUG c.CustomThreadPoolDemo - 0 16:52:06.332 [main] DEBUG c.CustomThreadPoolDemo - 2 16:52:07.333 [main] DEBUG c.CustomThreadPoolDemo - 3 16:52:07.333 [Thread-0] DEBUG c.ThreadPool - 正在执行...com.lilinchao.concurrent.demo_05.CustomThreadPoolDemo$$Lambda$2/1567581361@2b05039f 16:52:08.333 [Thread-0] DEBUG c.CustomThreadPoolDemo - 1 16:52:09.334 [Thread-0] DEBUG c.ThreadPool - worker 被移除Thread[Thread-0,5,main]
|
结果分析
从结果可以看出,后面再来的两个线程直接由主线程进行执行。