一、概述
Semaphore是向外分发资源的许可证,可以允许一个或多个任务同时访问资源。
- Semaphore通过构造参数来指定许可证的数量;
- acquire方法阻塞式获取许可证;
- release方法释放许可证。
可以将其比喻为地铁的安检,每当人流量高峰的时候,安检会先让几个人进去,然后拦住后面的人,待前面几人通过安检门后,会对后面的人用相同的方式放行。
特点
- Semaphore(信号量)是一种计数器,用来保护一个或者多个共享资源的访问。
- 如果线程要访问一个资源就必须先获得信号量。
- 如果信号量内部计数器大于0,信号量减1,然后允许共享这个资源;否则,如果信号量的计数器等于0,信号量将会把线程置入休眠直至计数器大于0。
- 当信号量使用完时,必须释放。
二、常用方法
Semaphore常用方法说明:
方法 |
说明 |
acquire() |
从信号量获取一个许可,如果无可用许可前将一直阻塞等待 |
acquire(int permits) |
获取指定数目的许可,如果无可用许可前也将会一直阻塞等待 |
acquireUninterruptibly() |
获取一个许可,在获取到许可之前线程一直处于阻塞状态(忽略中断)。 |
tryAcquire() |
尝试获得许可,返回获取许可成功或失败,不阻塞线程。 |
tryAcquire(long timeout, TimeUnit unit) |
尝试获得许可,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。 |
release() |
释放一个许可,唤醒一个获取许可不成功的阻塞线程。 |
hasQueuedThreads() |
等待队列里是否还存在等待线程。 |
getQueueLength() |
获取等待队列里阻塞的线程数。 |
drainPermits() |
清空许可把可用许可数置为0,返回清空许可的数量。 |
availablePermits() |
返回可用的许可数量。 |
基本使用
信号量,用来限制能同时访问共享资源的线程上限。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Semaphore;
import static site.weiyikai.concurrent.utils.Sleeper.sleep;
@Slf4j(topic = "c.SemaphoreDemo") public class SemaphoreDemo { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 10; i++) { new Thread(() -> { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("running..."); sleep(1); log.debug("end..."); } finally { semaphore.release(); } }).start(); } } }
|
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| 22:36:25.882 [Thread-2] DEBUG c.SemaphoreDemo - running... 22:36:25.882 [Thread-1] DEBUG c.SemaphoreDemo - running... 22:36:25.882 [Thread-0] DEBUG c.SemaphoreDemo - running... 22:36:26.886 [Thread-2] DEBUG c.SemaphoreDemo - end... 22:36:26.886 [Thread-1] DEBUG c.SemaphoreDemo - end... 22:36:26.886 [Thread-0] DEBUG c.SemaphoreDemo - end... 22:36:26.886 [Thread-3] DEBUG c.SemaphoreDemo - running... 22:36:26.886 [Thread-4] DEBUG c.SemaphoreDemo - running... 22:36:26.886 [Thread-5] DEBUG c.SemaphoreDemo - running... 22:36:27.886 [Thread-3] DEBUG c.SemaphoreDemo - end... 22:36:27.886 [Thread-4] DEBUG c.SemaphoreDemo - end... 22:36:27.886 [Thread-5] DEBUG c.SemaphoreDemo - end... 22:36:27.886 [Thread-6] DEBUG c.SemaphoreDemo - running... 22:36:27.886 [Thread-7] DEBUG c.SemaphoreDemo - running... 22:36:27.886 [Thread-8] DEBUG c.SemaphoreDemo - running... 22:36:28.887 [Thread-7] DEBUG c.SemaphoreDemo - end... 22:36:28.887 [Thread-6] DEBUG c.SemaphoreDemo - end... 22:36:28.887 [Thread-9] DEBUG c.SemaphoreDemo - running... 22:36:28.888 [Thread-8] DEBUG c.SemaphoreDemo - end... 22:36:29.887 [Thread-9] DEBUG c.SemaphoreDemo - end...
|
从结果可以看出,每个时刻最多有三个线程在运行,Semaphore 起到效果
三、Semaphore应用
- 使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现)
- 用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好, 注意下面的实现中线程数和数据库连接数是相等的
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
| import lombok.extern.slf4j.Slf4j;
import java.sql.*; import java.util.Map; import java.util.Properties; import java.util.concurrent.Executor; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicIntegerArray;
public class SemaphorePoolDemo { public static void main(String[] args) { Pool pool = new Pool(2); for (int i = 0; i < 5; i++) { new Thread(() -> { Connection conn = pool.borrow(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } pool.free(conn); }).start(); } } }
@Slf4j(topic = "c.Pool") class Pool { private final int poolSize;
private Connection[] connections;
private AtomicIntegerArray states;
private Semaphore semaphore;
public Pool(int poolSize) { this.poolSize = poolSize; this.semaphore = new Semaphore(poolSize); this.connections = new Connection[poolSize]; this.states = new AtomicIntegerArray(new int[poolSize]); for (int i = 0; i < poolSize; i++) { connections[i] = new MockConnection("连接" + (i+1)); } }
public Connection borrow() { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < poolSize; i++) { if(states.get(i) == 0) { if (states.compareAndSet(i, 0, 1)) { log.debug("borrow {}", connections[i]); return connections[i]; } } } return null; } public void free(Connection conn) { for (int i = 0; i < poolSize; i++) { if (connections[i] == conn) { states.set(i, 0); log.debug("free {}", conn); semaphore.release(); break; } } } }
class MockConnection implements Connection {
private String name;
public MockConnection(String name) { this.name = name; }
@Override public String toString() { return "MockConnection{" + "name='" + name + '\'' + '}'; } }
|
运行结果
1 2 3 4 5 6 7 8 9 10
| 22:48:15.123 [Thread-0] DEBUG c.Pool - borrow MockConnection{name='连接1'} 22:48:15.123 [Thread-1] DEBUG c.Pool - borrow MockConnection{name='连接2'} 22:48:16.127 [Thread-0] DEBUG c.Pool - free MockConnection{name='连接1'} 22:48:16.127 [Thread-2] DEBUG c.Pool - borrow MockConnection{name='连接1'} 22:48:16.127 [Thread-1] DEBUG c.Pool - free MockConnection{name='连接2'} 22:48:16.127 [Thread-3] DEBUG c.Pool - borrow MockConnection{name='连接2'} 22:48:17.128 [Thread-2] DEBUG c.Pool - free MockConnection{name='连接1'} 22:48:17.128 [Thread-3] DEBUG c.Pool - free MockConnection{name='连接2'} 22:48:17.128 [Thread-4] DEBUG c.Pool - borrow MockConnection{name='连接1'} 22:48:18.128 [Thread-4] DEBUG c.Pool - free MockConnection{name='连接1'}
|
从上面的结果可以看出每个时刻都只有两个线程在使用连接。
四、Semaphore原理
4.1 加解锁流程
Semaphore有点像一个停车场,permits就好像停车位数量,当线程获得了permits就像是获得了停车位,然后停车场显示空余车位数量减一
- 刚开始,permits(state)为3,这时5个线程来获取资源
- 假设其中 Thread-1,Thread-2,Thread-4 CAS竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列 park 阻塞
- 这时 Thread-4 释放了 permits,状态如下
- 接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态