spring-boot @Async註解 解決異步多線程入庫的問題

 

前言
在開發過程中,我們會遇到很多使用線程池的業務場景,例如定時任務使用的就是ScheduledThreadPoolExecutor。而有些時候使用線程池的場景就是會將一些可以進行異步操作的業務放在線程池中去完成,例如在生成訂單的時候給用戶發送短訊,生成訂單的結果不應該被發送短訊的成功與否所左右,也就是說生成訂單這個主操作是不依賴於發送短訊這個操作,所以我們就可以把發送短訊這個操作置為異步操作。而要想完成異步操作,一般使用的一個是消息服務器MQ,一個就是線程池。今天我們就來看看在Java中常用的Spring框架中如何去使用線程池來完成異步操作,以及分析背後的原理。

 

在Spring4中,Spring中引入了一個新的註解@Async,這個註解讓我們在使用Spring完成異步操作變得非常方便。

 

在SpringBoot環境中,要使用@Async註解,我們需要先在啟動類上加上@EnableAsync註解。這個與在SpringBoot中使用@Scheduled註解需要在啟動類中加上@EnableScheduling是一樣的道理(當然你使用古老的XML配置也是可以的,但是在SpringBoot環境中,建議的是全註解開發),具體原理下面會分析。加上@EnableAsync註解後,如果我們想在調用一個方法的時候開啟一個新的線程開始異步操作,我們只需要在這個方法上加上@Async註解,當然前提是,這個方法所在的類必須在Spring環境中。

 

項目實況介紹

項目中,我需要將700w條數據,定時任務加入到mysql表中,去掉日誌打印和一些其他因素的影響,入庫時間還是需要8個小時以上,嚴重影響後續的一系列操作,所以我才用@Async註解,來實現異步入庫,開了7個線程,入庫時間縮短為1.5個小時,大大提高效率,以下是詳細介紹,一級一些需要注意的坑.

需要寫個配置文件兩種方式

第一種方式

 

@Configuration
@EnableAsync //啟用異步任務
public class ThreadConfig {
    @Bean
    public ThreadPoolTaskExecutor executor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
          //配置核心線程數
        executor.setCorePoolSize(15);
          //配置最大線程數
        executor.setMaxPoolSize(30);
          //配置隊列大小
        executor.setQueueCapacity(1000);
          //線程的名稱前綴
        executor.setThreadNamePrefix("Executor-");
          //線程活躍時間(秒)
        //executor.setKeepAliveSeconds(60);
          //等待所有任務結束後再關閉線程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
          //設置拒絕策略
        //executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
          //執行初始化
        executor.initialize();
        return executor;
    }
}

 

第二種方式

 

@Configuration
@EnableAsync
public class ExecutorConfig {

   @Value("${thread.maxPoolSize}")
   private Integer maxPoolSize;
   @Value("${thread.corePoolSize}")
   private Integer corePoolSize;
   @Value("${thread.keepAliveSeconds}")
   private Integer keepAliveSeconds;
   @Value("${thread.queueCapacity}")
   private Integer queueCapacity;
   @Bean
   public ThreadPoolTaskExecutor asyncExecutor(){
      ThreadPoolTaskExecutor taskExecutor=new ThreadPoolTaskExecutor();
      taskExecutor.setCorePoolSize(corePoolSize);//核心數量
      taskExecutor.setMaxPoolSize(maxPoolSize);//最大數量
      taskExecutor.setQueueCapacity(queueCapacity);//隊列
      taskExecutor.setKeepAliveSeconds(keepAliveSeconds);//存活時間
      taskExecutor.setWaitForTasksToCompleteOnShutdown(true);//設置等待任務完成後線程池再關閉
      taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//設置拒絕策略
      taskExecutor.initialize();//初始化
      return taskExecutor;
   }
}

 

配置文件

#線程池
thread:
  corePoolSize: 5
  maxPoolSize: 10
  queueCapacity: 100
  keepAliveSeconds: 3000

 

 

springboot默認是不開啟異步註解功能的,所以,要讓springboot中識別@Async,則必須在入口文件中,開啟異步註解功能

 

package com.demo;
 
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
 
//開啟異步註解功能
@EnableAsync
@SpringBootApplication
public class SpringbootTaskApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(SpringbootTaskApplication.class, args);
    }
 
}

 

 

 

這裡有個坑!

如果遇到報錯:需要加上    proxyTargetClass = true

The bean 'xxxService' could not be injected as a'com.xxxx.xxx.xxxService' because it is a JDK dynamic proxy that implements:
xxxxxx
Action:
Consider injecting the bean as one of its interfaces orforcing the use of CGLib-based proxiesby setting proxyTargetClass=true on @EnableAsync and/or @EnableCaching.
package com.demo;
 
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
 
//開啟異步註解功能
@EnableAsync(proxyTargetClass = true)
@SpringBootApplication
public class SpringbootTaskApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(SpringbootTaskApplication.class, args);
    }
 
}

 

當我service層處理完邏輯,吧list分成7個小list然後調用異步方法(異步方法的參數不用管,沒影響,只截取核心代碼)

List<List<DistributedPredictDTO>> partition = Lists.partition(userList, userList.size() / 7);
        for (List<DistributedPredictDTO> distributedPredictDTOS : partition) {
       //調用異步方法 threadService.getI(beginDate, endDate, tableName, distributedPredictDTOS, hMap, i); }

 

@Slf4j
@Service
public class ThreadServiceImpl {
    @Resource
    ResourcePoolUrlProperties properties;
    @Resource
    private MonitorDao monitorDao;
    @Async
    Integer getI(String beginDate, String endDate, String tableName, List<DistributedPredictDTO> userList, Map<String, String> hMap, int i) {
        log.info("我開始執行");
        for (DistributedPredictDTO e : userList) {
            String responseStr;
            HashMap<String, String> pMap = Maps.newHashMap();
            pMap.put("scheduleId", e.getScheduleId());
            pMap.put("scheduleName", e.getScheduleName());
            pMap.put("distribsunStationId", e.getLabel());
            pMap.put("distribsunStationName", e.getValue());
            pMap.put("beginTime", beginDate);
            pMap.put("endTime", endDate);
            try {
                if ("180".equals(properties.getNewPowerSys().getDistributedPredictUrl().substring(17, 20))) {
                    pMap = null;
                }
                responseStr = HttpClientUtil.doPost(properties.getNewPowerSys().getDistributedPredictUrl(), hMap, pMap);
            } catch (Exception exception) {
                throw new RuntimeException(e.getValue() + "的功率預測接口異常" + hMap + pMap);
            }
            if (org.springframework.util.StringUtils.isEmpty(responseStr)) {
                log.info(e + "數據為空");
                continue;
            }
            JSONObject resJson = JSONObject.parseObject(responseStr);
            JSONObject obj = (JSONObject) resJson.get("obj");
            JSONArray tableData = (JSONArray) obj.get("tabledata");

            final List<DistributedUserPower> userPowers = Lists.newArrayList();
            for (Object o : tableData) {
                final DistributedUserPower distributedUserPower = new DistributedUserPower();
                distributedUserPower.setData(((JSONObject) o).get("data").toString());
                distributedUserPower.setData2(((JSONObject) o).get("data2").toString());
                distributedUserPower.setDataTime(((JSONObject) o).get("time").toString());
                distributedUserPower.setUserId(e.getLabel());
                distributedUserPower.setUserName(e.getValue());
                distributedUserPower.setAreaName(e.getScheduleName());
                distributedUserPower.setCreateTime(DateUtils.getDate());
                userPowers.add(distributedUserPower);
            }
            monitorDao.saveBatch(userPowers, tableName);
            i++;
        }
        return i;
    }

 

 

 

這裡有兩個坑!

第一個坑:

  我調用的異步方法在當前類中,則直接導致

@Async註解失效
正確操作,異步方法不要和同步調用方法寫在同一個類中,應該重新調用其他類

第二個坑:

如果出現這個報錯:

Null return value from advice does not mat

問題分析

代碼中採用異步調用,AOP 做來一層切面處理,底層是通過 JDK 動態代理實現

不管採用 JDK 還是 CGLIB 代理,返回值必須是包裝類型,所以才會導致上訴的報錯信息

處理方案
將異步方法的返回值修改為基本類型的對應包裝類型即可,如 int -> Integer

 

5分鐘測試效果圖:

最後一張是7線程: