粗糙版ORM(附详细注释)

  • 2019 年 10 月 7 日
  • 筆記

目录

ORM

作为数据库表记录 和 python中对象的映射关系中间件

数据库中

python代码中

不同的表

不同的表模型类

一条条记录

一个个模型类对象

记录里的某个字段

模型类对象的属性

在python代码中通过操作orm来进行数据库的存取操作

这为简易版demo,查询条件等不够完善,仅展示实现原理

其他

焦急规划中…

ORM代码

数据库表代码

数据库使用 mysql,将下面的 mysql代码导入数据库 需先 安装 pymysql 模块db/pymysql_opreator.py 中把 pymysql 配置那块儿更改数据库,密码等

mysql代码

/*   Navicat MySQL Data Transfer     Source Server         : localhost-E   Source Server Type    : MySQL   Source Server Version : 50645   Source Host           : localhost:3306   Source Schema         : youkuserver     Target Server Type    : MySQL   Target Server Version : 50645   File Encoding         : 65001     Date: 27/08/2019 08:35:35  */    SET NAMES utf8mb4;  SET FOREIGN_KEY_CHECKS = 0;    -- ----------------------------  -- Table structure for user  -- ----------------------------  DROP TABLE IF EXISTS `user`;  CREATE TABLE `user`  (    `id` int(11) NOT NULL AUTO_INCREMENT,    `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,    `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,    PRIMARY KEY (`id`) USING BTREE  ) ENGINE = InnoDB AUTO_INCREMENT = 15 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;    -- ----------------------------  -- Records of user  -- ----------------------------  INSERT INTO `user` VALUES (1, 'tank_dsb', '123456');  INSERT INTO `user` VALUES (2, 'jason', '123');    SET FOREIGN_KEY_CHECKS = 1;

db/models.py

from db.pymysql_opreator import MyMySQL  # pycharm默认将 项目根目录添加到 sys.path中,这里就先不写启动文件和配置文件的了,后续完善也会要写的    '''  定义几个字段类      int         --》     IntegerField      varchar     --》     StringField    '''      class Field:      def __init__(self, field_name, field_type, is_primary_key, default_value):          self.field_name = field_name          self.field_type = field_type          self.is_primary_key = is_primary_key          self.default_value = default_value      # 先定义几个常见的字段类型  class IntegerField(Field):      # 这后面的类型要是数据库的类型,用来拼接 sql语句的, 可能有多个 int 类型,那 is_primary_key=False 默认值得是 False, default 该字段的默认值, 0      def __init__(self, field_name, field_type='int', is_primary_key=False, default_value=0):          super().__init__(field_name, field_type, is_primary_key, default_value)      class StringField(Field):      # 这后面的类型要是数据库的类型,用来拼接 sql语句的, 可能有多个 int 类型,那 is_primary_key=False 默认值得是 False, default 该字段的默认值, 0      def __init__(self, field_name, field_type='varchar(255)', is_primary_key=False, default_value='NULL'):          super().__init__(field_name, field_type, is_primary_key, default_value)          '''  表类      有且只有一个主键字段      表名必须有      所有的字段专门放在 mappings 里面    特殊点      不确定字段个数  --->   规定采用关键字传参的方式来写,利用字典可以接收任意个数关键字参数的特性      要支持点语法    --->   对象.属性 触发的是 __getattr__、 __setattr__、 __delattr__ 这几个方法,重写一下,把字典的值返回回去,实现点语法的支持    意向语法      生成一张表 --> 直接定义一个表类      一条记录   --> 表类实例化        --> user_obj = User(name='tank', password='123')      一个字段   --> 类对象.属性       --> user_obj.name        user_obj = User(name='tank', password='123')      user_obj.insert_record()  --> 新增一条记录        user_obj.update_record()  --> 更新一条记录        user_obj.select_record()  --> 查询记录  列表套对象      user_obj.select(name='tank') --> 找出 name='tank' 的记录  列表套对象        user_obj.name -->  获取到 name属性的值      user_obj.name = 'tank_dsb' -->  更新属性值      user_obj.age = 18  -->  报错! 表结构已固定,不需要额外的        user_obj.table_name -->  获取表名      user_obj.primary_key_field --> 主键字段名      user_obj.name.is_primary_key --> name 字段是否是主键      user_obj.name.field_name --> 字段的字段名     user_obj.name -->  获取到 name属性的值      '''      class MyMetaClass(type):      # ----------------------------------------------------------      # 表类限制      #   有且只有一个主键字段      #   表名必须有           --->    不一定表模型类的类名就能对应上数据库中表名,建议还是匹配上      #   所有的字段专门放在 mappings 里面      #      #   控制类的创建过程      # ----------------------------------------------------------        # def __new__(cls, *args, **kwargs):  # 默认 **kwargs 是空的,而 args 得到的是 类名(字符串)、类的父类们(元组)、名称空间(字典)      def __new__(cls, class_name, class_bases, class_attr: dict):  # 把接收到的 args 解压赋值给 class_name, class_bases, class_attr          # -----------------------------------------------------------          # 过滤表模型类          # -----------------------------------------------------------          if class_name == 'Models':              return super().__new__(cls, class_name, class_bases, class_attr)  # MyMetaClass 的父类就是元类              # return type.__new__(cls, class_name, class_bases, class_attr)  # 写法二            # -----------------------------------------------------------          # 表名必须有          # -----------------------------------------------------------          try:              table_name = class_attr['table_name']          except Exception as e:              print(e)              raise Exception(f'表模型类 {class_name} 中没有指定 表名 table_name')          # table_name = class_attr.get('table_name', f'表模型类 {class_name} 中没有指定 表名 table_name')            # -----------------------------------------------------------          # 有且只有一个主键字段、所有的字段专门放在 mappings 里面          # -----------------------------------------------------------          primary_key_field = None          mappings = {}  # 将表字段都放进来          for key, value in class_attr.items():  # 循环遍历 名称空间(字典)中的键值对              # 只处理字段类型的键值对              if isinstance(value, Field):                  # 是不是主键                  if value.is_primary_key:                      # 只能有一个主键过滤                      if primary_key_field:                          raise Exception(f'一个表只能有一个主键! 出错表模型:{class_name}')                      primary_key_field = value.field_name                  mappings[key] = value  # {'name': 'tank', 'password': '123'}          if not primary_key_field:              raise Exception(f'每个模型表都必须有一个主键! 出错表模型:{class_name}')            # 把名称空间中的字段键值对删除掉(节省点空间,字段键值对都放在mappings 里面了)          for key, value in mappings.items():              class_attr.pop(key)  # 本来就是名称空间里取过来的字段键值对,所以这里不会报错            # 将 primary_key_field、mappings 放到名称空间里,接着实例化成一个类          # table_name 本身就在 class_attr 里, 这一块只是限制每个表模型类必须指定表名          class_attr['primary_key_field'] = primary_key_field          class_attr['mappings'] = mappings            # 会调用 Models的 __new__方法,最后走到MyMetaClass 的 __new__, 而 class_name 是models,则会直接调用 type的 __new__方法          return super().__new__(cls, class_name, class_bases, class_attr)          # return type.__new__(cls, class_name, class_bases, class_attr)  # 写法二      class Models(dict, metaclass=MyMetaClass):      # ----------------------------------------------------------      # 利用字典可以接收任意个数关键字参数的特性来保存字段      #       user_obj = User(name='tank', password='123')      # ----------------------------------------------------------      def __init__(self, **kwargs):   # 将接收到的关键字参数 通过 **kwargs 打包成字典 --> kwargs          # 打散传给 dict 转为字典属性 -->  dict(name='tank', password='123')  --> {name='tank', password='123'}          super().__init__(self, **kwargs)        # ----------------------------------------------------------      # 支持点语法  对象.属性      # ----------------------------------------------------------      # user_obj.name -->  获取到 name属性的值      def __getattr__(self, item):          return self.get(item, f'表模型类 {self.__class__.__name__} 中没有此字段 {item}')          # try:          #     return self[item]          # except Exception as e:          #     print(e)          #     raise Exception(f'表模型类 {self.__class__.__name__} 中没有此字段 {item}')        # user_obj.name = 'tank_dsb' -->  更新属性值      # user_obj.age = 18  -->  报错! 表结构已固定,不需要额外的         ---> 先不管      def __setattr__(self, key, value):          self[key] = value        # ----------------------------------------------------------      # 支持点查询、更改、插入方法      # ----------------------------------------------------------        # user_obj.select_record()  --> 查询记录  列表套对象      # user_obj.select(name='tank') --> 找出 name='tank' 的记录  列表套对象      @classmethod  # 一般不会拿着对象(记录)去查记录(对象),所以这里写成类绑定方法      def select_record(cls, **kwargs):  # ---> 这里暂时只做一个查询条件, 且是 ... = ... 的形式          # 只在调用的时候才打开这个          mysql_op = MyMySQL()          # select * from user;          # 或者    select * from user where name='tank' 多字段暂时不考虑          sql = f'select * from {cls.table_name}'          if not kwargs:  # 如果没有条件              res = mysql_op.select_record(sql)          else:              # select * from user where name='tank'              # {name='tank', id=1}              field = list(kwargs.keys())[0]  # kwargs.keys() --> dict_keys(['name', 'age'])  需要list 强转成列表              value = list(kwargs.values())[0]              sql = f'{sql} where {field}=%s'              res = mysql_op.select_record(sql, value)            # res = [{}, {}]   返回的都是列表套字典,现在要转换成 列表套对象          record_objs = []          for record_dic in res:              # cls(**record_dic)  # {'name': 'jason', 'password': '123'} --> name='jason'  password='123'              # 将字典打散,拆分成一个个关键字参数的形式              record_objs.append(cls(**record_dic))          return record_objs        # user_obj.insert_record()  --> 新增一条记录      def insert_record(self):          # insert into user (name, password) values('tank', '123')  id 自动填入          mysql_op = MyMySQL()          fields = []          values = []          for key, value in self.mappings.items():              # 主键默认是id 默认自动增长,所以这里忽略掉              if not value.is_primary_key:                  fields.append(key)                  values.append(getattr(self, value.field_name, value.default_value))  # ['egon','123']            # 这里单双引号千万不要包错了,  (".".join(fields)  -->  (name, password)          list_s = ['%s' for i in range(len(fields))]          # insert into user (name,password) values (%s,%s)          # ({",".join(list_s)}) -->  (%s, %s)          sql = f'insert into {self.table_name}({",".join(fields)}) values ({",".join(list_s)})'          mysql_op.execute_sql(sql, values)        # user_obj.update_record()  --> 更新一条记录      def update_record(self):  # 不能取名 update ---> 和字典的 update 方法重名,会造成方法重写,覆盖          # update table user set name='tank_dsb', passowrd='123' where id=1          mysql_op = MyMySQL()          primary_key_field = self.primary_key_field          primary_key_value = 0          fields = []          values = []          for key, value in self.mappings.items():              # 主键默认是id 默认自动增长,所以这里忽略掉              if not value.is_primary_key:                  fields.append(key)                  # values.append(value)  # 报错, ---> execute 拼接的是字符串,拼接不了对象(这里 value 是 StringField)                  values.append(getattr(self, value.field_name, value.default_value))              # 主键存起来做判断条件              else:                  primary_key_value = getattr(self, self.primary_key_field, value.default_value)                  # primary_key_value = getattr(self, value.field_name, value.default_value)            # 这里单双引号千万不要包错了,  (".".join(fields)  -->  (name, password)          fields_s = [f"{field}=%s" for field in fields]          # ",".join(fields_s) --> name=%s, passowrd=%s          sql = f'update {self.table_name} set {",".join(fields_s)} where {primary_key_field}={primary_key_value}'          # print(sql, '------------------------------')          # # update user set name=%s,password=%s where id=15 ------------------------------            # update user set name='tank_dsb', passowrd='123' where id=1          mysql_op.execute_sql(sql, values)      if __name__ == '__main__':      class User(Models):          table_name = 'user'          id = IntegerField('id', is_primary_key=True)          name = StringField('name')          password = StringField('password')        # print(User.select_record(name='tank'))  # 打印按条件查询      # select * from user where name='tank'      # [{'id': 1, 'name': 'tank', 'password': '123'}]        print(User.select_record())      user_obj = User(name='egon', password='123')      user_obj.insert_record()        user_obj = User.select_record(name='egon')[0]  # 重名的时候会有问题  (两个 egon,改的是最前面那条 egon的记录),我也暂时只做了一个 查询条件      # print(user_obj.name, '++++++++++')      # # egon ++++++++++      user_obj.password = '666'      user_obj.update_record()        user_egon = User.select_record(name='egon')[0]  # 取出来的是 列表 ,我们只要第一个元素(就算只有一条记录也是一个列表)        # select * from user      # [{'id': 1, 'name': 'tank', 'password': '123'}, {'id': 2, 'name': 'jason', 'password': '123'}]

db/pymysql_opreator.py

import pymysql    '''  数据库连接类      包含数据库连接和关闭数据库方法      '''      class MyMySQL:      # -------------------------------------------------      # 单例模式      # -------------------------------------------------      _instance = None        # __init__ 不能返回任何东西,所以写在这里,当然其他地方也行      def __new__(cls, *args, **kwargs):          if not cls._instance:              cls._instance = object.__new__(cls, *args, **kwargs)          return cls._instance        # -------------------------------------------------      # 初始化建立数据库连接,获得游标对象      # -------------------------------------------------      def __init__(self):          # 建立mysql连接          self.conn = pymysql.connect(              host='127.0.0.1',              port=3306,              user='root',              password='000000',              database='YouKuServer',              charset='utf8',              autocommit=True          )          # 拿到游标对象          self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)        # 关闭资源操作      def close(self):          # 一定要先关闭游标再关闭连接          self.cursor.close()          self.conn.close()        # def __del__(self):  # 当当前类对象被删除时自动触发,关闭资源(可能多线程多进程会对其他的造成影响吧?)      #     self.close()        def select_record(self, sql, args=None):          try:              print(self.cursor.mogrify(sql, args))  # 打印执行的sql语句              # select * from user where name='egon'                self.cursor.execute(sql, args)              return self.cursor.fetchall()          except Exception as e:              print(e)              raise Exception(f'这里报错啦!{self.__class__.__name__}')        # 这里为什么要分开来? 就因为有没有返回值吗?      def execute_sql(self, sql, args):          try:              print(self.cursor.mogrify(sql, args))  # 打印执行的sql语句              # # update user set name='egon',password='666' where id=15              self.cursor.execute(sql, args)          except Exception as e:              print(e)              raise Exception(f'这里报错啦!{self.__class__.__name__}')