­

深入理解Java中的鎖(三)

  • 2019 年 10 月 7 日
  • 筆記

ReadWriteLock接口

讀寫鎖維護一對關聯鎖,一個只用於讀操作,一個只用於寫操作。讀鎖可以由多個線程同時持有,又稱共享鎖。寫鎖同一時間只能由一個線程持有,又稱互斥鎖。同一時間,兩把鎖不能被不同線程持有。讀寫鎖適合讀取操作多於寫入操作的場景,改進互斥鎖的性能,比如集合的並發安全性改造,緩存組件等。

ReentrantReadWriteLock實現原理分析

  1. ReentrantReadWriteLock需要一個owner用來標記那個寫操作的線程獲取到了鎖,owner只會標記寫操作的線程引用,不會標記讀操作的線程,一個writeCount用來記錄寫操作加鎖的次數, 一個readCount用來記錄讀操作加鎖的次數,還有一個waiters等待隊列用來存放沒有搶到鎖的線程列表
  2. 當有寫操作線程進來時,會先判斷readCount的值,如果readCount為0說明讀鎖未被佔用
  3. 然後判斷writeCount的值,如果writeCount為0,說明寫鎖未被佔用
  4. 然後通過CAS操作進行搶鎖將writeCount值加1,如果搶到鎖則將owner設置為當前寫操作線程的引用
  5. 如果writeCount不為0同時owner指向當前寫線程的引用,則將writeCount的值加1
  6. 如果writeCount不為0同時owner指向的不是當前寫線程的引用,則將則將線程放入等待隊列
  7. 如果CAS搶鎖失敗,則將線程放入等待隊列
  8. 如果寫操作線程進來時,readCount不為0說明讀鎖已被佔用,則將線程放入等待隊列
  9. 當有讀操作線程進來時,會先判斷writeCount的值,如果writeCount為0說明寫鎖未被佔用
  10. 然後通過CAS將readCount的值加1
  11. 如果讀操作線程進來時,writeCount不為0說明寫鎖被佔用
  12. 如果寫鎖是被當前線程佔用則該線程可以繼續獲得讀鎖,即鎖降級
  13. 如果寫鎖不是被當前線程佔用,則將線程放入等待隊列
  14. 當有寫線程釋放鎖時,會將writeCount的值減1,如果writeCount的值為0,則將owner設為null同時喚醒等待隊列頭部的線程出隊列進行搶鎖操作
  15. 如果等待隊列的頭部線程是讀操作,則會進行CAS操作將readCount值加1同時喚醒下一個等待線程
  16. 如果下一個線程還是讀操作,則會進行CAS操作將readCount值加1並且繼續喚醒下一個等待線程
  17. 如果下一個線程是寫操作,則不會喚醒需要等到將讀鎖釋放完之後才會喚醒

手動實現ReentrantReadWriteLock示例:

public class MyReadWriteLock {  private AtomicInteger readCount = new AtomicInteger(0);  private AtomicInteger writeCount = new AtomicInteger(0);  // 獨佔鎖 擁有者  private AtomicReference<Thread> owner = new AtomicReference<>();  // 等待隊列  private volatile LinkedBlockingQueue<WaitNode> waiters = new LinkedBlockingQueue<WaitNode>();  class WaitNode {  int type = 0; // 0 為想獲取獨佔鎖的線程,  1為想獲取共享鎖的線程    Thread thread = null;  int arg = 0;  public WaitNode(Thread thread, int type, int arg) {  this.thread = thread;  this.type = type;  this.arg = arg;  }  }  // 獲取獨佔鎖  public void lockWrite() {  int arg = 1;  // 嘗試獲取獨佔鎖,若成功,退出方法,    若失敗...  if (!tryLockWrite(arg)) {  // 標記為獨佔鎖      WaitNode waitNode = new WaitNode(Thread.currentThread(), 0, arg);  waiters.offer(waitNode); // 進入等待隊列      // 循環嘗試拿鎖      for (; ; ) {  // 若隊列頭部是當前線程        WaitNode head = waiters.peek();  if (head != null && head.thread == Thread.currentThread()) {  if (!tryLockWrite(arg)) { // 再次嘗試獲取 獨佔鎖            LockSupport.park(); // 若失敗,掛起線程          } else { // 若成功獲取            waiters.poll(); //  將當前線程從隊列頭部移除            return; // 並退出方法          }  } else { // 若不是隊列頭部元素          LockSupport.park(); // 將當前線程掛起        }  }  }  }  // 釋放獨佔鎖  public boolean unlockWrite() {  int arg = 1;  // 嘗試釋放獨佔鎖 若失敗返回true,若失敗...  if (tryUnlockWrite(arg)) {  WaitNode next = waiters.peek(); // 取出隊列頭部的元素      if (next != null) {  Thread th = next.thread;  LockSupport.unpark(th); // 喚醒隊列頭部的線程      }  return true; // 返回true  }  return false;  }  // 嘗試獲取獨佔鎖  public boolean tryLockWrite(int acquires) {  // 如果read count !=0 返回false  if (readCount.get() != 0) return false;  int wct = writeCount.get(); // 拿到 獨佔鎖 當前狀態    if (wct == 0) {  if (writeCount.compareAndSet(wct, wct + acquires)) { // 通過修改state來搶鎖        owner.set(Thread.currentThread()); //  搶到鎖後,直接修改owner為當前線程        return true;  }  } else if (owner.get() == Thread.currentThread()) {  writeCount.set(wct + acquires); // 修改count值      return true;  }  return false;  }  // 嘗試釋放獨佔鎖  public boolean tryUnlockWrite(int releases) {  // 若當前線程沒有 持有獨佔鎖    if (owner.get() != Thread.currentThread()) {  throw new IllegalMonitorStateException(); // 拋IllegalMonitorStateException  }  int wc = writeCount.get();  int nextc = wc - releases; // 計算 獨佔鎖剩餘佔用    writeCount.set(nextc); // 不管是否完全釋放,都更新count值    if (nextc == 0) { // 是否完全釋放      owner.compareAndSet(Thread.currentThread(), null);  return true;  } else {  return false;  }  }  // 獲取共享鎖  public void lockRead() {  int arg = 1;  if (tryLockRead(arg) < 0) { // 如果tryAcquireShare失敗      // 將當前進程放入隊列      WaitNode node = new WaitNode(Thread.currentThread(), 1, arg);  waiters.offer(node); // 加入隊列      for (; ; ) {  // 若隊列頭部的元素是當前線程        WaitNode head = waiters.peek();  if (head != null && head.thread == Thread.currentThread()) {  if (tryLockRead(arg) >= 0) { // 嘗試獲取共享鎖,  若成功            waiters.poll(); // 將當前線程從隊列中移除            WaitNode next = waiters.peek();  if (next != null && next.type == 1) { // 如果下一個線程也是等待共享鎖              LockSupport.unpark(next.thread); // 將其喚醒            }  return; // 退出方法          } else { // 若嘗試失敗            LockSupport.park(); // 掛起線程          }  } else { // 若不是頭部元素          LockSupport.park();  }  }  }  }  // 解鎖共享鎖  public boolean unLockRead() {  int arg = 1;  if (tryUnLockRead(arg)) { // 當read count變為0,才叫release share成功      WaitNode next = waiters.peek();  if (next != null) {  LockSupport.unpark(next.thread);  }  return true;  }  return false;  }  // 嘗試獲取共享鎖  public int tryLockRead(int acquires) {  for (; ; ) {  if (writeCount.get() != 0 && owner.get() != Thread.currentThread()) return -1;  int rct = readCount.get();  if (readCount.compareAndSet(rct, rct + acquires)) {  return 1;  }  }  }  // 嘗試解鎖共享鎖  public boolean tryUnLockRead(int releases) {  for (; ; ) {  int rc = readCount.get();  int nextc = rc - releases;  if (readCount.compareAndSet(rc, nextc)) {  return nextc == 0;  }  }  }  }

鎖降級

鎖降級指的是寫鎖降級為讀鎖,是指持有寫鎖的同時,再獲取讀鎖,隨後釋放寫鎖的過程。 寫鎖是線程獨佔,讀鎖是線程共享,所以寫鎖降級為讀鎖可行,而讀鎖升級為寫鎖不可行。

代碼示例:

class TeacherInfoCache {  static volatile boolean cacheValid;  static final ReadWriteLock rwl = new ReentrantReadWriteLock();  static Object get(String dataKey) {  Object data = null;  // 讀取數據,加讀鎖    rwl.readLock().lock();  try {  if (cacheValid) {  data = Redis.data.get(dataKey);  } else {  // 通過加鎖的方式去訪問DB,加寫鎖        rwl.readLock().unlock();  rwl.writeLock().lock();  try {  if (!cacheValid) {  data = DataBase.queryUserInfo();  Redis.data.put(dataKey, data);  cacheValid = true;  }  } finally {  // 鎖降級          rwl.readLock().lock();  rwl.writeLock().unlock();  }  }  return data;  } finally {  rwl.readLock().unlock();  }  }  }  class DataBase {  static String queryUserInfo() {  System.out.println("查詢數據庫。。。");  return "name:Kody,age:40,gender:true,";  }  }  class Redis {  static Map<String, Object> data = new HashMap<>();  }