源碼簡析XXL-JOB的註冊和執行過程

一,前言

XXL-JOB是一個優秀的國產開源分散式任務調度平台,他有著自己的一套調度註冊中心,提供了豐富的調度和阻塞策略等,這些都是可視化的操作,使用起來十分方便。

由於是國產的,所以上手還是比較快的,而且他的源碼也十分優秀,因為是調試平台所以執行緒這一塊的使用是很頻繁的,特別值得學習研究。

XXL-JOB一同分為兩個模組,調度中心模組和執行模組。具體解釋,我們copy下官網的介紹:

  • 調度模組(調度中心):
    負責管理調度資訊,按照調度配置發出調度請求,自身不承擔業務程式碼。調度系統與任務解耦,提高了系統可用性和穩定性,同時調度系統性能不再受限於任務模組;
    支援可視化、簡單且動態的管理調度資訊,包括任務新建,更新,刪除,GLUE開發和任務報警等,所有上述操作都會實時生效,同時支援監控調度結果以及執行日誌,支援執行器Failover。

  • 執行模組(執行器):
    負責接收調度請求並執行任務邏輯。任務模組專註於任務的執行等操作,開發和維護更加簡單和高效;
    接收「調度中心」的執行請求、終止請求和日誌請求等。

XXL-JOB中「調度模組」和「任務模組」完全解耦,調度模組進行任務調度時,將會解析不同的任務參數發起遠程調用,調用各自的遠程執行器服務。這種調用模型類似RPC調用,調度中心提供調用代理的功能,而執行器提供遠程服務的功能。

下面看下springboot環境下的使用方式,首先看下執行器的配置:

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        //調度中心地址
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        //執行器AppName
        xxlJobSpringExecutor.setAppname(appname);
        //執行器註冊地址,默認為空即可
        xxlJobSpringExecutor.setAddress(address);
        //執行器IP [選填]:默認為空表示自動獲取IP
        xxlJobSpringExecutor.setIp(ip);
        //執行器埠
        xxlJobSpringExecutor.setPort(port);
        //執行器通訊TOKEN
        xxlJobSpringExecutor.setAccessToken(accessToken);
        //執行器運行日誌文件存儲磁碟路徑
        xxlJobSpringExecutor.setLogPath(logPath);
        //執行器日誌文件保存天數
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

XXL-JOB提供了多種任務執行方式,我們今天看下最簡單的bean執行模式。如下:

    /**
     * 1、簡單任務示例(Bean模式)
     */
    @XxlJob("demoJobHandler")
    public void demoJobHandler() throws Exception {
        XxlJobHelper.log("XXL-JOB, Hello World.");

        for (int i = 0; i < 5; i++) {
            XxlJobHelper.log("beat at:" + i);
            TimeUnit.SECONDS.sleep(2);
        }
        // default success
    }

現在在調度中心稍做配置,我們這段程式碼就可以按照一定的策略進行調度執行,是不是很神奇?我們先看下官網上的解釋:

原理:每個Bean模式任務都是一個Spring的Bean類實例,它被維護在「執行器」項目的Spring容器中。任務類需要加「@JobHandler(value=」名稱」)」註解,因為「執行器」會根據該註解識別Spring容器中的任務。任務類需要繼承統一介面「IJobHandler」,任務邏輯在execute方法中開發,因為「執行器」在接收到調度中心的調度請求時,將會調用「IJobHandler」的execute方法,執行任務邏輯。

紙上得來終覺淺,絕知此事要躬行,今天的任務就是跟著這段話,我們大體看一波源碼的實現方式。

二,XxlJobSpringExecutor

XxlJobSpringExecutor其實看名字,我們都能想到,這是XXL-JOB為了適應spring模式的應用而開發的模板類,先看下他的實現結構。

XxlJobSpringExecutor繼承自XxlJobExecutor,同時由於是用在spring環境,所以實現了多個spring內置的介面來配合實現整個執行器模組功能,每個介面的功能就不細說了,相信大家都可以百度查到。

我們看下初始化方法afterSingletonsInstantiated

    // start
    @Override
    public void afterSingletonsInstantiated() {

        //註冊每個任務
        initJobHandlerMethodRepository(applicationContext);

        // refresh GlueFactory
        GlueFactory.refreshInstance(1);

        // super start
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

主流程看上去是比較簡單的,首先是註冊每一個JobHandler,然後進行初始化操作, GlueFactory.refreshInstance(1)是為了另一種調用模式時用到的,主要是用到了groovy,不在這次的分析中,我們就不看了。我們繼續看下如何註冊JobHandler的。

 private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
        if (applicationContext == null) {
            return;
        }
        // 遍歷所有beans,取出所有包含有@XxlJob的方法
        String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
        for (String beanDefinitionName : beanDefinitionNames) {
            Object bean = applicationContext.getBean(beanDefinitionName);

            Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
            try {
                annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                        new MethodIntrospector.MetadataLookup<XxlJob>() {
                            @Override
                            public XxlJob inspect(Method method) {
                                return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                            }
                        });
            } catch (Throwable ex) {
                logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
            }
            if (annotatedMethods==null || annotatedMethods.isEmpty()) {
                continue;
            }
            //遍歷@XxlJob方法,取出executeMethod以及註解中對應的initMethod, destroyMethod進行註冊
            for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
                Method executeMethod = methodXxlJobEntry.getKey();
                XxlJob xxlJob = methodXxlJobEntry.getValue();
                if (xxlJob == null) {
                    continue;
                }

                String name = xxlJob.value();
                if (name.trim().length() == 0) {
                    throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                }
                if (loadJobHandler(name) != null) {
                    throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                }

                executeMethod.setAccessible(true);

                // init and destory
                Method initMethod = null;
                Method destroyMethod = null;

                if (xxlJob.init().trim().length() > 0) {
                    try {
                        initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                        initMethod.setAccessible(true);
                    } catch (NoSuchMethodException e) {
                        throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                    }
                }
                if (xxlJob.destroy().trim().length() > 0) {
                    try {
                        destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                        destroyMethod.setAccessible(true);
                    } catch (NoSuchMethodException e) {
                        throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                    }
                }

                // 註冊 jobhandler
                registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
            }
        }

    }

XxlJobSpringExecutor由於實現了ApplicationContextAware,所以通過applicationContext可以獲得所有容器中的bean實例,再通過MethodIntrospector來過濾出所有包含@XxlJob註解的方法,最後把對應的executeMethod以及註解中對應的initMethod, destroyMethod進行註冊到jobHandlerRepository中,jobHandlerRepository是一個執行緒安全ConcurrentMap,MethodJobHandler實現自IJobHandler介面的一個模板類,主要作用就是通過反射去執行對應的方法。看到這,之前那句話任務類需要加「@JobHandler(value=」名稱」)」註解,因為「執行器」會根據該註解識別Spring容器中的任務。我們就明白了。

public class MethodJobHandler extends IJobHandler {
    ....
    public MethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod) {
        this.target = target;
        this.method = method;

        this.initMethod = initMethod;
        this.destroyMethod = destroyMethod;
    }

    @Override
    public void execute() throws Exception {
        Class<?>[] paramTypes = method.getParameterTypes();
        if (paramTypes.length > 0) {
            method.invoke(target, new Object[paramTypes.length]);       // method-param can not be primitive-types
        } else {
            method.invoke(target);
        }
    }

三,執行伺服器initEmbedServer

看完上面的JobHandler註冊,後面緊著就是執行器模組的啟動操作了,下面看下start方法:

    public void start() throws Exception {

        // 初始化日誌path
        XxlJobFileAppender.initLogPath(logPath);

        // 註冊adminBizList
        initAdminBizList(adminAddresses, accessToken);

        // 初始化日誌清除執行緒
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // 初始化回調執行緒,用來把執行結果回調給調度中心
        TriggerCallbackThread.getInstance().start();

        // 執行伺服器啟動
        initEmbedServer(address, ip, port, appname, accessToken);
    }

前幾個操作,我們就不細看了,大家有興趣的可以自行查看,我們直接進入initEmbedServer方法查看內部伺服器如何啟動,以及向調試中心註冊的。

    private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
        ...
        // start
        embedServer = new EmbedServer();
        embedServer.start(address, port, appname, accessToken);
    }
    public void start(final String address, final int port, final String appname, final String accessToken) {
        ```
        // 啟動netty伺服器
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel channel) throws Exception {
                        channel.pipeline()
                                .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                .addLast(new HttpServerCodec())
                                .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                    }
                })
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        // bind
        ChannelFuture future = bootstrap.bind(port).sync();

        logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

        // 執行向調度中心註冊
        startRegistry(appname, address);
        ```
    }

因為執行器模組本身需要有通訊交互的需求,不然調度中心是無法調用他的,所以內嵌了一個netty伺服器進行通訊。啟動成功後,正式向調試中心執行註冊請求。我們直接看註冊的程式碼:

    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
        try {
            //執行註冊請求
            ReturnT<String> registryResult = adminBiz.registry(registryParam);
            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                registryResult = ReturnT.SUCCESS;
                logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                break;
            } else {
                logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
            }
        } catch (Exception e) {
            logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
        }
    }
    @Override
    public ReturnT<String> registry(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
    }

XxlJobRemotingUtil.postBody就是個符合XXL-JOB規範的restful的http請求處理,裡面不止有註冊請求,還有下線請求,回調請求等,礙於篇幅,就不一一展示了,調度中心接到對應的請求,會有對應的DB處理:

        // services mapping
        if ("callback".equals(uri)) {
            List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
            return adminBiz.callback(callbackParamList);
        } else if ("registry".equals(uri)) {
            RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
            return adminBiz.registry(registryParam);
        } else if ("registryRemove".equals(uri)) {
            RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
            return adminBiz.registryRemove(registryParam);
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
        }

跟到這裡,我們就已經大概了解了整個註冊的流程。同樣當調度中心向我們執行器發送請求,譬如說執行任務調度的請求時,也是同樣的http請求發送我們上面分析的執行器中內嵌netty服務進行操作,這邊只展示調用方法:

    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
    }

這樣,我們執行器模組收到請求後會執行我們上面註冊中的jobHandle進行對應的方法執行,執行器會將請求存入「非同步執行隊列」並且立即響應調度中心,非同步運行對應方法。這樣一套註冊和執行的流程就大致走下來了。

四,結尾

當然事實上XXL-JOB的程式碼還有許多豐富的特性,礙於本人實力不能一一道明,我這也是拋轉引玉,只是把最基礎的一些地方介紹給大家,有興趣的話,大家可以自行查閱相關程式碼,總的來說,畢竟是國產開源的優秀項目,還是值得讚賞的,也希望中國以後有越來越多優秀開源框架。

Tags: