Sqlalchemy非同步操作不完全指北

非同步SQLAlchemy

SQLAlchemy作為一款通用的Python Orm工具,在最近的版本也支援了非同步操作。但網上很多資料都不是很齊全,API也不是很好查詢的情況下,我便有了整理一份基礎文檔的想法。文章主要會以CRUD為入口,解決大家最基本的需求。

engine的區別

在普通的SQLAlchemy中,建立engine對象,我們會採用下面的方式:

from sqlalchemy import create_engine
engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_recycle=1500)

而非同步的方式如下:

from sqlalchemy.ext.asyncio import create_async_engine
async_engine = create_async_engine(ASYNC_SQLALCHEMY_URI, pool_recycle=1500)

session的區別

我們一般用sessionmaker來建立session,不過非同步的有點區別:

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker

# 同步session
Session = sessionmaker(engine)

# 非同步session 區別在於需要指定對應的class_
async_session = sessionmaker(async_engine, class_=AsyncSession)

建立會話

我們還是以程式碼的形式展示:

# 同步
with Session() as session:
  # 裡面是具體的sql操作
  pass
  
# 非同步
async with Session() as session:
    # 裡面是非同步的操作,區別就是從with變成了async with 也就意味著方法必須是async修飾的
    pass


以上是關於建立連接,處理會話的一些區別,接著我們講對應的CRUD操作。

查詢

這裡依舊會給出新老版本的對比:

# 注意Session為同步Session,為了區分,非同步session為async_session
# model則為具體的Model類

# 非同步查詢方式
from sqlalchemy import select


async def query():
    async with async_session() as session:
        sql = select(model).where(model.id == 1)
        print(sql) # 這裡可以列印出sql
        result = await session.execute(sql)
        # 第一條數據
        data = result.scalars().first()
        # 所有數據
        # data = result.scalars().all()


# 同步查詢方式一
def query():
    with Session() as session:
        # 查詢id=1的第一條數據 result對應的就是model的實例 如果沒有則是None
        result = session.query(model).filter_by(id=1).first()
        # 查詢所有數據 result對應的數據為List[model],即model數組
        # result = session.query(model).filter_by(name="zhangsan").all()

# 同步查詢方式二
def query():
    with Session() as session:
        # 查詢id=1的第一條數據 result對應的就是model的實例 如果沒有則是None
        result = session.query(model).filter(model.id == 1).first()
        # 查詢所有數據 result對應的數據為List[model],即model數組
        # result = session.query(model).filter(model.name == "zhangsan").all()

新增

這裡開始就只講非同步的操作了。

async def insert(data):
    async with async_session() as session:
        async with session.begin():
            session.add(data)
            # 刷新自帶的主鍵
            await session.flush()
            # 釋放這個data數據
            session.expunge(data)
            return data

先說一下session.begin,這個你可以理解為一個事務操作,當採用session的begin方法後,你可以發現我們不需要調用commit方法也能把修改存入資料庫。

expunge方法,是用例釋放這個實例,SQLAlchemy有個特點,當你的session會話結束以後,它會銷毀你插入的這種臨時數據,你再想訪問這個data就訪問不了了。所以我們可以釋放這個數據。(expunge的作用)

編輯

一般編輯有2種方式:

  • 查詢出對應的數據,在數據上修改
  • 根據key-value的形式,修改對應數據的欄位
from sqlalchemy import select, update

# 方式一
async def update_record(model):
    async with async_session() as session:
        async with session.begin():
            result = await session.execute(select(model).where(id=1))
            now = result.scalars().first()
            if now is None:
                raise Exception("記錄不存在")
            now.name = "李四"
            now.age = 23
            # 這裡測試過,如果去掉flush會導致數據不更新
            await session.flush()
            session.expunge(now)
            return now

# 方式二
async def update_by_map():
    async with async_session() as session:
        async with session.begin():
            # 更新id為1的數據,並把name改為李四 age改為23
            sql = update(model).where(model.id == 1).values(name="李四", age=23)
            await session.execute(sql)

刪除

刪除的話,軟刪除大家都是update,所以不需要多說,物理刪除的話,也有兩種方式:

  • 查到以後刪除之
  • 直接根據條件刪除(這種我沒有仔細研究,我選的是第一種方式,容錯率高點)
async def delete_by_id():
    async with async_session() as session:
        async with session.begin():
            result = await session.execute(select(model).where(model.id == 2))
            original = result.scalars().first()
            if original is None:
                raise Exception("記錄不存在")
            # 如果是多條
            # session.delete(original)
            # for item in result:
            #     session.delete(item)

今天的非同步內容就整理到這裡,我個人覺得還是很實用的,希望對大家有幫助~~~