SQLAlchemy完全入門

最近想要學習SQLAlchemy, 發現網上的中文文檔大多是機翻的, 讀起來特別變扭, 因此對照著最新的英文文檔梳理了一遍, 寫下來記錄一下
目前SQLAlchemy的版本為1.4.x, 風格處於1.x過渡到2.0的時代. 為了盡量讓這篇文章的兼容之後的版本, 本文將講述1.x和2.0兩種風格的介面(主要是查詢的介面)

其實在2.0風格中, 主要受到影響的是ORM的查詢方式, 詳情見文檔: 2.0 Migration – ORM Usage

安裝

pip install sqlalchemy

檢測sqlalchemy版本:

>>>import sqlalchemy
>>>sqlalchemy.__version__
'1.4.27'

使用步驟

一般來說SQLAlchemy的使用方式有兩種: CoreORM
兩種有什麼不同呢?

  1. ORM是構建在Core之上的
  2. Core更加底層, 可以執行直接執行SQL語句
  3. ORM類似於Django的ORM, 由於sqlalchemy提供了一套介面, 所以不需要我們直接寫SQL語句 (1.x版本)
  4. 至於要用哪個, 等到你用到時, 你會知道的

組件依賴關係圖:
組件依賴關係

Core

一般來說, 使用步驟如下:

  1. 配置資料庫連接
  2. 建立連接
  3. 創建表
  4. 執行SQL語句, 按需開啟事件是否自動提交
  5. 拿到返回數據, 執行其他程式碼

資料庫的連接的格式

我們在創建引擎(連接)時, 需要指定資料庫的URL, URL格式, 見: Engine Configuration
, 總的來說, 格式就是: dialect[+driver]://user:password@host/dbname[?key=value..]

  • dialect 資料庫名稱(方言): 如mysql
  • driver 連接資料庫的庫: 如: pymysql
  • user 用戶名
  • password 密碼
  • host 地址
  • dbname 資料庫名稱
  • key=value 指的是給資料庫的參數

如下面的URL:

mysql+pymysql://root:[email protected]:3306/test_db?charset=utf8

建立連接

調用sqlalchemy.create_engine方法, 為了兼容2.0風格的介面, 可以加上future參數. 至於什麼是2.0風格的介面, 可以看看官方文檔: 2.0 style
create_engine有幾個參數需要我們注意:

  • url 即資料庫url, 其格式見上文: 資料庫的連接的格式
  • echo參數為True時, 將會將engine的SQL記錄到日誌中 ( 默認輸出到標準輸出)
  • echo_poolTrue時,會將連接池的記錄資訊輸出
  • future 使用2.0樣式EngineConnection API

更多參數見官方文檔: sqlalchemy.create_engine

例子

from sqlalchemy import create_engine

# 兼容2.0的寫法
# 返回對象不一樣
engine1 = create_engine("sqlite+pysqlite:///:memory:", echo=True, future=True)
print(type(engine1))
# <class 'sqlalchemy.future.engine.Engine'>

engine2 = create_engine("sqlite+pysqlite:///:memory:", echo=True)
print(type(engine2))
# <class 'sqlalchemy.engine.base.Engine'>

注意, 由於sqlalchemy使用lazy initialization的策略連接資料庫, 故此時還未真正地連接上資料庫

創建表

我們想要讓資料庫創建一個表, 需要利用MetaData對象, 關於一些常用的MetaData方法, 見: MetaData
除了要MetaData對象外, 我們還需要Table對象, 用於定義一個表的結構
Table的一般使用

mytable = Table("mytable", metadata,
    Column('mytable_id', Integer, primary_key=True),
    Column('value', String(50))
    )

Table的參數:

  • name 表名稱
  • metadata 該表所屬的MetaData對象
  • 其他參數: 通過Column指定一列數據, 格式見: Column定義

例子:

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, text

#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
metadata_obj = MetaData()

user_table = Table(
    "user_account",
    metadata_obj,
    Column('id', Integer, primary_key=True),
    Column("username", String(30)))  # String也可以不實例化

# 第二個表
address_table = Table(
    "address",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    # 定義外鍵
    Column("uid", ForeignKey("user_account.id"), nullable=False),
    Column('email_address', String(32), nullable=False)
)
# 相當於執行 CREATE TABLE 語句
metadata_obj.create_all(engine)

"""
-- 相當於:
CREATE TABLE user_account (
    id INTEGER NOT NULL AUTO_INCREMENT, 
    username VARCHAR(30), 
    PRIMARY KEY (id)
);
CREATE TABLE address (
    id INTEGER NOT NULL AUTO_INCREMENT, 
    uid INTEGER NOT NULL, 
    email_address VARCHAR(32) NOT NULL, 
    PRIMARY KEY (id), 
    FOREIGN KEY(uid) REFERENCES user_account (id)
)
"""

create_all方法, 默認會在創建表之間檢測一下表是否存在, 不存在時才創建.

Table的一些屬性

# ---------- 訪問所有列
# .c  => Column
print(user_table.c.keys())
# ['id', 'username']

# ---------- 訪問某一列
print(repr(user_table.c.username))
# Column('username', String(length=30), table=<user>)

# ---------- 返回主鍵
print(user_table.primary_key)
# 隱式生成
# PrimaryKeyConstraint(Column('id', Integer(), table=<user>, primary_key=True, nullable=False))

在事務中執行SQL

通常, 我們通過調用engine.connectengine.begin方法開始一個事件
sqlalchemy使用事務有兩種風格commit as you goBegin once, 前者需要我們手動提交, 後者會自動提交

手動提交

engine.connect方法符合python的上下文管理協議, 會返回一個Connection對象, 該方法會在不手動提交的情況下回滾.舉個例子:

from sqlalchemy import create_engine
from sqlalchemy import text

#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

with engine.connect() as conn:
    # 執行
    result = conn.execute(text("select 'hello world'")) # text 可以使用SQL語句
    print(result.all())
    # conn.commit()
    # [('hello world',)]
    
    # 最後會ROLLBACK

上面的程式碼中, 相當於開啟了事務, 由於最後沒有調用commit方法, 所以會回滾.

自動提交

engine.begin方法也符合python的上下文管理協議, 只要執行時不報錯就會自動提交, 報錯時會回滾.

from sqlalchemy import create_engine
from sqlalchemy import text

#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

with engine.begin() as conn:
    result = conn.execute(text("select 'hello world'"))
    print(result.all())
    # [('hello world',)]

    # COMMIT

綁定參數

上面在事務中執行SQL語句時, 我們用到了sqlalchemy.text, 可以直接定義文本SQL字元串
為了避免被SQL注入, 故在需要傳入參數的場景中需要根據sqlalchemy的方式傳入, 而不是直接拼接成字元串.
使用:y的格式定義參數, 且將值以字典的形式傳給execute

from sqlalchemy import create_engine
from sqlalchemy import text

#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

with engine.begin() as conn:
    result = conn.execute(text("select name from userinfo where name like :y"), {"y": "lcz%"})
    print(result.all())
    # [('lczmx',)]

    # COMMIT

多個參數時, 可以這樣

with engine.connect() as conn:
    conn.execute(
        text("INSERT INTO userinfo (id, name) VALUES (:x, :y)"),
        [{"x": 1, "y": "lcmx"}, {"x": 2, "y": "xxx"}])
    conn.commit()

這種方式也可以

stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)

with engine.connect() as conn:
    conn.execute(stmt)
    conn.commit()

增刪改查

處理使用text直接執行SQL外, 你還可以使用其他語法增刪改查數據
假如表結構如下:

$show create table address;
+---------+-----------------------------------------+
| Table   | Create Table                            |
+---------+-----------------------------------------+
| address | CREATE TABLE `address` (
  `id` int NOT NULL AUTO_INCREMENT,
  `uid` int NOT NULL,
  `email_address` varchar(32) NOT NULL,
  PRIMARY KEY (`id`),
  KEY `uid` (`uid`),
  CONSTRAINT `address_ibfk_1` FOREIGN KEY (`uid`) REFERENCES `user_account` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=gbk |
+---------+------------------------------------------+


$show create table user_account;
+--------------+------------------------------------+
| Table        | Create Table                       |
+--------------+------------------------------------+
| user_account | CREATE TABLE `user_account` (
  `id` int NOT NULL AUTO_INCREMENT,
  `username` varchar(30) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=gbk |
+--------------+-------------------------------------+
1 row in set (0.00 sec)


插入數據

使用insert(...).values(...)形式為資料庫插入數據

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, insert

#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".format(**DATABASE_CONFIG),
                       echo=True, future=True)
metadata_obj = MetaData()

user_table = Table(
    "user_account",
    metadata_obj,
    Column('id', Integer, primary_key=True),
    Column("username", String(30)))  # String也可以不實例化

# 第二個表
address_table = Table(
    "address",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("uid", ForeignKey("user_account.id"), nullable=False),
    Column('email_address', String(32), nullable=False)
)

metadata_obj.create_all(bind=engine)

with engine.connect() as conn:
    # 插入一條普通數據
    conn.execute(insert(user_table).values(id=1, username="lczmx"))
    # 插入外鍵等數據
    conn.execute(insert(address_table).values(uid=1, email_address="[email protected]"))

    # 自動生成value, 不需要我們手動指定

    conn.execute(insert(user_table),
                 [{"username": "張三"},
                  {"username": "李四"},
                  {"username": "王五"},
                  {"username": "趙六"},
                  ])

    conn.commit()

SQLAlchemy還提供了更複雜的用法, 見: Inserting Rows with Core

注意: 插入數據沒有返回值

刪除數據

使用delete(...).where(...)的形式刪除數據

目前的表數據:

select u.id as uid, u.username, a.id as aid, a.email_address as email_address 
from user_account as u 
left join  address as a on u.id=a.uid;
+-----+----------+------+-------------------+
| uid | username | aid  | email_address     |
+-----+----------+------+-------------------+
|   1 | lczmx    |    1 | [email protected] |
|   2 | 張三     | NULL | NULL              |
|   3 | 李四     | NULL | NULL              |
|   4 | 王五     | NULL | NULL              |
|   5 | 趙六     | NULL | NULL              |
+-----+----------+------+-------------------+

例子:

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, delete

#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".format(**DATABASE_CONFIG),
                       echo=True, future=True)
metadata_obj = MetaData()

user_table = Table(
    "user_account",
    metadata_obj,
    Column('id', Integer, primary_key=True),
    Column("username", String(30)))  # String也可以不實例化

# 第二個表
address_table = Table(
    "address",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("uid", ForeignKey("user_account.id"), nullable=False),
    Column('email_address', String(32), nullable=False)
)

metadata_obj.create_all(bind=engine)

with engine.connect() as conn:
    # 一般刪除
    # user_table.c 獲取的是 列數據
    result1 = conn.execute(delete(user_table).where(user_table.c.id == 3))
    print(f"受影響行數: {result1.rowcount}")  # 受影響行數: 1

    # and 刪除
    result2 = conn.execute(delete(user_table).where(user_table.c.username == "張三", user_table.c.id == 2))
    print(f"受影響行數: {result2.rowcount}")  # 受影響行數: 1

    conn.commit()

.rowcount屬性獲取受影響的行數

更多見: The delete() SQL Expression Construct

更新數據

使用update(...).where(...).values(...)的形式更新數據

select u.id as uid, u.username, a.id as aid, a.email_address as email_address 
from user_account as u 
left join  address as a on u.id=a.uid;

+-----+----------+------+-------------------+
| uid | username | aid  | email_address     |
+-----+----------+------+-------------------+
|   1 | lczmx    |    1 | [email protected] |
|   2 | 張三     | NULL | NULL              |
|   3 | 李四     | NULL | NULL              |
|   4 | 王五     | NULL | NULL              |
|   5 | 趙六     | NULL | NULL              |
+-----+----------+------+-------------------+

例子:

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, update, bindparam, select

#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".
                       format(**DATABASE_CONFIG), echo=True, future=True)
metadata_obj = MetaData()

user_table = Table(
    "user_account",
    metadata_obj,
    Column('id', Integer, primary_key=True),
    Column("username", String(30)))  # String也可以不實例化

# 第二個表
address_table = Table(
    "address",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("uid", ForeignKey("user_account.id"), nullable=False),
    Column('email_address', String(32), nullable=False)
)

metadata_obj.create_all(bind=engine)

with engine.connect() as conn:
    # 一般更新
    result1 = conn.execute(update(user_table).where(
        user_table.c.username == "王五").values(username="王老五"))
    print(f"受影響行數: {result1.rowcount}")  # 受影響行數: 1

    # 更新數據 加上 原來的數據
    result2 = conn.execute(
        update(user_table).where(user_table.c.username == "趙六").values(
            username=user_table.c.username + "一號"))
    print(f"受影響行數: {result2.rowcount}")  # 受影響行數: 1

    # 以字典的形式, 替換更新多個值
    result3 = conn.execute(
        update(user_table).where(user_table.c.username == bindparam('old_name')).values(
            username=bindparam('new_name')),
        [
            {"old_name": "張三", "new_name": "新張三"},
            {"old_name": "李四", "new_name": "新李四"},
        ]
    )

    print(f"受影響行數: {result3.rowcount}")  # 受影響行數: 2

    # 以 子查詢 的方式 更新數據
    scalar_subq = (
        select(address_table.c.email_address).
            where(address_table.c.uid == user_table.c.id).
            order_by(address_table.c.id).
            limit(1).
            scalar_subquery()
    )
    # 將email_address的值 賦給 username
    update(user_table).values(username=scalar_subq)

    """
    -- 以上查詢, 相當於:
    UPDATE user_account SET username=(SELECT address.email_address
    FROM address
    WHERE address.uid = user_account.id ORDER BY address.id
    LIMIT :param_1)
    """
    conn.commit()

修改後的結果:

+-----+----------+------+-------------------+
| uid | username | aid  | email_address     |
+-----+----------+------+-------------------+
|   1 | lczmx    |    1 | [email protected] |
|   2 | 新張三   | NULL | NULL              |
|   3 | 新李四   | NULL | NULL              |
|   4 | 王老五   | NULL | NULL              |
|   5 | 趙六一號 | NULL | NULL              |
+-----+----------+------+-------------------+

更多見: Updating and Deleting Rows with Core

查詢數據

由於2.0的查詢方式, Core和ORM都可以使用, 所以放在一起, 見下文: 查詢數據詳解

處理查詢返回的數據

我們執行conn.execute方法的結果為: CursorResult對象
其本質上是繼承與Result對象, 其使用方式見: Result

例子:
假如查詢的表:

mysql> select * from user_account;
+----+----------+
| id | username |
+----+----------+
|  9 | lczmx    |
| 10 | jack     |
| 11 | tom      |
| 12 | mike     |
+----+----------+
4 rows in set (0.00 sec)

mysql>

利用SQLAlchemy獲取數據:

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, text

#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

with engine.connect() as conn:
    # 執行
    result = conn.execute(text("select * from user_account;"))

    for row in result.all():
        # 使用f-strings 格式化字元串
        print(f"id: {row.id:3}, username: {row.username:20}")
    # 列印的結果:
    """
    id:   9, username: lczmx               
    id:  10, username: jack                
    id:  11, username: tom                 
    id:  12, username: mike  
    """
    conn.commit()

ORM

和Core一樣, ORM也有一定的使用步驟:

  1. 配置資料庫連接, 見上文: 資料庫的連接的格式
  2. 創建會話
  3. 創建表
  4. 使用介面, 增刪改查數據
  5. 拿到返回數據, 執行其他程式碼

在學習SQLAlcehmy的ORM之前, 建議先了解一些概念, 以免後面會混淆

  1. 會話 Session
    會話是SQLAlchemy ORM與資料庫的交互對象
    它可以管理建立連接engine, 並為通過會話載入或與會話關聯的對象提供標識映射 (identity map)
    在使用時與Connection非常相似, 你可以對比著使用

  2. Base
    通過sqlalchemy.orm.declarative_base創建
    作為定義表的基類, 內部有包含MetaData對象
    可以類似於Django一樣定義表

SQLAlchemy中, session是一個連接池, 的由其管理, 因此, 假如我們需要操作資料庫的話, 需要在session中拿到Connection(連接)

創建會話

SQLAlchemy提供了兩種創建會話的方法:

  1. sqlalchemy.orm.Session
    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session
    
    # 創建引擎
    engine = create_engine('postgresql://scott:tiger@localhost/')
    
    # 創建會話
    # 以下with可以簡寫成 with Session(engine) as session, session.begin():
    with Session(engine) as session:
        # 開啟自動提交
        with session.begin():
            # add方法 會將some_object 保存到資料庫
            # session.add(some_object)
            # session.add(some_other_object)
            pass
    
    
  2. sqlalchemy.orm.sessionmaker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    
    # 創建引擎
    engine = create_engine('postgresql://scott:tiger@localhost/')
    
    # 創建session
    Session = sessionmaker(engine)
    
    # 一般使用
    with Session() as session:
        # session.add(some_object)
        # session.add(some_other_object)
        # 提交
        session.commit()
    
    # 自動提交
    with Session.begin() as session:
        # session.add(some_object)
        # session.add(some_other_object)
        pass
    
    

雖然有兩種方法創建會話, 但我們一般使用sessionmaker創建會話

另外補充一下session的其它使用方式:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine('postgresql://scott:tiger@localhost/')

Session = sessionmaker(engine)

# 從連接指定到session
with engine.connect() as connection:
    with Session(bind=connection) as session:
        # 一些操作
        pass

下面列出session的一些常用方法, 增刪改查數據時要用到

方法 參數 描述
add instance 下次刷新操作時, 將 instance 保留到資料庫中
delete instance 下次刷新操作時, 將instance從資料庫中刪除
begin subtransactions nested _subtrans 開始事務
rollback 回滾當前事務
commit 提交當前事務
close 關閉此Session
execute statement params execution_option bind_arguments 執行SQL表達式構造
query *entities **kwargs 返回Query對象, 可用於查詢數據
refresh instance attribute_names with_for_update instance執行刷新操作

例子:

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

stmt = text("SELECT id, name FROM userinfo WHERE id > :y").bindparams(y=1)
with Session(engine) as session:
    result = session.execute(stmt)
    print(result.all())
    # [(2, 'name2'), (3, 'name2')]
    
    # ROLLBACK
    

在ORM中創建表

使用ORM時, 我們也需要MetaData, 不同的是, 我們是通過sqlalchemy.orm.registry構造的. 而且, 我們不需要像Core那樣直接聲明Table, 而是繼承某個公共基類 (Base), 添加屬性即可. 有兩種方式定義基類.
方式一:

from sqlalchemy.orm import registry
mapper_registry = registry()
print(mapper_registry.metadata)  # MetaData對象
# 公共基類
Base = mapper_registry.generate_base()

方法二:

from sqlalchemy.orm import declarative_base

# 內部 return registry(...).generate_base(...)
Base = declarative_base()

現在你可以像在Django ORM中一樣, 定義表並在資料庫中創建表, 每一個Column表示一列數據, 關於Column的寫法, 見: Column定義

from sqlalchemy import Column, String, Integer, create_engine, SMALLINT, Boolean, ForeignKey
from sqlalchemy.orm import relationship, declarative_base, sessionmaker

# 導入公共基類
Base = declarative_base()
#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)


class Student(Base):
    __tablename__ = "student"
    sid = Column("sid", Integer, primary_key=True)
    name = Column("name", String(32), nullable=False, index=True, comment="姓名")
    age = Column("age", SMALLINT, nullable=False, comment="年齡")
    gender = Column("gender", Boolean, nullable=False, comment="性別, True: 男, False: 女")


class Course(Base):
    __tablename__ = "course"
    cid = Column("cid", Integer, primary_key=True)
    name = Column("name", String(10), nullable=False, comment="科目名")
    tid = Column("tid", ForeignKey("teacher.tid"), comment="課程教師")


class Teacher(Base):
    __tablename__ = "teacher"
    tid = Column("tid", Integer, primary_key=True)
    name = Column("name", String(10), nullable=False, comment="教師名")


class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成績")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成績所屬學生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成績所屬科目")


Base.metadata.create_all(bind=engine)

"""
-- 對於sql
CREATE TABLE student (
    sid INTEGER NOT NULL AUTO_INCREMENT, 
    name VARCHAR(32) NOT NULL COMMENT '姓名', 
    age SMALLINT NOT NULL COMMENT '年齡', 
    gender BOOL NOT NULL COMMENT '性別, True: 男, False: 女', 
    PRIMARY KEY (sid)
)
CREATE TABLE teacher (
    tid INTEGER NOT NULL AUTO_INCREMENT, 
    name VARCHAR(10) NOT NULL COMMENT '教師名', 
    PRIMARY KEY (tid)
)
CREATE TABLE course (
    cid INTEGER NOT NULL AUTO_INCREMENT, 
    name VARCHAR(10) NOT NULL COMMENT '科目名', 
    tid INTEGER COMMENT '課程教師', 
    PRIMARY KEY (cid), 
    FOREIGN KEY(tid) REFERENCES teacher (tid)
)
CREATE TABLE score (
    sid INTEGER NOT NULL AUTO_INCREMENT, 
    score SMALLINT NOT NULL COMMENT '成績', 
    student_id INTEGER COMMENT '成績所屬學生', 
    course_id INTEGER COMMENT '成績所屬科目', 
    PRIMARY KEY (sid), 
    FOREIGN KEY(student_id) REFERENCES student (sid), 
    FOREIGN KEY(course_id) REFERENCES course (cid)
)

"""

Base.metadataMetaData對象, 常用的MetaData方法見: MetaData

注: 你通過Student.__table__屬性可以查看Table, 也可以通過Student.name訪問某一列
你也可以通過__init__顯示定義某些列

增刪改查數據

插入數據

接上文 “在ORM中創建表” 中的表

1.x的介面與2.0的介面一樣, 都是調用session.add(instance)方法添加到資料庫 (add方法下次刷新操作時, 將instance保存到資料庫)
注意: 自動生成的數據, 在未插入到資料庫之前, 都為None, 如: 自動生成的主鍵

你也可以調用add_all(instance1, instance2, ...)方法, 區別只是插入一條和多條數據而已

from sqlalchemy import Column, String, Integer, create_engine, SMALLINT, Boolean, ForeignKey
from sqlalchemy.orm import relationship, declarative_base, sessionmaker
from sqlalchemy.orm import Session
from typing import Any

# 導入公共基類
Base = declarative_base()
#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)


class Student(Base):
    __tablename__ = "student"
    sid = Column("sid", Integer, primary_key=True)
    name = Column("name", String(32), nullable=False, index=True, comment="姓名")
    age = Column("age", SMALLINT, nullable=False, comment="年齡")
    gender = Column("gender", Boolean, nullable=False, comment="性別, True: 男, False: 女")


class Course(Base):
    __tablename__ = "course"
    cid = Column("cid", Integer, primary_key=True)
    name = Column("name", String(10), nullable=False, comment="科目名")
    tid = Column("tid", ForeignKey("teacher.tid"), comment="課程教師")


class Teacher(Base):
    __tablename__ = "teacher"
    tid = Column("tid", Integer, primary_key=True)
    name = Column("name", String(10), nullable=False, comment="教師名")


class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成績")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成績所屬學生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成績所屬科目")


Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)


# 一般將 添加到資料庫 封裝成一個函數
def create_data(db: Session, target_cls: Any, **kwargs):
    try:
        cls_obj = target_cls(**kwargs)
        # 添加一個
        db.add(cls_obj)
        # 添加多個:
        # db.add_all([obj1, obj2, ...])

        db.commit()
        # 手動將 數據 刷新到資料庫
        db.refresh(cls_obj)
        return cls_obj
    except Exception as e:
        # 別忘記發生錯誤時回滾
        db.rollback()
        raise e


session = SessionLocal()

# -------------- 創建學生數據
student = create_data(session, Student, sid=1, name="張三", age=22, gender=True)

# -------------- 創建教師數據
teacher = create_data(session, Teacher, tid=1, name="語文老師")

# -------------- 創建課程數據
course = create_data(session, Course, cid=1, name="語文", tid=teacher.tid)

# -------------- 創建成績數據
score = create_data(session, Score, sid=1, score=89, student_id=student.sid, course_id=course.cid)

注意: 自動生成主鍵時, 只有在刷新到資料庫中後, 才能獲取主鍵

總的來說, 插入數據程式碼一般為:

# 1. 實例化一個表類
db_city = CityTable(....)

# 2. 調用session的add方法
session.add(db_city)

# 3. 調用session的commit方法 提交事務
session.commit()

# 4. 手動調用session的refresh方法 將數據刷新到資料庫
session.refresh(db_city)

刪除數據

1.x的方法

主要步驟是先查詢再刪除, 一般形式為: session.query(...).filter(...).delete()

from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker

# 導入公共基類
Base = declarative_base()
#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)


class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成績")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成績所屬學生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成績所屬科目")


Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # 方法一: 調用 session.delete 方法
    s = session.query(Score).filter(Score.score == 59).first()
    session.delete(s)

    # 方法二: 查詢後直接刪除
    session.query(Score).filter(Score.score == 59).delete()
    session.commit()

2.0的方法

像Core一樣刪除數據, 即delte(...).where(...)

from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import select, delete

# 導入公共基類
Base = declarative_base()
#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)


class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成績")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成績所屬學生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成績所屬科目")


Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    session.execute(
        delete(Score).where(Score.sid == 1)
    )
    session.commit()

修改數據

1.x的方法

主要步驟是先查詢再更新, 即: session.query(...).filter(...).update(...)

from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker

# 導入公共基類
Base = declarative_base()
#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)


class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成績")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成績所屬學生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成績所屬科目")


Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    row = session.query(Score).filter(Score.score == 59).update({"score": 60})

    print(f"修改的行數: {row}")
    session.commit()

2.0的方法

同樣和Core一樣, 使用update(...).where(...).values(...)的形式更新數據

from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import update, bindparam

# 導入公共基類
Base = declarative_base()
#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)


class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成績")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成績所屬學生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成績所屬科目")


Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # 一般更新
    result1 = session.execute(update(Score).where(Score.score == 59).values(score=60))
    print(f"受影響行數: {result1.rowcount}")

    # # 更新數據 加上 原來的數據
    result2 = session.execute(
        update(Score).where(Score.score == 59).values(score=Score.score + 1))
    print(f"受影響行數: {result2.rowcount}")  # 受影響行數: 1

    # 以字典的形式, 替換更新多個值
    result3 = session.execute(
        update(Score).where(Score.score == bindparam('old_score')).values(score=bindparam('new_score')),
        [
            {"old_score": 59, "new_score": 60},
        ]
    )

    print(f"受影響行數: {result3.rowcount}")

    session.commit()

同樣.rowcount屬性獲取受影響行數

查詢數據

1.x的方法

在SQLAlchemy1.x查詢方式中, 使用Query對象進行查詢, 類似於Django ORM的管理器, 可以較為簡單地查詢數據
假如要查詢的表如下:

class User(Base):
    __tablename__ = "User"  # 設置表名
    uid = Column(Integer, primary_key=True)
    username = Column(String(80), unique=True)
    email = Column(String(120), unique=True)
    tags = Column(String(120))

    def __repr__(self):
        return '<User %r>' % self.username

表的數據:

mysql> select * from User;
+-----+----------+-------------------+------+
| uid | username | email             | tags |
+-----+----------+-------------------+------+
|   1 | 張三     | [email protected]  | 熱情 |
|   2 | 李四     | [email protected]       | 熱情 |
|   3 | 王五     | [email protected]     | 開朗 |
|   4 | lczmx    | [email protected] | 熱情 |
+-----+----------+-------------------+------+
4 rows in set (0.10 sec)

mysql> 

使用例子:

from sqlalchemy import Column, Integer, create_engine, String
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import not_, or_, desc

# 導入公共基類
Base = declarative_base()
#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)


class User(Base):
    __tablename__ = "User"  # 設置表名
    uid = Column(Integer, primary_key=True)
    username = Column(String(80), unique=True)
    email = Column(String(120), unique=True)
    tags = Column(String(120))

    def __repr__(self):
        return '<User %r>' % self.username


Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # ------------------  查詢所有User數據
    session.query(User).all()
    """
    對應SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`, 
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    """

    # ------------------  查詢有多少條數據
    session.query(User).count()
    """
    對應SQL
    SELECT count(*) AS count_1 FROM (SELECT `User`.uid AS `User_uid`,
    `User`.username AS `User_username`, `User`.email AS `User_email`,
    `User`.tags AS `User_tags` FROM `User`) AS anon_1
    """

    # ------------------  查詢第1條數據
    session.query(User).first()
    """
    對應SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    LIMIT 1
    """

    # ------------------  根據主鍵查詢
    session.query(User).get(1)
    """
    對應SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE `User`.uid = 1
    """

    # ------------------  簡單查詢, 使用 關鍵字實參 的形式來設置欄位名
    session.query(User).filter_by(uid=1).all()
    """
    對應SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`, 
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User` 
    WHERE `User`.uid = 1
    """
    
    # ------------------  複雜查詢, 可以多個表一起,使用 恆等式'==' 等形式 來設置條件
    session.query(User).filter(User.uid == 1).all()
    """
    對應SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE `User`.uid = 1
    """
    
    # ------------------  filter 查詢開頭
    session.query(User).filter(User.username.startswith("l")).all()
    """
    對應SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE (`User`.username LIKE concat('l', '%%'))
    """
    
    # ------------------  filter 查詢結尾
    session.query(User).filter(User.username.endswith("x")).all()
    """
    對應SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE (`User`.username LIKE concat('%%', 'x'))
    """
    
    # ------------------  filter 查詢是否包含
    session.query(User).filter(User.username.contains("lcz")).all()
    """
    對應SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE (`User`.username LIKE concat(concat('%%', "lcz", '%%')))
    """
    
    # ------------------  filter 模糊查詢
    session.query(User).filter(User.username.like("%cz%")).all()
    """
    對應SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE `User`.username LIKE "%cz%"
    """
    
    # ------------------  filter 條件取反 (not)
    session.query(User).filter(not_(User.username == "lczmx")).all()
    """
    對應SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE `User`.username != "lczmx"
    """
    
    # ------------------  filter條件 或 (or), 默認為and
    session.query(User).filter(
        or_(User.uid == 1, User.uid == 3), ).all()
    """
    對應SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE `User`.uid = 1 OR `User`.uid = 3
    """
    
    # ------------------  filter條件 and or not 一起使用
    session.query(User).filter(or_(User.uid == 1, User.uid == 4), User.username == "lczmx",
                               not_(User.email == "[email protected]")).all()
    """
    對應SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE (`User`.uid = 1 OR `User`.uid = 4)
    AND `User`.username = "lczmx" AND `User`.email = "[email protected]"
    """
    
    # ------------------   filter 取反查詢
    session.query(User).filter(User.username != "lczmx").all()
    """
    對應SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE `User`.username != "lczmx";
    """

    # ------------------  查詢uid為[1, 3, 5, 7, 9]的用戶
    session.query(User).filter(User.uid.in_([1, 3, 5, 7, 9])).all()
    """
    對應SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE `User`.uid IN (1, 3, 5, 7, 9)
    """
    
    # ------------------  分組查詢
    # !! 注意不是query(User), 因為Query(User)對應的SQL為:
    # SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    # `User`.email AS `User_email`, `User`.tags AS `User_tags`
    session.query(User.tags).group_by(User.tags).all()
    """
    對應SQL
    SELECT `User`.tags AS `User_tags` FROM `User` GROUP BY `User`.tags
    """
    
    # ------------------  排序 順序
    session.query(User).order_by(User.uid).all()
    """
    對應SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags`
    FROM `User` ORDER BY `User`.uid
    """
    
    # ------------------  排序 倒序
    session.query(User).order_by(desc(User.uid)).all()
    """
    對應SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    ORDER BY `User`.uid DESC
    """
    
    # ------------------ 去重
    session.query(User.tags).distinct().all()
    """
    對應SQL:
    SELECT DISTINCT `User`.tags AS `User_tags` FROM `User`;
    """

    # ------------------ 取幾條數據
    session.query(User).limit(2).all()
    """
    對應SQL:
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    LIMIT 2;
    """

    # ------------------ 跳過幾條個數據
    session.query(User).offset(1).limit(2).all()
    """
    對應SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    LIMIT 1, 2;
    """

關於query返回的對象
query的返回對象為sqlalchemy.orm.query.Query對象, 你可以與Result對象進行對比, 主要有以下的方法:

  • all()
    返回由表對象組成的列表
  • first()
    返回第一個結果 (表對象), 內部執行limit SQL
  • one()
    只返回一行數據或引發異常 (無數據時拋出: sqlalchemy.exc.NoResultFound, 多行數據時拋出: sqlalchemy.exc.MultipleResultsFound)
  • one_or_none()
    最多返回一行數據或引發異常 (無數據時返回None, 多行數據時拋出: sqlalchemy.exc.MultipleResultsFound)
  • scalar()
    獲取第一行的第一列數據. 如果沒有要獲取的行, 則返回None, 多行數據時拋出: sqlalchemy.exc.MultipleResultsFound

以上查詢不包含一些連表操作, 見: relationship連表操作

2.0的方法

2.0的返回結果也是Result對象, 關於Result對象, 見: Result

注意: 由於2.0的查詢方式, Core和ORM都可以使用, 所以放在一起, 見下文: 查詢數據詳解

relationship連表操作

我們自定義外鍵時, 一般的步驟是:

  1. 子表使用欄位名= Column(Integer, ForeignKey('主表名.主鍵'))的格式定義
  2. 除此外, 還需要在主表中定義relationship用於子表與主表之間的跨表查詢

完整例子:

from sqlalchemy import Column, Integer, create_engine, String, Text, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, relationship

# 導入公共基類
Base = declarative_base()
#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)


class User(Base):
    __tablename__ = 'user'
    uid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(16), nullable=False)
    password = Column(String(32), nullable=False)
    # Article是類名 user是反向訪問的屬性名稱
    article = relationship("Article", backref="user")
    """
    article = relationship("Article", backref="user")
    相當於:
    class User(Base):
        # Article是類名 user是反向訪問的屬性名稱
        article = relationship("Article", back_populates="user")
        
    class Article(Base):
        # User是類名 addresses是反向訪問的屬性名稱
        user = relationship("User", back_populates="addresses")

    """

    def __repr__(self):
        return "<User %s>" % self.username


class Article(Base):
    __tablename__ = 'article'
    aid = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(36), nullable=False)
    content = Column(Text, nullable=False)
    author_id = Column(Integer, ForeignKey("user.uid"))

    def __repr__(self):
        return "<User %s>" % self.title


Base.metadata.create_all(bind=engine)

關於relationship
relationship在定義外鍵時, 有非常重要的作用, 如: 1. 跨表操作 2. 設置刪除主表數據時子表的值
relationship的參數很多, 這裡只列出常用的幾個, 全部參數見文檔: relationship

  • back_populates
    指定反向訪問的屬性名稱

  • backref
    快捷設置兩個relationship (設置back_populates的話, 要設置兩個表)
    關係

  • cascade
    用於控制修改數據時的選項

    說明
    save-update 默認選項, 在添加一條數據的時候,會把其他和它相關聯的數據都添加到資料庫中。這種行為就是save-update屬性影響的
    delete 表示當刪除某一個模型中的數據的時候,是否也刪除掉使用relationship和它關聯的數據
    delete-orphan 表示當對一個ORM對象解除了父表中的關聯對象的時候,自己便會被刪除掉。當然如果表中的數據被刪除,自己也會被刪除。這個選項只能用在一對多上,不能用在多對多以及多對一上。並且還需要在子表中的relationship中,增加一個single_parent=True的參數
    merge 默認選項, 當在使用session.merge,合併一個對象的時候,會將使用了relationship相關聯的對象也進行merge操作
    expunge 移除操作的時候,會將相關聯的對象也進行移除。這個操作只是從session中移除,並不會真正的從資料庫中刪除
    all 是對save-updatemergerefresh-expireexpungedelete幾種的填寫

    比如:

    articles = relationship("Article",cascade="save-update,delete")
    
  • order_by
    子表列表的排序方式

    # 倒序
    article = relationship("Article", backref="user", order_by="Article.aid.desc()")
    # 正序
    article = relationship("Article", backref="user", order_by="Article.aid")
    

本部分包括Core與ORM的跨表增刪改查操作

select `user`.uid as uid, `user`.username, `user`.password,
 (select GROUP_CONCAT(`article`.title) from article 
   where `article`.author_id = `user`.uid) as article
 from user;

+-----+----------+----------+---------------+
| uid | username | password | article       |
+-----+----------+----------+---------------+
|   1 | 張三     | 12345    | C++入門,C入門 |
|   2 | 李四     | 12346    | python入門    |
+-----+----------+----------+---------------+

GROUP_CONCAT可以讓多行數據拼接成一行數據, 以,鏈接

通過relationship雙向訪問

即直接通過relationship訪問主表或子表

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # +++++++++++++++++++++++++ 子表訪問主表, 返回主表對象
    """
    實質上內部執行的SQL
    SELECT user.uid AS user_uid, user.username AS user_username, user.password AS user_password
    FROM user
    WHERE user.uid = %(pk_1)s
    """
    # ----------- 1.x方法
    article_1x = session.query(Article).first()
    print(article_1x.user)  # <User 張三>
    # ----------- 2.0 方法
    article_20 = session.execute(select(Article)).first()
    print(article_20.Article.user)  # <User 張三>

    # +++++++++++++++++++++++++ 主表訪問子表, 返回子表列表
    # 實質是 sqlalchemy.orm.collections.InstrumentedList 對象
    # 是List的子類
    """
    實質上內部執行的SQL
    SELECT article.aid AS article_aid, article.title AS article_title,
    article.content AS article_content, article.author_id AS article_author_id
    FROM article
    WHERE %(param_1)s = article.author_id ORDER BY article.aid
    """
    # ----------- 1.x方法
    user_1x = session.query(User).first()
    print(user_1x.article)
    # [<User C++入門>, <User C入門>]
    # ----------- 2.0方法
    user_20 = session.execute(select(User)).first()
    print(user_20.User.article)
    # [<User C++入門>, <User C入門>]

通過relationship修改關聯關係

上面例子中說過, 主表.relationship欄位InstrumentedList對象 (類似於List), 我們可以修改它, 然後調用commit方法即可. 子表.relationship欄位是對應的主表, 可以修改為自己想要的, 同樣調用commit方法即可

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # +++++++++++++++++++++++++ 子表修改屬於的主表
    """
    對於SQL
    UPDATE article 
    SET author_id=%(author_id)s 
    WHERE article.aid = %(article_aid)s
    """
    # ----------- 1.x方法
    lisi = session.query(User).get(2)

    article_1x = session.query(Article).first()
    print(article_1x.user)  # <User 張三>
    article_1x.user = lisi  # 改為 <User 李四>
    session.commit()  # 記得提交
    # ----------- 2.0 方法
    zhangsan = session.execute(select(User).where(User.uid == 1)).first()

    article_20 = session.execute(select(Article)).first()
    print(article_20.Article.user)  # <User 李四>
    article_20.Article.user = zhangsan.User  # 改為 <User 張三>
    session.commit()  # 記得提交

    # +++++++++++++++++++++++++ 主表修改子表列表

    # ----------- 1.x方法 增加
    user_1x = session.query(User).first()
    # 添加一個新的子表數據
    # 會在Article中插入一條新的數據
    user_1x.article.append(Article(title="javascript 入門", content="console.log(hello world)"))
    session.commit()  # 記得提交

    # ----------- 2.0 方法 移除
    user_20 = session.execute(select(User)).first()
    print(user_20.User.article)  # [<User C++入門>, <User C入門>, <User javascript 入門>]

    article_js = session.execute(select(Article).where(Article.aid == 4)).scalar()
    # 從主表中移除與子表的關係
    user_20.User.article.remove(article_js)
    session.commit()  # 記得提交

通過relationship修改子/主表數據

同樣非常簡單, 找到對應的類, 然後修改數據並commit即可

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # +++++++++++++++++++++++++ 通過子表修改對應主表的數據
    # ----------- 1.x方法
    article_1x = session.query(Article).first()
    print(article_1x.user)  # <User 張三>

    article_1x.user.username = "張三二號"
    session.commit()  # 記得提交
    # ----------- 2.0 方法

    article_20 = session.execute(select(Article)).first()
    print(article_20.Article.user)  # <User 張三二號>

    article_20.Article.user.username = "張三"
    session.commit()  # 記得提交

    # +++++++++++++++++++++++++ 通過主表修改對應子表的數據
    # ----------- 1.x方法
    user_1x = session.query(User).first()
    print(user_1x.article)  # [<User C++入門>, <User C入門>, <User javascript 入門>]

    user_1x.article[-1].title = "js入門"
    session.commit()  # 記得提交
    # ----------- 2.0 方法
    user_20 = session.execute(select(User)).scalar()

    print(user_20.article)  # [<User C++入門>, <User C入門>, <User js入門>]
    
    user_20.article[-1].title = "javascript 入門"
    session.commit()  # 記得提交

通過relationship查詢數據

正向查詢, 子表利用主表的條件查詢, 使用has, 條件和普通查詢的條件一樣
反向查詢, 主表利用子表的條件查詢, 使用any, 條件和普通查詢的條件一樣

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # +++++++++++++++++++++++++ 反向查詢
    """
    對應的SQL
    SELECT user.uid AS user_uid, user.username AS user_username, user.password AS user_password
    FROM user
    WHERE EXISTS (SELECT 1
    FROM article
    WHERE user.uid = article.author_id AND article.title LIKE 'python%')
    """
    # ********* 查詢有以python開頭的Article的User
    # ----------- 1.x方法
    user_1x = session.query(User).filter(User.article.any(Article.title.like("python%")))
    print(user_1x.all())  # [<User 李四>]

    # ----------- 2.0方法
    user_20 = session.execute(
        select(User).where(User.article.any(Article.title.like("python%"))))
    print(user_20.all())  # [(<User 李四>,)]
    
    # +++++++++++++++++++++++++ 正向查詢
    """
    對應的SQL
    SELECT article.aid AS article_aid, article.title AS article_title, 
     article.content AS article_content, article.author_id AS article_author_id 
    FROM article 
    WHERE EXISTS (SELECT 1 
    FROM user 
    WHERE user.uid = article.author_id AND (user.username LIKE concat(concat('%%', '四', '%%')))
    """
    # ********* 查詢User表中username有 四 的Article
    # ----------- 1.x方法
    article_1x = session.query(Article).filter(Article.user.has(User.username.contains("四")))
    print(article_1x.all())  # [<User python入門>]

    # ----------- 2.0方法
    article_20 = session.execute(
        select(Article).where(Article.user.has(User.username.contains("四"))))
    print(article_20.all())  # [(<User python入門>,)]

建立多對多關係

可以通過relationship便捷使用多對多關係

from sqlalchemy import Table, Text, Column, ForeignKey

# 第三張表
post_keywords = Table("post_keywords", Base.metadata,
                      Column('post_id', ForeignKey('posts.id'), primary_key=True),
                      Column('keyword_id', ForeignKey('keywords.id'), primary_key=True)
                      )


class BlogPost(Base):
    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True)
    headline = Column(String(255), nullable=False)
    body = Column(Text)

    # 多對多關係 BlogPost<->Keyword
    keywords = relationship('Keyword',
                            secondary=post_keywords,
                            back_populates='posts')

    def __init__(self, headline, body):
        self.headline = headline
        self.body = body

    def __repr__(self):
        return "BlogPost(%r, %r)" % (self.headline, self.body)


class Keyword(Base):
    __tablename__ = 'keywords'
    id = Column(Integer, primary_key=True)
    keyword = Column(String(50), nullable=False, unique=True)
    # 多對多關係 Keyword<->BlogPost
    posts = relationship('BlogPost',
                         secondary=post_keywords,
                         back_populates='keywords')

    def __init__(self, keyword):
        self.keyword = keyword


Base.metadata.create_all(bind=engine)

添加操作例子:

from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
    # 比如添加一篇文章, 為文章添加多個keyword
    blog = BlogPost(headline="起飛!", body="我是文章的內容")
    # 獲取子表列表 並 append
    blog.keywords.append(Keyword("新聞"))
    blog.keywords.append(Keyword("熱門"))
    session.add(blog)
    session.commit()
    # ------- 添加第二篇文章
    keyword1 = session.execute(
        select(Keyword).filter_by(keyword="熱門")
    ).scalar()
    new_blog = BlogPost(headline="震驚!", body="我是第二篇文章的內容")
    new_blog.keywords.append(keyword1)
    session.add(new_blog)
    session.commit()

查詢操作例子:

from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
    # 查詢所有 "熱門" 文章
    blog = session.execute(
        select(BlogPost).where(BlogPost.keywords.any(Keyword.keyword == "熱門"))
    ).all()

    print(blog)
    # [(BlogPost('起飛!', '我是文章的內容'),), 
    # (BlogPost('震驚!', '我是第二篇文章的內容'),)]

其他操作也像一般的relationship一樣操作

項目示範

一般來說, 我們使用SQLAlchemy的步驟大多相同, 下面
文件結構:

+--- database.py  # 用於 初始化session 和 公共基類
+--- models.py   # 定義表
+--- crud.py     # 封裝增刪改查的方法
+--- schemas.py  # 定義pydantic模型, 用于格式化已經取得的數據 [可選]
+--- main.py    # 執行主邏輯

database.py 初始化session和 公共基類:

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

# 資料庫的URL
# 替換成自己的
SQLALCHEMY_DATABASE_URL = 'sqlite:///./coronavirus.sqlite3'

# 創建引擎
engine = create_engine(
    # echo=True表示引擎將用repr()函數記錄所有語句及其參數列表到日誌
    SQLALCHEMY_DATABASE_URL, encoding='utf-8', echo=True
)

# SessionLocal用於對數據的增刪改查
# flush()是指發送資料庫語句到資料庫,但資料庫不一定執行寫入磁碟;commit()是指提交事務,將變更保存到資料庫文件
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, expire_on_commit=True)

# 創建基本映射類, 用於創建表
Base = declarative_base(bind=engine, name='Base')

models.py 定義表結構

from sqlalchemy import Column, String, Integer, BigInteger, Date, DateTime, ForeignKey, func
from sqlalchemy.orm import relationship
# 導入公共基類
from .database import Base


class City(Base):
    __tablename__ = 'city'  

    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    province = Column(String(100), unique=True, nullable=False, comment='省/直轄市')
    country = Column(String(100), nullable=False, comment='國家')
    country_code = Column(String(100), nullable=False, comment='國家程式碼')
    country_population = Column(BigInteger, nullable=False, comment='國家人口')
    data = relationship('Data', back_populates='city')  # 'Data'是關聯的類名;back_populates來指定反向訪問的屬性名稱

    created_at = Column(DateTime, server_default=func.now(), comment='創建時間')
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新時間')

    __mapper_args__ = {"order_by": country_code}  # 默認是正序,倒序加上.desc()方法

    def __repr__(self):
        return f'{self.country}_{self.province}'


class Data(Base):
    __tablename__ = 'data'

    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    city_id = Column(Integer, ForeignKey('city.id'), comment='所屬省/直轄市')  # ForeignKey里的字元串格式不是類名.屬性名,而是表名.欄位名
    date = Column(Date, nullable=False, comment='數據日期')
    confirmed = Column(BigInteger, default=0, nullable=False, comment='確診數量')
    deaths = Column(BigInteger, default=0, nullable=False, comment='死亡數量')
    recovered = Column(BigInteger, default=0, nullable=False, comment='痊癒數量')
    city = relationship('City', back_populates='data')  # 'City'是關聯的類名;back_populates來指定反向訪問的屬性名稱

    created_at = Column(DateTime, server_default=func.now(), comment='創建時間')
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新時間')

    __mapper_args__ = {"order_by": date.desc()}  # 按日期降序排列

    def __repr__(self):
        return f'{repr(self.date)}:確診{self.confirmed}例'

crud.py 封裝增刪改查的方法:

from sqlalchemy.orm import Session

import models
import schemas


def get_city(db: Session, city_id: int):
    return db.query(models.City).filter(models.City.id == city_id).first()


def get_city_by_name(db: Session, name: str):
    return db.query(models.City).filter(models.City.province == name).first()


def get_cities(db: Session, skip: int = 0, limit: int = 10):
    return db.query(models.City).offset(skip).limit(limit).all()


def create_city(db: Session, city: schemas.CreateCity):
    """
    創建City數據
    """
    db_city = models.City(**city.dict())
    db.add(db_city)
    db.commit()
    db.refresh(db_city)
    return db_city


def get_data(db: Session, city: str = None, skip: int = 0, limit: int = 10):
    if city:
        # 外鍵關聯查詢,這裡不是像Django ORM那樣Data.city.province
        return db.query(models.Data).filter(models.Data.city.has(province=city))
    return db.query(models.Data).offset(skip).limit(limit).all()


def create_city_data(db: Session, data: schemas.CreateData, city_id: int):
    """
    創建Data數據
    """
    db_data = models.Data(**data.dict(), city_id=city_id)
    db.add(db_data)
    db.commit()
    db.refresh(db_data)
    return db_data

schemas.py 定義pydantic模型, 用于格式化已經取得的數據:

from datetime import date as date_
from datetime import datetime

from pydantic import BaseModel


class CreateData(BaseModel):
    date: date_
    confirmed: int = 0
    deaths: int = 0
    recovered: int = 0


class CreateCity(BaseModel):
    province: str
    country: str
    country_code: str
    country_population: int


class ReadData(CreateData):
    id: int
    city_id: int
    updated_at: datetime
    created_at: datetime

    class Config:
        orm_mode = True


class ReadCity(CreateCity):
    id: int
    updated_at: datetime
    created_at: datetime

    class Config:
        orm_mode = True

main.py 執行主邏輯:

from sqlalchemy.orm import Session

import crud
import schemas

from database import engine, Base, SessionLocal
from models import City, Data

# 創建表, 已經存在的將被忽略
Base.metadata.create_all(bind=engine)
db = SessionLocal()
# 調用 crud
db_city = crud.get_city_by_name(db, name="廣東省")
if db_city:
    raise Exception("City already registered")

# 創建數據
city = City(...)
crud.create_city(db=db, city=city)

查詢數據詳解

只有涉及SQL 的查詢數據, 必然繞不開FROM WHERE SELECT GROUP BY HAVING ORDER BY LIMIT INNER JOIN ... ON LEFT JOIN ... ON UNION 這些SQL查詢語法, 那麼它們在SQLAlchemy中是如何表示的呢? 實際上和SQL語句一樣, SQLAlchemy的語法也有select where join order_by group_by having 等 …

注意: 這些方法在Core與ORM中都適用

剛接觸可能會覺得比較複雜, 但是假如有SQL基礎的話, 用起來比較簡單.
要查詢的表結構為:

# +++++++++++++++++++ 使用Core定義 +++++++++++++++++++
from sqlalchemy import MetaData, create_engine
from sqlalchemy import Table, Column, Integer, String, ForeignKey, Text

#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".
                       format(**DATABASE_CONFIG), echo=True, future=True)
metadata_obj = MetaData()

user_table = Table(
    "user",
    metadata_obj,
    Column("uid", Integer, primary_key=True, autoincrement=True),
    Column("username", String(16), nullable=False),
    Column("password", String(32), nullable=False),
)

article_table = Table(
    'article',
    metadata_obj,
    Column("aid", Integer, primary_key=True, autoincrement=True),
    Column("title", String(36), nullable=False),
    Column("content", Text, nullable=False),
    Column("author_id", Integer, ForeignKey("user.uid"))
)

metadata_obj.create_all(bind=engine)


# +++++++++++++++++++ 使用ORM定義  +++++++++++++++++++
from sqlalchemy import Column, Integer, create_engine, String, Text, ForeignKey
from sqlalchemy.orm import declarative_base, relationship

# 導入公共基類
Base = declarative_base()
#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".
                       format(**DATABASE_CONFIG), echo=True, future=True)


class User(Base):
    __tablename__ = 'user'
    uid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(16), nullable=False)
    password = Column(String(32), nullable=False)

    # Article是類名 user是反向訪問的屬性名稱
    article = relationship("Article", backref="user")

    def __repr__(self):
        return "<User %s>" % self.username


class Article(Base):
    __tablename__ = 'article'
    aid = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(36), nullable=False)
    content = Column(Text, nullable=False)
    author_id = Column(Integer, ForeignKey("user.uid"))

    def __repr__(self):
        return "<User %s>" % self.title


Base.metadata.create_all(bind=engine)

表數據為:

mysql> select  * from article, user where user.uid=article.author_id;
+-----+------------+--------------------+-----------+-----+----------+----------+
| aid | title      | content            | author_id | uid | username | password |
+-----+------------+--------------------+-----------+-----+----------+----------+
|   1 | C++入門    | c++ hello world    |         1 |   1 | 張三     | 12345    |
|   2 | C入門      | c hello world      |         1 |   1 | 張三     | 12345    |
|   3 | python入門 | print(hello world) |         2 |   2 | 李四     | 12346    |
+-----+------------+--------------------+-----------+-----+----------+----------+
3 rows in set (0.12 sec)

select

該函數可以指定要查詢的表、列及添加別名

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select


conn = engine.connect()  # engine 在上面定義

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

# 1 ++++++++++++++++++ 一般查詢 ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password FROM user
WHERE user.uid = 2
"""
# -------------- Core
result_core_1 = conn.execute(select(user_table).where(user_table.c.uid == 2))
print(result_core_1.all())  # [(2, '李四', '12346')]
# -------------- ORM
result_orm_1 = session.execute(select(User).where(User.uid == 2))
print(result_orm_1.all())  # [(<User 李四>,)]

# 2 ++++++++++++++++++ 查詢全部列 ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password 
FROM user
"""
# -------------- Core
result_core_2 = conn.execute(select(user_table))
print(result_core_2.all())  # [(1, '張三', '12345'), (2, '李四', '12346')]
# -------------- ORM
result_orm_2 = session.execute(select(User))
print(result_orm_2.all())  # [(<User 張三>,), (<User 李四>,)]

# 3 ++++++++++++++++++ 查詢指定的列 ++++++++++++++++++
"""
對應SQL
SELECT user.username 
FROM user
"""
# -------------- Core
result_core_3 = conn.execute(select(user_table.c.username))
print(result_core_3.all())  # [('張三',), ('李四',)]
# -------------- ORM
result_orm_3 = session.execute(select(User.username))
print(result_orm_3.all())  # [('張三',), ('李四',)]

# 4 ++++++++++++++++++ 兩個表 聯合查詢 ++++++++++++++++++
"""
對應SQL
SELECT user.username, article.title 
FROM user, article 
WHERE user.uid = article.author_id
"""
# -------------- Core
result_core_4 = conn.execute(
    select(user_table.c.username, article_table.c.title).where(
        user_table.c.uid == article_table.c.author_id))
# [('張三', 'C++入門'), ('張三', 'C入門'), ('李四', 'python入門')]
print(result_core_4.all())
# -------------- ORM
result_orm_4 = session.execute(select(User.username, Article.title).where(
    User.uid == Article.author_id))
# [('張三', 'C++入門'), ('張三', 'C入門'), ('李四', 'python入門')]
print(result_orm_4.all())

# 5 ++++++++++++++++++ 為列添加別名 ++++++++++++++++++
"""
對應SQL
SELECT user.username AS name 
FROM user 
WHERE user.uid = 1
"""
# -------------- Core
result_core_5 = conn.execute(
    select((user_table.c.username).label("name")).where(
        user_table.c.uid == 1))
# print(result_core_5.all())  # [('張三',)]
# -------------- ORM
result_orm_5 = session.execute(
    select((User.username).label("name")).where(
        User.uid == 1))
# print(result_orm_5.all())  # [('張三',)]

# 利用別名 取值
print(result_core_5.first().name)  # 張三
print(result_orm_5.first().name)  # 張三

# 6 ++++++++++++++++++ 為表添加別名 ++++++++++++++++++
"""
對應SQL
SELECT user_1.username 
FROM user AS user_1 
WHERE user_1.uid = 1
"""
# -------------- Core
user_table_core_alias = user_table.alias()
result_core_6 = conn.execute(
    select(user_table_core_alias.c.username).where(
        user_table_core_alias.c.uid == 1))
print(result_core_6.all())  # [('張三',)]
# -------------- ORM
from sqlalchemy.orm import aliased

user_table_orm_alias = aliased(User)
result_orm_6 = session.execute(
    select(user_table_orm_alias.username).where(
        user_table_orm_alias.uid == 1))
print(result_orm_6.all())  # [('張三',)]

# 7 ++++++++++++++++++ 與text 結合, 添加額外列 ++++++++++++++++++
"""
對應SQL
SELECT now() AS now, user.username, '自定義字元' 
FROM user
"""

from sqlalchemy import literal_column, text

# literal_column表示一列數據
# text可以轉化成SQL

# -------------- Core
result_core_7 = conn.execute(
    select(literal_column("now()").label("now"), user_table.c.username, text("'自定義字元'")))
print(result_core_7.all())
"""
[(datetime.datetime(2022, 1, 8, 18, 54, 44), '張三', '自定義字元'), 
(datetime.datetime(2022, 1, 8, 18, 54, 44), '李四', '自定義字元')]
"""
# -------------- ORM
result_orm_7 = session.execute(
    select(literal_column("now()").label("now"), User.username, text("'自定義字元'")))
print(result_orm_7.all())
"""
[(datetime.datetime(2022, 1, 8, 18, 59, 42), '張三', '自定義字元'), 
(datetime.datetime(2022, 1, 8, 18, 59, 42), '李四', '自定義字元')]
"""

where

過濾數據, Coretable.c.xxx獲取行, 假如是ORM的話為:類名.屬性名

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select


conn = engine.connect()  # engine 在上面定義

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

# 1 ++++++++++++++++++ 條件默認為and ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password 
FROM user 
WHERE user.uid < 3 AND user.uid > 1
"""
# -------------- Core
result_core_1 = conn.execute(
    select(user_table).where(user_table.c.uid < 3, user_table.c.uid > 1))
print(result_core_1.all())  # [(2, '李四', '12346')]
# -------------- ORM
result_orm_1 = session.execute(select(User).where(User.uid < 3, User.uid > 1))
print(result_orm_1.all())  # [(<User 李四>,)]

# 2 ++++++++++++++++++ 修改條件為not or ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password 
FROM user 
WHERE (user.uid = 1 OR user.uid = 2) AND user.username != "李四"
"""
from sqlalchemy import or_, not_

# -------------- Core
result_core_2 = conn.execute(
    select(user_table).where(
        or_(user_table.c.uid == 1, user_table.c.uid == 2),
        not_(user_table.c.username == "李四"),
    ))
print(result_core_2.all())  # [(1, '張三', '12345')]
# -------------- ORM
result_orm_2 = session.execute(select(User).where(
    or_(User.uid == 1, User.uid == 2),
    not_(User.username == "李四")))
print(result_orm_2.all())  # [(<User 張三>,)]

# 3 ++++++++++++++++++ startswith ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password 
FROM user 
WHERE (user.username LIKE concat('張', '%%'))
"""

# -------------- Core
result_core_3 = conn.execute(
    select(user_table).where(
        user_table.c.username.startswith("張")))
print(result_core_3.all())  # [(1, '張三', '12345')]
# -------------- ORM
result_orm_3 = session.execute(select(User).where(
    User.username.startswith("張")))
print(result_orm_3.all())  # [(<User 張三>,)]

# 4 ++++++++++++++++++ endswith ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password 
FROM user 
WHERE (user.username LIKE concat('%%', '三'))
"""

# -------------- Core
result_core_4 = conn.execute(
    select(user_table).where(
        user_table.c.username.endswith("三")))
print(result_core_4.all())  # [(1, '張三', '12345')]
# -------------- ORM
result_orm_4 = session.execute(select(User).where(
    User.username.endswith("三")))
print(result_orm_4.all())  # [(<User 張三>,)]

# 5 ++++++++++++++++++ endswith ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password 
FROM user 
WHERE (user.username LIKE concat(concat('%%', '三', '%%'))
"""

# -------------- Core
result_core_5 = conn.execute(
    select(user_table).where(
        user_table.c.username.contains("三")))
print(result_core_5.all())  # [(1, '張三', '12345')]
# -------------- ORM
result_orm_5 = session.execute(select(User).where(
    User.username.contains("三")))
print(result_orm_5.all())  # [(<User 張三>,)]

# 6 ++++++++++++++++++ like ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password 
FROM user 
WHERE user.username LIKE '%三'
"""

# -------------- Core
result_core_6 = conn.execute(
    select(user_table).where(
        user_table.c.username.like("%三")))
print(result_core_6.all())  # [(1, '張三', '12345')]
# -------------- ORM
result_orm_6 = session.execute(select(User).where(
    User.username.like("%三")))
print(result_orm_6.all())  # [(<User 張三>,)]

# 7 ++++++++++++++++++ in ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password 
FROM user 
WHERE user.uid IN (1, 2)
"""

# -------------- Core
result_core_7 = conn.execute(
    select(user_table).where(
        user_table.c.uid.in_((1, 2))
    ))
print(result_core_7.all())  # [(1, '張三', '12345'), (2, '李四', '12346')]
# -------------- ORM
result_orm_7 = session.execute(
    select(User).where(
        User.uid.in_((1, 2))
    ))
print(result_orm_7.all())  # [(<User 張三>,), (<User 李四>,)]

# 8 ++++++++++++++++++ between ... and ... ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password 
FROM user 
WHERE user.uid BETWEEN 1 AND 2
"""

# -------------- Core
result_core_8 = conn.execute(
    select(user_table).where(
        user_table.c.uid.between(1, 2)
    ))
print(result_core_8.all())  # [(1, '張三', '12345'), (2, '李四', '12346')]
# -------------- ORM
result_orm_8 = session.execute(
    select(User).where(
        User.uid.between(1, 2)
    ))
print(result_orm_8.all())  # [(<User 張三>,), (<User 李四>,)]

# 9 ++++++++++++++++++ is null 或 is not null ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password 
FROM user 
WHERE user.uid IS NOT NULL
"""

from sqlalchemy import not_

# -------------- Core
result_core_9 = conn.execute(
    select(user_table).where(
        # user_table.c.uid.is_(None),  # IS NULL
        not_(user_table.c.uid.is_(None)),  # IS NOT NULL
    ))
print(result_core_9.all())  # [(1, '張三', '12345'), (2, '李四', '12346')]
# -------------- ORM
result_orm_9 = session.execute(
    select(User).where(
        # User.uid.is_(None),  # IS NULL
        not_(User.uid.is_(None)),  # IS NOT NULL
    ))
print(result_orm_9.all())  # [(<User 張三>,), (<User 李四>,)]

除此之外, 你還可以使用一些其他運算符, 詳情見: Operator Reference

order_by

order_by用於排序

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select


conn = engine.connect()  # engine 在上面定義

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

# 1 ++++++++++++++++++ 根據某一列排序 (默認升序) ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password 
FROM user ORDER BY user.uid
"""

# -------------- Core
result_core_1 = conn.execute(
    select(user_table).order_by(user_table.c.uid))
print(result_core_1.all())  # [(1, '張三', '12345'), (2, '李四', '12346')]
# -------------- ORM
result_orm_1 = session.execute(
    select(User).order_by(User.uid))
print(result_orm_1.all())  # [(<User 張三>,), (<User 李四>,)]

# 2 ++++++++++++++++++ 手動指定升序/降序 ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password 
FROM user ORDER BY user.uid ASC/DESC
"""

# -------------- Core
result_core_2 = conn.execute(
    # select(user_table).order_by(user_table.c.uid.asc())  # 升序
    select(user_table).order_by(user_table.c.uid.desc())  # 降序
)
print(result_core_2.all())
# [(1, '張三', '12345'), (2, '李四', '12346')] / [(2, '李四', '12346'), (1, '張三', '12345')]
# -------------- ORM
result_orm_2 = session.execute(
    # select(User).order_by(User.uid.asc())  # 升序
    select(User).order_by(User.uid.desc())  # 降序
)
print(result_orm_2.all())
# [(<User 張三>,), (<User 李四>,)] / [(<User 李四>,), (<User 張三>,)]

# 3 ++++++++++++++++++ 根據別名排序 ++++++++++++++++++
"""
對應SQL
SELECT user.username AS name 
FROM user ORDER BY name DESC
"""
from sqlalchemy import desc, asc

# -------------- Core
result_core_3 = conn.execute(
    select((user_table.c.username).label("name")).order_by(desc("name"))
)
print(result_core_3.all())  # [('張三',), ('李四',)]

# -------------- ORM
result_orm_3 = session.execute(
    select((User.username).label("name")).order_by(desc("name"))
)
print(result_orm_3.all())  # [('張三',), ('李四',)]

group_by和having

group_by用於分組, having類似於where, 但可以對已分組數據使用聚合函數

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, func


conn = engine.connect()  # engine 在上面定義

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

# 1 ++++++++++++++++++ group_by一般使用 ++++++++++++++++++
"""
對應SQL
SELECT count(article.author_id) AS count 
FROM article GROUP BY article.author_id
"""

# func 內有很多內置函數

# -------------- Core
result_core_1 = conn.execute(
    select(func.count(article_table.c.author_id).label("count")).group_by(article_table.c.author_id)
)
print(result_core_1.all())  # [(2,), (1,)]

# -------------- ORM
result_orm_1 = session.execute(
    select(func.count(Article.author_id).label("count")).group_by(Article.author_id)
)
print(result_orm_1.all())  # [(2,), (1,)]

# 2 ++++++++++++++++++ group by + having ++++++++++++++++++
"""
對應SQL
SELECT count(article.author_id) AS count 
FROM article GROUP BY article.author_id 
HAVING count(article.author_id) > 1
"""

# func 內有很多內置函數

# -------------- Core
result_core_2 = conn.execute(
    select(func.count(article_table.c.author_id).label("count")).group_by(
        article_table.c.author_id).having(func.count(article_table.c.author_id) > 1)
)
print(result_core_2.all())  # [(2,)]

# -------------- ORM
result_orm_2 = session.execute(
    select(func.count(Article.author_id).label("count")).group_by(
        Article.author_id).having(func.count(Article.author_id) > 1)
)
print(result_orm_2.all())  # [(2,)]

除了count外, 還有其他的方法, 詳情見: 內置函數

limit和offset

limit: 表示取幾條數據, offset: 表示要跳過多少條數據

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select


conn = engine.connect()  # engine 在上面定義

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

# 1 ++++++++++++++++++ 僅使用LIMIT ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password 
FROM user 
LIMIT 1
"""

# -------------- Core
result_core_1 = conn.execute(select(user_table).limit(1))
print(result_core_1.all())  # [(1, '張三', '12345')]

# -------------- ORM
result_orm_1 = session.execute(select(User).limit(1))
print(result_orm_1.all())  # [(<User 張三>,)]

# 2 ++++++++++++++++++ 使用LIMIT和OFFSET ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password 
FROM user 
LIMIT 1, 1
"""

# -------------- Core
result_core_2 = conn.execute(select(user_table).limit(1).offset(1))
print(result_core_2.all())  # [(2, '李四', '12346')]

# -------------- ORM
result_orm_2 = session.execute(select(User).limit(1).offset(1))
print(result_orm_2.all())  # [(<User 李四>,)]

去重

你可以在查詢的時候使用SQL進行去重, 亦可以在取到數據後進行去重

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

conn = engine.connect()  # engine 在上面定義

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
# 1 ++++++++++++++++++ 在SQL中去重 ++++++++++++++++++
"""
對應SQL
SELECT DISTINCT article.author_id 
FROM article
"""

# -------------- Core
result_core_1 = conn.execute(
    select(article_table.c.author_id).distinct()
)
print(result_core_1.all())  # [(1,), (2,)]
# -------------- ORM
result_orm_1 = session.execute(
    select(Article.author_id).distinct())
print(result_orm_1.all())  # [(1,), (2,)]

# 2 ++++++++++++++++++ 在結果中去重 ++++++++++++++++++
"""
對應SQL
SELECT article.author_id 
FROM article
"""

# -------------- Core
result_core_2 = conn.execute(select(article_table.c.author_id))
print(result_core_2.unique().all())  # [(1,), (2,)]

# -------------- ORM
result_orm_2 = session.execute(select(Article.author_id))
print(result_orm_2.unique().all())  # [(1,), (2,)]

連接查詢

內連接查詢 外連接查詢 完全連接查詢三種連接查詢方式, 在SQLAlchemy中分別對應著join(...) join(..., isouter=True) join(..., full=True)

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select


conn = engine.connect()  # engine 在上面定義

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

# 1 ++++++++++++++++++ 內連接查詢 (自動推斷join的表) 方式一 ++++++++++++++++++
"""
對應SQL
SELECT user.username, article.title 
FROM user INNER JOIN article ON user.uid = article.author_id
"""

# -------------- Core
result_core_1 = conn.execute(
    select(user_table.c.username, article_table.c.title).join_from(user_table, article_table))
print(result_core_1.all())  # [('張三', 'C++入門'), ('張三', 'C入門'), ('李四', 'python入門')]
# -------------- ORM
result_orm_1 = session.execute(
    select(User.username, Article.title).join_from(User, Article))
print(result_orm_1.all())  # [('張三', 'C++入門'), ('張三', 'C入門'), ('李四', 'python入門')]

# 2 ++++++++++++++++++ 內連接查詢 (自動推斷join的表) 方式二 ++++++++++++++++++
"""
對應SQL
SELECT user.username, article.title 
FROM user INNER JOIN article ON user.uid = article.author_id
"""

# -------------- Core
result_core_2 = conn.execute(
    select(user_table.c.username, article_table.c.title).join(article_table))
print(result_core_2.all())  # [('張三', 'C++入門'), ('張三', 'C入門'), ('李四', 'python入門')]
# -------------- ORM
result_orm_2 = session.execute(select(User.username, Article.title).join(Article))
print(result_orm_2.all())  # [('張三', 'C++入門'), ('張三', 'C入門'), ('李四', 'python入門')]

# 3 ++++++++++++++++++ 內連接查詢 (手動指定join的表) ++++++++++++++++++
"""
對應SQL
SELECT user.username, article.title 
FROM user INNER JOIN article ON user.uid = article.author_id
"""

# -------------- Core
result_core_3 = conn.execute(
    select(user_table.c.username, article_table.c.title).select_from(user_table).join(article_table))
print(result_core_3.all())  # [('張三', 'C++入門'), ('張三', 'C入門'), ('李四', 'python入門')]
# -------------- ORM
result_orm_3 = session.execute(
    select(User.username, Article.title).select_from(User).join(Article))
print(result_orm_3.all())  # [('張三', 'C++入門'), ('張三', 'C入門'), ('李四', 'python入門')]

# 4 ++++++++++++++++++ 內連接查詢 (手動指定on的條件) ++++++++++++++++++
"""
對應SQL
SELECT user.username, article.title 
FROM user INNER JOIN article ON user.uid = article.author_id
"""

# -------------- Core
result_core_4 = conn.execute(
    select(user_table.c.username, article_table.c.title).select_from(
        user_table).join(article_table, user_table.c.uid == article_table.c.author_id))
print(result_core_4.all())  # [('張三', 'C++入門'), ('張三', 'C入門'), ('李四', 'python入門')]
# -------------- ORM
result_orm_4 = session.execute(
    select(User.username, Article.title).select_from(
        User).join(Article, User.uid == Article.author_id))
print(result_orm_4.all())  # [('張三', 'C++入門'), ('張三', 'C入門'), ('李四', 'python入門')]

# 5 ++++++++++++++++++ 外連接查詢 ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password, article.title 
FROM user LEFT OUTER JOIN article ON user.uid = article.author_id
"""

# -------------- Core
result_core_5 = conn.execute(
    select(user_table, article_table.c.title).join(article_table, isouter=True))
print(result_core_5.all())
# [(1, '張三', '12345', 'C入門'), (1, '張三', '12345', 'C++入門'), (2, '李四', '12346', 'python入門')]
# -------------- ORM
result_orm_5 = session.execute(
    select(User, Article.title).join(Article, isouter=True))
print(result_orm_5.all())  # [(<User 張三>, 'C入門'), (<User 張三>, 'C++入門'), (<User 李四>, 'python入門')]

# 6 ++++++++++++++++++ 完全連接查詢 ++++++++++++++++++
# # !!! 注意: MYSQL 中沒有  FULL OUTER JOIN, 執行時會報錯
"""
對應SQL
SELECT user.uid, user.username, user.password, article.title 
FROM user LEFT OUTER JOIN article ON user.uid = article.author_id
"""

# -------------- Core
result_core_6 = conn.execute(
    select(user_table, article_table.c.title).join(article_table, full=True))
print(result_core_6.all())

# -------------- ORM
result_orm_6 = session.execute(
    select(User, Article.title).join(Article, full=True))
print(result_orm_6.all())

注意: 使用ORM的方式進行連表操作時, 可以通過relationship, 即不需要直接指定第二張表, 指定relationship

比如:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select


conn = engine.connect()  # engine 在上面定義

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

"""
對應SQL
SELECT user.username, article.title 
FROM user INNER JOIN article ON user.uid = article.author_id
"""

result_orm = session.execute(
    select(User.username, Article.title).select_from(User).join(User.article))
print(result_orm.all())  # [('張三', 'C++入門'), ('張三', 'C入門'), ('李四', 'python入門')]

UNION和UNION ALL

UNION ALL: 合併兩個或多個SELECT語句的結果, 結果不會去重
UNION: 合併兩個或多個SELECT語句的結果, 結果會去重

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, union_all, union


conn = engine.connect()  # engine 在上面定義

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

# 1 ++++++++++++++++++ UNION ALL ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password 
FROM user 
WHERE user.uid = 1 
UNION ALL
SELECT user.uid, user.username, user.password 
FROM user 
WHERE user.uid = 2
"""

# -------------- Core
result_core_1_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_core_1_stmt2 = select(user_table).where(user_table.c.uid == 2)
result_core_1_u = union_all(result_core_1_stmt1, result_core_1_stmt2)

result_core_1 = conn.execute(result_core_1_u)
print(result_core_1.all())  # [(1, '張三', '12345'), (2, '李四', '12346')]

# -------------- ORM
result_orm_1_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_orm_1_stmt2 = select(user_table).where(user_table.c.uid == 2)
result_orm_1_u = union_all(result_orm_1_stmt1, result_orm_1_stmt2)

result_orm_1 = session.execute(result_orm_1_u)
print(result_orm_1.all())  # [(1, '張三', '12345'), (2, '李四', '12346')]

# 2 ++++++++++++++++++ UNION ++++++++++++++++++
"""
對應SQL
SELECT user.uid, user.username, user.password 
FROM user 
WHERE user.uid = 1 
UNION
SELECT user.uid, user.username, user.password 
FROM user 
WHERE user.uid = 1
"""

# -------------- Core
result_core_2_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_core_2_stmt2 = select(user_table).where(user_table.c.uid == 1)
result_core_2_u = union(result_core_2_stmt1, result_core_2_stmt2)

result_core_2 = conn.execute(result_core_2_u)
print(result_core_2.all())  # [(1, '張三', '12345')]

# -------------- ORM
result_orm_2_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_orm_2_stmt2 = select(user_table).where(user_table.c.uid == 1)
result_orm_2_u = union(result_orm_2_stmt1, result_orm_2_stmt2)

result_orm_2 = session.execute(result_orm_2_u)
print(result_orm_2.all())  # [(1, '張三', '12345')]

子查詢

即形如: SELECT * FROM data WHERE name IN (SELECT name FROM user);
SELECT * FROM data WHERE EXISTS (SELECT name FROM user);的查詢
或使用WITH temp AS (...) 作為臨時表
注意: 在使用子查詢後,SQL語句的查詢性能變得非常糟糕, 至於如何取捨看個人權衡了

關於子查詢的優化, 見: 深入理解MySql子查詢IN的執行和優化

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, func


conn = engine.connect()  # engine 在上面定義

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

# 1 ++++++++++++++++++ EXISTS 子查詢 ++++++++++++++++++
"""
對應SQL
SELECT user.username 
FROM user 
WHERE EXISTS (SELECT count(article.author_id) AS count_1 
FROM article GROUP BY article.author_id 
HAVING count(article.author_id) >= 2)
"""

# -------------- Core
subquery_core_1 = (select(func.count(article_table.c.author_id)).group_by(
    article_table.c.author_id).having(
    func.count(article_table.c.author_id) >= 2)).exists()

result_core_1 = conn.execute(select(user_table.c.username).where(subquery_core_1))
print(result_core_1.all())  # [('張三',), ('李四',)]

# -------------- ORM
subquery_orm_1 = (select(func.count(Article.author_id)).group_by(
    Article.author_id).having(
    func.count(Article.author_id) >= 2)).exists()

result_orm_1 = session.execute(select(User.username).where(subquery_orm_1))
print(result_orm_1.all())  # [('張三',), ('李四',)]

# 2 ++++++++++++++++++ 其它 子查詢 ++++++++++++++++++
"""
對應SQL
SELECT article.title, anon_1.username 
FROM article, (SELECT user.uid AS uid, user.username AS username 
FROM user 
WHERE user.uid = 1) AS anon_1 
WHERE article.author_id = anon_1.uid
"""

# -------------- Core
subquery_core_2 = select(user_table.c.uid, user_table.c.username).where(
    user_table.c.uid == 1).subquery()

result_core_2 = conn.execute(
    select(article_table.c.title, subquery_core_2.c.username).where(
        article_table.c.author_id == subquery_core_2.c.uid))
print(result_core_2.all())  # [('C++入門', '張三'), ('C入門', '張三')]

# # -------------- ORM
subquery_orm_2 = select(User.uid, User.username).where(User.uid == 1).subquery()

result_orm_2 = session.execute(
    select(Article.title, subquery_orm_2.c.username).where(
        Article.author_id == subquery_orm_2.c.uid))
print(result_orm_2.all())  # [('C++入門', '張三'), ('C入門', '張三')]

# 3 ++++++++++++++++++ with 添加臨時表 ++++++++++++++++++
"""
對應SQL
WITH anon_1 AS 
(SELECT user.uid AS uid, user.username AS username 
FROM user 
WHERE user.uid = 1)
 SELECT article.title, anon_1.username 
FROM article, anon_1 
WHERE article.author_id = anon_1.uid
"""

# -------------- Core
subquery_core_3 = select(user_table.c.uid, user_table.c.username).where(
    user_table.c.uid == 1).cte()

result_core_3 = conn.execute(
    select(article_table.c.title, subquery_core_3.c.username).where(
        article_table.c.author_id == subquery_core_3.c.uid))
print(result_core_3.all())  # [('C++入門', '張三'), ('C入門', '張三')]

# -------------- ORM
subquery_orm_3 = select(User.uid, User.username).where(User.uid == 1).cte()

result_orm_3 = session.execute(
    select(Article.title, subquery_orm_3.c.username).where(
        Article.author_id == subquery_orm_3.c.uid))
print(result_orm_3.all())  # [('C++入門', '張三'), ('C入門', '張三')]

從1.x遷移到2.0的介面

一些Query的介面用起來還是特別好用的, 因此在2.0風格的介面中, 一些介面仍然可以使用

get根據主鍵查詢

注意: 這是session的方法

# ---------------- 1.x
session.query(User).get(42)
# ---------------- 2.0
session.get(User, 42)

filter_by簡單查詢

注意: 這是select的方法

result = session.execute(
    select(User).filter_by(username="張三")
)
print(result.all())  # [(<User 張三>,)]

filter複雜查詢

注意: 這是select的方法

result = session.execute(
    select(User).filter(User.username == "張三")
)
print(result.all())  # [(<User 張三>,)]

1.x與2.0的ORM介面對比

以下表格來自於: 2.0 Migration – ORM Usage
圖一
圖二

一些類的介紹

Result

表示從資料庫中返回的結果, 一行數據使用Row對象表示, 關於Row, 見: Row

從SQLAlchemy1.4開始, Core和ORM的結果(Result), 使用的介面相同

from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

# 導入公共基類
Base = declarative_base()
# 資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res = session.execute(
        text("select tid, name from teacher;")
    )
    # <class 'sqlalchemy.engine.cursor.CursorResult'>
    print(type(session_res))

with engine.connect() as conn:
    conn_res = conn.execute(
        text("select tid, name from teacher;")
    )
    # <class 'sqlalchemy.engine.cursor.CursorResult'>
    print(type(conn_res))

注意: 例子中的teacher表的數據為:

+-----+----------+
| tid | name     |
+-----+----------+
|   1 | 語文老師 |
|   2 | 英語老師 |
|   3 | 數學老師 |
+-----+----------+

Result的全部方法

  • unique(strategy=None)
    去重, 但需要注意何時調用, 應該在調用如.all()這種生成Row的方法之前調用, 否則返回的對象都沒有unique這個方法

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res1 = session.execute(
            text("select tid, name from teacher;")
        )
        session_res2 = session.execute(
            text("select tid, name from teacher;")
        )
    
        session_res = session_res1.merge(session_res2)
    
        # [1, 2, 3, 1, 2, 3] ==> [1, 2, 3]
        print(session_res.scalars().unique().all())
    
    
  • all
    返回所有Row數據的列表, 之後的調用將返回一個空列表

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
       "username": "root",
       "password": "123456",
       "host": "localhost",
       "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                          echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
       session_res = session.execute(
           text("select tid, name from teacher;")
       )
    
       # ---------------------------------- 第一次調用all
       first_all = session_res.all()
       print(type(session_res.all()))  # <class 'list'>
       for row in first_all:
           print(type(row))  # <class 'sqlalchemy.engine.row.Row'>
           print(f"{row.tid}-{row.name}")
    
       # ---------------------------------- 第二次調用all
       print(session_res.all())  # []
    
    
  • fetchall()
    all方法一樣

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher;")
        )
        first_fetchall = session_res.fetchall()
        second_fetchall = session_res.fetchall()
        # [(1, '語文老師'), (2, '英語老師'), (3, '數學老師')]
        print(first_fetchall)
        # []
        print(second_fetchall)
    
    
  • fetchmany(size=None)
    取多行數據, size表示取多少行數據 , 當所有行都用完時, 返回一個空列表

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher;")
        )
        # 一共3行數據
    
        # 取兩行數據: [(1, '語文老師'), (2, '英語老師')]
        print(session_res.fetchmany(2))
    
        # 取一行數據: [(3, '數學老師')]
        print(session_res.fetchmany(1))
    
        # 沒有數據了, 返回空列表: []
        print(session_res.fetchmany(1))
    
  • fetchone()
    取一行數據, 當所有行都用完時,返回None

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
       "username": "root",
       "password": "123456",
       "host": "localhost",
       "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                          echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
       session_res = session.execute(
           text("select tid, name from teacher;")
       )
       while True:
           row = session_res.fetchone()
           if not row:
               break
           print(row)
           """
           (1, '語文老師')
           (2, '英語老師')
           (3, '數學老師')
           """
    
    
  • first()
    獲取第一行數據,關閉Result並丟棄其餘行, 如果沒有行,則不獲取 (即返回值為None)

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher;")
        )
    
        # (1, '語文老師')
        print(session_res.first())
    
        # !!! 由於Result已經關閉, 繼續操作會報錯:
        # sqlalchemy.exc.ResourceClosedError: This result object is closed.
        print(session_res.first())
    
    
  • one()
    只返回一行數據或引發異常, 並關閉Result (無數據時拋出: sqlalchemy.exc.NoResultFound, 多行數據時拋出: sqlalchemy.exc.MultipleResultsFound)

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher where tid=1;")
        )
    
        # (1, '語文老師')
        print(session_res.one())
    
    
  • one_or_none()
    最多返回一行數據或引發異常, 並關閉Result (無數據時返回None, 多行數據時拋出: sqlalchemy.exc.MultipleResultsFound)

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res1 = session.execute(
            text("select tid, name from teacher where tid < 1;")
        )
        # None
        print(session_res1.one_or_none())
    
        session_res2 = session.execute(
            text("select tid, name from teacher where tid = 1;")
        )
        # (1, '語文老師')
        print(session_res2.one_or_none())
    
    
  • columns(*col_expressions)
    限制返回列, 也可以對列進行重新排序
    即假如結果的列為(a, b, c, d), 但我只需要a和b, 那麼只需要result.columns("a", "b"), 你還可以調整它們的順序, 以方便解包
    注意: 這會修改Result的列, 且該方法的返回值就是修改後的Result對象

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res1 = session.execute(
            text("select tid, name from teacher;")
        )
        session_res2 = session.execute(
            text("select tid, name from teacher;")
        )
    
        for row in session_res1.columns("tid"):
            # 返回值時修改後的Result
    
            # 獲取欄位名元組
            print(row._fields)  # ('tid',)
    
        # 會修改原Result
        session_res2.columns("name")
        for row in session_res2:
            # 獲取欄位名元組
            print(row._fields)  # ('name',)
            
    
  • scalar()
    獲取第一行的第一列數據, 並關閉Result. 如果沒有要獲取的行, 則返回None
    如: [(1, "lczmx"), (2, "jack")], 返回 1

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher;")
        )
        print(session_res.scalar())  # 1
    
  • scalar_one()
    只返回一行數據的第一列或引發異常, 並關閉Result (無數據時拋出: sqlalchemy.exc.NoResultFound, 多行數據時拋出: sqlalchemy.exc.MultipleResultsFound)
    Result.one() + Result.scalar()

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher where tid=1;")
        )
    
        print(session_res.scalar_one())  # 1
        
    
  • scalar_one_or_none()
    最多返回一行數據的第一列或引發異常, 並關閉Result (無數據時返回None, 多行數據時拋出: sqlalchemy.exc.MultipleResultsFound)
    Result.one_or_none() + Result.scalar()

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res1 = session.execute(
            text("select tid, name from teacher where tid<1;")
        )
    
        print(session_res1.scalar_one_or_none())  # None
    
        session_res2 = session.execute(
            text("select tid, name from teacher where tid=1;")
        )
    
        print(session_res2.scalar_one_or_none())  # 1
    
    
  • scalars(index=0)
    返回一個ScalarResult對象, 該對象以每行數據的 第index列 元素作為數據 (而不是ResultRow)
    該對象的方法有: unique partitions fetchall fetchmany all first one_or_none one
    具體使用與Result類似

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher;")
        )
    
        # [1, 2, 3]
        print(session_res.scalars().all())
    
    
  • mappings()
    返回一個MappingResult對象, MappingResult對象與Result對象類似, 但是一行數據使用RowMapping對象表示, RowMapping對象類似於字典對象, 簡而言之: 調用該方法, 你可以將一行數據由類元組變為類字典

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher;")
        )
    
        for d in session_res.mappings():
            # 像操作字典一樣操作 d 即可
            print(d.get("tid"), d.get("name"))
    
    
  • keys()
    從SQLAlchemy1.4起 (之前的版本返回一個列表), 該方法將返回一個RMKeyView對象, 該對象可迭代, 其_keys屬性存放列的名稱, 由於實現的__contains__方法, 因此也可以使用in運算符作判斷.

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    from collections import Iterable
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher;")
        )
    
        keys = session_res.keys()
        print(keys)  # RMKeyView(['tid', 'name'])
    
        # <class 'sqlalchemy.engine.result.RMKeyView'>
        print(type(keys))
    
        # 可迭代的
        print(isinstance(keys, Iterable))  # True
    
        if "name" in keys:
            print("name in keys")
    
    
  • freeze()
    可以對Result進行快取, 見官方文檔: Re-Executing Statements

  • merge(*others)
    該方法合併其他Result, 返回一個MergedResult對象, 你可以像一般的Result一樣操作它, 但是取值的時候注意游標的位置(MergedResult關閉, Result也關閉; MergedResult取完了值, Result的值也被取完)

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res1 = session.execute(
            text("select tid, name from teacher where tid=1;")
        )
        session_res2 = session.execute(
            text("select tid, name from teacher where tid=2;")
        )
    
        session_res = session_res2.merge(session_res1)
    
        # 注意 先session_res2再session_res1
    
        # (2, '英語老師')
        print(session_res.fetchone())
    
        # [(1, '語文老師')]
        print(session_res1.all())
    
        # session_res已經取過一次
        # 所以返回: []
        print(session_res2.all())
    
    
  • partitions(size=None)
    迭代生成size大小的行的子列表, sizeNone時調用Result.yield_per(), 否則調用Result.fetchmany(size)

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res1 = session.execute(
            text("select tid, name from teacher;")
        )
        # 每次迭代 取 1行 數據
        for i in session_res1.partitions():
            print(i)
            """
            [(1, '語文老師')]
            [(2, '英語老師')]
            [(3, '數學老師')]
            """
    
        session_res2 = session.execute(
            text("select tid, name from teacher;")
        )
        # 每次迭代 取 2行 數據
        for i in session_res2.partitions(2):
            print(i)
            """
            [(1, '語文老師'), (2, '英語老師')]
            [(3, '數學老師')]
            """
        # 已經迭代完了, 就沒有值了
        print(list(session_res2.partitions()))  # []
    
    
  • yield_per(num)
    迭代num行數據, 返回的是Result對象

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 導入公共基類
    Base = declarative_base()
    #  資料庫配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 連接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher;")
        )
        # [(1, '語文老師'), (2, '英語老師'), (3, '數學老師')]
        print(session_res.yield_per(1).all())
    
    
  • close
    關閉此Result, 再操作的話會拋出異常: sqlalchemy.exc.ResourceClosedError: This result object is closed.

Row

一般來說, 一行數據是一個Row對象
常用的屬性或方法:

屬性/方法 屬性或方法 描述
_asdict 方法 返回欄位名與值的字典數據 並添加到 _mapping屬性
_fields 屬性 返回字元名的元組
_mapping 屬性 返回欄位名與值的字典數據

一般使用:
注意: 我們每一次迭代Result對象, 得到的是Row對象

通過屬性取值

result = conn.execute(text("select x, y from some_table"))

for row in result:
    y = row.y

    # illustrate use with Python f-strings
    print(f"Row: {row.x} {row.y}")
    

在ORM中, select(Article)的屬性是Article, select(Article.title)的屬性是title

通過元組解包

result = conn.execute(text("select x, y from some_table"))

for x, y in result:
    # ...
    

通過索引取值

result = conn.execute(text("select x, y from some_table"))

for row in result:
    x = row[0]
      

MetaData

MetaData是包含TableEngine的對象, 也就是說它主要是用來管理Table (表)的
下面列出MetaData對象的一些方法

方法 參數 描述
clear 清除此元數據中的所有表對象
create_all(bind=None, tables=None, checkfirst=True) bind: 資料庫Engine tables: Table對象列表 checkfirst: 是否 僅不存在表時 創建 在資料庫中創建 元數據中的所有表
drop_all(bind=None, tables=None, checkfirst=True) bind: 資料庫Engine tables: Table對象列表 checkfirst: 是否 僅存在表時 刪除 在資料庫中刪除 元數據中存儲的所有表
remove(table) table: 表對象 從此元數據中刪除給定的表對象
tables tables是屬性, 無參數 返回Table的欄位對象

內置函數

常用的SQL函數:

函數名 對應的SQL函數 描述
max MAX 返回一組值中的最大值
min MIN 返回一組值中的最小值
count COUNT 返回匹配指定條件的行數, 沒有參數時為: COUNT(*)
sum SUM 計算一組值的總和
rank RAND 產生 0 至 1 之間的隨機數
concat CONCAT 多個字元串連接為一個字元串
char_length CHAR_LENGTH 計算字元串字元數
coalesce COALESCE 接受一系列的表達式或列, 返回第一個非空的值
session_user SESSION_USER 返回當前連接的當前用戶名和主機名, 形如: root@localhost
user USER 返回連接的當前用戶名和主機名, 形如: root@localhost
current_user CURRENT_USER 返回用戶名和主機名, 形如: root@localhost
current_date CURRENT_DATE 函數返回當前日期, 格式: YYYY-MM-DD
current_time CURRENT_TIME 返回當前時間, 格式: HH-MM-SS
current_timestamp CURRENT_TIMESTAMP 返回當前日期和時間, 格式: YYYY-MM-DD HH-MM-SS
localtime LOCALTIME 返回當前日期和時間, 格式: YYYY-MM-DD HH-MM-SS
localtimestamp LOCALTIMESTAMP 返回當前日期和時間, 格式: YYYY-MM-DD HH-MM-SS
now NOW 返回當前日期和時間, 格式: YYYY-MM-DD HH-MM-SS
sysdate SYSDATE 返回當前日期和時間, 格式: YYYY-MM-DD HH:MM:SS
array_agg ARRAY_AGG PostgreSql可用, 把表達式變成一個數組
dense_rank DENSE_RANK 用於排名, 見: MySQL DENSE_RANK() 函數
percent_rank PERCENT_RANK 計算分區或結果集中行的百分位數排名
Function 描述一個SQL函數, 見: Function
GenericFunction 見: GenericFunction
grouping_sets GROUPING SETS 定義分組集, 見: SQL Grouping Sets運算符
rollup ROLLUP 生成小計和總計, 見: MySQL ROLLUP
cube 見: cube
cume_dist 見: cume_dist

全部函數見: SQL and Generic Functions

使用例子:

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column, Integer, String
from sqlalchemy import select, func

# 導入公共基類
Base = declarative_base()
#  資料庫配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 連接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)


class Teacher(Base):
    __tablename__ = "teacher"

    tid = Column("tid", Integer, primary_key=True, autoincrement=True)

    name = Column("name", String(10), nullable=False, comment="教師名")


with Session() as session:
    session_res = session.execute(
        select(func.count()).select_from(Teacher)
    )
    # (3,)
    print(session_res.one())

Column定義

一個Column即表的一列數據, 和我們用SQL語句定義一列數據一樣, 參數主要包括: 欄位名、欄位類型、約束條件, 比如:

from sqlalchemy import Column, String

Column("name", String(30), unique=True, nullable=False, comment='姓名')

欄位類型, 一般你可以用直接指定資料庫的欄位類型, 也可以讓SQLAlchemy的DDL自動選擇欄位類型
直接使用資料庫的欄位類型

欄位類型 描述
ARRAY(item_type, ...) 數組類型, 目前只支援PostgreSQL, 因此建議用: sqlalchemy.dialects.postgresql.ARRAY
BIGINT SQL BIGINT類型
BINARY(length) SQL BINARY類型
BLOB SQL BLOB類型
BOOLEAN SQL布爾類型
CHAR SQLCHAR類型
CLOB SQL CLOB型
DATE SQL DATE期類型
DATETIME SQL DATETIME類型
DECIMAL SQL DECIMAL類型
FLOAT SQL FLOAT類型
INT sqlalchemy.sql.sqltypes.INTEGER的別名
INTEGER SQL INT或INTEGER類型
JSON SQL JSON類型
NCHAR SQL NChar類型
NUMERIC SQL NUMERIC類型
NVARCHAR SQL NVARCHAR類型
REAL SQL REAL類型
SMALLINT SQL SMALLINT類型
TEXT SQL TEXT類型
TIME SQL TIME類型
TIMESTAMP SQL TIMESTAMP類型
VARBINARY SQLVARBINARY類型
VARCHAR SQL VARCHAR類型

關於SQL的數據類型, 見: SQL 數據類型

自動轉化的欄位類型

欄位類型 描述 通常對應的欄位類型
Boolean 布爾數據類型 Boolean或SMALLINT
String 所有字元串和字元類型的基 VARCHAR
Text 大小可變的字元串類型 CLOB或TEXT
LargeBinary 大的二進位位元組數據類型 BLOB或BYTEA
Unicode 長度可變的Unicode字元串類型
UnicodeText 無限長的Unicode字元串類型
SmallInteger 較小的一種 int 整數 SMALLINT
Integer int類型
BigInteger BIGINT數據類型 BIGINT
Numeric 用於固定精度數字的類型 NUMERIC或DECIMAL
Float 浮點類型 FLOAT 或 REAL .
Date datetime.date類型
Time datetime.time類型
DateTime datetime.datetime類型
Interval datetime.timedelta類型
Enum 枚舉類型 ENUM或VARCHAR
PickleType 保存使用pickle序列化的python對象 二進位類型
SchemaType 將類型標記為可能需要架構級DDL才能使用
MatchType 引用match運算符的返回類型 MySQL的是浮點型

約束條件

約束條件和其他參數 描述
autoincrement 是否自增
default 設置默認參數, 可以是可調用對象
index 是否創建索引
info SchemaItem.info的屬性
nullable 是否非空, Falsenot null
primary_key 是否為主鍵
unique 是否唯一
comment 注釋欄位, 會寫入SQL 中的COMMENT
onupdate 調用更新數據時傳入的值, 如: updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
server_default 為SQLAlchemy的DDL設置默認值, 可以是str unicode text(), 如: sqlalchemy.func.now()

注 : sqlalchemy.func, 用於生成SQL函數表達式, 詳情見: 內置函數

Column的例子:

from sqlalchemy import DateTime, func


class Data(Base):
    __tablename__ = 'data'
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    created_at = Column(DateTime, server_default=func.now(), comment='創建時間')
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新時間')
    
Tags: