分散式作業系統 Elastic-Job-Cloud 源碼分析 —— 本地運行模式
- 2019 年 10 月 29 日
- 筆記
摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/cloud-local-executor/
本文基於 Elastic-Job V2.1.5 版本分享
- 1. 概述
- 2. 配置
- 3. 運行
- 666. 彩蛋
1. 概述
本文主要分享 Elastic-Job-Cloud 本地運行模式,對應《官方文檔 —— 本地運行模式》。
有什麼用呢?引用官方解答:
在開發 Elastic-Job-Cloud 作業時,開發人員可以脫離 Mesos 環境,在本地運行和調試作業。可以利用本地運行模式充分的調試業務功能以及單元測試,完成之後再部署至 Mesos 集群。 本地運行作業無需安裝 Mesos 環境。
? 是不是很贊 + 1024?!
本文涉及到主體類的類圖如下( 打開大圖 ):

2. 配置
LocalCloudJobConfiguration,本地雲作業配置,在《Elastic-Job-Cloud 源碼分析 —— 作業配置》「3.2 本地雲作業配置」有詳細解析。
創建本地雲作業配置示例程式碼如下(來自官方):
LocalCloudJobConfiguration config = new LocalCloudJobConfiguration( new SimpleJobConfiguration( // 配置作業類型和作業基本資訊 JobCoreConfiguration.newBuilder("FooJob", "*/2 * * * * ?", 3) .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou") .jobParameter("dbName=dangdang").build(), "com.dangdang.foo.FooJob"), // 配置當前運行的作業是第幾個分片 1, // 配置Spring相關參數。如果不配置,代表不使用 Spring 配置。 "testSimpleJob" , "applicationContext.xml");
3. 運行
LocalTaskExecutor,本地作業執行器。
創建本地作業執行器示例程式碼如下(來自官方):
new LocalTaskExecutor(localJobConfig).execute();
可以看到,調用 LocalTaskExecutor#execute()
方法,執行作業邏輯,實現程式碼如下:
// LocalTaskExecutor.java public void execute() { AbstractElasticJobExecutor jobExecutor; CloudJobFacade jobFacade = new CloudJobFacade(getShardingContexts(), getJobConfigurationContext(), new JobEventBus()); // 創建執行器 switch (localCloudJobConfiguration.getTypeConfig().getJobType()) { case SIMPLE: jobExecutor = new SimpleJobExecutor(getJobInstance(SimpleJob.class), jobFacade); break; case DATAFLOW: jobExecutor = new DataflowJobExecutor(getJobInstance(DataflowJob.class), jobFacade); break; case SCRIPT: jobExecutor = new ScriptJobExecutor(jobFacade); break; default: throw new UnsupportedOperationException(localCloudJobConfiguration.getTypeConfig().getJobType().name()); } // 執行作業 jobExecutor.execute(); }
- 調用
#getShardingContexts()
方法,創建分片上下文集合( ShardingContexts ),實現程式碼如下:
private ShardingContexts getShardingContexts() { JobCoreConfiguration coreConfig = localCloudJobConfiguration.getTypeConfig().getCoreConfig(); Map<Integer, String> shardingItemMap = new HashMap<>(1, 1); shardingItemMap.put(localCloudJobConfiguration.getShardingItem(), new ShardingItemParameters(coreConfig.getShardingItemParameters()).getMap().get(localCloudJobConfiguration.getShardingItem())); return new ShardingContexts( // taskId ? Joiner.on("@-@").join(localCloudJobConfiguration.getJobName(), localCloudJobConfiguration.getShardingItem(), "READY", "foo_slave_id", "foo_uuid"), localCloudJobConfiguration.getJobName(), coreConfig.getShardingTotalCount(), coreConfig.getJobParameter(), shardingItemMap); }
- 調用
#getJobConfigurationContext()
方法,創建內部的作業配置上下文( JobConfigurationContext ),實現程式碼如下:
private <T extends ElasticJob> T getJobInstance(final Class<T> clazz) { Object result; if (Strings.isNullOrEmpty(localCloudJobConfiguration.getApplicationContext())) { // 直接創建 ElasticJob String jobClass = localCloudJobConfiguration.getTypeConfig().getJobClass(); try { result = Class.forName(jobClass).newInstance(); } catch (final ReflectiveOperationException ex) { throw new JobSystemException("Elastic-Job: Class '%s' initialize failure, the error message is '%s'.", jobClass, ex.getMessage()); } } else { // Spring 環境獲得 ElasticJob result = new ClassPathXmlApplicationContext(localCloudJobConfiguration.getApplicationContext()).getBean(localCloudJobConfiguration.getBeanName()); } return clazz.cast(result); }
- 調用
#getJobInstance(...)
方法, 獲得分散式作業( ElasticJob )實現實例,實現程式碼如下:
private JobConfigurationContext getJobConfigurationContext() { Map<String, String> jobConfigurationMap = new HashMap<>(); jobConfigurationMap.put("jobClass", localCloudJobConfiguration.getTypeConfig().getJobClass()); jobConfigurationMap.put("jobType", localCloudJobConfiguration.getTypeConfig().getJobType().name()); jobConfigurationMap.put("jobName", localCloudJobConfiguration.getJobName()); jobConfigurationMap.put("beanName", localCloudJobConfiguration.getBeanName()); jobConfigurationMap.put("applicationContext", localCloudJobConfiguration.getApplicationContext()); if (JobType.DATAFLOW == localCloudJobConfiguration.getTypeConfig().getJobType()) { // 數據流作業 jobConfigurationMap.put("streamingProcess", Boolean.toString(((DataflowJobConfiguration) localCloudJobConfiguration.getTypeConfig()).isStreamingProcess())); } else if (JobType.SCRIPT == localCloudJobConfiguration.getTypeConfig().getJobType()) { // 腳本作業 jobConfigurationMap.put("scriptCommandLine", ((ScriptJobConfiguration) localCloudJobConfiguration.getTypeConfig()).getScriptCommandLine()); } return new JobConfigurationContext(jobConfigurationMap); }
- 調用
AbstractElasticJobExecutor#execute()
方法,執行作業邏輯。 Elastic-Job-Lite 和 Elastic-Job-Cloud 作業執行基本一致,在《Elastic-Job-Lite 源碼分析 —— 作業執行》有詳細解析。