使用 selenium 實現谷歌以圖搜圖爬蟲
使用selenium實現谷歌以圖搜圖
實現思路
原理非常簡單,就是利用selenium去操作瀏覽器,獲取到想要的鏈接,然後進行圖片的下載,和一般的爬蟲無異。
用到的技術:multiprocessing,selenium,xpath,requests
以下按照代碼執行的順序進行講解。
首先導入需要的包
# coding=utf-8
import base64
import hashlib
import os
import re
import shutil
import time
from multiprocessing import Pool, cpu_count
import requests
import tqdm
from colorama import Fore
from selenium import webdriver
from selenium.common.exceptions import (ElementNotVisibleException,
StaleElementReferenceException)
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
定義一個 run() 函數作為入口。這裡使用多進程技術,同時打開多個瀏覽器進行圖片爬取。
def run():
    """Entry point: search every image under ``./upload`` using parallel browsers.

    Image files are collected recursively, split into one chunk per worker
    process, and each chunk is handed to ``download_task``.

    NOTE: on platforms that spawn worker processes (e.g. Windows) this must be
    called from under an ``if __name__ == "__main__":`` guard.
    """
    filelist = []
    upload = r"./upload"  # folder holding the images to upload
    getfilelist(upload, filelist)  # fills ``filelist`` in place
    if not filelist:
        print("No image files found in {}".format(upload))
        return

    # Never start more workers (i.e. browsers) than there are images.
    num_process = min(cpu_count(), len(filelist))
    # Drop empty chunks so no worker launches a browser with nothing to do.
    result = [chunk for chunk in partition(filelist, num_process) if chunk]

    pool = Pool(len(result))
    async_result = pool.map_async(download_task, result)
    pool.close()  # no further tasks will be submitted
    pool.join()  # wait for every worker to finish

    # map_async stores worker exceptions instead of raising them; surface
    # them here so failures are not silently lost.
    try:
        async_result.get()
    except Exception as e:
        print("A download task failed - {}".format(e))
其中 getfilelist() 函數以遞歸方式查找文件夾下的所有圖片文件,這種寫法在工作中很常用。
# File extensions (lower-case, with leading dot) that are treated as images.
EXTEND = [".bmp", ".jpg", ".jpeg", ".tif", ".tiff",
          ".jfif", ".png", ".gif", ".iff", ".ilbm"]


def is_img(img_path):
    """Return True when ``img_path`` has a known image file extension.

    The comparison is case-insensitive, so ``photo.JPG`` is accepted just like
    ``photo.jpg``. Only the extension is inspected; the file is never opened.
    """
    ext = os.path.splitext(img_path)[1]
    return ext.lower() in EXTEND
def getfilelist(path, filelist):
    """Recursively collect image files under ``path`` into ``filelist``.

    Sub-directories are descended into as they are encountered; every
    non-directory entry accepted by ``is_img`` is appended (joined with its
    directory) to ``filelist``, which is modified in place. Returns None.
    """
    for entry in os.listdir(path):
        full_path = os.path.join(path, entry)
        if os.path.isdir(full_path):
            getfilelist(full_path, filelist)
            continue
        if is_img(entry):
            filelist.append(full_path)
partition() 函數用於將一個列表均分為幾份,以便實現多進程。
def partition(ls, size):
    """Split ``ls`` into ``size`` consecutive chunks of near-equal length.

    The first ``len(ls) % size`` chunks receive one extra element, so chunk
    lengths never differ by more than one. Order is preserved and
    concatenating the chunks yields ``ls`` again. When ``ls`` has fewer than
    ``size`` elements the trailing chunks are empty lists.

    Raises:
        ValueError: if ``size`` is smaller than 1.
    """
    if size < 1:
        raise ValueError("size must be a positive integer")
    base, extra = divmod(len(ls), size)
    result = []
    start = 0
    for i in range(size):
        # Spread the remainder over the leading chunks instead of piling it
        # all onto the last one.
        end = start + base + (1 if i < extra else 0)
        result.append(ls[start:end])
        start = end
    return result
download_task() 為具體的下載任務:每個 task 實例化一個 GoogleSearcher 類,遍歷自己的圖片列表進行以圖搜圖。
def download_task(filelist):
    """Worker task: run a reverse image search for every file in ``filelist``.

    One ``GoogleSearcher`` (and therefore one browser) is created per call
    and is always shut down afterwards. A failure on one image is reported
    and does not stop the remaining images from being processed.
    """
    if not filelist:
        # Nothing to do: avoid launching a browser for an empty chunk.
        return
    searcher = GoogleSearcher(
        download=r"./download")
    try:
        for file in filelist:
            try:
                # Upload a single image and search by it.
                searcher.simple_file_run(file)
            except Exception as e:
                print('Search failed for {} - {}'.format(file, e))
    finally:
        # Always release the browser process, even after an error.
        searcher.driver.quit()
GoogleSearcher 類比較長,在注釋中進行講解。
# Login name of the current user, read from the environment. Falls back to
# ``USER`` and then to an empty string, so importing this module does not
# fail with KeyError when ``USERNAME`` is not set.
USERNAME = os.environ.get('USERNAME') or os.environ.get('USER', '')
class GoogleSearcher:
    """Drive a Chrome browser through Google search-by-image and save the results.

    Typical use is ``GoogleSearcher(download=...).simple_file_run(image_path)``:
    the image is uploaded, the "similar images" result list is opened, the
    image links are collected and then downloaded into
    ``<download>/<parent folder of the image>/<image name>/``.

    NOTE(review): the element ids / class names used as locators are
    hard-coded and presumably tied to Google's page markup at the time the
    code was written; verify them against the current page.
    """

    def __init__(self, download="download", sleep_time=1):
        """Create the download folder and start a Chrome session.

        Args:
            download: Root folder for downloaded images (created if missing).
            sleep_time: Seconds to pause after the result page has loaded.
        """
        self._download = download
        self.sleep_time = sleep_time
        # Browser-like User-Agent sent with the direct image downloads.
        self.header = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.163 Safari/537.36"}
        os.makedirs(self._download, exist_ok=True)
        self.option = webdriver.ChromeOptions()
        # Optional: a "--user-data-dir=<Chrome user data folder>" argument can
        # be added here to reuse an existing Chrome profile.
        # The original author notes that running with "headless" may fail.
        self.option.add_argument("disable-gpu")
        self.driver = webdriver.Chrome(options=self.option)

    def close(self):
        """Quit the browser and end the WebDriver session."""
        self.driver.quit()

    def _wait_visible(self, by, value, timeout):
        """Wait until the located element is visible and return it."""
        wait = WebDriverWait(self.driver, timeout=timeout, poll_frequency=0.5)
        return wait.until(EC.visibility_of_element_located((by, value)))

    def upload_img_get_html(self, file):
        """Upload ``file`` to Google Images and return the result page source.

        Args:
            file: Path of the image to upload; relative paths are resolved
                against the current working directory.

        Returns:
            The page source of the page shown after the upload.
        """
        print(
            f"{Fore.GREEN} Begin to upload image {os.path.split(file)[1]} {Fore.RESET}")
        # The URL needs an explicit scheme to be a valid navigation target.
        self.driver.get("https://www.google.com/imghp")
        # Wait for the camera button, then activate it.
        image_button = self._wait_visible(By.CLASS_NAME, "LM8x9c", timeout=20)
        image_button.send_keys(Keys.ENTER)
        # Wait for the "upload an image" panel, then open its upload link.
        self._wait_visible(By.ID, "dRSWfb", timeout=20)
        upload = self.driver.find_element(By.XPATH, '//*[@id="dRSWfb"]/div/a')
        upload.send_keys(Keys.ENTER)
        # The upload control is an <input>, so the path can be typed into it.
        # An absolute path is sent because the browser does not share this
        # process's notion of a relative path.
        input_ = self._wait_visible(By.ID, 'awyMjb', timeout=10)
        input_.send_keys(os.path.abspath(file))
        print(f"{Fore.GREEN} uploaded {Fore.RESET}")
        # The browser navigates to the result page; wait for its top nav bar.
        self._wait_visible(By.XPATH, '//*[@id="top_nav"]', timeout=20)
        time.sleep(self.sleep_time)
        print(f"{Fore.GREEN} Finish download source code{Fore.RESET}")
        return self.driver.page_source

    def highlight(self, element):
        """Visually mark ``element`` in the browser (yellow fill, red border)."""
        self.driver.execute_script(
            "arguments[0].setAttribute('style', arguments[1]);", element, "background: yellow; border: 2px solid red;")

    def wait_and_click(self, xpath, max_retries=5):
        """Wait for the element at ``xpath`` to be clickable, click it, return it.

        Clicking sometimes fails for no obvious reason, so on any failure the
        page is refreshed and the click is retried.

        Args:
            xpath: XPath of the element to click.
            max_retries: Number of refresh-and-retry rounds before giving up.

        Raises:
            Exception: the last error, once ``max_retries`` is exhausted.
        """
        attempts = 0
        while True:
            try:
                w = WebDriverWait(self.driver, 15)
                elem = w.until(EC.element_to_be_clickable((By.XPATH, xpath)))
                elem.click()
                self.highlight(elem)
                return elem
            except Exception:
                attempts += 1
                if attempts > max_retries:
                    raise
                print('Click time out - {}'.format(xpath))
                print('Refreshing browser...')
                self.driver.refresh()
                time.sleep(2)

    def get_extension_from_link(self, link, default='jpg'):
        """Return the file extension to use for ``link``.

        Any query string or fragment is ignored. Yields ``'jpg'`` (for jpg or
        jpeg), ``'gif'`` or ``'png'``; anything else yields ``default``.
        """
        # Strip "?query" and "#fragment" so "a.png?w=100" is seen as ".png".
        path = str(link).split('?', 1)[0].split('#', 1)[0]
        ext = path.rsplit('.', 1)[-1].lower()
        if ext in ('jpg', 'jpeg'):
            return 'jpg'
        if ext in ('gif', 'png'):
            return ext
        return default

    def base64_to_object(self, src):
        """Decode a ``data:...;base64,<payload>`` URI and return the raw bytes."""
        _header, encoded = str(src).split(',', 1)
        return base64.decodebytes(encoded.encode('utf-8'))

    def _write_chunks(self, path, chunks):
        """Write an iterable of byte chunks to ``path``; report (not raise) errors."""
        try:
            with open(path, "wb") as f:
                for chunk in chunks:
                    f.write(chunk)
        except Exception as e:
            print('Save failed - {}'.format(e))

    def download_images(self, links, download_dir):
        """Save every link in ``links`` into ``download_dir``.

        Files are named by zero-padded position in ``links`` (``0000.jpg``,
        ``0001.png``, ...). Links may be base64 ``data:`` URIs (jpeg/png) or
        ordinary URLs. A failing link is reported and skipped.
        """
        total = len(links)
        for index, link in enumerate(links):
            try:
                # Links can be very long (data URIs); print only the start.
                print('Downloading {} : {} / {}'.format(link[:100], index + 1, total))
                if str(link).startswith('data:image/jpeg;base64'):
                    payload = self.base64_to_object(src=link)
                    ext = 'jpg'
                elif str(link).startswith('data:image/png;base64'):
                    payload = self.base64_to_object(src=link)
                    ext = 'png'
                else:
                    payload = None
                    ext = self.get_extension_from_link(link=link)
                path = os.path.join(download_dir, str(index).zfill(4) + "." + ext)
                if payload is not None:
                    self._write_chunks(path, [payload])
                else:
                    # The context manager releases the streamed connection.
                    with requests.get(link, stream=True, timeout=5,
                                      headers=self.header) as response:
                        # Do not save HTTP error pages as image files.
                        response.raise_for_status()
                        self._write_chunks(
                            path, response.iter_content(chunk_size=8192))
            except Exception as e:
                print('Download failed - ', e)
                continue

    def get_full_resolution_links(self, load_timeout=10):
        """Step through the result viewer and collect the image ``src`` links.

        The first result is clicked open, then the RIGHT key is sent
        repeatedly to advance. Collection stops once the page scroll offset
        has stayed unchanged for 30 consecutive steps.

        Args:
            load_timeout: Maximum seconds to wait for one image to finish
                loading before taking whatever ``src`` is present.

        Returns:
            The collected links, de-duplicated, in first-seen order.
        """
        print('[Full Resolution Mode]')
        time.sleep(1)
        elem = self.driver.find_element(By.TAG_NAME, "body")
        print('Scraping links')
        self.wait_and_click('//div[@data-ri="0"]')
        time.sleep(1)
        links = []
        count = 1
        last_scroll = 0
        scroll_patience = 0
        while True:
            try:
                xpath = '//div[@id="islsp"]//div[@class="v4dQwb"]'
                div_box = self.driver.find_element(By.XPATH, xpath)
                self.highlight(div_box)
                # NOTE(review): an XPath starting with "//" is evaluated from
                # the document root even when called on div_box; kept as in
                # the original - confirm whether ".//" was intended.
                xpath = '//img[@class="n3VNCb"]'
                img = div_box.find_element(By.XPATH, xpath)
                self.highlight(img)
                xpath = '//div[@class="k7O2sd"]'
                loading_bar = div_box.find_element(By.XPATH, xpath)
                # Wait for the image to load; before that, src holds a base64
                # encoded image. Bounded so a stuck page cannot hang forever.
                deadline = time.monotonic() + load_timeout
                while str(loading_bar.get_attribute('style')) != 'display: none;':
                    if time.monotonic() >= deadline:
                        break
                    time.sleep(0.1)
                src = img.get_attribute('src')
                if src is not None:
                    links.append(src)
                    # Print only the beginning of very long links.
                    print('%d: %s' % (count, src[:100]))
                    count += 1
            except StaleElementReferenceException:
                # The element was replaced mid-read; retry on the next step.
                pass
            except Exception as e:
                print(
                    '[Exception occurred while collecting links from google_full] {}'.format(e))
            # Current vertical scroll position of the page.
            scroll = self.driver.execute_script("return window.pageYOffset;")
            if scroll == last_scroll:
                scroll_patience += 1
            else:
                scroll_patience = 0
                last_scroll = scroll
            if scroll_patience >= 30:
                break
            elem.send_keys(Keys.RIGHT)
        links = list(dict.fromkeys(links))  # de-duplicate, keep order
        print('Collect links done. Total: {}'.format(len(links)))
        return links

    def simple_file_run(self, img):
        """Search by the single image ``img`` and download the similar images.

        Results are stored under
        ``<download root>/<name of img's parent folder>/<img name without extension>/``.
        """
        img_name = os.path.splitext(os.path.split(img)[1])[0]
        # The parent folder name is used to tell image categories apart.
        parent_name = os.path.split(os.path.split(img)[0])[-1]
        print("--> Processing image: {} ".format(img_name))
        download_dir = os.path.join(self._download, parent_name, img_name)
        os.makedirs(download_dir, exist_ok=True)
        # Upload the image; the browser ends up on the search result page.
        self.upload_img_get_html(img)
        # Follow the "similar images" link to reach the image list page.
        similar_img_href = self.driver.find_element(
            By.XPATH, '//div[@class="e2BEnf U7izfe"]/h3/a')
        similar_img_href.click()
        links = self.get_full_resolution_links()
        self.download_images(links, download_dir)
        print("{}Image {} finished\n{}".format(
            Fore.GREEN, img_name, Fore.RESET))
整個流程就跟手動打開瀏覽器進行操作一樣,難點在於如何控制速度,避免觸發谷歌的反爬機制;一旦出現谷歌驗證碼,自動破解基本不可能,只能人工幫它免費打碼了。
有何用途
當你需要訓練一個圖片分類的模型,手頭上圖片有限,那就可以用這個方法,每一張圖都找跟它相似的,輕輕鬆鬆就把訓練集擴大了幾十倍(理想情況,不被反爬的話)。