案例復現,帶你分析Priority Blocking Queue比較器異常導致的NPE問題
摘要:本文通過完整的案例復現來演示在什麼情況會觸發該問題,同時給出了處理建議。希望讀者在編程時加以借鑒,避免再次遇到此類問題。
本文分享自華為雲社區《Priority Blocking Queue比較器異常導致的NPE問題分析》,作者:謝照昆、王嘉偉。
編者按:筆者在使用PriorityBlockingQueue實現按照優先級處理任務時遇到一類NPE問題,經過分析發現根本原因是在任務出隊列時調用比較器異常,進而導致後續任務出隊列拋出NullPointerException。本文通過完整的案例復現來演示在什麼情況會觸發該問題,同時給出了處理建議。希望讀者在編程時加以借鑒,避免再次遇到此類問題。
背景知識
PriorityBlockingQueue是一個無界的基於數組的優先級阻塞隊列,使用一個全局ReentrantLock來控制某一時刻只有一個線程可以進行元素出隊和入隊操作,並且每次出隊都返回優先級別最高的或者最低的元素。PriorityBlockingQueue通過以下兩種方式實現元素優先級排序:
- 入隊元素實現Comparable接口來比較元素優先級;
- PriorityBlockingQueue構造函數指定Comparator來比較元素優先級;
關於PriorityBlockingQueue中隊列操作的部分,基本和PriorityQueue邏輯一致,只不過在操作時加鎖了。在本文中我們主要關注PriorityBlockingQueue出隊的take方法,該方法通過調用dequeue方法將元素出隊列。當沒有元素可以出隊的時候,線程就會阻塞等待。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { // 嘗試獲取最小元素,即小頂堆第一個元素,然後重新排序,如果不存在表示隊列暫無元素,進行阻塞等待。 while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; }
現象
在某個業務服務中使用PriorityBlockingQueue實現按照優先級處理任務,某一天環境中的服務突然間不處理任務了,查看後台日誌,發現一直拋出NullPointerException。將進程堆dump出來,使用MAT發現某個PriorityBlockingQueue中的size值比實際元素個數多1個(入隊時已經對任務進行非空校驗)。
異常堆棧如下:
java.lang.NullPointerException at java.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404) at java.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333) at java.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548) ...
MAT結果:
原因分析
在此我們分析下PriorityBlockingQueue是如何出隊列的,PriorityBlockingQueue最終通過調用dequeue方法出隊列,dequeue方法處理邏輯如下:
- 將根節點(array[0])賦值給result;
- array[n] 賦值給 arrary[0];
- 將 array[n] 設置為 null;
- 調用siftDownComparable或siftDownUsingComparator對隊列元素重新排序;
- size大小減1;
- 返回result;
如果在第4步中出現異常,就會出現隊列中的元素個數比實際的元素個數多1個的現象。此時size未發生改變,arry[n]已經被置為null,再進行siftDown操作時就會拋出NullPointerException。繼續分析第4步中在什麼情況下會出現異常,通過代碼走讀我們可以發現只有在調用Comparable#compareTo或者Comparator#compare方法進行元素比較的時候才可能出現異常。這塊代碼的處理邏輯和業務相關,如果業務代碼處理不當拋出異常,就會導致上述現象。
/** * Mechanics for poll(). Call only while holding lock. */ private E dequeue() { int n = size - 1; if (n < 0) return null; else { Object[] array = queue; E result = (E) array[0]; //step1 E x = (E) array[n]; //step2 array[n] = null; //step3 Comparator<? super E> cmp = comparator; if (cmp == null) //step4 如果指定了comparator,就按照指定的comparator來比較。否則就按照默認的 siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; //step5 return result; //step6 } } private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } } private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) { if (n > 0) { int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < n && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; } }
復現代碼
import java.util.ArrayList; import java.util.List; import java.util.concurrent.PriorityBlockingQueue; public class PriorityBlockingQueueTest { static class Entity implements Comparable<Entity> { private int id; private String name; private boolean flag; public void setFlag(boolean flag) { this.flag = flag; } public Entity(int id, String name) { this.id = id; this.name = name; } @Override public int compareTo(Entity entity) { if(flag) { throw new RuntimeException("Test Exception"); } if (entity == null || this.id > entity.id) { return 1; } return this.id == entity.id ? 0 : -1; } } public static void main(String[] args) { int num = 5; PriorityBlockingQueue<Entity> priorityBlockingQueue = new PriorityBlockingQueue<>(); List<Entity> entities = new ArrayList<>(); for (int i = 0; i < num; i++) { Entity entity = new Entity(i, "entity" + i); entities.add(entity); priorityBlockingQueue.offer(entity); } entities.get(num - 1).setFlag(true); int size = entities.size(); for (int i = 0; i < size; i++) { try { priorityBlockingQueue.take(); } catch (Exception e) { e.printStackTrace(); } } }
執行結果如下:
java.lang.RuntimeException: Test Exception at PriorityBlockingQueueTest$Entity.compareTo(PriorityBlockingQueueTest.java:31) at PriorityBlockingQueueTest$Entity.compareTo(PriorityBlockingQueueTest.java:8) at java.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404) at java.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333) at java.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548) at PriorityBlockingQueueTest.main(PriorityBlockingQueueTest.java:71) java.lang.NullPointerException at java.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404) at java.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333) at java.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548) at PriorityBlockingQueueTest.main(PriorityBlockingQueueTest.java:71)
規避方案
可以通過以下兩種方法規避:
- 在take方法出現NPE時,清除隊列元素,將未處理的元素重新進入隊列;
- 在 Comparable#compareTo 或 Comparator#compare 方法中做好異常處理,對異常情況進行默認操作;
建議使用後者。
案例引申
使用PriorityBlockingQueue作為緩存隊列來創建線程池時,使用submit提交任務會出現 java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to 異常,而使用execute沒有問題。
觀察submit源碼可以發現在submit內部代碼會將Runable封裝成RunnableFuture對象,然後調用execute提交任務。
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
以Comparable為例,任務入隊列時,最終會調用siftUpComparable方法。該方法第一步將RunnableFuture強轉為Comparable類型,而RunnableFuture類未實現Comparable接口,進而拋出ClassCastException異常。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; } private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; }
這也是常見的比較器調用異常案例,本文不再贅述,可自行參考其他文章。
總結
在使用PriorityBlockingQueue時,注意在比較器中做好異常處理,避免出現類似問題。
後記
如果遇到相關技術問題(包括不限於畢昇 JDK),可以進入畢昇 JDK 社區查找相關資源(點擊閱讀原文進入官網),包括二進制下載、代碼倉庫、使用教學、安裝、學習資料等。