如何在 Spring Boot 優雅關閉加入一些自定義機制

個人創作公約:本人聲明創作的所有文章皆為自己原創,如果有參考任何文章的地方,會標註出來,如果有疏漏,歡迎大家批判。如果大家發現網上有抄襲本文章的,歡迎舉報,並且積極向這個 github 倉庫 提交 issue,謝謝支援~

我們知道從 Spring Boot 2.3.x 這個版本開始,引入了優雅關閉的機制。我們也在線上部署了這個機制,來增加用戶體驗。雖然現在大家基本上都通過最終一致性,以及事務等機制,來保證了就算非優雅關閉,也可以保持業務正確。但是,這樣總會帶來短時間的數據不一致,影響用戶體驗。所以,引入優雅關閉,保證當前請求處理完,再開始 Destroy 所有 ApplicationContext 中的 Bean。

優雅關閉存在的問題

ApplicationContext 的關閉過程簡單來說分為以下幾個步驟(對應源碼 AbstractApplicationContext 的 doClose 方法):

  1. 取消當前 ApplicationContext 在 LivBeanView 的註冊(目前其實只包含從 JMX 上取消註冊)
  2. 發布 ContextClosedEvent 事件,同步處理所有這個事件的 Listener
  3. 處理所有實現 Lifecycle 介面的 Bean,解析他們的關閉順序,並調用他們的 stop 方法
  4. Destroy 所有 ApplicationContext 中的 Bean
  5. 關閉 BeanFactory

簡單理解優雅關閉,其實就是在上面的第三步中加入優雅關閉的邏輯實現的 Lifecycle,包括如下兩步:

  1. 切斷外部流量入口:具體點說就是讓 Spring Boot 的 Web 容器直接拒絕所有新收到的請求,不再處理新請求,例如直接返回 503.
  2. 等待承載的 Dispatcher 的執行緒池處理完所有請求:對於同步的 Servlet 進程其實就是處理 Servlet 請求的執行緒池,對於非同步響應式的 WebFlux 進程其實就是所有 Web 請求的 Reactor 執行緒池處理完當前所有 Publisher 發布的事件。

首先,切斷外部流量入口保證不再有新的請求到來,執行緒池處理完所有請求之後,正常的業務邏輯也是正常走完的,在這之後就可以開始關閉其他各種元素了。

但是,我們首先要保證,優雅關閉的邏輯,需要在所有的 Lifecycle 的第一個最保險。這樣保證一定所有請求處理完,才會開始 stop 其他的 Lifecycle。如果不這樣會有啥問題呢?舉個例子,例如某個 Lifecycle 是負載均衡器的,stop 方法會關閉負載均衡器,如果這個 Lifecycle 在優雅關閉的 Lifecycle 的 stop 之前進行 stop,那麼可能會造成某些在 負載均衡器 stop 後還沒處理完的請求,並且這些請求需要使用負載均衡器調用其他微服務,執行失敗。

優雅關閉還有另一個問題就是,默認的優雅關閉功能不是那麼全面,有時候我們需要在此基礎上,添加更多的關閉邏輯。例如,你的項目中不止 有 web 容器處理請求的執行緒池,你自己還使用了其他執行緒池,並且執行緒池可能還比較複雜,一個向另一個提交,互相提交,各種提交等等,我們需要在 web 容器處理請求的執行緒池處理完所有請求後,再等待這些執行緒池的執行完所有請求後再關閉。還有一個例子就是針對 MQ 消費者的,當優雅關閉時,其實應該停止消費新的消息,等待當前所有消息處理完。這些問題可以看下圖:
image

源碼分析接入點 – Spring Boot + Undertow & 同步 Servlet 環境

我們從源碼觸發,分析在 Spring Boot 中使用 Undertow 作為 Web 容器並且是同步 Servlet 環境下,如果接入自定義的機制。首先,在引入 spring boot 相關依賴並且配置好優雅關閉之後:

pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <!--不使用默認的 tomcat 容器-->
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!--使用 undertow 容器-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-undertow</artifactId>
</dependency>

application.yml

server:
  # 設置關閉方式為優雅關閉
  shutdown: graceful
  
management:
  endpoint:
    health:
      show-details: always
    # actuator 暴露 /actuator/shutdown 介面用於關閉(由於這裡開啟了優雅關閉所以其實是優雅關閉)
    shutdown:
      enabled: true
  endpoints:
    jmx:
      exposure:
        exclude: '*'
    web:
      exposure:
        include: '*'

在設置關閉方式為優雅關閉之後,Spring Boot 啟動時,在創建基於 Undertow 實現的 WebServer 的時候,會添加優雅關閉的 Handler,參考源碼:

UndertowWebServerFactoryDelegate

static List<HttpHandlerFactory> createHttpHandlerFactories(Compression compression, boolean useForwardHeaders,
			String serverHeader, Shutdown shutdown, HttpHandlerFactory... initialHttpHandlerFactories) {
	List<HttpHandlerFactory> factories = new ArrayList<>(Arrays.asList(initialHttpHandlerFactories));
	if (compression != null && compression.getEnabled()) {
		factories.add(new CompressionHttpHandlerFactory(compression));
	}
	if (useForwardHeaders) {
		factories.add(Handlers::proxyPeerAddress);
	}
	if (StringUtils.hasText(serverHeader)) {
		factories.add((next) -> Handlers.header(next, "Server", serverHeader));
	}
	//如果指定了優雅關閉,則添加 gracefulShutdown
	if (shutdown == Shutdown.GRACEFUL) {
		factories.add(Handlers::gracefulShutdown);
	}
	return factories;
}

添加的這個 Handler 就是 Undertow 的 GracefulShutdownHandlerGracefulShutdownHandler 是一個 HttpHandler,這個介面很簡單:

public interface HttpHandler {
    void handleRequest(HttpServerExchange exchange) throws Exception;
}

其實就是對於收到的每個 HTTP 請求,都會經過每個 HttpHandler 的 handleRequest 方法。GracefulShutdownHandler 的實現思路也很簡單,既然每個請求都會經過這個類的 handleRequest 方法,那麼我就在收到請求的時候將一個原子計數器原子 + 1,請求處理完後(注意是返迴響應之後,不是方法返回,因為請求可能是非同步的,所以這個做成了回調),將原子計數器原子 – 1,如果這個計數器為零,就證明沒有任何正在處理的請求了。源碼是:

GracefulShutdownHandler:

@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
    //原子更新,請求計數器加一,返回的 snapshot 是包含是否關閉狀態位的數字
    long snapshot = stateUpdater.updateAndGet(this, incrementActive);
    //通過狀態位判斷是否正在關閉
    if (isShutdown(snapshot)) {
        //如果正在關閉,直接請求數原子減一
        decrementRequests();
        //設置響應碼為 503
        exchange.setStatusCode(StatusCodes.SERVICE_UNAVAILABLE);
        //標記請求完成
        exchange.endExchange();
        //直接返回,不繼續走其他的 HttpHandler
        return;
    }
    //添加請求完成時候的 listener,這個在請求完成返迴響應時會被調用,將計數器原子減一
    exchange.addExchangeCompleteListener(listener);
    //繼續走下一個 HttpHandler
    next.handleRequest(exchange);
}

那麼,是什麼時候調用的這個關閉呢?前面我們說過 ApplicationContext 的關閉過程的第三步:處理所有實現 Lifecycle 介面的 Bean,解析他們的關閉順序,並調用他們的 stop 方法,其實優雅關閉就在這裡被調用。當 Spring Boot + Undertow & 同步 Servlet 環境啟動時,到了創建 WebServer 這一步,會創建一個優雅關閉的 Lifecycle,對應源碼:

ServletWebServerApplicationContext

private void createWebServer() {
	WebServer webServer = this.webServer;
	ServletContext servletContext = getServletContext();
	if (webServer == null && servletContext == null) {
		StartupStep createWebServer = this.getApplicationStartup().start("spring.boot.webserver.create");
		ServletWebServerFactory factory = getWebServerFactory();
		createWebServer.tag("factory", factory.getClass().toString());
		this.webServer = factory.getWebServer(getSelfInitializer());
		createWebServer.end();
		//就是這裡,創建一個 WebServerGracefulShutdownLifecycle 並註冊到當前 ApplicationContext 的 BeanFactory 中
		getBeanFactory().registerSingleton("webServerGracefulShutdown",
				new WebServerGracefulShutdownLifecycle(this.webServer));
		getBeanFactory().registerSingleton("webServerStartStop",
				new WebServerStartStopLifecycle(this, this.webServer));
	}
	else if (servletContext != null) {
		try {
			getSelfInitializer().onStartup(servletContext);
		}
		catch (ServletException ex) {
			throw new ApplicationContextException("Cannot initialize servlet context", ex);
		}
	}
	initPropertySources();
}

前面說到, ApplicationContext 的關閉過程的第三步調用所有 Lifecycle 的 stop 方法,這裡即 WebServerGracefulShutdownLifecycle 中的 stop 方法:

WebServerGracefulShutdownLifecycle

@Override
public void stop(Runnable callback) {
	this.running = false;
	this.webServer.shutDownGracefully((result) -> callback.run());
}

這裡的 webServer,由於我們使用的是 Undertow,對應實現就是 UndertowWebServer,看一下他的 shutDownGracefully 實現:

UndertowWebServer


//這裡的這個 GracefulShutdownHandler 就是前面說的在啟動時加的 GracefulShutdownHandler
private volatile GracefulShutdownHandler gracefulShutdown;

@Override
public void shutDownGracefully(GracefulShutdownCallback callback) {
    // 如果 GracefulShutdownHandler 不為 null,證明開啟了優雅關閉(server.shutdown=graceful)
	if (this.gracefulShutdown == null) {
	    //為 null,就證明沒開啟優雅關閉,什麼都不等
		callback.shutdownComplete(GracefulShutdownResult.IMMEDIATE);
		return;
	}
	//開啟優雅關閉,需要等待請求處理完
	logger.info("Commencing graceful shutdown. Waiting for active requests to complete");
	this.gracefulShutdownCallback.set(callback);
	//調用 GracefulShutdownHandler 的 shutdown 進行優雅關閉
	this.gracefulShutdown.shutdown();
	//調用 GracefulShutdownHandler 的 addShutdownListener 添加關閉後調用的操作,這裡是調用 notifyGracefulCallback
	//其實就是調用方法參數的 callback(就是外部的回調)
	this.gracefulShutdown.addShutdownListener((success) -> notifyGracefulCallback(success));
}

private void notifyGracefulCallback(boolean success) {
	GracefulShutdownCallback callback = this.gracefulShutdownCallback.getAndSet(null);
	if (callback != null) {
		if (success) {
			logger.info("Graceful shutdown complete");
			callback.shutdownComplete(GracefulShutdownResult.IDLE);
		}
		else {
			logger.info("Graceful shutdown aborted with one or more requests still active");
			callback.shutdownComplete(GracefulShutdownResult.REQUESTS_ACTIVE);
		}
	}
}

再看下 GracefulShutdownHandler 的 shutdown 方法以及 addShutdownListener 方法:

GracefulShutdownHandler:

public void shutdown() {
    //設置關閉狀態位,並原子 + 1
    stateUpdater.updateAndGet(this, incrementActiveAndShutdown);
    //直接請求數原子減一
    decrementRequests();
}

private void decrementRequests() {
    long snapshot = stateUpdater.updateAndGet(this, decrementActive);
    // Shutdown has completed when the activeCount portion is zero, and shutdown is set.
    //如果與 關閉狀態位 MASK 完全相等,證明其他位都是 0,證明剩餘處理中的請求數量為 0
    if (snapshot == SHUTDOWN_MASK) {
        //調用 shutdownComplete
        shutdownComplete();
    }
}

private void shutdownComplete() {
    synchronized (lock) {
        lock.notifyAll();
        //調用每個 ShutdownListener 的 shutdown 方法
        for (ShutdownListener listener : shutdownListeners) {
            listener.shutdown(true);
        }
        shutdownListeners.clear();
    }
}

/**
 * 這個方法並不只是字面意思,首先如果不是關閉中不能添加 ShutdownListener
 * 然後如果沒有請求了,就直接調用傳入的 shutdownListener 的 shutdown 方法
 * 如果還有請求,則添加入 shutdownListeners,等其他調用 shutdownComplete 的時候遍歷 shutdownListeners 調用 shutdown
 * lock 主要為了 addShutdownListener 與 shutdownComplete 對 shutdownListeners 的訪問安全
 * lock 的 wait notify 主要為了實現 awaitShutdown 機制,我們這裡沒有提
 */
public void addShutdownListener(final ShutdownListener shutdownListener) {
        synchronized (lock) {
            if (!isShutdown(stateUpdater.get(this))) {
                throw UndertowMessages.MESSAGES.handlerNotShutdown();
            }
            long count = activeCount(stateUpdater.get(this));
            if (count == 0) {
                shutdownListener.shutdown(true);
            } else {
                shutdownListeners.add(shutdownListener);
            }
        }
    }

這就是優雅關閉的底層原理,但是我們還沒有分析清楚 ApplicationContext 的關閉過程的第三步以及優雅關閉與其他 Lifecycle Bean 的 stop 先後順序,我們這裡來理清一下,首先我們看一下 Smart

開始關閉 Lifecycle Bean 的入口:

DefaultLifecycleProcessor

private void stopBeans() {
    //讀取所有的 Lifecycle bean,返回的是一個 LinkedHashMap,遍歷它的順序和放入的順序一樣
    //放入的順序就是從 BeanFactory 讀取所有 Lifecycle 的 Bean 的返回順序,這個和 Bean 載入順序有關,不太可控,可能這個版本載入順序升級一個版本就變了
	Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
	//按照每個 Lifecycle 的 Phase 值進行分組
	//如果實現了 Phased 介面就通過其 phase 方法返回得出 phase 值
	//如果沒有實現 Phased 介面則認為 Phase 是 0
	Map<Integer, LifecycleGroup> phases = new HashMap<>();
	lifecycleBeans.forEach((beanName, bean) -> {
		int shutdownPhase = getPhase(bean);
		LifecycleGroup group = phases.get(shutdownPhase);
		if (group == null) {
			group = new LifecycleGroup(shutdownPhase, this.timeoutPerShutdownPhase, lifecycleBeans, false);
			phases.put(shutdownPhase, group);
		}
		group.add(beanName, bean);
	});
	//如果不為空,證明有需要關閉的 Lifecycle,開始關閉
	if (!phases.isEmpty()) {
	    //按照 Phase 值倒序
		List<Integer> keys = new ArrayList<>(phases.keySet());
		keys.sort(Collections.reverseOrder());
		//挨個關閉
		for (Integer key : keys) {
			phases.get(key).stop();
		}
	}
}

總結起來,其實就是:

  1. 獲取當前 ApplicationContext 的 Beanfactory 中的所有實現了 Lifecycle 介面的 Bean。
  2. 讀取每個 Bean 的 Phase 值,如果這個 Bean 實現了 Phased 介面,就取介面方法返回的值,如果沒有實現就是 0.
  3. 按照 Phase 值將 Bean 分組
  4. 按照 Phase 值從大到小的順序,依次遍歷每組進行關閉
  5. 具體關閉每組的邏輯我們就不詳細看程式碼了,知道關閉的時候其實還看了當前這個 Lifecycle 的 Bean 是否還依賴了其他的 Lifecycle 的 Bean,如果依賴了,優先關掉被依賴的 Lifecycle Bean

我們來看下前面提到的優雅關閉相關的 WebServerGracefulShutdownLifecycle 的 Phase 是:

class WebServerGracefulShutdownLifecycle implements SmartLifecycle {
    ....
}

SmartLifecycle 包含了 Phased 介面以及默認實現:

public interface SmartLifecycle extends Lifecycle, Phased {
    int DEFAULT_PHASE = Integer.MAX_VALUE;
    @Override
	default int getPhase() {
		return DEFAULT_PHASE;
	}
}

可以看出,只要實現了 SmartLifecycle,Phase 默認就是最大值。所以優雅關閉的 Lifecycle: WebServerGracefulShutdownLifecycle 的 Phase 就是最大值,也就是屬於最先被關閉的那一組

總結接入點 – Spring Boot + Undertow & 同步 Servlet 環境

1. 接入點一 – 通過添加實現 SmartLifecycle 介面的 Bean,指定 Phase 比 WebServerGracefulShutdownLifecycle 的 Phase 小

前面的分析中,我們已經知道了:WebServerGracefulShutdownLifecycle 的 Phase 就是最大值,也就是屬於最先被關閉的那一組。我們想要實現的是在這之後加入一些優雅關閉的邏輯,同時在 Destroy Bean (前面提到的 ApplicationContext 關閉的第四步)之前(即 Bean 銷毀之前,某些 Bean 銷毀中就不能用了,比如微服務調用中的一些 Bean,這時候如果還有任務沒完成調用他們就會報異常)。那我們首先想到的就是加入一個 Phase 在這時候的 Lifecycle,在裡面實現我們的優雅關閉接入,例如:

@Log4j2
@Component
public class BizThreadPoolShutdownLifecycle implements SmartLifecycle {
    private volatile boolean running = false;
    
    @Override
    public int getPhase() {
        //在 WebServerGracefulShutdownLifecycle 那一組之後
        return SmartLifecycle.DEFAULT_PHASE - 1;
    }

    @Override
    public void start() {
        this.running = true;
    }

    @Override
    public void stop() {
        //在這裡編寫的優雅關閉邏輯
        this.running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}

這樣實現兼容性比較好,並且升級底層框架依賴版本基本上不用修改。但是問題就是,可能會引入某個框架裡面帶 Lifecycle bean,雖然他的 Phase 是正確的,小於 WebServerGracefulShutdownLifecycle 的,但是 SmartLifecycle.DEFAULT_PHASE – 1 即等於我們自定義的 Lifecyce, 並且這個正好是需要等待我們的優雅關閉結束再關閉的,並且由於 Bean 載入順序問題導致框架的 Lifecycle 又跑到了我們自定義的 Lifecycle 前進行 stop。這樣就會有問題,但是問題出現的概率並不大。

2. 接入點二 – 通過反射向 Undertow 的 GracefulShutdownHandler 的 List<ShutdownListener> shutdownListeners 中添加 ShutdownListener 實現

這種實現方式,很明顯,限定了容器必須是 undertow,並且可能升級的兼容性不好。但是可以在 Http 執行緒池優雅關閉後立刻執行我們的優雅關閉邏輯,不用擔心引入某個依賴導致我們自定義的優雅關閉順序有問題。與第一種孰優孰劣,請大家自行判斷,簡單實現是:

@Log4j2
@Componenet
//僅在包含 Undertow 這個類的時候載入
@ConditionalOnClass(name = "io.undertow.Undertow")
public class ThreadPoolFactoryGracefulShutDownHandler implements ApplicationListener<ApplicationEvent> {
    
    //獲取操作 UndertowWebServer 的 gracefulShutdown 欄位的句柄
    private static VarHandle undertowGracefulShutdown;
    //獲取操作 GracefulShutdownHandler 的 shutdownListeners 欄位的句柄
    private static VarHandle undertowShutdownListeners;

    static {
        try {
            undertowGracefulShutdown = MethodHandles
                    .privateLookupIn(UndertowWebServer.class, MethodHandles.lookup())
                    .findVarHandle(UndertowWebServer.class, "gracefulShutdown",
                            GracefulShutdownHandler.class);
            undertowShutdownListeners = MethodHandles
                    .privateLookupIn(GracefulShutdownHandler.class, MethodHandles.lookup())
                    .findVarHandle(GracefulShutdownHandler.class, "shutdownListeners",
                            List.class);
        } catch (Exception e) {
            log.warn("ThreadPoolFactoryGracefulShutDownHandler undertow not found, ignore fetch var handles");
        }
    }

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        //僅處理 WebServerInitializedEvent 事件,這個是在 WebServer 創建並初始化完成後發出的事件
        if (event instanceof WebServerInitializedEvent) {
            WebServer webServer = ((WebServerInitializedEvent) event).getWebServer();
            //檢查當前的 web 容器是否是 UnderTow 的
            if (webServer instanceof UndertowWebServer) {
                GracefulShutdownHandler gracefulShutdownHandler = (GracefulShutdownHandler) undertowGracefulShutdown.getVolatile(webServer);
                //如果啟用了優雅關閉,則 gracefulShutdownHandler 不為 null
                if (gracefulShutdownHandler != null) {
                    var shutdownListeners = (List<GracefulShutdownHandler.ShutdownListener>) undertowShutdownListeners.getVolatile(gracefulShutdownHandler);
                    shutdownListeners.add(shutdownSuccessful -> {
                        if (shutdownSuccessful) {
                            //添加你的優雅關閉邏輯
                        } else {
                            log.info("ThreadPoolFactoryGracefulShutDownHandler-onApplicationEvent shutdown failed");
                        }
                    });
                }
            }
        }
    }
}

如何實現額外執行緒池的優雅關閉

現在我們知道如何接入了,那麼針對項目中的自定義執行緒池,如何把他們關閉呢?首先肯定是要先拿到所有要檢查的執行緒池,這個不同環境方式不同,實現也比較簡單,這裡不再贅述,我們假設拿到了所有執行緒池,並且執行緒池只有以下兩種實現(其實就是 JDK 中的兩種執行緒池,忽略定時任務執行緒池 ScheduledThreadPoolExecutor):

  • java.util.concurrent.ThreadPoolExecutor:最常用的執行緒池
  • java.util.concurrent.ForkJoinPool:ForkJoin 形式的執行緒池

針對這兩種執行緒池如何判斷他們是否已經沒有任務在執行了呢?參考程式碼:

public static boolean isCompleted(ExecutorService executorService) {
    if (executorService instanceof ThreadPoolExecutor) {
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
        //對於 ThreadPoolExecutor,就是判斷沒有任何 active 的執行緒了
        return threadPoolExecutor.getActiveCount() == 0;
    } else if (executorService instanceof ForkJoinPool) {
        //對於 ForkJoinPool,複雜一些,就是判斷既沒有活躍執行緒,也沒有運行的執行緒,隊列裡面也沒有任何任務並且並沒有任何等待提交的任務
        ForkJoinPool forkJoinPool = (ForkJoinPool) executorService;
        return forkJoinPool.getActiveThreadCount() == 0
                && forkJoinPool.getRunningThreadCount() == 0
                && forkJoinPool.getQueuedTaskCount() == 0
                && forkJoinPool.getQueuedSubmissionCount() == 0;
    }
    return true;
}

如何判斷所有執行緒池都沒有任務了呢?由於實際應用可能很放飛自我,比如執行緒池 A 可能提交任務到執行緒池 B,執行緒池 B 有可能提交任務到執行緒池 C,執行緒池 C 又有可能提交任務給 A 和 B,所以如果我們依次遍歷一輪所有執行緒池發現上面這個方法 isCompleted 都返回 true,也是不能保證所有執行緒池一定運行完了的(比如我依次檢查 A,B,C,檢查到 C 的時候,C 又提交任務到了 A 和 B 並結束,C 檢查發現任務都完成了,但是之前檢查過的 A,B 又有了任務未完成)。所以我的解決辦法是:打亂所有執行緒池,遍歷,檢查每個執行緒池是否完成,如果檢查發現都完成則計數器加 1,只要有未完成的就不加並清零計數器。不斷循環,每次循環 sleep 1 秒,直到計數器為 3(也就是連續三次按隨機順序檢查所有執行緒池都沒有任何任務):

List<ExecutorService> executorServices = 獲取所有執行緒池
for (int i = 0; i < 3; ) {
    //連續三次,以隨機亂序檢查所有的執行緒池都完成了,才認為是真正完成
    Collections.shuffle(executorServices);
    if (executorServices.stream().allMatch(ThreadPoolFactory::isCompleted)) {
        i++;
        log.info("all threads pools are completed, i: {}", i);
    } else {
        //連續三次
        i = 0;
        log.info("not all threads pools are completed, wait for 1s");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException ignored) {
        }
    }
}

RocketMQ-spring-starter 中是如何處理的

rocketmq 的官方 spring boot starter://github.com/apache/rocketmq-spring

其中是採用我們這裡說的第一種接入點方式,將消費者容器做成 SmartLifcycle(Phase 為最大值,屬於最優先的關閉組),在裡面加入關閉邏輯:

DefaultRocketMQListenerContainer

@Override
public int getPhase() {
    // Returning Integer.MAX_VALUE only suggests that
    // we will be the first bean to shutdown and last bean to start
    return Integer.MAX_VALUE;
}
@Override
public void stop(Runnable callback) {
    stop();
    callback.run();
}
@Override
public void stop() {
    if (this.isRunning()) {
        if (Objects.nonNull(consumer)) {
            //關閉消費者
            consumer.shutdown();
        }
        setRunning(false);
    }
}

微信搜索「我的編程喵」關注公眾號,加作者微信,每日一刷,輕鬆提升技術,斬獲各種offer
image
我會經常發一些很好的各種框架的官方社區的新聞影片資料並加上個人翻譯字幕到如下地址(也包括上面的公眾號),歡迎關註: