源碼簡析XXL-JOB的註冊和執行過程
一,前言
XXL-JOB是一個優秀的國產開源分散式任務調度平台,他有著自己的一套調度註冊中心,提供了豐富的調度和阻塞策略等,這些都是可視化的操作,使用起來十分方便。
由於是國產的,所以上手還是比較快的,而且他的源碼也十分優秀,因為是調試平台所以執行緒這一塊的使用是很頻繁的,特別值得學習研究。
XXL-JOB一同分為兩個模組,調度中心模組和執行模組。具體解釋,我們copy下官網的介紹:
-
調度模組(調度中心):
負責管理調度資訊,按照調度配置發出調度請求,自身不承擔業務程式碼。調度系統與任務解耦,提高了系統可用性和穩定性,同時調度系統性能不再受限於任務模組;
支援可視化、簡單且動態的管理調度資訊,包括任務新建,更新,刪除,GLUE開發和任務報警等,所有上述操作都會實時生效,同時支援監控調度結果以及執行日誌,支援執行器Failover。 -
執行模組(執行器):
負責接收調度請求並執行任務邏輯。任務模組專註於任務的執行等操作,開發和維護更加簡單和高效;
接收「調度中心」的執行請求、終止請求和日誌請求等。
XXL-JOB中「調度模組」和「任務模組」完全解耦,調度模組進行任務調度時,將會解析不同的任務參數發起遠程調用,調用各自的遠程執行器服務。這種調用模型類似RPC調用,調度中心提供調用代理的功能,而執行器提供遠程服務的功能。
下面看下springboot環境下的使用方式,首先看下執行器的配置:
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
//調度中心地址
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
//執行器AppName
xxlJobSpringExecutor.setAppname(appname);
//執行器註冊地址,默認為空即可
xxlJobSpringExecutor.setAddress(address);
//執行器IP [選填]:默認為空表示自動獲取IP
xxlJobSpringExecutor.setIp(ip);
//執行器埠
xxlJobSpringExecutor.setPort(port);
//執行器通訊TOKEN
xxlJobSpringExecutor.setAccessToken(accessToken);
//執行器運行日誌文件存儲磁碟路徑
xxlJobSpringExecutor.setLogPath(logPath);
//執行器日誌文件保存天數
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
XXL-JOB提供了多種任務執行方式,我們今天看下最簡單的bean執行模式。如下:
/**
* 1、簡單任務示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobHelper.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
// default success
}
現在在調度中心稍做配置,我們這段程式碼就可以按照一定的策略進行調度執行,是不是很神奇?我們先看下官網上的解釋:
原理:每個Bean模式任務都是一個Spring的Bean類實例,它被維護在「執行器」項目的Spring容器中。任務類需要加「@JobHandler(value=」名稱」)」註解,因為「執行器」會根據該註解識別Spring容器中的任務。任務類需要繼承統一介面「IJobHandler」,任務邏輯在execute方法中開發,因為「執行器」在接收到調度中心的調度請求時,將會調用「IJobHandler」的execute方法,執行任務邏輯。
紙上得來終覺淺,絕知此事要躬行,今天的任務就是跟著這段話,我們大體看一波源碼的實現方式。
二,XxlJobSpringExecutor
XxlJobSpringExecutor其實看名字,我們都能想到,這是XXL-JOB為了適應spring模式的應用而開發的模板類,先看下他的實現結構。
XxlJobSpringExecutor繼承自XxlJobExecutor
,同時由於是用在spring環境,所以實現了多個spring內置的介面來配合實現整個執行器模組功能,每個介面的功能就不細說了,相信大家都可以百度查到。
我們看下初始化方法afterSingletonsInstantiated
:
// start
@Override
public void afterSingletonsInstantiated() {
//註冊每個任務
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
主流程看上去是比較簡單的,首先是註冊每一個JobHandler,然後進行初始化操作, GlueFactory.refreshInstance(1)
是為了另一種調用模式時用到的,主要是用到了groovy
,不在這次的分析中,我們就不看了。我們繼續看下如何註冊JobHandler的。
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// 遍歷所有beans,取出所有包含有@XxlJob的方法
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
Object bean = applicationContext.getBean(beanDefinitionName);
Map<Method, XxlJob> annotatedMethods = null; // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
@Override
public XxlJob inspect(Method method) {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
if (annotatedMethods==null || annotatedMethods.isEmpty()) {
continue;
}
//遍歷@XxlJob方法,取出executeMethod以及註解中對應的initMethod, destroyMethod進行註冊
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method executeMethod = methodXxlJobEntry.getKey();
XxlJob xxlJob = methodXxlJobEntry.getValue();
if (xxlJob == null) {
continue;
}
String name = xxlJob.value();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
}
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}
executeMethod.setAccessible(true);
// init and destory
Method initMethod = null;
Method destroyMethod = null;
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
}
}
// 註冊 jobhandler
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
}
}
}
XxlJobSpringExecutor由於實現了ApplicationContextAware
,所以通過applicationContext可以獲得所有容器中的bean實例,再通過MethodIntrospector來過濾出所有包含@XxlJob註解的方法,最後把對應的executeMethod以及註解中對應的initMethod, destroyMethod進行註冊到jobHandlerRepository
中,jobHandlerRepository
是一個執行緒安全ConcurrentMap,MethodJobHandler實現自IJobHandler
介面的一個模板類,主要作用就是通過反射去執行對應的方法。看到這,之前那句話任務類需要加「@JobHandler(value=」名稱」)」註解,因為「執行器」會根據該註解識別Spring容器中的任務。我們就明白了。
public class MethodJobHandler extends IJobHandler {
....
public MethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod) {
this.target = target;
this.method = method;
this.initMethod = initMethod;
this.destroyMethod = destroyMethod;
}
@Override
public void execute() throws Exception {
Class<?>[] paramTypes = method.getParameterTypes();
if (paramTypes.length > 0) {
method.invoke(target, new Object[paramTypes.length]); // method-param can not be primitive-types
} else {
method.invoke(target);
}
}
三,執行伺服器initEmbedServer
看完上面的JobHandler註冊,後面緊著就是執行器模組的啟動操作了,下面看下start方法:
public void start() throws Exception {
// 初始化日誌path
XxlJobFileAppender.initLogPath(logPath);
// 註冊adminBizList
initAdminBizList(adminAddresses, accessToken);
// 初始化日誌清除執行緒
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// 初始化回調執行緒,用來把執行結果回調給調度中心
TriggerCallbackThread.getInstance().start();
// 執行伺服器啟動
initEmbedServer(address, ip, port, appname, accessToken);
}
前幾個操作,我們就不細看了,大家有興趣的可以自行查看,我們直接進入initEmbedServer方法查看內部伺服器如何啟動,以及向調試中心註冊的。
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
...
// start
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}
public void start(final String address, final int port, final String appname, final String accessToken) {
```
// 啟動netty伺服器
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
// bind
ChannelFuture future = bootstrap.bind(port).sync();
logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
// 執行向調度中心註冊
startRegistry(appname, address);
```
}
因為執行器模組本身需要有通訊交互的需求,不然調度中心是無法調用他的,所以內嵌了一個netty伺服器進行通訊。啟動成功後,正式向調試中心執行註冊請求。我們直接看註冊的程式碼:
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
//執行註冊請求
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
XxlJobRemotingUtil.postBody
就是個符合XXL-JOB規範的restful的http請求處理,裡面不止有註冊請求,還有下線請求,回調請求等,礙於篇幅,就不一一展示了,調度中心接到對應的請求,會有對應的DB處理:
// services mapping
if ("callback".equals(uri)) {
List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
return adminBiz.callback(callbackParamList);
} else if ("registry".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registry(registryParam);
} else if ("registryRemove".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registryRemove(registryParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
跟到這裡,我們就已經大概了解了整個註冊的流程。同樣當調度中心向我們執行器發送請求,譬如說執行任務調度的請求時,也是同樣的http請求發送我們上面分析的執行器中內嵌netty服務進行操作,這邊只展示調用方法:
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
這樣,我們執行器模組收到請求後會執行我們上面註冊中的jobHandle進行對應的方法執行,執行器會將請求存入「非同步執行隊列」並且立即響應調度中心,非同步運行對應方法。這樣一套註冊和執行的流程就大致走下來了。
四,結尾
當然事實上XXL-JOB的程式碼還有許多豐富的特性,礙於本人實力不能一一道明,我這也是拋轉引玉,只是把最基礎的一些地方介紹給大家,有興趣的話,大家可以自行查閱相關程式碼,總的來說,畢竟是國產開源的優秀項目,還是值得讚賞的,也希望中國以後有越來越多優秀開源框架。