一、基本概念
保护性暂停(Guarded Suspension):用在一个线程等待另一个线程的执行结果时使用。
保护性暂停的暂停就是当条件不满足的时候就去进行wait
等待。
要点
- 有一个结果需要从一个线程传递到另一个线程,让它们关联同一个
GuardedObject
。
- 如果有结果不断从一个线程到另一个线程那么此时就不能使用这个保护性暂停模式了,可以使用*消息队列*(见生产者/消费者)。
- JDK中,join的实现、Future的实现,采用的就是此模式。(用join一个线程等待另一个线程结束就可以拿到结果了,其实这也是保护性线程的一个应用)
- 因为要等待另一方的结果,因此归类到同步模式。(关于同步模式的顺序控制实现见文章:同步模式之顺序控制
线程2产生这个结果,然后线程1想要得到这个结果,那就可以让GuardedObject
充当一个桥梁,让线程1、2都关联到这个对象上。
二、单任务版GuardedObject
示例代码
t1 等待 t2线程的下载结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| import site.weiyikai.concurrent.utils.Downloader; import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.util.List;
@Slf4j(topic = "c.GuardedTest01") public class GuardedTest01 { public static void main(String[] args) { GuardedObject guardedObject = new GuardedObject(); new Thread(() -> { log.debug("等待结果"); List<String> list = (List<String>) guardedObject.get(); log.debug("结果大小:{}", list.size()); }, "t1").start();
new Thread(() -> { log.debug("执行下载"); try { List<String> list = Downloader.download(); guardedObject.complete(list); } catch (IOException e) { e.printStackTrace(); } }, "t2").start();
} }
class GuardedObject {
private Object response; private final Object lock = new Object();
public Object get() { synchronized (lock) { while (response == null) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return response; } }
public void complete(Object response) { synchronized (lock) { this.response = response; lock.notifyAll(); } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List;
public class Downloader { public static List<String> download() throws IOException { HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection(); List<String> lines = new ArrayList<>(); try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) { String line; while ((line = reader.readLine()) != null) { lines.add(line); } } return lines; } }
|
运行结果
1 2 3
| 23:45:31.032 c.GuardedTest01 [t2] - 执行下载 23:45:31.032 c.GuardedTest01 [t1] - 等待结果 23:45:32.409 c.GuardedTest01 [t1] - 结果大小:3
|
升级代码,设置超时时间
如果超过时间还没返回结果,此时就不等了,退出while循环
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| import site.weiyikai.concurrent.utils.Sleeper; import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.GuardedTest02") public class GuardedTest02 { public static void main(String[] args) { GuardedObjectTime guardeObject = new GuardedObjectTime(); new Thread(() -> { log.debug("begin"); Object obj = guardeObject.get(2000); log.debug("结果是:{}", obj); }, "t1").start();
new Thread(() -> { log.debug("begin"); Sleeper.sleep(1);
guardeObject.complete(new Object()); }, "t2").start(); } }
class GuardedObjectTime { private Object response;
public Object get(long timeout) { synchronized (this) { long begin = System.currentTimeMillis(); long passedTime = 0; while (response == null) { long waitTime = timeout - passedTime; if (waitTime <= 0) { break; } try { this.wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } passedTime = System.currentTimeMillis() - begin; } return response; } }
public void complete(Object response) { synchronized (this) { this.response = response; this.notifyAll(); } } }
|
1 2 3
| 23:50:19.739 c.GuardedTest02 [t1] - begin 23:50:19.739 c.GuardedTest02 [t2] - begin 23:50:20.745 c.GuardedTest02 [t1] - 结果是:java.lang.Object@2e10de1
|
1 2 3
| 23:52:16.857 c.GuardedTest02 [t1] - begin 23:52:16.857 c.GuardedTest02 [t2] - begin 23:52:18.861 c.GuardedTest02 [t1] - 结果是:null
|
三、原理之join
t.join( )方法阻塞调用此方法的线程(calling thread)进入 TIMED_WAITING 状态,直到线程 t 执行完成,此线程再继续;
通常用于在main( )主线程内,等待其它线程完成再结束main( )主线程
1 2 3 4 5 6 7 8 9 10
| synchronized (t1) { while (t1.isAlive()) { t1.wait(0); } }
|
join源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0;
if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); }
if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
|
- 当millis==0时,会进入while( isAlive( ) )循环;即只要子线程是活的,主线程就不停的等待。
- wait( )的作用是让“当前线程”等待,而这里的“当前线程”是指当前运行的线程。虽然是调用子线程的wait( )方法,但是它是通过“主线程”去调用的;所以,休眠的是主线程,而不是“子线程”!
这样理解: 例子中的Thread t只是一个对象 , isAlive( )判断当前对象(例子中的t对象)是否存活, wait()阻塞的是当前执行的线程(一般是main方法)
可以看出,Join方法实现是通过wait( )。 当main线程调用t.join时候,main线程会获得线程对象t的锁(wait 意味着拿到该对象的锁),调用该对象的wait( ),直到该对象唤醒main线程 ,比如退出后。这就意味着main 线程调用t.join时,必须能够拿到线程t对象的锁。
四、多任务版 GuardedObject
- 图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类。
- 不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。和生产者消费者模式的区别就是:这个产生结果的线程和使用结果的线程是一一对应的关系,但是生产者消费者模式并不是。
- rpc框架的调用中就使用到了这种模式。
示例代码
送信收信案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
| import site.weiyikai.concurrent.utils.Sleeper; import lombok.extern.slf4j.Slf4j;
import java.util.Hashtable; import java.util.Map; import java.util.Set;
@Slf4j(topic = "c.GuardedTest03") public class GuardedTest03 { public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 3; i++) { new People().start(); } Sleeper.sleep(1); for (Integer id : Mailboxes.getIds()) { new Postman(id, "内容" + id).start(); } } }
@Slf4j(topic = "c.People") class People extends Thread{ @Override public void run() { GuardedObject03 guardedObject = Mailboxes.createGuardedObject(); log.debug("开始收信 id:{}", guardedObject.getId()); Object mail = guardedObject.get(5000); log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail); } }
@Slf4j(topic = "c.Postman") class Postman extends Thread { private int id; private String mail;
public Postman(int id, String mail) { this.id = id; this.mail = mail; }
@Override public void run() { GuardedObject03 guardedObject = Mailboxes.getGuardedObject(id); log.debug("送信 id:{}, 内容:{}", id, mail); guardedObject.complete(mail); } }
class Mailboxes { private static Map<Integer, GuardedObject03> boxes = new Hashtable<>();
private static int id = 1; private static synchronized int generateId() { return id++; }
public static GuardedObject03 getGuardedObject(int id) { return boxes.remove(id); }
public static GuardedObject03 createGuardedObject() { GuardedObject03 go = new GuardedObject03(generateId()); boxes.put(go.getId(), go); return go; }
public static Set<Integer> getIds() { return boxes.keySet(); } }
class GuardedObject03 {
private int id;
public GuardedObject03(int id) { this.id = id; }
public int getId() { return id; }
private Object response;
public Object get(long timeout) { synchronized (this) { long begin = System.currentTimeMillis(); long passedTime = 0; while (response == null) { long waitTime = timeout - passedTime; if (timeout - passedTime <= 0) { break; } try { this.wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } passedTime = System.currentTimeMillis() - begin; } return response; } }
public void complete(Object response) { synchronized (this) { this.response = response; this.notifyAll(); } } }
|
运行结果
1 2 3 4 5 6 7 8 9
| 00:13:15.571 c.People [Thread-1] - 开始收信 id:1 00:13:15.571 c.People [Thread-2] - 开始收信 id:2 00:13:15.571 c.People [Thread-0] - 开始收信 id:3 00:13:16.570 c.Postman [Thread-3] - 送信 id:3, 内容:内容3 00:13:16.570 c.People [Thread-0] - 收到信 id:3, 内容:内容3 00:13:16.570 c.Postman [Thread-4] - 送信 id:2, 内容:内容2 00:13:16.571 c.People [Thread-2] - 收到信 id:2, 内容:内容2 00:13:16.571 c.Postman [Thread-5] - 送信 id:1, 内容:内容1 00:13:16.571 c.People [Thread-1] - 收到信 id:1, 内容:内容1
|
五、异步模式之生产者/消费者
5.1 定义
- 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应(一个生产一个消费)
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式
与保护性暂停的共同点:都是在多个线程之间进行数据传输;
异步模式中, 生产者产生消息之后消息没有被立刻消费
同步模式中, 消息在产生之后被立刻消费了。
示例
线程间通信的消息队列
注:只是简单实现,功能很基础,了解核心原理即刻
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
| import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import static site.weiyikai.concurrent.utils.Sleeper.sleep;
@Slf4j(topic = "c.Test04") public class Test04 { public static void main(String[] args) { MessageQueue queue = new MessageQueue(2);
for (int i = 0; i < 3; i++) { int id=i; new Thread(()->{ try { queue.put(new Message(id,"值"+id)); } catch (InterruptedException e) { e.printStackTrace(); } },"生产者"+i).start(); }
new Thread(()->{ while(true){ try { sleep(2); Message message = queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } },"消费者").start(); } }
@Slf4j(topic = "c.MessageQueue") class MessageQueue{ private LinkedList<Message> list=new LinkedList<>();
private int capcity;
public MessageQueue(int capcity) { this.capcity = capcity; }
public Message take() throws InterruptedException { synchronized (list) { while (list.isEmpty()) { log.debug("队列为空,消费者线程等待"); list.wait(); } Message message = list.removeFirst(); log.debug("已消费一个信息"); list.notifyAll(); return message; } }
public void put(Message message) throws InterruptedException { synchronized (list){ while(list.size()==capcity){ log.debug("队列已满,生产者线程等待"); list.wait(); } list.addLast(message); log.debug("已生产一个信息"); list.notifyAll(); } } }
final class Message{ private int id; private Object value;
public Message(int id, Object value) { this.id = id; this.value = value; }
public int getId() { return id; }
public Object getValue() { return value; }
@Override public String toString() { return "Message{" + "id=" + id + ", value=" + value + '}'; } }
|
运行结果
1 2 3 4 5 6 7 8
| 00:23:23.322 c.MessageQueue [生产者0] - 已生产一个信息 00:23:23.324 c.MessageQueue [生产者2] - 已生产一个信息 00:23:23.324 c.MessageQueue [生产者1] - 队列已满,生产者线程等待 00:23:25.321 c.MessageQueue [消费者] - 已消费一个信息 00:23:25.321 c.MessageQueue [生产者1] - 已生产一个信息 00:23:27.322 c.MessageQueue [消费者] - 已消费一个信息 00:23:29.323 c.MessageQueue [消费者] - 已消费一个信息 00:23:31.323 c.MessageQueue [消费者] - 队列为空,消费者线程等待
|