深入理解Java中的AQS
- 2019 年 10 月 3 日
- 笔记
AQS??
? AbstractQueuedSynchronizer?????????AQS??????????????juc??Lock????????????????AQS???????????AQS??????????????????AQS?????????AQS????
? (1)AQS????????FIFO??????????????(??????head?tail??????????????????Node??????????Node?????)?
/*
 * Head of the wait queue. It is created lazily (enq installs an empty node the
 * first time a thread has to queue) and is otherwise replaced through setHead.
 * NOTE(review): the original comment also appears to state that an existing
 * head never has waitStatus CANCELLED - confirm against the JDK source.
 */
private transient volatile Node head;

/**
 * Tail of the wait queue. Also created lazily; new wait nodes are appended to
 * it through enq.
 */
private transient volatile Node tail;
? (2)??Node??thread??????AQS?????????Node?????SHARED????????????????????????????Node??EXCLUSIVE?????????????????????????waitStatus????????????
  ① CANCELLED=1：表示节点中的线程因等待超时或被中断而取消了等待，处于该状态的节点不会再参与锁的竞争。
  ② SIGNAL=-1：表示后继节点中的线程需要被唤醒。例如thread1对应的节点是head(即thread1是当前持有锁的线程)，它的后继节点是node1，那么当thread1释放锁或者被取消(CANCEL)时，必须唤醒后继节点node1中的线程。
  ③ CONDITION=-2：表示节点处于条件队列中(即线程在某个lock的condition上等待，调用了Condition的await方法)。当其他线程调用该Condition的signal()方法后，节点会从condition的等待队列转移到lock的同步队列中，重新等待获取lock锁(注意：同步队列是AQS的FIFO队列，它与condition的等待队列是两个不同的队列)。
  ④ PROPAGATE=-3：用于共享模式，表示下一次共享式同步状态的获取需要无条件地向后继节点传播。
(3)AQS?????????volatile???????state(AQS??Unsafe?????????????????????state)?AQS???getState()?setState()?compareAndSetState()?????(???????unsafe?compareAndSwapInt??)????AQS????????????state???
//??????????head????????????????????????????????head????????????waitStatus???CANCELLED private transient volatile Node head; //????????????????????????enq????????????wait node private transient volatile Node tail; //AQS??????? private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
? (4)AQS????????????????????????????????????????????????????????????????????AQS?????????????????????????????????????????????
//???????????????????????????????????????????CAS?????? protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();} //????????????????????????????????? protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();} //?????????? protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();} //???????????????????? ???????????????? protected int tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } //???????????????????????????????????? protected int isHeldExclusively(int arg) { throw new UnsupportedOperationException();}
(5)AQS????ConditionObject?????????????ConditionObject??????AQS???(state?queue)?ConditionObject?????? ???ConditionObject??????????????condition?????await???????????
AQS??????
? ???????????AQS??????????ReentrantLock????????????AQS????????????????
?????????
? ?????AQS?????????????CLH?????????????lock.unlock()??????????????????????????????????????????????(park())???????
? (1)??????????????????????????state???????????thread1???lock?????????????CAS????state???1?????thread1???????????????????thread1?
final void lock() { if (compareAndSetState(0, 1)) //setExclusiveOwnerThread?AbstractOwnableSynchronizer????AQS???AbstractOwnableSynchronizer setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
? (2)??????????thread2????????????lock???????CAS????state???1???????????????state???thread2???CAS??????thread1????state????????????acquire(1)???????AQS???????????????tryAcquire?????????????AQS?????acquire(1)????NofairSync?tryAcquire????tryAcquire??????Sync?nonfairTryAcquire?????????nonfairTryAcquire????
//NofairSync protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { //?1??????? final Thread current = Thread.currentThread(); //?2?????????state int c = getState(); //?3???state==0????????? if (c == 0) { //?3-1???????CAS?????state?? if (compareAndSetState(0, acquires)) { //?3-2??????????????????????????????? setExclusiveOwnerThread(current); //?3-3??????????true return true; } } //?4?????????? else if (current == getExclusiveOwnerThread()) { //?4-1???????state????????????state?????????????state int nextc = c + acquires; //?4-2???????? if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //?4-3???setState??????state?????????????????state???? //???????????????CAS??? setState(nextc); return true; } //?5?????state??????????false return false; }
???????
1??????????????thread2?
2?????AQS?state???????state???0????????CAS??????????AQS???????thread2?????????????????state???1??0??????thread1?????????CAS??????3???????????
3??????????????????AQS?exclusiveOwnerThread????????state???1????????????
4???thread2????????false?
? (3)???thread2???????false????????????AQS??????thread2?????Node????????????NofairSync?tryAcquire????AQS?????acquire()????????????????????????
//(1)tryAcquire???thread2?????false???????addWaiter??????????????????? public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
? ????????addWaiter????????
private Node addWaiter(Node mode) { //(1)???????????(???SHARED????state????EXCLUSIVE????)???Node?? Node node = new Node(Thread.currentThread(), mode); //(2)????????????????? Node pred = tail; if (pred != null) { //(2-1)?????node??????????tail node.prev = pred; //(2-2)?CAS????????node???tail?? if (compareAndSetTail(pred, node)) { //(2-3)CAS??????????tail?next????????node?????????? //??????? pred.next = node; return node; } } //(3)???????????????null?????????????????????tail??? //???????????2-2??CAS?????????????enq?? enq(node); return node; }
  下面梳理一下addWaiter方法：
? 1????????????????thread2??????????node???
? 2??????????????node??????tail??(??????????tail????node????????tail)????CAS????node???tail??(CAS?????tail?next???node???????????)?
? 3???2????????enq???
? ????thread1?thread2??????????????????(??thread1?????thread2??????????????????????)??????????????tail????null?????????enq?????????enq?????
private Node enq(final Node node) { for (;;) { //(4)??????????tail?? Node t = tail; //(5)??tail?null??????????null??????????????head?tail?? //???????? if (t == null) { //?5-1????????????????????????????????????CAS? //?????? if (compareAndSetHead(new Node())) tail = head; } //(6)tail??null else { //(6-1)???????????????tail?? node.prev = t; //(6-2)???????????????CAS?????????tail?????????? //???????????? if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
? enq??????????????????????????
? 1???????4???t???tail?????t==null???(1)???
? 2?????????????????????????(???5-1???)
? 3???????(2)???????????????
? ???????????????
? 1????????tail?????t??tail???????(3)
? 2?????????t!=null???enq????????(6).
? 3??(6-1)?????node?????????????tail???????(4)???
? 4??????????????(6-2)??CAS???????node?????tail??,????????????(5)??????tail????????????????????????????t?next????node??????(6)????????
? ???????????????enq??????????????????????????????enq??CAS?????????????????????????????(??????????????????????Node??????)
? (4)???AQS???????acquire()??????acquireQueued??????????????????????????????????????????????????????????????waitStatus?-1?????LockSupport???park???????????????
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //??????????tryAcquire???? for (;;) { //?????? final Node p = node.predecessor(); //(1)?????????????????????????tryAcquire???????? //?NofairSync?tryAcquire?????????? if (p == head && tryAcquire(arg)) { //????????????tryAcquire??true????????????node setHead(node); p.next = null; //????????next???null???GC???? failed = false; return interrupted; } //(2)???????,???????????????????????????node?? //?waitStatus???-1(SIGNAL),??park???????????????????? //????? if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
? ??????????????????????????????????thread1?thread2????????enq??????????????????????
? ???node????????head??????tryAcquire()??????????????state?thread1?????tryAcquire?????????acquireQueued??????(2)?????(2)??????shouldParkAfterFailedAcquire?????????????node????????waitStatus?CANCELLED????????????????????????????????waitStatus???-1(SIGNAL)?????????????????
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //?1????????waitStatus int ws = pred.waitStatus; //?2????????waitStatus?SINGNAL??????true if (ws == Node.SIGNAL) //????????SIGNAL??????????????park???????? return true; if (ws > 0) { //?3????????????????CANCELLED???? do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //CAS????????????SIGHNAL? compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
? ??shouldParkAfterFailedAcquire??????????????????????????????waitStatus???-1?
? ??????????acquireQueued???????acquireQueued????????????????????state?????????shouldParkAfterFailedAcquire??????????node?????head?waitStatus???-1(SIGNAL)??????true???acquireQueued?????????parkAndCheckInterrupt???park?????
/**
 * Parks the current thread, then reports (and clears) its interrupt flag.
 *
 * @return the value returned by Thread.interrupted() after park returns
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    // Thread.interrupted() also clears the flag, so capture it exactly once.
    final boolean wasInterrupted = Thread.interrupted();
    return wasInterrupted;
}
? (5)????????????????????????thread3??????state???????????????????
  ① 线程首先调用ReentrantLock.lock()方法尝试获取锁;
  ② 在默认的非公平锁实现中，实际执行的是NonfairSync.lock()方法;
  ③ 在NonfairSync.lock()中，先通过CAS尝试修改state，成功则把当前线程设置为锁的持有线程；失败则进入AQS的流程，调用AQS.acquire(1)。
  ④ 在AQS的acquire(1)方法中，首先调用由子类重写的tryAcquire()方法，对于非公平锁最终执行的是Sync.nonfairTryAcquire()方法。
  ⑤ 如果tryAcquire()返回false，acquire()会继续调用AQS.addWaiter()，把当前线程封装成一个Node节点,该节点初始的waitStatus为0。
? ??addWaiter???????????????node???CAS???(??????????????tail)???tail????????????????????????????????????enq()?????????????????????
? ?????????????????????????????????????????????????acquireQueued()???
? ????thread3???????head??????????shouldParkAfterFailedAcquire()?????????????thread2??????waitStatue?????-1(????????????waitStatus???????????????????waitStatus?)?
? ?thread2?????waitStatus????shouldParkAfterFailedAcquire?????false????????acquireQueued??????????????shouldParkAfterFailedAcquire???????true?????parkAndCheckInterrupt()??????
? ???????????????????????????????????thread3???????????????????????????????????.
? ???????????????????
?????????
? ????ReentrantLock????????????????thread1?????????????????????????finally???ReentrantLock.unlock()??????????????????
? (1)????unlock????????????????AQS?release()???????????1????????unlock????????????state????????????unlock???????lock?unlock?????
public void unlock() { sync.release(1); //??ReentrantLock?unlock?????AQS?release?? } public final boolean release(int arg) { //????????tryRelease????ReentrantLock????Sync?tryRelease?? if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
? (2)????release???????ReentrantLock????Sync?tryRelease??????????????????tryRelease???????
? ?????AQS?state????1?
? ???????????AQS?exclusiveOwnerThread????????IllegalMonitorStateException???????????????????????
? ???(state-1)?????0?????????????unlock????lock?unlock??????
? ???(state-1)??0?????AQS?ExclusiveOwnerThread???null?
  ⑤ 只有当锁被完全释放(state减为0)时tryRelease才返回true；返回false表示还需要继续unlock。
protected final boolean tryRelease(int releases) { //?1??????state????1???????state int c = getState() - releases; //?2??????????????????????????IllegalMonitorStateException if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //?3???????state???0 if (c == 0) { free = true; //?3-1?????????null setExclusiveOwnerThread(null); } //?4?????state=c=getState()-releases setState(c); //?5???state==0?????true return free; }
? (3)???tryRelease??true???????release???if????????????????
// Excerpt from release(int arg): the branch taken when tryRelease succeeds.
if (tryRelease(arg)) {
    // (1) Snapshot the current head of the wait queue.
    Node h = head;
    // (2) Wake a successor only if head exists and its waitStatus is non-zero
    // (a waitStatus of 0 is not CANCELLED; shouldParkAfterFailedAcquire sets
    // a predecessor to SIGNAL via CAS before its successor parks).
    if (h != null && h.waitStatus != 0) {
        // (3) Unpark the successor of head.
        unparkSuccessor(h);
    }
    return true;
}
? (4)????????????????????????????????head!=null??head?waitStatus=-1??????unparkSuccessor???????????head?????h.????????unparkSuccessor???????????
private void unparkSuccessor(Node node) { //?1???node?waitStatus int ws = node.waitStatus; //?2???waitStatus????0 if (ws < 0) //?2-1???waitStatus??0?????CAS??????0 compareAndSetWaitStatus(node, ws, 0); //?2???s?????????head????? Node s = node.next; //?3??????????????????waitStatus==CANCELLED if (s == null || s.waitStatus > 0) { //?3-1???s?=null????waitStatus=CANCELLED???????null s = null; //?3-2???????????????head?????null??node.waitStatus??? for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //?4?node.next!=null????????head???????null if (s != null) //?4-1??????????? LockSupport.unpark(s.thread); }
? ??????????????unparkSuccessor????????:
? ???head???waitStatus?????0????CAS???head???waitStatus???0
? ???head????????????????waitStatus??0?????????????????????waitStatus<=0????????
? (5)?????????????state????????????????????????????????????????????????
? ?thread1(???????)??unlock??????????unparkSuccessor?????thread2?????thread2?unpark?
? ?????????thread2????acquireQueued?????parkAndCheckInterrupt???park????????thread2?????????acquireQueued????for???????????????acquireQueued????for??????????
? ?for?????????????????????????????????????????????
? ??????head???????tryAcquire??????state???thread1?????state??state=0???thread2??tryAcquire??????CAS?????state?0???1???????????thread2??????
? ?thread2??state??????acquireQueued????????????acquireQueued????false????AQS??????acquire?????if??????????????????????