多线程之ReentrantLock篇(五)

昨天有说过后面讲ReentrantLock,今天我们这篇幅就全局的讲解下,我们在Lock出来前,解决并发问题没得选只能用Synchronized。

一.ReentrantLock PK synchronized

      (1)synchronized是独占锁,加锁和解锁的过程自动进行,易于操作,但不够灵活。ReentrantLock也是独占锁,加锁和解锁的过程需要手动进行,不易操作,但非常灵活。

      (2)synchronized可重入,因为加锁和解锁自动进行,不必担心最后是否释放锁;ReentrantLock也可重入,但加锁和解锁需要手动进行,且次数需一样,否则其他线程无法获得锁。

      (3)synchronized不可响应中断,一个线程获取不到锁就一直等着;ReentrantLock可以相应中断。

ReentrantLock好像比synchronized关键字没好太多,我们再去看看synchronized所没有的,一个最主要的就是ReentrantLock还可以实现公平锁机制。什么叫公平锁呢?也就是在锁上等待时间最长的线程将获得锁的使用权。通俗的理解就是谁排队时间最长谁先执行获取锁。

 Lock接口的一些方法:

 

 

  1. lock():是最常用的获取锁的方法,若锁被其他线程获取,则等待(阻塞)。 
  2.  lockInterruptibly():获取锁,如果锁可用则线程继续执行;如果锁不可用则线程进入阻塞状态,此时可以在其它线程执行时调用这个线程的interrupt方法打断它的阻塞状态。
  3. tryLock():尝试非阻塞地获取锁,立即返回。获取成功返回true;获取失败返回false,但不会阻塞。 (这个方法比synchronized好)
  4. tryLock(long time, TimeUnit unit):阻塞尝试锁。参数代表时长,在指定时长内尝试锁。
  5. unlock():如果没有获取锁标记就放锁,会抛出异常。

  二.  Lock实现类介绍

        1.ReentrantLock(重入锁)

public class ReentrantLockDemo {

    private static int count=0;

    //重入锁(如何实现的?)
    static Lock lock=new ReentrantLock();

    public static void inc(){
        lock.lock(); //获得锁(互斥锁) ThreadA 获得了锁
        try {
            Thread.sleep(1);
            count++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();//释放锁 ThreadA释放锁  state=1-1=0
        }
    }

    public static void main(String[] args) throws InterruptedException {


        for (int i = 0; i < 1000; i++) {

            new Thread(()-> ReentrantLockDemo.inc()).start();
        }
        Thread.sleep(4000);
        System.out.println("result:"+count);
    }
}

  2.ReentrantReadWriteLock(重入读写锁)

          读多写少的情况下,读和读不互斥,读和写互斥,写和写互斥

public class ReentrantReadWriteLockDemo {
    static Map<String,Object> cacheMap=new HashMap<>();
    static ReentrantReadWriteLock rwl=new ReentrantReadWriteLock();
    static Lock read=rwl.readLock();
    static Lock write=rwl.writeLock();

    public static Object get(String key){
        read.lock(); //读锁 ThreadA 阻塞
        try{
            return cacheMap.get(key);
        }finally {
            read.unlock(); //释放读锁
        }
    }
    public static Object write(String key,Object value){
        write.lock(); //Other Thread 获得了写锁
        try{
            return cacheMap.put(key,value);
        }finally {
            write.unlock();
        }
    }
}

 三.思考锁的实现

      关于锁我们讲了很多,也写了很多案例,下面我们就底层是怎么实现锁的机制来进行一个猜想设计然后带着我们的猜想去看大佬们的源码是不是和我们的猜想一样:       

        1.首先锁的互斥的原理是多个线程访问同一个共享资源只有一个能进去访问,我们这里要分析锁的互斥特性是怎么实现的:要实现互斥性首先我们要有一个共享变量,然后在设计时用一个状态来标记共享资源的状态(例如0,1)

        2.没有抢占到锁的线程怎么玩,没有抢到锁的线程就要阻塞等待,想到等待就很容易想起前面篇幅讲的wait(等待、唤醒),但是这里不是用wait因为wait/notify不能唤醒指定的线程,所以我们想到了另一个方案,LockSupport.park()

        3.等待中的线程是怎么存储的,这里面想到的是双向链表

        4.公平和非公平(能否插队)

        5.锁的重入的特性(识别是否是同一个线程)重入次数可以用数字累加

下面我们就lock.lock(); 是怎么实现的进行深入分析下:l

   我们在多线程访问lock.lock()方法时如果获取lock权限的线程就可以向下执行,没有获取权限的线程就会阻塞,这个方向是大方向

 

 

 下面我们就lock.lock()方法里面做了什么事情,首先看到他调用了sync.lock();

 

 我们看下类的关系图,其中ReentantLock是Lock的一个实现我们从下面关系图片中可以看出ReentrantLockK中定义了一个sync

 

 我们可以看到Sync是一个静态的抽像内部类,他继承了AbstractQueuedSynchronizer

我 们回退到sync.lock();方法,他实现了两种锁,一种是共平锁一种是非公平锁,类关系图如下

 

 在sync.lock()中默认是非公平锁,那么我们在sync.lock()中进入NonfairSync方法中,首先他进来第一件事是抢占资源,在这里的判断compareAndSetState保证了多线程下的原子性,这里的compareAndSetState判断是采用了乐观锁机制来进行加锁,在很多源码中都有用到CAS操作,其中expect是预期值,update是更改值,这个操作是直接跟内存交互,这样做的好处是保证只有一个线程能进入,进入后操作setExclusiveOwnerThread(Thread.currentThread());保存当前线程

 

 我们进入他的判断方法共享资源compareAndSetState中看下他是怎么修改预期值的,stateOffset是当前state属性成员在内存中的偏移量,他会通过内存中的偏移量去拿到内存中的值 和我们的预期值对比,如果相等就修改,这里面设计的好处是直接跟内存交互,不让我们java代码操作,可以在java层面解决多线程问题

 

 

 

 上面图片是线程抢占成功的逻辑,其它线程抢占失败就走下面acquire(1)的逻辑了,这个acquire逻辑是由AQS来实现的;

  • ! tryAcquire(arg)
  • addWaiter 将未获得锁的线程加入到队列
  • acquireQueued(); 去抢占锁或者阻塞. 

 

我们先看下tryAcquire(arg)的实现,我们选择它NonfairSync实现

 

这下面的逻辑是继续去抢占锁的逻辑,

 final boolean nonfairTryAcquire(int acquires) {
//获取当前线程 final Thread current = Thread.currentThread();
//判断其状态 int c = getState();
//条件成立表示无锁, if (c == 0) {
//无锁的操作一定要变成CAS操作,因为修改本身存在原子性问题 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }
// 这里面是判断重入的,判断当前线程和我们有锁的线程是否相等 else if (current == getExclusiveOwnerThread()) {
//如果相等就加一个次数 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded");
// 因为当前是有锁状态,所以不用再用CAS操作 setState(nextc); return true; } return false; }

  条件! tryAcquire(arg)不成立就会进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg))判断中来,将未获得锁的线程加入到队列;addWaiter是做一个链表然后加入acquireQueued中进行循环的判断;Node.EXCLUSIVE表示节点互斥的一个特性;我们进入addWaiter方法

private Node addWaiter(Node mode) {
//进来第一件事是先构造一个节点,这个节点会先把当前线程和mode(表示独占)传进来,如果有多个线程没有抢到锁那就有多个线程进入这个方法,也就代表了有多个Node节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure
//这里会拿到一个tail节点,tail表示尾部节点,一般链表都会有一个头节点Head和尾节点Tail,这一步的头尾节点还没有初始化,还是空指向 Node pred = tail;
if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } }
//第一次进来尾节点一定是空的,所以第一次进来是走enq方法 enq(node); return node; }

  我们进入enq(node)方法

private Node enq(final Node node) {
//通过自旋的方式进行FOR循环 for (;;) {
//得到一个尾节点,此时尾节点还是空 Node t = tail; if (t == null) { // Must initialize
//初始化一个空的Node节点,这个compareAndSetHead只有在空的情况下才会替换,CAS保证只有一个线程能替换成功 if (compareAndSetHead(new Node()))
//将头和尾都指向这个刚刚初始化的空节点,到这一步的时序图如图一;这一步完成后初始化就完成了,然后进入下一次循环t就不为空了走else逻辑 tail = head; } else {
//node表示当前进来的线程,我们假设是B线程进来了,此时因为t不为空了,所以当前线程的prev指向空的Node节点 node.prev = t; if (compareAndSetTail(t, node)) {
//操作尾部节点t.next表示上一个节点的指向指向当前节点,这样一个双向链表就形成了,在多个for循环后的时序图就如图二 t.next = node; return t; } } } }

  

图一

 图二

 

addWaiter(Node.EXCLUSIVE), arg)代码执行完成后,他会把参数返回添加到acquireQueued里面去,我们进入acquireQueued,这里面一定会做的一件事就是阻塞列表中的线程

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
//又是自旋 for (;;) {
// 假设这里面的node是我们线程B的话,他的predecessor()方法可以点进去看下,会发现是当前线程的prev,由上面时序图会发现其实就是Head节点 final Node p = node.predecessor();
//如果头节点是head节点就会去抢占一次锁,成功就获得锁,失败走下面 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; }
//是否要挂起一个线程,我们进入shouldParkAfterFailedAcquire方法 if (shouldParkAfterFailedAcquire(p, node) &&
//parkAndCheckInterrupt是挂起(阻塞) parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

  阻塞状态是没有必要去抢占锁的,下面就是通过判断是不是偏锁状态来决定要不要去释放锁,如果是偏锁就释放锁

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//等待状态,如果线程出现异常会出来偏锁状态 int ws = pred.waitStatus;
//SIGNAL是唤醒状态成立就可以放心挂起(-1) if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true;
//偏锁状态ws会大于o if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do {
//将取消状态的移除节点 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
//替换节点状态改成SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

  

 

 

 前面的挂起完成后代表lock.lock()方法执行完成了,接下来我们就讲下lock.unlock()释放锁的过程,这时候释放锁是线程A来释放锁,我们来看lock.unlock()的ReentrantLock实现

 

public final boolean release(int arg) {
//进入tryRelease方法 if (tryRelease(arg)) {
Node h = head; if (h != null && h.waitStatus != 0)
//重置信息完成后会通过下面方法进行唤醒阻塞线程 unparkSuccessor(h); return true; } return false; }

  

protected final boolean tryRelease(int releases) {
//将state恢复原有值 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true;
//如果刚好c==0就释放线程并把线程清空,如图三 setExclusiveOwnerThread(null); } setState(c); return free; }

  

 

 图三

我们进入unparkSuccessor方法中

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
//如果成立 if (ws < 0)
//先恢复成初始状态 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
//获取下一个节点 Node s = node.next;
//如果下个节点为空,则除去无效节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; }
if (s != null)
// 唤醒下一个节点,唤醒后的线程又要抢占锁又会进入前面的acquireQueued方法进行自旋,抢占失败的线程又要挂起
//唤醒完成后唤醒的线会去执行代码程序
LockSupport.unpark(s.thread);
}

  

 

 

Tags: