分享一個JDK批量非同步任務工具CompletionService,超好用
摘要:當需要批量提交非同步任務,推薦CompletionService。CompletionService將執行緒池Executor和阻塞隊列融合,讓批量非同步任務管理更簡單。
本文分享自華為雲社區《JDK批量非同步任務最強工具CompletionService》,作者: JavaEdge。
如何優化一個查詢各個價格介面的程式碼?若使用「ThreadPoolExecutor+Future」,可能優化如下:
三個執行緒非同步執行查詢價格,通過三次調用Future的get()方法獲取結果,之後將查詢結果保存在MySQL。
若獲取price1耗時很長,那麼即便獲取price2耗時短,也無法讓保存price2的操作先執行,因為主執行緒都阻塞在 f1.get()。這種問題如何解決呢?
加個阻塞隊列! 獲取到price1、2、3都進入阻塞隊列,然後在主執行緒消費阻塞隊列,就能保證先獲取到的價格先保存:
CompletionService實現查詢價格
實際開發推薦CompletionService,不但能幫你解決先獲取到的價格先保存,還能精簡程式碼。
CompletionService內部維護了一個阻塞隊列,當任務執行結束就把任務的執行結果入隊,但CompletionService是把任務執行結果的Future對象入隊,而上面demo是把任務最終執行結果入隊。
創建CompletionService
CompletionService介面的實現類是ExecutorCompletionService,這個實現類的構造方法有兩個,分別是:
- ExecutorCompletionService(Executor executor);
- ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
這倆構造器都需要傳入一個執行緒池,若不指定completionQueue,默認使用無界LinkedBlockingQueue。任務執行結果的Future對象就是加入到completionQueue中。
學長~你直接給我寫段程式碼解釋清楚點唄 ~
讓我們試著利用CompletionService實現高性能的查詢房價系統。 之後通過CompletionService#submit()提交三個詢價操作,這三個詢價操作將會被CompletionService非同步執行。
最後CompletionService#take()獲取一個Future對象(加入到阻塞隊列的是任務執行結果的Future對象),調用Future#get()就能返回執行結果。
CompletionService介面
CompletionService介面提供的方法
submit()相關的方法有兩個:
- 一個方法參數是Callable<V> task
- 一個方法有兩個參數,分別是Runnable task和V result,該方法類似於ThreadPoolExecutor的 <T> Future<T> submit(Runnable task, T result) ,
CompletionService實現Dubbo#Forking Cluster
Dubbo中有一種叫做Forking的集群模式,這種集群模式下,支援並行調用多個查詢服務,只要有一個成功返回結果,整個服務即可返回。例如你需要提供一個地址轉坐標的服務,為了保證該服務的高可用和性能,可並行調用3個地圖服務商的API,然後只要有1個正確返回了結果r,那麼地址轉坐標這個服務就可以直接返回r了。這種集群模式可以容忍2個地圖服務商服務異常,但缺點是消耗的資源偏多。
geocoder(addr) { // 並行執行以下3個查詢服務, r1=geocoderByS1(addr); r2=geocoderByS2(addr); r3=geocoderByS3(addr); // 只要r1,r2,r3有一個返回 // 則返回 return r1|r2|r3; }
利用CompletionService可快速實現 Forking 這種集群模式,比如下面示例程式碼。 首先創建一個執行緒池executor 、一個CompletionService對象cs和一個Future<Integer>類型的列表 futures,每次通過調用CompletionService的submit()方法提交一個非同步任務,會返回一個Future對象,把這些Future對象保存在列表futures中。通過調用 cs.take().get(),我們能夠拿到最快返回的任務執行結果,只要我們拿到一個正確返回的結果,就可以取消所有任務並且返回最終結果了。
// 創建執行緒池 ExecutorService executor = Executors.newFixedThreadPool(3); // 創建CompletionService CompletionService<Integer> cs = new ExecutorCompletionService<>(executor); // 用於保存Future對象 List<Future<Integer>> futures = new ArrayList<>(3); // 提交非同步任務,並保存future到futures futures.add( cs.submit(()->geocoderByS1())); futures.add( cs.submit(()->geocoderByS2())); futures.add( cs.submit(()->geocoderByS3())); // 獲取最快返回的任務執行結果 Integer r = 0; try { // 只要有一個成功返回,則break for (int i = 0; i < 3; ++i) { r = cs.take().get(); // 簡單地通過判空來檢查是否成功返回 if (r != null) { break; } } } finally { // 取消所有任務 for(Future<Integer> f : futures) f.cancel(true); } // 返回結果 return r;
總結
當需要批量提交非同步任務,推薦CompletionService。CompletionService將執行緒池Executor和阻塞隊列融合,讓批量非同步任務管理更簡單。
CompletionService能讓非同步任務的執行結果有序化,先執行完的先進入阻塞隊列,利用該特性,可以輕鬆實現後續處理的有序性,避免無謂等待,同時還可以快速實現諸如Forking Cluster這樣的需求。
CompletionService的實現類ExecutorCompletionService,需要你自己創建執行緒池,雖看上去有些啰嗦,但好處是你可以讓多個ExecutorCompletionService的執行緒池隔離,這種隔離性能避免幾個特別耗時的任務拖垮整個應用的風險。