Java中的管程

  • 2019 年 11 月 3 日
  • 筆記

Java是利用 管程解決並發編程問題的,那麼究竟什麼是 管程?而它又是如何解決並發問題的呢?

什麼是管程

管程,英文名是 Monitor ,因此有的時候會被翻譯為 監視器。其實你也許很早就接觸到這個概念了,比如 synchronized 關鍵字,很多文章就介紹過其原理是使用了 監視器,只是你那個時候還並不知道 監視器管程,其實是一回事。

我們來看看維基百科上的概念:

管程 (英語:Monitors,也稱為監視器) 是一種程式結構,結構內的多個子程式(對象或模組)形成的多個工作執行緒互斥訪問共享資源。

感覺這句話聽得有點迷糊,但下面這句話應該就很好理解了:

管程提供了一種機制,執行緒可以臨時放棄互斥訪問,等待某些條件得到滿足後,重新獲得執行權恢復它的互斥訪問。

我的理解是:我們通過管程管理 Java 中的類,使得類是執行緒安全的。

這應該是 管程最終要達到的效果,那麼,它是怎麼做到的呢?

管程模型

管程這個概念最早來源於作業系統,作業系統發展了那麼多年,管程的實現也有多種方式,主流的有三種:Hasen模型Hoare模型MESA模型, Java 中借鑒的是 MESA模型,讓我們來重點看一下。

談到 MESA模型,就不得不提到並發主要解決2個核心問題:一個是 互斥,即同一時刻只允許一個執行緒訪問共享資源;另一個是 同步,即多個執行緒之間如何通訊、協作。

如何解決 互斥呢?我們可以在操作共享變數之前,增加一個等待隊列,每一個執行緒想要操作共享變數的話,都需要在等待隊列中等待,直到管程選出一個執行緒操作共享變數。

那又是如何解決 同步的呢?執行緒在操作共享變數時候,它不一定是直接執行,可能有一些自己的執行條件限制(比如取錢操作要求賬戶里一定要有錢,出隊操作要求隊列一定不能是空的),我們將這些限制稱之為 條件變數,每一個 條件變數也有自己對應的 等待隊列,當執行緒發現自己的 條件變數不滿足時,就進入相應的 等待隊列中排隊,直至 條件變數滿足,那麼其 等待隊列中的執行緒也不會是立馬執行,而是到最開始 共享變數對應的 等待隊列中再次排隊,重複之前的過程。

可以參考下面這幅圖:

理論說了那麼多,還是來看看用程式碼是如何實現的吧

實現

首先可以自定一個支援並發的隊列

    public class MyQueen {          // 共享變數(任何操作之前,都需要獲得該鎖才可以執行)          private final Lock lock = new ReentrantLock();          // 條件變數:隊列不滿          private final Condition notFull = lock.newCondition();          // 條件變數:隊列不空          private final Condition notEmpty = lock.newCondition();          /**           * 存儲隊列的容器           */          private final LinkedList<Integer> list = new LinkedList<>();          /**           * 最大容量           */          private int capacity;          /**           * 當前容器中存儲的數量           */          private int size;          public MyQueen(int capacity) {              this.capacity = capacity;              this.size = 0;          }          /**           * 入隊           */          public void enter(int value) {              lock.lock();              try {                  // 如果隊列已滿,則需要等到隊列不滿                  while (size >= capacity) {                      notFull.await(1, TimeUnit.MILLISECONDS);                  }                  // 入隊                  list.add(value);                  size++;                  System.out.println(value + " has bean entered");                  // 通知可以出隊                  notEmpty.signal();              } catch (InterruptedException e) {              } finally {                  lock.unlock();              }          }          /**           * 出隊           */          public int dequeue() {              Integer result = null;              lock.lock();              try {                  // 如果隊列已空,則需要等到隊列不空                  while (size <= 0) {                      notEmpty.await(1, TimeUnit.MILLISECONDS);                  }                  // 出隊                  result = list.removeFirst();                  size--;                  System.out.println(result + " has bean dequeued");                  // 通知可以入隊                  notFull.signal();                  return result;              } catch (InterruptedException e) {              } finally {                  lock.unlock();              }              return result;          }          public static void main(String[] args) {              MyQueen myQueen = new MyQueen(3);              new Thread(new Pruducer("producer1", myQueen, 0, 2)).start();              new Thread(new Pruducer("producer2", myQueen, 2, 5)).start();              new Thread(new Consumer("consumer2", myQueen, 5)).start();              new Thread(new Consumer("consumer1", myQueen, 3)).start();          }      }

定義生產者和消費者:

    class Pruducer implements Runnable {          private final MyQueen queen;          /**           * 該執行緒的名字           */          private final String name;          /**           * 開始的大小           */          private final int start;          /**           * 需要生產的資料個數           */          private final int size;          public Pruducer(String name, MyQueen queen, int start, int size) {              this.name = name;              this.queen = queen;              this.start = start;              this.size = size;          }          @Override          public void run() {              for (int i = 1; i <= size; i++) {                  int now = start + i;      //            System.out.println(name + " produce : " + now + " start");                  queen.enter(now);      //            System.out.println(name + " produce : " + now + " end");              }          }      }      class Consumer implements Runnable {          private final MyQueen queen;          /**           * 該執行緒的名字           */          private final String name;          /**           * 需要消費的資料個數           */          private final int size;          public Consumer(String name, MyQueen queen, int size) {              this.name = name;              this.queen = queen;              this.size = size;          }          @Override          public void run() {              for (int i = 1; i <= size; i++) {      //            System.out.println(name + " consume start");                  int result = queen.dequeue();      //            System.out.println(name + " consume : " + result + " end");              }          }      }

做一個測試的main方法:

        public static void main(String[] args) {              MyQueen myQueen = new MyQueen(3);              new Thread(new Pruducer("producer1", myQueen, 0, 2)).start();              new Thread(new Pruducer("producer2", myQueen, 2, 5)).start();              new Thread(new Consumer("consumer1", myQueen, 3)).start();              new Thread(new Consumer("consumer2", myQueen, 5)).start();          }

結果為:

    1 has bean entered      2 has bean entered      3 has bean entered      1 has bean dequeued      2 has bean dequeued      3 has bean dequeued      4 has bean entered      5 has bean entered      6 has bean entered      4 has bean dequeued      5 has bean dequeued      6 has bean dequeued      7 has bean entered      8 has bean entered      9 has bean entered      7 has bean dequeued      8 has bean dequeued      9 has bean dequeued

雖然滿足我想要的結果,但顯示的內容有些奇怪,總是容器先被填滿之後,然後容器被清空,感覺原因應該和 可重入鎖有關。