SpringBoot2 整合ElasticJob框架,訂製化管理流程

  • 2020 年 3 月 12 日
  • 筆記

本文源碼:GitHub·點這裡 || GitEE·點這裡

一、ElasticJob簡介

1、定時任務

在前面的文章中,說過QuartJob這個定時任務,被廣泛應用的定時任務標準。但Quartz核心點在於執行定時任務並不是在於關注的業務模式和場景,缺少高度自定義的功能。Quartz能夠基於資料庫實現任務的高可用,但是不具備分散式並行調度的功能。

-> QuartJob定時任務

2、ElasticJob說明

  • 基礎簡介

Elastic-Job 是一個開源的分散式調度中間件,由兩個相互獨立的子項目 Elastic-Job-Lite 和 Elastic-Job-Cloud 組成。Elastic-Job-Lite 為輕量級無中心化解決方案,使用 jar 包提供分散式任務的調度和治理。 Elastic-Job-Cloud 是一個 Mesos Framework,依託於Mesos額外提供資源治理、應用分發以及進程隔離等服務。

  • 功能特點
分散式調度協調  彈性擴容縮容  失效轉移  錯過執行作業重觸發  作業分片一致性,保證同一分片在分散式環境中僅一個執行實例

補刀:人家官網這樣描述的,這裡贅述一下,充實一下文章。

  • 基礎框架結構

該圖片來自ElasticJob官網。

由圖可知如下內容:

需要Zookeeper組件支援,作為分散式的調度任務,有良好的監聽機制,和控制台,下面的案例也就沖這個圖解來。

3、分片管理

這個概念在ElasticJob中是最具有特點的,實用性極好。

  • 分片概念

任務的分散式執行,需要將一個任務拆分為多個獨立的任務項,然後由分散式的伺服器分別執行某一個或幾個分片項。

場景描述:假設有服務3台,分3片管理,要處理數據表100條,那就可以100%3,按照餘數0,1,2分散到三台服務上執行,看到這裡分庫分表的基本邏輯湧上心頭,這就是為何很多大牛講說,編程思維很重要。

  • 個性化參數

個性化參數即shardingItemParameter,可以和分片項匹配對應關係,用於將分片項的數字轉換為更加可讀的業務程式碼。

場景描述:這裡猛一讀好像很飄逸,其實就是這個意思,如果分3片,取名[0,1,2]不好看,或者不好標識,可以分別給個別名標識一下,[0=A,1=B,2=C]。

二、定時任務載入

1、核心依賴包

這裡使用2.0+的版本。

<dependency>      <groupId>com.dangdang</groupId>      <artifactId>elastic-job-lite-core</artifactId>      <version>2.1.5</version>  </dependency>  <dependency>      <groupId>com.dangdang</groupId>      <artifactId>elastic-job-lite-spring</artifactId>      <version>2.1.5</version>  </dependency>

2、核心配置文件

這裡主要配置一下Zookeeper中間件,分片和分片參數。

zookeeper:    server: 127.0.0.1:2181    namespace: es-job    job-config:    cron: 0/10 * * * * ?    shardCount: 1    shardItem: 0=A,1=B,2=C,3=D

3、自定義註解

看了官方的案例,沒看到好用的註解,這裡只能自己編寫一個,基於案例的載入過程和核心API作為參考。

核心配置類:

com.dangdang.ddframe.job.lite.config.LiteJobConfiguration

根據自己想如何使用註解的思路,比如我只想註解定時任務名稱和Cron表達式這兩個功能,其他參數直接統一配置(這裡可能是受QuartJob影響太深,可能根本就是想省事…)

@Inherited  @Target({ElementType.TYPE})  @Retention(RetentionPolicy.RUNTIME)  public @interface TaskJobSign {        @AliasFor("cron")      String value() default "";        @AliasFor("value")      String cron() default "";        String jobName() default "";    }

4、作業案例

這裡列印一些基本參數,對照配置和註解,一目了然。

@Component  @TaskJobSign(cron = "0/5 * * * * ?",jobName = "Hello-Job")  public class HelloJob implements SimpleJob {        private static final Logger LOG = LoggerFactory.getLogger(HelloJob.class.getName()) ;        @Override      public void execute(ShardingContext shardingContext) {          LOG.info("當前執行緒: "+Thread.currentThread().getId());          LOG.info("任務分片:"+shardingContext.getShardingTotalCount());          LOG.info("當前分片:"+shardingContext.getShardingItem());          LOG.info("分片參數:"+shardingContext.getShardingParameter());          LOG.info("任務參數:"+shardingContext.getJobParameter());      }  }

5、載入定時任務

既然自定義註解,那載入過程自然也要自定義一下,讀取自定義的註解,配置化,加入容器,然後初始化,等著任務執行就好。

@Configuration  public class ElasticJobConfig {        @Resource      private ApplicationContext applicationContext ;      @Resource      private ZookeeperRegistryCenter zookeeperRegistryCenter;        @Value("${job-config.cron}") private String cron ;      @Value("${job-config.shardCount}") private int shardCount ;      @Value("${job-config.shardItem}") private String shardItem ;        /**       * 配置任務監聽器       */      @Bean      public ElasticJobListener elasticJobListener() {          return new TaskJobListener();      }      /**       * 初始化配置任務       */      @PostConstruct      public void initTaskJob() {          Map<String, SimpleJob> jobMap = this.applicationContext.getBeansOfType(SimpleJob.class);          Iterator iterator = jobMap.entrySet().iterator();          while (iterator.hasNext()) {              // 自定義註解管理              Map.Entry<String, SimpleJob> entry = (Map.Entry)iterator.next();              SimpleJob simpleJob = entry.getValue();              TaskJobSign taskJobSign = simpleJob.getClass().getAnnotation(TaskJobSign.class);              if (taskJobSign != null){                  String cron = taskJobSign.cron() ;                  String jobName = taskJobSign.jobName() ;                  // 生成配置                  SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(                                                  JobCoreConfiguration.newBuilder(jobName, cron, shardCount)                                                  .shardingItemParameters(shardItem).jobParameter(jobName).build(),                                                  simpleJob.getClass().getCanonicalName());                  LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(                                                  simpleJobConfiguration).overwrite(true).build();                  TaskJobListener taskJobListener = new TaskJobListener();                  // 初始化任務                  SpringJobScheduler jobScheduler = new SpringJobScheduler(                                                  simpleJob, zookeeperRegistryCenter,                                                  liteJobConfiguration, taskJobListener);                  jobScheduler.init();              }          }      }  }

絮叨一句:不要疑問這些API是怎麼知道,看下官方文檔的案例,他們怎麼使用這些核心API,這裡就是照著寫過來,就是多一步自定義註解類的載入過程。當然官方文檔大致讀一遍還是很有必要的。

補刀一句:如何快速學習一些組件的用法,首先找到官方文檔,或者開源庫Wiki,再不濟ReadMe文檔(如果都沒有,酌情放棄,另尋其他),熟悉基本功能是否符合自己的需求,如果符合,就看下基本用法案例,熟悉API,最後就是研究自己需要的功能模組,個人經驗來看,該過程是彎路最少,坑最少的。

6、任務監聽

用法非常簡單,實現ElasticJobListener介面。

@Component  public class TaskJobListener implements ElasticJobListener {      private static final Logger LOG = LoggerFactory.getLogger(TaskJobListener.class);        private long beginTime = 0;        @Override      public void beforeJobExecuted(ShardingContexts shardingContexts) {          beginTime = System.currentTimeMillis();          LOG.info(shardingContexts.getJobName()+"===>開始...");      }        @Override      public void afterJobExecuted(ShardingContexts shardingContexts) {          long endTime = System.currentTimeMillis();          LOG.info(shardingContexts.getJobName()+          "===>結束...[耗時:"+(endTime - beginTime)+"]");      }  }

絮叨一句:before和after執行前後,中間執行目標方法,標準的AOP切面思想,所以底層水平決定了對上層框架的理解速度,那本《Java編程思想》上的灰塵是不是該擦擦?

三、動態添加

1、作業任務

有部分場景需要動態添加和管理定時任務,基於上面的載入流程,在自定義一些步驟就可以。

@Component  public class GetTimeJob implements SimpleJob {        private static final Logger LOG = LoggerFactory.getLogger(GetTimeJob.class.getName()) ;        private static final SimpleDateFormat format =              new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") ;        @Override      public void execute(ShardingContext shardingContext) {          LOG.info("Job Name:"+shardingContext.getJobName());          LOG.info("Local Time:"+format.format(new Date()));      }  }

2、添加任務服務

這裡就動態添加上面的任務。

@Service  public class TaskJobService {        @Resource      private ZookeeperRegistryCenter zookeeperRegistryCenter;        public void addTaskJob(final String jobName,final SimpleJob simpleJob,                             final String cron,final int shardCount,final String shardItem) {          // 配置過程          JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(                                                      jobName, cron, shardCount)                                                      .shardingItemParameters(shardItem).build();          JobTypeConfiguration jobTypeConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,                                                      simpleJob.getClass().getCanonicalName());          LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(                                                      jobTypeConfiguration).overwrite(true).build();          TaskJobListener taskJobListener = new TaskJobListener();          // 載入執行          SpringJobScheduler jobScheduler = new SpringJobScheduler(                  simpleJob, zookeeperRegistryCenter,                  liteJobConfiguration, taskJobListener);          jobScheduler.init();      }    }

補刀一句:這裡添加之後,任務就會定時執行,如何停止任務又是一個問題,可以在任務名上做一些配置,比如在資料庫生成一條記錄[1,job1,state],如果調度到state為停止狀態的任務,直接截胡即可。

3、測試介面

@RestController  public class TaskJobController {        @Resource      private TaskJobService taskJobService ;        @RequestMapping("/addJob")      public String addJob(@RequestParam("cron") String cron,@RequestParam("jobName") String jobName,                           @RequestParam("shardCount") Integer shardCount,                           @RequestParam("shardItem") String shardItem) {          taskJobService.addTaskJob(jobName, new GetTimeJob(), cron, shardCount, shardItem);          return "success";      }  }

四、源程式碼地址

GitHub·地址  https://github.com/cicadasmile/middle-ware-parent  GitEE·地址  https://gitee.com/cicadasmile/middle-ware-parent