Python实用脚本实践

Python实用脚本实践

一、文件相关

查找文件

import os  path = 'xxx'  files = os.listdir(path)    for f in files:      if f.endswith('.png') and 'fish' in f:          print('I found this' + f)  

分类文件

根据后缀名将文件分类至不同文件夹

import shutil  import os    path = './'  files = os.listdir(path)    for f in files:      folder_name = './' + f.split('.')[-1]      if not os.path.exists(folder_name):          os.makedirs(folder_name)          shutil.move(f,folder_name)      else:          shutil.move(f,folder_name)  

练习: 1. 把 jpg,png,gif 文件夹中的所有文件移动到 image 文件夹中,然后删除 jpg,png,gif 文件夹  2. 把 doc,docx,md,ppt 文件夹中的所有文件移动到 document 文件夹中,然后删除

问题拆解提示:

如何实现文件归类可以拆解为以下4个子问题: 1. 如何创建目标文件夹? 2. 如何浏览各个文件夹? 3. 如何移动文件夹中的文件? 4. 如何删除文件夹?

问题解决提示: 1. 利用 os 模块中的 makedirs 函数,可以在指定路径创建文件夹。在本题中,可以先创建好 image 和 document 文件夹,在进行后续的处理。 2. os 模块中的 listdir 函数和 for 语句配合,可以完成浏览文件夹中所有文件的功能。在本题中需要注意的是,要浏览的文件夹有7个,所以先将这7个文件夹的名称存到了 list 变量中,便于使用。 3. shutil 模块中的 move 函数提供了移动文件的功能。需要指定文件所在路径和目标路径。 4. os 模块中的 removedirs 函数提供了删除文件夹的功能。

# coding:utf-8  import os  import shutil  # 需要把路径替换成你的文件夹所在路径,当把这个代码文件放在要处理的文件夹外一层时,可以使用下面的相对路径写法  path = './problem2_files'  # 创建目标文件夹  os.makedirs(path + '/image')  os.makedirs(path + '/document')  # 将需要处理的后缀名存储到list中  image_suffix = ['jpg', 'png', 'gif']  doc_suffix = ['doc', 'docx', 'ppt', 'md']  # 移动jpg、png、gif文件中的文件  for i in image_suffix:      cur_path = path + '/' + i      files = os.listdir(cur_path)      for f in files:          # 移动文件夹中的文件          shutil.move(cur_path + '/' + f, path + '/image')      # 删除文件夹      os.removedirs(cur_path)  # 移动doc、docx、md、ppt文件夹中的文件,步骤与前面类似  for d in doc_suffix:      cur_path = path + '/' + d      files = os.listdir(cur_path)      for f in files:          shutil.move(cur_path + '/' + f, path + '/document')      os.removedirs(cur_path)  

文件解压

面向过程编程思路

scan — unzip — delete

import os  import shutil    def scan_file():      for f in os.listdir():          if f.endswith('.zip'):              return f    def unzip_it(f):      folder_name = f.split('.')[0]      target_path = './' + folder_name      os.makedirs(target_path)      shutil.unpack_archive(f,target_path)    def delete(f):      os.remove(f)    while True:      zip_file = scan_file()      if zip_file:          unzip_it(zip_file)          delete(zip_file)  

练习:

监测image文件夹,如果包含的文件大于等于5个,则将这些文件压缩到archive1.zip文件中,并删除这些文件。当再次监测到文件多于5个的时候,生成archive2.zip压缩包,以此类推。

问题解决提示 1. os库中的listdir函数可以获取一个文件夹中的所有文件名并存入list变量中,那么统计这个list变量中元素的个数,即可得到文件夹中的文件数。同时,利用whileTruetime.sleep()的配合,可以实现每隔一段时间执行一段代码的功能。 2. 利用shutil库中的make_archive函数来生成压缩包。 3. 利用 os 库中的remove函数来删除文件。因为要删除文件夹中的所有文件,所以配合listdir函数生成的files变量一起使用。

# coding:utf-8  from shutil import make_archive  import os  import time  # 指定需要监测的文件夹  image_path = './image'  # 指定压缩包存放的文件夹  output_path = './output'  # 记录生成了多少个压缩包  zip_count = 0  # 利用while True使程序持续运行  while True:      files = os.listdir(image_path)      # files变量中存储了路径下所有文件的文件名,len()函数可以获取list变量包含多少个元素      # files_count即为路径下的文件数      files_count = len(files)      if files_count >= 5:          zip_count = zip_count + 1          # 指定压缩包的名称以及路径          zip_name =  output_path + '/' + 'archive' + str(zip_count)          # 压缩文件          make_archive(zip_name, 'zip', image_path)          # 删除压缩过的文件          for f in files:              os.remove(image_path + '/' + f)      # 休眠1秒,达到每1秒监测一次的效果      time.sleep(1)  

删除重复

删除重复的文件。包括不同文件夹内的重复文件。 

问题拆解提示

如何删除重复文件可以拆解为以下4个子问题: 1. 如何将所有文件都存放到一个list变量中? 2. 如何判断两个文件的内容是否一致? 3. 如何判断在很多文件中找到一对重复的文件? 4. 如何删除文件?

问题解决提示 1. 假设我们的文件夹只有一层,没有嵌套的文件夹,那么,利用os模块中的listdir函数和for循环配合,就可以浏览所有文件。在浏览文件的同时,记录下每个文件的路径,并存储到list变量中,我们就得到了所有文件的集合。 2. 利用filecmp模块中的cmp函数,判断两个文件的内容是否一致。如果一致,函数返回True;如果不一致,函数返回False。 3. 对一个list变量,使用双重for循环,可以对list中的元素进行两两对比。第一层循环相当于从list中取出一个元素x,第二层循环相当于取出list中的另一个元素y,比较所有的x和y,即实现了对list中所有元素的两两对比。 4. 利用os模块中的remove函数,可以删除指定位置的文件。

# coding:utf-8  import os  import filecmp    # 需要把路径替换成你的文件夹所在路径,当把这个代码文件放在要处理的文件夹外一层时,可以使用下面的相对路径写法  path = './problem3_files'  # 已知路径下存在两个文件夹pic1和pic2  dirs = ['pic1','pic2']    # 将指定目录下的所有文件的路径存储到all_files变量中  def get_all_files(path, dirs):      all_files = []      for d in dirs:          cur_path = path + '/' + d          files = os.listdir(cur_path)          for f in files:              all_files.append(cur_path + '/' + f)      return all_files    # 比较两个文件的内容是否一致  def cmp_files(x, y):      if filecmp.cmp(x, y):          # 如果一致,则删除第二个,保留第一个,并输出信息          os.remove(y)          print("路径"" + y + ""下的文件是重复文件,已经删除")    # 调用函数,获取文件列表  all_files = get_all_files(path, dirs)  # 用双重for循环来比较文件是否有重复  for x in all_files:      for y in all_files:          # 如果x和y不是相同的文件,而且都存在,则执行后续操作          if x != y and os.path.exists(x) and os.path.exists(y):              # 比较两个文件的内容是否一致             cmp_files(x,y)  

二、微信相关

定制群发消息

将下面的表格内容群发至对应的人。

用到第三方库wxpy

过程:实例化对象 -> 找到好友 -> 发消息

  1. 利用Python内置的csv库,通过调用csv.DictReader()函数,读取并解析csv文件。 小技巧:利用列表解析式生成list,list中的每一个元素代表了csv文件中的一行。
  2. 利用内置的format()函数,根据文字模板和指定内容,生成对应的信息。
  3. 利用wxpy库,查找好友并发送信息。 // bot -> bot.find_fren() -> bot.send()

// f = bot.friend().search('name')

// f.send('msg')

import csv  from wxpy import *  import time      def read_info():      f = open('./sample.csv','r')      reader = csv.DictReader(f)      return [info for info in reader]#[{},{},{}]        #'xx-同学请于 xx 时间参加 xx 课程,课程地址是 xxx。收到请回复,谢谢'  def make_msg(raw_info):      t = '{n}-同学请于{t}时间参加{s}课程,课程地址是{a}。收到请回复,谢谢!'      return [t.format(n=info['姓名'],                       t=info['上课时间'],                       s=info['课程'],                       a=info['上课地址']                       ) for info in raw_info]      # -> list ['xxx','xxx']    def send(msg_list):      bot = Bot()      for msg in msg_list:          fren_name = msg.split('-')[0]          f = bot.friends().search(fren_name) # list          if len(f) == 1:              f[0].send(msg)          else:              print(fren_name)              print('Please check this name')      time.sleep(3)    # bot -> bot.find_fren() -> bot.send()  # f = bot.friend().search('name')  # f.send('msg')    raw_info = read_info()  msg_list = make_msg(raw_info)  send(msg_list)  

练习

现在要给王总、麻瓜编程君、沫沫、侯爵这4个人发送不同的邀请信息,销售给了你一份 csv 名单,但名单里人是不全的,只有其中3个人。 

请实现使用 wxpy 库群发信息,并且考虑这种情况:当 csv 名单里缺失了我要发送的人,程序要如何处理这种情况。 

# coding:utf-8  import csv  import time  from wxpy import *    # 将要发送的好友的名字存到list中  FRIENDS = ['李总','麻瓜编程君','沫沫']  CSV_PATH = './meeting_msg.csv'    # 定义函数获取csv中的内容  def read_csv(path):      f = open(path, 'r')      reader = csv.DictReader(f)      return [info for info in reader]    # 定义获取发送内容的函数  def get_msg(infos, name):      template = "{name},提醒下,{time}记得来参加{event},地点在{location},{note}"      for info in infos:          if info['微信昵称'] == name:              msg = template.format(                  name = info['微信昵称'],                  time = info['时间'],                  event = info['事件'],                  location = info['地址'],                  note = info['备注']              )              return msg      # 如果在infos列表中没有找到对应的微信昵称,则输出None      return None    # 定义用于群发操作的函数  def send_to_friends(infos, friends):      # 初始化微信机器人      bot = Bot()      for friend in friends:          # 搜素好友          friend_search = bot.friends().search(friend)          # 如果搜索结果仅有一个,则发送消息,否则返回错误信息          if (len(friend_search) == 1):              msg = get_msg(infos, friend)              if msg:                  friend_search[0].send(msg)              else:                  print("发送失败!用户名不在csv中:"+friend)          else:              print("发送失败!请检查用户名:"+friend)          time.sleep(3)    # 调用群发函数  send_to_friends(read_csv(CSV_PATH), FRIENDS)  

自动拉微信

建议使用小号建群,运行代码时用小号扫码登录,避免大号被封。

运行代码之前需要先建好群,修改群名称和代码中一致,至少在群里说一句话,然后用大号加小号管理员即可自动入群

加好友的时候用一个try catch捕获异常,多用sleep防止微信号被封

from wxpy import *  import time      bot = Bot()    def listen(pwd):      time.sleep(3)      return [msg for msg in bot.messages if msg.text ==pwd]    def add(users,group):      try:          group.add_members(users,use_invitation=True)          return group      except ResponseError:          return None    def get_newfren(say):      time.sleep(3)      return [msg for msg in bot.messages if msg.text==say]    group = bot.groups().search('test group 1')[0]    while True:      new_fren = get_newfren('Make friend')      if new_fren:          print('Found new one!')          for msg in new_fren:              new_user = msg.card              bot.accept_friend(new_user)              new_user.send('Hi new friend')              bot.messages.remove(msg)        time.sleep(3)        print('Running')      selected = listen('Pull me in')      if selected:          print('Found a new friend!XD')          for msg in selected:              this_user = msg.sender              add(this_user,group)              bot.messages.remove(msg)      

进群退群的统计日志

使用 wxpy 文档中「看用户是否在群中」的功能 http://wxpy.readthedocs.io/zh/latest/chats.html#id8

实现机器人群主的监测:每小时看谁进群了了,谁退群了,发送统计日志给自己的微信号。

  1. 查看wxpy库的文档,发现获取群聊的方法——bot.groups().search("群聊名"),发现获取群聊成员列表的方法——group.members()。
  2. 通过对比新旧成员列表,来发现退群名单和进群名单。退群名单:如果旧成员不在新的成员列表中,说明他退群了;进群名单:如果新成员不在旧的成员列表中,说明他是新加群的。
  3. 利用前面提到的send函数来给自己发日志。
# coding:utf-8  import time  from wxpy import *    # 通过群名找到群聊  def find_group(bot, name):      try:          result = bot.groups().search(name)          if len(result) == 1:              return result[0]          else:              print("监测失败!未发现群或存在多个名称相同的群。")              return None      except ResponseError as e:          print(e.err_code, e.err_msg)  # 获取群聊的成员列表  def get_members(group):      try:          return group.members      except ResponseError as e:          print(e.err_code, e.err_msg)  # 对比新旧两个成员列表  def parse_members(cur_members, last_members):      # 获取退群名单:如果旧成员不在新的成员列表中,说明他退群了      quit_list = [last.name for last in last_members if last not in cur_members]      # 获取进群名单:如果新成员不在旧的成员列表中,说明他是新加群的      new_list = [cur.name for cur in cur_members if cur not in last_members]      # 返回文本信息      return "退群名单:"+",".join(quit_list)+"n"+"进群名单:"+",".join(new_list)  # 将指定的消息发送给指定的人  def send_msg(bot, my_name, text):      try:          myself = bot.friends().search(my_name)[0]          myself.send(text)      except ResponseError as e:          print(e.err_code, e.err_msg)  # 将主要操作封装在main函数中  def main(my_name, group_name):      bot = Bot()      group = find_group(bot, group_name)      # 先获取一次成员列表,便于后续对比      last_members = get_members(group)      # 利用while True和sleep,实现每隔1小时进行一次监测      while True:          time.sleep(3600)          # 获取当前的成员列表          cur_members = get_members(group)          msg = parse_members(cur_members, last_members)          send_msg(bot, my_name, msg)          # 将当前成员列表赋值给last_members变量,当作下一次操作中的旧成员列表          last_members = cur_members  # 调用main函数,执行监测  main("我自己的微信名","群名称")