CountdownLatch介绍

一、概述

CountDownLatch(倒计时锁) 能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。它相当于是一个计数器,这个计数器的初始值就是线程的数量,每当一个任务完成后,计数器的值就会减一,当计数器的值为 0 时,表示所有的线程都已经任务了,然后在 CountDownLatch 上等待的线程就可以恢复执行接下来的任务。

应用场景

典型的应用场景就是当一个服务启动时,同时会加载很多组件和服务,这时候主线程会等待组件和服务的加载。当所有的组件和服务都加载完毕后,主线程和其他线程在一起完成某个任务。

二、常用方法

方法 说明
CountDownLatch(int count) count为计数器的初始值(一般需要多少个线程执行,count就设为几)
countDown() 每调用一次计数器值-1,直到count被减为0,代表所有线程全部执行完毕
getCount() 获取当前计数器的值
await() 等待计数器变为0,即等待所有异步线程执行完毕
  • boolean await(long timeout, TimeUnit unit),此方法与await()区别是:
    • 此方法至多会等待指定的时间,超时后会自动唤醒,若 timeout 小于等于零,则不会等待;
    • boolean 类型返回值:若计数器变为零了,则返回 true;若指定的等待时间过去了,则返回 false。

CountDownLatch与join的区别?

join相对而言属于比较底层的API,使用比较繁琐,且必须等待线程结束CountDownLatch提供了更加灵活的与线程同步的方式(线程可以在完成某个条件调用countDown()方法)。

三、CountDownLatch使用

3.1 简单应用

需求:开启三个线程,分别睡眠不同的时间,当等待三个线程都执行完成后,最后主线程退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import static site.weiyikai.concurrent.utils.Sleeper.sleep;

/**
* Created by xiaowei
* Date 2022/12/1
* Description CountdownLatch示例
*/
@Slf4j(topic = "c.CountdownLatchDemo")
public class CountdownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 初始倒计时为3
CountDownLatch latch = new CountDownLatch(3);

new Thread(() -> {
log.debug("begin...");
sleep(1);
latch.countDown();
// 计数减一
log.debug("end...{}", latch.getCount());
}).start();

new Thread(() -> {
log.debug("begin...");
sleep(2);
latch.countDown();
log.debug("end...{}", latch.getCount());
}).start();

new Thread(() -> {
log.debug("begin...");
sleep(1.5);
latch.countDown();
log.debug("end...{}", latch.getCount());
}).start();

log.debug("waiting...");
// 等待计数归零
latch.await();
log.debug("wait end...");
}
}

运行结果

1
2
3
4
5
6
7
8
22:07:45.820 [main] DEBUG c.CountdownLatchDemo - waiting...
22:07:45.820 [Thread-0] DEBUG c.CountdownLatchDemo - begin...
22:07:45.820 [Thread-1] DEBUG c.CountdownLatchDemo - begin...
22:07:45.820 [Thread-2] DEBUG c.CountdownLatchDemo - begin...
22:07:46.825 [Thread-0] DEBUG c.CountdownLatchDemo - end...2
22:07:47.325 [Thread-2] DEBUG c.CountdownLatchDemo - end...1
22:07:47.824 [Thread-1] DEBUG c.CountdownLatchDemo - end...0
22:07:47.824 [main] DEBUG c.CountdownLatchDemo - wait end...

从结果可以看出,主线程等待其他三个线程执行完毕才往下执行。

3.2 CountDownLatch配合线程池使用

需求:同时开启四个线程,三个线程负责处理,一个线程负责汇总结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static site.weiyikai.concurrent.utils.Sleeper.sleep;

/**
* Created by xiaowei
* Date 2022/12/1
* Description 配合线程池使用
*/
@Slf4j(topic = "c.CountdownLatchDemo02")
public class CountdownLatchDemo02 {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(3);
ExecutorService service = Executors.newFixedThreadPool(4);
service.submit(() -> {
log.debug("begin...");
sleep(1);
latch.countDown();
log.debug("end...{}", latch.getCount());
});
service.submit(() -> {
log.debug("begin...");
sleep(1.5);
latch.countDown();
log.debug("end...{}", latch.getCount());
});
service.submit(() -> {
log.debug("begin...");
sleep(2);
latch.countDown();
log.debug("end...{}", latch.getCount());
});
service.submit(()->{
try {
log.debug("waiting...");
latch.await();
log.debug("wait end...");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}

运行结果

1
2
3
4
5
6
7
8
22:11:58.111 [pool-1-thread-1] DEBUG c.CountdownLatchDemo02 - begin...
22:11:58.111 [pool-1-thread-4] DEBUG c.CountdownLatchDemo02 - waiting...
22:11:58.111 [pool-1-thread-2] DEBUG c.CountdownLatchDemo02 - begin...
22:11:58.111 [pool-1-thread-3] DEBUG c.CountdownLatchDemo02 - begin...
22:11:59.160 [pool-1-thread-1] DEBUG c.CountdownLatchDemo02 - end...2
22:11:59.616 [pool-1-thread-2] DEBUG c.CountdownLatchDemo02 - end...1
22:12:00.115 [pool-1-thread-3] DEBUG c.CountdownLatchDemo02 - end...0
22:12:00.115 [pool-1-thread-4] DEBUG c.CountdownLatchDemo02 - wait end...

从结果可以看出,负责汇总的线程执行到latch.await();进入到等待状态,等其他三个线程全部执行结束之后,再向下执行。

四、CountDownLatch典型应用场景介绍

场景1:多个游戏资源并发加载完成通知

在这个场景下,某个线程进入到下一个流程,需要多个资源加载好,这里假定为10个,可以采用线程池,令该线程提交10个任务分别加载一个资源,然后调用countDownLatch.await去等待所有资源加载完毕,每个任务准备好资源就countDown

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Created by xiaowei
* Date 2022/12/1
* Description 同步等待多个线程准备完毕
*/
public class CountdownLatchDemo03 {
public static void main(String[] args) throws InterruptedException {

AtomicInteger num = new AtomicInteger(0);
ExecutorService service = Executors.newFixedThreadPool(10, (r) -> {
return new Thread(r, "t" + num.incrementAndGet());
});
String[] all = new String[10];
CountDownLatch latch = new CountDownLatch(10);

Random random = new Random();
// 模拟提交10个线程任务
for (int i = 0; i < 10; i++) {
int x = i;
// 线程池中每个任务模拟加载资源,当资源加载完毕,调用countDown(),将state--
service.submit(() -> {
for (int j = 0; j <= 100; j++) {
try {
// 随机睡眠时间,使得每个线程加载完成时间不同
Thread.sleep(random.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
all[x] = Thread.currentThread().getName() + "(" + j + "%" + ")";
System.out.print("\r" + Arrays.toString(all));
}
latch.countDown();
});
}
latch.await();
System.out.print("\n游戏开始");
service.shutdown();
}
}

运行结果

1
2
[t1(100%), t2(100%), t3(100%), t4(100%), t5(100%), t6(100%), t7(100%), t8(100%), t9(100%), t10(100%)]
游戏开始

当所有资源都加载好了,才进行下一个流程,游戏开始

场景2:微服务场景下多个RPC远程调用并发执行

在这个场景下,当前用户的请求,需要多个服务器资源,比如需要商品信息,订单信息,快递信息共三个信息,则需要进行三次远程调用,如果顺序执行的话,显然效率比较低下。
此时可以采用多线程的方式利用线程池配合CountDownLatch并发的去获取资源(采用线程的future对象返回线程执行完返回的结果),节约时间。

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@RestController
public class TestCountDownlatchController {
@GetMapping("/order/{id}")
public Map<String, Object> order(@PathVariable int id) {
HashMap<String, Object> map = new HashMap<>();
map.put("id", id);
map.put("total", "2300.00");
sleep(2000);
return map;
}

@GetMapping("/product/{id}")
public Map<String, Object> product(@PathVariable int id) {

HashMap<String, Object> map = new HashMap<>();
if (id == 1) {
map.put("name", "小爱音箱");
map.put("price", 300);
} else if (id == 2) {
map.put("name", "小米手机");
map.put("price", 2000);
}
map.put("id", id);
sleep(1000);
return map;
}

@GetMapping("/logistics/{id}")
public Map<String, Object> logistics(@PathVariable int id) {
HashMap<String, Object> map = new HashMap<>();
map.put("id", id);
map.put("name", "中通快递");
sleep(2500);
return map;
}

private void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

rest 远程调用

  • CountDownLatch 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Created by xiaowei
* Date 2022/12/1
* Description 同步等待多个线程准备完毕
*/
@Slf4j
public class CountDownlatchDemo {
public static void main(String[] args) throws InterruptedException {
RestTemplate restTemplate = new RestTemplate();
log.debug("begin");
ExecutorService service = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(4);
service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
latch.countDown();
});
service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
latch.countDown();
});
service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2);
latch.countDown();
});
service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1);
latch.countDown();
});
latch.await();
log.debug("执行完毕");
service.shutdown();
}
}
  • future实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Created by xiaowei
* Date 2022/12/1
* Description 同步等待多个线程准备完毕
*/
@Slf4j
public class Main {
public static void main(String[] args) throws InterruptedException {
RestTemplate restTemplate = new RestTemplate();
log.debug("begin");
ExecutorService service = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(4);
Future<Map<String, Object>> f1 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
return r;
});
Future<Map<String, Object>> f2 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
return r;
});
Future<Map<String, Object>> f3 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2);
return r;
});
Future<Map<String, Object>> f4 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1);
return r;
});
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
System.out.println(f4.get());
log.debug("执行完毕");
service.shutdown();
}
}

运行结果

1
2
3
4
5
6
19:51:39.711 c.TestCountDownLatch [main] - begin 
{total=2300.00, id=1}
{price=300, name=小爱音箱, id=1}
{price=2000, name=小米手机, id=2}
{name=中通快递, id=1}
19:51:42.407 c.TestCountDownLatch [main] - 执行完毕

结果分析

CountDownLatchFuture都可以实现,但是各有各的好处,future更适合需要获取结果,进行合并操作的业务逻辑。


CountdownLatch介绍
http://example.com/2023/03/31/CountdownLatch介绍/
作者
程序员小魏
发布于
2023年3月31日
许可协议