python购物车升级版

  • 2019 年 10 月 7 日
  • 筆記

各文件内容

前言

功能架构等请参考前一篇博客,此篇博客为进阶版的存代码展示。

详细文件内容

启动文件

starts.py启动文件

import os  import sys    BASE_DIR = os.path.dirname(os.path.dirname(__file__))  sys.path.append(BASE_DIR)      from core import src  if __name__ == '__main__':      src.run()

配置文件

logging_dict.py日志配置字典

import os  # ************************************   日志配置字典   **************************************************  '''      --------------------------------------------      ------------* 需要自定义的配置 *-------------      --------------------------------------------  '''  # 定义三种日志输出格式 开始  standard_format = '[%(asctime)s][%(threadName)s:%(thread)d][task_id:%(name)s][%(filename)s:%(lineno)d]'                     '[%(levelname)s][%(message)s]'  # 其中name为getlogger指定的名字  simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s'  # 定义日志输出格式 结束      # 定义日志文件的存放目录与文件名(一般视情况选择下面的两个即可)  logfile_dir = os.path.dirname(os.path.dirname(__file__))  # log文件的目录 (执行文件starts.py 在bin目录下)  # logfile_dir = os.path.abspath(__file__)  # log文件的目录 (执行文件starts.py 在项目根目录下)  # 拼上 log 文件夹  logfile_dir = os.path.join(logfile_dir, 'log')  logfile_name = 'ATM_Shop_Cart.log'  # log文件名    # 如果不存在定义的日志目录就创建一个  if not os.path.isdir(logfile_dir):      os.mkdir(logfile_dir)    # log文件的全路径  logfile_path = os.path.join(logfile_dir, logfile_name)    # 像日志文件的最大限度、个数限制,日志过滤等级等也可以在下面的对应地方限制    '''      --------------------------------------------      --------------* log配置字典 *---------------      --------------------------------------------  '''  # log配置字典  LOGGING_DIC = {      'version': 1,      'disable_existing_loggers': False,      'formatters': {          'standard': {              'format': standard_format          },          'simple': {              'format': simple_format          },      },      'filters': {},  # 一般用不到过滤,所以就空在这里了      'handlers': {          'console': {              'level': 'DEBUG',              'class': 'logging.StreamHandler',  # 打印到屏幕              'formatter': 'simple'          },          # 打印到文件的日志,收集info及以上的日志          'default': {              'level': 'DEBUG',              'class': 'logging.handlers.RotatingFileHandler',  # 保存到文件              'formatter': 'standard',              'filename': logfile_path,  # 日志文件              'maxBytes': 1024 * 1024 * 5,  # 日志大小上限 5M,超过5M就会换一个日志文件继续记录              'backupCount': 5,  # 日志文件的数量上限 5个,超出会把最早的删除掉再新建记录日志              'encoding': 'utf-8',  # 日志文件的编码,再也不用担心中文log乱码了          },      },      'loggers': {          # logging.getLogger(__name__)拿到的logger配置          '': {              'handlers': ['default', 'console'],  # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕              # 'handlers': ['default'],  # ------------- 正式上线的时候改成这个,取消控制台打印,节省资源  -------------              'level': 'DEBUG',              'propagate': True,  # 向上(更高level的logger)传递          },      },  }

settings.py配置文件

import os  import sys  BASE_DIR = os.path.dirname(os.path.dirname(__file__))  sys.path.append(BASE_DIR)

入口文件

src.py入口文件

from interface import user_interface  from lib import common  from interface import bank_interface  from interface import shop_interface      def register():      while True:          username = input("请输入用户名>>>:").strip()          if username == 'q':              return False          if user_interface.check_user_exits(username):              print("该用户已存在,请换一个试试!")              continue          pwd = input("请输入密码>>>:").strip()          if pwd == 'q':              return False          repwd = input("请再次输入密码>>>:").strip()          if pwd == 'q':              return False          if repwd != pwd:              print("两次密码不一致,请您重新输入!")              continue          flag = user_interface.register(username, pwd)          if not flag:              print("注册失败,请重试!")              continue          else:              print(f"{username}注册成功~")              return True      def login():      while True:          username = input("请输入用户名>>>:").strip()          if username == 'q':              return False          if not user_interface.check_user_exits(username):              print("该用户不存在,请检查输入!")              continue          pwd = input("请输入密码>>>:").strip()          if pwd == 'q':              return False          flag = user_interface.login(username, pwd)          if not flag:              print("您的密码有误或账户已被冻结,请重新登录!")              continue          else:              print(f"登陆成功,{username}您好~")              return True      @common.login_auth  def logout():      user_interface.logout()      print("欢迎下次登录,祝您生活愉快~")      @common.login_auth  def check_balance():      username = current_user.get("username")      balance = user_interface.check_balance(username)      print(f"{username}您好,您的账户余额为{balance}。")      @common.login_auth  def check_flow():      username = current_user.get('username')      flow = user_interface.check_flow(username)      if flow:          print(f"{username}您好,您的流水如下:")          for line in flow:              print(line)      else:          print(f"{username}您好,您的账户暂无流水记录~")      # ---------------------  # 银行接口  @common.login_auth  def repay():      username = current_user.get('username')      while True:          money = input('请输入您的还款金额>>>:').strip()          if money == 'q':              return False          elif not money.isdigit():              print("请正确输入金额")              continue          else:              money = int(money)              bank_interface.repay(username, money)              print("还款成功!")              return True      @common.login_auth  def withdraw():      username = current_user.get('username')      while True:          money = input('请输入您的提现金额>>>:').strip()          if money == 'q':              return False          elif not money.isdigit():              print("请正确输入金额")              continue          else:              money = int(money)              flag = bank_interface.withdraw(username, money)              if flag:                  print("提现成功!")                  return True              else:                  print("您的余额不足提现金额(提现需包含0.05%的手续费)。")      @common.login_auth  def transfer():      username = current_user.get('username')      while True:          target_user = input("请输入您的转账目标用户>>>:").strip()          if user_interface.check_user_exits(target_user):              money = input('请输入您的转账金额>>>:').strip()              if money == 'q':                  return False              elif not money.isdigit():                  print("请正确输入金额")                  continue              else:                  money = int(money)                  flag = bank_interface.transfer(username, target_user, money)                  if not flag:                      print("转账失败,请确认您的余额是否充足!")                      continue                  print("转账成功!")                  return True          print(f"{target_user}用户不存在,请您重新输入!")      # ---------------------  # 购物功能  @common.login_auth  def shop():      username = current_user['username']      while True:          print("------------- 商品列表 -------------")          for key in shopping_list:              print(f"{key}.{shopping_list.get(key)[0]} {shopping_list.get(key)[1]}")          choice = input("请输入商品编号>>>:").strip()          if choice == 'q':              return False          elif choice in shopping_list:              # 开始购物              is_buy = input("请选择购买或加入购物车(y购买/n加入购物车)(q退出)").strip()              if is_buy == 'q':                  return False              good = shopping_list[choice][0]              price = shopping_list[choice][1]                if is_buy == 'y':                  flag = shop_interface.pay_good(username, good, price)                  if not flag:                      print(f"您的余额不足以购买该商品,请换一个试试")                  else:                      print(f"{good}商品购买成功!")              elif is_buy == 'n':                  # 加入购物车                  flag = shop_interface.put_shop_cart(username, good, price)                  if not flag:                      print(f"{good}该商品已在购物车中啦~")                  else:                      print(f"{good}成功放入购物车")                  pass              else:                  print("请正确输入!")                  continue          else:              print("请输入正确的商品编号!")              continue      @common.login_auth  def check_shop_cart():      username = current_user['username']      shop_cart = shop_interface.get_shop_cart(username)      if shop_cart:          print(f"{username}您好,您的购物车商品列表如下")          for shop, price in shop_cart.items():              print(shop, price)      else:          print(f"您的购物车里还没有东西哦~")      # ---------------------  # 管理员功能  @common.login_auth  @common.admin_auth  def lock_user():      username = current_user['username']      while True:          target_user = input("请输入您要冻结的目标用户>>>:").strip()          if user_interface.check_user_exits(target_user):              user_interface.lock_user(username, target_user)              print(f"{target_user}用户已冻结。")              return True          print(f"{target_user}用户不存在,请您重新输入!")      @common.login_auth  @common.admin_auth  def unlock_user():      username = current_user['username']      while True:          target_user = input("请输入您要解冻的目标用户>>>:").strip()          if user_interface.check_user_exits(target_user):              user_interface.unlock_user(username, target_user)              print(f"{target_user}用户已成功解冻。")              return True          print(f"{target_user}用户不存在,请您重新输入!")      @common.login_auth  @common.admin_auth  def change_balance():      username = current_user['username']      while True:          target_user = input("请输入您要更改的目标用户>>>:").strip()          if user_interface.check_user_exits(target_user):              money = input('请输入金额>>>:').strip()              if money == 'q':                  return False              elif not money.isdigit():                  print("请正确输入金额")                  continue              else:                  money = int(money)                  flag = user_interface.change_balance(username, target_user, money)                  if not flag:                      print("不能设置自己的余额,请换个用户试试。")                      continue                  print(f"{target_user}用户余额成功修改为{money}元。")                  return True          print(f"{target_user}用户不存在,请您重新输入!")      func_list = {      '1': register,      '2': login,      '3': logout,      '4': check_balance,      '5': check_flow,      '6': repay,      '7': withdraw,      '8': transfer,      '9': shop,      '10': check_shop_cart,      '11': lock_user,      '12': unlock_user,      '13': change_balance,  }      current_user = {      'username': '',      'pwd': None,      'balance': 0,      'lock': False,      'flow': [],      'shop_cart': {},      'is_admin': False  }      shopping_list = {      "1": ['广东凤爪', 10],      "2": ['面疙瘩', 15],      "3": ['mac', 12888],      "4": ['tesla', 1222222]  }      def run():      while True:          print("""------------------------------------------------------------------          1.注册        2.登录        3.注销        4.查看余额          5.查看流水    6.还款        7.提现        8.转账          9.购物        10.查看购物车  11.锁定用户   12.解锁用户          13.更改用户余额                11-13为管理员功能哦,只有管理员才能操作!          ------------------------------------------------------------------""")          choice = input("请选择功能编号>>>:").strip()          if choice == 'q':              print("感谢您的使用,欢迎下次再来~")              return False          elif choice in func_list:              func_list[choice]()          else:              print("您的输入有误,请重新输入")

数据处理层

db_handler.py数据处理文件

import os  import json  BASE_DIR = os.path.dirname(os.path.dirname(__file__))  DB_DIR = os.path.join(BASE_DIR, 'db')      def select(username: str):      user_dir = os.path.join(DB_DIR, f"{username}.json")      if not os.path.exists(user_dir):          return False      else:          with open(user_dir, mode='r', encoding='utf-8') as f:              user_dict = json.load(f)              return user_dict      def save(user_dict: dict):      user_dir = os.path.join(DB_DIR, f"{user_dict['username']}.json")      with open(user_dir, mode='w', encoding='utf-8') as f:          json.dump(user_dict, f, ensure_ascii=False)      return True

用户数据信息

db/swb.json用户信息文件1

{"username": "swb", "pwd": "59f471f262ab586fb959ded0e2c7b94f", "balance": 10, "lock": false, "flow": ["swb购买了广东凤爪,花费10元,现余额为14990元", "swb购买了面疙瘩,花费15元,现余额为14975元", "管理员tank将您的账户余额重置为10元。"], "shop_cart": {"广东凤爪": 10, "面疙瘩": 15, "mac": 12888, "tesla": 1222222}, "is_admin": false}

db/tank.json用户信息文件2

{"username": "tank", "pwd": "59f471f262ab586fb959ded0e2c7b94f", "balance": 14975, "lock": false, "flow": ["tank购买了广东凤爪,花费10元,现余额为14990元", "tank购买了面疙瘩,花费15元,现余额为14975元"], "shop_cart": {"tesla": 1222222}, "is_admin": true}

接口层

interface/bank_interface.py银行功能接口文件

from db import db_handler  from lib import common  logger = common.get_logger('bank_interface')      def repay(username: str, money: float):      user_dict = db_handler.select(username)      user_dict['balance'] += money      user_dict['flow'].append(f"{username}还款{money}元,现余额为{user_dict['balance']}。")      db_handler.save(user_dict)      logger.info(f"{username}还款{money}元,现余额为{user_dict['balance']}。")      return True      def withdraw(username: str, money: float):      user_dict = db_handler.select(username)      if money*1.05 > user_dict['balance']:          return False      else:          user_dict['balance'] -= money*1.05      user_dict['flow'].append(f"{username}提现{money}元,手续费{money*0.05}元,现余额为{user_dict['balance']}元")      db_handler.save(user_dict)      logger.info(f"{username}提现{money}元,手续费{money*0.05}元,现余额为{user_dict['balance']}元")      return True      def transfer(username: str, target_username: str, money: float):      user_dict = db_handler.select(username)      if user_dict['balance'] < money:          return False      user_dict['balance'] -= money      user_dict['flow'].append(f"{username}向{target_username}转账{money}元。")      db_handler.save(user_dict)        target_user_dict = db_handler.select(target_username)      target_user_dict['balance'] += money      target_user_dict['flow'].append(f"{target_username}收到{username}转账{money}元。")      db_handler.save(target_user_dict)        logger.info(f"{username}向{target_username}转账{money}元。")      return True

interface/shop_interface.py购物车功能接口文件

from db import db_handler  from lib import common  logger = common.get_logger('shop')      def get_shop_cart(username: str):      user_dict = db_handler.select(username)      return user_dict['shop_cart']      def put_shop_cart(username: str, good: str, price: float):      user_dict = db_handler.select(username)      shop_cart = user_dict['shop_cart']      if good in shop_cart:          return False      else:          shop_cart[good] = price      user_dict['shop_cart'] = shop_cart      db_handler.save(user_dict)      logger.info(f"商品{good},单价{price},已加入{username}的购物车~")      return True      def pay_good(username, good, price):      user_dict = db_handler.select(username)      if user_dict['balance'] <= price:          return False      else:          user_dict['balance'] -= price          user_dict['flow'].append(f"{username}购买了{good},花费{price}元,现余额为{user_dict['balance']}元")          db_handler.save(user_dict)          logger.info(f"{username}购买了{good},花费{price}元,现余额为{user_dict['balance']}元")          return True

interface/user_interface.py用户功能接口文件

from db import db_handler  from lib import common  logger = common.get_logger('user_interface')      def check_user_exits(username: str):      flag = db_handler.select(username)      if not flag:          return False      return True      def register(username: str, pwd: str, balance=15000):      pwd = common.get_md5(pwd)      user_dict = {          'username': username,          'pwd': pwd,          'balance': balance,          'lock': False,          'flow': [],          'shop_cart': {},          'is_admin': False      }      db_handler.save(user_dict)      logger.info(f"{username}用户注册成功,初始余额为{balance}元。")      return True      def login(username: str, pwd: str):      pwd = common.get_md5(pwd)      user_dict = db_handler.select(username)      if user_dict['pwd'] == pwd and not user_dict['lock']:          from core import src          src.current_user = user_dict          return True      else:          return False      def logout():      from core import src      logger.info(f"{src.current_user['username']}用户注销登录。")      src.current_user = {          'username': None,          'pwd': None,          'balance': None,          'lock': False,          'flow': [],          'shop_cart': {},          'is_admin': False      }      return True      def check_balance(username: str):      user_dict = db_handler.select(username)      return user_dict['balance']      def check_flow(username: str):      user_dict = db_handler.select(username)      return user_dict['flow']      def lock_user(username: str, target_user: str):      target_user_dict = db_handler.select(target_user)      target_user_dict['lock'] = True      db_handler.save(target_user_dict)      logger.info(f"{username}将{target_user}账户冻结。")      return True      def unlock_user(username: str, target_user: str):      target_user_dict = db_handler.select(target_user)      target_user_dict['lock'] = False      db_handler.save(target_user_dict)      logger.info(f"{username}将{target_user}账户解冻。")      return True      def change_balance(username: str, target_user: str, money: float):      if username == target_user:          return False      target_user_dict = db_handler.select(target_user)      target_user_dict['balance'] = money      target_user_dict['flow'].append(f"管理员{username}将您的账户余额重置为{money}元。")      db_handler.save(target_user_dict)      logger.info(f"{username}将{target_user}的账户余额改为{money}元。")      return True      # 这些管理员方法用装饰器判断他是不是管理员

通用方法

lib/common.py通用方法文件

import logging.config  from conf import logging_dict  import hashlib      # 日志字典使用  def get_logger(type_name: str):      logging.config.dictConfig(logging_dict.LOGGING_DIC)      logger = logging.getLogger(type_name)      return logger      # 登录装饰器  def login_auth(func):      def inner(*args, **kwargs):          from core import src          if src.current_user.get('username'):              # 登录验证              res = func(*args, **kwargs)              return res          else:              print("请登录后再操作!")              src.login()      return inner      # 管理员验证  def admin_auth(func):      def inner(*args, **kwargs):          from core import src          if src.current_user.get('is_admin'):              # 判断是不是管理员              res = func(*args, **kwargs)              return res          else:              print("您不是管理员,不能使用此功能哦~")      return inner      # md5值获取  def get_md5(string: str) -> str:      md5 = hashlib.md5()      md5.update('md5_salt'.encode('utf-8'))      md5.update(string.encode('utf-8'))      return md5.hexdigest()      if __name__ == '__main__':      logger = get_logger('common')      logger.info('info测试数据啦~')      logger.debug('debug测试数据啦~')

日志记录

log/ATM_Shop_Cart.log日志记录文件

[2019-07-23 21:55:50,172][MainThread:2688][task_id:user_interface][user_interface.py:25][INFO][swb用户注册成功,初始余额为15000元。]  [2019-07-23 21:56:07,970][MainThread:2688][task_id:shop][shop_interface.py:30][INFO][swb购买了广东凤爪,花费10元,现余额为14990元]  [2019-07-23 21:56:11,611][MainThread:2688][task_id:shop][shop_interface.py:19][INFO][商品面疙瘩,单价15,已加入swb的购物车~]  [2019-07-23 21:56:13,812][MainThread:2688][task_id:shop][shop_interface.py:19][INFO][商品mac,单价12888,已加入swb的购物车~]  [2019-07-23 21:56:54,035][MainThread:764][task_id:shop][shop_interface.py:20][INFO][商品广东凤爪,单价10,已加入swb的购物车~]  [2019-07-23 21:56:55,516][MainThread:764][task_id:shop][shop_interface.py:20][INFO][商品面疙瘩,单价15,已加入swb的购物车~]  [2019-07-23 21:56:56,882][MainThread:764][task_id:shop][shop_interface.py:20][INFO][商品mac,单价12888,已加入swb的购物车~]  [2019-07-23 21:56:58,797][MainThread:764][task_id:shop][shop_interface.py:20][INFO][商品tesla,单价1222222,已加入swb的购物车~]  [2019-07-23 21:57:06,937][MainThread:764][task_id:shop][shop_interface.py:31][INFO][swb购买了广东凤爪,花费10元,现余额为14990元]  [2019-07-23 21:57:09,363][MainThread:764][task_id:shop][shop_interface.py:31][INFO][swb购买了面疙瘩,花费15元,现余额为14985元]  [2019-07-23 21:57:10,666][MainThread:764][task_id:shop][shop_interface.py:31][INFO][swb购买了mac,花费12888元,现余额为2112元]  [2019-07-23 21:59:27,610][MainThread:14372][task_id:shop][shop_interface.py:31][INFO][swb购买了广东凤爪,花费10元,现余额为14990元]  [2019-07-23 22:00:37,898][MainThread:7112][task_id:shop][shop_interface.py:32][INFO][swb购买了广东凤爪,花费10元,现余额为14990元]  [2019-07-23 22:00:39,419][MainThread:7112][task_id:shop][shop_interface.py:32][INFO][swb购买了面疙瘩,花费15元,现余额为14975元]  [2019-07-23 22:01:08,585][MainThread:1056][task_id:user_interface][user_interface.py:25][INFO][tank用户注册成功,初始余额为15000元。]  [2019-07-23 22:01:18,705][MainThread:1056][task_id:shop][shop_interface.py:32][INFO][tank购买了广东凤爪,花费10元,现余额为14990元]  [2019-07-23 22:01:23,370][MainThread:1056][task_id:shop][shop_interface.py:32][INFO][tank购买了面疙瘩,花费15元,现余额为14975元]  [2019-07-23 22:01:29,289][MainThread:1056][task_id:shop][shop_interface.py:20][INFO][商品tesla,单价1222222,已加入tank的购物车~]  [2019-07-23 22:03:05,065][MainThread:1056][task_id:user_interface][user_interface.py:42][INFO][tank用户注销登录。]  [2019-07-23 22:03:18,514][MainThread:1056][task_id:user_interface][user_interface.py:69][INFO][tank将swb账户冻结。]  [2019-07-23 22:03:20,986][MainThread:1056][task_id:user_interface][user_interface.py:42][INFO][tank用户注销登录。]  [2019-07-23 22:03:45,178][MainThread:1056][task_id:user_interface][user_interface.py:77][INFO][tank将swb账户解冻。]  [2019-07-23 22:04:05,704][MainThread:1056][task_id:user_interface][user_interface.py:42][INFO][swb用户注销登录。]  [2019-07-23 22:04:18,321][MainThread:1056][task_id:user_interface][user_interface.py:88][INFO][tank将swb的账户余额改为10元。]  [2019-07-23 22:04:22,425][MainThread:1056][task_id:user_interface][user_interface.py:42][INFO][tank用户注销登录。]

注意: # swb 用户为管理员用户,可以执行管理员操作(swb 以及 tank的密码均为 123),右键starts.py文件运行即可