Semaphore介绍

一、概述

Semaphore是向外分发资源的许可证,可以允许一个或多个任务同时访问资源。

  • Semaphore通过构造参数来指定许可证的数量;
  • acquire方法阻塞式获取许可证;
  • release方法释放许可证。

可以将其比喻为地铁的安检,每当人流量高峰的时候,安检会先让几个人进去,然后拦住后面的人,待前面几人通过安检门后,会对后面的人用相同的方式放行。

特点

  • Semaphore(信号量)是一种计数器,用来保护一个或者多个共享资源的访问。
  • 如果线程要访问一个资源就必须先获得信号量。
  • 如果信号量内部计数器大于0,信号量减1,然后允许共享这个资源;否则,如果信号量的计数器等于0,信号量将会把线程置入休眠直至计数器大于0。
  • 当信号量使用完时,必须释放。

二、常用方法

Semaphore常用方法说明

方法 说明
acquire() 从信号量获取一个许可,如果无可用许可前将一直阻塞等待
acquire(int permits) 获取指定数目的许可,如果无可用许可前也将会一直阻塞等待
acquireUninterruptibly() 获取一个许可,在获取到许可之前线程一直处于阻塞状态(忽略中断)。
tryAcquire() 尝试获得许可,返回获取许可成功或失败,不阻塞线程。
tryAcquire(long timeout, TimeUnit unit) 尝试获得许可,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。
release() 释放一个许可,唤醒一个获取许可不成功的阻塞线程。
hasQueuedThreads() 等待队列里是否还存在等待线程。
getQueueLength() 获取等待队列里阻塞的线程数。
drainPermits() 清空许可把可用许可数置为0,返回清空许可的数量。
availablePermits() 返回可用的许可数量。

基本使用

信号量,用来限制能同时访问共享资源的线程上限。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Semaphore;

import static site.weiyikai.concurrent.utils.Sleeper.sleep;

/**
* Created by xiaowei
* Date 2022/11/29
* Description Semaphore示例
*/
@Slf4j(topic = "c.SemaphoreDemo")
public class SemaphoreDemo {
public static void main(String[] args) {
// 1. 创建 semaphore 对象
Semaphore semaphore = new Semaphore(3);
// 2. 10个线程同时运行
for (int i = 0; i < 10; i++) {
new Thread(() -> {
// 3. 获取许可
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("running...");
sleep(1);
log.debug("end...");
} finally {
// 4. 释放许可
semaphore.release();
}
}).start();
}
}
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
22:36:25.882 [Thread-2] DEBUG c.SemaphoreDemo - running...
22:36:25.882 [Thread-1] DEBUG c.SemaphoreDemo - running...
22:36:25.882 [Thread-0] DEBUG c.SemaphoreDemo - running...
22:36:26.886 [Thread-2] DEBUG c.SemaphoreDemo - end...
22:36:26.886 [Thread-1] DEBUG c.SemaphoreDemo - end...
22:36:26.886 [Thread-0] DEBUG c.SemaphoreDemo - end...
22:36:26.886 [Thread-3] DEBUG c.SemaphoreDemo - running...
22:36:26.886 [Thread-4] DEBUG c.SemaphoreDemo - running...
22:36:26.886 [Thread-5] DEBUG c.SemaphoreDemo - running...
22:36:27.886 [Thread-3] DEBUG c.SemaphoreDemo - end...
22:36:27.886 [Thread-4] DEBUG c.SemaphoreDemo - end...
22:36:27.886 [Thread-5] DEBUG c.SemaphoreDemo - end...
22:36:27.886 [Thread-6] DEBUG c.SemaphoreDemo - running...
22:36:27.886 [Thread-7] DEBUG c.SemaphoreDemo - running...
22:36:27.886 [Thread-8] DEBUG c.SemaphoreDemo - running...
22:36:28.887 [Thread-7] DEBUG c.SemaphoreDemo - end...
22:36:28.887 [Thread-6] DEBUG c.SemaphoreDemo - end...
22:36:28.887 [Thread-9] DEBUG c.SemaphoreDemo - running...
22:36:28.888 [Thread-8] DEBUG c.SemaphoreDemo - end...
22:36:29.887 [Thread-9] DEBUG c.SemaphoreDemo - end...

从结果可以看出,每个时刻最多有三个线程在运行,Semaphore 起到效果

三、Semaphore应用

  • 使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现)
  • 用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好, 注意下面的实现中线程数和数据库连接数是相等的

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import lombok.extern.slf4j.Slf4j;

import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicIntegerArray;

/**
* Created by xiaowei
* Date 2022/11/29
* Description Semaphore应用
*/
public class SemaphorePoolDemo {
public static void main(String[] args) {
Pool pool = new Pool(2);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
Connection conn = pool.borrow();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.free(conn);
}).start();
}
}
}

@Slf4j(topic = "c.Pool")
class Pool {
// 1. 连接池大小
private final int poolSize;

// 2. 连接对象数组
private Connection[] connections;

// 3. 连接状态数组 0 表示空闲, 1 表示繁忙
private AtomicIntegerArray states;

private Semaphore semaphore;

// 4. 构造方法初始化
public Pool(int poolSize) {
this.poolSize = poolSize;
// 让许可数与资源数一致
this.semaphore = new Semaphore(poolSize);
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection("连接" + (i+1));
}
}

// 5. 借连接
public Connection borrow() {// t1, t2, t3
// 获取许可
try {
semaphore.acquire(); // 没有许可的线程,在此等待
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < poolSize; i++) {
// 获取空闲连接
if(states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
log.debug("borrow {}", connections[i]);
return connections[i];
}
}
}
// 不会执行到这里
return null;
}
// 6. 归还连接
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
states.set(i, 0);
log.debug("free {}", conn);
semaphore.release();
break;
}
}
}
}

class MockConnection implements Connection {

private String name;

public MockConnection(String name) {
this.name = name;
}

@Override
public String toString() {
return "MockConnection{" +
"name='" + name + '\'' +
'}';
}
}

运行结果

1
2
3
4
5
6
7
8
9
10
22:48:15.123 [Thread-0] DEBUG c.Pool - borrow MockConnection{name='连接1'}
22:48:15.123 [Thread-1] DEBUG c.Pool - borrow MockConnection{name='连接2'}
22:48:16.127 [Thread-0] DEBUG c.Pool - free MockConnection{name='连接1'}
22:48:16.127 [Thread-2] DEBUG c.Pool - borrow MockConnection{name='连接1'}
22:48:16.127 [Thread-1] DEBUG c.Pool - free MockConnection{name='连接2'}
22:48:16.127 [Thread-3] DEBUG c.Pool - borrow MockConnection{name='连接2'}
22:48:17.128 [Thread-2] DEBUG c.Pool - free MockConnection{name='连接1'}
22:48:17.128 [Thread-3] DEBUG c.Pool - free MockConnection{name='连接2'}
22:48:17.128 [Thread-4] DEBUG c.Pool - borrow MockConnection{name='连接1'}
22:48:18.128 [Thread-4] DEBUG c.Pool - free MockConnection{name='连接1'}

从上面的结果可以看出每个时刻都只有两个线程在使用连接。

四、Semaphore原理

4.1 加解锁流程

Semaphore有点像一个停车场,permits就好像停车位数量,当线程获得了permits就像是获得了停车位,然后停车场显示空余车位数量减一

  • 刚开始,permits(state)为3,这时5个线程来获取资源

image-20230331194728663

  • 假设其中 Thread-1,Thread-2,Thread-4 CAS竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列 park 阻塞

image-20230331194938491

  • 这时 Thread-4 释放了 permits,状态如下

image-20230331194952051

  • 接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

image-20230331195006698


Semaphore介绍
http://example.com/2023/03/31/Semaphore介绍/
作者
程序员小魏
发布于
2023年3月31日
许可协议