深入理解Java中的AQS

  • 2019 年 10 月 3 日
  • 笔记

AQS??

? AbstractQueuedSynchronizer?????????AQS??????????????juc??Lock????????????????AQS???????????AQS??????????????????AQS?????????AQS????

? (1)AQS????????FIFO??????????????(??????head?tail??????????????????Node??????????Node?????)?

/*?????????(??????????????????????????????enq???????  ???Head??)???????setHead??????????waitStatus???CANCELLED*/  private transient volatile Node head;  /**?????????????????enq????????????????????*/  private transient volatile Node tail;

? (2)??Node??thread??????AQS?????????Node?????SHARED????????????????????????????Node??EXCLUSIVE?????????????????????????waitStatus????????????

? ?CANCELLED=1?????????????????????????????

? ?SIGNAL=-1?????thread1????????head(??????????????????)?????node1????????????????thread1?????CANCEL??????????node1???????

? ?CONDITION=-2???????????(??????????lock?condition????Condition????????)???????????Condition?signal()??????????condition?????????lock??????????lock?(????????????????AQS???FIFO???????????condition?????)

? ?PROPAGTE=-3????????????????????????????????

(3)AQS?????????volatile???????state(AQS??Unsafe?????????????????????state)?AQS???getState()?setState()?compareAndSetState()?????(???????unsafe?compareAndSwapInt??)????AQS????????????state???

//??????????head????????????????????????????????head????????????waitStatus???CANCELLED  private transient volatile Node head;  //????????????????????????enq????????????wait node  private transient volatile Node tail;  //AQS???????  private volatile int state;  protected final int getState() {      return state;  }  protected final void setState(int newState) {      state = newState;  }  protected final boolean compareAndSetState(int expect, int update) {      return unsafe.compareAndSwapInt(this, stateOffset, expect, update);  }

? (4)AQS????????????????????????????????????????????????????????????????????AQS?????????????????????????????????????????????

//???????????????????????????????????????????CAS??????  protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}  //?????????????????????????????????  protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}  //??????????  protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}  //???????????????????? ????????????????  protected int tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }  //????????????????????????????????????  protected int isHeldExclusively(int arg) {  throw new UnsupportedOperationException();}

(5)AQS????ConditionObject?????????????ConditionObject??????AQS???(state?queue)?ConditionObject?????? ???ConditionObject??????????????condition?????await???????????

AQS??????

? ???????????AQS??????????ReentrantLock????????????AQS????????????????

?????????

? ?????AQS?????????????CLH?????????????lock.unlock()??????????????????????????????????????????????(park())???????

? (1)??????????????????????????state???????????thread1???lock?????????????CAS????state???1?????thread1???????????????????thread1?

final void lock() {      if (compareAndSetState(0, 1))          //setExclusiveOwnerThread?AbstractOwnableSynchronizer????AQS???AbstractOwnableSynchronizer          setExclusiveOwnerThread(Thread.currentThread());      else          acquire(1);  }

? (2)??????????thread2????????????lock???????CAS????state???1???????????????state???thread2???CAS??????thread1????state????????????acquire(1)???????AQS???????????????tryAcquire?????????????AQS?????acquire(1)????NofairSync?tryAcquire????tryAcquire??????Sync?nonfairTryAcquire?????????nonfairTryAcquire????

//NofairSync  protected final boolean tryAcquire(int acquires) {      return nonfairTryAcquire(acquires);  }  final boolean nonfairTryAcquire(int acquires) {      //?1???????      final Thread current = Thread.currentThread();      //?2?????????state      int c = getState();      //?3???state==0?????????      if (c == 0) {          //?3-1???????CAS?????state??          if (compareAndSetState(0, acquires)) {              //?3-2???????????????????????????????              setExclusiveOwnerThread(current);              //?3-3??????????true              return true;          }      }      //?4??????????      else if (current == getExclusiveOwnerThread()) {          //?4-1???????state????????????state?????????????state          int nextc = c + acquires;          //?4-2????????          if (nextc < 0) // overflow              throw new Error("Maximum lock count exceeded");          //?4-3???setState??????state?????????????????state????          //???????????????CAS???          setState(nextc);          return true;      }      //?5?????state??????????false      return false;  }

???????

1??????????????thread2?

2?????AQS?state???????state???0????????CAS??????????AQS???????thread2?????????????????state???1??0??????thread1?????????CAS??????3???????????

3??????????????????AQS?exclusiveOwnerThread????????state???1????????????

4???thread2????????false?

? (3)???thread2???????false????????????AQS??????thread2?????Node????????????NofairSync?tryAcquire????AQS?????acquire()????????????????????????

//(1)tryAcquire???thread2?????false???????addWaiter???????????????????  public final void acquire(int arg) {      if (!tryAcquire(arg) &&          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))          selfInterrupt();  }

? ????????addWaiter????????

private Node addWaiter(Node mode) {      //(1)???????????(???SHARED????state????EXCLUSIVE????)???Node??      Node node = new Node(Thread.currentThread(), mode);      //(2)?????????????????      Node pred = tail;      if (pred != null) {          //(2-1)?????node??????????tail          node.prev = pred;          //(2-2)?CAS????????node???tail??          if (compareAndSetTail(pred, node)) {              //(2-3)CAS??????????tail?next????????node??????????              //???????              pred.next = node;              return node;          }      }      //(3)???????????????null?????????????????????tail???      //???????????2-2??CAS?????????????enq??      enq(node);      return node;  }

? ??????add Waiter??

? 1????????????????thread2??????????node???

? 2??????????????node??????tail??(??????????tail????node????????tail)????CAS????node???tail??(CAS?????tail?next???node???????????)?

? 3???2????????enq???

? ????thread1?thread2??????????????????(??thread1?????thread2??????????????????????)??????????????tail????null?????????enq?????????enq?????

private Node enq(final Node node) {      for (;;) {          //(4)??????????tail??          Node t = tail;          //(5)??tail?null??????????null??????????????head?tail??          //????????          if (t == null) {              //?5-1????????????????????????????????????CAS?              //??????              if (compareAndSetHead(new Node()))                  tail = head;          }          //(6)tail??null          else {              //(6-1)???????????????tail??              node.prev = t;              //(6-2)???????????????CAS?????????tail??????????              //????????????              if (compareAndSetTail(t, node)) {                  t.next = node;                  return t;              }          }      }  }

? enq??????????????????????????

? 1???????4???t???tail?????t==null???(1)???

? 2?????????????????????????(???5-1???)

? 3???????(2)???????????????

? ???????????????

? 1????????tail?????t??tail???????(3)

? 2?????????t!=null???enq????????(6).

? 3??(6-1)?????node?????????????tail???????(4)???

? 4??????????????(6-2)??CAS???????node?????tail??,????????????(5)??????tail????????????????????????????t?next????node??????(6)????????

? ???????????????enq??????????????????????????????enq??CAS?????????????????????????????(??????????????????????Node??????)

? (4)???AQS???????acquire()??????acquireQueued??????????????????????????????????????????????????????????????waitStatus?-1?????LockSupport???park???????????????

final boolean acquireQueued(final Node node, int arg) {      boolean failed = true;      try {          boolean interrupted = false;          //??????????tryAcquire????          for (;;) {              //??????              final Node p = node.predecessor();              //(1)?????????????????????????tryAcquire????????              //?NofairSync?tryAcquire??????????              if (p == head && tryAcquire(arg)) {                  //????????????tryAcquire??true????????????node                  setHead(node);                  p.next = null; //????????next???null???GC????                  failed = false;                  return interrupted;              }              //(2)???????,???????????????????????????node??              //?waitStatus???-1(SIGNAL),??park????????????????????              //?????              if (shouldParkAfterFailedAcquire(p, node) &&                  parkAndCheckInterrupt())                  interrupted = true;          }      } finally {          if (failed)              cancelAcquire(node);      }  }

? ??????????????????????????????????thread1?thread2????????enq??????????????????????

? ???node????????head??????tryAcquire()??????????????state?thread1?????tryAcquire?????????acquireQueued??????(2)?????(2)??????shouldParkAfterFailedAcquire?????????????node????????waitStatus?CANCELLED????????????????????????????????waitStatus???-1(SIGNAL)?????????????????

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {      //?1????????waitStatus      int ws = pred.waitStatus;      //?2????????waitStatus?SINGNAL??????true      if (ws == Node.SIGNAL)          //????????SIGNAL??????????????park????????          return true;      if (ws > 0) {          //?3????????????????CANCELLED????          do {              node.prev = pred = pred.prev;          } while (pred.waitStatus > 0);          pred.next = node;      } else {          //CAS????????????SIGHNAL?          compareAndSetWaitStatus(pred, ws, Node.SIGNAL);      }      return false;  }

? ??shouldParkAfterFailedAcquire??????????????????????????????waitStatus???-1?

? ??????????acquireQueued???????acquireQueued????????????????????state?????????shouldParkAfterFailedAcquire??????????node?????head?waitStatus???-1(SIGNAL)??????true???acquireQueued?????????parkAndCheckInterrupt???park?????

private final boolean parkAndCheckInterrupt() {      LockSupport.park(this);      return Thread.interrupted();  }

? (5)????????????????????????thread3??????state???????????????????

? ????????ReentrantLock.lock()???????;

? ?????????????????NoFairSync.lock()??;

? ??NoFairSync.lock()???????????state???????????????????????????AQS?????AQS.acquire(1)?

? ??AQS?????acquire(1)??????????????tryAcquire()??????????????Sync.nofairTryAcquire()???

? ???tryAcquire()???false???acquire()????????AQS.addWaiter()??????????????Node??,?????waitStatus?0?

? ??addWaiter???????????????node???CAS???(??????????????tail)???tail????????????????????????????????????enq()?????????????????????

? ?????????????????????????????????????????????????acquireQueued()???

? ????thread3???????head??????????shouldParkAfterFailedAcquire()?????????????thread2??????waitStatue?????-1(????????????waitStatus???????????????????waitStatus?)?

? ?thread2?????waitStatus????shouldParkAfterFailedAcquire?????false????????acquireQueued??????????????shouldParkAfterFailedAcquire???????true?????parkAndCheckInterrupt()??????

? ???????????????????????????????????thread3???????????????????????????????????.

? ???????????????????

?????????

? ????ReentrantLock????????????????thread1?????????????????????????finally???ReentrantLock.unlock()??????????????????

? (1)????unlock????????????????AQS?release()???????????1????????unlock????????????state????????????unlock???????lock?unlock?????

public void unlock() {      sync.release(1); //??ReentrantLock?unlock?????AQS?release??  }  public final boolean release(int arg) {      //????????tryRelease????ReentrantLock????Sync?tryRelease??      if (tryRelease(arg)) {          Node h = head;          if (h != null && h.waitStatus != 0)              unparkSuccessor(h);          return true;      }      return false;  }

? (2)????release???????ReentrantLock????Sync?tryRelease??????????????????tryRelease???????

? ?????AQS?state????1?

? ???????????AQS?exclusiveOwnerThread????????IllegalMonitorStateException???????????????????????

? ???(state-1)?????0?????????????unlock????lock?unlock??????

? ???(state-1)??0?????AQS?ExclusiveOwnerThread???null?

? ??????????????tryRelase?????true???false??????unlock?

protected final boolean tryRelease(int releases) {      //?1??????state????1???????state      int c = getState() - releases;      //?2??????????????????????????IllegalMonitorStateException      if (Thread.currentThread() != getExclusiveOwnerThread())          throw new IllegalMonitorStateException();      boolean free = false;      //?3???????state???0      if (c == 0) {          free = true;          //?3-1?????????null          setExclusiveOwnerThread(null);      }      //?4?????state=c=getState()-releases      setState(c);      //?5???state==0?????true      return free;  }

? (3)???tryRelease??true???????release???if????????????????

if (tryRelease(arg)) {      //?1???????????head      Node h = head;      //?2????????null???????waitStatus??0(CACCELLED)      if (h != null && h.waitStatus != 0)          //?3-1??????????????head???????????          unparkSuccessor(h);      return true;  }

? (4)????????????????????????????????head!=null??head?waitStatus=-1??????unparkSuccessor???????????head?????h.????????unparkSuccessor???????????

private void unparkSuccessor(Node node) {      //?1???node?waitStatus      int ws = node.waitStatus;      //?2???waitStatus????0      if (ws < 0)          //?2-1???waitStatus??0?????CAS??????0          compareAndSetWaitStatus(node, ws, 0);        //?2???s?????????head?????      Node s = node.next;      //?3??????????????????waitStatus==CANCELLED      if (s == null || s.waitStatus > 0) {          //?3-1???s?=null????waitStatus=CANCELLED???????null          s = null;          //?3-2???????????????head?????null??node.waitStatus???          for (Node t = tail; t != null && t != node; t = t.prev)              if (t.waitStatus <= 0)                  s = t;      }      //?4?node.next!=null????????head???????null      if (s != null)          //?4-1???????????          LockSupport.unpark(s.thread);  }

? ??????????????unparkSuccessor????????:

? ???head???waitStatus?????0????CAS???head???waitStatus???0

? ???head????????????????waitStatus??0?????????????????????waitStatus<=0????????

? (5)?????????????state????????????????????????????????????????????????

? ?thread1(???????)??unlock??????????unparkSuccessor?????thread2?????thread2?unpark?

? ?????????thread2????acquireQueued?????parkAndCheckInterrupt???park????????thread2?????????acquireQueued????for???????????????acquireQueued????for??????????

? ?for?????????????????????????????????????????????

? ??????head???????tryAcquire??????state???thread1?????state??state=0???thread2??tryAcquire??????CAS?????state?0???1???????????thread2??????

? ?thread2??state??????acquireQueued????????????acquireQueued????false????AQS??????acquire?????if??????????????????????