Flask解析(二):Flask-Sqlalchemy與多執行緒、多進程

  • 2019 年 11 月 3 日
  • 筆記

原創作者:flowell,轉載請標明出處:https://www.cnblogs.com/flowell/p/multiprocessing_flask_sqlalchemy.html


 

 

Sqlalchemy

  flask-sqlalchemy的session是執行緒安全的,但在多進程環境下,要確保派生子進程時,父進程不存在任何的資料庫連接,可以通過調用db.get_engine(app=app).dispose()來手動銷毀已經創建的engine,然後再派生子進程。


 

  最近線上的項目總是會報出資料庫連接相關的錯誤,比如“Command out of Sync”,“Mysql server has gone away”,“Lost databse connection”,“Package sequence out of order”等等,最終解決下來,發現以上錯誤可以分為兩種,一種是和連接丟失有關的,一種是和連接被多個執行緒(進程)同時使用了有關。

 

  我們項目基於flask,有多執行緒的場景,也有多進程的場景。orm用的是flask的拓展flask-sqlalchemy。flask-sqlalchemy的使用必須基於flask的app實例,也就是說要在app上下文中才能使用flask-sqlalchemy,所以在某些離線(非web)場景下,我們也用到了原生的Sqlalchemy。

 

  原生的Sqlalchemy的使用方式是

engine = create_engine(db_url)  Session = sessionmaker(bind=engine)  session = Session()
session.query(xxx)

  首先要創建一個engine,engine顧名思義就是和資料庫連接的引擎。在實際發起查詢前,是不會創建任何connection的。創建engine時可以通過指定poolclass參數來指定engine使用的連接池。默認是QueuePool,也可以設置為NullPool(不使用連接池)。為了方便理解,可以把engine視為管理連接池的對象。

 

  sqlalchemy中session和我們平時資料庫里說的session是兩個不同的概念,在平時資料庫中,session的生命周期從連接上資料庫開始,到斷開和資料庫的連接位置。但是sqlalchemy中的session更多的是一種管理連接的對象,它從連接池取出一個連接,使用連接,然後釋放連接,而自身也跟隨著銷毀。sqlalchemy中的Connection對象是管理真正資料庫連接的對象,真正的資料庫連接在sqlalchemy中是DBAPI。

 

  默認地,如果不傳入poolclass,則使用QueuePool(具有一定數量的連接池),如果不指定pool_recycle參數,則默認資料庫連接不會刷新。也就是說連接如果不適用,則一直不去刷新它。但是問題來了,在Mysql中,輸入“show variables like “%timeout%”; ” ,可以看到有一個waittimeout,還有interacttimeout,默認值為28800(8小時),這兩個值代表著,如果8個小時內某個資料庫連接都不和mysql聯繫,那麼就會斷掉這個連接。所以,8個小時過去了,Mysql把連接斷掉了,但是sqlalchemy客戶端這邊卻還保持著這個連接。當某個時候該連接從連接池被取出使用時,就會拋出“Mysql server has gone away”等連接丟失的資訊。

 

  解決這個問題的辦法很簡單,只要傳入pool_recycle參數即可。特別地,在flask-sqlalchemy中不會出現這種問題,因為falsk-sqlalchemy拓展自動地幫我們注入了pool_recycle參數,默認為7200秒。

 

def apply_driver_hacks(self, app, sa_url, options):          """This method is called before engine creation and used to inject          driver specific hacks into the options.  The `options` parameter is          a dictionary of keyword arguments that will then be used to call          the :func:`sqlalchemy.create_engine` function.          The default implementation provides some saner defaults for things          like pool sizes for MySQL and sqlite.  Also it injects the setting of          `SQLALCHEMY_NATIVE_UNICODE`.          """          if sa_url.drivername.startswith('mysql'):              sa_url.query.setdefault('charset', 'utf8')              if sa_url.drivername != 'mysql+gaerdbms':                  options.setdefault('pool_size', 10)                  options.setdefault('pool_recycle', 7200)  # 默認7200秒刷新連接          elif sa_url.drivername == 'sqlite':              pool_size = options.get('pool_size')              detected_in_memory = False              if sa_url.database in (None, '', ':memory:'):                  detected_in_memory = True                  from sqlalchemy.pool import StaticPool                  options['poolclass'] = StaticPool                  if 'connect_args' not in options:                      options['connect_args'] = {}                  options['connect_args']['check_same_thread'] = False                    # we go to memory and the pool size was explicitly set                  # to 0 which is fail.  Let the user know that                  if pool_size == 0:                      raise RuntimeError('SQLite in memory database with an '                                         'empty queue not possible due to data '                                         'loss.')              # if pool size is None or explicitly set to 0 we assume the              # user did not want a queue for this sqlite connection and              # hook in the null pool.              elif not pool_size:                  from sqlalchemy.pool import NullPool                  options['poolclass'] = NullPool                # if it's not an in memory database we make the path absolute.              if not detected_in_memory:                  sa_url.database = os.path.join(app.root_path, sa_url.database)            unu = app.config['SQLALCHEMY_NATIVE_UNICODE']          if unu is None:              unu = self.use_native_unicode          if not unu:              options['use_native_unicode'] = False            if app.config['SQLALCHEMY_NATIVE_UNICODE'] is not None:              warnings.warn(                  "The 'SQLALCHEMY_NATIVE_UNICODE' config option is deprecated and will be removed in"                  " v3.0.  Use 'SQLALCHEMY_ENGINE_OPTIONS' instead.",                  DeprecationWarning              )          if not self.use_native_unicode:              warnings.warn(                  "'use_native_unicode' is deprecated and will be removed in v3.0."                  "  Use the 'engine_options' parameter instead.",                  DeprecationWarning              )  

  

  sessionmaker是Session訂製方法,我們把engine傳入sessionmaker中,就可以得到一個session工廠,通過工廠來生產真正的session對象。但是這種生產出來的session是執行緒不安全的,sqlalchemy提供了scoped_session來幫助我們生產執行緒安全的session,原理類似於Local,就是代理session,通過執行緒的id來找到真正屬於本執行緒的session。

 

  flask-sqlalchemy就是使用了scoped_session來保證執行緒安全,具體的程式碼可以在Sqlalchemy中看到,構造session時,使用了scoped_session。

 

def create_scoped_session(self, options=None):          """Create a :class:`~sqlalchemy.orm.scoping.scoped_session`          on the factory from :meth:`create_session`.          An extra key ``'scopefunc'`` can be set on the ``options`` dict to          specify a custom scope function.  If it's not provided, Flask's app          context stack identity is used. This will ensure that sessions are          created and removed with the request/response cycle, and should be fine          in most cases.          :param options: dict of keyword arguments passed to session class  in              ``create_session``          """            if options is None:              options = {}            scopefunc = options.pop('scopefunc', _app_ctx_stack.__ident_func__)          options.setdefault('query_cls', self.Query)          return orm.scoped_session(              self.create_session(options), scopefunc=scopefunc          )        def create_session(self, options):          """Create the session factory used by :meth:`create_scoped_session`.          The factory **must** return an object that SQLAlchemy recognizes as a session,          or registering session events may raise an exception.          Valid factories include a :class:`~sqlalchemy.orm.session.Session`          class or a :class:`~sqlalchemy.orm.session.sessionmaker`.          The default implementation creates a ``sessionmaker`` for :class:`SignallingSession`.          :param options: dict of keyword arguments passed to session class          """            return orm.sessionmaker(class_=SignallingSession, db=self, **options)  

  

多進程和資料庫連接

  多進程環境下,要注意和資料庫連接相關的操作。


 

  說到多進程,python里最常用的就是multiprocessing。multiprocessing在windows下和linux的表現有所區別,在此只討論linux下的表現。linux下多進程通過fork()來派生,要理解我下面說的必須先弄懂fork()是什麼東西。粗略地說,每個進程都有自己的一個空間,稱為進程空間,每個進程的進程空間都是獨立的,進程與進程之間互不干擾。fork()的作用,就是將一個進程的進程空間,完完全全地copy一份,copy出來的就是子進程了,所以我們說子進程和父進程有著一模一樣的地址空間。地址空間就是進程運行的空間,這空間里會有進程已經打開的文件描述符,文件描述符會間接地指向進程已經打開的文件。也就是說,fork()之後,父進程,子進程會有相同的文件描述符,指向相同的一個文件。為什麼?因為文件是存在硬碟里的,fork()時copy的記憶體中的進程空間,並沒有把文件也copy一份。這就導致了,父進程,子進程,同時指向同一個文件,他們任意一個都可以對這個文件進行操作。這和本文說的資料庫有啥關係?順著這個思路想,資料庫連接是不是一個TCP連接?TCP連接是不是一個socket?socket在linux下是什麼,就是一個文件。所以說,如果父進程在fork()之前打開了資料庫連接,那麼子進程也會擁有這個打開的連接。

 

  兩個進程同時寫一個連接會導致數據混亂,所以會出現“Command out of sync”的錯誤,兩個進程同時讀一個連接,會導致一個進程讀到了,另一個沒讀到,就是“No result”。一個進程關閉了連接,另一個進程並不知道,它試圖去操作連接時,就會出現“Lost database connection”的錯誤。

 

  在此討論的場景是,父進程在派生子進程之前,父進程擁有已打開的資料庫連接。派生出子進程之後,子進程也就擁有了相應的連接。如果在fork()之前父進程沒有打開資料庫連接,那麼也不用擔心這個問題。比如Celery使用的prefork池,雖然是多進程模型,但是celery在派子進程前時不會打開資料庫連接的,所以不用擔心在celery任務中會出現資料庫連接混亂的問題。

 

   我做的項目里的多進程的場景之一就是使用tornado來跑web應用,在派生多個web應用實例時,確保此前創建的資料庫連接被銷毀。

 

app = Flask()  db = Sqlalchemy()  db.init_app(app)  ...  ...  db.get_engine(app=app).dispose()  # 先銷毀已有的engine,確保父進程沒有資料庫連接  ...  ...  fork()    # 派生子進程

# 例如
tornado.start()  # 啟動多個web實例進程