一、概述
CountDownLatch(倒计时锁)
能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。它相当于是一个计数器,这个计数器的初始值就是线程的数量,每当一个任务完成后,计数器的值就会减一,当计数器的值为 0 时,表示所有的线程都已经任务了,然后在 CountDownLatch
上等待的线程就可以恢复执行接下来的任务。
应用场景
典型的应用场景就是当一个服务启动时,同时会加载很多组件和服务,这时候主线程会等待组件和服务的加载。当所有的组件和服务都加载完毕后,主线程和其他线程在一起完成某个任务。
二、常用方法
方法 |
说明 |
CountDownLatch(int count) |
count为计数器的初始值(一般需要多少个线程执行,count就设为几) |
countDown() |
每调用一次计数器值-1,直到count被减为0,代表所有线程全部执行完毕 |
getCount() |
获取当前计数器的值 |
await() |
等待计数器变为0,即等待所有异步线程执行完毕 |
- boolean await(long timeout, TimeUnit unit),此方法与await()区别是:
- 此方法至多会等待指定的时间,超时后会自动唤醒,若 timeout 小于等于零,则不会等待;
- boolean 类型返回值:若计数器变为零了,则返回 true;若指定的等待时间过去了,则返回 false。
CountDownLatch与join的区别?
join相对而言属于比较底层的API,使用比较繁琐,且必须等待线程结束,CountDownLatch
提供了更加灵活的与线程同步的方式(线程可以在完成某个条件调用countDown()方法)。
三、CountDownLatch使用
3.1 简单应用
需求:开启三个线程,分别睡眠不同的时间,当等待三个线程都执行完成后,最后主线程退出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch; import static site.weiyikai.concurrent.utils.Sleeper.sleep;
@Slf4j(topic = "c.CountdownLatchDemo") public class CountdownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(3);
new Thread(() -> { log.debug("begin..."); sleep(1); latch.countDown(); log.debug("end...{}", latch.getCount()); }).start();
new Thread(() -> { log.debug("begin..."); sleep(2); latch.countDown(); log.debug("end...{}", latch.getCount()); }).start();
new Thread(() -> { log.debug("begin..."); sleep(1.5); latch.countDown(); log.debug("end...{}", latch.getCount()); }).start();
log.debug("waiting..."); latch.await(); log.debug("wait end..."); } }
|
运行结果
1 2 3 4 5 6 7 8
| 22:07:45.820 [main] DEBUG c.CountdownLatchDemo - waiting... 22:07:45.820 [Thread-0] DEBUG c.CountdownLatchDemo - begin... 22:07:45.820 [Thread-1] DEBUG c.CountdownLatchDemo - begin... 22:07:45.820 [Thread-2] DEBUG c.CountdownLatchDemo - begin... 22:07:46.825 [Thread-0] DEBUG c.CountdownLatchDemo - end...2 22:07:47.325 [Thread-2] DEBUG c.CountdownLatchDemo - end...1 22:07:47.824 [Thread-1] DEBUG c.CountdownLatchDemo - end...0 22:07:47.824 [main] DEBUG c.CountdownLatchDemo - wait end...
|
从结果可以看出,主线程等待其他三个线程执行完毕才往下执行。
3.2 CountDownLatch配合线程池使用
需求:同时开启四个线程,三个线程负责处理,一个线程负责汇总结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
import static site.weiyikai.concurrent.utils.Sleeper.sleep;
@Slf4j(topic = "c.CountdownLatchDemo02") public class CountdownLatchDemo02 { public static void main(String[] args) { CountDownLatch latch = new CountDownLatch(3); ExecutorService service = Executors.newFixedThreadPool(4); service.submit(() -> { log.debug("begin..."); sleep(1); latch.countDown(); log.debug("end...{}", latch.getCount()); }); service.submit(() -> { log.debug("begin..."); sleep(1.5); latch.countDown(); log.debug("end...{}", latch.getCount()); }); service.submit(() -> { log.debug("begin..."); sleep(2); latch.countDown(); log.debug("end...{}", latch.getCount()); }); service.submit(()->{ try { log.debug("waiting..."); latch.await(); log.debug("wait end..."); } catch (InterruptedException e) { e.printStackTrace(); } }); } }
|
运行结果
1 2 3 4 5 6 7 8
| 22:11:58.111 [pool-1-thread-1] DEBUG c.CountdownLatchDemo02 - begin... 22:11:58.111 [pool-1-thread-4] DEBUG c.CountdownLatchDemo02 - waiting... 22:11:58.111 [pool-1-thread-2] DEBUG c.CountdownLatchDemo02 - begin... 22:11:58.111 [pool-1-thread-3] DEBUG c.CountdownLatchDemo02 - begin... 22:11:59.160 [pool-1-thread-1] DEBUG c.CountdownLatchDemo02 - end...2 22:11:59.616 [pool-1-thread-2] DEBUG c.CountdownLatchDemo02 - end...1 22:12:00.115 [pool-1-thread-3] DEBUG c.CountdownLatchDemo02 - end...0 22:12:00.115 [pool-1-thread-4] DEBUG c.CountdownLatchDemo02 - wait end...
|
从结果可以看出,负责汇总的线程执行到latch.await();
进入到等待状态,等其他三个线程全部执行结束之后,再向下执行。
四、CountDownLatch典型应用场景介绍
场景1:多个游戏资源并发加载完成通知
在这个场景下,某个线程进入到下一个流程,需要多个资源加载好,这里假定为10个,可以采用线程池,令该线程提交10个任务分别加载一个资源,然后调用countDownLatch.await
去等待所有资源加载完毕,每个任务准备好资源就countDown
。
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| import java.util.Arrays; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger;
public class CountdownLatchDemo03 { public static void main(String[] args) throws InterruptedException {
AtomicInteger num = new AtomicInteger(0); ExecutorService service = Executors.newFixedThreadPool(10, (r) -> { return new Thread(r, "t" + num.incrementAndGet()); }); String[] all = new String[10]; CountDownLatch latch = new CountDownLatch(10);
Random random = new Random(); for (int i = 0; i < 10; i++) { int x = i; service.submit(() -> { for (int j = 0; j <= 100; j++) { try { Thread.sleep(random.nextInt(100)); } catch (InterruptedException e) { e.printStackTrace(); } all[x] = Thread.currentThread().getName() + "(" + j + "%" + ")"; System.out.print("\r" + Arrays.toString(all)); } latch.countDown(); }); } latch.await(); System.out.print("\n游戏开始"); service.shutdown(); } }
|
运行结果
1 2
| [t1(100%), t2(100%), t3(100%), t4(100%), t5(100%), t6(100%), t7(100%), t8(100%), t9(100%), t10(100%)] 游戏开始
|
当所有资源都加载好了,才进行下一个流程,游戏开始
场景2:微服务场景下多个RPC远程调用并发执行
在这个场景下,当前用户的请求,需要多个服务器资源,比如需要商品信息,订单信息,快递信息共三个信息,则需要进行三次远程调用,如果顺序执行的话,显然效率比较低下。
此时可以采用多线程的方式利用线程池配合CountDownLatch
并发的去获取资源(采用线程的future对象返回线程执行完返回的结果),节约时间。
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| @RestController public class TestCountDownlatchController { @GetMapping("/order/{id}") public Map<String, Object> order(@PathVariable int id) { HashMap<String, Object> map = new HashMap<>(); map.put("id", id); map.put("total", "2300.00"); sleep(2000); return map; }
@GetMapping("/product/{id}") public Map<String, Object> product(@PathVariable int id) {
HashMap<String, Object> map = new HashMap<>(); if (id == 1) { map.put("name", "小爱音箱"); map.put("price", 300); } else if (id == 2) { map.put("name", "小米手机"); map.put("price", 2000); } map.put("id", id); sleep(1000); return map; }
@GetMapping("/logistics/{id}") public Map<String, Object> logistics(@PathVariable int id) { HashMap<String, Object> map = new HashMap<>(); map.put("id", id); map.put("name", "中通快递"); sleep(2500); return map; }
private void sleep(int millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }
|
rest 远程调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
|
@Slf4j public class CountDownlatchDemo { public static void main(String[] args) throws InterruptedException { RestTemplate restTemplate = new RestTemplate(); log.debug("begin"); ExecutorService service = Executors.newCachedThreadPool(); CountDownLatch latch = new CountDownLatch(4); service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1); latch.countDown(); }); service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1); latch.countDown(); }); service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2); latch.countDown(); }); service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1); latch.countDown(); }); latch.await(); log.debug("执行完毕"); service.shutdown(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
@Slf4j public class Main { public static void main(String[] args) throws InterruptedException { RestTemplate restTemplate = new RestTemplate(); log.debug("begin"); ExecutorService service = Executors.newCachedThreadPool(); CountDownLatch latch = new CountDownLatch(4); Future<Map<String, Object>> f1 = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1); return r; }); Future<Map<String, Object>> f2 = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1); return r; }); Future<Map<String, Object>> f3 = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2); return r; }); Future<Map<String, Object>> f4 = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1); return r; }); System.out.println(f1.get()); System.out.println(f2.get()); System.out.println(f3.get()); System.out.println(f4.get()); log.debug("执行完毕"); service.shutdown(); } }
|
运行结果
1 2 3 4 5 6
| 19:51:39.711 c.TestCountDownLatch [main] - begin {total=2300.00, id=1} {price=300, name=小爱音箱, id=1} {price=2000, name=小米手机, id=2} {name=中通快递, id=1} 19:51:42.407 c.TestCountDownLatch [main] - 执行完毕
|
结果分析
CountDownLatch
和Future
都可以实现,但是各有各的好处,future更适合需要获取结果,进行合并操作的业务逻辑。