Java多线程中的wait/notify通信模式

前言

  最近在看一些JUC下的源码,更加意识到想要学好Java多线程,基础是关键,比如想要学好ReentranLock源码,就得掌握好AQS源码,而AQS源码中又有很多Java多线程经典的一些应用;再比如看了线程池的核心源码实现,又学到了很多核心实现,其实这些都可以提出来慢慢消化并变成自己的知识点,今天这个Java等待/通知模式其实是Thread.join()实现的关键,还有线城市工作线程中线程跟线程之间的通信的核心所在,故在此为了加深理解,做此记录!

  参考资料《Java并发编程艺术》(电子PDF版),有需要的朋友的可以私信或者评论

 


 

一、什么是Java线程的等待/通知模式

1、等待/通知模式概述

  首先先介绍下官方的一个正式的介绍:

  等待/通知机制,是指一个线程A调用了对象object的wait()方法进入等待状态,而另一个线程B调用了对象object的notify或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而还行后续操作。

  而我的理解是(举例说明):

  假设工厂里有两条流水线,某个工作流程需要这两个流水线配合完成,这两个流水线分别是A和B,其中A负责准备各种配件,B负责租装配件之后产出输出到工作台。B的工作需要A的配件准备充分,否则就会一直等待A准备好配件,并且A准备好配件后会通过一个开头通知告诉B我已经准备好了,你那边不用一直等待了,可以继续执行任务了。流程A与流程B就是对应的线程A与线程B之间的通信,即可以理解为相互配合,具体也就是“”通知/等待“”机制!

2、需要注意的细节  

  那么,我们都知道超类Object有wait()方法与notify()/notifyAll()方法,在进行正式代码举例之前,应该先加深下对这三个方法的理解与一些细节(有一些细节确实容易被忽略)

  • 调用wait()方法,会释放锁(这一点我想大部分人都知道),线程状态由RUNNING->WAITNG,当前线程进入对象等待队列中;
  • 调用notify()/notifyAll()方法不会立马释放锁(这一点我大家人也应该知道,但是什么时候释放锁呢?——–请看下一条),notify()方法是将等待队列中的线程移到同步队列中,而notifyAll()则是全部移到同步队列中,被移出的线程状态WAITING–>BLOCKED;
  • 当前调用notify()/notifyAll()的线程释放锁了才算释放锁,才有机会唤醒wait线程返回(为什么有才有机会返回呢?——继续看下一条)
  • 从wait()返回的前提是必须获得调用对象锁,也就是说notify()与notifyAll()释放锁之后,wait()进入BLOCKED状态,如果其他线程有竞争当前锁的话,wait线程继续争取锁资格(不好理解的话,请看下面的代码举例)
  • 使用wait()、notify()、notifyAll()方法时需要先调对象加锁(这可能是最容易忽视的点了,至于为什么,请先看了代码之后,看本篇博文最后补充:wait()、notify()、notifyAll()加锁的原因—-防止线程即饥饿

二、代码举例

1、结合代码理解

 结合上述的“工厂流程装配配件并产出的例子”,我们有两个线程(流水线)WaitThread与NotifyThread、其中WaitThread是被通知的任务,完成主要的工作(组装配件完成产品),需要时刻判断标志位(开关);NotifyThread是需要通知的任务,需要对WaitThread进行“监督通知”,两个配合才能更好完成产品的组装并输出。

public class WaitNotify {

    static Object lock = new Object();
    static boolean flag = false;
    public static void main(String[] args) {
        new Thread(new WaitThread(), "WaitThread").start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(new NotifyThread(), "NotifyThread").start();

    }

    /**
     * 流水线A,完成主要任务
     */
    static class WaitThread implements Runnable{
        @Override
        public void run() {
            // 获取object对象锁
            synchronized (lock){
                // 条件不满足时一直在等,等另外的线程改变该条件,并通知该wait线程
                while (!flag){
                    try {
                        System.out.println(Thread.currentThread() + " is waiting, flag is "+flag);
                        // wait()方法调用就会释放锁,当前线程进入等待队列。
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // TODO 条件已经满足,不继续while,完成任务
                System.out.println(Thread.currentThread() + " is running, flag is "+flag);
            }
        }
    }
    /**
     * 流水线B,对开关进行控制,并通知流水线A
     */
    static class NotifyThread implements Runnable{
        @Override
        public void run() {
            // 获取等wait线程同一个object对象锁
            synchronized (lock){
                flag = true;
                // 通知wait线程,我已经改变了条件,你可以继续返回执行了(返回之后继续判断while)
                // 但是此时通知notify()操作并立即不会释放锁,而是要等当前线程释放锁
                // TODO 我准备好配件了,我需要通知全部的组装流水线A.....
                lock.notifyAll();
                System.out.println(Thread.currentThread() + " hold lock, notify waitThread and flag is "+flag);
            }
        }
    }
}

运行main函数,输出:

Thread[WaitThread,5,main] is waiting, flag is false
Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true
Thread[WaitThread,5,main] is running, flag is true

车床流水工作开启,流水线的开关一开始是关闭的(flag=false),流水线B(NotifyThread)去开启后,开始自动唤醒流水线A(WaitThread),整个流水线开始工作了……

  • Thread[WaitThread,5,main] is waiting, flag is false: 一开始流水线A发现自己没有配件可租装,所以等流水线A准备好配件(这样是不是觉得特别傻,哈哈哈,真正的流水线不会浪费时间等的,而且会有很多条流水线B准备配件的,这里只是举例说明,望理解!);
  • Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true:流水线B准备好了配件,开启开关(flag=ture),并通知流水线A,让流水线A开始工作;
  • Thread[WaitThread,5,main] is running, flag is true,流水线B收到了通知,再次检查开关是否开启了,开启的话就开始返回继续完成工作了

其实结合上述我举的例子还是很好理解的,下面是大概的一个粗略时序图:

            

2、扩展理解—-wait()返回的前提是获得了锁

上述已经表达了这个注意的细节:从wait()返回的前提是必须获得调用对象锁我们再增加能竞争lock的同步代码块(红字部分)。

public class WaitNotify {

    static Object lock = new Object();
    static boolean flag = false;
    public static void main(String[] args) {
        new Thread(new WaitThread(), "WaitThread").start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(new NotifyThread(), "NotifyThread").start();
    }

    /**
     * 流水线A,完成主要任务
     */
    static class WaitThread implements Runnable{
        @Override
        public void run() {
            // 获取object对象锁
            synchronized (lock){
                // 条件不满足时一直在等,等另外的线程改变该条件,并通知该wait线程
                while (!flag){
                    try {
                        System.out.println(Thread.currentThread() + " is waiting, flag is "+flag);
                        // wait()方法调用就会释放锁,当前线程进入等待队列。
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // TODO 条件已经满足,不继续while,完成任务
                System.out.println(Thread.currentThread() + " is running, flag is "+flag);
            }
        }
    }
    /**
     * 流水线B,对开关进行控制,并通知流水线A
     */
    static class NotifyThread implements Runnable{
        @Override
        public void run() {
            // 获取等wait线程同一个object对象锁
            synchronized (lock){
                flag = true;
                // 通知wait线程,我已经改变了条件,你可以继续返回执行了(返回之后继续判断while)
                // 但是此时通知notify()操作并立即不会释放锁,而是要等当前线程释放锁
                // TODO 我准备好配件了,我需要通知全部的组装流水线A.....
                lock.notifyAll();
                System.out.println(Thread.currentThread() + " hold lock, notify waitThread and flag is "+flag);
            }
            // 模拟跟流水线B竞争
            synchronized (lock){
                System.out.println(Thread.currentThread() + " hold lock again");
            }
        }
    }
}

输出结果:

Thread[WaitThread,5,main] is waiting, flag is false
Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true
Thread[NotifyThread,5,main] hold lock again
Thread[WaitThread,5,main] is running, flag is true

 

其中第三条跟第四条顺序可能会反着来的,这就是因为lock锁可能被红字部分的synchronized代码块竞争获取(这样wait()方法可能获取不到lock锁,不会返回),也可能被waitThread获取从wait()方法返回

Thread[WaitThread,5,main] is waiting, flag is false
Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true
Thread[WaitThread,5,main] is running, flag is true
Thread[NotifyThread,5,main] hold lock again

三、等待/通知模式的应用

1、Thread.join()中源码应用

Thread.join()作用:当线程A等待thread线程终止之后才从thread.join()返回, 每个线程终止的前提是前驱线程终止,每个线程等待前驱线程终止后,才从join方法返回,这里涉及了等待/通知机制(等待前驱线程结束,接收前驱线程结束通知)

Thread.join()源码中,使用while选好判断前驱线程是否活着,如果前驱线程还活着就一直wait等待,当然如果超时的话就直接返回。

public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }
        // 这里的while(){wait(millis)} 就是利用等待/通知中的等待模式,只不过加上了超时设置
        if (millis == 0) {
            // while循环,当线程还活着的时候就一直循环等待,直到线程终止
            while (isAlive()) {
                // wait等待
                wait(0);
            }
            // 条件满足时返回
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

2、其它的应用

  线程池的本质是使用一个线程安全的工作队列连接工作者线程和客户端线程,客户端线程将任务放入工作队列后便返回,而工作者线程则不断地从工作队列中取出工作并执行。那么,在这里的等待/通知模式的应用就是:

  工作队列中线程job没有的话也就是工作队列为空的情况下,等待客户端放入工作队列线程任务,并通知工作线程继续从工作队列中获取线程执行。

  注:关于线程池的应用源码这里不做介绍,因为一时也讲不完(自己也还没有完全消化),先简单介绍下应用到的地方还有概念。

  补充:其实数据库的连接池也类似线程池这种工作流程,也会涉及等待/通知模式。

3、等待/通知范式

  介绍了那么多应用,这种模式应该有个统一的范式来套用。对的,必然是有的:

  对于等待者(也可以称之为消费者):

synchronized (对象lock) {
        while (条件不满足) {
            对象.wait();
        }
        // TODO 处理逻辑
    }

  对于通知者(也可以称之为生产者):

 synchronized (对象lock) {
        while (条件满足) {
            改变条件
            对象.notify();
        }
    }

  注意实际开发中最好采用的是超时等待/通知模式,在thread.join()源码方法中完美体现

四、wait()、notify()、notifyAll()使用前需要加锁的原因—-防止线程即饥饿

(1)其实根据wait()注意事项也能明白,wait()是释放锁的,那么不加锁哪来释放锁

(2)wait()与notify()或者notifyAll()必须是搭配一起使用的,否则线程调用object.wait()之后,没有超时机制,也没有调用notify()或者notifyAll()唤醒的话,就一直处于WAITING状态,造成调用wait()的线程一直都是饥饿状态。

(3)由于第2条的,我们已知:即便我们使用了notify()或者notifyAll()去唤醒线程,但是没有在适当的时机唤醒(比如调用wait()之前就唤醒了那么仍然调用wait()线程处于WAITING状态,所以我们必须保证wait()方法要么不执行,要么就执行完在被唤醒。也就是下列代码中1那里不能允许插入调用notify/notifyAll,自然而然就增加synchronized关键字,保证wait()操作整体执行不被破坏

 synchronized (对象lock) {
        while (条件不满足) {
            // 1 这里如果先执行了notify/notifyAll方法,那么2执行之后,该线程就一直WAITING
            对象.wait(); // 2
        }
        // TODO 处理逻辑
    }

用图片展示执行顺序就是:

(4)注意synchronized代码块中,代码错误或者其它原因线程终止的话,没有执行到wait()方法的话,是会自动释放锁的,不必担心会死锁