從零開始自己動手寫阻塞隊列

從零開始自己動手寫阻塞隊列

前言

在我們平時編程的時候一個很重要的工具就是容器,在本篇文章當中主要給大家介紹阻塞隊列的原理,並且在了解原理之後自己動手實現一個低配版的阻塞隊列。

需求分析

在前面的兩篇文章ArrayDeque(JDK雙端隊列)源碼深度剖析深入剖析(JDK)ArrayQueue源碼當中我們仔細介紹了隊列的原理,如果大家感興趣可以查看一下!

而在本篇文章所談到的阻塞隊列當中,是在並發的情況下使用的,上面所談到的是隊列是並發不安全的,但是阻塞隊列在並發下情況是安全的。阻塞隊列的主要的需求如下:

  • 隊列基礎的功能需要有,往隊列當中放數據,從隊列當中取數據。
  • 所有的隊列操作都要是並發安全的。
  • 當隊列滿了之後再往隊列當中放數據的時候,執行緒需要被掛起,當隊列當中的數據被取出,讓隊列當中有空間的時候執行緒需要被喚醒。
  • 當隊列空了之後再往隊列當中取數據的時候,執行緒需要被掛起,當有執行緒往隊列當中加入數據的時候被掛起的執行緒需要被喚醒。
  • 在我們實現的隊列當中我們使用數組去存儲數據,因此在構造函數當中需要提供數組的初始大小,設置用多大的數組。

阻塞隊列實現原理

執行緒阻塞和喚醒

在上面我們已經談到了阻塞隊列是並發安全的,而且我們還有將執行緒喚醒和阻塞的需求,因此我們可以選擇可重入鎖ReentrantLock保證並發安全,但是我們還需要將執行緒喚醒和阻塞,因此我們可以選擇條件變數Condition進行執行緒的喚醒和阻塞操作,在Condition當中我們將會使用到的,主要有以下兩個函數:

  • signal用於喚醒執行緒,當一個執行緒調用Conditionsignal函數的時候就可以喚醒一個被await函數阻塞的執行緒。
  • await用於阻塞執行緒,當一個執行緒調用Conditionawait函數的時候這個執行緒就會阻塞。

數組循環使用

因為隊列是一端進一端出,因此隊列肯定有頭有尾。

當我們往隊列當中加入一些數據之後,隊列的情況可能如下:

在上圖的基礎之上我們在進行四次出隊操作,結果如下:

在上面的狀態下,我們繼續加入8個數據,那麼布局情況如下:

我們知道上圖在加入數據的時候不僅將數組後半部分的空間使用完了,而且可以繼續使用前半部分沒有使用過的空間,也就是說在隊列內部實現了一個循環使用的過程。

為了保證數組的循環使用,我們需要用一個變數記錄隊列頭在數組當中的位置,用一個變數記錄隊列尾部在數組當中的位置,還需要有一個變數記錄隊列當中有多少個數據。

程式碼實現

成員變數定義

根據上面的分析我們可以知道,在我們自己實現的類當中我們需要有如下的類成員變數:

// 用於保護臨界區的鎖
private final ReentrantLock lock;
// 用於喚醒取數據的時候被阻塞的執行緒
private final Condition notEmpty;
// 用於喚醒放數據的時候被阻塞的執行緒
private final Condition notFull;
// 用於記錄從數組當中取數據的位置 也就是隊列頭部的位置
private int takeIndex;
// 用於記錄從數組當中放數據的位置 也就是隊列尾部的位置
private int putIndex;
// 記錄隊列當中有多少個數據
private int count;
// 用於存放具體數據的數組
private Object[] items;

構造函數

我們的構造函數也很簡單,最核心的就是傳入一個數組大小的參數,並且給上面的變數進行初始化賦值。

@SuppressWarnings("unchecked")
public MyArrayBlockingQueue(int size) {
  this.lock = new ReentrantLock();
  this.notEmpty = lock.newCondition();
  this.notFull = lock.newCondition();
  // 其實可以不用初始化 類會有默認初始化 默認初始化為0
  takeIndex = 0;
  putIndex = 0;
  count = 0;
  // 數組的長度肯定不能夠小於0
  if (size <= 0)
    throw new RuntimeException("size can not be less than 1");
  items = (E[])new Object[size];
}

put函數

這是一個比較重要的函數了,在這個函數當中如果隊列沒有滿,則直接將數據放入到數組當中即可,如果數組滿了,則需要將執行緒掛起。

public void put(E x){
  // put 函數可能多個執行緒調用 但是我們需要保證在給變數賦值的時候只能夠有一個執行緒
  // 因為如果多個執行緒同時進行賦值的話 那麼可能後一個執行緒的賦值操作覆蓋了前一個執行緒的賦值操作
  // 因此這裡需要上鎖
  lock.lock();

  try {
    // 如果隊列當中的數據個數等於數組的長度的話 說明數組已經滿了
    // 這個時候需要將執行緒掛起
    while (count == items.length)
      notFull.await(); // 將調用 await的執行緒掛起
    // 當數組沒有滿 或者在掛起之後再次喚醒的話說明數組當中有空間了
    // 這個時候需要將數組入隊 
    // 調用入隊函數將數據入隊
    enqueue(x);
  } catch (InterruptedException e) {
    e.printStackTrace();
  } finally {
    // 解鎖
    lock.unlock();
  }
}

// 將數據入隊
private void enqueue(E x) {
  this.items[putIndex] = x;
  if (++putIndex == items.length)
    putIndex = 0;
  count++;
  notEmpty.signal(); // 喚醒一個被 take 函數阻塞的執行緒喚醒
}

offer函數

offer函數和put函數一樣,但是與put函數不同的是,當數組當中數據填滿之後offer函數返回false,而不是被阻塞。

public boolean offer(E e) {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    // 如果數組滿了 則直接返回false 而不是被阻塞
    if (count == items.length)
      return false;
    else {
      // 如果數組沒有滿則直接入隊 並且返回 true
      enqueue(e);
      return true;
    }
  } finally {
    lock.unlock();
  }
}

add函數

這個函數和上面兩個函數作用一樣,也是往隊列當中加入數據,但當單隊列滿了之後這個函數會拋出異常。

public boolean add(E e) {
  if (offer(e))
    return true;
  else
    throw new RuntimeException("Queue full");
}

take函數

這個函數主要是從隊列當中取出一個數據,但是當隊列為空的時候,這個函數會阻塞調用該函數的執行緒:

public E take() throws InterruptedException {
  // 這個函數也是不能夠並發的 否則可能不同的執行緒取出的是同一個位置的數據
  // 進行加鎖操作
  lock.lock();
  try {
    // 當 count 等於0 說明隊列為空
    // 需要將執行緒掛起等待
    while (count == 0)
      notEmpty.await();
    // 當被喚醒之後進行出隊操作
    return dequeue();
  }finally {
    lock.unlock();
  }
}

private E  dequeue() {
  final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  E x = (E) items[takeIndex];
  items[takeIndex] = null; // 將對應的位置設置為 null GC就可以回收了
  if (++takeIndex == items.length)
    takeIndex = 0;
  count--; // 隊列當中數據少一個了
  // 因為出隊了一個數據 可以喚醒一個被 put 函數阻塞的執行緒 如果這個時候沒有被阻塞的執行緒
  // 這個函數就不會起作用 也就說在這個函數調用之後被 put 函數掛起的執行緒也不會被喚醒
  notFull.signal(); // 喚醒一個被 put 函數阻塞的執行緒
  return x;
}

重寫toString函數

因為我們在後面的測試函數當中會列印我們這個類,而列印這個類的時候會調用對象的toString方法得到一個字元串,最後列印這個字元串。

@Override
public String toString() {
  StringBuilder stringBuilder = new StringBuilder();
  stringBuilder.append("[");
  // 這裡需要上鎖 因為我們在列印的時候需要列印所有的數據
  // 列印所有的數據就需要對數組進行遍歷操作 而在進行遍歷
  // 操作的時候是不能進行插入和刪除操作的 因為列印的是某
  // 個時刻的數據
  lock.lock();
  try {
    if (count == 0)
      stringBuilder.append("]");
    else {
      int cur = 0;
      // 對數據進行遍歷 一共遍歷 count 次 因為數組當中一共有 count
      // 個數據
      while (cur != count) {
        // 從 takeIndex 位置開始進行遍歷 因為數據是從這個位置開始的
        stringBuilder.append(items[(cur + takeIndex) % items.length].toString() + ", ");
        cur += 1;
      }
      // 刪除掉最後一次沒用的 ", "
      stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());
      stringBuilder.append(']');
    }
  }finally {
    lock.unlock();
  }
  return stringBuilder.toString();
}

完整程式碼

整個我們自己完成的阻塞隊列的程式碼如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MyArrayBlockingQueue<E> {

  // 用於保護臨界區的鎖
  private final ReentrantLock lock;
  // 用於喚醒取數據的時候被阻塞的執行緒
  private final Condition notEmpty;
  // 用於喚醒放數據的時候被阻塞的執行緒
  private final Condition notFull;
  // 用於記錄從數組當中取數據的位置 也就是隊列頭部的位置
  private int takeIndex;
  // 用於記錄從數組當中放數據的位置 也就是隊列尾部的位置
  private int putIndex;
  // 記錄隊列當中有多少個數據
  private int count;
  // 用於存放具體數據的數組
  private Object[] items;


  @SuppressWarnings("unchecked")
  public MyArrayBlockingQueue(int size) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.notFull = lock.newCondition();
    // 其實可以不用初始化 類會有默認初始化 默認初始化為0
    takeIndex = 0;
    putIndex = 0;
    count = 0;
    if (size <= 0)
      throw new RuntimeException("size can not be less than 1");
    items = (E[])new Object[size];
  }

  public void put(E x){
    lock.lock();

    try {
      while (count == items.length)
        notFull.await();
      enqueue(x);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      lock.unlock();
    }
  }

  private void enqueue(E x) {
    this.items[putIndex] = x;
    if (++putIndex == items.length)
      putIndex = 0;
    count++;
    notEmpty.signal();
  }

  private E  dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
      takeIndex = 0;
    count--;
    notFull.signal();
    return x;
  }

  public boolean add(E e) {
    if (offer(e))
      return true;
    else
      throw new RuntimeException("Queue full");
  }

  public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      if (count == items.length)
        return false;
      else {
        enqueue(e);
        return true;
      }
    } finally {
      lock.unlock();
    }
  }

  public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      return (count == 0) ? null : dequeue();
    } finally {
      lock.unlock();
    }
  }

  public E take() throws InterruptedException {
    lock.lock();
    try {
      while (count == 0)
        notEmpty.await();
      return dequeue();
    }finally {
      lock.unlock();
    }
  }

  @Override
  public String toString() {
    StringBuilder stringBuilder = new StringBuilder();
    stringBuilder.append("[");
    lock.lock();
    try {
      if (count == 0)
        stringBuilder.append("]");
      else {
        int cur = 0;
        while (cur != count) {
          stringBuilder.append(items[(cur + takeIndex) % items.length].toString()).append(", ");
          cur += 1;
        }
        stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());
        stringBuilder.append(']');
      }
    }finally {
      lock.unlock();
    }
    return stringBuilder.toString();
  }

}

現在對上面的程式碼進行測試:

我們現在使用阻塞隊列模擬一個生產者消費者模型,設置阻塞隊列的大小為5,生產者執行緒會往隊列當中加入數據,數據為0-9的10個數字,消費者執行緒一共會消費10次。

import java.util.concurrent.TimeUnit;

public class Test {

  public static void main(String[] args) throws InterruptedException {
    MyArrayBlockingQueue<Integer> queue = new MyArrayBlockingQueue<>(5);
    Thread thread = new Thread(() -> {
      for (int i = 0; i < 10; i++) {
        System.out.println(Thread.currentThread().getName() + " 往隊列當中加入數據:" + i);
        queue.put(i);
      }
    }, "生產者");


    Thread thread1 = new Thread(() -> {
      for (int i = 0; i < 10; i++) {
        try {
          System.out.println(Thread.currentThread().getName() + " 從隊列當中取出數據:" + queue.take());
          System.out.println(Thread.currentThread().getName() + " 當前隊列當中的數據:" + queue);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }, "消費者");
    thread.start();
    TimeUnit.SECONDS.sleep(3);
    thread1.start();

  }
}

上面程式碼的輸出如下所示:

生產者 往隊列當中加入數據:0
生產者 往隊列當中加入數據:1
生產者 往隊列當中加入數據:2
生產者 往隊列當中加入數據:3
生產者 往隊列當中加入數據:4
生產者 往隊列當中加入數據:5
消費者 從隊列當中取出數據:0
生產者 往隊列當中加入數據:6
消費者 當前隊列當中的數據:[1, 2, 3, 4, 5]
消費者 從隊列當中取出數據:1
消費者 當前隊列當中的數據:[2, 3, 4, 5]
消費者 從隊列當中取出數據:2
消費者 當前隊列當中的數據:[3, 4, 5, 6]
生產者 往隊列當中加入數據:7
消費者 從隊列當中取出數據:3
消費者 當前隊列當中的數據:[4, 5, 6, 7]
消費者 從隊列當中取出數據:4
消費者 當前隊列當中的數據:[5, 6, 7]
消費者 從隊列當中取出數據:5
消費者 當前隊列當中的數據:[6, 7]
生產者 往隊列當中加入數據:8
消費者 從隊列當中取出數據:6
消費者 當前隊列當中的數據:[7, 8]
消費者 從隊列當中取出數據:7
消費者 當前隊列當中的數據:[8]
消費者 從隊列當中取出數據:8
消費者 當前隊列當中的數據:[]
生產者 往隊列當中加入數據:9
消費者 從隊列當中取出數據:9
消費者 當前隊列當中的數據:[]

從上面的輸出結果我們知道,生產者執行緒列印5之後被掛起了,因為如果沒有被掛起,生產者執行緒肯定可以一次性輸出完成,因為消費者執行緒阻塞了3秒。但是他沒有輸出完成說明在列印5之後,因為阻塞隊列滿了,因而生產者執行緒被掛起了。然後消費者開始消費,這樣阻塞隊列當中就有空間了,生產者執行緒就可以繼續生產了。

總結

在本篇文章當中,主要向大家介紹了阻塞隊列的原理並且實現了一個低配版的數組阻塞隊列,其實如果你了解數組隊列和鎖的話,這個程式碼實現起來還是相對比較簡單的,我們只需要使用鎖去保證我們的程式並發安全即可。

  • 我們在實現put函數的時候,如果當前隊列已經滿了,則當前執行緒需要調用await函數進行阻塞,當執行緒被喚醒或者隊列沒有滿可以繼續執行的時候,我們在往隊列當中加入數據之後需要調用一次signal函數,因為這樣可以喚醒在調用take函數的時候因為隊列空而阻塞的執行緒。
  • 我們實現take函數的時候,如果當前隊列已經空了,則當前執行緒也需要調用await函數進行阻塞,當執行緒被喚醒或者隊列不為空執行緒可以繼續執行,在出隊之後需要調用一次signal函數,因為這樣可以喚醒在調用put函數的時候因為隊列滿而阻塞的執行緒。

以上就是本篇文章的所有內容了,我是LeHung,我們下期再見!!!更多精彩內容合集可訪問項目://github.com/Chang-LeHung/CSCore

關注公眾號:一無是處的研究僧,了解更多電腦(Java、Python、電腦系統基礎、演算法與數據結構)知識。