Sqlalchemy非同步操作不完全指北
非同步SQLAlchemy
SQLAlchemy作為一款通用
的Python Orm工具,在最近的版本也支援了非同步操作。但網上很多資料都不是很齊全,API也不是很好查詢的情況下,我便有了整理一份基礎文檔的想法。文章主要會以CRUD為入口,解決大家最基本的需求。
engine的區別
在普通的SQLAlchemy中,建立engine對象,我們會採用下面的方式:
from sqlalchemy import create_engine
engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_recycle=1500)
而非同步的方式如下:
from sqlalchemy.ext.asyncio import create_async_engine
async_engine = create_async_engine(ASYNC_SQLALCHEMY_URI, pool_recycle=1500)
session的區別
我們一般用sessionmaker來建立session,不過非同步的有點區別:
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker
# 同步session
Session = sessionmaker(engine)
# 非同步session 區別在於需要指定對應的class_
async_session = sessionmaker(async_engine, class_=AsyncSession)
建立會話
我們還是以程式碼的形式展示:
# 同步
with Session() as session:
# 裡面是具體的sql操作
pass
# 非同步
async with Session() as session:
# 裡面是非同步的操作,區別就是從with變成了async with 也就意味著方法必須是async修飾的
pass
以上是關於建立連接,處理會話的一些區別,接著我們講對應的CRUD操作。
查詢
這裡依舊會給出新老版本的對比:
# 注意Session為同步Session,為了區分,非同步session為async_session
# model則為具體的Model類
# 非同步查詢方式
from sqlalchemy import select
async def query():
async with async_session() as session:
sql = select(model).where(model.id == 1)
print(sql) # 這裡可以列印出sql
result = await session.execute(sql)
# 第一條數據
data = result.scalars().first()
# 所有數據
# data = result.scalars().all()
# 同步查詢方式一
def query():
with Session() as session:
# 查詢id=1的第一條數據 result對應的就是model的實例 如果沒有則是None
result = session.query(model).filter_by(id=1).first()
# 查詢所有數據 result對應的數據為List[model],即model數組
# result = session.query(model).filter_by(name="zhangsan").all()
# 同步查詢方式二
def query():
with Session() as session:
# 查詢id=1的第一條數據 result對應的就是model的實例 如果沒有則是None
result = session.query(model).filter(model.id == 1).first()
# 查詢所有數據 result對應的數據為List[model],即model數組
# result = session.query(model).filter(model.name == "zhangsan").all()
新增
這裡開始就只講非同步的操作了。
async def insert(data):
async with async_session() as session:
async with session.begin():
session.add(data)
# 刷新自帶的主鍵
await session.flush()
# 釋放這個data數據
session.expunge(data)
return data
先說一下session.begin,這個你可以理解為一個事務操作,當採用session的begin方法後,你可以發現我們不需要調用commit方法也能把修改存入資料庫。
expunge方法,是用例釋放這個實例,SQLAlchemy有個特點,當你的session會話結束以後,它會銷毀你插入的這種臨時數據,你再想訪問這個data就訪問不了了。所以我們可以釋放這個數據。(expunge的作用)
編輯
一般編輯有2種方式:
- 查詢出對應的數據,在數據上修改
- 根據key-value的形式,修改對應數據的欄位
from sqlalchemy import select, update
# 方式一
async def update_record(model):
async with async_session() as session:
async with session.begin():
result = await session.execute(select(model).where(id=1))
now = result.scalars().first()
if now is None:
raise Exception("記錄不存在")
now.name = "李四"
now.age = 23
# 這裡測試過,如果去掉flush會導致數據不更新
await session.flush()
session.expunge(now)
return now
# 方式二
async def update_by_map():
async with async_session() as session:
async with session.begin():
# 更新id為1的數據,並把name改為李四 age改為23
sql = update(model).where(model.id == 1).values(name="李四", age=23)
await session.execute(sql)
刪除
刪除的話,軟刪除大家都是update,所以不需要多說,物理刪除的話,也有兩種方式:
- 查到以後刪除之
- 直接根據條件刪除(這種我沒有仔細研究,我選的是第一種方式,容錯率高點)
async def delete_by_id():
async with async_session() as session:
async with session.begin():
result = await session.execute(select(model).where(model.id == 2))
original = result.scalars().first()
if original is None:
raise Exception("記錄不存在")
# 如果是多條
# session.delete(original)
# for item in result:
# session.delete(item)
今天的非同步內容就整理到這裡,我個人覺得還是很實用的,希望對大家有幫助~~~