Python实用脚本实践
- 2020 年 1 月 2 日
- 筆記
Python实用脚本实践
一、文件相关
查找文件
import os path = 'xxx' files = os.listdir(path) for f in files: if f.endswith('.png') and 'fish' in f: print('I found this' + f)
分类文件
根据后缀名将文件分类至不同文件夹
import shutil import os path = './' files = os.listdir(path) for f in files: folder_name = './' + f.split('.')[-1] if not os.path.exists(folder_name): os.makedirs(folder_name) shutil.move(f,folder_name) else: shutil.move(f,folder_name)
练习: 1. 把 jpg,png,gif 文件夹中的所有文件移动到 image 文件夹中,然后删除 jpg,png,gif 文件夹 2. 把 doc,docx,md,ppt 文件夹中的所有文件移动到 document 文件夹中,然后删除
问题拆解提示:
如何实现文件归类可以拆解为以下4个子问题: 1. 如何创建目标文件夹? 2. 如何浏览各个文件夹? 3. 如何移动文件夹中的文件? 4. 如何删除文件夹?
问题解决提示: 1. 利用 os 模块中的 makedirs 函数,可以在指定路径创建文件夹。在本题中,可以先创建好 image 和 document 文件夹,在进行后续的处理。 2. os 模块中的 listdir 函数和 for 语句配合,可以完成浏览文件夹中所有文件的功能。在本题中需要注意的是,要浏览的文件夹有7个,所以先将这7个文件夹的名称存到了 list 变量中,便于使用。 3. shutil 模块中的 move 函数提供了移动文件的功能。需要指定文件所在路径和目标路径。 4. os 模块中的 removedirs 函数提供了删除文件夹的功能。
# coding:utf-8 import os import shutil # 需要把路径替换成你的文件夹所在路径,当把这个代码文件放在要处理的文件夹外一层时,可以使用下面的相对路径写法 path = './problem2_files' # 创建目标文件夹 os.makedirs(path + '/image') os.makedirs(path + '/document') # 将需要处理的后缀名存储到list中 image_suffix = ['jpg', 'png', 'gif'] doc_suffix = ['doc', 'docx', 'ppt', 'md'] # 移动jpg、png、gif文件中的文件 for i in image_suffix: cur_path = path + '/' + i files = os.listdir(cur_path) for f in files: # 移动文件夹中的文件 shutil.move(cur_path + '/' + f, path + '/image') # 删除文件夹 os.removedirs(cur_path) # 移动doc、docx、md、ppt文件夹中的文件,步骤与前面类似 for d in doc_suffix: cur_path = path + '/' + d files = os.listdir(cur_path) for f in files: shutil.move(cur_path + '/' + f, path + '/document') os.removedirs(cur_path)
文件解压
面向过程编程思路
scan — unzip — delete
import os import shutil def scan_file(): for f in os.listdir(): if f.endswith('.zip'): return f def unzip_it(f): folder_name = f.split('.')[0] target_path = './' + folder_name os.makedirs(target_path) shutil.unpack_archive(f,target_path) def delete(f): os.remove(f) while True: zip_file = scan_file() if zip_file: unzip_it(zip_file) delete(zip_file)
练习:
监测image
文件夹,如果包含的文件大于等于5个,则将这些文件压缩到archive1.zip
文件中,并删除这些文件。当再次监测到文件多于5个的时候,生成archive2.zip
压缩包,以此类推。
问题解决提示 1. os库中的listdir
函数可以获取一个文件夹中的所有文件名并存入list变量中,那么统计这个list变量中元素的个数,即可得到文件夹中的文件数。同时,利用whileTrue
和time.sleep()
的配合,可以实现每隔一段时间执行一段代码的功能。 2. 利用shutil
库中的make_archive
函数来生成压缩包。 3. 利用 os 库中的remove
函数来删除文件。因为要删除文件夹中的所有文件,所以配合listdir
函数生成的files
变量一起使用。
# coding:utf-8 from shutil import make_archive import os import time # 指定需要监测的文件夹 image_path = './image' # 指定压缩包存放的文件夹 output_path = './output' # 记录生成了多少个压缩包 zip_count = 0 # 利用while True使程序持续运行 while True: files = os.listdir(image_path) # files变量中存储了路径下所有文件的文件名,len()函数可以获取list变量包含多少个元素 # files_count即为路径下的文件数 files_count = len(files) if files_count >= 5: zip_count = zip_count + 1 # 指定压缩包的名称以及路径 zip_name = output_path + '/' + 'archive' + str(zip_count) # 压缩文件 make_archive(zip_name, 'zip', image_path) # 删除压缩过的文件 for f in files: os.remove(image_path + '/' + f) # 休眠1秒,达到每1秒监测一次的效果 time.sleep(1)
删除重复
删除重复的文件。包括不同文件夹内的重复文件。
问题拆解提示
如何删除重复文件可以拆解为以下4个子问题: 1. 如何将所有文件都存放到一个list变量中? 2. 如何判断两个文件的内容是否一致? 3. 如何判断在很多文件中找到一对重复的文件? 4. 如何删除文件?
问题解决提示 1. 假设我们的文件夹只有一层,没有嵌套的文件夹,那么,利用os模块中的listdir函数和for循环配合,就可以浏览所有文件。在浏览文件的同时,记录下每个文件的路径,并存储到list变量中,我们就得到了所有文件的集合。 2. 利用filecmp模块中的cmp函数,判断两个文件的内容是否一致。如果一致,函数返回True;如果不一致,函数返回False。 3. 对一个list变量,使用双重for循环,可以对list中的元素进行两两对比。第一层循环相当于从list中取出一个元素x,第二层循环相当于取出list中的另一个元素y,比较所有的x和y,即实现了对list中所有元素的两两对比。 4. 利用os模块中的remove函数,可以删除指定位置的文件。
# coding:utf-8 import os import filecmp # 需要把路径替换成你的文件夹所在路径,当把这个代码文件放在要处理的文件夹外一层时,可以使用下面的相对路径写法 path = './problem3_files' # 已知路径下存在两个文件夹pic1和pic2 dirs = ['pic1','pic2'] # 将指定目录下的所有文件的路径存储到all_files变量中 def get_all_files(path, dirs): all_files = [] for d in dirs: cur_path = path + '/' + d files = os.listdir(cur_path) for f in files: all_files.append(cur_path + '/' + f) return all_files # 比较两个文件的内容是否一致 def cmp_files(x, y): if filecmp.cmp(x, y): # 如果一致,则删除第二个,保留第一个,并输出信息 os.remove(y) print("路径"" + y + ""下的文件是重复文件,已经删除") # 调用函数,获取文件列表 all_files = get_all_files(path, dirs) # 用双重for循环来比较文件是否有重复 for x in all_files: for y in all_files: # 如果x和y不是相同的文件,而且都存在,则执行后续操作 if x != y and os.path.exists(x) and os.path.exists(y): # 比较两个文件的内容是否一致 cmp_files(x,y)
二、微信相关
定制群发消息
将下面的表格内容群发至对应的人。

用到第三方库wxpy
过程:实例化对象 -> 找到好友 -> 发消息
- 利用Python内置的csv库,通过调用csv.DictReader()函数,读取并解析csv文件。 小技巧:利用列表解析式生成list,list中的每一个元素代表了csv文件中的一行。
- 利用内置的format()函数,根据文字模板和指定内容,生成对应的信息。
- 利用wxpy库,查找好友并发送信息。 // bot -> bot.find_fren() -> bot.send()
// f = bot.friend().search('name')
// f.send('msg')
import csv from wxpy import * import time def read_info(): f = open('./sample.csv','r') reader = csv.DictReader(f) return [info for info in reader]#[{},{},{}] #'xx-同学请于 xx 时间参加 xx 课程,课程地址是 xxx。收到请回复,谢谢' def make_msg(raw_info): t = '{n}-同学请于{t}时间参加{s}课程,课程地址是{a}。收到请回复,谢谢!' return [t.format(n=info['姓名'], t=info['上课时间'], s=info['课程'], a=info['上课地址'] ) for info in raw_info] # -> list ['xxx','xxx'] def send(msg_list): bot = Bot() for msg in msg_list: fren_name = msg.split('-')[0] f = bot.friends().search(fren_name) # list if len(f) == 1: f[0].send(msg) else: print(fren_name) print('Please check this name') time.sleep(3) # bot -> bot.find_fren() -> bot.send() # f = bot.friend().search('name') # f.send('msg') raw_info = read_info() msg_list = make_msg(raw_info) send(msg_list)
练习
现在要给王总、麻瓜编程君、沫沫、侯爵这4个人发送不同的邀请信息,销售给了你一份 csv 名单,但名单里人是不全的,只有其中3个人。
请实现使用 wxpy 库群发信息,并且考虑这种情况:当 csv 名单里缺失了我要发送的人,程序要如何处理这种情况。

# coding:utf-8 import csv import time from wxpy import * # 将要发送的好友的名字存到list中 FRIENDS = ['李总','麻瓜编程君','沫沫'] CSV_PATH = './meeting_msg.csv' # 定义函数获取csv中的内容 def read_csv(path): f = open(path, 'r') reader = csv.DictReader(f) return [info for info in reader] # 定义获取发送内容的函数 def get_msg(infos, name): template = "{name},提醒下,{time}记得来参加{event},地点在{location},{note}" for info in infos: if info['微信昵称'] == name: msg = template.format( name = info['微信昵称'], time = info['时间'], event = info['事件'], location = info['地址'], note = info['备注'] ) return msg # 如果在infos列表中没有找到对应的微信昵称,则输出None return None # 定义用于群发操作的函数 def send_to_friends(infos, friends): # 初始化微信机器人 bot = Bot() for friend in friends: # 搜素好友 friend_search = bot.friends().search(friend) # 如果搜索结果仅有一个,则发送消息,否则返回错误信息 if (len(friend_search) == 1): msg = get_msg(infos, friend) if msg: friend_search[0].send(msg) else: print("发送失败!用户名不在csv中:"+friend) else: print("发送失败!请检查用户名:"+friend) time.sleep(3) # 调用群发函数 send_to_friends(read_csv(CSV_PATH), FRIENDS)
自动拉微信群

建议使用小号建群,运行代码时用小号扫码登录,避免大号被封。
运行代码之前需要先建好群,修改群名称和代码中一致,至少在群里说一句话,然后用大号加小号管理员即可自动入群
加好友的时候用一个try catch捕获异常,多用sleep防止微信号被封
from wxpy import * import time bot = Bot() def listen(pwd): time.sleep(3) return [msg for msg in bot.messages if msg.text ==pwd] def add(users,group): try: group.add_members(users,use_invitation=True) return group except ResponseError: return None def get_newfren(say): time.sleep(3) return [msg for msg in bot.messages if msg.text==say] group = bot.groups().search('test group 1')[0] while True: new_fren = get_newfren('Make friend') if new_fren: print('Found new one!') for msg in new_fren: new_user = msg.card bot.accept_friend(new_user) new_user.send('Hi new friend') bot.messages.remove(msg) time.sleep(3) print('Running') selected = listen('Pull me in') if selected: print('Found a new friend!XD') for msg in selected: this_user = msg.sender add(this_user,group) bot.messages.remove(msg)
进群退群的统计日志
使用 wxpy 文档中「看用户是否在群中」的功能 http://wxpy.readthedocs.io/zh/latest/chats.html#id8
实现机器人群主的监测:每小时看谁进群了了,谁退群了,发送统计日志给自己的微信号。
- 查看wxpy库的文档,发现获取群聊的方法——bot.groups().search("群聊名"),发现获取群聊成员列表的方法——group.members()。
- 通过对比新旧成员列表,来发现退群名单和进群名单。退群名单:如果旧成员不在新的成员列表中,说明他退群了;进群名单:如果新成员不在旧的成员列表中,说明他是新加群的。
- 利用前面提到的send函数来给自己发日志。
# coding:utf-8 import time from wxpy import * # 通过群名找到群聊 def find_group(bot, name): try: result = bot.groups().search(name) if len(result) == 1: return result[0] else: print("监测失败!未发现群或存在多个名称相同的群。") return None except ResponseError as e: print(e.err_code, e.err_msg) # 获取群聊的成员列表 def get_members(group): try: return group.members except ResponseError as e: print(e.err_code, e.err_msg) # 对比新旧两个成员列表 def parse_members(cur_members, last_members): # 获取退群名单:如果旧成员不在新的成员列表中,说明他退群了 quit_list = [last.name for last in last_members if last not in cur_members] # 获取进群名单:如果新成员不在旧的成员列表中,说明他是新加群的 new_list = [cur.name for cur in cur_members if cur not in last_members] # 返回文本信息 return "退群名单:"+",".join(quit_list)+"n"+"进群名单:"+",".join(new_list) # 将指定的消息发送给指定的人 def send_msg(bot, my_name, text): try: myself = bot.friends().search(my_name)[0] myself.send(text) except ResponseError as e: print(e.err_code, e.err_msg) # 将主要操作封装在main函数中 def main(my_name, group_name): bot = Bot() group = find_group(bot, group_name) # 先获取一次成员列表,便于后续对比 last_members = get_members(group) # 利用while True和sleep,实现每隔1小时进行一次监测 while True: time.sleep(3600) # 获取当前的成员列表 cur_members = get_members(group) msg = parse_members(cur_members, last_members) send_msg(bot, my_name, msg) # 将当前成员列表赋值给last_members变量,当作下一次操作中的旧成员列表 last_members = cur_members # 调用main函数,执行监测 main("我自己的微信名","群名称")