pymssql 介绍

pymssql包是Python语言用于连接SQL Server数据库的驱动程序(或者称作DB API),它是最终和数据库进行交互的工具。SQLAlchemy包就是利用pymssql包实现和SQL Server数据库交互的功能的。

一,pymssql包的基本组成

pymssql包由两个模块构成:pymssql 和 _mssql,pymssql 是建立在_mssql模块之上的模块,相对来说,_mssql性能更高。

pymssql模块由Connection和Cursor 两个大类构成:

  • Connection类代表MS SQL Sever数据库的一个连接,
  • Cursor类用于向数据库发送查询请求,并获取查询的的结果。

二,连接

连接对象用于连接SQL Server引擎,并设置连接的属性,比如连接超时,字符集等。

1,创建连接对象

pymssql通过类函数来构建连接对,在创建连接对象的同时,打开连接:

class pymssql.Connection(user, password, host, database, timeout, login_timeout, charset, as_dict)

2,构建Cursor对象

在创建连接对象之后,创建Cursor对象,使用Cursor对象向数据库引擎发送查询请求,并获取查询的结果:

Connection.cursor(as_dict=False)

as_dict是布尔类型,默认值是False,表示返回的数据是元组(tuple)类型;如果设置为True,返回的数据集是字典(dict)类型。

3,提交查询和自动提交模式

在执行查询之后,必须提交当前的事务,以真正执行Cursor对象的查询请求:

Connection.commit()

默认情况下,自动提交模式是关闭的,用户可以设置自动提交,pymssql自动执行Cursor发送的查询请求:

Connection.autocommit(status)

status是bool值,True表示打开自动提交模式,False表示关闭自动提交模式,默认值是False。

4,关闭连接

在执行完查询之后,关闭连接:

Connection.close()

三,Cursor

通过打开的连接对象来创建Cursor对象,通过Cursor对象向数据库引擎发送查询请求,并获取查询的结果。

1,执行查询

Cursor对象调用execute**()函数来执行查询请求,

Cursor.execute(operation)
Cursor.execute(operation, params)
Cursor.executemany(operation, params_seq)

参数注释:

  • operation:表示执行的sql语句,
  • params :表示sql语句的参数,
  • params_seq:参数序列,用于sql语句包含多个参数的情况。

注意,除设置自动提交模式之外,必须在执行查询之后,通过连接对象来提交查询。

Connection.commit()

如果sql语句只包含一个参数,那么必须在sql语句中显式使用%s或%d作为占位符,分别用于引用字符型的参数和数值型的参数。

cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')

如果sql语句包含多个参数,那么使用list来传递参数:

cursor.executemany(
    "INSERT INTO persons VALUES (%d, %s, %s)",
    [(1, 'John Smith', 'John Doe'),
     (2, 'Jane Doe', 'Joe Dog'),
     (3, 'Mike T.', 'Sarah H.')])

2,获取查询结果

Cursor对象调用fetch**()函数来获取查询的结果:

Cursor.fetchone()
Cursor.fetchmany(size=None)
Cursor.fetchall()

fetch**()函数是迭代的:

  • fetchone():表示从查询结果中获取下一行(next row)
  • fetchmany():表示从查询结果中获取下面的多行(next batch)
  • fetchall():表示从查询结果中获取剩余的所有数据行(all remaining)

3,跳过结果集

当查询的结果包含多个结果集时,可以跳过当前的结果集,跳到下一个结果集:

Cursor.nextset()

如果当前结果集还有数据行未被读取,那么这些剩余的数据行会被丢弃。

四,执行存储过程

Cursor对象有函数callproc(),用于执行存储过程:

Cursor.callproc(sp_name, paras)

第一个参数是存储过程的名称,第二个参数是一个元组类型,

五,pymssql模块的基本操作

 1,pymssql的基本操作

from os import getenv
import pymssql

server = getenv("PYMSSQL_TEST_SERVER")
user = getenv("PYMSSQL_TEST_USERNAME")
password = getenv("PYMSSQL_TEST_PASSWORD")

conn = pymssql.connect(server, user, password, "tempdb")
cursor = conn.cursor(as_dict=False)
cursor.execute("TSQL query")
cursor.executemany("INSERT INTO persons VALUES (%d, %s, %s)",
    [(1, 'John Smith', 'John Doe'),
     (2, 'Jane Doe', 'Joe Dog'),
     (3, 'Mike T.', 'Sarah H.')])
# you must call commit() to persist your data if you don't set autocommit to True
conn.commit()

cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')
row = cursor.fetchone()
while row:
    print("ID=%d, Name=%s" % (row[0], row[1]))
    row = cursor.fetchone()

conn.close()

2,以字典集返回数据行

conn = pymssql.connect(server, user, password, "tempdb")
cursor = conn.cursor(as_dict=True)

cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')
for row in cursor:
    print("ID=%d, Name=%s" % (row['id'], row['name']))

conn.close()

3,使用with语句

with是上下文管理器,可以自动关闭上下文。

如果使用with语句来创建连接对象和Cursor对象,那么就不需要显式的关闭连接和Cursor对象,在语句执行完成之后,Python会自动检测连接对象和Cursor对象的作用域,一旦连接对象或Cursor对象不再有效,Python就会关闭连接或Cursor对象。

with pymssql.connect(server, user, password, "tempdb") as conn:
    with conn.cursor(as_dict=True) as cursor:
        cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')
        for row in cursor:
            print("ID=%d, Name=%s" % (row['id'], row['name']))

六,附上代码库

 附上代码,以飨读者。

import pymssql
from sqlalchemy import create_engine
import pandas as pd
from sqlalchemy.sql import text as sql_text

class DBHelper:
    def __init__(self):
        self.name='DB Helper'
        self.db_host = r'sql server'
        self.db_name = 'db name'
        self.db_user = r'sa' 
        self.db_password = r'pwd'

######################################################
##                   data connection                ##
######################################################

    def get_engine(self):
        str_format = 'mssql+pymssql://{0}:{1}@{2}/{3}?charset=utf8'
        connection_str = str_format.format(self.db_user,self.db_password,self.db_host,self.db_name)
        engine = create_engine(connection_str,echo=False)
        return engine

    def get_pymssql_conn(self):
        conn = pymssql.connect(self.db_host, self.db_user, self.db_password, self.db_name)
        return conn


######################################################
##                common SQL APIs                   ##
######################################################

    def write_data(self,df,destination,if_exists='append',schema='dbo'):
        engine = self.get_engine()
        df.to_sql(destination, con=engine, if_exists=if_exists,index = False, schema=schema
, method='multi', chunksize=1000) def read_data(self,sql): engine = self.get_engine() df = pd.read_sql(sql, con=engine) return df def exec_sql(self,sql): engine = self.get_engine() con = engine.connect() with con.begin() as tran: con.execute(sql_text(sql).execution_options(autocommit=True)) def exec_sp(self,sp_name,*paras): with pymssql.connect(self.db_host, self.db_user, self.db_password, database=self.db_name) as conn: with conn.cursor(as_dict=False) as cursor: try: cursor.callproc(sp_name, paras) conn.commit() except Exception as e: print(e) def exec_sp_result(self,sp_name,*paras): with pymssql.connect(self.db_host, self.db_user, self.db_password, database=self.db_name) as conn: with conn.cursor(as_dict=True) as cursor: try: cursor.callproc(sp_name, paras) cursor.nextset() result=cursor.fetchall() conn.commit() df=pd.DataFrame.from_records(result) return df except Exception as e: print(e)

 

参考文档:

pymssql introduction