02-Java中的鎖詳解
I. 使用Lock接口
只要不涉及到複雜用法,一般採用的是Java的synchronized機制
不過,Lock可以提供一些synchronized不支持的機制
- 非阻塞的獲取鎖:嘗試獲取鎖,如果能獲取馬上獲取,不能獲取馬上返回,不會阻塞
- 中斷獲取鎖:當獲取鎖的線程被中斷時,拋出異常,鎖被釋放
- 超時獲取鎖:為嘗試獲取鎖設定超時時間
相應API:
- void lock():普通的獲取鎖
- void lockInterruptibly() throws InterruptedException:可中斷的獲取鎖,鎖的獲取中可以中斷線程
- boolean tryLock():非阻塞獲取鎖
- boolean tryLock(long time, TimeUnit unit):超時獲取鎖
- void unlock():釋放鎖
一般框架:
//不要將lock寫進try塊,防止無故釋放
Lock lock = new ReentrantLock();
lock.lock();
try{
...;
}finally{
lock.unlock();
}
II. 隊列同步器AQS
AbstractQueuedSynchronizer:隊列同步器,簡稱AQS,用來構建鎖或者其他同步組件的基礎框架
使用一個int的成員變量表示同步狀態,通過內置的FIFO隊列完成資源的排隊工作
AQS實現鎖可以看作:獲取同步狀態,成功則加鎖成功;失敗則加鎖失敗
調用AQS內部的獲取同步狀態的API,保證是線程安全的
- getState()
- setState(int newState)
- compareAndSetState(int expect, int update)
1. 自己實現一個Mutex互斥鎖
首先要繼承一個Lock接口,然後自己實現裏面的方法
public class Mutex implements Lock {...}
Lock裏面的方法是沒有默認實現的,因此都需要重寫
一般會實現一個繼承於AQS的內部類來執行獲取同步狀態的實現:加鎖相當於獲取同步狀態
public class Mutex implements Lock {
private static class Syn extends AbstractQueuedSynchronizer{...}
}
可以看到,AQS的方法和鎖需要實現的方法是對應的
先實現對應的AQS的幾個方法
private static class Syn extends AbstractQueuedSynchronizer{
//判斷同步器是否被線程佔用
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
//獲取鎖
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread()); //設置佔用線程
return true;
}
return false;
}
//釋放鎖
@Override
protected boolean tryRelease(int arg) {
if(getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null); //清空佔用線程
setState(0);
return true;
}
}
鎖的獲取和AQS獲取同步狀態其實是一個道理
通過代理模式可以像下面這樣實現
public class Mutex implements Lock {
private static class Syn extends AbstractQueuedSynchronizer{...}
Syn syn = new Syn();
@Override
public void lock() {
syn.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
syn.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return syn.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return syn.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
syn.release(1);
}
@Override
public Condition newCondition() {
return null;
}
}
2. AQS實現分析
鎖實現的本質:信號量機制,互斥鎖也就是0和1兩個信號量
AQS維護了一個FIFO的隊列,線程獲取同步狀態失敗則會加入這個隊列,然後阻塞,直到同步狀態釋放,隊列首節點的線程被喚醒
同步隊列中的節點保存的信息有:獲取同步狀態失敗的線程引用,等待狀態,前驅和後繼節點
同步器有一個頭節點和尾節點
加入新的阻塞線程:
構造節點,加入隊列的尾節點
使用compareAndSetTail()加到尾部,這是一個原子操作
2.1 獨佔式的獲取和釋放
獲取同步狀態:
acquire()方法會調用tryAcquire(),如果獲取失敗,則開始調用addWaiter()來給尾節點添加新節點,再調用acquireQueued()等待請求調度
addWaiter()的作用是給FIFO隊列添加尾節點,並返回這個節點的引用
因為可能會多個線程申請失敗,因此需要使用原子操作compareAndSetTail()
enq()的作用是快速添加失敗後的反覆嘗試,直到添加尾節點成功
acquireQueued()用來請求調度
可見等待調度期間是支持中斷的
這個請求調度有兩個條件:
- 該節點是首節點
- 申請互斥信號量成功
for循環的這個操作被稱為自旋
release()釋放互斥信號量,根據上文提到的獲取信號量,除了tryRelease(),還應該喚醒後繼節點
2.2 共享式狀態獲取和釋放
最典型的場景就是讀寫場景:一個資源允許多個線程進行讀取,此時寫線程阻塞;而寫線程執行時,所有讀線程阻塞
共享鎖鎖也就是資源信號量的應用,主要解決下面問題:只想要有限的線程執行
調用tryAcquireShared()來申請資源信號量
doAcquireShared()是申請失敗後,構造節點加入FIFO隊列然後自旋的操作
使用releaseShared()來釋放
注意:共享式的釋放可能有多個線程,需要用CAS操作來實現tryReleaseShared()
3. 自己實現一個TwinsLock共享鎖
需要自己實現的:
-
tryAcquiredShared()
-
tryReleaseShared():要保證釋放操作的原子性
State()的取值就是資源信號量的取值
public class TwinsLock {
private int count;
TwinsLock(int count){
this.count = count;
}
private final Sync sync = new Sync(count);
private static final class Sync extends AbstractQueuedSynchronizer{
Sync(int count){
if(count < 0) throw new IllegalArgumentException();
setState(count); //設置資源總數
}
@Override
protected int tryAcquireShared(int arg) {
for(;;){
int current = getState();
int newCount = current - arg;
if(newCount<0 || compareAndSetState(current, newCount)){
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int arg) {
for(;;){
int current = getState();
int newCount = current + arg;
if(compareAndSetState(current, newCount)){
return true;
}
}
}
}
public void lock(){
sync.acquireShared(1);
}
public void unlock(){
sync.releaseShared(1);
}
}
III. 可重入鎖
可重入鎖:支持一個線程對資源反覆加鎖
synchronized支持可重入
ReentrantLock
是可重入鎖的一種實現,支持反覆加鎖
鎖的公平性:
- 公平:先對鎖進行獲取的請求先被滿足
- 不公平:先對鎖進行獲取的請求不一定先被滿足
1. 實現可重入
只需要判斷當前線程是否是獲取了鎖的線程,如果是,則同步狀態加一
每次釋放同步狀態減一,減到0的時候設置獲取鎖的線程為null,此時允許其他線程獲取
接下來來看看ReetrantLock
的實現
2. 公平鎖與非公平鎖
繼續觀察nofairTryAcquire()方法,發現只要CAS成功,則線程直接獲取到鎖
而公平鎖需要確定隊列中沒有前驅節點,即自己就是首節點
公平鎖:確保線程的FIFO,先上下文切換開銷大
非公平鎖:可能造成線程飢餓,但線程切換少,吞吐量更大
IV. 讀寫鎖
讀寫鎖,是一種提供共享式和獨佔式兩種方式的鎖
- 支持公平鎖和非公平鎖
- 支持重進入
- 支持鎖降級
一個資源允許多個線程進行讀取,此時寫線程阻塞;而寫線程執行時,所有讀線程阻塞
1. 讀寫鎖的實現
讀寫鎖的同步狀態是按位切割使用的
維護了一個int型的同步狀態,32位
高16為讀狀態,低16位為寫狀態
1.1 寫鎖的獲取
w是c與0x0000FFFF做與運算後的值,w=0有兩種情況:
- 有讀鎖,低16位全0
- 無讀鎖也無寫鎖,需要後面的條件判斷是否為當前線程
1.2 讀鎖的獲取
和寫鎖的獲取類似,需要判斷先有沒有寫鎖
不過讀鎖是共享式的,可以允許多個線程獲取讀鎖
不過讀鎖也支持重進入,因此不光要維護獲取讀鎖的總狀態,還要維護每個線程獲取讀鎖的狀態
2. 鎖降級
鎖降級指:線程先獲取寫鎖,然後再獲取讀鎖,最後釋放寫鎖,實現從寫鎖降到讀鎖
目的:保證讀寫操作的連貫性
使用場景:寫操作執行完馬上需要讀一次,不加讀鎖的話可能會被其他寫線程修改,再讀數據可能就變了
V. LockSupport工具
用於阻塞和喚醒線程
VI. Condition接口
Condition接口依賴於Lock對象,用於實現等待-通知模式
核心API就是兩個,這兩個API的擴展可以增加超時時間,設置中斷不敏感等等:
- await()
- signal()
1. 使用Condition實現一個阻塞隊列
隊列滿的時候,填充操作阻塞;隊列空的時候,取出操作阻塞
public class BoundedQueue <T>{
private Object[] items;
private int addIndex, revIndex, count;
private ReentrantLock lock = new ReentrantLock();
private Condition empty = lock.newCondition();
private Condition full = lock.newCondition();
public BoundedQueue(int size){
items = new Object[size];
}
/**
* 添加元素
* @param t
*/
public void add(T t) throws InterruptedException {
lock.lock();
try{
while(count == items.length){
System.out.println("已滿,請等待消耗");
empty.await();
}
items[addIndex] = t;
if(++addIndex == items.length) addIndex = 0;
count++;
full.signal();
}finally {
lock.unlock();
}
}
/**
* 取出元素
* @return
*/
public T remove() throws InterruptedException {
lock.lock();
try{
while(count == 0){
System.out.println("已空,請等待生產");
full.await();
}
Object temp = items[revIndex];
if(++revIndex == items.length) revIndex = 0;
count--;
empty.signal();
return (T) temp;
}finally {
lock.unlock();
}
}
}
2. Condition的實現分析
每個Condition會維護一個等待隊列,一個鎖支持支持多個等待隊列
獲取到鎖的線程也就是同步隊列的首節點
此時再調用await,則首節點進入等待隊列,直到其他線程喚醒
相應的,調用signal則是將等待隊列的首節點拆下來放到同步隊列,喚醒線程開始自旋
當節點回到同步隊列,之前調用的await()中的isOnsyncQueue()會返回true,結束等待,在調用acquireQueued()加入競爭
通過isHeldExclusively判斷有沒有拿到鎖