Java 多執行緒共享模型之管程(下)
共享模型之管程
wait、notify
wait、notify 原理
- Owner 執行緒發現條件不滿足,調用 wait 方法,即可進入 WaitSet 變為 WAITING 狀態
- BLOCKED 和 WAITING 的執行緒都處於阻塞狀態,不佔用 CPU 時間片
- BLOCKED 執行緒會在 Owner 執行緒釋放鎖時喚醒
- WAITING 執行緒會在 Owner 執行緒調用 notify 或 notifyAll 時喚醒,但喚醒後並不意味者立刻獲得鎖,仍需進入EntryList 重新競爭
API 介紹
- obj.wait() 讓進入 object 監視器的執行緒到 waitSet 等待
- obj.notify() 在 object 上正在 waitSet 等待的執行緒中挑一個喚醒
- obj.notifyAll() 讓 object 上正在 waitSet 等待的執行緒全部喚醒
它們都是執行緒之間進行協作的手段,都屬於 Object 對象的方法。必須獲得此對象的鎖,才能調用這幾個方法
package WaNo;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.demo2")
public class demo2 {
static final Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
synchronized (lock){
log.debug("執行");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("其他程式碼");
}
},"t1").start();
new Thread(() -> {
synchronized (lock){
log.debug("執行");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("其他程式碼");
}
},"t2").start();
Thread.sleep(2000);
log.debug("喚醒 lock 上其他執行緒");
synchronized (lock){
lock.notify(); //喚醒 lock 上的一個執行緒(隨機)
//lock.notifyAll(); //喚醒 lock 上的所有執行緒
}
}
}
-
notify()
20:20:58 [t1] c.demo2 - 執行 20:20:58 [t2] c.demo2 - 執行 20:21:00 [main] c.demo2 - 喚醒 lock 上其他執行緒 20:21:00 [t1] c.demo2 - 其他程式碼
-
notifyAll()
20:22:04 [t1] c.demo2 - 執行 20:22:04 [t2] c.demo2 - 執行 20:22:06 [main] c.demo2 - 喚醒 lock 上其他執行緒 20:22:06 [t2] c.demo2 - 其他程式碼 20:22:06 [t1] c.demo2 - 其他程式碼
wait() 方法會釋放對象的鎖,進入 WaitSet 等待區,從而讓其他執行緒就機會獲取對象的鎖。無限制等待,直到notify 為止
wait(long n) 有時限的等待, 到 n 毫秒後結束等待,或是被 notify
wait、notify 正確使用
sleep vs. wait
- sleep 是 Thread 方法,而 wait 是 Object 的方法
- sleep 不需要強制和 synchronized 配合使用,但 wait 需要和 synchronized 一起用
- sleep 在睡眠的同時,不會釋放對象鎖的,但 wait 在等待的時候會釋放對象鎖
- 它們狀態 TIMED_WAITING
step 1
思考下面的解決方案好不好,為什麼?
package WaNo;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.demo4")
public class demo4 {
static final Object room = new Object();
static boolean hasCigarette = false;
static boolean hasTakeOut = false;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
synchronized (room){
log.debug("有煙沒?[{}]",hasCigarette);
if(!hasCigarette){
log.debug("沒煙,睡會!");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有煙沒?[{}]",hasCigarette);
if(hasCigarette){
log.debug("開始幹活!");
}
}
},"小南").start();
for(int i=0;i<5;i++){
new Thread(() -> {
synchronized (room){
log.debug("開始幹活!");
}
},"其他人").start();
}
Thread.sleep(1000);
new Thread(() -> {
hasCigarette = true;
log.debug("煙到了!");
},"送煙的").start();
}
}
輸出:
20:41:09 [小南] c.demo4 - 有煙沒?[false]
20:41:09 [小南] c.demo4 - 沒煙,睡會!
20:41:10 [送煙的] c.demo4 - 煙到了!
20:41:11 [小南] c.demo4 - 有煙沒?[true]
20:41:11 [小南] c.demo4 - 開始幹活!
20:41:11 [其他人] c.demo4 - 開始幹活!
20:41:11 [其他人] c.demo4 - 開始幹活!
20:41:11 [其他人] c.demo4 - 開始幹活!
20:41:11 [其他人] c.demo4 - 開始幹活!
20:41:11 [其他人] c.demo4 - 開始幹活!
- 其它幹活的執行緒,都要一直阻塞,效率太低
- 小南執行緒必須睡足 2s 後才能醒來,就算煙提前送到,也無法立刻醒來
- 加了 synchronized (room) 後,就好比小南在裡面反鎖了門睡覺,煙根本沒法送進門,main 沒加synchronized 就好像 main 執行緒是翻窗戶進來的
- 解決方法,使用 wait – notify 機制
step 2
思考下面的實現行嗎,為什麼?
package WaNo.step;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.demo4")
public class step2 {
static final Object room = new Object();
static boolean hasCigarette = false;
static boolean hasTakeOut = false;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
synchronized (room){
log.debug("有煙沒?[{}]",hasCigarette);
if(!hasCigarette){
log.debug("沒煙,睡會!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有煙沒?[{}]",hasCigarette);
if(hasCigarette){
log.debug("開始幹活!");
}
}
},"小南").start();
for(int i=0;i<5;i++){
new Thread(() -> {
synchronized (room){
log.debug("開始幹活!");
}
},"其他人").start();
}
Thread.sleep(1000);
new Thread(() -> {
synchronized (room){
hasCigarette = true;
log.debug("煙到了!");
room.notify();
}
},"送煙的").start();
}
}
輸出:
20:46:32 [小南] c.demo4 - 有煙沒?[false]
20:46:32 [小南] c.demo4 - 沒煙,睡會!
20:46:32 [其他人] c.demo4 - 開始幹活!
20:46:32 [其他人] c.demo4 - 開始幹活!
20:46:32 [其他人] c.demo4 - 開始幹活!
20:46:32 [其他人] c.demo4 - 開始幹活!
20:46:32 [其他人] c.demo4 - 開始幹活!
20:46:33 [送煙的] c.demo4 - 煙到了!
20:46:33 [小南] c.demo4 - 有煙沒?[true]
20:46:33 [小南] c.demo4 - 開始幹活!
- 解決了其它幹活的執行緒阻塞的問題
- 但如果有其它執行緒也在等待條件呢?
step 3
package WaNo.step;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.demo4")
public class step3 {
static final Object room = new Object();
static boolean hasCigarette = false;
static boolean hasTakeOut = false;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
synchronized (room){
log.debug("有煙沒?[{}]",hasCigarette);
if(!hasCigarette){
log.debug("沒煙,睡會!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有煙沒?[{}]",hasCigarette);
if(hasCigarette){
log.debug("開始幹活!");
} else {
log.debug("沒幹成活...");
}
}
},"小南").start();
new Thread(() -> {
synchronized (room) {
Thread thread = Thread.currentThread();
log.debug("外賣送到沒?[{}]", hasTakeOut);
if (!hasTakeOut) {
log.debug("沒外賣,先歇會!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("外賣送到沒?[{}]", hasTakeOut);
if (hasTakeOut) {
log.debug("可以開始幹活了");
} else {
log.debug("沒幹成活...");
}
}
}, "小女").start();
for(int i=0;i<5;i++){
new Thread(() -> {
synchronized (room){
log.debug("開始幹活!");
}
},"其他人").start();
}
Thread.sleep(1000);
new Thread(() -> {
synchronized (room){
hasCigarette = true;
log.debug("煙到了!");
room.notify();
}
},"送煙的").start();
}
}
輸出:
20:53:12.173 [小南] c.TestCorrectPosture - 有煙沒?[false]
20:53:12.176 [小南] c.TestCorrectPosture - 沒煙,先歇會!
20:53:12.176 [小女] c.TestCorrectPosture - 外賣送到沒?[false]
20:53:12.176 [小女] c.TestCorrectPosture - 沒外賣,先歇會!
20:53:13.174 [送外賣的] c.TestCorrectPosture - 外賣到了噢!
20:53:13.174 [小南] c.TestCorrectPosture - 有煙沒?[false]
20:53:13.174 [小南] c.TestCorrectPosture - 沒幹成活...
notify 只能隨機喚醒一個 WaitSet 中的執行緒,這時如果有其它執行緒也在等待,那麼就可能喚醒不了正確的執行緒,稱之為【虛假喚醒】
解決方法,改為 notifyAll
step 4
new Thread(() -> {
synchronized (room) {
hasTakeout = true;
log.debug("外賣到了噢!");
room.notifyAll();
}
}, "送外賣的").start();
輸出:
20:55:23.978 [小南] c.TestCorrectPosture - 有煙沒?[false]
20:55:23.982 [小南] c.TestCorrectPosture - 沒煙,先歇會!
20:55:23.982 [小女] c.TestCorrectPosture - 外賣送到沒?[false]
20:55:23.982 [小女] c.TestCorrectPosture - 沒外賣,先歇會!
20:55:24.979 [送外賣的] c.TestCorrectPosture - 外賣到了噢!
20:55:24.979 [小女] c.TestCorrectPosture - 外賣送到沒?[true]
20:55:24.980 [小女] c.TestCorrectPosture - 可以開始幹活了
20:55:24.980 [小南] c.TestCorrectPosture - 有煙沒?[false]
20:55:24.980 [小南] c.TestCorrectPosture - 沒幹成活...
用 notifyAll 僅解決某個執行緒的喚醒問題,但使用 if + wait 判斷僅有一次機會,一旦條件不成立,就沒有重新判斷的機會了
解決方法,用 while + wait,當條件不成立,再次 wait
step 5
將 if 改為 while
while (!hasCigarette) {
log.debug("沒煙,先歇會!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
輸出:
20:58:34.322 [小南] c.TestCorrectPosture - 有煙沒?[false]
20:58:34.326 [小南] c.TestCorrectPosture - 沒煙,先歇會!
20:58:34.326 [小女] c.TestCorrectPosture - 外賣送到沒?[false]
20:58:34.326 [小女] c.TestCorrectPosture - 沒外賣,先歇會!
20:58:35.323 [送外賣的] c.TestCorrectPosture - 外賣到了噢!
20:58:35.324 [小女] c.TestCorrectPosture - 外賣送到沒?[true]
20:58:35.324 [小女] c.TestCorrectPosture - 可以開始幹活了
20:58:35.324 [小南] c.TestCorrectPosture - 沒煙,先歇會!
套路總結
synchronized(lock) {
while(條件不成立) {
lock.wait();
}
// 幹活
}
//另一個執行緒
synchronized(lock) {
lock.notifyAll();
}
同步模式之保護性暫停
定義
即 Guarded Suspension,用在一個執行緒等待另一個執行緒的執行結果
要點
- 有一個結果需要從一個執行緒傳遞到另一個執行緒,讓他們關聯同一個 GuardedObject
- 如果有結果不斷從一個執行緒到另一個執行緒那麼可以使用消息隊列(見生產者/消費者)
- JDK 中,join 的實現、Future 的實現,採用的就是此模式
- 因為要等待另一方的結果,因此歸類到同步模式
實現
package WaNo.step;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
@Slf4j(topic = "c.demo4")
public class demo4 {
public static void main(String[] args) {
//執行緒1 等待執行緒2 的下載結果
GuardedObject guardedObject = new GuardedObject();
new Thread(() -> {
List<String> list = (List<String>) guardedObject.get();
log.debug("結果的大小是:{}",list.size());
},"t1").start();
new Thread(() -> {
log.debug("執行下載");
try {
Thread.sleep(5000);
List<String> list = new ArrayList<>();
list.add("1");
guardedObject.complete(list);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
}
}
class GuardedObject {
//結果
private Object response;
//獲取結果
public Object get() {
synchronized (this){
//還沒有結果
while (response == null){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
//產生結果
public void complete(Object response){
synchronized (this){
//給結果成員變數賦值
this.response = response;
this.notifyAll();
}
}
}
輸出:
16:47:15 [t2] c.demo4 - 執行下載
16:47:20 [t1] c.demo4 - 結果的大小是:1
非同步模式之生產者/消費者
要點
- 與前面的保護性暫停中的 GuardObject 不同,不需要產生結果和消費結果的執行緒一一對應
- 消費隊列可以用來平衡生產和消費的執行緒資源
- 生產者僅負責產生結果數據,不關心數據該如何處理,而消費者專心處理結果數據
- 消息隊列是有容量限制的,滿時不會再加入數據,空時不會再消耗數據
- JDK 中各種阻塞隊列,採用的就是這種模式
package WaNo;
import lombok.AllArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
@Slf4j(topic = "c.demo5")
public class demo5 {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(2);
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {
queue.put(new Message(id,"值"+id));
},"生產者" + i).start();
}
new Thread(() -> {
while (true){
try {
Thread.sleep(1000);
Message message = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"消費者").start();
}
}
//消息隊列類(執行緒間通訊)
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
//消息隊列集合
private LinkedList<Message> list = new LinkedList<>();
//隊列容量
private int capcity;
public MessageQueue(int capcity){
this.capcity = capcity;
}
//獲取消息
public Message take(){
//檢查隊列是否為空
synchronized (list){
while (list.isEmpty()){
try {
log.debug("隊列為空,消費者執行緒等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//從隊列頭部獲取消息返回
Message message = list.removeFirst();
log.debug("已消費消息 {}",message);
list.notifyAll();
return message;
}
}
//存入消息
public void put(Message message){
synchronized (list){
//檢查隊列是否已滿
while (list.size() == capcity){
try {
log.debug("隊列已滿,生產者執行緒等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//將消息加入隊列的尾部
list.addLast(message);
log.debug("已生產消息 {}",message);
list.notifyAll();
}
}
}
@Setter
@AllArgsConstructor
@ToString
@Slf4j(topic = "c.Message")
final class Message {
private int id;
private Object value;
}
輸出:
17:18:49 [生產者0] c.MessageQueue - 已生產消息 Message(id=0, value=值0)
17:18:49 [生產者2] c.MessageQueue - 已生產消息 Message(id=2, value=值2)
17:18:49 [生產者1] c.MessageQueue - 隊列已滿,生產者執行緒等待
17:18:50 [消費者] c.MessageQueue - 已消費消息 Message(id=0, value=值0)
17:18:50 [生產者1] c.MessageQueue - 已生產消息 Message(id=1, value=值1)
17:18:51 [消費者] c.MessageQueue - 已消費消息 Message(id=2, value=值2)
17:18:52 [消費者] c.MessageQueue - 已消費消息 Message(id=1, value=值1)
17:18:53 [消費者] c.MessageQueue - 隊列為空,消費者執行緒等待
park、unpark
基本使用
它們是 LockSupport 類中的方法
// 暫停當前執行緒
LockSupport.park();
// 恢復某個執行緒的運行
LockSupport.unpark(暫停執行緒對象)
先 park 再 unpark
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(1);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
},"t1");
t1.start();
Thread.sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);
輸出:
18:42:52.585 c.TestParkUnpark [t1] - start...
18:42:53.589 c.TestParkUnpark [t1] - park...
18:42:54.583 c.TestParkUnpark [main] - unpark...
18:42:54.583 c.TestParkUnpark [t1] - resume...
先 unpark 再 park
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(2);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
}, "t1");
t1.start();
sleep(1);
log.debug("unpark...");
LockSupport.unpark(t1);
輸出:
18:43:50.765 c.TestParkUnpark [t1] - start...
18:43:51.764 c.TestParkUnpark [main] - unpark...
18:43:52.769 c.TestParkUnpark [t1] - park...
18:43:52.769 c.TestParkUnpark [t1] - resume...
特點
與 Object 的 wait & notify 相比
- wait,notify 和 notifyAll 必須配合 Object Monitor 一起使用,而 park,unpark 不必
- park & unpark 是以執行緒為單位來【阻塞】和【喚醒】執行緒,而 notify 只能隨機喚醒一個等待執行緒,notifyAll 是喚醒所有等待執行緒,就不那麼【精確】
- park & unpark 可以先 unpark,而 wait & notify 不能先 notify
原理
每個執行緒都有自己的一個 Parker 對象,由三部分組成 _counter , _cond 和 _mutex
- 當前執行緒調用 Unsafe.park() 方法
- 檢查 _counter ,本情況為 0,這時,獲得 _mutex 互斥鎖
- 執行緒進入 _cond 條件變數阻塞
- 設置 _counter = 0
- 調用 Unsafe.unpark(Thread_0) 方法,設置 _counter 為 1
- 喚醒 _cond 條件變數中的 Thread_0
- Thread_0 恢復運行
- 設置 _counter 為 0
- 調用 Unsafe.unpark(Thread_0) 方法,設置 _counter 為 1
- 當前執行緒調用 Unsafe.park() 方法
- 檢查 _counter ,本情況為 1,這時執行緒無需阻塞,繼續運行
- 設置 _counter 為 0
重新理解六種狀態
假設有執行緒 Thread t
情況一
NEW –> RUNNABLE
當調用 t.start() 方法時,由 NEW –> RUNNABLE
情況二
RUNNABLE <–> WAITING
t 執行緒用 synchronized(obj) 獲取了對象鎖後
- 調用 obj.wait() 方法時,t 執行緒從 RUNNABLE –> WAITING
- 調用 obj.notify() , obj.notifyAll() , t.interrupt() 時
- 競爭鎖成功,t 執行緒從 WAITING –> RUNNABLE
- 競爭鎖失敗,t 執行緒從 WAITING –> BLOCKED
情況三
RUNNABLE <–> WAITING
- 當前執行緒調用 t.join() 方法時,當前執行緒從 RUNNABLE –> WAITING
- 注意是當前執行緒在t 執行緒對象的監視器上等待
- t 執行緒運行結束,或調用了當前執行緒的 interrupt() 時,當前執行緒從 WAITING –> RUNNABLE
情況四
RUNNABLE <–> WAITING
- 當前執行緒調用 LockSupport.park() 方法會讓當前執行緒從 RUNNABLE –> WAITING
- 調用 LockSupport.unpark(目標執行緒) 或調用了執行緒 的 interrupt() ,會讓目標執行緒從 WAITING –> RUNNABLE
情況五
RUNNABLE <–> TIMED_WAITING
t 執行緒用 synchronized(obj) 獲取了對象鎖後
- 調用 obj.wait(long n) 方法時,t 執行緒從 RUNNABLE –> TIMED_WAITING
- t 執行緒等待時間超過了 n 毫秒,或調用 obj.notify() , obj.notifyAll() , t.interrupt() 時
- 競爭鎖成功,t 執行緒從 TIMED_WAITING –> RUNNABLE
- 競爭鎖失敗,t 執行緒從 TIMED_WAITING –> BLOCKED
情況六
RUNNABLE <–> TIMED_WAITING
- 當前執行緒調用 t.join(long n) 方法時,當前執行緒從 RUNNABLE –> TIMED_WAITING
- 注意是當前執行緒在t 執行緒對象的監視器上等待
- 當前執行緒等待時間超過了 n 毫秒,或t 執行緒運行結束,或調用了當前執行緒的 interrupt() 時,當前執行緒從TIMED_WAITING –> RUNNABLE
情況七
RUNNABLE <–> TIMED_WAITING
- 當前執行緒調用 Thread.sleep(long n) ,當前執行緒從 RUNNABLE –> TIMED_WAITING
- 當前執行緒等待時間超過了 n 毫秒,當前執行緒從 TIMED_WAITING –> RUNNABLE
情況八
RUNNABLE <–> TIMED_WAITING
- 當前執行緒調用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 時,當前執行緒從 RUNNABLE –> TIMED_WAITING
- 調用 LockSupport.unpark(目標執行緒) 或調用了執行緒 的 interrupt() ,或是等待超時,會讓目標執行緒從TIMED_WAITING–> RUNNABLE
情況九
RUNNABLE <–> BLOCKED
- t 執行緒用 synchronized(obj) 獲取了對象鎖時如果競爭失敗,從 RUNNABLE –> BLOCKED
- 持 obj 鎖執行緒的同步程式碼塊執行完畢,會喚醒該對象上所有 BLOCKED 的執行緒重新競爭,如果其中 t 執行緒競爭成功,從 BLOCKED –> RUNNABLE ,其它失敗的執行緒仍然 BLOCKED
情況十
RUNNABLE <–> TERMINATED
當前執行緒所有程式碼運行完畢,進入 TERMINATED
多把鎖
package WaNo;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.demo6")
public class demo6 {
public static void main(String[] args) {
BigRoom bigRoom = new BigRoom();
new Thread(() -> {
bigRoom.study();
},"r1").start();
new Thread(() -> {
bigRoom.sleep();
},"r2").start();
}
}
@Slf4j(topic = "c.BigRoom")
class BigRoom {
private final Object studyRoom = new Object();
private final Object bedRoom = new Object();
public void sleep(){
synchronized (bedRoom){
log.debug("sleep two hours");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void study(){
synchronized (studyRoom){
log.debug("study one hour");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
輸出:
20:01:42 [r2] c.BigRoom - sleep two hours
20:01:42 [r1] c.BigRoom - study one hour
將鎖的粒度細分
- 好處,是可以增強並發度
- 壞處,如果一個執行緒需要同時獲得多把鎖,就容易發生死鎖
活躍性
死鎖
有這樣的情況:一個執行緒需要同時獲取多把鎖,這時就容易發生死鎖
t1 執行緒 獲得 A對象 鎖,接下來想獲取 B對象 的鎖 t2 執行緒 獲得 B對象 鎖,接下來想獲取 A對象 的鎖
Object A = new Object();
Object B = new Object();
Thread t1 = new Thread(() -> {
synchronized (A) {
log.debug("lock A");
sleep(1);
synchronized (B) {
log.debug("lock B");
log.debug("操作...");
}
}
}, "t1");
Thread t2 = new Thread(() -> {
synchronized (B) {
log.debug("lock B");
sleep(0.5);
synchronized (A) {
log.debug("lock A");
log.debug("操作...");
}
}
}, "t2");
t1.start();
t2.start();
輸出:
12:22:06.962 [t2] c.TestDeadLock - lock B
12:22:06.962 [t1] c.TestDeadLock - lock A
哲學家進餐問題
有五位哲學家,圍坐在圓桌旁。
- 他們只做兩件事,思考和吃飯,思考一會吃口飯,吃完飯後接著思考。
- 吃飯時要用兩根筷子吃,桌上共有 5 根筷子,每位哲學家左右手邊各有一根筷子。
- 如果筷子被身邊的人拿著,自己就得等待
這種執行緒沒有按預期結束,執行不下去的情況,歸類為【活躍性】問題,除了死鎖以外,還有活鎖和飢餓者兩種情況
活鎖
活鎖出現在兩個執行緒互相改變對方的結束條件,最後誰也無法結束
public class TestLiveLock {
static volatile int count = 10;
static final Object lock = new Object();
public static void main(String[] args) {
new Thread(() -> {
// 期望減到 0 退出循環
while (count > 0) {
sleep(0.2);
count--;
log.debug("count: {}", count);
}
}, "t1").start();
new Thread(() -> {
// 期望超過 20 退出循環
while (count < 20) {
sleep(0.2);
count++;
log.debug("count: {}", count);
}
}, "t2").start();
}
飢餓
一個執行緒由於優先順序太低,始終得不到 CPU 調度執行,也不能夠結束
ReentrantLock
相對於 synchronized 它具備如下特點
- 可中斷
- 可以設置超時時間
- 可以設置為公平鎖
- 支援多個條件變數
與 synchronized 一樣,都支援可重入
基本語法
// 獲取鎖
reentrantLock.lock();
try {
// 臨界區
} finally {
// 釋放鎖
reentrantLock.unlock();
}
可重入
可重入是指同一個執行緒如果首次獲得了這把鎖,那麼因為它是這把鎖的擁有者,因此有權利再次獲取這把鎖如果是不可重入鎖,那麼第二次獲得鎖時,自己也會被鎖擋住
package ReentrantLockDemo;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "c.demo1")
public class demo1 {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
lock.lock();
try {
log.debug("enter main");
m1();
}finally {
lock.unlock();
}
}
public static void m1(){
lock.lock();
try {
log.debug("enter m1");
m2();
}finally {
lock.unlock();
}
}
public static void m2(){
lock.lock();
try {
log.debug("enter m2");
}finally {
lock.unlock();
}
}
}
輸出:
20:19:19 [main] c.demo1 - enter main
20:19:19 [main] c.demo1 - enter m1
20:19:19 [main] c.demo1 - enter m2
可打斷
package ReentrantLockDemo;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "c.demo2")
public class demo2 {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
try {
//如果沒有競爭,此方法會獲取對象的鎖
//如果有競爭,就進入阻塞隊列,可以被其他執行緒用 interrupt 打斷
log.debug("嘗試獲得鎖");
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("未獲得鎖,返回");
return;
}
try {
log.debug("獲取到鎖");
}finally {
lock.unlock();
}
}, "t1");
lock.lock();
t1.start();
Thread.sleep(1000);
log.debug("打斷t1");
t1.interrupt();
}
}
輸出:
20:26:05 [t1] c.demo2 - 嘗試獲得鎖
20:26:06 [main] c.demo2 - 打斷t1
20:26:06 [t1] c.demo2 - 未獲得鎖,返回
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at ReentrantLockDemo.demo2.lambda$main$0(demo2.java:16)
at java.lang.Thread.run(Thread.java:748)
Process finished with exit code 0
注意如果是不可中斷模式,那麼即使使用了 interrupt 也不會讓等待中斷
鎖超時
立刻失敗
package ReentrantLockDemo;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "c.demo3")
public class demo3 {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
log.debug("嘗試獲得鎖");
if(!lock.tryLock()){
log.debug("獲取不到鎖");
return;
}
try {
log.debug("獲得到鎖");
}finally {
lock.unlock();
}
},"t1");
lock.lock();
log.debug("獲得到鎖");
t1.start();
}
}
輸出:
20:31:15 [main] c.demo3 - 獲得到鎖
20:31:15 [t1] c.demo3 - 嘗試獲得鎖
20:31:15 [t1] c.demo3 - 獲取不到鎖
超時失敗
package ReentrantLockDemo;
import lombok.extern.slf4j.Slf4j;
import sun.reflect.generics.tree.Tree;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "c.demo3")
public class demo3 {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
log.debug("嘗試獲得鎖");
try {
if(!lock.tryLock(1, TimeUnit.SECONDS)){
log.debug("獲取不到鎖");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("獲取不到鎖");
return;
}
try {
log.debug("獲得到鎖");
}finally {
lock.unlock();
}
},"t1");
lock.lock();
log.debug("獲得到鎖");
Thread.sleep(1000);
lock.unlock();
t1.start();
}
}
輸出:
20:34:03 [main] c.demo3 - 獲得到鎖
20:34:04 [t1] c.demo3 - 嘗試獲得鎖
20:34:04 [t1] c.demo3 - 獲得到鎖
公平鎖
ReentrantLock 默認是不公平的
package ReentrantLockDemo;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.demo4")
public class demo4 {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock(false);
lock.lock();
for (int i = 0; i < 500; i++) {
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "t" + i).start();
}
// 1s 之後去爭搶鎖
Thread.sleep(1000);
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " start...");
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "強行插入").start();
lock.unlock();
}
}
強行插入,有機會在中間輸出
注意:該實驗不一定總能復現
t39 running...
t40 running...
t41 running...
t42 running...
t43 running...
強行插入 start...
強行插入 running...
t44 running...
t45 running...
t46 running...
t47 running...
t49 running...
改為公平鎖後
ReentrantLock lock = new ReentrantLock(true);
強行插入,總是在最後輸出
t465 running...
t464 running...
t477 running...
t442 running...
t468 running...
t493 running...
t482 running...
t485 running...
t481 running...
強行插入 running...
公平鎖一般沒有必要,會降低並發度
條件變數
ReentrantLock 的條件變數比 synchronized 強大之處在於,它是支援多個條件變數的,這就好比
- synchronized 是那些不滿足條件的執行緒都在一間休息室等消息
- 而 ReentrantLock 支援多間休息室,有專門等煙的休息室、專門等早餐的休息室、喚醒時也是按休息室來喚醒
使用要點:
- await 前需要獲得鎖
- await 執行後,會釋放鎖,進入 conditionObject 等待
- await 的執行緒被喚醒(或打斷、或超時)取重新競爭 lock 鎖
- 競爭 lock 鎖成功後,從 await 後繼續執行
package ReentrantLockDemo;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "c.demo4")
public class demo4 {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
//創建一個新的條件變數(休息室)
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
lock.lock();
//進入休息室等待
condition1.await();
condition1.signal();
//condition1.signalAll();
}
}
同步模式之順序控制
固定運行順序
比如,必須先 2 後 1 列印
wait notify版
package ReentrantLockDemo;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.demo4")
public class demo4 {
static final Object lock = new Object();
//表示 t2 是否被運行過
static boolean t2runned = false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (lock){
while (!t2runned){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("1");
}
},"t1");
Thread t2 = new Thread(() -> {
synchronized (lock){
log.debug("2");
t2runned = true;
lock.notify();
}
},"t2");
t1.start();
t2.start();
}
}
輸出:
20:49:28 [t2] c.demo4 - 2
20:49:28 [t1] c.demo4 - 1
park unpark版
可以看到,實現上很麻煩:
- 首先,需要保證先 wait 再 notify,否則 wait 執行緒永遠得不到喚醒。因此使用了『運行標記』來判斷該不該wait
- 第二,如果有些干擾執行緒錯誤地 notify 了 wait 執行緒,條件不滿足時還要重新等待,使用了 while 循環來解決此問題
- 最後,喚醒對象上的 wait 執行緒需要使用 notifyAll,因為『同步對象』上的等待執行緒可能不止一個
可以使用 LockSupport 類的 park 和 unpark 來簡化上面的題目:
package ReentrantLockDemo;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.LockSupport;
@Slf4j(topic = "demo5")
public class demo5 {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
LockSupport.park();
log.debug("1");
}, "t1");
t1.start();
new Thread(() -> {
log.debug("2");
LockSupport.unpark(t1);
},"t2").start();
}
}
交替輸出
執行緒 1 輸出 a 5 次,執行緒 2 輸出 b 5 次,執行緒 3 輸出 c 5 次。現在要求輸出 abcabcabcabcabc 怎麼實現
wait notify版
package ReentrantLockDemo;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.LockSupport;
@Slf4j(topic = "demo5")
public class demo5 {
public static void main(String[] args) {
WaitNotify wn = new WaitNotify(1,5);
new Thread(() -> {
wn.print("a",1,2);
}).start();
new Thread(() -> {
wn.print("b",2,3);
}).start();
new Thread(() -> {
wn.print("c",3,1);
}).start();
}
}
/*
輸出內容 等待標記 下一個標記
a 1 2
b 2 3
c 3 1
*/
@AllArgsConstructor
class WaitNotify{
//等待標記
private int flag;
//循環次數
private int loopNumber;
//列印
public void print(String str,int waitFlag,int nextFlag){
for (int i = 0; i < loopNumber; i++) {
synchronized (this){
while (flag != waitFlag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(str);
flag = nextFlag;
this.notifyAll();
}
}
}
}
輸出:
abcabcabcabcabc
ReentrantLock版
package ReentrantLockDemo;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "c.demo6")
public class demo6 {
public static void main(String[] args) throws InterruptedException {
AwaitSignal awaitSignal = new AwaitSignal(5);
Condition a = awaitSignal.newCondition();
Condition b = awaitSignal.newCondition();
Condition c = awaitSignal.newCondition();
new Thread(() -> {
awaitSignal.print("a", a, b);
}).start();
new Thread(() -> {
awaitSignal.print("b", b, c);
}).start();
new Thread(() -> {
awaitSignal.print("c", c, a);
}).start();
Thread.sleep(1000);
awaitSignal.lock();
try {
System.out.println("開始。。。");
a.signal();
}finally {
awaitSignal.unlock();
}
}
}
@AllArgsConstructor
class AwaitSignal extends ReentrantLock {
private int loopNumber;
/**
* @param str 列印內容
* @param current 進入哪一間休息室
* @param next 下一間休息室
*/
public void print(String str,Condition current,Condition next){
for (int i = 0; i < loopNumber; i++) {
lock();
try {
try {
current.await();
System.out.print(str);
next.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}
}finally {
unlock();
}
}
}
}
輸出:
開始。。。
abcabcabcabcabc
park unpark版
package ReentrantLockDemo;
import lombok.AllArgsConstructor;
import java.util.concurrent.locks.LockSupport;
public class demo7 {
static Thread t1;
static Thread t2;
static Thread t3;
public static void main(String[] args) {
ParkUnpark pu = new ParkUnpark(5);
t1 = new Thread(() -> {
pu.print("a", t2);
},"t1");
t2 = new Thread(() -> {
pu.print("b", t3);
},"t2");
t3 = new Thread(() -> {
pu.print("c", t1);
},"t3");
t1.start();
t2.start();
t3.start();
LockSupport.unpark(t1);
}
}
@AllArgsConstructor
class ParkUnpark{
private int loopNumber;
public void print(String str,Thread next){
for (int i = 0; i < loopNumber; i++) {
LockSupport.park();
System.out.print(str);
LockSupport.unpark(next);
}
}
}
輸出:
abcabcabcabcabc