Scraping Weibo Image Data into MySQL with Python

Preface

For hardware and other reasons, I needed to store roughly 1.7 million Weibo images, about 2 TB in total, in MySQL. I had always kept Weibo data in MongoDB, a non-relational database, so my unfamiliarity with MySQL led me into countless pits; three days of back-and-forth changes later, it finally worked.

A Journey of Digging Pits and Filling Them In

Creating the Tables

Before storing anything, the database schema has to be designed. I settled on three tables:

Blog table: [id, userid, blog_text, lat, lng, created_time, reserve]   pkey: id

Pics table: [md5, pic_url, pic_bin, exif, reserve]   pkey: md5

Relationship table: [id, md5, reserve]   pkey: (id, md5)   fkey: (id, blog.id)  (md5, pics.md5)

Creating the tables was mostly uneventful, except for two columns: the types of pic_bin and blog_text caused real trouble. pic_bin started out as BLOB, but I found at runtime that a BLOB column holds at most 64 KB, nowhere near enough for Weibo images; switching to MEDIUMBLOB (up to 16 MB) basically covered the requirement. Then came blog_text, my first big pit.

At first I naturally declared blog_text as TEXT, but once the crawler ran, some rows refused to insert and threw errors. Sifting through the failures showed that some Weibo texts contain emoji. A lot of reading later, the cause turned out to be that MySQL's utf8 stores at most three bytes per character while emoji need four, so the encoding has to be changed to utf8mb4. But editing the MySQL config file on my Mac produced one bizarre error after another, so in frustration I changed TEXT to BLOB, and everything worked. (My local machine is a Mac that has to go through a remote Windows box to reach the MySQL instance on the Synology NAS, so changing the local config was pointless anyway.)
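For reference, here is a minimal sketch of the CREATE TABLE statements under those final choices. The table and column names follow the INSERT statements in the code at the end of the post; the column sizes, the reserve fields, and the placeholder credentials are my assumptions.

import pymysql

# Placeholder credentials; the real script reads them from ConfigParser.conf.
conn = pymysql.connect(host='127.0.0.1', user='root', passwd='secret', db='weibo')
cursor = conn.cursor()

ddl_statements = [
    """CREATE TABLE IF NOT EXISTS blog (
           id           VARCHAR(32) NOT NULL,   -- Weibo status idstr
           userid       VARCHAR(32),
           blog_text    BLOB,                   -- BLOB instead of TEXT sidesteps the utf8/emoji issue
           lat          DOUBLE,
           lng          DOUBLE,
           created_time VARCHAR(64),
           reserve      VARCHAR(255),
           PRIMARY KEY (id)
       )""",
    """CREATE TABLE IF NOT EXISTS pics (
           md5     CHAR(32) NOT NULL,           -- hex MD5 of the image binary
           pic_url VARCHAR(512),
           pic_bin MEDIUMBLOB,                  -- up to 16 MB per image
           exif    TEXT,
           reserve VARCHAR(255),
           PRIMARY KEY (md5)
       )""",
    """CREATE TABLE IF NOT EXISTS relationship (
           id      VARCHAR(32) NOT NULL,
           md5     CHAR(32) NOT NULL,
           reserve VARCHAR(255),
           PRIMARY KEY (id, md5),
           FOREIGN KEY (id) REFERENCES blog (id),
           FOREIGN KEY (md5) REFERENCES pics (md5)
       )""",
]
for stmt in ddl_statements:
    cursor.execute(stmt)
conn.commit()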

Storing the Images

And this was a huge pit!!! Under Python 3, reading an image yields bytes, whose repr is prefixed with b'. When that repr is spliced into a SQL statement, the quote after the b pairs up with the statement's own quote, so the rest of the binary falls outside any quotes and the statement errors out. Unlike a str, the raw binary can't simply have its characters escaped, and the many approaches I tried all failed. In the end I base64-encoded the binary into a string, stored that in the database, and decode it whenever the image is needed:

pic_bin = str(base64.b64encode(pic_bin))[2:-1]
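Strictly speaking base64 is an encoding rather than encryption, and the str(...)[2:-1] slice is only trimming the b'...' wrapper; base64.b64encode(pic_bin).decode('ascii') does the same job without the fragile slicing. For completeness, a parameterized query would also have avoided the quoting problem, since pymysql escapes raw bytes itself; the sketch below shows that alternative, which is not what the script at the end does:

import base64

pic_bin = b'\x89PNG\r\n...'  # placeholder for the raw image bytes

# Same result as str(base64.b64encode(pic_bin))[2:-1], minus the slicing:
encoded = base64.b64encode(pic_bin).decode('ascii')
assert base64.b64decode(encoded) == pic_bin  # round-trips back to the raw bytes

# Alternative (not used here): let pymysql escape the bytes via placeholders,
# so the b'...' repr never ends up inside the SQL text at all.
# cursor.execute(
#     "INSERT IGNORE INTO pics(pic_url, pic_bin, md5, exif) VALUES (%s, %s, %s, %s)",
#     (pic_url, pic_bin, md5, exif),
# )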

Changing the Config File

Because the script uses Python multiprocessing and moves about 8 GB of data an hour, and the image payloads are large, the packets sent to MySQL exceed the server's default limit and it fails with "MySQL server has gone away". The fix is to raise two parameters in the config file:

max_allowed_packet = 600M
wait_timeout = 60000
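These belong under the [mysqld] section of the server's my.cnf (my.ini on Windows) and take effect after a restart. Since the MySQL instance here lived on a remote Synology box, it is worth verifying from the client that the new values actually took hold; a quick check, assuming the same placeholder connection parameters as before:

import pymysql

conn = pymysql.connect(host='127.0.0.1', user='root', passwd='secret', db='weibo')
cursor = conn.cursor()

# Values come back in bytes and seconds respectively.
for name in ('max_allowed_packet', 'wait_timeout'):
    cursor.execute("SHOW VARIABLES LIKE %s", (name,))
    print(cursor.fetchone())

# max_allowed_packet can also be raised at runtime (SUPER privilege needed);
# it applies to connections opened after the change:
# cursor.execute("SET GLOBAL max_allowed_packet = 600 * 1024 * 1024")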

Lost connection to MySQL server during query

The program kept dying with this error as it ran. I chased the cause for ages, tried every remedy I could find and read piles of material, and it kept failing. Eventually I stopped asking why: whatever the reason, just reconnect after the connection drops. conn.ping(True) checks whether the MySQL connection is still alive and reconnects when it is not, and that finally solved the problem.
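The pattern boils down to a small wrapper. ensure_conn below is a hypothetical helper name of mine, but conn.ping(reconnect=True) is pymysql's own API and re-opens a dropped socket by itself:

import pymysql

def ensure_conn(conn, **connect_kwargs):
    """Return a live connection, rebuilding it if the old one has died."""
    try:
        conn.ping(reconnect=True)  # pymysql re-opens the socket if it is dead
        return conn
    except pymysql.err.OperationalError:
        # ping itself can still fail (e.g. the server restarted); connect from scratch
        return pymysql.connect(**connect_kwargs)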

Code Implementation

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Created by Baoyi on 2017/10/16
from multiprocessing.pool import Pool

import pymysql
import requests
import json
import exifread
from io import BytesIO
import configparser
import hashlib
import logging
import base64

# Configure logging
logging.basicConfig(level=logging.WARNING,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%a, %d %b %Y %H:%M:%S',
                    filename='weibo.log',
                    filemode='w')

cf = configparser.ConfigParser()
cf.read("ConfigParser.conf")

# Read the MySQL configuration
db_host = cf.get("mysql", "db_host")
db_port = cf.getint("mysql", "db_port")
db_user = cf.get("mysql", "db_user")
db_pass = cf.get("mysql", "db_pass")
db = cf.get("mysql", "db")

# Create the connection
conn = pymysql.connect(host=db_host, user=db_user, passwd=db_pass, db=db, port=db_port, charset='utf8')
# Get a cursor
cursor = conn.cursor()

# Build the INSERT statements
insert_blog_sql = (
    "INSERT IGNORE INTO blog(userid, id, blog_text, lat, lng, created_time) "
    "VALUES('{uid}', '{id}','{blog_text}','{lat}','{lng}','{created_time}')"
)

insert_pic_sql = (
    "INSERT IGNORE INTO pics(pic_url, pic_bin, md5, exif) "
    "VALUES ('{pic_url}','{pic_bin}','{md5}','{exif}')"
)

insert_relationship_sql = (
    "INSERT IGNORE INTO relationship(id, md5) VALUES ('{id}','{md5}')"
)

uid = []

with open('./data/final_id.txt', 'r') as f:
    for i in f.readlines():
        uid.append(i.strip('\r\n'))


# Fetch the image binary (thumbnail URL -> full-size URL)
def handle_pic(pic_url):
    large_pic_url = pic_url.replace('thumbnail', 'large')
    large_bin = requests.get(large_pic_url)
    return large_bin.content


def get_poiid_info(uid):
    try:
        url = 'https://api.weibo.com/2/statuses/user_timeline.json'
        load = {
            'access_token': 'xxxxxxxxxx',
            'uid': uid,
            'count': 100,
            'feature': 2,
            'trim_user': 1
        }
        get_info = requests.get(url=url, params=load, timeout=(10, 10))
        if get_info.status_code != 200:
            logging.warning(ConnectionError)
            return
        info_json = json.loads(get_info.content)
        info_json['uid'] = uid
        statuses = info_json['statuses']
        # Filter and process the statuses
        for status in statuses:
            id = status['idstr']
            if status['geo'] is not None:
                lat = status['geo']['coordinates'][0]
                lng = status['geo']['coordinates'][1]
                pic_urls = status['pic_urls']

                # Check whether the status was posted in Beijing
                if (115.7 < lng < 117.4) and (39.4 < lat < 41.6):
                    # If so, insert the blog row
                    blog_text = status['text'].replace("'", "''")  # escape single quotes for SQL
                    created_time = status['created_at']
                    try:
                        cursor.execute(
                            insert_blog_sql.format(uid=uid, id=id, blog_text=blog_text, lat=lat, lng=lng,
                                                   created_time=created_time))
                    except pymysql.err.OperationalError as e_blog:
                        logging.warning(e_blog.args[1])
                        pass

                    # conn.commit()
                    # Handle the images
                    for pic_url in pic_urls:
                        # Fetch the full-size image binary
                        pic_bin = handle_pic(pic_url['thumbnail_pic'])

                        # Wrap the binary in a file object to read the EXIF data and compute the MD5
                        pic_file = BytesIO(pic_bin)
                        tag1 = exifread.process_file(pic_file, details=False, strict=True)
                        tag = {}
                        for key, value in tag1.items():
                            if key not in (
                                    'JPEGThumbnail', 'TIFFThumbnail', 'Filename',
                                    'EXIF MakerNote'):  # drop four unneeded EXIF keys to trim the payload
                                tag[key] = str(value)
                        tags = json.dumps(tag)  # the EXIF data as a JSON string
                        # Compute the MD5 (rewind first: exifread moved the file pointer)
                        pic_file.seek(0)
                        MD5 = hashlib.md5(pic_file.read()).hexdigest()
                        # base64-encode the binary into a string before storing it
                        try:
                            cursor.execute(
                                insert_pic_sql.format(pic_url=pic_url['thumbnail_pic'].replace('thumbnail', 'large'),
                                                      pic_bin=str(base64.b64encode(pic_bin))[2:-1], md5=MD5,
                                                      exif=tags))
                        except pymysql.err.OperationalError as e_pic:
                            logging.warning(e_pic.args[1])
                            pass
                        try:
                            cursor.execute(insert_relationship_sql.format(id=id, md5=MD5))
                        except pymysql.err.OperationalError as e_relation:
                            logging.warning(e_relation)
                            pass
                        conn.commit()

                else:
                    logging.info(id + " is not in Beijing")
                    pass
            else:
                logging.info(id + ' geo is null')
                pass
    except pymysql.err.OperationalError as e:
        logging.error(e.args[1])
        pass


def judge_conn(i):
    global conn
    try:
        conn.ping(True)
        get_poiid_info(i)
    except pymysql.err.OperationalError:
        logging.error('Reconnect')
        conn = pymysql.connect(host=db_host, user=db_user, passwd=db_pass, db=db, charset='utf8')
        get_poiid_info(i)


def handle_tuple(a_tuple):
    read_uid_set = []
    for i in a_tuple:
        read_uid_set.append(i[0])
    return set(read_uid_set)


if __name__ == '__main__':
    # Skip uids whose blogs are already in the database
    sql_find_uid = (
        "SELECT userid FROM blog"
    )
    cursor.execute(sql_find_uid)
    read_uid_tuple = cursor.fetchall()
    read_list = handle_tuple(read_uid_tuple)
    print(len(read_list))

    new_uid = set(uid).difference(read_list)
    print(len(new_uid))

    pool = Pool()
    pool.map(judge_conn, list(new_uid))
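For completeness, the script expects a ConfigParser.conf file beside it with a [mysql] section in the shape below; every value here is a placeholder:

[mysql]
db_host = 192.168.1.2
db_port = 3306
db_user = weibo
db_pass = secret
db = weibo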