python爬虫之多线程、多进程+代码示例

python爬虫之多线程、多进程

使用多进程、多线程编写爬虫的代码能有效的提高爬虫爬取目标网站的效率。

一、什么是进程和线程

引用廖雪峰的官方网站关于进程和线程的讲解:

进程:对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开一个Word就启动了一个Word进程。

线程:有些进程还不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)。

每个进程至少要做一件事,所以,一个进程至少有一个线程。

二、多进程

实现多进程的四种方式

方式一:os.fork()

python 的 os 模块封装了常见的系统调用,其中,多进程的调用就是 fork() 函数。具体示例代码如下:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
"""
fork()
1.只有在Unix系统中有效,Windows系统中无效
2.fork函数调用一次,返回两次:在父进程中返回值为子进程id,在子进程中返回值为0
"""
import os

pid = os.fork()
if pid == 0:
    print("执行子进程,子进程pid={pid},父进程ppid={ppid}".format(pid=os.getpid(), ppid=os.getppid()))
else:
    print("执行父进程,子进程pid={pid},父进程ppid={ppid}".format(pid=pid, ppid=os.getpid()))

# 执行父进程,子进程pid=611,父进程ppid=610
# 执行子进程,子进程pid=611,父进程ppid=610

方法二:Multiprocessing 模块 Process 类

通过 Multiprocessing 模块中的 Process 类,创建Process对象。

Process类的构造方法:

init(self, group=None, targent=None, name=None, args=(), kwargs={})

参数 说明
group 进程所属组,基本不用。
targent 表示调用对象,一般为函数。
args 表示调用对象参数元祖。
name 进程别名。
kwargs 表示调用对象的字典。

具体示例代码如下:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process


def run_process(name):
    print(name)


if __name__ == "__main__":
    p = Process(target=run_process, args=("test",))
    p.start()
    p.join()
    print("子进程结束")

# test
# 子进程结束

方法三:继承 Process 类

通过继承Process类,重写 run 方法。使用 .start() 方法,会自动调用 run 方法。具体示例代码如下:

from multiprocessing import Process


class NewProcess(Process):
    def __init__(self, n):
        super(NewProcess, self).__init__()
        self.n = n

    def run(self):
        print(self.n)


if __name__ == "__main__":
    test = "test"
    p = NewProcess(test)
    p.start()
    p.join()
    print("子进程结束")

# test
# 子进程结束

方式四:进程池 Pool 类

Pool 类可以提供指定数量(一般为CPU的核数)的进程供用户调用,当有新的请求提交的 Pool 中时,如果池中还没有满,就会创建一个新的进程来执行这些请求。如果池满,请求就会告知先等待。直到池中有进程结束,才会创建新的进程来执行这些请求。

注意:进程池中的进程是不能共享队列和数据的,而 Process 生成的子进程可以共享队列。

Pool 类中的常用方法:

函数 函数原型 说明
apply() apply(func[, args=()[, kwds={}]]) 该函数用于传递不定参数,主进程会被阻塞直到函数执行结束(不建议使用,并且3.x以后不再出现)。
apply_async() apply_async(func[, args()[, kwds{}[, callback=None]]]) 与apply用法一样,但它是非阻塞且支持结果返回进行回调。
map() map(func, utterable[, chunksize=None]) Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到返回结果。第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。
close() 关闭进程池(Pool),使其不能再添加新的Process。
terminate() 结束工作进程,不再处理未处理的任务。
join() 主进程阻塞等待子进程的退出,join方法必须在close或terminate之后使用。

具体代码如下:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import time
from multiprocessing import Pool


def run(num):
    time.sleep(1)
    return num * num


if __name__ == "__main__":
    testList = [1, 2, 3, 4, 5, 6, 7]
    print('单进程执行')  # 顺序执行
    t1 = time.time()
    for i in testList:
        run(i)
    t2 = time.time()
    print('顺序执行的时间为:', int(t2 - t1))

    print('多进程 map 执行')  # 并行执行
    p = Pool(4)  # 创建拥有4个进程数量的进程池
    result = p.map(run, testList)
    p.close()  # 关闭进程池,不再接受新的任务
    p.join()  # 主进程阻塞等待子进程的退出
    t3 = time.time()
    print('执行的时间为:', int(t3 - t2))

    print(result)
    
# 单进程执行
# 顺序执行的时间为: 7
# 多进程 map 执行
# 执行的时间为: 2
# [1, 4, 9, 16, 25, 36, 49]

进程通信

Queue()

队列:先进先出,按照顺序

通信原理:在内存中建立队列数据结构模型。多个进程都可以通过队列存入内容,取出内容的顺序和存入内容的顺序保存一致。

方法 功能 参数
q = Queue(maxsize = 0) 创建队列消息,并返回队列对象。 表示最多存储多少消息。默认表示根据内存分配存储。
q.put(data, [block, timeout]) 向队列存储消息。 Data:要存入的数据。block:默认队列满时会堵塞,设置False则非堵塞。timeout:超时时间。
data = q.get([block, timeout]) 获取队列消息。 block:默认队列空时会堵塞,设置False则非堵塞。timeout:超时时间。
q.full() 判断队列是否为满。
q.empty() 判断队列是否为空。
q.size() 判断队列中的消息数量。
q.close() 关闭队列。
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process, Queue


def foo(data):
    s = data.get()  # 管子的另一端放在子进程这里,子进程接收到了数据
    if s not in "":
        print('子进程已收到数据...')
        print(s)  # 子进程打印出了数据内容...


if __name__ == '__main__':  # 要加这行...

    q = Queue()  # 创建进程通信的Queue,你可以理解为我拿了个管子来...
    p = Process(target=foo, args=(q,))  # 创建子进程
    print('主进程准备发送数据...')
    q.put("数据接收成功")  # 将管子的一端放在主进程这里,主进程往管子里丢入数据↑
    p.start()  # 启子子进程

    p.join()
    
# 主进程准备发送数据...
# 子进程已收到数据...
# 数据接收成功

Pipe()

通信原理:在内存中开辟管道空间,生成管道操作对象,多个进程使用“同一个”管道对象进行操作即可实现通信。

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello'])	# 向管道中写入内容
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # 从管道读取信息
    p.join()
    
    # prints "[42, None, 'hello']"

manager()

进程的 manager 方法可以共享数据,比如共享列表,元祖,字典,锁,字符。

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import multiprocessing


def f(m_list):
    m_list.append("f")


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    m_list = manager.list([1, 2, 3])
    p = multiprocessing.Process(target=f, args=(m_list, ))
    p.start()
    p.join()
    print(m_list)
    
# [1, 2, 3, 'f']

三、多线程

线程在程序中是独立的、并非的执行流。与分隔的进程相比线程之间的隔离程度要小,它们共享内存,文件句柄和其它进程应有的状态。多线程之间共享全局变量

创建多线程多两种方式

方法一:threading模块Thread类

具体代码如下:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading
import time


def run(n):
    print("task", n)
    time.sleep(1)
    print('1s')
    time.sleep(1)
    print('0s')
    time.sleep(1)


if __name__ == '__main__':
    t1 = threading.Thread(target=run, args=("t1",))
    t2 = threading.Thread(target=run, args=("t2",))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    
# task t1
# task t2
# 1s
# 1s
# 0s
# 0s

自定义线程

继承threading.Thread类自定义线程类。其本质是重构Thread类中的run方法。

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from threading import Thread
import time


class MyThread(Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()
        self.n = n

    def run(self):
        print("task", self.n)
        time.sleep(1)
        print('1s')
        time.sleep(1)
        print('0s')
        time.sleep(1)


if __name__ == '__main__':
    t1 = MyThread("t1")
    t2 = MyThread("t2")
    t1.start()
    t2.start()
    

# task t1
# task t2
# 1s
# 1s
# 0s
# 0s

守护线程

setDaemon(True)把所有的子线程都变成了主线程的守护线程,因此当主进程结束后,子线程也会随之结束。所以当主线程结束后,整个程序就退出了。

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading
import time


def run(n):
    print("task", n)
    time.sleep(1)   # 此时子线程停1s
    print('2')
    time.sleep(1)
    print('1')


if __name__ == '__main__':
    t = threading.Thread(target=run, args=("t1",))
    t.setDaemon(True)   # 把子进程设置为守护线程,必须在start()之前设置
    t.start()
    print("end")
    
# task t1
# end

想要守护线程执行结束后,主进程再结束,可以使用 join 方法,让主线程等待子线程执行完毕。

Lock

多线程和多进程最大的不同在于,多进程中,同一个变量,各自都有一份拷贝存与每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading

value = 0
lock = threading.Lock()


def change_it(n):
    # 先存后取,结果应该为0:
    global value
    value = value + n
    value = value - n


# 未加锁(值不确定)
def run_thread(n):
    for i in range(2000000):
        change_it(n)


# 加锁
# def run_thread(n):
#     for i in range(2000000):
#         lock.acquire()
#         try:
#             change_it(n)
#         finally:
#             lock.release()


t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(value)

# 29

由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以不会造成修改的冲突。当多个线程同时执行 lock.acquire() 时,只有一个线程能成功获取锁,然后继续执行代码,其它线程就继续等待直到获得锁为止。

获得锁的线程用完一定要释放锁,否则那些等待锁的线程将会永远的等待下去,成为死线程。所以用 try…finally 来确保锁一定会被释放。

锁的好处就是确保某段关键代码只能由一个线程从头到尾完整的执行,坏处当然也很多,首先是阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率大大的下降了。其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。

信号量(BoundedSemaphore类)

Lock同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程去更改数据。

import threading
import time

def run(n, semaphore):
    semaphore.acquire()   #加锁
    time.sleep(1)
    print("run the thread:%s\n" % n)
    semaphore.release()     #释放

if __name__ == '__main__':
    num = 0
    semaphore = threading.BoundedSemaphore(5)  # 最多允许5个线程同时运行
    for i in range(22):
        t = threading.Thread(target=run, args=("t-%s" % i, semaphore))
        t.start()
    while threading.active_count() != 1:
        pass  # print threading.active_count()
    else:
        print('-----all threads done-----')

GIL锁

在非 python 环境中,单核情况下,同时只能有一个任务执行。多核可以同时支持多个线程同时执行。但是在 python 中,无论有多少核,同只能执行一个线程。究其原因,这就是GIL的存在导致的。

GIL全称Global Interpreter Lock(全局解释器锁),来源是python设计之初的考虑,为了数据安全所做的决定。某个线程想要执行,必须先拿到GIL,我们可以把GIL看作是“通行证”,并且在一个python进程中,GIL只有一个。拿不到通行证的线程,就不允许进入CPU执行。GIL只有在cpython中才有,因为cpython调用的是c语言的原生线程,所以他不能直接操作cpu,只能利用GIL保证同一时间只能有一个线程拿到数据,而在pypy和jpython中是没有GIL的。

python针对不同类型的代码执行效率也是不同的。

1、cpu密集型代码(各种循环处理、计数等),在这种情况下,由于计算机工作多,ticks计数很快就会达到阈值。然后触发GIL的释放与再竞争(多个线程来回切换是需要消耗资源的),所以python下的多线程对cpu密集型代并不友好。

2、IO密集型代码(文件处理,网络爬虫等涉及文件读写的操作),多线程能够有效提升效率(单线程下有IO操作会进行IO等待,造成不必要的浪费,而开启多线程能在线程A等待时,自动切换到线程B,可以不浪费CPU的资源,从而能提升程序执行效率)。所以python的多线程对IO密集型代码比较友好。

使用建议

python下想要充分利用多核CPU,就使用多进程。因为每个进程都有各子独立的GIL,互不干扰,这样就可以真正意义上的并行执行,在python中,多进程的执行效率优于多线程(仅仅针对多核CPU而言)。

四、爬取豆瓣电影TOP250

采取三种方式。爬取前250名电影。

(1)所爬取的网页链接://movie.douban.com/top250?start=0&filter=

(2)通过分析网页,发现第一页的url start=0,第二页的url start=25,第三页的url start=50。

(3)主要爬取电影名跟评分,用来进行比对,所以数据方面就不过多的提取和保存,只简单的打印出来。

多进程爬取

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import multiprocessing
from multiprocessing import Process, Queue

import time
from lxml import etree
import requests


class DouBanSpider(Process):
    def __init__(self, q, url_list, lock):
        # 重写写父类的__init__方法
        super(DouBanSpider, self).__init__()
        self.url_list = url_list
        self.q = q
        self.lock = lock
        self.headers = {
            'Host': 'movie.douban.com',
            'Referer': '//movie.douban.com/top250?start=225&filter=',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36',
        }

    def run(self):
        self.parse_page()

    def send_request(self, url):
        '''
        用来发送请求的方法
        :return: 返回网页源码
        '''
        # 请求出错时,重复请求3次,
        i = 0
        while i <= 3:
            try:
                print(u"[INFO]请求url:" + url)
                return requests.get(url=url, headers=self.headers).content
            except Exception as e:
                print(u'[INFO] %s%s' % (e, url))
                i += 1

    def parse_page(self):
        '''
        解析网站源码,并采用xpath提取 电影名称和平分放到队列中
        :return:
        '''
        time.sleep(0.1)
        while 1:
            try:

                url = self.url_list.pop()
            except IndexError as e:
                break
            self.lock.acquire()
            response = self.send_request(url)
            html = etree.HTML(response)
            #  获取到一页的电影数据
            node_list = html.xpath("//div[@class='info']")
            for move in node_list:
                # 电影名称
                title = move.xpath('.//a/span/text()')[0]
                # 评分
                score = move.xpath('.//div[@class="bd"]//span[@class="rating_num"]/text()')[0]

                # 将每一部电影的名称跟评分加入到队列
                self.q.put(score + "\t" + title)
            self.lock.release()


class AllUrlSpider(Process):
    def __init__(self, url_lis):
        super(AllUrlSpider, self).__init__()
        self.url_list = url_lis

    def run(self):
        base_url = '//movie.douban.com/top250?start='
        # 构造所有url
        for num in range(225, -1, -25):
            self.url_list.append(base_url + str(num))
            print("获得URL:{}".format(base_url + str(num)))


def main():
    # 创建一个队列用来保存进程获取到的数据
    q = Queue()
    lock = multiprocessing.Lock()

    manager = multiprocessing.Manager()
    url_list = manager.list()
    a = AllUrlSpider(url_list)

    p = DouBanSpider(q, url_list, lock)
    b = DouBanSpider(q, url_list, lock)
    c = DouBanSpider(q, url_list, lock)

    a.start()
    p.start()
    b.start()
    c.start()

    a.join()
    p.join()
    b.join()
    c.join()

    while not q.empty():
        print(q.get())


if __name__ == "__main__":
    start = time.time()
    main()
    print('[info]耗时:%s' % (time.time() - start))

多进程爬取耗时7.15秒,部分结果如下图所示:

多线程爬取

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from queue import Queue
from threading import Thread
import threading
import time

from lxml import etree
import requests

url_list = []
lock = threading.Lock()


class DouBanSpider(Thread):
    def __init__(self, q) :
        # 重写写父类的__init__方法
        super(DouBanSpider, self).__init__()
        self.q = q
        self.headers = {
            'Host': 'movie.douban.com',
            'Referer': '//movie.douban.com/top250?start=225&filter=',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36',
        }

    def run(self):
        self.parse_page()

    def send_request(self, url):
        '''
        用来发送请求的方法
        :return: 返回网页源码
        '''
        # 请求出错时,重复请求3次,
        i = 0
        while i <= 3:
            try:
                print
                u"[INFO]请求url:" + url
                html = requests.get(url=url, headers=self.headers).content
            except Exception as e:
                print
                u'[INFO] %s%s' % (e, url)
                i += 1
            else:
                return html

    def parse_page(self):
        '''
        解析网站源码,并采用xpath提取 电影名称和平分放到队列中
        :return:
        '''
        while 1:
            try:

                url = url_list.pop()
            except IndexError as e:
                break
            lock.acquire()
            response = self.send_request(url)
            html = etree.HTML(response)
            #  获取到一页的电影数据
            node_list = html.xpath("//div[@class='info']")
            for move in node_list:
                # 电影名称
                title = move.xpath('.//a/span/text()')[0]
                # 评分
                score = move.xpath('.//div[@class="bd"]//span[@class="rating_num"]/text()')[0]

                # 将每一部电影的名称跟评分加入到队列
                self.q.put(score + "\t" + title)
            lock.release()


class AllUrlSpider(Thread):
    def run(self):
        base_url = '//movie.douban.com/top250?start='
        # 构造所有url
        for num in range(225, -1, -25):
            url_list.append(base_url + str(num))
            print("获得URL:{}".format(base_url + str(num)))


def main():
    # 创建一个队列用来保存进程获取到的数据
    q = Queue()
    a = AllUrlSpider()
    a.start()

    # 保存线程
    Thread_list = []
    # 创建并启动线程
    for i in range(5):
        p = DouBanSpider(q)
        p.start()
        Thread_list.append(p)

    a.join()
    # 让主线程等待子线程执行完成
    for i in Thread_list:
        i.join()

    while not q.empty():
        print(q.get())


if __name__ == "__main__":
    start = time.time()
    main()
    print('[info]耗时:%s' % (time.time() - start))

多进程爬取耗时5秒,部分结果如下图所示:

耗时跟网络的好坏也是有一定的关系,每次测出的数据结果也不一样。但理论上来讲,线程在I/O密集的操作性是要高于进程的。