python-優酷系統管理員視圖粗糙版(無詳細注釋)
- 2019 年 10 月 7 日
- 筆記
目錄
Tank-YouKu(僅管理員功能粗糙版)
優酷系統管理員視圖功能
- 註冊
- 登錄
- 上傳視頻
- 刪除視頻
- 發佈公告
前期準備
先創建好數據庫以及各數據表;安裝 pymysql 模塊和 DBUtils 模塊;最後配置好服務端 db_pool 中的數據庫連接信息。
創庫創表語句
手動創建數據庫 youku_demo,需配置數據庫編碼為 utf8(如果安裝 MySQL 時已經配置好編碼,在命令行中就不用再處理)。
創建數據庫:create database youku_demo;
選擇該數據庫:use youku_demo;然後執行下面的 SQL 代碼,或者手動導入。
創表代碼及測試數據
測試用戶:tank
密碼: 123
/* Navicat MySQL Data Transfer Source Server : localhost-E Source Server Type : MySQL Source Server Version : 50645 Source Host : localhost:3306 Source Schema : youku_demo Target Server Type : MySQL Target Server Version : 50645 File Encoding : 65001 Date: 28/08/2019 21:22:47 */ SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for download_record -- ---------------------------- DROP TABLE IF EXISTS `download_record`; CREATE TABLE `download_record` ( `id` int(11) NOT NULL AUTO_INCREMENT, `user_id` int(11) NULL DEFAULT NULL, `movie_id` int(11) NULL DEFAULT NULL, `download_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact; -- ---------------------------- -- Table structure for movie -- ---------------------------- DROP TABLE IF EXISTS `movie`; CREATE TABLE `movie` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `path` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `is_free` int(11) NULL DEFAULT NULL, `file_md5` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `user_id` int(11) NULL DEFAULT NULL, `is_delete` int(11) NULL DEFAULT NULL, `upload_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact; -- ---------------------------- -- Table structure for notice -- ---------------------------- DROP TABLE IF EXISTS `notice`; CREATE TABLE `notice` ( `id` int(11) NOT NULL AUTO_INCREMENT, `title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `content` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `user_id` int(11) NULL DEFAULT NULL, `create_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) 
ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact; -- ---------------------------- -- Records of notice -- ---------------------------- INSERT INTO `notice` VALUES (1, 'test1', '測試發佈公告是否正常', 1, '2019-08-28 21:18:38'); -- ---------------------------- -- Table structure for user -- ---------------------------- DROP TABLE IF EXISTS `user`; CREATE TABLE `user` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `pwd` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `is_vip` int(11) NULL DEFAULT NULL, `is_locked` int(11) NULL DEFAULT NULL, `user_type` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `register_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact; -- ---------------------------- -- Records of user -- ---------------------------- INSERT INTO `user` VALUES (1, 'tank', 'e23087636dfcd7addf39e32f89e25d44', 0, 0, 'admin', '2019-08-28 21:18:10'); SET FOREIGN_KEY_CHECKS = 1;
數據庫結構

安裝pymysql模塊
參見博客: MySQL-注釋-Navicat基本使用-複雜查詢練習題-解題思路-pymysql操作數據庫-SQL注入-05 的安裝部分
安裝DBUtils模塊
在命令行輸入pip3 install DBUtils
開始安裝

配置 db_pool
根據自己電腦的mysql 情況配置

項目架構與數據流向

目錄結構
server目錄結構

client目錄結構

服務端代碼
start.py
"""Entry point of the YouKu demo server."""
import os
import sys

# Put the project root on sys.path *before* importing project packages.
# Appending it after the import (as the original did) could never help
# that import succeed.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
if BASE_DIR not in sys.path:
    sys.path.append(BASE_DIR)

from tcp_server.socket_server import SocketServer  # noqa: E402


if __name__ == '__main__':
    server = SocketServer()
    server.run()
tcp_server/socket_server.py
"""TCP front end of the server: accept clients and dispatch their requests.

Wire format (both directions): a 4-byte native-endian ``struct`` ``'i'``
length prefix followed by that many bytes of UTF-8 encoded JSON.
"""
import json
import socket
import struct
from concurrent.futures import ThreadPoolExecutor
from threading import Lock

from interface import admin_interface
from interface import common_interface
from lib import common
from lib import lock_file

# One process-wide lock, published through lib.lock_file so the interface
# layer can guard shared state without importing this module.
lock = Lock()
lock_file.mutex = lock

# Request 'type' value -> handler(client_back_dic, conn).
func_dic = {
    'register': common_interface.register_interface,
    'login': common_interface.login_interface,
    'check_movie': admin_interface.check_movie_interface,
    'upload_movie': admin_interface.upload_movie_interface,
    'get_movie_list': common_interface.get_movie_list_interface,
    'delete_movie': admin_interface.delete_movie_interface,
    'put_notice': admin_interface.put_notice_interface
}

# Pre-compiled length prefix format shared by every request.
_HEADER = struct.Struct('i')


def _recv_exact(conn, size):
    """Read exactly *size* bytes from *conn*.

    A single ``recv`` may return fewer bytes than requested, so keep reading
    until the whole frame has arrived.

    Raises:
        ConnectionError: the peer closed the connection before *size* bytes
            were received.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            raise ConnectionError('client closed the connection')
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


class SocketServer:
    """Thread-pooled TCP server that routes JSON requests to ``func_dic``."""

    def __init__(self, host='127.0.0.1', port=9527):
        """Bind and listen on ``(host, port)``; defaults match the client."""
        self.server = socket.socket()
        self.server.bind((host, port))
        self.server.listen(5)
        self.pool = ThreadPoolExecutor(50)

    def run(self):
        """Accept clients forever, serving each one on a pool thread."""
        print('啟動服務端...')
        while True:
            conn, addr = self.server.accept()
            self.pool.submit(self.working, conn, addr)

    def dispatcher(self, client_back_dic, conn):
        """Route one decoded request to the handler named by its 'type'."""
        _type = client_back_dic.get('type')
        handler = func_dic.get(_type)
        if handler is None:
            # The client always waits for a reply after a request, so an
            # unknown type must be answered or the client blocks forever.
            common.send_data({'flag': False, 'msg': '未知的請求類型!'}, conn)
            return
        handler(client_back_dic, conn)

    def working(self, conn, addr):
        """Serve one client until it disconnects or a request fails.

        Every request is tagged with the client's address (as ``'addr'``)
        before dispatch; handlers use it as the session key.
        """
        try:
            while True:
                headers = _recv_exact(conn, _HEADER.size)
                data_len = _HEADER.unpack(headers)[0]
                data_bytes = _recv_exact(conn, data_len)
                client_back_dic = json.loads(data_bytes.decode('utf-8'))
                client_back_dic['addr'] = str(addr)
                self.dispatcher(client_back_dic, conn)
        except Exception as e:
            # Top-level boundary of the worker thread: report and drop the
            # connection, exactly one client is affected.
            print(e)
        finally:
            conn.close()
interface/common_interface.py
"""Request handlers shared by all client roles: register, login, movie list.

Every handler receives the decoded request dict and the client socket, and
answers through ``common.send_data``.
"""
from db import models
from db import user_data
from lib import common, lock_file


def register_interface(client_back_dic, conn):
    """Create a new user unless the name is taken.

    Reads 'username', 'password' and 'user_type' from the request and replies
    with ``{'flag': bool, 'msg': str}``.
    """
    username = client_back_dic.get('username')
    password = client_back_dic.get('password')

    if not username or not password:
        # Hashing a missing password would raise and drop the connection;
        # answer with a proper refusal instead.
        send_dic = {'flag': False, 'msg': '用戶名和密碼不能為空!'}
    elif models.User.select(name=username):
        send_dic = {'flag': False, 'msg': '用戶已存在!'}
    else:
        user_obj = models.User(
            name=username,
            pwd=common.get_md5_pwd(password),
            is_vip=0,  # 0: not VIP, 1: VIP
            is_locked=0,  # 0: not locked, 1: locked
            user_type=client_back_dic.get('user_type'),
            register_time=common.get_time())
        user_obj.save()
        send_dic = {'flag': True, 'msg': '註冊成功'}

    common.send_data(send_dic, conn)


def login_interface(client_back_dic, conn):
    """Check credentials and, on success, open a session for this client.

    The session is stored in ``user_data.user_online`` under the request's
    'addr' as ``[session, user_id]`` and also returned to the client.
    """
    username = client_back_dic.get('username')
    user_list = models.User.select(name=username)

    if not user_list:
        send_dic = {'flag': False, 'msg': '用戶不存在'}
    else:
        user_obj = user_list[0]
        password = client_back_dic.get('password')
        # A non-string password cannot be hashed; treat it as a mismatch.
        if isinstance(password, str) and \
                user_obj.pwd == common.get_md5_pwd(password):
            session = common.get_random_code()
            addr = client_back_dic.get('addr')
            # ``with`` releases the lock even if the assignment raises.
            with lock_file.mutex:
                user_data.user_online[addr] = [session, user_obj.id]
            send_dic = {'flag': True, 'msg': '登錄成功!', 'session': session}
        else:
            send_dic = {'flag': False, 'msg': '密碼錯誤!'}

    common.send_data(send_dic, conn)


@common.login_auth
def get_movie_list_interface(client_back_dic, conn):
    """Reply with every movie that is not flagged as deleted.

    Each entry of 'back_movie_list' is ``[name, price label, movie id]``.
    """
    movie_obj_list = models.Movie.select()

    if not movie_obj_list:
        send_dic = {'flag': False, 'msg': '沒有電影!'}
    else:
        back_movie_list = [
            [movie_obj.name,
             '免費' if movie_obj.is_free else "收費",
             movie_obj.id]
            for movie_obj in movie_obj_list
            if not movie_obj.is_delete
        ]
        if back_movie_list:
            send_dic = {'flag': True, 'back_movie_list': back_movie_list}
        else:
            send_dic = {'flag': False, 'msg': '沒有可刪除的電影!'}

    common.send_data(send_dic, conn)
db/models.py
"""Table models of the YouKu demo, one class per MySQL table."""
from orm.orm import Models, StringField, IntegerField


class User(Models):
    """Row of the ``user`` table."""
    table_name = 'user'

    id = IntegerField('id', primary_key=True)
    name = StringField('name')
    pwd = StringField('pwd')
    is_vip = IntegerField('is_vip')
    is_locked = IntegerField('is_locked')
    user_type = StringField('user_type')
    register_time = StringField('register_time')


class Movie(Models):
    """Row of the ``movie`` table."""
    table_name = 'movie'

    id = IntegerField('id', primary_key=True)
    name = StringField('name')
    path = StringField('path')
    is_free = IntegerField('is_free')  # 1 or 0
    file_md5 = StringField('file_md5')
    user_id = IntegerField('user_id')
    is_delete = IntegerField('is_delete')
    upload_time = StringField('upload_time')


class Notice(Models):
    """Row of the ``notice`` table."""
    table_name = 'notice'

    id = IntegerField('id', primary_key=True)
    title = StringField('title')
    content = StringField('content')
    user_id = IntegerField('user_id')
    create_time = StringField('create_time')


class DownloadRecord(Models):
    """Row of the ``download_record`` table."""
    table_name = 'download_record'

    id = IntegerField('id', primary_key=True)
    user_id = IntegerField('user_id')
    movie_id = IntegerField('movie_id')
    download_time = StringField('download_time')
orm/orm.py
"""A tiny ORM.

``Field`` subclasses describe columns, ``OrmMetaClass`` collects them when a
model class is created, and ``Models`` is the dict-backed base class that
provides select / insert / update helpers.
"""
from orm.mysql_control import Mysql


class Field:
    """Description of one table column."""

    def __init__(self, name, column_type, primary_key, default):
        self.name = name
        self.column_type = column_type
        self.primary_key = primary_key
        self.default = default


class StringField(Field):
    """``varchar`` column; defaults to ``varchar(255)`` with default None."""

    def __init__(self, name, column_type='varchar(255)', primary_key=False,
                 default=None):
        super().__init__(name, column_type, primary_key, default)


class IntegerField(Field):
    """``int`` column with default 0."""

    def __init__(self, name, column_type='int', primary_key=False,
                 default=0):
        super().__init__(name, column_type, primary_key, default)


class OrmMetaClass(type):
    """Metaclass that turns Field attributes into table metadata.

    For every class except ``Models`` itself it sets three class attributes:
    ``table_name`` (explicit value or the class name), ``primary_key`` (the
    column name of the single primary-key field) and ``mappings``
    (attribute name -> Field).
    """

    def __new__(cls, class_name, class_bases, class_attr):
        # The abstract base class carries no table definition.
        if class_name == 'Models':
            return type.__new__(cls, class_name, class_bases, class_attr)

        table_name = class_attr.get('table_name', class_name)
        primary_key = None
        mappings = {}

        for key, value in class_attr.items():
            if isinstance(value, Field):
                mappings[key] = value
                if value.primary_key:
                    if primary_key:
                        raise TypeError('主鍵只能有一個')
                    primary_key = value.name

        # The Field objects now live in ``mappings``; drop them from the
        # class namespace so instance attribute access reaches the row data.
        for key in mappings.keys():
            class_attr.pop(key)

        if not primary_key:
            raise TypeError('必須要有一個主鍵')

        class_attr['table_name'] = table_name
        class_attr['primary_key'] = primary_key
        class_attr['mappings'] = mappings
        return type.__new__(cls, class_name, class_bases, class_attr)


class Models(dict, metaclass=OrmMetaClass):
    """Base model: a dict of column values with attribute-style access."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def __getattr__(self, item):
        """Return the column value, or a placeholder string when missing.

        Only called when normal attribute lookup fails. Dunder names raise
        AttributeError so protocol probes (copy, pickle, ...) do not receive
        the placeholder string in place of a method.
        """
        if item.startswith('__') and item.endswith('__'):
            raise AttributeError(item)
        return self.get(item, '沒有這個key')

    def __setattr__(self, key, value):
        """Store attribute assignments as dict items."""
        self[key] = value

    @classmethod
    def select(cls, **kwargs):
        """Fetch rows as model objects.

        Without keyword arguments every row is returned; otherwise all given
        ``column=value`` pairs are combined with AND (the original honoured
        only the first one). Values are passed as query parameters.

        Returns:
            A list of ``cls`` instances; an empty list when nothing matches,
            so callers can both test truthiness and index safely.
        """
        ms = Mysql()
        try:
            if not kwargs:
                sql = 'select * from %s' % cls.table_name
                res = ms.my_select(sql)
            else:
                conditions = ' and '.join('%s=%%s' % key for key in kwargs)
                sql = 'select * from %s where %s' % (
                    cls.table_name, conditions)
                res = ms.my_select(sql, list(kwargs.values()))
        finally:
            # Release the connection explicitly rather than relying on
            # garbage collection.
            ms.close_db()

        if not res:
            return []
        return [cls(**result) for result in res]

    def save(self):
        """Insert this object as a new row (the primary key is skipped)."""
        fields = []
        values = []
        for field in self.mappings.values():
            if field.primary_key:
                continue
            fields.append(field.name)
            # ``getattr(self, name, default)`` never reaches the default
            # because __getattr__ does not raise; read the dict directly so
            # unset columns really fall back to the field default.
            values.append(self.get(field.name, field.default))

        sql = 'insert into %s(%s) values(%s)' % (
            self.table_name,
            ','.join(fields),
            ','.join(['%s'] * len(fields))
        )
        ms = Mysql()
        try:
            ms.my_execute(sql, values)
        finally:
            ms.close_db()

    def sql_update(self):
        """Write every non-key column back to the row with this primary key."""
        assignments = []
        values = []
        pk_value = None
        for field in self.mappings.values():
            if field.primary_key:
                pk_value = self.get(field.name, field.default)
            else:
                assignments.append(field.name + '=%s')
                values.append(self.get(field.name, field.default))

        # The key value is bound as a parameter too, not formatted into SQL.
        values.append(pk_value)
        sql = 'update %s set %s where %s=%%s' % (
            self.table_name, ','.join(assignments), self.primary_key
        )
        ms = Mysql()
        try:
            ms.my_execute(sql, values)
        finally:
            ms.close_db()
orm/mysql_control.py
"""Thin wrapper around one pooled MySQL connection and a dict cursor."""
import pymysql
from orm.db_pool import POOL


class Mysql:
    """One connection taken from ``POOL`` plus a ``DictCursor`` on it.

    Can be used as a context manager so the connection is always handed
    back: ``with Mysql() as ms: ms.my_select(...)``.
    """

    def __init__(self):
        self.conn = POOL.connection()
        self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close_db()
        return False  # never suppress the caller's exception

    def close_db(self):
        """Close the cursor and the connection obtained in ``__init__``."""
        self.cursor.close()
        self.conn.close()

    def my_select(self, sql, args=None):
        """Run a query and return all rows (dict rows, via DictCursor)."""
        self.cursor.execute(sql, args)
        return self.cursor.fetchall()

    def my_execute(self, sql, args=None):
        """Run a write statement such as insert or update.

        Returns:
            Whatever ``cursor.execute`` returns on success, or ``None`` when
            the statement failed. Failures are still only printed
            (best-effort behaviour kept), but callers can now detect them.
        """
        try:
            return self.cursor.execute(sql, args)
        except Exception as e:
            print(e)
            return None
orm/db_pool.py
"""Shared MySQL connection pool (pip3 install DBUtils)."""
import pymysql

try:
    # DBUtils 2.x renamed the package and module to lower case.
    from dbutils.pooled_db import PooledDB
except ImportError:
    # DBUtils 1.x layout, as used when this project was written.
    from DBUtils.PooledDB import PooledDB

# NOTE: host and credentials are hard-coded for a local demo database;
# adjust them to your own MySQL setup.
POOL = PooledDB(
    creator=pymysql,  # DB-API module used to create connections
    maxconnections=6,  # max connections allowed; 0 or None means no limit
    mincached=2,  # idle connections created at start-up; 0 means none
    maxcached=5,  # max idle connections kept in the pool; 0/None: no limit
    # Max number of shared connections. NOTE(review): per the DBUtils docs
    # 0/None means all connections are dedicated, and sharing needs a driver
    # threadsafety above 1 - so this value presumably has no effect with
    # pymysql; confirm against the installed DBUtils version.
    maxshared=3,
    blocking=True,  # True: wait for a free connection; False: raise an error
    maxusage=None,  # max reuses of a single connection; None means no limit
    setsession=[],  # commands run at session start, e.g. ["set time zone ..."]
    # When to ping the server: 0 = None = never, 1 = default = whenever it
    # is requested, 2 = when a cursor is created, 4 = when a query is
    # executed, 7 = always.
    ping=0,
    host='127.0.0.1',
    port=3306,
    user='root',
    password='000000',
    database='youku_demo',
    charset='utf8',
    # A real boolean: the original passed the string 'True', which only
    # worked because any non-empty string is truthy ('False' would be too).
    autocommit=True
)
lib/common.py
"""Server-side helpers: timestamps, hashing, framing and login checking."""
import hashlib
import json
import struct
import time
import uuid
from functools import wraps

from db import user_data


def get_time():
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS``.

    The time part is spelled out instead of using ``%X`` so the result does
    not depend on the process locale.
    """
    return time.strftime('%Y-%m-%d %H:%M:%S')


def get_md5_pwd(pwd):
    """Return the salted MD5 hex digest stored as a user's password.

    NOTE(review): MD5 with a fixed salt is weak for password storage; it is
    kept because existing rows were hashed this way.
    """
    md = hashlib.md5()
    md.update(pwd.encode('utf-8'))
    md.update('虹橋炮王,Jason是也!'.encode('utf-8'))
    return md.hexdigest()


def send_data(send_dic, conn):
    """Send *send_dic* as a length-prefixed JSON frame on *conn*.

    ``sendall`` is used because ``send`` may transmit only part of the data.
    """
    data_bytes = json.dumps(send_dic).encode('utf-8')
    headers = struct.pack('i', len(data_bytes))
    conn.sendall(headers + data_bytes)


def get_random_code():
    """Return a random 32-character hex string (MD5 of a fresh UUID4)."""
    md5 = hashlib.md5()
    md5.update(str(uuid.uuid4()).encode('utf-8'))
    return md5.hexdigest()


def login_auth(func):
    """Decorator for handlers ``func(client_back_dic, conn)`` needing login.

    The request's 'session' must equal the session stored for its 'addr' in
    ``user_data.user_online``. On success the server-side user id is written
    to ``client_back_dic['user_id']`` and the handler runs; otherwise a
    'not logged in' reply is sent and the handler is skipped.
    """
    @wraps(func)
    def inner(*args, **kwargs):
        client_back_dic, conn = args[0], args[1]

        # 'user_id' must only ever come from the server-side session table,
        # never from the request itself.
        client_back_dic.pop('user_id', None)

        # addr -> [session, user_id]; None when this client never logged in
        # (the original indexed into None here and crashed).
        user_session = user_data.user_online.get(client_back_dic.get('addr'))
        session = client_back_dic.get('session')

        if user_session and session and session == user_session[0]:
            client_back_dic['user_id'] = user_session[1]
            return func(*args, **kwargs)

        send_dic = {'flag': False, 'msg': '未登錄,請去登錄!'}
        send_data(send_dic, conn)

    return inner
lib/lock_file.py
"""Holder for the process-wide lock shared between server modules."""
from threading import Lock

# Server start-up code may replace this with its own lock object. A real
# Lock is provided by default so that ``mutex.acquire()`` / ``with mutex:``
# also work before (or without) that assignment instead of failing on None.
mutex = Lock()
db/user_data.py
# In-memory table of logged-in clients, filled at login time.
# Layout: addr -> [session, user_id]
user_online = dict()
interface/admin_interface.py
"""Administrator request handlers: upload, check, delete movies; notices.

Every handler receives the decoded request dict and the client socket, and
answers through ``common.send_data``.
"""
import os

from conf import settings
from db import models
from lib import common


@common.login_auth
def upload_movie_interface(client_back_dic, conn):
    """Receive a movie file from the socket and register it in the database.

    The request carries 'movie_name', 'file_size', 'file_md5' and 'is_free';
    exactly 'file_size' raw bytes follow the request on the same socket.

    Raises:
        ConnectionError: the client disconnected before the whole file
            arrived (the partial file is removed first).
    """
    print('炮王來交貨啦!')
    # Keep only the final path component of the client-supplied name so it
    # cannot escape the download directory (e.g. '../../x').
    raw_name = str(client_back_dic.get('movie_name') or '')
    safe_name = os.path.basename(raw_name.replace('\\', '/'))
    # A random prefix keeps stored file names unique.
    movie_name = common.get_random_code() + safe_name
    movie_size = client_back_dic.get('file_size')

    os.makedirs(settings.DOWNLOAD_PATH, exist_ok=True)
    movie_path = os.path.join(settings.DOWNLOAD_PATH, movie_name)

    # 1. Receive the file body.
    data_recv = 0
    try:
        with open(movie_path, 'wb') as f:
            while data_recv < movie_size:
                # Never read past the end of the file: following bytes
                # belong to the client's next request.
                data = conn.recv(min(1024, movie_size - data_recv))
                if not data:
                    # recv() returns b'' forever once the peer is gone; the
                    # original loop would spin here indefinitely.
                    raise ConnectionError('client disconnected during upload')
                f.write(data)
                data_recv += len(data)
    except Exception:
        if os.path.exists(movie_path):
            os.remove(movie_path)
        raise

    # 2. Record the movie in MySQL.
    movie_obj = models.Movie(
        name=movie_name,
        file_md5=client_back_dic.get('file_md5'),
        is_free=client_back_dic.get('is_free'),
        is_delete=0,
        path=movie_path,
        user_id=client_back_dic.get('user_id'),
        upload_time=common.get_time()
    )
    movie_obj.save()

    send_dic = {
        'flag': True,
        'msg': f'{client_back_dic.get("movie_name")}電影上傳成功!'
    }
    common.send_data(send_dic, conn)


@common.login_auth
def check_movie_interface(client_back_dic, conn):
    """Tell the client whether a movie with this 'file_md5' already exists."""
    file_md5 = client_back_dic.get('file_md5')
    movie_list = models.Movie.select(file_md5=file_md5)

    if movie_list:
        send_dic = {'flag': False, 'msg': '電影已存在!'}
    else:
        send_dic = {'flag': True, 'msg': '電影可以上傳'}

    common.send_data(send_dic, conn)


# Deleting requires a valid session like every other admin operation; the
# client already sends 'session' with this request.
@common.login_auth
def delete_movie_interface(client_back_dic, conn):
    """Soft-delete the movie with id 'movie_id' by setting is_delete to 1."""
    movie_list = models.Movie.select(id=client_back_dic.get('movie_id'))

    if not movie_list:
        # Unknown id: answer instead of failing on an empty result.
        send_dic = {'flag': False, 'msg': '電影不存在!'}
    else:
        movie_obj = movie_list[0]
        movie_obj.is_delete = 1
        movie_obj.sql_update()
        send_dic = {'flag': True, 'msg': '電影刪除成功!'}

    common.send_data(send_dic, conn)


@common.login_auth
def put_notice_interface(client_back_dic, conn):
    """Store a notice with the request's 'title' and 'content'."""
    notice_obj = models.Notice(
        title=client_back_dic.get('title'),
        content=client_back_dic.get('content'),
        user_id=client_back_dic.get('user_id'),
        create_time=common.get_time())
    notice_obj.save()

    # 'flag' added so this reply has the same shape as every other one.
    send_dic = {'flag': True, 'msg': '公告發佈成功!'}
    common.send_data(send_dic, conn)
conf/settings.py
"""Server path settings."""
import os

# Directory two levels above this file.
BASE_PATH = os.path.split(os.path.split(__file__)[0])[0]

# Directory where uploaded movies are stored.
DOWNLOAD_PATH = os.path.join(BASE_PATH, 'download_files')
客戶端代碼
start.py
"""Entry point of the YouKu demo client."""
import os
import sys

# Put the project root on sys.path *before* importing project packages.
# Appending it after the import (as the original did) could never help
# that import succeed.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
if BASE_DIR not in sys.path:
    sys.path.append(BASE_DIR)

from core import src  # noqa: E402


if __name__ == '__main__':
    src.run()
core/src.py
"""Top-level client menu."""
from core import admin, user

# Menu choice -> view function.
func_dic = {
    '1': admin.admin_view,
    '2': user.user_view,
}


def run():
    """Show the role menu until the user enters 'q'."""
    while True:
        print('''
        1.管理員功能
        2.用戶功能
        q.退出
        ''')

        choice = input('請選擇功能編號: ').strip()
        if choice == 'q':
            return

        view = func_dic.get(choice)
        if view is None:
            # Unknown entry: show the menu again.
            continue
        view()
core/admin.py
"""Administrator view of the client: register, login, movies, notices."""
import os

from conf import settings
from lib import common
from tcp_client import socket_client

# Session token returned by the server at login; sent with later requests.
user_info = {
    'cookies': None
}


def register(client):
    """Ask for credentials until the server accepts a new admin account."""
    while True:
        username = input('請輸入用戶名:').strip()
        password = input('請輸入密碼:').strip()
        re_password = input('請確認密碼:').strip()

        if password != re_password:
            continue

        send_dic = {'username': username, 'password': password,
                    'type': 'register', 'user_type': 'admin'}
        back_dic = common.send_msg_back_dic(send_dic, client)
        print(back_dic.get('msg'))
        if back_dic.get('flag'):
            break


def login(client):
    """Ask for credentials until login succeeds, then keep the session."""
    while True:
        username = input('請輸入用戶名: ').strip()
        password = input('請輸入密碼:').strip()

        send_dic = {
            'type': 'login',
            'username': username,
            'password': password,
            'user_type': 'admin'
        }
        back_dic = common.send_msg_back_dic(send_dic, client)

        if back_dic.get('flag'):
            user_info['cookies'] = back_dic.get('session')
            print(back_dic.get('msg'))
            break
        print(back_dic.get('msg'))


def upload_movie(client):
    """Pick a local file, check it on the server, then upload it."""
    while True:
        # 1. Show the local files available for upload.
        movie_list = common.get_movie_list()
        if not movie_list:
            # Nothing to offer (missing or empty upload directory); the
            # original crashed here when the helper returned None.
            print('沒有可上傳的電影!')
            break

        for index, movie in enumerate(movie_list):
            print(index, movie)

        choice = input('請輸入上傳的電影編號:').strip()
        if not choice.isdigit():
            print('請輸入數字!')
            continue

        choice = int(choice)
        if choice not in range(len(movie_list)):
            print("請選擇正確編號!")
            continue

        movie_name = movie_list[choice]
        movie_path = os.path.join(settings.UPLOAD_FILES, movie_name)

        # 2. Ask the server whether this movie may be uploaded.
        file_md5 = common.get_movie_md5(movie_path)
        send_dic = {
            'type': 'check_movie',
            'session': user_info.get('cookies'),
            'file_md5': file_md5
        }
        back_dic = common.send_msg_back_dic(send_dic, client)
        print(back_dic.get('msg'))
        if not back_dic.get('flag'):
            # Refused (not logged in, or movie already exists). Return to
            # the menu: retrying here gave the user no way out.
            break

        # 3. Upload: the request is followed by the raw file bytes.
        send_dic = {
            'type': 'upload_movie',
            'file_md5': file_md5,
            'file_size': os.path.getsize(movie_path),
            'movie_name': movie_name,
            'session': user_info.get('cookies')
        }
        is_free = input('上傳電影是否免費: y/n').strip()
        send_dic['is_free'] = 1 if is_free == 'y' else 0

        back_dic = common.send_msg_back_dic(send_dic, client, file=movie_path)
        print(back_dic.get('msg'))
        break


def delete_movie(client):
    """List the server's movies and delete the one the user selects."""
    while True:
        # 1. Fetch the movie list from the server.
        send_dic = {
            'type': 'get_movie_list',
            'session': user_info.get('cookies')
        }
        back_dic = common.send_msg_back_dic(send_dic, client)

        if not back_dic.get('flag'):
            print(back_dic.get('msg'))
            break

        back_movie_list = back_dic.get('back_movie_list')
        for index, movie_list in enumerate(back_movie_list):
            print(index, movie_list)

        # 2. Choose the movie to delete.
        choice = input('請輸入需要刪除的電影編號:').strip()
        if not choice.isdigit():
            continue

        choice = int(choice)
        if choice not in range(len(back_movie_list)):
            continue

        # Each entry is [name, price label, movie id].
        movie_id = back_movie_list[choice][2]
        send_dic = {
            'type': 'delete_movie',
            'movie_id': movie_id,
            'session': user_info.get('cookies')
        }
        back_dic = common.send_msg_back_dic(send_dic, client)
        print(back_dic.get('msg'))
        if back_dic.get('flag'):
            break


def put_notice(client):
    """Read a title and content and publish them as a notice."""
    title = input('請輸入公告標題:').strip()
    content = input('請輸入公告內容:').strip()

    send_dic = {
        'type': 'put_notice',
        'session': user_info.get('cookies'),
        'title': title,
        'content': content
    }
    back_dic = common.send_msg_back_dic(send_dic, client)
    print(back_dic.get('msg'))


# Menu choice -> action(client).
func_dic = {
    '1': register,
    '2': login,
    '3': upload_movie,
    '4': delete_movie,
    '5': put_notice,
}


def admin_view():
    """Connect to the server and run the administrator menu until 'q'."""
    sk_client = socket_client.SocketClient()
    client = sk_client.get_client()

    try:
        while True:
            print('''
            1.註冊
            2.登錄
            3.上傳視頻
            4.刪除視頻
            5.發佈公告
            q.退出
            ''')

            choice = input('請選擇功能編號:').strip()
            if choice == 'q':
                break
            if choice not in func_dic:
                continue
            func_dic.get(choice)(client)
    finally:
        # Release the connection when leaving the view; it used to leak.
        client.close()
tcp_client/socket_client.py
"""TCP connection to the YouKu demo server."""
import socket


class SocketClient:
    """Owns one connected client socket."""

    def __init__(self, host='127.0.0.1', port=9527):
        """Connect to ``(host, port)``.

        The defaults are the address the client has always used, so
        ``SocketClient()`` behaves as before.
        """
        self.client = socket.socket()
        self.client.connect((host, port))

    def get_client(self):
        """Return the connected socket."""
        return self.client
lib/common.py
"""Client-side helpers: request/response framing and local movie files."""
import hashlib
import json
import os
import struct

from conf import settings

# Size of the chunks used when streaming a file to the server.
_CHUNK_SIZE = 64 * 1024


def _recv_exact(sock, size):
    """Read exactly *size* bytes from *sock*.

    A single ``recv`` may return fewer bytes than requested.

    Raises:
        ConnectionError: the server closed the connection early.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError('server closed the connection')
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def send_msg_back_dic(send_dic, client, file=None):
    """Send one request and return the server's decoded reply dict.

    Args:
        send_dic: JSON-serialisable request.
        client: connected socket.
        file: optional path; when given, the file's raw bytes are streamed
            right after the request (used for movie upload).

    Both directions use a 4-byte ``struct`` ``'i'`` length prefix followed
    by UTF-8 JSON.
    """
    data_bytes = json.dumps(send_dic).encode('utf-8')
    headers = struct.pack('i', len(data_bytes))
    # sendall: plain send() may transmit only part of the buffer.
    client.sendall(headers + data_bytes)

    if file:
        with open(file, 'rb') as f:
            # Fixed-size chunks; iterating a binary file by "lines" yields
            # arbitrarily sized pieces.
            for chunk in iter(lambda: f.read(_CHUNK_SIZE), b''):
                client.sendall(chunk)

    headers = _recv_exact(client, 4)
    data_len = struct.unpack('i', headers)[0]
    data_bytes = _recv_exact(client, data_len)
    return json.loads(data_bytes.decode('utf-8'))


def get_movie_list():
    """Return the entries of the upload directory.

    Returns an empty list (previously ``None``) when the directory is
    missing or empty, so callers can iterate the result directly.
    """
    if not os.path.exists(settings.UPLOAD_FILES):
        return []
    return os.listdir(settings.UPLOAD_FILES)


def get_movie_md5(movie_path):
    """Return an MD5 fingerprint built from four 10-byte samples of a file.

    Samples are taken at the start, at one third, at two thirds, and over
    the last 10 bytes of the file.
    """
    md5 = hashlib.md5()
    movie_size = os.path.getsize(movie_path)
    # Clamp the last offset: for files shorter than 10 bytes it would be
    # negative and seek() would fail. Larger files hash exactly as before.
    sample_offsets = [
        0,
        movie_size // 3,
        (movie_size // 3) * 2,
        max(0, movie_size - 10),
    ]

    with open(movie_path, 'rb') as f:
        for offset in sample_offsets:
            f.seek(offset)
            md5.update(f.read(10))

    return md5.hexdigest()
conf/settings.py
"""Client path settings."""
import os

# Directory two levels above this file.
BASE_PATH = os.path.split(os.path.split(__file__)[0])[0]

# Directory holding the local movies that can be uploaded.
UPLOAD_FILES = os.path.join(BASE_PATH, 'upload_files')