通過源碼理解Spring中@Scheduled的實現原理並且實現調度任務動態裝載

前提

最近的新項目和數據同步相關,有定時調度的需求。之前一直有使用過QuartzXXL-JobEasy Scheduler等調度框架,後來越發覺得這些框架太重量級了,於是想到了Spring內置的Scheduling模組。而原生的Scheduling模組只是記憶體態的調度模組,不支援任務的持久化或者配置(配置任務通過@Scheduled註解進行硬編碼,不能抽離到類之外),因此考慮理解Scheduling模組的底層原理,並且基於此造一個簡單的輪子,使之支援調度任務配置:通過配置文件或者JDBC數據源。

Scheduling模組

Scheduling模組是spring-context依賴下的一個包org.springframework.scheduling

這個模組的類並不多,有四個子包:

  • 頂層包的定義了一些通用介面和異常。
  • org.springframework.scheduling.annotation:定義了調度、非同步任務相關的註解和解析類,常用的註解如@Async@EnableAsync@EnableScheduling@Scheduled
  • org.springframework.scheduling.concurrent:定義了調度任務執行器和相對應的FactoryBean
  • org.springframework.scheduling.config:定義了配置解析、任務具體實現類、調度任務XML配置文件解析相關的解析類。
  • org.springframework.scheduling.support:定義了反射支援類、Cron表達式解析器等工具類。

如果想單獨使用Scheduling,只需要引入spring-context這個依賴。但是現在流行使用SpringBoot,引入spring-boot-starter-web已經集成了spring-context,可以直接使用Scheduling模組,筆者編寫本文的時候(2020-03-14SpringBoot的最新版本為2.2.5.RELEASE,可以選用此版本進行源碼分析或者生產應用:

<properties>      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>      <maven.compiler.source>1.8</maven.compiler.source>      <maven.compiler.target>1.8</maven.compiler.target>      <spring.boot.version>2.2.5.RELEASE</spring.boot.version>  </properties>  <dependencyManagement>      <dependencies>          <dependency>              <groupId>org.springframework.boot</groupId>              <artifactId>spring-boot-dependencies</artifactId>              <version>${spring.boot.version}</version>              <type>pom</type>              <scope>import</scope>          </dependency>      </dependencies>  </dependencyManagement>  <dependencies>      <dependency>          <groupId>org.springframework.boot</groupId>          <artifactId>spring-boot-starter-web</artifactId>      </dependency>  </dependencies>  

開啟Scheduling模組支援只需要在某一個配置類中添加@EnableScheduling註解即可,一般為了明確模組的引入,建議在啟動類中使用此註解,如:

@EnableScheduling  @SpringBootApplication  public class App {        public static void main(String[] args) {          SpringApplication.run(App.class, args);      }  }  

Scheduling模組的工作流程

這個圖描述了Scheduling模組的工作流程,這裡分析一下非XML配置下的流程(右邊的分支):

  • 通過註解@EnableScheduling中的@Import引入了SchedulingConfiguration,而SchedulingConfiguration中配置了一個類型為ScheduledAnnotationBeanPostProcessor名稱為org.springframework.context.annotation.internalScheduledAnnotationProcessorBean,這裡有個常見的技巧,Spring內部載入的Bean一般會定義名稱為internalXXXBeanrole會定義為ROLE_INFRASTRUCTURE = 2
  • Bean後置處理器ScheduledAnnotationBeanPostProcessor會解析和處理每一個符合特定類型的Bean中的@Scheduled註解(注意@Scheduled只能使用在方法或者註解上),並且把解析完成的方法封裝為不同類型的Task實例,快取在ScheduledTaskRegistrar中的。
  • ScheduledAnnotationBeanPostProcessor中的鉤子介面方法afterSingletonsInstantiated()在所有單例初始化完成之後回調觸發,在此方法中設置了ScheduledTaskRegistrar中的任務調度器(TaskScheduler或者ScheduledExecutorService類型)實例,並且調用ScheduledTaskRegistrar#afterPropertiesSet()方法添加所有快取的Task實例到任務調度器中執行。

任務調度器

Scheduling模組支援TaskScheduler或者ScheduledExecutorService類型的任務調度器,而ScheduledExecutorService其實是JDK並發包java.util.concurrent的介面,一般實現類就是調度執行緒池ScheduledThreadPoolExecutor。實際上,ScheduledExecutorService類型的實例最終會通過適配器模式轉變為ConcurrentTaskScheduler,所以這裡只需要分析TaskScheduler類型的執行器。

  • ThreadPoolTaskScheduler:基於執行緒池實現的任務執行器,這個是最常用的實現,底層依賴於ScheduledThreadPoolExecutor實現。
  • ConcurrentTaskSchedulerTaskScheduler介面和ScheduledExecutorService介面的適配器,如果自定義一個ScheduledThreadPoolExecutor類型的Bean,那麼任務執行器就會適配為ConcurrentTaskScheduler
  • DefaultManagedTaskSchedulerJDK7引入的JSR-236的支援,可以通過JNDI配置此調度執行器,一般很少用到,底層也是依賴於ScheduledThreadPoolExecutor實現。

也就是說,內置的三個調度器類型底層都依賴於JUC調度執行緒池ScheduledThreadPoolExecutor。這裡分析一下頂層介面org.springframework.scheduling.TaskScheduler提供的功能(筆者已經把功能一致的default方法暫時移除):

// 省略一些功能一致的default方法  public interface TaskScheduler {         // 調度一個任務,通過觸發器實例指定觸發時間周期       ScheduledFuture<?> schedule(Runnable task, Trigger trigger);         // 指定起始時間調度一個任務 - 單次執行       ScheduledFuture<?> schedule(Runnable task, Date startTime);         // 指定固定頻率調度一個任務,period的單位是毫秒       ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);         // 指定起始時間和固定頻率調度一個任務,period的單位是毫秒       ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);         // 指定固定延遲間隔調度一個任務,delay的單位是毫秒       ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay);        // 指定起始時間和固定延遲間隔調度一個任務,delay的單位是毫秒       ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay);  }  

Task的分類

Scheduling模組中支援不同類型的任務,主要包括下面的3種(解析的優先順序也是如下):

  1. Cron表達式任務,支援通過Cron表達式配置執行的周期,對應的任務類型為org.springframework.scheduling.config.CronTask
  2. 固定延遲間隔任務,也就是上一輪執行完畢後間隔固定周期再執行本輪,依次類推,對應的的任務類型為org.springframework.scheduling.config.FixedDelayTask
  3. 固定頻率任務,基於固定的間隔時間執行,不會理會上一輪是否執行完畢本輪會照樣執行,對應的的任務類型為org.springframework.scheduling.config.FixedRateTask

關於這幾類Task,舉幾個簡單的例子。CronTask是通過cron表達式指定執行周期的,並且不支援延遲執行,可以使用特殊字元-禁用任務執行:

// 註解聲明式使用 - 每五秒執行一次,不支援initialDelay  @Scheduled(cron = "*/5 * * * * ?")  public void processTask(){    }    // 註解聲明式使用 - 禁止任務執行  @Scheduled(cron = "-")  public void processTask(){    }    // 編程式使用  public class Tasks {        static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");        public static void main(String[] args) throws Exception {          ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();          taskScheduler.setPoolSize(10);          taskScheduler.initialize();          CronTask cronTask = new CronTask(() -> {              System.out.println(String.format("[%s] - CronTask觸發...", F.format(LocalDateTime.now())));          }, "*/5 * * * * ?");          taskScheduler.schedule(cronTask.getRunnable(),cronTask.getTrigger());          Thread.sleep(Integer.MAX_VALUE);      }  }  // 某次執行輸出結果  [2020-03-16 01:07:00] - CronTask觸發...  [2020-03-16 01:07:05] - CronTask觸發...  ......  

FixedDelayTask需要配置延遲間隔值(fixedDelay或者fixedDelayString)和可選的起始延遲執行時間(initialDelay或者initialDelayString),這裡注意一點是fixedDelayStringinitialDelayString都支援從EmbeddedValueResolver(簡單理解為配置文件的屬性處理器)讀取和Duration(例如P2D就是parses as 2 days,表示86400秒)支援格式的解析:

// 註解聲明式使用 - 延遲一秒開始執行,延遲間隔為5秒  @Scheduled(fixedDelay = 5000, initialDelay = 1000)  public void process(){    }    // 註解聲明式使用 - spring-boot配置文件中process.task.fixedDelay=5000  process.task.initialDelay=1000  @Scheduled(fixedDelayString = "${process.task.fixedDelay}", initialDelayString = "${process.task.initialDelay}")  public void process(){    }    // 編程式使用  public class Tasks {        static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");        public static void main(String[] args) throws Exception {          ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();          taskScheduler.setPoolSize(10);          taskScheduler.initialize();          FixedDelayTask fixedDelayTask = new FixedDelayTask(() -> {              System.out.println(String.format("[%s] - FixedDelayTask觸發...", F.format(LocalDateTime.now())));          }, 5000, 1000);          Date startTime = new Date(System.currentTimeMillis() + fixedDelayTask.getInitialDelay());          taskScheduler.scheduleWithFixedDelay(fixedDelayTask.getRunnable(), startTime, fixedDelayTask.getInterval());          Thread.sleep(Integer.MAX_VALUE);      }  }  // 某次執行輸出結果  [2020-03-16 01:06:12] - FixedDelayTask觸發...  [2020-03-16 01:06:17] - FixedDelayTask觸發...  ......  

FixedRateTask需要配置固定間隔值(fixedRate或者fixedRateString)和可選的起始延遲執行時間(initialDelay或者initialDelayString),這裡注意一點是fixedRateStringinitialDelayString都支援從EmbeddedValueResolver(簡單理解為配置文件的屬性處理器)讀取和Duration(例如P2D就是parses as 2 days,表示86400秒)支援格式的解析:

// 註解聲明式使用 - 延遲一秒開始執行,每隔5秒執行一次  @Scheduled(fixedRate = 5000, initialDelay = 1000)  public void processTask(){    }    // 註解聲明式使用 - spring-boot配置文件中process.task.fixedRate=5000  process.task.initialDelay=1000  @Scheduled(fixedRateString = "${process.task.fixedRate}", initialDelayString = "${process.task.initialDelay}")  public void process(){    }    // 編程式使用  public class Tasks {        static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");        public static void main(String[] args) throws Exception {          ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();          taskScheduler.setPoolSize(10);          taskScheduler.initialize();          FixedRateTask fixedRateTask = new FixedRateTask(() -> {              System.out.println(String.format("[%s] - FixedRateTask觸發...", F.format(LocalDateTime.now())));          }, 5000, 1000);          Date startTime = new Date(System.currentTimeMillis() + fixedRateTask.getInitialDelay());          taskScheduler.scheduleAtFixedRate(fixedRateTask.getRunnable(), startTime, fixedRateTask.getInterval());          Thread.sleep(Integer.MAX_VALUE);      }  }  // 某次執行輸出結果  [2020-03-16 23:58:25] - FixedRateTask觸發...  [2020-03-16 23:58:30] - FixedRateTask觸發...  ......  

簡單分析核心流程的源程式碼

SpringBoot註解體系下,Scheduling模組的所有邏輯基本在ScheduledAnnotationBeanPostProcessorScheduledTaskRegistrar中。一般來說,一個類實現的介面代表了它能提供的功能,先看ScheduledAnnotationBeanPostProcessor實現的介面:

  • ScheduledTaskHolder介面:返回Set<ScheduledTask>,表示持有的所有任務實例。
  • MergedBeanDefinitionPostProcessor介面:Bean定義合併時回調,預留空實現,暫時不做任何處理。
  • BeanPostProcessor介面:也就是MergedBeanDefinitionPostProcessor的父介面,Bean實例初始化前後分別回調,其中,後回調的postProcessAfterInitialization()方法就是用於解析@Scheduled和裝載ScheduledTask,需要重點關注此方法的邏輯。
  • DestructionAwareBeanPostProcessor介面:具體的Bean實例銷毀的時候回調,用於Bean實例銷毀的時候移除和取消對應的任務實例。
  • Ordered介面:用於Bean載入時候的排序,主要是改變ScheduledAnnotationBeanPostProcessorBeanPostProcessor執行鏈中的順序。
  • EmbeddedValueResolverAware介面:回調StringValueResolver實例,用於解析帶佔位符的環境變數屬性值。
  • BeanNameAware介面:回調BeanName
  • BeanFactoryAware介面:回調BeanFactory實例,具體是DefaultListableBeanFactory,也就是熟知的IOC容器。
  • ApplicationContextAware介面:回調ApplicationContext實例,也就是熟知的Spring上下文,它是IOC容器的門面,同時是事件廣播器、資源載入器的實現等等。
  • SmartInitializingSingleton介面:所有單例實例化完畢之後回調,作用是在持有的applicationContextNULL的時候開始調度所有載入完成的任務,這個鉤子介面十分有用,筆者常用它做一些資源初始化工作。
  • ApplicationListener介面:監聽Spring應用的事件,具體是ApplicationListener<ContextRefreshedEvent>,監聽上下文刷新的事件,如果事件中攜帶的ApplicationContext實例和ApplicationContextAware回調的ApplicationContext實例一致,那麼在此監聽回調方法中開始調度所有載入完成的任務,也就是在ScheduledAnnotationBeanPostProcessor這個類中,SmartInitializingSingleton介面的實現和ApplicationListener介面的實現邏輯是互斥的。
  • DisposableBean介面:當前Bean實例銷毀時候回調,也就是ScheduledAnnotationBeanPostProcessor自身被銷毀的時候回調,用於取消和清理所有的ScheduledTask

上面分析的鉤子介面在SpringBoot體系中可以按需使用,了解回調不同鉤子介面的回調時機,可以在特定時機完成達到理想的效果。

@Scheduled註解的解析集中在postProcessAfterInitialization()方法:

public Object postProcessAfterInitialization(Object bean, String beanName) {      // 忽略AopInfrastructureBean、TaskScheduler和ScheduledExecutorService三種類型的Bean      if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||              bean instanceof ScheduledExecutorService) {          // Ignore AOP infrastructure such as scoped proxies.          return bean;      }      // 獲取Bean的用戶態類型,例如Bean有可能被CGLIB增強,這個時候要取其父類      Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);      // nonAnnotatedClasses存放著不存在@Scheduled註解的類型,快取起來避免重複判斷它是否攜帶@Scheduled註解的方法      if (!this.nonAnnotatedClasses.contains(targetClass) &&              AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {          // 因為JDK8之後支援重複註解,因此獲取具體類型中Method -> @Scheduled的集合,也就是有可能一個方法使用多個@Scheduled註解,最終會封裝為多個Task          Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,                  (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {                      Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(                              method, Scheduled.class, Schedules.class);                      return (!scheduledMethods.isEmpty() ? scheduledMethods : null);                  });          // 解析到類型中不存在@Scheduled註解的方法添加到nonAnnotatedClasses快取          if (annotatedMethods.isEmpty()) {              this.nonAnnotatedClasses.add(targetClass);              if (logger.isTraceEnabled()) {                  logger.trace("No @Scheduled annotations found on bean class: " + targetClass);              }          }          else {              // Method -> @Scheduled的集合遍歷processScheduled()方法進行登記              annotatedMethods.forEach((method, scheduledMethods) ->                      scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));              if (logger.isTraceEnabled()) {                  logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +                          "': " + annotatedMethods);              }          }      }      return bean;  }  

processScheduled(Scheduled scheduled, Method method, Object bean)就是具體的註解解析和Task封裝的方法:

// Runnable適配器 - 用於反射調用具體的方法,觸發任務方法執行  public class ScheduledMethodRunnable implements Runnable {    	private final Object target;    	private final Method method;    	public ScheduledMethodRunnable(Object target, Method method) {  		this.target = target;  		this.method = method;  	}          ....// 省略無關程式碼            // 這個就是最終的任務方法執行的核心方法,抑制修飾符,然後反射調用  	@Override  	public void run() {  		try {  			ReflectionUtils.makeAccessible(this.method);  			this.method.invoke(this.target);  		}  		catch (InvocationTargetException ex) {  			ReflectionUtils.rethrowRuntimeException(ex.getTargetException());  		}  		catch (IllegalAccessException ex) {  			throw new UndeclaredThrowableException(ex);  		}  	}  }    // 通過方法所在Bean實例和方法封裝Runnable適配器ScheduledMethodRunnable實例  protected Runnable createRunnable(Object target, Method method) {  	Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");  	Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());  	return new ScheduledMethodRunnable(target, invocableMethod);  }      // 這個方法十分長,不過邏輯並不複雜,它只做了四件事  // 0. 解析@Scheduled中的initialDelay、initialDelayString屬性,適用於FixedDelayTask或者FixedRateTask的延遲執行  // 1. 優先解析@Scheduled中的cron屬性,封裝為CronTask,通過ScheduledTaskRegistrar進行快取  // 2. 解析@Scheduled中的fixedDelay、fixedDelayString屬性,封裝為FixedDelayTask,通過ScheduledTaskRegistrar進行快取  // 3. 解析@Scheduled中的fixedRate、fixedRateString屬性,封裝為FixedRateTask,通過ScheduledTaskRegistrar進行快取  protected void processScheduled(Scheduled scheduled, Method method, Object bean) {      try {          // 通過方法宿主Bean和目標方法封裝Runnable適配器ScheduledMethodRunnable實例          Runnable runnable = createRunnable(bean, method);          boolean processedSchedule = false;          String errorMessage =                  "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";            // 快取已經裝載的任務          Set<ScheduledTask> tasks = new LinkedHashSet<>(4);            // Determine initial delay          // 解析初始化延遲執行時間,initialDelayString支援佔位符配置,如果initialDelayString配置了,會覆蓋initialDelay的值          long initialDelay = scheduled.initialDelay();          String initialDelayString = scheduled.initialDelayString();          if (StringUtils.hasText(initialDelayString)) {              Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");              if (this.embeddedValueResolver != null) {                  initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);              }              if (StringUtils.hasLength(initialDelayString)) {                  try {                      initialDelay = parseDelayAsLong(initialDelayString);                  }                  catch (RuntimeException ex) {                      throw new IllegalArgumentException(                              "Invalid initialDelayString value "" + initialDelayString + "" - cannot parse into long");                  }              }          }            // Check cron expression          // 解析時區zone的值,支援支援佔位符配置,判斷cron是否存在,存在則裝載為CronTask          String cron = scheduled.cron();          if (StringUtils.hasText(cron)) {              String zone = scheduled.zone();              if (this.embeddedValueResolver != null) {                  cron = this.embeddedValueResolver.resolveStringValue(cron);                  zone = this.embeddedValueResolver.resolveStringValue(zone);              }              if (StringUtils.hasLength(cron)) {                  Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");                  processedSchedule = true;                  if (!Scheduled.CRON_DISABLED.equals(cron)) {                      TimeZone timeZone;                      if (StringUtils.hasText(zone)) {                          timeZone = StringUtils.parseTimeZoneString(zone);                      }                      else {                          timeZone = TimeZone.getDefault();                      }                      // 此方法雖然表面上是調度CronTask,實際上由於ScheduledTaskRegistrar不持有TaskScheduler,只是把任務添加到它的快取中                      // 返回的任務實例添加到宿主Bean的快取中,然後最後會放入宿主Bean -> List<ScheduledTask>映射中                      tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));                  }              }          }            // At this point we don't need to differentiate between initial delay set or not anymore          // 修正小於0的初始化延遲執行時間值為0          if (initialDelay < 0) {              initialDelay = 0;          }            // 解析fixedDelay和fixedDelayString,如果同時配置,fixedDelayString最終解析出來的整數值會覆蓋fixedDelay,封裝為FixedDelayTask          long fixedDelay = scheduled.fixedDelay();          if (fixedDelay >= 0) {              Assert.isTrue(!processedSchedule, errorMessage);              processedSchedule = true;              tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));          }          String fixedDelayString = scheduled.fixedDelayString();          if (StringUtils.hasText(fixedDelayString)) {              if (this.embeddedValueResolver != null) {                  fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);              }              if (StringUtils.hasLength(fixedDelayString)) {                  Assert.isTrue(!processedSchedule, errorMessage);                  processedSchedule = true;                  try {                      fixedDelay = parseDelayAsLong(fixedDelayString);                  }                  catch (RuntimeException ex) {                      throw new IllegalArgumentException(                              "Invalid fixedDelayString value "" + fixedDelayString + "" - cannot parse into long");                  }                  // 此方法雖然表面上是調度FixedDelayTask,實際上由於ScheduledTaskRegistrar不持有TaskScheduler,只是把任務添加到它的快取中                  // 返回的任務實例添加到宿主Bean的快取中,然後最後會放入宿主Bean -> List<ScheduledTask>映射中                  tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));              }          }            // 解析fixedRate和fixedRateString,如果同時配置,fixedRateString最終解析出來的整數值會覆蓋fixedRate,封裝為FixedRateTask          long fixedRate = scheduled.fixedRate();          if (fixedRate >= 0) {              Assert.isTrue(!processedSchedule, errorMessage);              processedSchedule = true;              tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));          }          String fixedRateString = scheduled.fixedRateString();          if (StringUtils.hasText(fixedRateString)) {              if (this.embeddedValueResolver != null) {                  fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);              }              if (StringUtils.hasLength(fixedRateString)) {                  Assert.isTrue(!processedSchedule, errorMessage);                  processedSchedule = true;                  try {                      fixedRate = parseDelayAsLong(fixedRateString);                  }                  catch (RuntimeException ex) {                      throw new IllegalArgumentException(                              "Invalid fixedRateString value "" + fixedRateString + "" - cannot parse into long");                  }                   // 此方法雖然表面上是調度FixedRateTask,實際上由於ScheduledTaskRegistrar不持有TaskScheduler,只是把任務添加到它的快取中                  // 返回的任務實例添加到宿主Bean的快取中,然後最後會放入宿主Bean -> List<ScheduledTask>映射中                  tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));              }          }            // Check whether we had any attribute set          Assert.isTrue(processedSchedule, errorMessage);            // Finally register the scheduled tasks          synchronized (this.scheduledTasks) {              // 註冊所有任務實例,這個映射Key為宿主Bean實例,Value為List<ScheduledTask>,後面用於調度所有註冊完成的任務              Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));              regTasks.addAll(tasks);          }      }      catch (IllegalArgumentException ex) {          throw new IllegalStateException(                  "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());      }  }  

總的來說,這個方法做了四件事:

  • 解析@Scheduled中的initialDelayinitialDelayString屬性,適用於FixedDelayTask或者FixedRateTask的延遲執行。
  • 優先解析@Scheduled中的cron屬性,封裝為CronTask,通過ScheduledTaskRegistrar進行快取。
  • 解析@Scheduled中的fixedDelayfixedDelayString屬性,封裝為FixedDelayTask,通過ScheduledTaskRegistrar進行快取。
  • 解析@Scheduled中的fixedRatefixedRateString屬性,封裝為FixedRateTask,通過ScheduledTaskRegistrar進行快取。

@Scheduled修飾的某個方法如果同時配置了cronfixedDelay|fixedDelayStringfixedRate|fixedRateString屬性,意味著此方法同時封裝為三種任務CronTaskFixedDelayTaskFixedRateTask。解析xxString值的使用,用到了EmbeddedValueResolver解析字元串的值,支援佔位符,這樣可以直接獲取環境配置中的佔位符屬性(基於SPEL的特性,甚至可以支援嵌套佔位符)。解析成功的所有任務實例存放在ScheduledAnnotationBeanPostProcessor的一個映射scheduledTasks中:

// 宿主Bean實例 -> 解析完成的任務實例Set  private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);  

解析和快取工作完成之後,接著分析最終激活所有調度任務的邏輯,見互斥方法afterSingletonsInstantiated()onApplicationEvent(),兩者中一定只有一個方法能夠調用finishRegistration()

// 所有單例實例化完畢之後回調  public void afterSingletonsInstantiated() {      // Remove resolved singleton classes from cache      this.nonAnnotatedClasses.clear();        if (this.applicationContext == null) {          // Not running in an ApplicationContext -> register tasks early...          finishRegistration();      }  }    // 上下文刷新完成之後回調  @Override  public void onApplicationEvent(ContextRefreshedEvent event) {      if (event.getApplicationContext() == this.applicationContext) {          // Running in an ApplicationContext -> register tasks this late...          // giving other ContextRefreshedEvent listeners a chance to perform          // their work at the same time (e.g. Spring Batch's job registration).          finishRegistration();      }  }    //  private void finishRegistration() {      // 如果持有的scheduler對象不為null則設置ScheduledTaskRegistrar中的任務調度器      if (this.scheduler != null) {          this.registrar.setScheduler(this.scheduler);      }      // 這個判斷一般會成立,得到的BeanFactory就是DefaultListableBeanFactory      if (this.beanFactory instanceof ListableBeanFactory) {          // 獲取所有的調度配置器SchedulingConfigurer實例,並且都回調configureTasks()方法,這個很重要,它是用戶動態裝載調取任務的擴展鉤子介面          Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);          List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());          // SchedulingConfigurer實例列表排序          AnnotationAwareOrderComparator.sort(configurers);          for (SchedulingConfigurer configurer : configurers) {              configurer.configureTasks(this.registrar);          }      }      // 下面這一大段邏輯都是為了從BeanFactory取出任務調度器實例,主要判斷TaskScheduler或者ScheduledExecutorService類型的Bean,包括嘗試通過類型或者名字獲取      // 獲取成功後設置到ScheduledTaskRegistrar中      if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {          Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");          try {              // Search for TaskScheduler bean...              this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));          }          catch (NoUniqueBeanDefinitionException ex) {              logger.trace("Could not find unique TaskScheduler bean", ex);              try {                  this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));              }              catch (NoSuchBeanDefinitionException ex2) {                  if (logger.isInfoEnabled()) {                      logger.info("More than one TaskScheduler bean exists within the context, and " +                              "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +                              "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +                              "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +                              ex.getBeanNamesFound());                  }              }          }          catch (NoSuchBeanDefinitionException ex) {              logger.trace("Could not find default TaskScheduler bean", ex);              // Search for ScheduledExecutorService bean next...              try {                  this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));              }              catch (NoUniqueBeanDefinitionException ex2) {                  logger.trace("Could not find unique ScheduledExecutorService bean", ex2);                  try {                      this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));                  }                  catch (NoSuchBeanDefinitionException ex3) {                      if (logger.isInfoEnabled()) {                          logger.info("More than one ScheduledExecutorService bean exists within the context, and " +                                  "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +                                  "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +                                  "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +                                  ex2.getBeanNamesFound());                      }                  }              }              catch (NoSuchBeanDefinitionException ex2) {                  logger.trace("Could not find default ScheduledExecutorService bean", ex2);                  // Giving up -> falling back to default scheduler within the registrar...                  logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");              }          }      }      // 調用ScheduledTaskRegistrar的afterPropertiesSet()方法,裝載所有的調度任務      this.registrar.afterPropertiesSet();  }    public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean {        // 省略其他程式碼.........        @Override      public void afterPropertiesSet() {          scheduleTasks();      }        // 裝載所有調度任務      @SuppressWarnings("deprecation")      protected void scheduleTasks() {          // 這裡注意一點,如果找不到任務調度器實例,那麼會用單個執行緒調度所有任務          if (this.taskScheduler == null) {              this.localExecutor = Executors.newSingleThreadScheduledExecutor();              this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);          }          // 調度所有裝載完畢的自定義觸發器的任務實例          if (this.triggerTasks != null) {              for (TriggerTask task : this.triggerTasks) {                  addScheduledTask(scheduleTriggerTask(task));              }          }          // 調度所有裝載完畢的CronTask          if (this.cronTasks != null) {              for (CronTask task : this.cronTasks) {                  addScheduledTask(scheduleCronTask(task));              }          }          // 調度所有裝載完畢的FixedRateTask          if (this.fixedRateTasks != null) {              for (IntervalTask task : this.fixedRateTasks) {                  addScheduledTask(scheduleFixedRateTask(task));              }          }          // 調度所有裝載完畢的FixedDelayTask          if (this.fixedDelayTasks != null) {              for (IntervalTask task : this.fixedDelayTasks) {                  addScheduledTask(scheduleFixedDelayTask(task));              }          }      }        // 省略其他程式碼.........  }  

注意兩個個問題:

  • 如果沒有配置TaskScheduler或者ScheduledExecutorService類型的Bean,那麼調度模組只會創建一個執行緒去調度所有裝載完畢的任務,如果任務比較多,執行密度比較大,很有可能會造成大量任務飢餓,表現為存在部分任務不會觸發調度的場景(這個是調度模組生產中經常遇到的故障,需要重點排查是否沒有設置TaskScheduler或者ScheduledExecutorService)。
  • SchedulingConfigurer是調度模組提供給使用的進行擴展的鉤子介面,用於在激活所有調度任務之前回調ScheduledTaskRegistrar實例,只要拿到ScheduledTaskRegistrar實例,我們就可以使用它註冊和裝載新的Task

調度任務動態裝載

Scheduling模組本身已經支援基於NamespaceHandler支援通過XML文件配置調度任務,但是筆者一直認為XML給人的感覺太"重",使用起來顯得太笨重,這裡打算擴展出JSON文件配置和基於JDBC數據源配置(也就是持久化任務,這裡選用MySQL)。根據前文的源碼分析,需要用到SchedulingConfigurer介面的實現,用於在所有調度任務觸發之前從外部添加自定義的調度任務。先定義調度任務的一些配置屬性類:

// 調度任務類型枚舉  @Getter  @RequiredArgsConstructor  public enum  ScheduleTaskType {        CRON("CRON"),        FIXED_DELAY("FIXED_DELAY"),        FIXED_RATE("FIXED_RATE"),        ;        private final String type;  }    // 調度任務配置,enable屬性為全局開關  @Data  public class ScheduleTaskProperties {        private Long version;      private Boolean enable;      private List<ScheduleTasks> tasks;  }    // 調度任務集合,筆者設計的時候採用一個宿主類中每個獨立方法都是一個任務實例的模式  @Data  public class ScheduleTasks {        // 這裡故意叫Klass代表Class,避免關鍵字衝突      private String taskHostKlass;      private Boolean enable;      private List<ScheduleTaskMethod> taskMethods;  }    // 調度任務方法 - enable為任務開關,沒有配置會被ScheduleTaskProperties或者ScheduleTasks中的enable覆蓋  @Data  public class ScheduleTaskMethod {        private Boolean enable;      private String taskDescription;      private String taskMethod;      // 時區,cron的計算需要用到      private String timeZone;      private String cronExpression;      private String intervalMilliseconds;      private String initialDelayMilliseconds;  }  

設計的時候,考慮到多個任務執行方法可以放在同一個宿主類,這樣可以方便同一種類的任務進行統一管理,如:

public class TaskHostClass {        public void task1() {        }        public void task2() {        }        ......        public void taskN() {        }  }  

細節方面,intervalMillisecondsinitialDelayMilliseconds的單位設計為毫秒,使用字元串形式,方便可以基於StringValueResolver解析配置文件中的屬性配置。添加一個抽象的SchedulingConfigurer

@Slf4j  public abstract class AbstractSchedulingConfigurer implements SchedulingConfigurer, InitializingBean, BeanFactoryAware,          EmbeddedValueResolverAware {        @Getter      private StringValueResolver embeddedValueResolver;        private ConfigurableBeanFactory configurableBeanFactory;        private final List<InternalTaskProperties> internalTasks = Lists.newLinkedList();        private final Set<String> tasksLoaded = Sets.newHashSet();        @Override      public void setBeanFactory(BeanFactory beanFactory) throws BeansException {          configurableBeanFactory = (ConfigurableBeanFactory) beanFactory;      }        @Override      public void afterPropertiesSet() throws Exception {          internalTasks.clear();          internalTasks.addAll(loadTaskProperties());      }        @Override      public void setEmbeddedValueResolver(StringValueResolver resolver) {          embeddedValueResolver = resolver;      }        @Override      public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {          for (InternalTaskProperties task : internalTasks) {              try {                  synchronized (tasksLoaded) {                      String key = task.taskHostKlass() + "#" + task.taskMethod();                      // 避免重複載入                      if (!tasksLoaded.contains(key)) {                          if (task instanceof CronTaskProperties) {                              loadCronTask((CronTaskProperties) task, taskRegistrar);                          }                          if (task instanceof FixedDelayTaskProperties) {                              loadFixedDelayTask((FixedDelayTaskProperties) task, taskRegistrar);                          }                          if (task instanceof FixedRateTaskProperties) {                              loadFixedRateTask((FixedRateTaskProperties) task, taskRegistrar);                          }                          tasksLoaded.add(key);                      } else {                          log.info("調度任務已經裝載,任務宿主類:{},任務執行方法:{}", task.taskHostKlass(), task.taskMethod());                      }                  }              } catch (Exception e) {                  throw new IllegalStateException(String.format("載入調度任務異常,任務宿主類:%s,任務執行方法:%s",                          task.taskHostKlass(), task.taskMethod()), e);              }          }      }        private ScheduledMethodRunnable loadScheduledMethodRunnable(String taskHostKlass, String taskMethod) throws Exception {          Class<?> klass = ClassUtils.forName(taskHostKlass, null);          Object target = configurableBeanFactory.getBean(klass);          Method method = ReflectionUtils.findMethod(klass, taskMethod);          if (null == method) {              throw new IllegalArgumentException(String.format("找不到目標方法,任務宿主類:%s,任務執行方法:%s", taskHostKlass, taskMethod));          }          Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());          return new ScheduledMethodRunnable(target, invocableMethod);      }        private void loadCronTask(CronTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception {          ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod());          String cronExpression = embeddedValueResolver.resolveStringValue(pops.cronExpression());          if (null != cronExpression) {              String timeZoneString = embeddedValueResolver.resolveStringValue(pops.timeZone());              TimeZone timeZone;              if (null != timeZoneString) {                  timeZone = TimeZone.getTimeZone(timeZoneString);              } else {                  timeZone = TimeZone.getDefault();              }              CronTask cronTask = new CronTask(runnable, new CronTrigger(cronExpression, timeZone));              taskRegistrar.addCronTask(cronTask);              log.info("裝載CronTask[{}#{}()]成功,cron表達式:{},任務描述:{}", cronExpression, pops.taskMethod(),                      pops.cronExpression(), pops.taskDescription());          }      }        private void loadFixedDelayTask(FixedDelayTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception {          ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod());          long fixedDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds()));          long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds()));          FixedDelayTask fixedDelayTask = new FixedDelayTask(runnable, fixedDelayMilliseconds, initialDelayMilliseconds);          taskRegistrar.addFixedDelayTask(fixedDelayTask);          log.info("裝載FixedDelayTask[{}#{}()]成功,固定延遲間隔:{} ms,初始延遲執行時間:{} ms,任務描述:{}", pops.taskHostKlass(),                  pops.taskMethod(), fixedDelayMilliseconds, initialDelayMilliseconds, pops.taskDescription());      }        private void loadFixedRateTask(FixedRateTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception {          ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod());          long fixedRateMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds()));          long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds()));          FixedRateTask fixedRateTask = new FixedRateTask(runnable, fixedRateMilliseconds, initialDelayMilliseconds);          taskRegistrar.addFixedRateTask(fixedRateTask);          log.info("裝載FixedRateTask[{}#{}()]成功,固定執行頻率:{} ms,初始延遲執行時間:{} ms,任務描述:{}", pops.taskHostKlass(),                  pops.taskMethod(), fixedRateMilliseconds, initialDelayMilliseconds, pops.taskDescription());      }        private long parseDelayAsLong(String value) {          if (null == value) {              return 0L;          }          if (value.length() > 1 && (isP(value.charAt(0)) || isP(value.charAt(1)))) {              return Duration.parse(value).toMillis();          }          return Long.parseLong(value);      }        private boolean isP(char ch) {          return (ch == 'P' || ch == 'p');      }        /**       * 載入任務配置,預留給子類實現       */      protected abstract List<InternalTaskProperties> loadTaskProperties() throws Exception;        interface InternalTaskProperties {            String taskHostKlass();            String taskMethod();            String taskDescription();      }        @Builder      protected static class CronTaskProperties implements InternalTaskProperties {            private String taskHostKlass;          private String taskMethod;          private String cronExpression;          private String taskDescription;          private String timeZone;            @Override          public String taskDescription() {              return taskDescription;          }            public String cronExpression() {              return cronExpression;          }            public String timeZone() {              return timeZone;          }            @Override          public String taskHostKlass() {              return taskHostKlass;          }            @Override          public String taskMethod() {              return taskMethod;          }      }        @Builder      protected static class FixedDelayTaskProperties implements InternalTaskProperties {            private String taskHostKlass;          private String taskMethod;          private String intervalMilliseconds;          private String initialDelayMilliseconds;          private String taskDescription;            @Override          public String taskDescription() {              return taskDescription;          }            public String initialDelayMilliseconds() {              return initialDelayMilliseconds;          }            public String intervalMilliseconds() {              return intervalMilliseconds;          }            @Override          public String taskHostKlass() {              return taskHostKlass;          }            @Override          public String taskMethod() {              return taskMethod;          }      }        @Builder      protected static class FixedRateTaskProperties implements InternalTaskProperties {            private String taskHostKlass;          private String taskMethod;          private String intervalMilliseconds;          private String initialDelayMilliseconds;          private String taskDescription;            @Override          public String taskDescription() {              return taskDescription;          }            public String initialDelayMilliseconds() {              return initialDelayMilliseconds;          }            public String intervalMilliseconds() {              return intervalMilliseconds;          }            @Override          public String taskHostKlass() {              return taskHostKlass;          }            @Override          public String taskMethod() {              return taskMethod;          }      }  }  

loadTaskProperties()方法用於載入任務配置,留給子類實現。

JSON配置

JSON配置文件的格式如下(類路徑下的scheduling/tasks.json文件):

{    "version": 1,    "tasks": [      {        "taskKlass": "club.throwable.schedule.Tasks",        "taskMethods": [          {            "taskType": "FIXED_DELAY",            "taskDescription": "processTask1任務",            "taskMethod": "processTask1",            "intervalMilliseconds": "5000"          }        ]      }    ]  }  

每個層級都有一個enable屬性,默認為true,只有強制指定為false的時候才不會裝載對應的任務調度方法。這裡就是簡單繼承AbstractSchedulingConfigurer,實現從類路徑載入配置的邏輯,定義JsonSchedulingConfigurer

public class JsonSchedulingConfigurer extends AbstractSchedulingConfigurer {        // 這裡把默認的任務配置JSON文件放在CLASSPATH下的scheduling/tasks.json,可以通過配置項scheduling.json.config.location進行覆蓋      @Value("${scheduling.json.config.location:scheduling/tasks.json}")      private String location;        @Autowired      private ObjectMapper objectMapper;        @Override      protected List<InternalTaskProperties> loadTaskProperties() throws Exception {          ClassPathResource resource = new ClassPathResource(location);          String content = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);          ScheduleTaskProperties properties = objectMapper.readValue(content, ScheduleTaskProperties.class);          if (Boolean.FALSE.equals(properties.getEnable()) || null == properties.getTasks()) {              return Lists.newArrayList();          }          List<InternalTaskProperties> target = Lists.newArrayList();          for (ScheduleTasks tasks : properties.getTasks()) {              if (null != tasks) {                  List<ScheduleTaskMethod> taskMethods = tasks.getTaskMethods();                  if (null != taskMethods) {                      for (ScheduleTaskMethod taskMethod : taskMethods) {                          if (!Boolean.FALSE.equals(taskMethod.getEnable())) {                              if (ScheduleTaskType.CRON == taskMethod.getTaskType()) {                                  target.add(CronTaskProperties.builder()                                          .taskMethod(taskMethod.getTaskMethod())                                          .cronExpression(taskMethod.getCronExpression())                                          .timeZone(taskMethod.getTimeZone())                                          .taskDescription(taskMethod.getTaskDescription())                                          .taskHostKlass(tasks.getTaskKlass())                                          .build());                              }                              if (ScheduleTaskType.FIXED_DELAY == taskMethod.getTaskType()) {                                  target.add(FixedDelayTaskProperties.builder()                                          .taskMethod(taskMethod.getTaskMethod())                                          .intervalMilliseconds(taskMethod.getIntervalMilliseconds())                                          .initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds())                                          .taskDescription(taskMethod.getTaskDescription())                                          .taskHostKlass(tasks.getTaskKlass())                                          .build());                              }                              if (ScheduleTaskType.FIXED_RATE == taskMethod.getTaskType()) {                                  target.add(FixedRateTaskProperties.builder()                                          .taskMethod(taskMethod.getTaskMethod())                                          .intervalMilliseconds(taskMethod.getIntervalMilliseconds())                                          .initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds())                                          .taskDescription(taskMethod.getTaskDescription())                                          .taskHostKlass(tasks.getTaskKlass())                                          .build());                              }                          }                      }                  }              }          }          return target;      }  }  

添加一個配置類和任務類:

@Configuration  public class SchedulingAutoConfiguration {        @Bean      public JsonSchedulingConfigurer jsonSchedulingConfigurer(){          return new JsonSchedulingConfigurer();      }  }    // club.throwable.schedule.Tasks  @Slf4j  @Component  public class Tasks {        public void processTask1() {          log.info("processTask1觸發..........");      }  }  

啟動SpringBoot應用,某次執行的部分日誌如下:

2020-03-22 16:24:17.248  INFO 22836 --- [           main] c.t.s.AbstractSchedulingConfigurer       : 裝載FixedDelayTask[club.throwable.schedule.Tasks#processTask1()]成功,固定延遲間隔:5000 ms,初始延遲執行時間:0 ms,任務描述:processTask1任務  2020-03-22 16:24:22.275  INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks            : processTask1觸發..........  2020-03-22 16:24:27.277  INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks            : processTask1觸發..........  2020-03-22 16:24:32.279  INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks            : processTask1觸發..........  ......  

這裡有些細節沒有完善,例如配置文件參數的一些非空判斷、配置值是否合法等等校驗邏輯沒有做,如果要設計成一個工業級的類庫,這些方面必須要考慮。

JDBC數據源配置

JDBC數據源這裡用MySQL舉例說明,先建一個調度任務配置表scheduling_task

CREATE TABLE `schedule_task`  (      id                         BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',      edit_time                  DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',      create_time                DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',      editor                     VARCHAR(32)     NOT NULL DEFAULT 'admin' COMMENT '修改者',      creator                    VARCHAR(32)     NOT NULL DEFAULT 'admin' COMMENT '創建者',      deleted                    BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '軟刪除標識',      task_host_class            VARCHAR(256)    NOT NULL COMMENT '任務宿主類全類名',      task_method                VARCHAR(128)    NOT NULL COMMENT '任務執行方法名',      task_type                  VARCHAR(16)     NOT NULL COMMENT '任務類型',      task_description           VARCHAR(64)     NOT NULL COMMENT '任務描述',      cron_expression            VARCHAR(128) COMMENT 'cron表達式',      time_zone                  VARCHAR(32) COMMENT '時區',      interval_milliseconds      BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '執行間隔時間',      initial_delay_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '初始延遲執行時間',      UNIQUE uniq_class_method (task_host_class, task_method)  ) COMMENT '調度任務配置表';  

其實具體的做法和JSON配置差不多,先引入spring-boot-starter-jdbc,接著編寫MysqlSchedulingConfigurer

// DAO  @RequiredArgsConstructor  public class MysqlScheduleTaskDao {        private final JdbcTemplate jdbcTemplate;        private static final ResultSetExtractor<List<ScheduleTask>> MULTI = r -> {          List<ScheduleTask> tasks = Lists.newArrayList();          while (r.next()) {              ScheduleTask task = new ScheduleTask();              tasks.add(task);              task.setId(r.getLong("id"));              task.setCronExpression(r.getString("cron_expression"));              task.setInitialDelayMilliseconds(r.getLong("initial_delay_milliseconds"));              task.setIntervalMilliseconds(r.getLong("interval_milliseconds"));              task.setTimeZone(r.getString("time_zone"));              task.setTaskDescription(r.getString("task_description"));              task.setTaskHostClass(r.getString("task_host_class"));              task.setTaskMethod(r.getString("task_method"));              task.setTaskType(r.getString("task_type"));          }          return tasks;      };        public List<ScheduleTask> selectAllTasks() {          return jdbcTemplate.query("SELECT * FROM schedule_task WHERE deleted = 0", MULTI);      }  }    // MysqlSchedulingConfigurer  @RequiredArgsConstructor  public class MysqlSchedulingConfigurer extends AbstractSchedulingConfigurer {        private final MysqlScheduleTaskDao mysqlScheduleTaskDao;        @Override      protected List<InternalTaskProperties> loadTaskProperties() throws Exception {          List<InternalTaskProperties> target = Lists.newArrayList();          List<ScheduleTask> tasks = mysqlScheduleTaskDao.selectAllTasks();          if (!tasks.isEmpty()) {              for (ScheduleTask task : tasks) {                  ScheduleTaskType scheduleTaskType = ScheduleTaskType.fromType(task.getTaskType());                  if (ScheduleTaskType.CRON == scheduleTaskType) {                      target.add(CronTaskProperties.builder()                              .taskMethod(task.getTaskMethod())                              .cronExpression(task.getCronExpression())                              .timeZone(task.getTimeZone())                              .taskDescription(task.getTaskDescription())                              .taskHostKlass(task.getTaskHostClass())                              .build());                  }                  if (ScheduleTaskType.FIXED_DELAY == scheduleTaskType) {                      target.add(FixedDelayTaskProperties.builder()                              .taskMethod(task.getTaskMethod())                              .intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds()))                              .initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds()))                              .taskDescription(task.getTaskDescription())                              .taskHostKlass(task.getTaskHostClass())                              .build());                  }                  if (ScheduleTaskType.FIXED_RATE == scheduleTaskType) {                      target.add(FixedRateTaskProperties.builder()                              .taskMethod(task.getTaskMethod())                              .intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds()))                              .initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds()))                              .taskDescription(task.getTaskDescription())                              .taskHostKlass(task.getTaskHostClass())                              .build());                  }              }          }          return target;      }  }  

記得引入spring-boot-starter-jdbcmysql-connector-java並且激活MysqlSchedulingConfigurer配置。插入一條記錄:

INSERT INTO `schedule_task`(`id`, `edit_time`, `create_time`, `editor`, `creator`, `deleted`, `task_host_class`, `task_method`, `task_type`, `task_description`, `cron_expression`, `time_zone`, `interval_milliseconds`, `initial_delay_milliseconds`) VALUES (1, '2020-03-30 23:46:10', '2020-03-30 23:46:10', 'admin', 'admin', 0, 'club.throwable.schedule.Tasks', 'processTask1', 'FIXED_DELAY', '測試任務', NULL, NULL, 10000, 5000);  

然後啟動服務,某次執行的輸出:

2020-03-30 23:47:27.376  INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks            : processTask1觸發..........  2020-03-30 23:47:37.378  INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks            : processTask1觸發..........  ....  

混合配置

有些時候我們希望可以JSON配置和JDBC數據源配置進行混合配置,或者動態二選一以便靈活應對多環境的場景(例如要在開發環境使用JSON配置而測試和生產環境使用JDBC數據源配置,甚至可以將JDBC數據源配置覆蓋JSON配置,這樣能保證總是傾向於使用JDBC數據源配置),這樣需要對前面兩小節的實現加多一層抽象。這裡的設計可以參考SpringMVC中的控制器參數解析器的設計,具體是HandlerMethodArgumentResolverComposite,其實道理是相同的。

其他注意事項

在生產實踐中,暫時不考慮生成任務執行日誌和細粒度的監控,著重做了兩件事:

  • 並發控制,(多服務節點下)禁止任務並發執行。
  • 跟蹤任務的日誌軌跡。

解決並發執行問題

一般情況下,我們需要禁止任務並發執行,考慮引入Redisson提供的分散式鎖:

// 引入依賴  <dependency>      <groupId>org.redisson</groupId>      <artifactId>redisson</artifactId>      <version>最新版本</version>  </dependency>    // 配置類  @Configuration  @AutoConfigureAfter(RedisAutoConfiguration.class)  public class RedissonAutoConfiguration {        @Autowired      private RedisProperties redisProperties;        @Bean(destroyMethod = "shutdown")      public RedissonClient redissonClient() {          Config config = new Config();          SingleServerConfig singleServerConfig = config.useSingleServer();          singleServerConfig.setAddress(String.format("redis://%s:%d", redisProperties.getHost(), redisProperties.getPort()));          if (redisProperties.getDatabase() > 0) {              singleServerConfig.setDatabase(redisProperties.getDatabase());          }          if (null != redisProperties.getPassword()) {              singleServerConfig.setPassword(redisProperties.getPassword());          }          return Redisson.create(config);      }  }    // 分散式鎖工廠  @Component  public class DistributedLockFactory {        private static final String DISTRIBUTED_LOCK_PATH_PREFIX = "dl:";        @Autowired      private RedissonClient redissonClient;        public DistributedLock provideDistributedLock(String lockKey) {          String lockPath = DISTRIBUTED_LOCK_PATH_PREFIX + lockKey;          return new RedissonDistributedLock(redissonClient, lockPath);      }  }  

這裡考慮到項目依賴了spring-boot-starter-redis,直接復用了它的配置屬性類(RedissonDistributedLockRLock的輕量級封裝,見附錄)。使用方式如下:

@Autowired  private DistributedLockFactory distributedLockFactory;    public void task1() {      DistributedLock lock = distributedLockFactory.provideDistributedLock(lockKey);      // 等待時間為20秒,持有鎖的最大時間為60秒      boolean tryLock = lock.tryLock(20L, 60, TimeUnit.SECONDS);      if (tryLock) {           try {              // 業務邏輯           }finally {              lock.unlock();          }      }  }  

引入MDC跟蹤任務的Trace

MDC其實是Mapped Diagnostic Context的縮寫,也就是映射診斷上下文,一般用於日誌框架裡面同一個執行緒執行過程的跟蹤(例如一個執行緒跑過了多個方法,各個方法裡面都列印了日誌,那麼通過MDC可以對整個調用鏈通過一個唯一標識關聯起來),例如這裡選用slf4j提供的org.slf4j.MDC

@Component  public class MappedDiagnosticContextAssistant {        /**       * 在MDC中執行       *       * @param runnable runnable       */      public void processInMappedDiagnosticContext(Runnable runnable) {          String uuid = UUID.randomUUID().toString();          MDC.put("TRACE_ID", uuid);          try {              runnable.run();          } finally {              MDC.remove("TRACE_ID");          }      }  }  

任務執行的時候需要包裹成一個Runnale實例:

public void task1() {      mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> {          StopWatch watch = new StopWatch();          watch.start();          log.info("開始執行......");          // 業務邏輯          watch.stop();          log.info("執行完畢,耗時:{} ms......", watch.getTotalTimeMillis());      });  }  

結合前面一節提到的並發控制,那麼最終執行的任務方法如下:

public void task1() {      mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> {          StopWatch watch = new StopWatch();          watch.start();          log.info("開始執行......");          scheduleTaskAssistant.executeInDistributedLock("任務分散式鎖KEY", () -> {              // 真實的業務邏輯          });          watch.stop();          log.info("執行完畢,耗時:{} ms......", watch.getTotalTimeMillis());      });  }  

這裡的方法看起來比較彆扭,其實可以直接在任務裝載的時候基於分散式鎖和MDC進行封裝,方式類似於ScheduledMethodRunnable,這裡不做展開,因為要詳細展開篇幅可能比較大(ScheduleTaskAssistant見附錄)。

小結

其實spring-context整個調度模組完全依賴於TaskScheduler實現,更底層的是JUC調度執行緒池ScheduledThreadPoolExecutor。如果想要從底層原理理解整個調度模組的運行原理,那麼就一定要分析ScheduledThreadPoolExecutor的實現。整篇文章大致介紹了spring-context調度模組的載入調度任務的流程,並且基於擴展介面SchedulingConfigurer擴展出多種自定義配置調度任務的方式,但是考慮到需要在生產環境中運行,那麼免不了需要考慮監控、並發控制、日誌跟蹤等等的功能,但是這樣就會使得整個調度模組變重,慢慢地就會發現,這個輪子越造越大,越有主流調度框架Quartz或者Easy Scheduler的影子。筆者認為,軟體工程,有些時候要權衡取捨,該拋棄的就應該果斷拋棄,否則總是負重而行,還能走多遠?

參考資料:

  • SpringBoot源碼

附錄

ScheduleTaskAssistant

@RequiredArgsConstructor  @Component  public class ScheduleTaskAssistant {        /**       * 5秒       */      public static final long DEFAULT_WAIT_TIME = 5L;        /**       * 30秒       */      public static final long DEFAULT_LEAVE_TIME = 30L;        private final DistributedLockFactory distributedLockFactory;        /**       * 在分散式鎖中執行       *       * @param waitTime  鎖等著時間       * @param leaveTime 鎖持有時間       * @param timeUnit  時間單位       * @param lockKey   鎖的key       * @param task      任務對象       */      public void executeInDistributedLock(long waitTime, long leaveTime, TimeUnit timeUnit, String lockKey, Runnable task) {          DistributedLock lock = distributedLockFactory.dl(lockKey);          boolean tryLock = lock.tryLock(waitTime, leaveTime, timeUnit);          if (tryLock) {              try {                  long waitTimeMillis = timeUnit.toMillis(waitTime);                  long start = System.currentTimeMillis();                  task.run();                  long end = System.currentTimeMillis();                  long cost = end - start;                  // 預防鎖過早釋放                  if (cost < waitTimeMillis) {                      Sleeper.X.sleep(waitTimeMillis - cost);                  }              } finally {                  lock.unlock();              }          }      }        /**       * 在分散式鎖中執行 - 使用默認時間       *       * @param lockKey 鎖的key       * @param task    任務對象       */      public void executeInDistributedLock(String lockKey, Runnable task) {          executeInDistributedLock(DEFAULT_WAIT_TIME, DEFAULT_LEAVE_TIME, TimeUnit.SECONDS, lockKey, task);      }  }  

RedissonDistributedLock

@Slf4j  public class RedissonDistributedLock implements DistributedLock {        private final RedissonClient redissonClient;      private final String lockPath;      private final RLock internalLock;        RedissonDistributedLock(RedissonClient redissonClient, String lockPath) {          this.redissonClient = redissonClient;          this.lockPath = lockPath;          this.internalLock = initInternalLock();      }        private RLock initInternalLock() {          return redissonClient.getLock(lockPath);      }        @Override      public boolean isLock() {          return internalLock.isLocked();      }        @Override      public boolean isHeldByCurrentThread() {          return internalLock.isHeldByCurrentThread();      }        @Override      public void lock(long leaseTime, TimeUnit unit) {          internalLock.lock(leaseTime, unit);      }        @Override      public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {          try {              return internalLock.tryLock(waitTime, leaseTime, unit);          } catch (InterruptedException e) {              Thread.currentThread().interrupt();              throw new IllegalStateException(String.format("Acquire lock fail by thread interrupted,path:%s", lockPath), e);          }      }        @Override      public void unlock() {          try {              internalLock.unlock();          } catch (IllegalMonitorStateException ex) {              log.warn("Unlock path:{} error for thread status change in concurrency", lockPath, ex);          }      }  }  

(本文完 c-7-d e-a-20200324 真是有點滑稽,筆者發現任務持久化最好還是用現成的工業級調度器,於是基於Quartz做了輕量級封裝,寫了個後台管理介面,且聽下回分解)