Python服务端开发
- 2020 年 1 月 13 日
- 筆記
之前的爬虫https://blog.csdn.net/column/details/17218.html几篇文章为Python实现客户端以Get/Post等方法请求服务。最近用Python写了一个服务,在此总结一下知识点:
1、Python代码中执行shell命令; 2、白名单设置—通过查看客户端ip是否在服务端的list中存在; 3、日志分割—引入loggin类实现日志记录,引入TimedRotatingFileHandler类实现日志分割;
下面以一个小Demo为例进行整理:服务端接收客户端get请求,从git拉取代码,执行修改,并在最后修改完成后提交git。代码实现的功能:
- 对访问的客户端ip进行限制;
- 按天记录日志,日志存放时间为60天;
- 对get请求参数验证;
- 拉取git,执行更新,修改完成后提交git;
# coding:utf-8 import logging import urlparse from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer import os from logging.handlers import TimedRotatingFileHandler # 客户端白名单,白名单中的机器可以访问该服务 WHITE_LIST = ['127.0.0.1'] # 操作日志记录 # 1、logging.basicConfig(level, format, datefmt, filename, filemode) # level: 设置日志级别,默认为logging.WARNING,NOTSET/DEBUG/INFO/WARNING/ERROR/CRITICAL # format: 指定输出的格式和内容,format可以输出很多有用信息【'%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s'】 # datefmt: 指定时间格式,【'%a, %d %b %Y %H:%M:%S'】 # filename: 指定日志文件件 # filemode: 指定日志文件的打开模式,'w'或'a' logging.basicConfig( level=logging.DEBUG, format = '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', # 定义输出log的格式 datefmt = '%Y%m%d %A %H:%M:%S', # 时间 filename = os.path.join(os.getcwd(), 'log.txt'), filemode = 'a') # 添加 TimedRotatingFileHandler # 2、TimedRotatingFileHandler(filename [,when [,interval [,backupCount]]]) # filename 是输出日志文件名的前缀,比如log/myapp.log # when 是一个字符串的定义如下: # “S”: Seconds # “M”: Minutes # “H”: Hours # “D”: Days # “W”: Week day (0=Monday) # “midnight”: Roll over at midnight # interval 是指等待多少个单位when的时间后,Logger会自动重建文件 # backupCount 是保留日志个数 log_file_handler = TimedRotatingFileHandler( filename = os.path.join(os.getcwd(), 'log.txt'), when = "D", interval = 1, backupCount = 60) logging.getLogger().addHandler(log_file_handler) # 实例化添加handler # 添加控制台显示日志 console = logging.StreamHandler(); console.setLevel(logging.DEBUG); formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s'); console.setFormatter(formatter); logging.getLogger('').addHandler(console); # 词典id file_no_list = ["0", "1"] # 允许的操作,1:增加词 2:删除词 operation_list = ["1", "2"] # 词典映射 file = {} file['0'] = "test1.txt" file['1'] = "test2.txt" class MyRequestHandler(BaseHTTPRequestHandler): # 请求响应 def _write_resp(self, resp_code, msg): self.send_response(resp_code) self.send_header("Content-type", "text/html") self.end_headers() self.wfile.write(str(self.headers)) self.wfile.write(msg) # Get请求的处理 def do_GET(self): logging.info("Client %s request service, method %s, params %s", self.client_address, self.command, self.path) if self.client_address[0] not in WHITE_LIST: logging.warning("Client has no access right.") self._write_resp(403, "Please check access rights.") return datas = self.url2Dict(self.path) flag, file_no, operation, words = self.check_params(datas) if not flag: logging.warning("Request params error.") self._write_resp(400, "Please check request params.") return res_code, msg = self.modify_Dict(file_no, operation, words) self._write_resp(res_code, msg) # url解析,将url解析转换成字典 def url2Dict(self, url): query = urlparse.urlparse(url).query return dict([(k, v[0]) for k, v in urlparse.parse_qs(query).items()]) # 参数校验 def check_params(self, datas): file_no = datas.get("file_no", -1) if file_no == -1 or file_no not in file_no_list: logging.debug("Request params error --- file_no error.") return False, "", "", [] operation = datas.get("operation", -1) if operation == -1 or operation not in operation_list: logging.debug("Request params error --- operation error.") return False, file_no, "", [] words_tmp = datas.get("words", -1) if words_tmp == -1: logging.debug("Request params error --- words error.") return False, file_no, operation, [] words = words_tmp.split(",") if len(words) < 1: logging.debug("Request params error --- words error.") return False, file_no, operation, [] return True, file_no, operation, words # 更新词库操作 def modify_Dict(self, file_no, operation, words): url = "[email protected]:xingzhexiaozhu/UsefulProgram.git" file_path = os.path.abspath(".") + "/UsefulProgram/" data_path = file_path + file[file_no] # 拉取最新代码 if not self.git_init(url, file_path): logging.error("Git init error.") return 500, "git init error." # 更新词库 if not self.modify(data_path, operation, words): logging.error("Modify dict error.") return 500, "modify dict error." # 提交修改 if not self.git_push(file_path, data_path): logging.error("Git push error.") return 500, "git push error." logging.debug("Everything done.") return 200, "Success." # 初始化操作 def git_init(self, url, file_path): try: if os.path.exists(file_path): # 拉取过则每次更新前先 git pull os.system("git -C " + file_path + " pull origin master") logging.debug("git pull %s", file_path) else: # 否则更新前需要先 git clone os.system("git clone " + url) logging.debug("git clone %s", url) return True except: logging.error("git init error") return False # 更新词库 def modify(self, data_path, operation, words): try: # 增加词 if operation == '1': self.add_words(data_path, words) # 删除词 elif operation == '2': self.del_words(data_path, words) logging.debug("Modify dict done.") return True except: logging.error("Modify dict error") return False # 修改完成后提交修改到对应的分支 def git_push(self, file_path, data_path): try: path = os.getcwd() # 获取当前路径 os.chdir(file_path) # 进入要提交的仓库 print(os.getcwd()) print(file_path) print(data_path) os.system("git add " + data_path) os.system("git commit -m " + "update") os.system("git push origin master") os.getcwd() os.chdir(path) # 返回之前的路径 logging.debug("Git push done.") return True except: logging.error("git push error") return False # 增加词到词库 def add_words(self, data_path, words): with open(data_path, 'a+') as file: for word in words: if len(word.strip()) != 0: logging.info("insert word %s in file %s", word, data_path) file.write(word) file.write("n") # 从词库中删除词 def del_words(self, data_path, words): tmp_file = data_path + ".tmp" write_file = open(tmp_file, 'w') with open(data_path, 'r') as read_file: while True: line = read_file.readline() if line != '': if len(line.strip()) != 0: if line.strip() in words: logging.info("delete word %s from %s", line, data_path) continue write_file.write(line) else: break write_file.close() os.system("cp -rf " + tmp_file + " " + data_path) os.system("rm " + tmp_file) if __name__=="__main__": server = HTTPServer(('', 8899), MyRequestHandler) print("Started http server ...") server.serve_forever()