SpringBoot-ElasticJob封裝快速上手使用(分散式定時器)

  • 2019 年 10 月 12 日
  • 筆記

elastic-job-spring-boot

qq交流群:812321371

1 簡介

Elastic-Job是一個分散式調度解決方案,由兩個相互獨立的子項目Elastic-Job-LiteElastic-Job-Cloud組成。Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分散式任務的協調服務。
基於quartz定時任務框架為基礎的,因此具備quartz的大部分功能
使用zookeeper做協調,調度中心,更加輕量級
支援任務的分片
支援彈性擴容,可以水平擴展, 當任務再次運行時,會檢查當前的伺服器數量,重新分片,分片結束之後才會繼續執行任務
失效轉移,容錯處理,當一台調度伺服器宕機或者跟zookeeper斷開連接之後,會立即停止作業,然後再去尋找其他空閑的調度伺服器,來運行剩餘的任務
提供運維介面,可以管理作業和註冊中心。

1.1 使用場景

由於項目為微服務,單模組可能在兩個實例以上的數量,定時器就會出現多實例同時執行的情況。
一般定時器缺少管理介面,無法監控定時器是否執行成功。
市面上常見的解決方案為定時器加鎖的操作,或者採用第3方分散式定時器。
分散式定時器有多種方案,比如阿里內部的ScheduledX,噹噹網的Elastic job,個人開源的xxl-job等。

1.2 功能列表

  • 分散式調度協調
  • 彈性擴容縮容
  • 失效轉移
  • 錯過執行作業重觸發
  • 作業分片一致性,保證同一分片在分散式環境中僅一個執行實例
  • 自診斷並修復分散式不穩定造成的問題
  • 支援並行調度
  • 支援作業生命周期操作
  • 豐富的作業類型
  • Spring整合以及命名空間提供
  • 運維平台

1.3 概念

分片:任務的分散式執行,需要將一個任務拆分為多個獨立的任務項,然後由分散式的伺服器分別執行某一個或幾個分片項。
例如:有一個遍歷資料庫某張表的作業,現有2台伺服器。為了快速的執行作業,那麼每台伺服器應執行作業的50%。 為滿足此需求,可將作業分成2片,每台伺服器執行1片。作業遍曆數據的邏輯應為:伺服器A遍歷ID以奇數結尾的數據;伺服器B遍歷ID以偶數結尾的數據。 如果分成10片,則作業遍曆數據的邏輯應為:每片分到的分片項應為ID%10,而伺服器A被分配到分片項0,1,2,3,4;伺服器B被分配到分片項5,6,7,8,9,直接的結果就是伺服器A遍歷ID0-4結尾的數據;伺服器B遍歷ID5-9結尾的數據。

歷史軌跡:Elastic-Job提供了事件追蹤功能,可通過事件訂閱的方式處理調度過程的重要事件,用於查詢、統計和監控。

1.4 封裝elasticjob

由於噹噹網Elastic job處於1年間未更新階段,相關jar處於可以使用階段功能不全。考慮到使用場景為多項目使用,將elastic-job-lite-spring簡單封裝便於使用。

2.使用說明:

2.1 添加依賴

ps:實際version版本請使用最新版

<dependency>    <groupId>com.purgeteam</groupId>    <artifactId>elasticjob-spring-boot-starter</artifactId>    <version>0.1.1.RELEASE</version>  </dependency>

2.2 配置

ps: 需要mysql,zookeeper支援,請提前搭建好。

配置bootstrap.yml或者application.yml

加入以下配置:

spring:    elasticjob:      datasource: # job需要的記錄數據源        url: jdbc:mysql://127.0.0.1:3306/batch_log?useUnicode=true&characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false        driver-class-name: com.mysql.cj.jdbc.Driver        username: root        password: Rtqw123OpnmER      regCenter: # 註冊中心        serverList: 127.0.0.1:2181        namespace: elasticJobDemo

2.3 定時器實現方法編寫

創建定時器類(唯一不同的地方在於將@Scheduled改為實現SimpleJob介面即可)
定時器實現方法編寫在execute方法里。

@Slf4j  @Component  public class MySimpleJob implements SimpleJob {        //  @Scheduled(cron = "0 0/1 * * * ?")      @Override      public void execute(ShardingContext shardingContext) {          log.info(String.format("Thread ID: %s, 作業分片總數: %s, " +                          "當前分片項: %s.當前參數: %s," +                          "作業名稱: %s.作業自定義參數: %s",                  Thread.currentThread().getId(),                  shardingContext.getShardingTotalCount(),                  shardingContext.getShardingItem(),                  shardingContext.getShardingParameter(),                  shardingContext.getJobName(),                  shardingContext.getJobParameter()          ));          // 分片大致如下:根據配置的分片參數執行相應的邏輯          switch (context.getShardingItem()) {              case 0:                  // do something by sharding item 0                  break;              case 1:                  // do something by sharding item 1                  break;              case 2:                  // do something by sharding item 2                  break;              // case n: ...          }      }  }
log:Thread ID: 66, 作業分片總數: 1, 當前分片項: 0.當前參數: Beijing,作業名稱: PropertiesSimpleJob.作業自定義參數: test

2.4 配置定時器

2.4.1 創建Configuration類

ZookeeperRegistryCenterJobEventConfiguration注入。
創建JobScheduler @Bean(initMethod = "init")
mySimpleJobScheduler方法里先通過ElasticJobUtils#getLiteJobConfiguration獲取LiteJobConfiguration對象。
創建SpringJobScheduler對象返回即可。

@Configuration  public class MyJobConfig {        // job 名稱      private static final String JOB_NAME = "MySimpleJob";        // 定時器cron參數      private static final String CRON = "0 0/1 * * * ?";        // 定時器分片      private static final int SHARDING_TOTAL_COUNT = 1;        // 分片參數      private static final String SHARDING_ITEM_PARAMETERS = "0=Beijing,1=Shanghai,2=Guangzhou";        // 自定義參數      private static final String JOB_PARAMETERS = "parameter";        @Resource      private ZookeeperRegistryCenter regCenter;        @Resource      private JobEventConfiguration jobEventConfiguration;          @Bean(initMethod = "init")      public JobScheduler mySimpleJobScheduler(final MySimpleJob mySimpleJob) {            LiteJobConfiguration liteJobConfiguration = ElasticJobUtils                  .getLiteJobConfiguration(mySimpleJob.getClass(), JOB_NAME, CRON,                          SHARDING_TOTAL_COUNT, SHARDING_ITEM_PARAMETERS, JOB_PARAMETERS);          // 參數:1.定時器實例,2.註冊中心類,3.LiteJobConfiguration,          //     3.歷史軌跡(不需要可以省略)          return new SpringJobScheduler(mySimpleJob, regCenter, liteJobConfiguration, jobEventConfiguration);      }    }

ElasticJobUtils#getLiteJobConfiguration參數簡介:

/**       * 獲取 {@link LiteJobConfiguration} 對象       *       * @param jobClass               定時器實現類       * @param jobName                定時器名稱       * @param cron                   定時參數       * @param shardingTotalCount     作業分片總數       * @param shardingItemParameters 當前參數 可以為null       * @param jobParameters          作業自定義參數 可以為null       * @return {@link LiteJobConfiguration}       */    public static LiteJobConfiguration getLiteJobConfiguration(        final Class<? extends SimpleJob> jobClass,        final String jobName,        final String cron,        final int shardingTotalCount,        final String shardingItemParameters,        final String jobParameters) {    ...      return ...;    }

2.4.2 簡化Configuration類

當然也可以用下面的@Configuration實現簡化,配置bootstrap.yml或者application.yml

spring:    elasticjob:      scheduled:        jobConfigMap: // 為map集合          PropertiesSimpleJob: // 定時器key名稱            jobName: PropertiesSimpleJob // job名稱            cron: 0 0/1 * * * ? // cron表達式            shardingTotalCount: 2 // 分片數量            shardingItemParameters: 0=123,1=332 // 分片參數            jobParameters: test // 自定義參數

注入SpringJobSchedulerFactory,在propertiesSimpleJobScheduler方法里調用gerSpringJobScheduler方法即可。

@Configuration  public class PropertiesSimpleJobConfig {        @Resource      private SpringJobSchedulerFactory springJobSchedulerFactory;        @Bean(initMethod = "init")      public JobScheduler propertiesSimpleJobScheduler(final PropertiesSimpleJob job) {          // 參數:1.定時器實例,2.配置名稱,3.是否開啟歷史軌跡          return springJobSchedulerFactory.getSpringJobScheduler(job,"PropertiesSimpleJob", true);      }    }

2.4.3 註解方式配置(推薦方式)

ps:這個註解包含了上述方式,簡化定時器注入。

繼承SimpleJob實現方法execute

AnnotationSimpleJob類上加入註解@ElasticJobScheduler即可。
下面為完整註解。

@Slf4j  @ElasticJobScheduler(          name = "AnnotationSimpleJob", // 定時器名稱          cron = "0/8 * * * * ?", // 定時器表達式          shardingTotalCount = 1, // 作業分片總數 默認為1          shardingItemParameters = "0=Beijing,1=Shanghai,2=Guangzhou",  // 分片序列號和參數用等號分隔 不需要參數可以不加          jobParameters = "123", // 作業自定義參數 不需要參數可以不加          isEvent = true // 是否開啟數據記錄 默認為true  )  public class AnnotationSimpleJob implements SimpleJob {        @Override      public void execute(ShardingContext shardingContext) {          log.info(String.format("Thread ID: %s, 作業分片總數: %s, " +                          "當前分片項: %s.當前參數: %s," +                          "作業名稱: %s.作業自定義參數: %s",                  Thread.currentThread().getId(),                  shardingContext.getShardingTotalCount(),                  shardingContext.getShardingItem(),                  shardingContext.getShardingParameter(),                  shardingContext.getJobName(),                  shardingContext.getJobParameter()          ));      }  }

總結

分散式job可以解決多個項目同一個定時器都執行的問題,配合elastic-job控制台可以直觀監控定時器執行情況等。

img

示例程式碼地址:elastic-job-spring-boot

作者GitHub:
Purgeyao 歡迎關注
本文由部落格一文多發平台 OpenWrite 發布!