案例復現,帶你分析Priority Blocking Queue比較器異常導致的NPE問題

摘要:本文通過完整的案例復現來演示在什麼情況會觸發該問題,同時給出了處理建議。希望讀者在編程時加以借鑒,避免再次遇到此類問題。

本文分享自華為雲社區《Priority Blocking Queue比較器異常導致的NPE問題分析》,作者:謝照昆、王嘉偉。

編者按:筆者在使用PriorityBlockingQueue實現按照優先級處理任務時遇到一類NPE問題,經過分析發現根本原因是在任務出隊列時調用比較器異常,進而導致後續任務出隊列拋出NullPointerException。本文通過完整的案例復現來演示在什麼情況會觸發該問題,同時給出了處理建議。希望讀者在編程時加以借鑒,避免再次遇到此類問題。

背景知識

PriorityBlockingQueue是一個無界的基於數組的優先級阻塞隊列,使用一個全局ReentrantLock來控制某一時刻只有一個線程可以進行元素出隊和入隊操作,並且每次出隊都返回優先級別最高的或者最低的元素。PriorityBlockingQueue通過以下兩種方式實現元素優先級排序:

  1. 入隊元素實現Comparable接口來比較元素優先級;
  2. PriorityBlockingQueue構造函數指定Comparator來比較元素優先級;

關於PriorityBlockingQueue中隊列操作的部分,基本和PriorityQueue邏輯一致,只不過在操作時加鎖了。在本文中我們主要關注PriorityBlockingQueue出隊的take方法,該方法通過調用dequeue方法將元素出隊列。當沒有元素可以出隊的時候,線程就會阻塞等待。

public E take() throws InterruptedException {
 final ReentrantLock lock = this.lock;
 lock.lockInterruptibly();
 E result;
 try {
 // 嘗試獲取最小元素,即小頂堆第一個元素,然後重新排序,如果不存在表示隊列暫無元素,進行阻塞等待。
 while ( (result = dequeue()) == null)
 notEmpty.await();
 } finally {
 lock.unlock();
 }
 return result;
}

現象

在某個業務服務中使用PriorityBlockingQueue實現按照優先級處理任務,某一天環境中的服務突然間不處理任務了,查看後台日誌,發現一直拋出NullPointerException。將進程堆dump出來,使用MAT發現某個PriorityBlockingQueue中的size值比實際元素個數多1個(入隊時已經對任務進行非空校驗)。

異常堆棧如下:

java.lang.NullPointerException
at java.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404)
at java.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333)
at java.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548)
 ...

MAT結果:

原因分析

在此我們分析下PriorityBlockingQueue是如何出隊列的,PriorityBlockingQueue最終通過調用dequeue方法出隊列,dequeue方法處理邏輯如下:

  1. 將根節點(array[0])賦值給result;
  2. array[n] 賦值給 arrary[0];
  3. 將 array[n] 設置為 null;
  4. 調用siftDownComparable或siftDownUsingComparator對隊列元素重新排序;
  5. size大小減1;
  6. 返回result;

如果在第4步中出現異常,就會出現隊列中的元素個數比實際的元素個數多1個的現象。此時size未發生改變,arry[n]已經被置為null,再進行siftDown操作時就會拋出NullPointerException。繼續分析第4步中在什麼情況下會出現異常,通過代碼走讀我們可以發現只有在調用Comparable#compareTo或者Comparator#compare方法進行元素比較的時候才可能出現異常。這塊代碼的處理邏輯和業務相關,如果業務代碼處理不當拋出異常,就會導致上述現象。

 /**
     * Mechanics for poll().  Call only while holding lock.
     */
 private E dequeue() {
 int n = size - 1;
 if (n < 0)
 return null;
 else {
 Object[] array = queue;
 E result = (E) array[0]; //step1
 E x = (E) array[n]; //step2
            array[n] = null; //step3
 Comparator<? super E> cmp = comparator;
 if (cmp == null) //step4 如果指定了comparator,就按照指定的comparator來比較。否則就按照默認的
 siftDownComparable(0, x, array, n);
 else
 siftDownUsingComparator(0, x, array, n, cmp);
            size = n; //step5
 return result; //step6
 }
 }
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
 if (n > 0) {
 Comparable<? super T> key = (Comparable<? super T>)x;
 int half = n >>> 1;
 while (k < half) {
 int child = (k << 1) + 1; 
 Object c = array[child];
 int right = child + 1;
 if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) 
                c = array[child = right];
 if (key.compareTo((T) c) <= 0) 
 break;
            array[k] = c;
            k = child;
 }
        array[k] = key;
 }
}
private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n,
 Comparator<? super T> cmp) {
 if (n > 0) {
 int half = n >>> 1;
 while (k < half) {
 int child = (k << 1) + 1;
 Object c = array[child];
 int right = child + 1;
 if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                c = array[child = right];
 if (cmp.compare(x, (T) c) <= 0)
 break;
            array[k] = c;
            k = child;
 }
        array[k] = x;
 }
}

復現代碼

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
public class PriorityBlockingQueueTest {
 static class Entity implements Comparable<Entity> {
 private int id;
 private String name;
 private boolean flag;
 public void setFlag(boolean flag) {
 this.flag = flag;
 }
 public Entity(int id, String name) {
 this.id = id;
 this.name = name;
 }
 @Override
 public int compareTo(Entity entity) {
 if(flag) {
 throw new RuntimeException("Test Exception");
 }
 if (entity == null || this.id > entity.id) {
 return 1;
 }
 return this.id == entity.id ? 0 : -1;
 }
 }
 public static void main(String[] args) {
 int num = 5;
 PriorityBlockingQueue<Entity> priorityBlockingQueue = new PriorityBlockingQueue<>();
 List<Entity> entities = new ArrayList<>();
 for (int i = 0; i < num; i++) {
 Entity entity = new Entity(i, "entity" + i);
 entities.add(entity);
 priorityBlockingQueue.offer(entity);
 }
 entities.get(num - 1).setFlag(true);
 int size = entities.size();
 for (int i = 0; i < size; i++) {
 try {
 priorityBlockingQueue.take();
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 }

執行結果如下:

java.lang.RuntimeException: Test Exception
at PriorityBlockingQueueTest$Entity.compareTo(PriorityBlockingQueueTest.java:31)
at PriorityBlockingQueueTest$Entity.compareTo(PriorityBlockingQueueTest.java:8)
at java.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404)
at java.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333)
at java.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548)
at PriorityBlockingQueueTest.main(PriorityBlockingQueueTest.java:71)
java.lang.NullPointerException
at java.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404)
at java.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333)
at java.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548)
at PriorityBlockingQueueTest.main(PriorityBlockingQueueTest.java:71)

規避方案

可以通過以下兩種方法規避:

  • 在take方法出現NPE時,清除隊列元素,將未處理的元素重新進入隊列;
  • 在 Comparable#compareTo 或 Comparator#compare 方法中做好異常處理,對異常情況進行默認操作;

建議使用後者。

案例引申

使用PriorityBlockingQueue作為緩存隊列來創建線程池時,使用submit提交任務會出現 java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to 異常,而使用execute沒有問題。

觀察submit源碼可以發現在submit內部代碼會將Runable封裝成RunnableFuture對象,然後調用execute提交任務。

public Future<?> submit(Runnable task) {
 if (task == null) throw new NullPointerException();
 RunnableFuture<Void> ftask = newTaskFor(task, null);
 execute(ftask);
 return ftask;
}

以Comparable為例,任務入隊列時,最終會調用siftUpComparable方法。該方法第一步將RunnableFuture強轉為Comparable類型,而RunnableFuture類未實現Comparable接口,進而拋出ClassCastException異常。

public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}

這也是常見的比較器調用異常案例,本文不再贅述,可自行參考其他文章。

總結

在使用PriorityBlockingQueue時,注意在比較器中做好異常處理,避免出現類似問題。

後記

如果遇到相關技術問題(包括不限於畢昇 JDK),可以進入畢昇 JDK 社區查找相關資源(點擊閱讀原文進入官網),包括二進制下載、代碼倉庫、使用教學、安裝、學習資料等。

 

點擊關注,第一時間了解華為雲新鮮技術~