python-优酷系统管理员视图粗糙版(无详细注释)
- 2019 年 10 月 7 日
- 笔记
目录
Tank-YouKu(仅管理员功能粗糙版)
优酷系统管理员视图功能
- 注册
- 登录
- 上传视频
- 删除视频
- 发布公告
前期准备
先创建好数据库以及各数据表,安装 pymysql 模块和 DBUtils 模块,并配置好服务端 db_pool 中的数据库连接信息。
创库创表语句
手动创建数据库 youku_demo,需将数据库编码配置为 utf8(如果安装 MySQL 时已经配置好默认编码,命令行中就不用再设置)。
创建数据库:create database youku_demo;
选择该数据库:use youku_demo; 然后执行下面的 SQL 代码,或者手动导入。
创表代码及测试数据
测试用户:tank
密码: 123
/*
 Navicat MySQL Data Transfer

 Source Server         : localhost-E
 Source Server Type    : MySQL
 Source Server Version : 50645
 Source Host           : localhost:3306
 Source Schema         : youku_demo

 Target Server Type    : MySQL
 Target Server Version : 50645
 File Encoding         : 65001

 Date: 28/08/2019 21:22:47
*/

-- NOTE: every table is dropped and re-created, so running this script
-- discards any existing rows in these four tables.
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for download_record
-- ----------------------------
DROP TABLE IF EXISTS `download_record`;
CREATE TABLE `download_record` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) NULL DEFAULT NULL,
  `movie_id` int(11) NULL DEFAULT NULL,
  `download_time` datetime(0) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Table structure for movie
-- ----------------------------
DROP TABLE IF EXISTS `movie`;
CREATE TABLE `movie` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `path` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `is_free` int(11) NULL DEFAULT NULL,
  `file_md5` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `user_id` int(11) NULL DEFAULT NULL,
  `is_delete` int(11) NULL DEFAULT NULL,
  `upload_time` datetime(0) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Table structure for notice
-- ----------------------------
DROP TABLE IF EXISTS `notice`;
CREATE TABLE `notice` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `content` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `user_id` int(11) NULL DEFAULT NULL,
  `create_time` datetime(0) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Records of notice
-- ----------------------------
INSERT INTO `notice` VALUES (1, 'test1', '测试发布公告是否正常', 1, '2019-08-28 21:18:38');

-- ----------------------------
-- Table structure for user
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `pwd` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `is_vip` int(11) NULL DEFAULT NULL,
  `is_locked` int(11) NULL DEFAULT NULL,
  `user_type` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `register_time` datetime(0) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Records of user
-- ----------------------------
-- Seed admin account "tank". The pwd column holds a 32-character hex digest,
-- presumably the server-side hash of the test password -- confirm against
-- the server's password hashing helper before changing it.
INSERT INTO `user` VALUES (1, 'tank', 'e23087636dfcd7addf39e32f89e25d44', 0, 0, 'admin', '2019-08-28 21:18:10');

SET FOREIGN_KEY_CHECKS = 1;
数据库结构

安装pymysql模块
参见博客: MySQL-注释-Navicat基本使用-复杂查询练习题-解题思路-pymysql操作数据库-SQL注入-05 的安装部分
安装DBUtils模块
在命令行输入 pip3 install DBUtils 开始安装。(注意:本文代码使用的是 DBUtils 1.x 的导入路径 DBUtils.PooledDB;DBUtils 2.0 及以上版本中对应的导入路径是 dbutils.pooled_db。)

配置 db_pool
根据自己电脑的mysql 情况配置

项目架构与数据流向

目录结构
server目录结构

client目录结构

服务端代码
start.py
"""Entry point of the server: builds the socket server and runs it."""
import os
import sys

# Put the project root on sys.path BEFORE importing project packages.
# The original appended it only after the import had already run, so the
# append could never help that import.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from tcp_server.socket_server import SocketServer  # noqa: E402

if __name__ == '__main__':
    server = SocketServer()
    server.run()
tcp_server/socket_server.py
"""TCP server: accepts clients and dispatches their JSON requests."""
import socket
import struct
import json
from interface import common_interface
from interface import admin_interface
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from lib import lock_file

# One lock shared with the interface layer through lib.lock_file.
lock = Lock()
lock_file.mutex = lock

# Maps the request's "type" field to handler(client_back_dic, conn).
func_dic = {
    'register': common_interface.register_interface,
    'login': common_interface.login_interface,
    'check_movie': admin_interface.check_movie_interface,
    'upload_movie': admin_interface.upload_movie_interface,
    'get_movie_list': common_interface.get_movie_list_interface,
    'delete_movie': admin_interface.delete_movie_interface,
    'put_notice': admin_interface.put_notice_interface
}

# Every message is prefixed with a 4-byte struct 'i' length header.
_HEADER_SIZE = 4


def _recv_exact(conn, size):
    """Read exactly ``size`` bytes from ``conn``.

    ``socket.recv(n)`` may legally return fewer than ``n`` bytes, so the
    read is repeated until the full amount has arrived.

    Returns:
        The bytes read, or ``None`` if the peer closed the connection
        before ``size`` bytes were received.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            return None
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


class SocketServer:
    """Listens on 127.0.0.1:9527 and serves each client on a pool thread."""

    def __init__(self):
        self.server = socket.socket()
        self.server.bind(('127.0.0.1', 9527))
        self.server.listen(5)
        self.pool = ThreadPoolExecutor(50)

    def run(self):
        """Accept connections forever, handing each one to the thread pool."""
        print('启动服务端...')
        while True:
            conn, addr = self.server.accept()
            self.pool.submit(self.working, conn, addr)

    def dispatcher(self, client_back_dic, conn):
        """Call the handler registered for the request's ``type``.

        NOTE: a request with an unknown ``type`` is ignored and no reply is
        sent, exactly as before.
        """
        _type = client_back_dic.get('type')
        if _type in func_dic:
            func_dic.get(_type)(client_back_dic, conn)

    def working(self, conn, addr):
        """Serve one client until it disconnects or a request fails.

        Each request is a 4-byte length header followed by a UTF-8 JSON
        body. The client's address is added to the request under ``addr``
        before it is dispatched.
        """
        try:
            while True:
                headers = _recv_exact(conn, _HEADER_SIZE)
                if headers is None:
                    # Peer closed the connection: normal end of session.
                    break
                data_len = struct.unpack('i', headers)[0]
                data_bytes = _recv_exact(conn, data_len)
                if data_bytes is None:
                    break
                client_back_dic = json.loads(data_bytes.decode('utf-8'))
                client_back_dic['addr'] = str(addr)
                self.dispatcher(client_back_dic, conn)
        except Exception as e:
            # Top-level boundary of the worker thread: report and drop the
            # connection, as the original did.
            print(e)
        finally:
            conn.close()
interface/common_interface.py
"""Handlers shared by every kind of user: register, login, movie list."""
from db import models
from lib import common, lock_file
from db import user_data


def register_interface(client_back_dic, conn):
    """Create a user unless the name is taken, then reply to the client.

    Reads ``username``, ``password`` and ``user_type`` from the request.
    """
    username = client_back_dic.get('username')
    user_obj_list = models.User.select(name=username)

    if user_obj_list:
        send_dic = {'flag': False, 'msg': '用户已存在!'}
    else:
        password = client_back_dic.get('password')
        user_obj = models.User(
            name=username,
            pwd=common.get_md5_pwd(password),
            is_vip=0,  # 0: not VIP, 1: VIP
            is_locked=0,  # 0: not locked, 1: locked
            user_type=client_back_dic.get('user_type'),
            register_time=common.get_time())
        user_obj.save()
        send_dic = {'flag': True, 'msg': '注册成功'}

    common.send_data(send_dic, conn)


def login_interface(client_back_dic, conn):
    """Check the credentials and, on success, open a session for the client.

    The session token and the user's id are stored in
    ``user_data.user_online`` under the request's ``addr``; the token is
    also returned to the client.
    """
    username = client_back_dic.get('username')
    user_list = models.User.select(name=username)

    if not user_list:
        send_dic = {'flag': False, 'msg': '用户不存在'}
    else:
        user_obj = user_list[0]
        password = client_back_dic.get('password')
        if user_obj.pwd == common.get_md5_pwd(password):
            session = common.get_random_code()
            addr = client_back_dic.get('addr')
            # "with" releases the lock even if the assignment raises; the
            # original acquire()/release() pair would have left it held.
            with lock_file.mutex:
                user_data.user_online[addr] = [session, user_obj.id]
            send_dic = {'flag': True, 'msg': '登录成功!', 'session': session}
        else:
            send_dic = {'flag': False, 'msg': '密码错误!'}

    common.send_data(send_dic, conn)


@common.login_auth
def get_movie_list_interface(client_back_dic, conn):
    """Send the list of movies that are not flagged as deleted.

    Each entry is ``[name, '免费' or '收费', id]``.
    """
    movie_obj_list = models.Movie.select()

    if movie_obj_list:
        back_movie_list = [
            [movie_obj.name,
             '免费' if movie_obj.is_free else "收费",
             movie_obj.id]
            for movie_obj in movie_obj_list
            if not movie_obj.is_delete
        ]
        if back_movie_list:
            send_dic = {'flag': True, 'back_movie_list': back_movie_list}
        else:
            send_dic = {'flag': False, 'msg': '没有可删除的电影!'}
    else:
        send_dic = {'flag': False, 'msg': '没有电影!'}

    common.send_data(send_dic, conn)
db/models.py
from orm.orm import Models, StringField, IntegerField


# Model for the `user` table.
class User(Models):
    table_name = 'user'

    id = IntegerField('id', primary_key=True)
    name = StringField('name')
    pwd = StringField('pwd')
    is_vip = IntegerField('is_vip')
    is_locked = IntegerField('is_locked')
    user_type = StringField('user_type')
    register_time = StringField('register_time')


# Model for the `movie` table.
class Movie(Models):
    table_name = 'movie'

    id = IntegerField('id', primary_key=True)
    name = StringField('name')
    path = StringField('path')
    is_free = IntegerField('is_free')  # 1 or 0
    file_md5 = StringField('file_md5')
    user_id = IntegerField('user_id')
    is_delete = IntegerField('is_delete')
    upload_time = StringField('upload_time')


# Model for the `notice` table.
class Notice(Models):
    table_name = 'notice'

    id = IntegerField('id', primary_key=True)
    title = StringField('title')
    content = StringField('content')
    user_id = IntegerField('user_id')
    create_time = StringField('create_time')


# Model for the `download_record` table.
class DownloadRecord(Models):
    table_name = 'download_record'

    id = IntegerField('id', primary_key=True)
    user_id = IntegerField('user_id')
    movie_id = IntegerField('movie_id')
    download_time = StringField('download_time')
orm/orm.py
"""A tiny ORM: field descriptions, a model metaclass and CRUD helpers."""
from orm.mysql_control import Mysql


class Field:
    """Description of one table column."""

    def __init__(self, name, column_type, primary_key, default):
        self.name = name
        self.column_type = column_type
        self.primary_key = primary_key
        self.default = default


class StringField(Field):
    """A ``varchar(255)`` column by default."""

    def __init__(self, name, column_type='varchar(255)',
                 primary_key=False, default=None):
        super().__init__(name, column_type, primary_key, default)


class IntegerField(Field):
    """An ``int`` column by default."""

    def __init__(self, name, column_type='int',
                 primary_key=False, default=0):
        super().__init__(name, column_type, primary_key, default)


class OrmMetaClass(type):
    """Metaclass that turns Field attributes into table metadata.

    For every class except the one named ``Models`` it stores three class
    attributes: ``table_name`` (the declared one, or the class name),
    ``primary_key`` (column name of the single primary key) and
    ``mappings`` (attribute name -> Field).

    Raises:
        TypeError: if the class declares no primary key or more than one.
    """

    def __new__(cls, class_name, class_bases, class_attr):
        # The base class itself declares no table.
        if class_name == 'Models':
            return type.__new__(cls, class_name, class_bases, class_attr)

        table_name = class_attr.get('table_name', class_name)
        primary_key = None
        mappings = {}

        for key, value in class_attr.items():
            if isinstance(value, Field):
                mappings[key] = value
                if value.primary_key:
                    if primary_key:
                        raise TypeError('主键只能有一个')
                    primary_key = value.name

        # The Field objects now live in ``mappings``; remove them from the
        # class namespace so attribute access falls through to the row data.
        for key in mappings:
            class_attr.pop(key)

        if not primary_key:
            raise TypeError('必须要有一个主键')

        class_attr['table_name'] = table_name
        class_attr['primary_key'] = primary_key
        class_attr['mappings'] = mappings
        return type.__new__(cls, class_name, class_bases, class_attr)


class Models(dict, metaclass=OrmMetaClass):
    """A table row stored as a dict, with attribute-style access."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def __getattr__(self, item):
        """Return the value stored under ``item``.

        A missing ordinary key yields the placeholder string, as before.
        Dunder names raise AttributeError so that protocol probes such as
        ``getattr(obj, '__deepcopy__', None)`` do not receive a string.
        """
        if item.startswith('__') and item.endswith('__'):
            raise AttributeError(item)
        return self.get(item, '没有这个key')

    def __setattr__(self, key, value):
        """Store ``value`` in the dict under ``key``."""
        self[key] = value

    @classmethod
    def select(cls, **kwargs):
        """Fetch rows, optionally filtered by column equality.

        Args:
            **kwargs: ``column=value`` conditions. All of them are combined
                with AND; the original silently ignored every condition
                after the first.

        Returns:
            A list of model instances, or ``None`` when no row matches
            (unchanged from the original).
        """
        ms = Mysql()
        try:
            if not kwargs:
                sql = 'select * from {}'.format(cls.table_name)
                res = ms.my_select(sql)
            else:
                conditions = ' and '.join(
                    '{}=%s'.format(column) for column in kwargs)
                sql = 'select * from {} where {}'.format(
                    cls.table_name, conditions)
                res = ms.my_select(sql, list(kwargs.values()))
        finally:
            # Hand the connection back to the pool explicitly.
            ms.close_db()

        if res:
            return [cls(**row) for row in res]
        return None

    def save(self):
        """Insert this row; the primary-key column is left to the database."""
        fields = []
        values = []
        for field in self.mappings.values():
            if not field.primary_key:
                fields.append(field.name)
                # self.get() is used on purpose: getattr() with a default
                # never reaches the default because __getattr__ returns a
                # placeholder string instead of raising.
                values.append(self.get(field.name, field.default))

        sql = 'insert into {}({}) values({})'.format(
            self.table_name,
            ','.join(fields),
            ','.join(['%s'] * len(fields)),
        )

        ms = Mysql()
        try:
            ms.my_execute(sql, values)
        finally:
            ms.close_db()

    def sql_update(self):
        """Update every non-key column of the row matching the primary key."""
        assignments = []
        values = []
        primary_key_value = None
        for field in self.mappings.values():
            if field.primary_key:
                primary_key_value = self.get(field.name, field.default)
            else:
                assignments.append(field.name + '=%s')
                values.append(self.get(field.name, field.default))

        # The key value is passed as a parameter instead of being formatted
        # into the SQL text.
        values.append(primary_key_value)
        sql = 'update {} set {} where {}=%s'.format(
            self.table_name,
            ','.join(assignments),
            self.primary_key,
        )

        ms = Mysql()
        try:
            ms.my_execute(sql, values)
        finally:
            ms.close_db()
orm/mysql_control.py
"""Thin wrapper around one pooled connection and its cursor."""
import pymysql
from orm.db_pool import POOL


class Mysql:
    """Holds a connection taken from POOL and a DictCursor on it.

    May be used as a context manager; leaving the block calls close_db().
    """

    def __init__(self):
        self.conn = POOL.connection()
        self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close_db()
        return False

    def close_db(self):
        """Close the cursor and the connection.

        The connection is closed even if closing the cursor raises.
        """
        try:
            self.cursor.close()
        finally:
            self.conn.close()

    def my_select(self, sql, args=None):
        """Run a query and return every row as fetched by the cursor."""
        self.cursor.execute(sql, args)
        res = self.cursor.fetchall()
        return res

    def my_execute(self, sql, args):
        """Run an insert/update-style statement.

        Errors are still printed rather than raised, as before, but the
        outcome is now visible to the caller.

        Returns:
            Whatever ``cursor.execute`` returns on success, or ``None`` if
            the statement raised. Compare with ``is None`` to detect
            failure, since a successful result may be falsy.
        """
        try:
            return self.cursor.execute(sql, args)
        except Exception as e:
            print(e)
            return None
orm/db_pool.py
"""Database connection pool shared by the ORM layer."""
import pymysql

# pip3 install DBUtils
try:
    # Import path used by DBUtils 2.0 and later.
    from dbutils.pooled_db import PooledDB
except ImportError:
    # Import path used by DBUtils 1.x.
    from DBUtils.PooledDB import PooledDB

POOL = PooledDB(
    creator=pymysql,  # DB-API module used to create connections
    maxconnections=6,  # max connections allowed; 0 or None means no limit
    mincached=2,  # idle connections opened at start-up; 0 means none
    maxcached=5,  # max idle connections kept; 0 or None means no limit
    # Max shared connections; 0 or None means all connections are dedicated.
    # NOTE(review): reportedly has no effect with pymysql because of its
    # DB-API threadsafety level -- confirm against the DBUtils documentation.
    maxshared=3,
    blocking=True,  # True: wait for a free connection; False: raise an error
    maxusage=None,  # max reuses of one connection; None means unlimited
    setsession=[],  # SQL commands run when a session starts
    # When to ping the server: 0 = None = never, 1 = whenever requested,
    # 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    ping=0,
    host='127.0.0.1',
    port=3306,
    user='root',
    password='000000',
    database='youku_demo',
    charset='utf8',
    # A real boolean: the original passed the string 'True', which is only
    # truthy by accident (the string 'False' would be truthy as well).
    autocommit=True,
)
lib/common.py
"""Server-side helpers: timestamps, hashing, framing and login checks."""
import time
import hashlib
import json
import struct
import uuid
from functools import wraps
from db import user_data


def get_time():
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS``.

    The explicit ``%H:%M:%S`` replaces ``%X``, whose output depends on the
    process locale.
    """
    return time.strftime('%Y-%m-%d %H:%M:%S')


def get_md5_pwd(pwd):
    """Return the hex MD5 digest of ``pwd`` followed by a fixed salt.

    NOTE(review): MD5 with a static salt is weak for password storage. It
    is kept unchanged because existing stored digests depend on it.
    """
    md = hashlib.md5()
    md.update(pwd.encode('utf-8'))
    md.update('虹桥炮王,Jason是也!'.encode('utf-8'))
    return md.hexdigest()


def send_data(send_dic, conn):
    """Send ``send_dic`` as JSON, prefixed by a 4-byte length header.

    ``sendall`` is used because ``send`` may transmit only part of the
    buffer.
    """
    data_bytes = json.dumps(send_dic).encode('utf-8')
    headers = struct.pack('i', len(data_bytes))
    conn.sendall(headers + data_bytes)


def get_random_code():
    """Return a random 32-character hex string (MD5 of a fresh UUID4)."""
    md5 = hashlib.md5()
    md5.update(str(uuid.uuid4()).encode('utf-8'))
    return md5.hexdigest()


def login_auth(func):
    """Decorator: run ``func(client_back_dic, conn)`` only for logged-in clients.

    The request's ``session`` must equal the one stored in
    ``user_data.user_online`` for the request's ``addr``. On success the
    stored user id is written to ``client_back_dic['user_id']`` and the
    handler is called; otherwise a "not logged in" reply is sent.
    """
    @wraps(func)
    def inner(*args, **kwargs):
        client_back_dic, conn = args[0], args[1]

        # Never trust a user_id sent by the client: the original accepted
        # it whenever the session check did not overwrite it.
        client_back_dic.pop('user_id', None)

        addr = client_back_dic.get('addr')
        # addr: [session, user_id]; None for a client that never logged in
        # (the original crashed with TypeError in that case).
        user_session = user_data.user_online.get(addr)

        if user_session and client_back_dic.get('session') == user_session[0]:
            client_back_dic['user_id'] = user_session[1]
            return func(*args, **kwargs)

        send_dic = {'flag': False, 'msg': '未登录,请去登录!'}
        send_data(send_dic, conn)

    return inner
lib/lock_file.py
# Placeholder for the lock shared across the server. It starts as None;
# tcp_server/socket_server.py assigns a threading.Lock to it when that module
# is imported, and interface/common_interface.py holds it while writing to
# the online-user table.
mutex = None
db/user_data.py
# In-memory table of logged-in clients, filled by the login interface and
# read by the login_auth decorator. Keys are the client's address as a
# string; values are two-item lists.
user_online = {
    # addr: [session, user_id]
}
interface/admin_interface.py
"""Handlers for the administrator features: upload, delete, notices."""
from lib import common
from db import models
import os
from conf import settings


@common.login_auth
def upload_movie_interface(client_back_dic, conn):
    """Receive a movie file from ``conn`` and record it in the database.

    The request supplies ``movie_name``, ``file_size``, ``file_md5`` and
    ``is_free``; ``user_id`` is filled in by ``login_auth``.

    Raises:
        ConnectionError: if the client disconnects before ``file_size``
            bytes arrive. The partial file is removed first.
    """
    print('炮王来交货啦!')

    # basename() drops any directory part in the client-supplied name so
    # the file cannot be written outside the download directory. A random
    # prefix keeps stored names unique.
    original_name = os.path.basename(client_back_dic.get('movie_name'))
    movie_name = common.get_random_code() + original_name
    movie_size = client_back_dic.get('file_size')

    os.makedirs(settings.DOWNLOAD_PATH, exist_ok=True)
    movie_path = os.path.join(settings.DOWNLOAD_PATH, movie_name)

    # 1. Receive the file body.
    data_recv = 0
    with open(movie_path, 'wb') as f:
        while data_recv < movie_size:
            # Never ask for more than what is left of this file, so bytes
            # belonging to the next request are not consumed.
            data = conn.recv(min(1024, movie_size - data_recv))
            if not data:
                # Peer closed the connection; the original looped forever.
                break
            f.write(data)
            data_recv += len(data)

    if data_recv < movie_size:
        os.remove(movie_path)
        raise ConnectionError('client disconnected during upload')

    # 2. Store the movie's metadata.
    movie_obj = models.Movie(
        name=movie_name,
        file_md5=client_back_dic.get('file_md5'),
        is_free=client_back_dic.get('is_free'),
        is_delete=0,
        path=movie_path,
        user_id=client_back_dic.get('user_id'),
        upload_time=common.get_time()
    )
    movie_obj.save()

    send_dic = {
        'flag': True,
        'msg': f'{client_back_dic.get("movie_name")}电影上传成功!'
    }
    common.send_data(send_dic, conn)


@common.login_auth
def check_movie_interface(client_back_dic, conn):
    """Tell the client whether a movie with the same ``file_md5`` exists."""
    file_md5 = client_back_dic.get('file_md5')
    movie_list = models.Movie.select(file_md5=file_md5)

    if movie_list:
        send_dic = {
            'flag': False,
            'msg': '电影已存在!'
        }
    else:
        send_dic = {
            'flag': True,
            'msg': '电影可以上传'
        }
    common.send_data(send_dic, conn)


# The login check was missing here, so any connected client could delete
# movies without logging in.
@common.login_auth
def delete_movie_interface(client_back_dic, conn):
    """Flag the movie with id ``movie_id`` as deleted (``is_delete = 1``)."""
    movie_list = models.Movie.select(id=client_back_dic.get('movie_id'))

    if not movie_list:
        # Unknown id: reply instead of crashing on an empty result.
        send_dic = {
            'flag': False,
            'msg': '电影不存在!'
        }
    else:
        movie_obj = movie_list[0]
        movie_obj.is_delete = 1
        movie_obj.sql_update()
        send_dic = {
            'flag': True,
            'msg': '电影删除成功!'
        }
    common.send_data(send_dic, conn)


@common.login_auth
def put_notice_interface(client_back_dic, conn):
    """Store a notice written by the logged-in user."""
    notice_obj = models.Notice(
        title=client_back_dic.get('title'),
        content=client_back_dic.get('content'),
        user_id=client_back_dic.get('user_id'),
        create_time=common.get_time())
    notice_obj.save()

    # 'flag' added so this reply has the same shape as every other one.
    send_dic = {
        'flag': True,
        'msg': '公告发布成功!'
    }
    common.send_data(send_dic, conn)
conf/settings.py
import os

# Directory that holds this settings module.
_CONF_DIR = os.path.dirname(__file__)

# Parent of that directory.
BASE_PATH = os.path.dirname(_CONF_DIR)

# Directory where uploaded movie files are stored.
DOWNLOAD_PATH = os.path.join(BASE_PATH, 'download_files')
客户端代码
start.py
"""Entry point of the client: shows the top-level menu."""
import os
import sys

# Put the project root on sys.path BEFORE importing project packages.
# The original appended it only after the import had already run, so the
# append could never help that import.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from core import src  # noqa: E402

if __name__ == '__main__':
    src.run()
core/src.py
from core import admin, user

# Menu choice -> view function.
func_dic = {
    '1': admin.admin_view,
    '2': user.user_view,
}


def run():
    """Show the top-level menu until the user enters 'q'."""
    while True:
        print('''
        1.管理员功能
        2.用户功能
        q.退出
        ''')
        choice = input('请选择功能编号: ').strip()

        if choice == 'q':
            return

        view = func_dic.get(choice)
        if view is None:
            # Unknown choice: show the menu again.
            continue
        view()
core/admin.py
"""Administrator menu of the client: register, login, upload, delete, notice."""
from tcp_client import socket_client
from lib import common
import os
from conf import settings

# Session token returned by the server after a successful login.
user_info = {
    'cookies': None
}


def register(client):
    """Ask for a name and password until the server accepts the registration."""
    while True:
        username = input('请输入用户名:').strip()
        password = input('请输入密码:').strip()
        re_password = input('请确认密码:').strip()

        if password != re_password:
            # The original restarted the prompts without any explanation.
            print('两次密码不一致!')
            continue

        send_dic = {'username': username, 'password': password,
                    'type': 'register', 'user_type': 'admin'}
        # Reply is e.g. {'flag': False, 'msg': ...} or {'flag': True, 'msg': ...}
        back_dic = common.send_msg_back_dic(send_dic, client)
        print(back_dic.get('msg'))
        if back_dic.get('flag'):
            break


def login(client):
    """Ask for credentials until login succeeds, then store the session."""
    while True:
        username = input('请输入用户名: ').strip()
        password = input('请输入密码:').strip()
        send_dic = {
            'type': 'login',
            'username': username,
            'password': password,
            'user_type': 'admin'
        }
        back_dic = common.send_msg_back_dic(send_dic, client)
        print(back_dic.get('msg'))
        if back_dic.get('flag'):
            user_info['cookies'] = back_dic.get('session')
            break


def upload_movie(client):
    """Let the user pick a local movie, check it with the server and upload it."""
    while True:
        # 1. List the local movies available for upload.
        movie_list = common.get_movie_list()
        if not movie_list:
            # Nothing to upload; the original crashed here on a None list.
            print('没有可上传的电影!')
            break

        for index, movie in enumerate(movie_list):
            print(index, movie)

        choice = input('请输入上传的电影编号:').strip()
        if not choice.isdigit():
            print('请输入数字!')
            continue
        choice = int(choice)
        if choice not in range(len(movie_list)):
            print("请选择正确编号!")
            continue

        movie_name = movie_list[choice]
        movie_path = os.path.join(settings.UPLOAD_FILES, movie_name)

        # 2. Ask the server whether this movie already exists.
        file_md5 = common.get_movie_md5(movie_path)
        send_dic = {
            'type': 'check_movie',
            'session': user_info.get('cookies'),
            'file_md5': file_md5
        }
        back_dic = common.send_msg_back_dic(send_dic, client)
        print(back_dic.get('msg'))
        if not back_dic.get('flag'):
            # Return to the menu. The original looped here with no way out,
            # for example when the user was not logged in.
            break

        # 3. Upload the file.
        send_dic = {
            'type': 'upload_movie',
            'file_md5': file_md5,
            'file_size': os.path.getsize(movie_path),
            'movie_name': movie_name,
            'session': user_info.get('cookies')
        }
        is_free = input('上传电影是否免费: y/n').strip()
        send_dic['is_free'] = 1 if is_free == 'y' else 0

        back_dic = common.send_msg_back_dic(send_dic, client, file=movie_path)
        print(back_dic.get('msg'))
        if back_dic.get('flag'):
            break


def delete_movie(client):
    """Show the server's movie list and delete the one the user picks."""
    while True:
        # 1. Fetch the movie list from the server.
        send_dic = {
            'type': 'get_movie_list',
            'session': user_info.get('cookies')
        }
        back_dic = common.send_msg_back_dic(send_dic, client)
        if not back_dic.get('flag'):
            print(back_dic.get('msg'))
            break

        back_movie_list = back_dic.get('back_movie_list')
        for index, movie in enumerate(back_movie_list):
            print(index, movie)

        # 2. Pick the movie to delete; invalid input re-displays the list.
        choice = input('请输入需要删除的电影编号:').strip()
        if not choice.isdigit():
            continue
        choice = int(choice)
        if choice not in range(len(back_movie_list)):
            continue

        # Each entry is [name, price label, id].
        movie_id = back_movie_list[choice][2]
        send_dic = {
            'type': 'delete_movie',
            'movie_id': movie_id,
            'session': user_info.get('cookies')
        }
        back_dic = common.send_msg_back_dic(send_dic, client)
        # Show the result either way and return to the menu; a failed
        # delete used to loop again without printing anything.
        print(back_dic.get('msg'))
        break


def put_notice(client):
    """Ask for a title and content and publish them as a notice."""
    title = input('请输入公告标题:').strip()
    content = input('请输入公告内容:').strip()
    send_dic = {
        'type': 'put_notice',
        'session': user_info.get('cookies'),
        'title': title,
        'content': content
    }
    back_dic = common.send_msg_back_dic(send_dic, client)
    print(back_dic.get('msg'))


# Menu choice -> function taking the connected socket.
func_dic = {
    '1': register,
    '2': login,
    '3': upload_movie,
    '4': delete_movie,
    '5': put_notice,
}


def admin_view():
    """Connect to the server and run the administrator menu until 'q'."""
    sk_client = socket_client.SocketClient()
    client = sk_client.get_client()
    try:
        while True:
            print('''
            1.注册
            2.登录
            3.上传视频
            4.删除视频
            5.发布公告
            q.退出
            ''')
            choice = input('请选择功能编号:').strip()
            if choice == 'q':
                break
            if choice not in func_dic:
                continue
            func_dic.get(choice)(client)
    finally:
        # Release the socket when leaving the view; it was never closed.
        client.close()
tcp_client/socket_client.py
"""Client-side TCP connection to the server."""
import socket


class SocketClient:
    """Opens a TCP connection on construction.

    Args:
        host: Server address. Defaults to the previously hard-coded
            '127.0.0.1'.
        port: Server port. Defaults to the previously hard-coded 9527.
    """

    def __init__(self, host='127.0.0.1', port=9527):
        self.client = socket.socket()
        self.client.connect((host, port))

    def get_client(self):
        """Return the connected socket."""
        return self.client
lib/common.py
"""Client-side helpers: request/reply framing, local movie list, file digest."""
import json
import struct
from conf import settings
import os
import hashlib

# Size of the struct 'i' length header that precedes every message.
_HEADER_SIZE = 4
# Chunk size used when streaming a file to the server.
_FILE_CHUNK_SIZE = 64 * 1024


def _recv_exact(client, size):
    """Read exactly ``size`` bytes; ``recv(n)`` may return fewer than ``n``.

    Raises:
        ConnectionError: if the server closes the connection first.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = client.recv(remaining)
        if not chunk:
            raise ConnectionError('server closed the connection')
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def send_msg_back_dic(send_dic, client, file=None):
    """Send a request, optionally followed by a file, and return the reply.

    Args:
        send_dic: Request dict, sent as JSON behind a 4-byte length header.
        client: Connected socket.
        file: Optional path of a file whose bytes are sent after the
            request.

    Returns:
        The reply decoded from JSON.
    """
    data_bytes = json.dumps(send_dic).encode('utf-8')
    headers = struct.pack('i', len(data_bytes))
    # sendall: plain send() may transmit only part of the buffer.
    client.sendall(headers + data_bytes)

    if file:
        # Fixed-size chunks: iterating a binary file "by line" yields
        # pieces of arbitrary size, split wherever a newline byte occurs.
        with open(file, 'rb') as f:
            for chunk in iter(lambda: f.read(_FILE_CHUNK_SIZE), b''):
                client.sendall(chunk)

    headers = _recv_exact(client, _HEADER_SIZE)
    data_len = struct.unpack('i', headers)[0]
    data_bytes = _recv_exact(client, data_len)
    back_dic = json.loads(data_bytes.decode('utf-8'))
    return back_dic


def get_movie_list():
    """Return the file names in the upload directory.

    Returns an empty list (previously ``None``) when the directory is
    missing or empty, so callers can iterate the result safely.
    """
    if os.path.exists(settings.UPLOAD_FILES):
        return os.listdir(settings.UPLOAD_FILES)
    return []


def get_movie_md5(movie_path):
    """Return an MD5 digest of four 10-byte samples taken from the file.

    Samples start at offset 0, at one third, at two thirds and 10 bytes
    before the end. For files shorter than 10 bytes the last offset is
    clamped to 0; the original passed a negative offset to ``seek``.
    Digests of files of 10 bytes or more are unchanged.
    """
    md5 = hashlib.md5()
    movie_size = os.path.getsize(movie_path)
    third = movie_size // 3
    offsets = [0, third, third * 2, max(movie_size - 10, 0)]

    with open(movie_path, 'rb') as f:
        for offset in offsets:
            f.seek(offset)
            md5.update(f.read(10))
    return md5.hexdigest()
conf/settings.py
import os

# Directory that holds this settings module.
_CONF_DIR = os.path.dirname(__file__)

# Parent of that directory.
BASE_PATH = os.path.dirname(_CONF_DIR)

# Directory scanned for movie files that can be uploaded.
UPLOAD_FILES = os.path.join(BASE_PATH, 'upload_files')