Springboot源碼分析之事務問題

  • 2019 年 10 月 3 日
  • 筆記

摘要:

事務在後端開發中無處不在,是數據一致性的最基本保證。要明白進事務的本質就是進到事務切面的代理方法中,最常見的是同一個類的非事務方法調用一個加了事務註解的方法沒進入事務。我們以cglib代理為例,由於Spring的對於cglib AOP代理的實現,進入被代理方法的時候實際上已經離開了「代理這一層殼子」,可以認為程式碼走到的是一個樸素的bean,調用同一個bean中方法自然與代理沒有半毛錢關係了。
一般對於聲明式事務都是以調用另一個類的加了@Transactional註解的public方法作為入口的。

spring事務關鍵處理流程

  • EnableTransactionManagement註解導入TransactionManagementConfigurationSelector
  • TransactionManagementConfigurationSelector載入InfrastructureAdvisorAutoProxyCreator(但不一定是它,一般都是AnnotationAwareAspectJAutoProxyCreator),BeanFactoryTransactionAttributeSourceAdvisorTransactionInterceptor
    AnnotationAwareAspectJAutoProxyCreatorioc流程一個關鍵步驟是查找Advisor,有兩個方面,第一是實現了Advisor介面的類,第二是基於註解Aspectj。關鍵是BeanFactoryTransactionAttributeSourceAdvisor被載入進了代理快取
  • 代理調用方法的時候會執行DefaultAdvisorChainFactory#getInterceptorsAndDynamicInterceptionAdvice,這個時候就會將我們的

BeanFactoryTransactionAttributeSourceAdvisor派上用處,最主要的還是它裡面的TransactionAttributeSourcePointcut進行匹配,執行TransactionInterceptor的方法

TransactionInterceptor

    @Override      @Nullable      public Object invoke(MethodInvocation invocation) throws Throwable {         // Work out the target class: may be {@code null}.         // The TransactionAttributeSource should be passed the target class         // as well as the method, which may be from an interface.         Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);           // Adapt to TransactionAspectSupport's invokeWithinTransaction...         return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);      }

TransactionAspectSupport

    @Nullable      protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,            final InvocationCallback invocation) throws Throwable {           // If the transaction attribute is null, the method is non-transactional.         TransactionAttributeSource tas = getTransactionAttributeSource();         final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);         final PlatformTransactionManager tm = determineTransactionManager(txAttr);         final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);           if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {            // Standard transaction demarcation with getTransaction and commit/rollback calls.            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);              Object retVal;            try {               // This is an around advice: Invoke the next interceptor in the chain.               // This will normally result in a target object being invoked.               retVal = invocation.proceedWithInvocation();            }            catch (Throwable ex) {               // target invocation exception               completeTransactionAfterThrowing(txInfo, ex);               throw ex;            }            finally {               cleanupTransactionInfo(txInfo);            }            commitTransactionAfterReturning(txInfo);            return retVal;         }           else {            final ThrowableHolder throwableHolder = new ThrowableHolder();              // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.            try {               Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {                  TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);                  try {                     return invocation.proceedWithInvocation();                  }                  catch (Throwable ex) {                     if (txAttr.rollbackOn(ex)) {                        // A RuntimeException: will lead to a rollback.                        if (ex instanceof RuntimeException) {                           throw (RuntimeException) ex;                        }                        else {                           throw new ThrowableHolderException(ex);                        }                     }                     else {                        // A normal return value: will lead to a commit.                        throwableHolder.throwable = ex;                        return null;                     }                  }                  finally {                     cleanupTransactionInfo(txInfo);                  }               });                 // Check result state: It might indicate a Throwable to rethrow.               if (throwableHolder.throwable != null) {                  throw throwableHolder.throwable;               }               return result;            }            catch (ThrowableHolderException ex) {               throw ex.getCause();            }            catch (TransactionSystemException ex2) {               if (throwableHolder.throwable != null) {                  logger.error("Application exception overridden by commit exception", throwableHolder.throwable);                  ex2.initApplicationException(throwableHolder.throwable);               }               throw ex2;            }            catch (Throwable ex2) {               if (throwableHolder.throwable != null) {                  logger.error("Application exception overridden by commit exception", throwableHolder.throwable);               }               throw ex2;            }         }      }

這次在分析這個方法,但是從事務的異常,不生效等角度來分析問題。註解事務和編程式都一樣的核心思想,下面我們來分析註解事務邏輯

    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {                  // Standard transaction demarcation with getTransaction and commit/rollback calls.                  TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);                    Object retVal;                  try {                      // This is an around advice: Invoke the next interceptor in the chain.                      // This will normally result in a target object being invoked.                      retVal = invocation.proceedWithInvocation();                  }                  catch (Throwable ex) {                      // target invocation exception                      completeTransactionAfterThrowing(txInfo, ex);                      throw ex;                  }                  finally {              // 把上一層事務的TxInfo重新綁到ThreadLocal中                      cleanupTransactionInfo(txInfo);                  }                  commitTransactionAfterReturning(txInfo);                  return retVal;              }

請記住這幾個核心的方法邏輯順序和異常捕獲哦!

    protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {              if (txInfo != null && txInfo.getTransactionStatus() != null) {                  if (logger.isTraceEnabled()) {                      logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +                              "] after exception: " + ex);                  }            //事務回滾的異常支援                  if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {                      try {                          txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());                      }                      catch (TransactionSystemException ex2) {                          logger.error("Application exception overridden by rollback exception", ex);                          ex2.initApplicationException(ex);                          throw ex2;                      }                      catch (RuntimeException | Error ex2) {                          logger.error("Application exception overridden by rollback exception", ex);                          throw ex2;                      }                  }                  else {                      // We don't roll back on this exception.                      // Will still roll back if TransactionStatus.isRollbackOnly() is true.                      try {                          txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());                      }                      catch (TransactionSystemException ex2) {                          logger.error("Application exception overridden by commit exception", ex);                          ex2.initApplicationException(ex);                          throw ex2;                      }                      catch (RuntimeException | Error ex2) {                          logger.error("Application exception overridden by commit exception", ex);                          throw ex2;                      }                  }              }          }

事務回滾的異常支援

    @Override      public boolean rollbackOn(Throwable ex) {         return (ex instanceof RuntimeException || ex instanceof Error);      }

注意點來了,僅支援運行時異常和錯誤機制,否則不予回滾。並進行直接條件。

AbstractPlatformTransactionManager

    private void processRollback(DefaultTransactionStatus status, boolean unexpected) {         try {           //默認false            boolean unexpectedRollback = unexpected;            try {              //回調TransactionSynchronization對象的beforeCompletion方法。               triggerBeforeCompletion(status);               if (status.hasSavepoint()) {                  if (status.isDebug()) {                     logger.debug("Rolling back transaction to savepoint");                  }                  status.rollbackToHeldSavepoint();               }              // 在最外層事務邊界進行回滾               else if (status.isNewTransaction()) {                  if (status.isDebug()) {                     logger.debug("Initiating transaction rollback");                  }                 // 由具體TxMgr子類實現回滾。                  doRollback(status);               }               else {                  // Participating in larger transaction                  if (status.hasTransaction()) {                    /*                       * 內層事務被標記為rollBackOnly或者globalRollbackOnParticipationFailure開關開啟時,給當前事務標記需要回滾。                       *                       * 如果內層事務顯式打上了rollBackOnly的標記,最終全事務一定是回滾掉的。                       *                       * 但如果沒有被打上rollBackOnly標記,則globalRollbackOnParticipationFailure開關就很重要了。                       * globalRollbackOnParticipationFailure開關默認是開啟的,也就是說內層事務掛了,最終的結果只能是全事務回滾。                       * 但如果globalRollbackOnParticipationFailure開關被關閉的話,內層事務掛了,外層事務業務方法中可以根據情況控制是否回滾。                       */                       if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {                        if (status.isDebug()) {                           logger.debug("Participating transaction failed - marking existing transaction as rollback-only");                        }                       // 由具體TxMgr子類實現回滾。                        doSetRollbackOnly(status);                     }                     else {                        if (status.isDebug()) {                           logger.debug("Participating transaction failed - letting transaction originator decide on rollback");                        }                     }                  }                  else {                     logger.debug("Should roll back transaction but cannot - no transaction available");                  }                  // Unexpected rollback only matters here if we're asked to fail early                  if (!isFailEarlyOnGlobalRollbackOnly()) {                     unexpectedRollback = false;                  }               }            }            catch (RuntimeException | Error ex) {               triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);               throw ex;            }      // 回調TransactionSynchronization對象的afterCompletion方法。            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);              // Raise UnexpectedRollbackException if we had a global rollback-only marker            if (unexpectedRollback) {               throw new UnexpectedRollbackException(                     "Transaction rolled back because it has been marked as rollback-only");            }         }         finally {            cleanupAfterCompletion(status);         }      }

案例分析

file

file

有經驗的同學肯定知道整個事務最終被回滾掉了, TransactionB#test並沒有執行System.out.println("TransactionB#test after");
其實對於Spring事務來說,這樣的結果是正確的,但對於開發者來說,這個結果確實看似有些「不能理解」。

我們不妨來分析一下原因:

首先TransactionB#test本身是直接拋出RuntimeException的,那麼退棧到事務切面後,事務切面會發現需要回滾但因為TransactionB#test還不是事務的最外層邊界,所以在AbstractPlatformTransactionManager#processRollback方法僅僅會調用doSetRollbackOnly(status);,子類DataSourceTransactionManager會拿出DefaultTransactionStatus中的transaction對象打上回滾標記,具體來說就是transaction對象(對於DataSourceTransactionManager來說類型是DataSourceTransactionObject)會取出ConnectionHolder,調用setRollbackOnly。我們知道這樣就相當於標記是一個全局的標記了,因為只要是隸屬於同一個物理事務的Spring事務都能夠讀到同一個ConnectionHolder

    protected void doSetRollbackOnly(DefaultTransactionStatus status) {              DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)status.getTransaction();              if (status.isDebug()) {                  this.logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() + "] rollback-only");              }        //關鍵點              txObject.setRollbackOnly();          }

回到上層事務切面,在AbstractPlatformTransactionManager#commit方法讀到if(!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly())條件成立,接下來調用processRollback,由於在事務最外層邊界會物理回滾掉,並且也正是到了事務最外層邊界,Spring拋出UnexpectedRollbackException

如何解決?

那麼問題怎麼解決呢,這個問題有好幾種解決辦法,但是得根據具體情況決定。

  • 根據實際程式碼與業務情況處理,如果內嵌事務註解取消,Spring也不會拋出UnexpectedRollbackException。但是方法實際上並沒有完整執行,所以這樣的解決思路很容易導致出現不完整的臟數據。

  • 手動控制是否回滾。如果不能接受內嵌事務掛掉的話,可以在catch塊里加上TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();用於顯式控制回滾。這樣Spring就明白你自己要求回滾事務,而不是unexpected了。Spring也不會拋出UnexpectedRollbackException了。那麼如果在上層事務中捕獲到異常,真的就是不想回滾,即便上層事務發生了異常,也想要最終提交整個事務呢?如果有這樣的需求的話,可以給事務管理器配置一個參數setGlobalRollbackOnParticipationFailure(false);

  • 如果isGlobalRollbackOnParticipationFailure為false,則會讓主事務決定回滾,如果當遇到exception加入事務失敗時,調用者能繼續在事務內決定是回滾還是繼續。然而,要注意是那樣做僅僅適用於在數據訪問失敗的情況下且只要所有操作事務能提交,這個方法也能解決,但顯然影響到全局的事務屬性,所以極力不推薦使用。

    public final void commit(TransactionStatus status) throws TransactionException {         if (status.isCompleted()) {            throw new IllegalTransactionStateException(                  "Transaction is already completed - do not call commit or rollback more than once per transaction");         }           DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;         if (defStatus.isLocalRollbackOnly()) {            if (defStatus.isDebug()) {               logger.debug("Transactional code has requested rollback");            }            processRollback(defStatus, false);            return;         }           if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {            if (defStatus.isDebug()) {               logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");            }            processRollback(defStatus, true);            return;         }           processCommit(defStatus);      }