­

異步編排 Spring(線程池)

異步編排

源碼位置: GitHub

CompletableFuture 的詳解

🍫 它就是創建一個異步任務,然後在幹什麼,可以使用多任務組合

  • 創建任務的方法
static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  • 然後繼續上一段的任務(裏面包含了串行,AND,OR)

🚩 串行:

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
Function<? super T,? extends U>
T:上一個任務返回結果的類型
U:當前任務的返回值類型

參數解析:

thenApply 方法:當一個線程依賴另一個線程時,獲取上一個任務返回的結果,並返回當前任務的返回值。(接收上一階段任務結果,返回結果)

thenAccept方法:消費處理結果。接收任務的處理結果,並消費處理,無返回結果。  (接收上一階段任務結果,不返回結果)

thenRun方法:不接收上一階段任務結果,並且無返回值

帶有Async默認是異步執行的。這裡所謂的異步指的是不在當前線程內執行。

🚡 AND

public <U,V> CompletionStage<V> thenCombineAsync (CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
- 上一階段任務與other任務均執行結束,接收兩個任務的結果,並可獲取返回值

public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,
         Executor executor);
- 使用上一階段任務的結果,返回一個新的CompletableFuture實例

更多的參數詳解: [博客鏈接](CompletableFuture 異步編排 – 掘金 (juejin.cn))

  • 多任務組合
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

代碼測試

配置類的引入

yml

thread:
  pool:
    corePoolSize: 4
    maxPoolSize: 8
    workQueue: 25
    keepAliveTime: 30

config

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@ConfigurationProperties(prefix = "thread.pool")
public class AsyncConfig{
    //核心線程數量大小
    private   int corePoolSize = 4;
    //線程池最大容納線程數
    private   int maxPoolSize =8;
    //阻塞隊列
    private   int workQueue = 25;
    //線程空閑後的存活時長
    private   int keepAliveTime = 30;

    @Bean("asyncTaskExecutor")
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        //核心線程數
        threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
        //最大線程數
        threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
        //等待隊列
        threadPoolTaskExecutor.setQueueCapacity(workQueue);
        //線程前綴
        threadPoolTaskExecutor.setThreadNamePrefix("taskExecutor-");
        //線程池維護線程所允許的空閑時間,單位為秒
        threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveTime);
        // 線程池對拒絕任務(無線程可用)的處理策略
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}

🍫 參數詳解: 建議看看下面的線程池對拒絕任務(無線程可用)的處理策略

    1、corePoolSize:核心線程數
        * 核心線程會一直存活,及時沒有任務需要執行
        * 當線程數小於核心線程數時,即使有線程空閑,線程池也會優先創建新線程處理
        * 設置allowCoreThreadTimeout=true(默認false)時,核心線程會超時關閉
 
    2、queueCapacity:任務隊列容量(阻塞隊列)
        * 當核心線程數達到最大時,新任務會放在隊列中排隊等待執行
 
    3、maxPoolSize:最大線程數
        * 當線程數>=corePoolSize,且任務隊列已滿時。線程池會創建新線程來處理任務
        * 當線程數=maxPoolSize,且任務隊列已滿時,線程池會拒絕處理任務而拋出異常
 
    4、 keepAliveTime:線程空閑時間
        * 當線程空閑時間達到keepAliveTime時,線程會退出,直到線程數量=corePoolSize
        * 如果allowCoreThreadTimeout=true,則會直到線程數量=0
 
    5、allowCoreThreadTimeout:允許核心線程超時
    6、rejectedExecutionHandler:任務拒絕處理器
        * 兩種情況會拒絕處理任務:
            - 當線程數已經達到maxPoolSize,切隊列已滿,會拒絕新任務
            - 當線程池被調用shutdown()後,會等待線程池裡的任務執行完畢,再shutdown。如果在調用shutdown()和線程池真正shutdown之間提交任務,會拒絕新任務
        * 線程池會調用rejectedExecutionHandler來處理這個任務。如果沒有設置默認是AbortPolicy,會拋出異常
        * ThreadPoolExecutor類有幾個內部實現類來處理這類情況:
            - AbortPolicy 丟棄任務,拋運行時異常(默認)
            - CallerRunsPolicy 執行任務
            - DiscardPolicy 忽視,什麼都不會發生
            - DiscardOldestPolicy 從隊列中踢出最先進入隊列(最後一個執行)的任務
        * 實現RejectedExecutionHandler接口,可自定義處理器

Demo1

image-20221115170031342

        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("洗水壺");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            return "水壺";
        }).thenApply(e->{
            System.out.println("燒水");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            return "熱水";
        });
        //洗水壺->洗水杯->拿茶葉
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("洗茶壺");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            return "茶壺";
        }).thenApply(e->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            System.out.println("洗水杯");
            return "水杯";
        }).thenApply(e->{
            System.out.println("拿茶葉");
            return "茶葉";
        });
        //泡茶
        CompletableFuture<String> task3 = task1.thenCombine(task2, (a, b) -> {
            System.out.println("泡茶");
            return "茶";
        });
        String tea = task3.join();
        System.out.println(tea);

🍫 參數解析:

更多的參數詳解: [博客鏈接](CompletableFuture 異步編排 – 掘金 (juejin.cn))

Demo2

  • 這個測試我就沒有寫了,自己可以看看

問題:當查詢接口較複雜時候,數據的獲取都需要遠程調用,必然需要花費更多的時間。

假如查詢文章詳情頁面,需要如下標註的時間才能完成:


// 1. 查詢文章詳情 0.5s

// 2. 查詢文章博主個人信息 0.5s

// 3. 查詢文章評論 1s

// 4. 查詢博主相關文章分類 1s

// 5. 相關推薦文章 1s

// ......
上面的描述只是舉個例子不要在意這裡的查詢描述,看實際情況使用,有些相關的查詢我們可以拆分接口實現,上面的描述只是為了舉例子。
@Service
public class ArticleService {
	@Autowired
    private ArticleClient articleClient;
    @Autowired
    private UserClient userClient;
    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;
    
 	public ItemVo load(Long id) {
	// 1. 查詢文章詳情 0.5s
	// 下面的查詢需要用到文章對應的發佈用戶,所以這裡需要使用CompletableFuture.supplyAsync
	CompletableFuture<ArticleEntity> articleCompletableFuture = CompletableFuture.supplyAsync(() -> {
            ResponseVo<ArticleEntity> skuEntityResponseVo = this.articleClient.getArticleById(id);
            ArticleEntity articleEntity = skuEntityResponseVo.getData();
            if (articleEntity == null) {
                return null;
            }
            itemVo.setId(id);
            itemVo.setTitle(articleEntity.getTitle());
            itemVo.setDefaltImage(articleEntity.getDefaultImage());
            return articleEntity;
        }, threadPoolExecutor);
	// 2. 查詢文章博主個人信息 0.5s
	// 這裡查詢需要依賴文章關聯的用戶id,所以需要使用articleCompletableFuture.thenAcceptAsync()
    CompletableFuture<Void> userCompletableFuture = articleCompletableFuture.thenAcceptAsync(articleEntity -> {
        ResponseVo<UserEntity> categoryResponseVo = this.userClient.queryUserInfoById(articleEntity.getUserId());
        UserEntity userEntity = categoryResponseVo.getData();
        itemVo.setUserInfo(userEntity);
    }, threadPoolExecutor);    
	// 3. 查詢博主相關文章分類 1s
	// 這裡查詢需要依賴文章關聯的用戶id,所以需要使用articleCompletableFuture.thenAcceptAsync()
    CompletableFuture<Void> userOtherArticleCompletableFuture = articleCompletableFuture.thenAcceptAsync(articleEntity -> {
        ResponseVo<List<UserAuserOtherArticleEntity>> categoryResponseVo = this.articleClient.queryUserAuserOtherArticleById(articleEntity.getUserId());
        UserAuserOtherArticleEntity userAuserOtherArticleEntity = categoryResponseVo.getData();
        itemVo.setUserAuserOtherArticleList(userAuserOtherArticleEntity);
    }, threadPoolExecutor);
    // 4. 查詢文章評論 1s
    // 不需要依賴其他請求返回值,可以使用新的異步對象 CompletableFuture.runAsync()
    CompletableFuture<Void> commentsCompletableFuture =  CompletableFuture.runAsync(() -> {
        ResponseVo<List<UserArticleCategoryEntity>> userArticleCategoryVo = this.userClient.queryCommentsByArticleId(id);
        UserArticleCategoryEntity userArticleCategoryEntity = userArticleCategoryVo.getData();
        itemVo.setUserArticleCategoryList(userArticleCategoryEntity);
    }, threadPoolExecutor);
	// 5. 相關推薦文章 1s
	// 不需要依賴其他請求返回值,可以使用新的異步對象 CompletableFuture.runAsync()
	CompletableFuture<Void> relatedArticlesCompletableFuture =  CompletableFuture.runAsync(() -> {
        ResponseVo<List<RelatedArticlesEntity>> userArticleCategoryVo = this.articleClient.queryRelatedArticles(id);
        UserArticleCategoryEntity userArticleCategoryEntity = userArticleCategoryVo.getData();
        itemVo.setUserArticleCategoryList(userArticleCategoryEntity);
    }, threadPoolExecutor);
	}
	// 多任務執行組合 CompletableFuture.allOf()
	CompletableFuture.allOf(articleCompletableFuture, userCompletableFuture, userOtherArticleCompletableFuture,
                commentsCompletableFuture, relatedArticlesCompletableFuture).join();
     return itemVo;
}

CompletableFuture的async後綴函數與不帶async的函數的區別

參考鏈接: [博客鏈接]((106條消息) CompletableFuture的async後綴函數與不帶async的函數的區別_leon_wzm的博客-CSDN博客_thenacceptasync)

🍫 結論:

不帶async的函數的動作比較複雜

f的whenComplete的內容由哪個線程來執行,取決於哪個線程X執行了f.complete()。但是當X線程執行了f.complete()的時候,whenComplete還沒有被執行到的時候(就是事件還沒有註冊的時候),那麼X線程就不會去同步執行whenComplete的回調了。這個時候哪個線程執行到了whenComplete的事件註冊的時候,就由哪個線程自己來同步執行whenComplete的事件內容。

而whenCompleteAsync的場合,就簡單很多。一句話就是線程池裏面拿一個空的線程或者新啟一個線程來執行回調。和執行f.complete的線程以及執行whenCompleteAsync的線程無關。

ThreadPoolTaskExecutor 和 ThreadPoolExecutor 的區別

參考鏈接: [博客鏈接]((106條消息) ThreadPoolTaskExecutor 和 ThreadPoolExecutor 的區別_PonderYao的博客-CSDN博客_threadpooltaskexecutor和threadpoolexecutor)

🍫 結論:

其實最主要的原因很直觀:ThreadPoolExecutor是一個不受Spring管理生命周期、參數裝配的Java類,而有了ThreadPoolTaskExecutor的封裝,線程池才有Spring「內味」。

Spring 線程池的使用

業務使用多線程的原因

  • 目的是面對高並發的時候,提高運行速度

場景一:

一個業務邏輯有很多次的循環,每次循環之間沒有影響,比如驗證1萬條url路徑是否存在,正常情況要循環1萬次,逐個去驗證每一條URL,這樣效率會很低,假設驗證一條需要1分鐘,總共就需要1萬分鐘,有點恐怖。這時可以用多線程,將1萬條URL分成50等份,開50個線程,沒個線程只需驗證200條,這樣所有的線程執行完是遠小於1萬分鐘的。

場景二:

需要知道一個任務的執行進度,比如我們常看到的進度條,實現方式可以是在任務中加入一個整型屬性變量(這樣不同方法可以共享),任務執行一定程度就給變量值加1,另外開一個線程按時間間隔不斷去訪問這個變量,並反饋給用戶。總之使用多線程就是為了充分利用cpu的資源,提高程序執行效率,當你發現一個業務邏輯執行效率特別低,耗時特別長,就可以考慮使用多線程。

問題:不過CPU執行哪個線程的時間和順序是不確定的,即使設置了線程的優先級,因此使用多線程的風險也是比較大的,會出現很多預料不到的問題,一定要多熟悉概念,多構造不同的場景去測試才能夠掌握!

項目中可以通過:

@Order()
設置運行的優先級,數字越小,級別越高

FutureTask介紹

🍫 參考: [博客鏈接]((101條消息) FutureTask詳解_索碼理的博客-CSDN博客_futuretask)

線程池為什麼要使用阻塞隊列

阻塞隊列可以保證任務隊列中沒有任務時阻塞獲取任務的線程,使得線程進入wait 狀態,釋放 cpu 資源,當隊列中有任務時才喚醒對應線程從隊列中取出消息進行執行。
使得在線程不至於一直佔用cpu資源。(線程執行完任務後通過循環再次從任務隊列中取出任務進行執行,代碼片段如:while (task != null || (task = getTask()) != null) {})。

不用阻塞隊列也是可以的,不過實現起來比較麻煩而已,有好用的為啥不用呢

Spring 常用的線程池的使用

序列

Spring 通過任務執行器(TaskExecutor)來實現多線程和並發編程,使用 ThreadPoolTaskExecutor 實現一個基於線程池的TaskExecutor,
還得需要使用 @EnableAsync 開啟異步,並通過在需要的異步方法那裡使用註解@Async聲明是一個異步任務
Spring 已經實現的異常線程池

- SimpleAsyncTaskExecutor:不是真的線程池,這個類不重用線程,每次調用都會創建一個新的線程。

- SyncTaskExecutor:這個類沒有實現異步調用,只是一個同步操作。只適用於不需要多線程的地方

- ConcurrentTaskExecutor:Executor的適配類,不推薦使用。如果ThreadPoolTaskExecutor不滿足要求時,才用考慮使用這個類

- SimpleThreadPoolTaskExecutor:是Quartz的 SimpleThreadPool 的類。線程池同時被quartz和非quartz使用,才需要使用此類

- ThreadPoolTaskExecutor :最常使用,推薦。 其實質是對 java.util.concurrent.ThreadPoolExecutor 的包裝

🍫 擴展:相信大家在 Java 裏面也學過 JUC ,裏面有 Java 裏面的線程池,可以直接去看看 ThreadPoolExecutor

至於為什麼有線程池,Spring 為什麼還有在自己搞一個,可以自己去探索,Spring 的底層還是 ThreadPoolExecutor ,只是它的生命周期不受它控制。

常規使用

//    線程池(config裏面的Bean)
    @Autowired
    private Executor taskExecutor;
    
Callable<ScyTeacher> scy =()-> scyTeacherMapper.selectOne(new LambdaQueryWrapper<ScyTeacher>()
                                      .eq(ScyTeacher::getUsername,test));
FutureTask<ScyTeacher> commentCallable = new FutureTask<>(scy);
Future<Map> submit = executor.submit(commentCallable);
Map map = submit.get();

異步使用

🚩 記得開啟異步(配置類添加)

@EnableAsync //開啟異步執行
package com.melodyjerry.thread;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * @classname AsyncTaskService
 * @description 異步任務的執行類
 */
@Service
public class AsyncTaskService {
    @Async //異步方法
    public void executeAsyncTask(Integer i) {
        System.out.javaprintln("執行異步任務: "+i);
    }

    @Async //異步方法
    public void executeAsyncTaskPlus(Integer i) {
        System.out.println("執行異步任務+1: " + (i+1));
    }
}