分享一個JDK批量非同步任務工具CompletionService,超好用

摘要:當需要批量提交非同步任務,推薦CompletionService。CompletionService將執行緒池Executor和阻塞隊列融合,讓批量非同步任務管理更簡單。

本文分享自華為雲社區《JDK批量非同步任務最強工具CompletionService》,作者: JavaEdge。

如何優化一個查詢各個價格介面的程式碼?若使用「ThreadPoolExecutor+Future」,可能優化如下:

三個執行緒非同步執行查詢價格,通過三次調用Future的get()方法獲取結果,之後將查詢結果保存在MySQL。

若獲取price1耗時很長,那麼即便獲取price2耗時短,也無法讓保存price2的操作先執行,因為主執行緒都阻塞在 f1.get()。這種問題如何解決呢?

加個阻塞隊列! 獲取到price1、2、3都進入阻塞隊列,然後在主執行緒消費阻塞隊列,就能保證先獲取到的價格先保存:

CompletionService實現查詢價格

實際開發推薦CompletionService,不但能幫你解決先獲取到的價格先保存,還能精簡程式碼。

CompletionService內部維護了一個阻塞隊列,當任務執行結束就把任務的執行結果入隊,但CompletionService是把任務執行結果的Future對象入隊,而上面demo是把任務最終執行結果入隊。

創建CompletionService

CompletionService介面的實現類是ExecutorCompletionService,這個實現類的構造方法有兩個,分別是:

  • ExecutorCompletionService(Executor executor);

  • ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)

這倆構造器都需要傳入一個執行緒池,若不指定completionQueue,默認使用無界LinkedBlockingQueue。任務執行結果的Future對象就是加入到completionQueue中。

學長~你直接給我寫段程式碼解釋清楚點唄 ~

讓我們試著利用CompletionService實現高性能的查詢房價系統。 之後通過CompletionService#submit()提交三個詢價操作,這三個詢價操作將會被CompletionService非同步執行。

最後CompletionService#take()獲取一個Future對象(加入到阻塞隊列的是任務執行結果的Future對象),調用Future#get()就能返回執行結果。

CompletionService介面

CompletionService介面提供的方法

submit()相關的方法有兩個:

  • 一個方法參數是Callable<V> task
  • 一個方法有兩個參數,分別是Runnable task和V result,該方法類似於ThreadPoolExecutor的 <T> Future<T> submit(Runnable task, T result) ,

CompletionService實現Dubbo#Forking Cluster

Dubbo中有一種叫做Forking的集群模式,這種集群模式下,支援並行調用多個查詢服務,只要有一個成功返回結果,整個服務即可返回。例如你需要提供一個地址轉坐標的服務,為了保證該服務的高可用和性能,可並行調用3個地圖服務商的API,然後只要有1個正確返回了結果r,那麼地址轉坐標這個服務就可以直接返回r了。這種集群模式可以容忍2個地圖服務商服務異常,但缺點是消耗的資源偏多。

geocoder(addr) {
   // 並行執行以下3個查詢服務, 
   r1=geocoderByS1(addr);
   r2=geocoderByS2(addr);
   r3=geocoderByS3(addr);
   // 只要r1,r2,r3有一個返回
   // 則返回
   return r1|r2|r3;
 }

利用CompletionService可快速實現 Forking 這種集群模式,比如下面示例程式碼。 首先創建一個執行緒池executor 、一個CompletionService對象cs和一個Future<Integer>類型的列表 futures,每次通過調用CompletionService的submit()方法提交一個非同步任務,會返回一個Future對象,把這些Future對象保存在列表futures中。通過調用 cs.take().get(),我們能夠拿到最快返回的任務執行結果,只要我們拿到一個正確返回的結果,就可以取消所有任務並且返回最終結果了。

 // 創建執行緒池
 ExecutorService executor =
   Executors.newFixedThreadPool(3);
 // 創建CompletionService
 CompletionService<Integer> cs =
   new ExecutorCompletionService<>(executor);
 // 用於保存Future對象
 List<Future<Integer>> futures =
   new ArrayList<>(3);
 // 提交非同步任務,並保存future到futures 
 futures.add(
   cs.submit(()->geocoderByS1()));
 futures.add(
   cs.submit(()->geocoderByS2()));
 futures.add(
   cs.submit(()->geocoderByS3()));
 // 獲取最快返回的任務執行結果
 Integer r = 0;
 try {
   // 只要有一個成功返回,則break
   for (int i = 0; i < 3; ++i) {
     r = cs.take().get();
     // 簡單地通過判空來檢查是否成功返回
     if (r != null) {
       break;
     }
   }
 } finally {
   // 取消所有任務
   for(Future<Integer> f : futures)
     f.cancel(true);
 }
 // 返回結果
 return r;

總結

當需要批量提交非同步任務,推薦CompletionService。CompletionService將執行緒池Executor和阻塞隊列融合,讓批量非同步任務管理更簡單。

CompletionService能讓非同步任務的執行結果有序化,先執行完的先進入阻塞隊列,利用該特性,可以輕鬆實現後續處理的有序性,避免無謂等待,同時還可以快速實現諸如Forking Cluster這樣的需求。

CompletionService的實現類ExecutorCompletionService,需要你自己創建執行緒池,雖看上去有些啰嗦,但好處是你可以讓多個ExecutorCompletionService的執行緒池隔離,這種隔離性能避免幾個特別耗時的任務拖垮整個應用的風險。

 

點擊關注,第一時間了解華為雲新鮮技術~