python-ATM加购物车

  • 2020 年 1 月 19 日
  • 筆記

模拟实现一个ATM + 购物商城程序      1.额度 15000或自定义      4.支持多账户登录      5.支持账户间转账      9.提供管理接口,包括添加账户、用户额度,冻结账户等。。。      10.用户认证用装饰器      3.可以提现,手续费5%      2.实现购物商城,买东西加入 购物车,调用信用卡接口结账      6.记录每月日常消费流水      7.提供还款接口      8.ATM记录操作日志      包含随时返回:b   和随时退出:q    ps:使用面向过程的思想去实现
画的贼烂,框架如上图
ATM+购物车      |-------conf 配置加接口      |        |------interface.py  接口配置      |        |------settings.py   路径加日志配置      |      |-------core 核心  用户与超管      |        |------admin.py  超管功能      |        |------atm.py    ATM功能      |        |------shopping.py  购物功能      |      |-------db  数据库      |        |------userdb  用户文件 json格式      |        |------dbhandler.py  数据层操作      |      |-------lib  公共库      |        |------common.py  用户认证加日志功能      |      |-------log 日志文件夹      |        |------operation.log  操作日志      |        |------trading.log    交易日志      |-------start.py  程序启动文件
#coding:utf-8  from db import dbhandler  from lib import common    logger1=common.get_logger("操作日志")    logger2=common.get_logger('交易日志')    def add_user(name,pwd='123',lines=15000):      '''添加账户接口'''      user_dict={'name':name,'password':pwd,'lines':lines,'balance':lines,'buy':[],'lock':False}      state,msg=dbhandler.query(name)      if state:          return False,'用户已存在'      else:          dbhandler.save(user_dict)          logger1.info('%s用户创建成功' % name)          return True, '%s用户创建成功,密码为%s' % (name,pwd)    def trans_interface(my_name,trans_name,money):      '''转账接口'''      state_my,msg_my=dbhandler.query(my_name)      state_tra,msg_tra=dbhandler.query(trans_name)      if state_tra:          if int(money) < int(msg_my['balance']):              msg_my['balance']=int(msg_my['balance'])-int(money)              msg_tra['balance']=int(msg_tra['balance'])+int(money)              dbhandler.save(msg_my)              dbhandler.save(msg_tra)              logger2.info('%s转账给%s%s元成功'%(my_name,trans_name,money))              return True,'%s转账给%s%s元成功'%(my_name,trans_name,money)          else:              return False,'转账失败,余额不足'      else:          return False,'转账用户不存在'    def withdrawal_interface(name,money):      '''提现接口'''      state,msg=dbhandler.query(name)      poundage=int(money)*5/100      if msg['balance'] < (int(money)+poundage):          return False,'提现失败,余额不足'      else:          msg['balance']=msg['balance'] - int(money) - poundage          dbhandler.save(msg)          logger2.info('%s提现%s元成功,手续费为%s元'%(name,money,poundage))          return True,'%s提现%s元成功,手续费为%s元'%(name,money,poundage)    def repay_interface(name,money):      '''还款接口'''      state,msg=dbhandler.query(name)      if state:          msg['balance']=msg['balance']+int(money)          dbhandler.save(msg)          logger2.info('%s还款%s元成功,余额为%s'%(name,money,msg['balance']))          return True,'%s还款%s元成功,余额为%s'%(name,money,msg['balance'])    def check_balance(name):      '''查看余额接口'''      state,user_dict=dbhandler.query(name)      res=user_dict['balance']      return res    def shopping_interface(name,cost,shopping_dict):      '''购物支付接口'''      state,user_dict=dbhandler.query(name)      if user_dict['balance'] > cost:          user_dict['balance']-=cost          user_dict['buy'].append(shopping_dict)          dbhandler.save(user_dict)          logger2.info('%s购买成功,消费了%s元'%(name,cost))          return True,'购买成功,消费了%s元'%cost      else:          return False,'余额不足,购买失败'    def check_shop_car(name):      '''查看购物记录即可'''      state,user_dict=dbhandler.query(name)      res=user_dict['buy']      return res

interface.py

# coding:utf-8  # ============================路径配置加日志配置项(日志配置模板)=========================================  import os    BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))    USER_DB = os.path.join(BASE_DIR, 'db', 'userdb')    standard_format = '[%(asctime)s][task_id:%(name)s]'                     '[%(levelname)s]< %(message)s >'  # 其中name为getlogger指定的名字    simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s'    id_simple_format = '[%(asctime)s][%(levelname)s] %(message)s'    # log文件的路径  OPE_LOG_PATH = os.path.join(BASE_DIR, 'log', 'operation.log')  # 操作日志路径  TRA_LOG_PATH = os.path.join(BASE_DIR, 'log', 'trading.log')  # 交易日志路径    # log配置字典  LOGGING_DIC = {      'version': 1,      'disable_existing_loggers': False,      'formatters': {          'standard': {              'format': standard_format          },          'simple': {              'format': id_simple_format          },      },      'filters': {},      'handlers': {          # 打印到终端的日志,使用的格式为 simple_format          'ch': {              'level': 'DEBUG',              'class': 'logging.StreamHandler',  # 打印到屏幕              'formatter': 'simple'          },          # 打印到文件的日志,使用的格式为 standard_format          'default1': {              'level': 'DEBUG',              'class': 'logging.FileHandler',  # 保存到文件              'formatter': 'standard',              'filename': OPE_LOG_PATH,  # 日志文件              'encoding': 'utf-8',  # 日志文件的编码,再也不用担心中文log乱码了          },          'default2': {              'level': 'DEBUG',              'class': 'logging.FileHandler',  # 保存到文件              'formatter': 'standard',              'filename': TRA_LOG_PATH,  # 日志文件              'encoding': 'utf-8',  # 日志文件的编码,再也不用担心中文log乱码了          },      },      'loggers': {          '操作日志': {              'handlers': ['ch', 'default1'],  # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕              'level': 'DEBUG',              'propagate': False,          },          '交易日志': {              'handlers': ['ch', 'default2'],  # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕              'level': 'DEBUG',              'propagate': False,          },      },  }
# coding:utf-8  from conf import interface  from db import dbhandler  from lib import common    tag = True      def adduser():      '''添加用户'''      while True:          name = input('请输入账户名:').strip()          if not name: continue          name1 = input('请确认账户名:').strip()          if name != name1:              print('两次输入不一致!')              continue          state, msg = interface.add_user(name1)          if state:              print(msg)              break          else:              print(msg)      def set_lines():      '''设置额度'''      while True:          name = input('请输入账户名:').strip()          if not name: continue          name1 = input('请确认账户名:').strip()          if name != name1:              print('两次输入不一致!')              continue          lines = input('请输入信用额度:').strip()          if lines.isdigit():              state, msg = interface.dbhandler.query(name1)              if state:                  msg['lines'] = lines                  dbhandler.save(msg)                  interface.logger1.info('%s设置%s成功,信用额度为%s!!' % ('admin', name1, lines))                  print('%s设置%s成功,信用额度为%s!!' % ('admin', name1, lines))              else:                  print(msg)          else:              print('额度请输入数字')              break      def freeze():      '''冻结功能'''      name = input('请输入冻结账户名:').strip()      name1 = input('请确认冻结账户名:').strip()      if name != name1:          print('两次输入不一致!')          return      state, msg = dbhandler.query(name1)      if state:          msg['lock'] = True          dbhandler.save(msg)          interface.logger1.info('%s冻结%s成功' % ('admin', name1))          print('%s冻结%s成功' % (common.user_state['name'], name1))      else:          print(msg)      def unfreeze():      '''解冻功能'''      name = input('请输入冻结账户名:').strip()      name1 = input('请确认冻结账户名:').strip()      if name != name1:          print('两次输入不一致!')          return      state, msg = dbhandler.query(name1)      if state:          msg['lock'] = False          dbhandler.save(msg)          interface.logger1.info('%s解冻%s成功' % ('admin', name1))          print('%s解冻%s成功' % ('admin', name1))      else:          print(msg)      def check_log():      '''查看操作日志功能'''      dbhandler.query_log('admin')      def menu():      view = '''          1.添加账户          2.设置用户额度          3.冻结账户          4.解冻账户          5.查看操作日志          q.退出      '''      print(view)      @common.login_auth(auth='admin')  def main():      view_dict = {          '1': adduser,          '2': set_lines,          '3': freeze,          '4': unfreeze,          '5': check_log      }      global tag      while tag:          print('管理员'.center(60, '-'))          menu()          choice = input('请选择:').strip()          if choice == 'q':              tag = False              break          if not choice or choice not in view_dict: continue          view_dict[choice]()

admin.py

# coding:utf-8  from core import admin  from conf import interface  from db import dbhandler  from lib import common      def registered():      '''注册功能'''      if common.user_state['name']:          print('已登录,无法注册')          return      while True:          print('注册'.center(40, '-'))          name = input('请输入账户名:').strip()          if not name: continue          pwd1 = input('请输入密码:').strip()          if not pwd1: continue          pwd2 = input('请确认密码:').strip()          if pwd1 != pwd2:              print('两次输入密码不一致!')              break          if len(pwd2) < 3:              print('密码强度太低')              continue          state, msg = interface.add_user(name, pwd2)          if state:              print(msg)              break          else:              print(msg)      def check_balance():      '''查看余额'''      balance = interface.check_balance(common.user_state['name'])      print('%s当前余额为%s元!!!' % (common.user_state['name'], balance))      def transfer():      '''转账'''      while True:          print('转账'.center(40, '-'))          name = input('请输入要转账的账户名:').strip()          name1 = input('请确认账户名:').strip()          if name != name1:              print('账户名不一致')              continue          money = input('请输入转账金额:'.strip())          if money.isdigit():              state, msg = interface.trans_interface(common.user_state['name'], name1, money)              if state:                  print(msg)                  break              else:                  print(msg)                  break          else:              print('金额请输入数字!')              continue      def repay():      '''还款'''      state, msg = dbhandler.query(common.user_state['name'])      arrears = msg['lines'] - msg['balance']      print('您的欠款为%s元!!!' % (arrears))      repay_wage = input('请输入您要还款的金额: ').strip()      if not repay_wage.isdigit():          print('请输入数字!')          return      else:          state, msg = interface.repay_interface(common.user_state['name'], repay_wage)          print(msg)      def withdrawal():      '''提现'''      while True:          print('提现'.center(40, '-'))          money = input('请输入要提现的金额:').strip()          if not money.isdigit(): continue          message = input('提现将要收取%5手续费,是否继续(y/n):').strip()          if message not in ['y', 'n']: continue          if message == 'y':              state, msg = interface.withdrawal_interface(common.user_state['name'], money)              if state:                  print(msg)                  break              else:                  print(msg)          else:              break      def check_log():      '''查看操作日志'''      dbhandler.query_log(common.user_state['name'])      def view():      view_dict = '''          1.查询余额          2.转账          3.还款          4.提现          5.查询日志          b.返回上一层          q.退出      '''      print(view_dict)      @common.login_auth(auth='user')  def main():      view_dict = {          '1': check_balance,          '2': transfer,          '3': repay,          '4': withdrawal,          '5': check_log      }      while admin.tag:          print('ATM操作界面'.center(60, '-'))          view()          choice = input('请选择:').strip()          if choice == 'q':              admin.tag = False              break          if choice == 'b': break          if choice not in view_dict: continue          view_dict[choice]()

atm.py

# coding:utf-8  from lib import common  from core import admin  from conf import interface    shopping_dict = {}  # 购物车    def product():      '''购物功能'''      product_list = [          ['华为P20', 5488],          ['笔记本', 5599],          ['Nike运动鞋', 669],          ['十三香小龙虾', 165],          ['山地车', 399],          ['天梭表', 3699],          ['某宝t恤', 69],          ['三只松鼠', 59],          ['重庆小面', 18],      ]      cost = 0  # 初始总花费为0      user_balance = interface.check_balance(common.user_state['name'])  # 调接口查询余额      global shopping_dict      while admin.tag:          print('商品列表'.center(50, '-'))          for k, v in enumerate(product_list):  # 将商品列表 加上索引打印出来              print(k, '', v)          print('q : 退出/购买 ')          choice = input('n请选择购买商品:').strip()          if choice.isdigit():  # 判断输入是否为数字              choice = int(choice)              if choice >= len(product_list): continue  # 输入的数字大于索引则重新开始              product_name = product_list[choice][0]  # 拿到商品名              product_price = product_list[choice][1]  # 拿到商品价格              if user_balance >= product_price:  # 判断余额是否大于商品价格                  if product_name in shopping_dict:  # 判断商品是否已在购物车里                      shopping_dict[product_name]['数量'] += 1  # 有则购物车的对应商品数量加1                  else:                      shopping_dict[product_name] = {'价格': product_price, '数量': 1}  # 购物车添加新字典                  user_balance -= product_price  # 余额减去商品价格                  cost += product_price  # 花费加上商品价格                  print('%s加入购物车成功' % [product_name, product_price])              else:                  print('余额不足')          elif choice == 'q':              if cost == 0: break              for k, v in shopping_dict.items():                  print(k, '', v)              buy = input('是否确认购买(y/n),需支付%s元:' % cost).strip()              if buy == 'y':  # 确认支付时调支付接口                  state, msg = interface.shopping_interface(common.user_state['name'], cost, shopping_dict)                  if state:                      print(msg)                      break                  else:                      print(msg)                      break              else:                  print('无任何商品购买')                  shopping_dict = {}                  break          else:              print('输入错误')              continue      def shop_car():      '''查看购物车功能'''      for k, v in shopping_dict.items():          print(k, '', v)      def check_shop():      '''查看消费记录功能'''      shop_dic = interface.check_shop_car(common.user_state['name'])      for i in shop_dic:          print(i)      def view():      view_dict = '''          1.商品列表          2.购物车          3.查看消费记录          b.返回上一层          q.退出      '''      print(view_dict)      @common.login_auth(auth='user')  def main():      view_dict = {          '1': product,          '2': shop_car,          '3': check_shop,      }      while admin.tag:          print('购物界面'.center(60, '-'))          view()          choice = input('请选择:').strip()          if choice == 'q':              admin.tag = False              break          if choice == 'b': break          if choice not in view_dict: continue          view_dict[choice]()

shopping.py

#coding:utf-8  import json,os  from conf import settings    def save(user_dic):      '''数据写入'''      name=user_dic['name']      with open(settings.USER_DB+r'%s.json'%name,'wt') as f:          json.dump(user_dic,f)          f.flush()      def query(name):      '''查询用户信息'''      db_path=settings.USER_DB+r'%s.json'%name      if os.path.isfile(db_path):          with open(db_path,'rt') as f:              res=json.load(f)          return True,res      else:          return False, '用户不存在'      def query_log(name):      '''查询日志功能'''      with open(settings.OPE_LOG_PATH,'rt',encoding='utf-8') as f:          for names in f.readlines():              if name in names:                  print(names)      with open(settings.TRA_LOG_PATH,'rt',encoding='utf-8') as f1:          for names in f1.readlines():              if name in names:                  print(names)

dbhandler.py

#:coding:utf-8  user_state = {'name': None}  from conf import settings  import logging.config  from db import dbhandler      def login_user():      '''认证'''      count = 0      while True:          name = input('请输入用户名:').strip()          pwd = input('请输入密码:').strip()          state, msg = dbhandler.query(name)          if count == 3:              print('您已被锁定')              msg['lock'] = True              dbhandler.save(msg)              break          if state:  # 查询状态              if pwd == msg['password'] and not msg['lock']:                  user_state['name'] = name                  print('%s登录成功' % name)                  break              elif msg['lock'] == True:                  print('您已被锁定')                  break              else:                  count += 1                  print('密码错误%s次' % count)          else:              print(msg)              break      def login_admin():      '''管理员认证'''      admin_dict = {'name': 'admin', 'password': '123', 'lock': False}      name = input('请输入管理员账户:').strip()      pwd = input('请输入密码:').strip()      if name != admin_dict['name'] or pwd != admin_dict['password']:          print('账号或密码错误')      else:          print('登录成功')          user_state['name'] = name      def login_auth(auth='module'):      def outter(func):          '''装饰器'''            def warppers(*args, **kwargs):              if auth == 'user':                  if user_state['name'] == None:                      login_user()                  if user_state['name']:                      func(*args, **kwargs)              if auth == 'admin':                  if user_state['name'] == None:                      login_admin()                  if user_state['name'] == 'admin':                      func(*args, **kwargs)                  else:                      print('没有管理员权限!')                      return            return warppers        return outter      def get_logger(name):      '''日志模块'''      logging.config.dictConfig(settings.LOGGING_DIC)      logger = logging.getLogger(name)      return logger

common.py

# coding:utf-8  # ================================start.py=======================================  import sys, os    BASE_DIR = os.path.dirname(__file__)  sys.path.append(BASE_DIR)    from core import atm, shopping, admin      def menu():      view = '''          1.注册          2.ATM          3.购物          4.管理员          q.退出      '''      print(view)      if __name__ == '__main__':      view_dict = {          '1': atm.registered,          '2': atm.main,          '3': shopping.main,          '4': admin.main      }      while admin.tag:          print('总界面'.center(60, '-'))          menu()          choice = input('请选择:').strip()          if choice == 'q': break          if not choice or choice not in view_dict: continue          view_dict[choice]()

花了点时间将之前的面向过程编程作业重写了次,完成后感觉还是写的不够好。时间原因就先这样。