通過源碼理解Spring中@Scheduled的實現原理並且實現調度任務動態裝載
- 2020 年 4 月 2 日
- 筆記
前提
最近的新項目和數據同步相關,有定時調度的需求。之前一直有使用過Quartz
、XXL-Job
、Easy Scheduler
等調度框架,後來越發覺得這些框架太重量級了,於是想到了Spring
內置的Scheduling
模組。而原生的Scheduling
模組只是記憶體態的調度模組,不支援任務的持久化或者配置(配置任務通過@Scheduled
註解進行硬編碼,不能抽離到類之外),因此考慮理解Scheduling
模組的底層原理,並且基於此造一個簡單的輪子,使之支援調度任務配置:通過配置文件或者JDBC
數據源。
Scheduling模組
Scheduling
模組是spring-context
依賴下的一個包org.springframework.scheduling
:
這個模組的類並不多,有四個子包:
- 頂層包的定義了一些通用介面和異常。
org.springframework.scheduling.annotation
:定義了調度、非同步任務相關的註解和解析類,常用的註解如@Async
、@EnableAsync
、@EnableScheduling
和@Scheduled
。org.springframework.scheduling.concurrent
:定義了調度任務執行器和相對應的FactoryBean
。org.springframework.scheduling.config
:定義了配置解析、任務具體實現類、調度任務XML
配置文件解析相關的解析類。org.springframework.scheduling.support
:定義了反射支援類、Cron
表達式解析器等工具類。
如果想單獨使用Scheduling
,只需要引入spring-context
這個依賴。但是現在流行使用SpringBoot
,引入spring-boot-starter-web
已經集成了spring-context
,可以直接使用Scheduling
模組,筆者編寫本文的時候(2020-03-14
)SpringBoot
的最新版本為2.2.5.RELEASE
,可以選用此版本進行源碼分析或者生產應用:
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <spring.boot.version>2.2.5.RELEASE</spring.boot.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
開啟Scheduling
模組支援只需要在某一個配置類中添加@EnableScheduling
註解即可,一般為了明確模組的引入,建議在啟動類中使用此註解,如:
@EnableScheduling @SpringBootApplication public class App { public static void main(String[] args) { SpringApplication.run(App.class, args); } }
Scheduling模組的工作流程
這個圖描述了Scheduling
模組的工作流程,這裡分析一下非XML
配置下的流程(右邊的分支):
- 通過註解
@EnableScheduling
中的@Import
引入了SchedulingConfiguration
,而SchedulingConfiguration
中配置了一個類型為ScheduledAnnotationBeanPostProcessor
名稱為org.springframework.context.annotation.internalScheduledAnnotationProcessor
的Bean
,這裡有個常見的技巧,Spring
內部載入的Bean
一般會定義名稱為internalXXX
,Bean
的role
會定義為ROLE_INFRASTRUCTURE = 2
。 Bean
後置處理器ScheduledAnnotationBeanPostProcessor
會解析和處理每一個符合特定類型的Bean
中的@Scheduled
註解(注意@Scheduled
只能使用在方法或者註解上),並且把解析完成的方法封裝為不同類型的Task
實例,快取在ScheduledTaskRegistrar
中的。ScheduledAnnotationBeanPostProcessor
中的鉤子介面方法afterSingletonsInstantiated()
在所有單例初始化完成之後回調觸發,在此方法中設置了ScheduledTaskRegistrar
中的任務調度器(TaskScheduler
或者ScheduledExecutorService
類型)實例,並且調用ScheduledTaskRegistrar#afterPropertiesSet()
方法添加所有快取的Task
實例到任務調度器中執行。
任務調度器
Scheduling
模組支援TaskScheduler
或者ScheduledExecutorService
類型的任務調度器,而ScheduledExecutorService
其實是JDK
並發包java.util.concurrent
的介面,一般實現類就是調度執行緒池ScheduledThreadPoolExecutor
。實際上,ScheduledExecutorService
類型的實例最終會通過適配器模式轉變為ConcurrentTaskScheduler
,所以這裡只需要分析TaskScheduler
類型的執行器。
ThreadPoolTaskScheduler
:基於執行緒池實現的任務執行器,這個是最常用的實現,底層依賴於ScheduledThreadPoolExecutor
實現。ConcurrentTaskScheduler
:TaskScheduler
介面和ScheduledExecutorService
介面的適配器,如果自定義一個ScheduledThreadPoolExecutor
類型的Bean
,那麼任務執行器就會適配為ConcurrentTaskScheduler
。DefaultManagedTaskScheduler
:JDK7
引入的JSR-236
的支援,可以通過JNDI
配置此調度執行器,一般很少用到,底層也是依賴於ScheduledThreadPoolExecutor
實現。
也就是說,內置的三個調度器類型底層都依賴於JUC
調度執行緒池ScheduledThreadPoolExecutor
。這裡分析一下頂層介面org.springframework.scheduling.TaskScheduler
提供的功能(筆者已經把功能一致的default
方法暫時移除):
// 省略一些功能一致的default方法 public interface TaskScheduler { // 調度一個任務,通過觸發器實例指定觸發時間周期 ScheduledFuture<?> schedule(Runnable task, Trigger trigger); // 指定起始時間調度一個任務 - 單次執行 ScheduledFuture<?> schedule(Runnable task, Date startTime); // 指定固定頻率調度一個任務,period的單位是毫秒 ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period); // 指定起始時間和固定頻率調度一個任務,period的單位是毫秒 ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period); // 指定固定延遲間隔調度一個任務,delay的單位是毫秒 ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay); // 指定起始時間和固定延遲間隔調度一個任務,delay的單位是毫秒 ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay); }
Task的分類
Scheduling
模組中支援不同類型的任務,主要包括下面的3種(解析的優先順序也是如下):
Cron
表達式任務,支援通過Cron
表達式配置執行的周期,對應的任務類型為org.springframework.scheduling.config.CronTask
。- 固定延遲間隔任務,也就是上一輪執行完畢後間隔固定周期再執行本輪,依次類推,對應的的任務類型為
org.springframework.scheduling.config.FixedDelayTask
。 - 固定頻率任務,基於固定的間隔時間執行,不會理會上一輪是否執行完畢本輪會照樣執行,對應的的任務類型為
org.springframework.scheduling.config.FixedRateTask
。
關於這幾類Task
,舉幾個簡單的例子。CronTask
是通過cron
表達式指定執行周期的,並且不支援延遲執行,可以使用特殊字元-
禁用任務執行:
// 註解聲明式使用 - 每五秒執行一次,不支援initialDelay @Scheduled(cron = "*/5 * * * * ?") public void processTask(){ } // 註解聲明式使用 - 禁止任務執行 @Scheduled(cron = "-") public void processTask(){ } // 編程式使用 public class Tasks { static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) throws Exception { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(10); taskScheduler.initialize(); CronTask cronTask = new CronTask(() -> { System.out.println(String.format("[%s] - CronTask觸發...", F.format(LocalDateTime.now()))); }, "*/5 * * * * ?"); taskScheduler.schedule(cronTask.getRunnable(),cronTask.getTrigger()); Thread.sleep(Integer.MAX_VALUE); } } // 某次執行輸出結果 [2020-03-16 01:07:00] - CronTask觸發... [2020-03-16 01:07:05] - CronTask觸發... ......
FixedDelayTask
需要配置延遲間隔值(fixedDelay
或者fixedDelayString
)和可選的起始延遲執行時間(initialDelay
或者initialDelayString
),這裡注意一點是fixedDelayString
和initialDelayString
都支援從EmbeddedValueResolver
(簡單理解為配置文件的屬性處理器)讀取和Duration
(例如P2D
就是parses as 2 days
,表示86400秒)支援格式的解析:
// 註解聲明式使用 - 延遲一秒開始執行,延遲間隔為5秒 @Scheduled(fixedDelay = 5000, initialDelay = 1000) public void process(){ } // 註解聲明式使用 - spring-boot配置文件中process.task.fixedDelay=5000 process.task.initialDelay=1000 @Scheduled(fixedDelayString = "${process.task.fixedDelay}", initialDelayString = "${process.task.initialDelay}") public void process(){ } // 編程式使用 public class Tasks { static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) throws Exception { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(10); taskScheduler.initialize(); FixedDelayTask fixedDelayTask = new FixedDelayTask(() -> { System.out.println(String.format("[%s] - FixedDelayTask觸發...", F.format(LocalDateTime.now()))); }, 5000, 1000); Date startTime = new Date(System.currentTimeMillis() + fixedDelayTask.getInitialDelay()); taskScheduler.scheduleWithFixedDelay(fixedDelayTask.getRunnable(), startTime, fixedDelayTask.getInterval()); Thread.sleep(Integer.MAX_VALUE); } } // 某次執行輸出結果 [2020-03-16 01:06:12] - FixedDelayTask觸發... [2020-03-16 01:06:17] - FixedDelayTask觸發... ......
FixedRateTask
需要配置固定間隔值(fixedRate
或者fixedRateString
)和可選的起始延遲執行時間(initialDelay
或者initialDelayString
),這裡注意一點是fixedRateString
和initialDelayString
都支援從EmbeddedValueResolver
(簡單理解為配置文件的屬性處理器)讀取和Duration
(例如P2D
就是parses as 2 days
,表示86400秒)支援格式的解析:
// 註解聲明式使用 - 延遲一秒開始執行,每隔5秒執行一次 @Scheduled(fixedRate = 5000, initialDelay = 1000) public void processTask(){ } // 註解聲明式使用 - spring-boot配置文件中process.task.fixedRate=5000 process.task.initialDelay=1000 @Scheduled(fixedRateString = "${process.task.fixedRate}", initialDelayString = "${process.task.initialDelay}") public void process(){ } // 編程式使用 public class Tasks { static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) throws Exception { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(10); taskScheduler.initialize(); FixedRateTask fixedRateTask = new FixedRateTask(() -> { System.out.println(String.format("[%s] - FixedRateTask觸發...", F.format(LocalDateTime.now()))); }, 5000, 1000); Date startTime = new Date(System.currentTimeMillis() + fixedRateTask.getInitialDelay()); taskScheduler.scheduleAtFixedRate(fixedRateTask.getRunnable(), startTime, fixedRateTask.getInterval()); Thread.sleep(Integer.MAX_VALUE); } } // 某次執行輸出結果 [2020-03-16 23:58:25] - FixedRateTask觸發... [2020-03-16 23:58:30] - FixedRateTask觸發... ......
簡單分析核心流程的源程式碼
在SpringBoot
註解體系下,Scheduling
模組的所有邏輯基本在ScheduledAnnotationBeanPostProcessor
和ScheduledTaskRegistrar
中。一般來說,一個類實現的介面代表了它能提供的功能,先看ScheduledAnnotationBeanPostProcessor
實現的介面:
ScheduledTaskHolder
介面:返回Set<ScheduledTask>
,表示持有的所有任務實例。MergedBeanDefinitionPostProcessor
介面:Bean
定義合併時回調,預留空實現,暫時不做任何處理。BeanPostProcessor
介面:也就是MergedBeanDefinitionPostProcessor
的父介面,Bean
實例初始化前後分別回調,其中,後回調的postProcessAfterInitialization()
方法就是用於解析@Scheduled
和裝載ScheduledTask
,需要重點關注此方法的邏輯。DestructionAwareBeanPostProcessor
介面:具體的Bean
實例銷毀的時候回調,用於Bean
實例銷毀的時候移除和取消對應的任務實例。Ordered
介面:用於Bean
載入時候的排序,主要是改變ScheduledAnnotationBeanPostProcessor
在BeanPostProcessor
執行鏈中的順序。EmbeddedValueResolverAware
介面:回調StringValueResolver
實例,用於解析帶佔位符的環境變數屬性值。BeanNameAware
介面:回調BeanName
。BeanFactoryAware
介面:回調BeanFactory
實例,具體是DefaultListableBeanFactory
,也就是熟知的IOC
容器。ApplicationContextAware
介面:回調ApplicationContext
實例,也就是熟知的Spring
上下文,它是IOC
容器的門面,同時是事件廣播器、資源載入器的實現等等。SmartInitializingSingleton
介面:所有單例實例化完畢之後回調,作用是在持有的applicationContext
為NULL
的時候開始調度所有載入完成的任務,這個鉤子介面十分有用,筆者常用它做一些資源初始化工作。ApplicationListener
介面:監聽Spring
應用的事件,具體是ApplicationListener<ContextRefreshedEvent>
,監聽上下文刷新的事件,如果事件中攜帶的ApplicationContext
實例和ApplicationContextAware
回調的ApplicationContext
實例一致,那麼在此監聽回調方法中開始調度所有載入完成的任務,也就是在ScheduledAnnotationBeanPostProcessor
這個類中,SmartInitializingSingleton
介面的實現和ApplicationListener
介面的實現邏輯是互斥的。DisposableBean
介面:當前Bean
實例銷毀時候回調,也就是ScheduledAnnotationBeanPostProcessor
自身被銷毀的時候回調,用於取消和清理所有的ScheduledTask
。
上面分析的鉤子介面在SpringBoot體系中可以按需使用,了解回調不同鉤子介面的回調時機,可以在特定時機完成達到理想的效果。
@Scheduled
註解的解析集中在postProcessAfterInitialization()
方法:
public Object postProcessAfterInitialization(Object bean, String beanName) { // 忽略AopInfrastructureBean、TaskScheduler和ScheduledExecutorService三種類型的Bean if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler || bean instanceof ScheduledExecutorService) { // Ignore AOP infrastructure such as scoped proxies. return bean; } // 獲取Bean的用戶態類型,例如Bean有可能被CGLIB增強,這個時候要取其父類 Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); // nonAnnotatedClasses存放著不存在@Scheduled註解的類型,快取起來避免重複判斷它是否攜帶@Scheduled註解的方法 if (!this.nonAnnotatedClasses.contains(targetClass) && AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) { // 因為JDK8之後支援重複註解,因此獲取具體類型中Method -> @Scheduled的集合,也就是有可能一個方法使用多個@Scheduled註解,最終會封裝為多個Task Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> { Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations( method, Scheduled.class, Schedules.class); return (!scheduledMethods.isEmpty() ? scheduledMethods : null); }); // 解析到類型中不存在@Scheduled註解的方法添加到nonAnnotatedClasses快取 if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace("No @Scheduled annotations found on bean class: " + targetClass); } } else { // Method -> @Scheduled的集合遍歷processScheduled()方法進行登記 annotatedMethods.forEach((method, scheduledMethods) -> scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean))); if (logger.isTraceEnabled()) { logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods); } } } return bean; }
processScheduled(Scheduled scheduled, Method method, Object bean)
就是具體的註解解析和Task
封裝的方法:
// Runnable適配器 - 用於反射調用具體的方法,觸發任務方法執行 public class ScheduledMethodRunnable implements Runnable { private final Object target; private final Method method; public ScheduledMethodRunnable(Object target, Method method) { this.target = target; this.method = method; } ....// 省略無關程式碼 // 這個就是最終的任務方法執行的核心方法,抑制修飾符,然後反射調用 @Override public void run() { try { ReflectionUtils.makeAccessible(this.method); this.method.invoke(this.target); } catch (InvocationTargetException ex) { ReflectionUtils.rethrowRuntimeException(ex.getTargetException()); } catch (IllegalAccessException ex) { throw new UndeclaredThrowableException(ex); } } } // 通過方法所在Bean實例和方法封裝Runnable適配器ScheduledMethodRunnable實例 protected Runnable createRunnable(Object target, Method method) { Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled"); Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass()); return new ScheduledMethodRunnable(target, invocableMethod); } // 這個方法十分長,不過邏輯並不複雜,它只做了四件事 // 0. 解析@Scheduled中的initialDelay、initialDelayString屬性,適用於FixedDelayTask或者FixedRateTask的延遲執行 // 1. 優先解析@Scheduled中的cron屬性,封裝為CronTask,通過ScheduledTaskRegistrar進行快取 // 2. 解析@Scheduled中的fixedDelay、fixedDelayString屬性,封裝為FixedDelayTask,通過ScheduledTaskRegistrar進行快取 // 3. 解析@Scheduled中的fixedRate、fixedRateString屬性,封裝為FixedRateTask,通過ScheduledTaskRegistrar進行快取 protected void processScheduled(Scheduled scheduled, Method method, Object bean) { try { // 通過方法宿主Bean和目標方法封裝Runnable適配器ScheduledMethodRunnable實例 Runnable runnable = createRunnable(bean, method); boolean processedSchedule = false; String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required"; // 快取已經裝載的任務 Set<ScheduledTask> tasks = new LinkedHashSet<>(4); // Determine initial delay // 解析初始化延遲執行時間,initialDelayString支援佔位符配置,如果initialDelayString配置了,會覆蓋initialDelay的值 long initialDelay = scheduled.initialDelay(); String initialDelayString = scheduled.initialDelayString(); if (StringUtils.hasText(initialDelayString)) { Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both"); if (this.embeddedValueResolver != null) { initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString); } if (StringUtils.hasLength(initialDelayString)) { try { initialDelay = parseDelayAsLong(initialDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid initialDelayString value "" + initialDelayString + "" - cannot parse into long"); } } } // Check cron expression // 解析時區zone的值,支援支援佔位符配置,判斷cron是否存在,存在則裝載為CronTask String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { String zone = scheduled.zone(); if (this.embeddedValueResolver != null) { cron = this.embeddedValueResolver.resolveStringValue(cron); zone = this.embeddedValueResolver.resolveStringValue(zone); } if (StringUtils.hasLength(cron)) { Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers"); processedSchedule = true; if (!Scheduled.CRON_DISABLED.equals(cron)) { TimeZone timeZone; if (StringUtils.hasText(zone)) { timeZone = StringUtils.parseTimeZoneString(zone); } else { timeZone = TimeZone.getDefault(); } // 此方法雖然表面上是調度CronTask,實際上由於ScheduledTaskRegistrar不持有TaskScheduler,只是把任務添加到它的快取中 // 返回的任務實例添加到宿主Bean的快取中,然後最後會放入宿主Bean -> List<ScheduledTask>映射中 tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } } } // At this point we don't need to differentiate between initial delay set or not anymore // 修正小於0的初始化延遲執行時間值為0 if (initialDelay < 0) { initialDelay = 0; } // 解析fixedDelay和fixedDelayString,如果同時配置,fixedDelayString最終解析出來的整數值會覆蓋fixedDelay,封裝為FixedDelayTask long fixedDelay = scheduled.fixedDelay(); if (fixedDelay >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } String fixedDelayString = scheduled.fixedDelayString(); if (StringUtils.hasText(fixedDelayString)) { if (this.embeddedValueResolver != null) { fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString); } if (StringUtils.hasLength(fixedDelayString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedDelay = parseDelayAsLong(fixedDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedDelayString value "" + fixedDelayString + "" - cannot parse into long"); } // 此方法雖然表面上是調度FixedDelayTask,實際上由於ScheduledTaskRegistrar不持有TaskScheduler,只是把任務添加到它的快取中 // 返回的任務實例添加到宿主Bean的快取中,然後最後會放入宿主Bean -> List<ScheduledTask>映射中 tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } } // 解析fixedRate和fixedRateString,如果同時配置,fixedRateString最終解析出來的整數值會覆蓋fixedRate,封裝為FixedRateTask long fixedRate = scheduled.fixedRate(); if (fixedRate >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } String fixedRateString = scheduled.fixedRateString(); if (StringUtils.hasText(fixedRateString)) { if (this.embeddedValueResolver != null) { fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString); } if (StringUtils.hasLength(fixedRateString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedRate = parseDelayAsLong(fixedRateString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedRateString value "" + fixedRateString + "" - cannot parse into long"); } // 此方法雖然表面上是調度FixedRateTask,實際上由於ScheduledTaskRegistrar不持有TaskScheduler,只是把任務添加到它的快取中 // 返回的任務實例添加到宿主Bean的快取中,然後最後會放入宿主Bean -> List<ScheduledTask>映射中 tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } } // Check whether we had any attribute set Assert.isTrue(processedSchedule, errorMessage); // Finally register the scheduled tasks synchronized (this.scheduledTasks) { // 註冊所有任務實例,這個映射Key為宿主Bean實例,Value為List<ScheduledTask>,後面用於調度所有註冊完成的任務 Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4)); regTasks.addAll(tasks); } } catch (IllegalArgumentException ex) { throw new IllegalStateException( "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage()); } }
總的來說,這個方法做了四件事:
- 解析
@Scheduled
中的initialDelay
、initialDelayString
屬性,適用於FixedDelayTask
或者FixedRateTask
的延遲執行。 - 優先解析
@Scheduled
中的cron
屬性,封裝為CronTask
,通過ScheduledTaskRegistrar
進行快取。 - 解析
@Scheduled
中的fixedDelay
、fixedDelayString
屬性,封裝為FixedDelayTask
,通過ScheduledTaskRegistrar
進行快取。 - 解析
@Scheduled
中的fixedRate
、fixedRateString
屬性,封裝為FixedRateTask
,通過ScheduledTaskRegistrar
進行快取。
@Scheduled
修飾的某個方法如果同時配置了cron
、fixedDelay|fixedDelayString
和fixedRate|fixedRateString
屬性,意味著此方法同時封裝為三種任務CronTask
、FixedDelayTask
和FixedRateTask
。解析xxString
值的使用,用到了EmbeddedValueResolver
解析字元串的值,支援佔位符,這樣可以直接獲取環境配置中的佔位符屬性(基於SPEL
的特性,甚至可以支援嵌套佔位符)。解析成功的所有任務實例存放在ScheduledAnnotationBeanPostProcessor
的一個映射scheduledTasks
中:
// 宿主Bean實例 -> 解析完成的任務實例Set private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);
解析和快取工作完成之後,接著分析最終激活所有調度任務的邏輯,見互斥方法afterSingletonsInstantiated()
和onApplicationEvent()
,兩者中一定只有一個方法能夠調用finishRegistration()
:
// 所有單例實例化完畢之後回調 public void afterSingletonsInstantiated() { // Remove resolved singleton classes from cache this.nonAnnotatedClasses.clear(); if (this.applicationContext == null) { // Not running in an ApplicationContext -> register tasks early... finishRegistration(); } } // 上下文刷新完成之後回調 @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (event.getApplicationContext() == this.applicationContext) { // Running in an ApplicationContext -> register tasks this late... // giving other ContextRefreshedEvent listeners a chance to perform // their work at the same time (e.g. Spring Batch's job registration). finishRegistration(); } } // private void finishRegistration() { // 如果持有的scheduler對象不為null則設置ScheduledTaskRegistrar中的任務調度器 if (this.scheduler != null) { this.registrar.setScheduler(this.scheduler); } // 這個判斷一般會成立,得到的BeanFactory就是DefaultListableBeanFactory if (this.beanFactory instanceof ListableBeanFactory) { // 獲取所有的調度配置器SchedulingConfigurer實例,並且都回調configureTasks()方法,這個很重要,它是用戶動態裝載調取任務的擴展鉤子介面 Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class); List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values()); // SchedulingConfigurer實例列表排序 AnnotationAwareOrderComparator.sort(configurers); for (SchedulingConfigurer configurer : configurers) { configurer.configureTasks(this.registrar); } } // 下面這一大段邏輯都是為了從BeanFactory取出任務調度器實例,主要判斷TaskScheduler或者ScheduledExecutorService類型的Bean,包括嘗試通過類型或者名字獲取 // 獲取成功後設置到ScheduledTaskRegistrar中 if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) { Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type"); try { // Search for TaskScheduler bean... this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false)); } catch (NoUniqueBeanDefinitionException ex) { logger.trace("Could not find unique TaskScheduler bean", ex); try { this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true)); } catch (NoSuchBeanDefinitionException ex2) { if (logger.isInfoEnabled()) { logger.info("More than one TaskScheduler bean exists within the context, and " + "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " + "(possibly as an alias); or implement the SchedulingConfigurer interface and call " + "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + ex.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex) { logger.trace("Could not find default TaskScheduler bean", ex); // Search for ScheduledExecutorService bean next... try { this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false)); } catch (NoUniqueBeanDefinitionException ex2) { logger.trace("Could not find unique ScheduledExecutorService bean", ex2); try { this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true)); } catch (NoSuchBeanDefinitionException ex3) { if (logger.isInfoEnabled()) { logger.info("More than one ScheduledExecutorService bean exists within the context, and " + "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " + "(possibly as an alias); or implement the SchedulingConfigurer interface and call " + "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + ex2.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex2) { logger.trace("Could not find default ScheduledExecutorService bean", ex2); // Giving up -> falling back to default scheduler within the registrar... logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing"); } } } // 調用ScheduledTaskRegistrar的afterPropertiesSet()方法,裝載所有的調度任務 this.registrar.afterPropertiesSet(); } public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean { // 省略其他程式碼......... @Override public void afterPropertiesSet() { scheduleTasks(); } // 裝載所有調度任務 @SuppressWarnings("deprecation") protected void scheduleTasks() { // 這裡注意一點,如果找不到任務調度器實例,那麼會用單個執行緒調度所有任務 if (this.taskScheduler == null) { this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } // 調度所有裝載完畢的自定義觸發器的任務實例 if (this.triggerTasks != null) { for (TriggerTask task : this.triggerTasks) { addScheduledTask(scheduleTriggerTask(task)); } } // 調度所有裝載完畢的CronTask if (this.cronTasks != null) { for (CronTask task : this.cronTasks) { addScheduledTask(scheduleCronTask(task)); } } // 調度所有裝載完畢的FixedRateTask if (this.fixedRateTasks != null) { for (IntervalTask task : this.fixedRateTasks) { addScheduledTask(scheduleFixedRateTask(task)); } } // 調度所有裝載完畢的FixedDelayTask if (this.fixedDelayTasks != null) { for (IntervalTask task : this.fixedDelayTasks) { addScheduledTask(scheduleFixedDelayTask(task)); } } } // 省略其他程式碼......... }
注意兩個個問題:
- 如果沒有配置
TaskScheduler
或者ScheduledExecutorService
類型的Bean
,那麼調度模組只會創建一個執行緒去調度所有裝載完畢的任務,如果任務比較多,執行密度比較大,很有可能會造成大量任務飢餓,表現為存在部分任務不會觸發調度的場景(這個是調度模組生產中經常遇到的故障,需要重點排查是否沒有設置TaskScheduler
或者ScheduledExecutorService
)。 SchedulingConfigurer
是調度模組提供給使用的進行擴展的鉤子介面,用於在激活所有調度任務之前回調ScheduledTaskRegistrar
實例,只要拿到ScheduledTaskRegistrar
實例,我們就可以使用它註冊和裝載新的Task
。
調度任務動態裝載
Scheduling
模組本身已經支援基於NamespaceHandler
支援通過XML
文件配置調度任務,但是筆者一直認為XML
給人的感覺太"重",使用起來顯得太笨重,這裡打算擴展出JSON
文件配置和基於JDBC
數據源配置(也就是持久化任務,這裡選用MySQL
)。根據前文的源碼分析,需要用到SchedulingConfigurer
介面的實現,用於在所有調度任務觸發之前從外部添加自定義的調度任務。先定義調度任務的一些配置屬性類:
// 調度任務類型枚舉 @Getter @RequiredArgsConstructor public enum ScheduleTaskType { CRON("CRON"), FIXED_DELAY("FIXED_DELAY"), FIXED_RATE("FIXED_RATE"), ; private final String type; } // 調度任務配置,enable屬性為全局開關 @Data public class ScheduleTaskProperties { private Long version; private Boolean enable; private List<ScheduleTasks> tasks; } // 調度任務集合,筆者設計的時候採用一個宿主類中每個獨立方法都是一個任務實例的模式 @Data public class ScheduleTasks { // 這裡故意叫Klass代表Class,避免關鍵字衝突 private String taskHostKlass; private Boolean enable; private List<ScheduleTaskMethod> taskMethods; } // 調度任務方法 - enable為任務開關,沒有配置會被ScheduleTaskProperties或者ScheduleTasks中的enable覆蓋 @Data public class ScheduleTaskMethod { private Boolean enable; private String taskDescription; private String taskMethod; // 時區,cron的計算需要用到 private String timeZone; private String cronExpression; private String intervalMilliseconds; private String initialDelayMilliseconds; }
設計的時候,考慮到多個任務執行方法可以放在同一個宿主類,這樣可以方便同一種類的任務進行統一管理,如:
public class TaskHostClass { public void task1() { } public void task2() { } ...... public void taskN() { } }
細節方面,intervalMilliseconds
和initialDelayMilliseconds
的單位設計為毫秒,使用字元串形式,方便可以基於StringValueResolver
解析配置文件中的屬性配置。添加一個抽象的SchedulingConfigurer
:
@Slf4j public abstract class AbstractSchedulingConfigurer implements SchedulingConfigurer, InitializingBean, BeanFactoryAware, EmbeddedValueResolverAware { @Getter private StringValueResolver embeddedValueResolver; private ConfigurableBeanFactory configurableBeanFactory; private final List<InternalTaskProperties> internalTasks = Lists.newLinkedList(); private final Set<String> tasksLoaded = Sets.newHashSet(); @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { configurableBeanFactory = (ConfigurableBeanFactory) beanFactory; } @Override public void afterPropertiesSet() throws Exception { internalTasks.clear(); internalTasks.addAll(loadTaskProperties()); } @Override public void setEmbeddedValueResolver(StringValueResolver resolver) { embeddedValueResolver = resolver; } @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { for (InternalTaskProperties task : internalTasks) { try { synchronized (tasksLoaded) { String key = task.taskHostKlass() + "#" + task.taskMethod(); // 避免重複載入 if (!tasksLoaded.contains(key)) { if (task instanceof CronTaskProperties) { loadCronTask((CronTaskProperties) task, taskRegistrar); } if (task instanceof FixedDelayTaskProperties) { loadFixedDelayTask((FixedDelayTaskProperties) task, taskRegistrar); } if (task instanceof FixedRateTaskProperties) { loadFixedRateTask((FixedRateTaskProperties) task, taskRegistrar); } tasksLoaded.add(key); } else { log.info("調度任務已經裝載,任務宿主類:{},任務執行方法:{}", task.taskHostKlass(), task.taskMethod()); } } } catch (Exception e) { throw new IllegalStateException(String.format("載入調度任務異常,任務宿主類:%s,任務執行方法:%s", task.taskHostKlass(), task.taskMethod()), e); } } } private ScheduledMethodRunnable loadScheduledMethodRunnable(String taskHostKlass, String taskMethod) throws Exception { Class<?> klass = ClassUtils.forName(taskHostKlass, null); Object target = configurableBeanFactory.getBean(klass); Method method = ReflectionUtils.findMethod(klass, taskMethod); if (null == method) { throw new IllegalArgumentException(String.format("找不到目標方法,任務宿主類:%s,任務執行方法:%s", taskHostKlass, taskMethod)); } Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass()); return new ScheduledMethodRunnable(target, invocableMethod); } private void loadCronTask(CronTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception { ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod()); String cronExpression = embeddedValueResolver.resolveStringValue(pops.cronExpression()); if (null != cronExpression) { String timeZoneString = embeddedValueResolver.resolveStringValue(pops.timeZone()); TimeZone timeZone; if (null != timeZoneString) { timeZone = TimeZone.getTimeZone(timeZoneString); } else { timeZone = TimeZone.getDefault(); } CronTask cronTask = new CronTask(runnable, new CronTrigger(cronExpression, timeZone)); taskRegistrar.addCronTask(cronTask); log.info("裝載CronTask[{}#{}()]成功,cron表達式:{},任務描述:{}", cronExpression, pops.taskMethod(), pops.cronExpression(), pops.taskDescription()); } } private void loadFixedDelayTask(FixedDelayTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception { ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod()); long fixedDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds())); long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds())); FixedDelayTask fixedDelayTask = new FixedDelayTask(runnable, fixedDelayMilliseconds, initialDelayMilliseconds); taskRegistrar.addFixedDelayTask(fixedDelayTask); log.info("裝載FixedDelayTask[{}#{}()]成功,固定延遲間隔:{} ms,初始延遲執行時間:{} ms,任務描述:{}", pops.taskHostKlass(), pops.taskMethod(), fixedDelayMilliseconds, initialDelayMilliseconds, pops.taskDescription()); } private void loadFixedRateTask(FixedRateTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception { ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod()); long fixedRateMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds())); long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds())); FixedRateTask fixedRateTask = new FixedRateTask(runnable, fixedRateMilliseconds, initialDelayMilliseconds); taskRegistrar.addFixedRateTask(fixedRateTask); log.info("裝載FixedRateTask[{}#{}()]成功,固定執行頻率:{} ms,初始延遲執行時間:{} ms,任務描述:{}", pops.taskHostKlass(), pops.taskMethod(), fixedRateMilliseconds, initialDelayMilliseconds, pops.taskDescription()); } private long parseDelayAsLong(String value) { if (null == value) { return 0L; } if (value.length() > 1 && (isP(value.charAt(0)) || isP(value.charAt(1)))) { return Duration.parse(value).toMillis(); } return Long.parseLong(value); } private boolean isP(char ch) { return (ch == 'P' || ch == 'p'); } /** * 載入任務配置,預留給子類實現 */ protected abstract List<InternalTaskProperties> loadTaskProperties() throws Exception; interface InternalTaskProperties { String taskHostKlass(); String taskMethod(); String taskDescription(); } @Builder protected static class CronTaskProperties implements InternalTaskProperties { private String taskHostKlass; private String taskMethod; private String cronExpression; private String taskDescription; private String timeZone; @Override public String taskDescription() { return taskDescription; } public String cronExpression() { return cronExpression; } public String timeZone() { return timeZone; } @Override public String taskHostKlass() { return taskHostKlass; } @Override public String taskMethod() { return taskMethod; } } @Builder protected static class FixedDelayTaskProperties implements InternalTaskProperties { private String taskHostKlass; private String taskMethod; private String intervalMilliseconds; private String initialDelayMilliseconds; private String taskDescription; @Override public String taskDescription() { return taskDescription; } public String initialDelayMilliseconds() { return initialDelayMilliseconds; } public String intervalMilliseconds() { return intervalMilliseconds; } @Override public String taskHostKlass() { return taskHostKlass; } @Override public String taskMethod() { return taskMethod; } } @Builder protected static class FixedRateTaskProperties implements InternalTaskProperties { private String taskHostKlass; private String taskMethod; private String intervalMilliseconds; private String initialDelayMilliseconds; private String taskDescription; @Override public String taskDescription() { return taskDescription; } public String initialDelayMilliseconds() { return initialDelayMilliseconds; } public String intervalMilliseconds() { return intervalMilliseconds; } @Override public String taskHostKlass() { return taskHostKlass; } @Override public String taskMethod() { return taskMethod; } } }
loadTaskProperties()
方法用於載入任務配置,留給子類實現。
JSON配置
JSON
配置文件的格式如下(類路徑下的scheduling/tasks.json
文件):
{ "version": 1, "tasks": [ { "taskKlass": "club.throwable.schedule.Tasks", "taskMethods": [ { "taskType": "FIXED_DELAY", "taskDescription": "processTask1任務", "taskMethod": "processTask1", "intervalMilliseconds": "5000" } ] } ] }
每個層級都有一個enable
屬性,默認為true
,只有強制指定為false
的時候才不會裝載對應的任務調度方法。這裡就是簡單繼承AbstractSchedulingConfigurer
,實現從類路徑載入配置的邏輯,定義JsonSchedulingConfigurer
:
public class JsonSchedulingConfigurer extends AbstractSchedulingConfigurer { // 這裡把默認的任務配置JSON文件放在CLASSPATH下的scheduling/tasks.json,可以通過配置項scheduling.json.config.location進行覆蓋 @Value("${scheduling.json.config.location:scheduling/tasks.json}") private String location; @Autowired private ObjectMapper objectMapper; @Override protected List<InternalTaskProperties> loadTaskProperties() throws Exception { ClassPathResource resource = new ClassPathResource(location); String content = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8); ScheduleTaskProperties properties = objectMapper.readValue(content, ScheduleTaskProperties.class); if (Boolean.FALSE.equals(properties.getEnable()) || null == properties.getTasks()) { return Lists.newArrayList(); } List<InternalTaskProperties> target = Lists.newArrayList(); for (ScheduleTasks tasks : properties.getTasks()) { if (null != tasks) { List<ScheduleTaskMethod> taskMethods = tasks.getTaskMethods(); if (null != taskMethods) { for (ScheduleTaskMethod taskMethod : taskMethods) { if (!Boolean.FALSE.equals(taskMethod.getEnable())) { if (ScheduleTaskType.CRON == taskMethod.getTaskType()) { target.add(CronTaskProperties.builder() .taskMethod(taskMethod.getTaskMethod()) .cronExpression(taskMethod.getCronExpression()) .timeZone(taskMethod.getTimeZone()) .taskDescription(taskMethod.getTaskDescription()) .taskHostKlass(tasks.getTaskKlass()) .build()); } if (ScheduleTaskType.FIXED_DELAY == taskMethod.getTaskType()) { target.add(FixedDelayTaskProperties.builder() .taskMethod(taskMethod.getTaskMethod()) .intervalMilliseconds(taskMethod.getIntervalMilliseconds()) .initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds()) .taskDescription(taskMethod.getTaskDescription()) .taskHostKlass(tasks.getTaskKlass()) .build()); } if (ScheduleTaskType.FIXED_RATE == taskMethod.getTaskType()) { target.add(FixedRateTaskProperties.builder() .taskMethod(taskMethod.getTaskMethod()) .intervalMilliseconds(taskMethod.getIntervalMilliseconds()) .initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds()) .taskDescription(taskMethod.getTaskDescription()) .taskHostKlass(tasks.getTaskKlass()) .build()); } } } } } } return target; } }
添加一個配置類和任務類:
@Configuration public class SchedulingAutoConfiguration { @Bean public JsonSchedulingConfigurer jsonSchedulingConfigurer(){ return new JsonSchedulingConfigurer(); } } // club.throwable.schedule.Tasks @Slf4j @Component public class Tasks { public void processTask1() { log.info("processTask1觸發.........."); } }
啟動SpringBoot
應用,某次執行的部分日誌如下:
2020-03-22 16:24:17.248 INFO 22836 --- [ main] c.t.s.AbstractSchedulingConfigurer : 裝載FixedDelayTask[club.throwable.schedule.Tasks#processTask1()]成功,固定延遲間隔:5000 ms,初始延遲執行時間:0 ms,任務描述:processTask1任務 2020-03-22 16:24:22.275 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1觸發.......... 2020-03-22 16:24:27.277 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1觸發.......... 2020-03-22 16:24:32.279 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1觸發.......... ......
這裡有些細節沒有完善,例如配置文件參數的一些非空判斷、配置值是否合法等等校驗邏輯沒有做,如果要設計成一個工業級的類庫,這些方面必須要考慮。
JDBC數據源配置
JDBC
數據源這裡用MySQL
舉例說明,先建一個調度任務配置表scheduling_task
:
CREATE TABLE `schedule_task` ( id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵', edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間', create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間', editor VARCHAR(32) NOT NULL DEFAULT 'admin' COMMENT '修改者', creator VARCHAR(32) NOT NULL DEFAULT 'admin' COMMENT '創建者', deleted BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '軟刪除標識', task_host_class VARCHAR(256) NOT NULL COMMENT '任務宿主類全類名', task_method VARCHAR(128) NOT NULL COMMENT '任務執行方法名', task_type VARCHAR(16) NOT NULL COMMENT '任務類型', task_description VARCHAR(64) NOT NULL COMMENT '任務描述', cron_expression VARCHAR(128) COMMENT 'cron表達式', time_zone VARCHAR(32) COMMENT '時區', interval_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '執行間隔時間', initial_delay_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '初始延遲執行時間', UNIQUE uniq_class_method (task_host_class, task_method) ) COMMENT '調度任務配置表';
其實具體的做法和JSON
配置差不多,先引入spring-boot-starter-jdbc
,接著編寫MysqlSchedulingConfigurer
:
// DAO @RequiredArgsConstructor public class MysqlScheduleTaskDao { private final JdbcTemplate jdbcTemplate; private static final ResultSetExtractor<List<ScheduleTask>> MULTI = r -> { List<ScheduleTask> tasks = Lists.newArrayList(); while (r.next()) { ScheduleTask task = new ScheduleTask(); tasks.add(task); task.setId(r.getLong("id")); task.setCronExpression(r.getString("cron_expression")); task.setInitialDelayMilliseconds(r.getLong("initial_delay_milliseconds")); task.setIntervalMilliseconds(r.getLong("interval_milliseconds")); task.setTimeZone(r.getString("time_zone")); task.setTaskDescription(r.getString("task_description")); task.setTaskHostClass(r.getString("task_host_class")); task.setTaskMethod(r.getString("task_method")); task.setTaskType(r.getString("task_type")); } return tasks; }; public List<ScheduleTask> selectAllTasks() { return jdbcTemplate.query("SELECT * FROM schedule_task WHERE deleted = 0", MULTI); } } // MysqlSchedulingConfigurer @RequiredArgsConstructor public class MysqlSchedulingConfigurer extends AbstractSchedulingConfigurer { private final MysqlScheduleTaskDao mysqlScheduleTaskDao; @Override protected List<InternalTaskProperties> loadTaskProperties() throws Exception { List<InternalTaskProperties> target = Lists.newArrayList(); List<ScheduleTask> tasks = mysqlScheduleTaskDao.selectAllTasks(); if (!tasks.isEmpty()) { for (ScheduleTask task : tasks) { ScheduleTaskType scheduleTaskType = ScheduleTaskType.fromType(task.getTaskType()); if (ScheduleTaskType.CRON == scheduleTaskType) { target.add(CronTaskProperties.builder() .taskMethod(task.getTaskMethod()) .cronExpression(task.getCronExpression()) .timeZone(task.getTimeZone()) .taskDescription(task.getTaskDescription()) .taskHostKlass(task.getTaskHostClass()) .build()); } if (ScheduleTaskType.FIXED_DELAY == scheduleTaskType) { target.add(FixedDelayTaskProperties.builder() .taskMethod(task.getTaskMethod()) .intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds())) .initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds())) .taskDescription(task.getTaskDescription()) .taskHostKlass(task.getTaskHostClass()) .build()); } if (ScheduleTaskType.FIXED_RATE == scheduleTaskType) { target.add(FixedRateTaskProperties.builder() .taskMethod(task.getTaskMethod()) .intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds())) .initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds())) .taskDescription(task.getTaskDescription()) .taskHostKlass(task.getTaskHostClass()) .build()); } } } return target; } }
記得引入spring-boot-starter-jdbc
和mysql-connector-java
並且激活MysqlSchedulingConfigurer
配置。插入一條記錄:
INSERT INTO `schedule_task`(`id`, `edit_time`, `create_time`, `editor`, `creator`, `deleted`, `task_host_class`, `task_method`, `task_type`, `task_description`, `cron_expression`, `time_zone`, `interval_milliseconds`, `initial_delay_milliseconds`) VALUES (1, '2020-03-30 23:46:10', '2020-03-30 23:46:10', 'admin', 'admin', 0, 'club.throwable.schedule.Tasks', 'processTask1', 'FIXED_DELAY', '測試任務', NULL, NULL, 10000, 5000);
然後啟動服務,某次執行的輸出:
2020-03-30 23:47:27.376 INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1觸發.......... 2020-03-30 23:47:37.378 INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1觸發.......... ....
混合配置
有些時候我們希望可以JSON
配置和JDBC
數據源配置進行混合配置,或者動態二選一以便靈活應對多環境的場景(例如要在開發環境使用JSON
配置而測試和生產環境使用JDBC
數據源配置,甚至可以將JDBC
數據源配置覆蓋JSON
配置,這樣能保證總是傾向於使用JDBC
數據源配置),這樣需要對前面兩小節的實現加多一層抽象。這裡的設計可以參考SpringMVC
中的控制器參數解析器的設計,具體是HandlerMethodArgumentResolverComposite
,其實道理是相同的。
其他注意事項
在生產實踐中,暫時不考慮生成任務執行日誌和細粒度的監控,著重做了兩件事:
- 並發控制,(多服務節點下)禁止任務並發執行。
- 跟蹤任務的日誌軌跡。
解決並發執行問題
一般情況下,我們需要禁止任務並發執行,考慮引入Redisson
提供的分散式鎖:
// 引入依賴 <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>最新版本</version> </dependency> // 配置類 @Configuration @AutoConfigureAfter(RedisAutoConfiguration.class) public class RedissonAutoConfiguration { @Autowired private RedisProperties redisProperties; @Bean(destroyMethod = "shutdown") public RedissonClient redissonClient() { Config config = new Config(); SingleServerConfig singleServerConfig = config.useSingleServer(); singleServerConfig.setAddress(String.format("redis://%s:%d", redisProperties.getHost(), redisProperties.getPort())); if (redisProperties.getDatabase() > 0) { singleServerConfig.setDatabase(redisProperties.getDatabase()); } if (null != redisProperties.getPassword()) { singleServerConfig.setPassword(redisProperties.getPassword()); } return Redisson.create(config); } } // 分散式鎖工廠 @Component public class DistributedLockFactory { private static final String DISTRIBUTED_LOCK_PATH_PREFIX = "dl:"; @Autowired private RedissonClient redissonClient; public DistributedLock provideDistributedLock(String lockKey) { String lockPath = DISTRIBUTED_LOCK_PATH_PREFIX + lockKey; return new RedissonDistributedLock(redissonClient, lockPath); } }
這裡考慮到項目依賴了spring-boot-starter-redis
,直接復用了它的配置屬性類(RedissonDistributedLock
是RLock
的輕量級封裝,見附錄)。使用方式如下:
@Autowired private DistributedLockFactory distributedLockFactory; public void task1() { DistributedLock lock = distributedLockFactory.provideDistributedLock(lockKey); // 等待時間為20秒,持有鎖的最大時間為60秒 boolean tryLock = lock.tryLock(20L, 60, TimeUnit.SECONDS); if (tryLock) { try { // 業務邏輯 }finally { lock.unlock(); } } }
引入MDC跟蹤任務的Trace
MDC
其實是Mapped Diagnostic Context
的縮寫,也就是映射診斷上下文,一般用於日誌框架裡面同一個執行緒執行過程的跟蹤(例如一個執行緒跑過了多個方法,各個方法裡面都列印了日誌,那麼通過MDC
可以對整個調用鏈通過一個唯一標識關聯起來),例如這裡選用slf4j
提供的org.slf4j.MDC
:
@Component public class MappedDiagnosticContextAssistant { /** * 在MDC中執行 * * @param runnable runnable */ public void processInMappedDiagnosticContext(Runnable runnable) { String uuid = UUID.randomUUID().toString(); MDC.put("TRACE_ID", uuid); try { runnable.run(); } finally { MDC.remove("TRACE_ID"); } } }
任務執行的時候需要包裹成一個Runnale
實例:
public void task1() { mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> { StopWatch watch = new StopWatch(); watch.start(); log.info("開始執行......"); // 業務邏輯 watch.stop(); log.info("執行完畢,耗時:{} ms......", watch.getTotalTimeMillis()); }); }
結合前面一節提到的並發控制,那麼最終執行的任務方法如下:
public void task1() { mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> { StopWatch watch = new StopWatch(); watch.start(); log.info("開始執行......"); scheduleTaskAssistant.executeInDistributedLock("任務分散式鎖KEY", () -> { // 真實的業務邏輯 }); watch.stop(); log.info("執行完畢,耗時:{} ms......", watch.getTotalTimeMillis()); }); }
這裡的方法看起來比較彆扭,其實可以直接在任務裝載的時候基於分散式鎖和MDC
進行封裝,方式類似於ScheduledMethodRunnable
,這裡不做展開,因為要詳細展開篇幅可能比較大(ScheduleTaskAssistant
見附錄)。
小結
其實spring-context
整個調度模組完全依賴於TaskScheduler
實現,更底層的是JUC
調度執行緒池ScheduledThreadPoolExecutor
。如果想要從底層原理理解整個調度模組的運行原理,那麼就一定要分析ScheduledThreadPoolExecutor
的實現。整篇文章大致介紹了spring-context
調度模組的載入調度任務的流程,並且基於擴展介面SchedulingConfigurer
擴展出多種自定義配置調度任務的方式,但是考慮到需要在生產環境中運行,那麼免不了需要考慮監控、並發控制、日誌跟蹤等等的功能,但是這樣就會使得整個調度模組變重,慢慢地就會發現,這個輪子越造越大,越有主流調度框架Quartz
或者Easy Scheduler
的影子。筆者認為,軟體工程,有些時候要權衡取捨,該拋棄的就應該果斷拋棄,否則總是負重而行,還能走多遠?
參考資料:
SpringBoot
源碼
附錄
ScheduleTaskAssistant
:
@RequiredArgsConstructor @Component public class ScheduleTaskAssistant { /** * 5秒 */ public static final long DEFAULT_WAIT_TIME = 5L; /** * 30秒 */ public static final long DEFAULT_LEAVE_TIME = 30L; private final DistributedLockFactory distributedLockFactory; /** * 在分散式鎖中執行 * * @param waitTime 鎖等著時間 * @param leaveTime 鎖持有時間 * @param timeUnit 時間單位 * @param lockKey 鎖的key * @param task 任務對象 */ public void executeInDistributedLock(long waitTime, long leaveTime, TimeUnit timeUnit, String lockKey, Runnable task) { DistributedLock lock = distributedLockFactory.dl(lockKey); boolean tryLock = lock.tryLock(waitTime, leaveTime, timeUnit); if (tryLock) { try { long waitTimeMillis = timeUnit.toMillis(waitTime); long start = System.currentTimeMillis(); task.run(); long end = System.currentTimeMillis(); long cost = end - start; // 預防鎖過早釋放 if (cost < waitTimeMillis) { Sleeper.X.sleep(waitTimeMillis - cost); } } finally { lock.unlock(); } } } /** * 在分散式鎖中執行 - 使用默認時間 * * @param lockKey 鎖的key * @param task 任務對象 */ public void executeInDistributedLock(String lockKey, Runnable task) { executeInDistributedLock(DEFAULT_WAIT_TIME, DEFAULT_LEAVE_TIME, TimeUnit.SECONDS, lockKey, task); } }
RedissonDistributedLock
:
@Slf4j public class RedissonDistributedLock implements DistributedLock { private final RedissonClient redissonClient; private final String lockPath; private final RLock internalLock; RedissonDistributedLock(RedissonClient redissonClient, String lockPath) { this.redissonClient = redissonClient; this.lockPath = lockPath; this.internalLock = initInternalLock(); } private RLock initInternalLock() { return redissonClient.getLock(lockPath); } @Override public boolean isLock() { return internalLock.isLocked(); } @Override public boolean isHeldByCurrentThread() { return internalLock.isHeldByCurrentThread(); } @Override public void lock(long leaseTime, TimeUnit unit) { internalLock.lock(leaseTime, unit); } @Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) { try { return internalLock.tryLock(waitTime, leaseTime, unit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(String.format("Acquire lock fail by thread interrupted,path:%s", lockPath), e); } } @Override public void unlock() { try { internalLock.unlock(); } catch (IllegalMonitorStateException ex) { log.warn("Unlock path:{} error for thread status change in concurrency", lockPath, ex); } } }
(本文完 c-7-d e-a-20200324 真是有點滑稽,筆者發現任務持久化最好還是用現成的工業級調度器,於是基於Quartz做了輕量級封裝,寫了個後台管理介面,且聽下回分解)