聊聊 Scrapy 分布式的几种方案

聊聊 Scrapy 分布式的几种方案

Scrapy 是一款开源爬虫框架,但是官方对分布式支持并不多,本文聊聊几种简单可行的方案。

需要注意的是,本文的爬虫并非对几个网站的纵向爬取,而是爬取一批不同的地址。如果是其它类型的爬虫,可能不适用,但应该能让你多个思考方向。

网址分批处理

既然是一批不同的地址,那么最直接的方案就是把网址分几个部分,然后起几个进程同时处理就行了。

举个例子,我们要爬取的地址是放到 Mongo 中的,存储格式类似这样

{ "_id" : 1, "url" : "https://www.baidu.com" }
{ "_id" : 2, "url" : "https://www.google.com" }
{ "_id" : 3, "url" : "https://www.csdn.net" }
...

那我们就在编写爬虫的时候,接受 2 个参数(起始的地址主键和结束的主键),就能爬取指定的地址了。之后的工作,便是怎么分配地址了。

我们一步步实现下

class DemoSpider(scrapy.Spider):

    name = 'demo_spider'

    # ...

    def start_requests(self):
        url_start_id = int(getattr(self, 'url_start_id', 1))
        url_end_id = int(getattr(self, 'url_end_id', 1))

        mongo_db = MongoUtil.get_mongo_db()
        # 这里直接使用将 cursor 转为列表
        # 如果不转化,使用 cursor 迭代,有可能因为爬取的地址较多,占用时间太多,而导致 cursor 超时报错
        url_infos = list(mongo_db.url
                         .find({'_id': {'$gte': url_start_id, '$lt': url_end_id}}))
        for url_info in url_infos:
            yield scrapy.Request(url=f'http://{url_info["url"]}', callback=self.parse)

这样,爬虫启动时可以指定需要爬取的范围

$ scrapy crawl demo_spider -a url_start_id=1 -a url_end_id=100000

如果要爬取 40W 个地址,那就可以开 4 个进程处理下

$ scrapy crawl demo_spider -a url_start_id=1 -a url_end_id=100000
$ scrapy crawl demo_spider -a url_start_id=100000 -a url_end_id=200000
$ scrapy crawl demo_spider -a url_start_id=200000 -a url_end_id=300000
$ scrapy crawl demo_spider -a url_start_id=300000 -a url_end_id=400000

至于是在单服务器还是多服务器上执行,就看你自己的情况了。

如果机器内存较小,建议启动爬虫时指定 JOBDIR 将请求队列等信息持久化到磁盘,类似这样

$ scrapy crawl demo_spider -a url_start_id=1 -a url_end_id=100000 -s JOBDIR=/path/to/jobdir

如果需要更方便的管理爬虫,可以使用 Scrapyd,这样在有多台机器的时候,就不用登陆到对应机器上去执行命令了。

同样的启动一个爬虫,直接在本地执行

$ curl http://server-ip:6800/schedule.json -d project=default -d spider=demo_spider -d url_start_id=1 -d url_end_id=100000

我们可能还会碰到一个问题,如果每个机器上要限制爬虫的进程个数怎么办?有了 Scrapyd 之后,就比较方便了,直接在对应的服务器上编辑 Scrapyd 的配置文件 scrapyd.conf 添加进程相关的配置

[scrapyd]
# ...
# 每个 CPU 上最大的进程数
max_proc_per_cpu = 2
# 最大进程数
max_proc = 1

重启 Scrapyd 之后,我们就可以直接 schedule 多个 job 了。如果超过指定的进程数,job 会自动排队,不用我们手动去处理。

当然,你也可以编写脚本,每次只 schedule 指定个数的 job,然后使用 listjobs 接口定时去获取 job 的状态,当数量不足时,启动下一个 job。看你的需要选择吧。

网址共享处理

上个方法有个不爽的地方,就是每个爬虫都需要手动指定爬取的范围,当进程数多了后,分配就显得麻烦了。所以,我们可以换个思路,每个机器我们单纯的去启动爬虫就好了,至于爬哪些地址,它自己去一个地方取就行了。

这就涉及到分布式场景中数据共享了,我们可以使用 Redis。

接下来还有 2 个方面的问题

1、爬虫什么时候读取 Redis?启动时?如果启动时 Redis 中还没有地址数据咋办?爬虫没有地址处理要结束时我们能捕捉到相应的信号吗?能的话我们直接从 Redis 读取地址生成请求 yield 可以吗?

2、Redis 是内存型数据库,爬取的 URL 数据量大了怎么办?

第 2 个问题好解决,编写个脚本,每次只从 Mongo 读取部分数据到 Redis 列表中,每隔几秒检测下列表中数据是否充足,不足的话进行补充就行。

至于爬虫读取地址的时机,我们可以在启动时就读取,但是可能启动时候 Redis 列表中还没有数据,这个时候读不到的话,爬虫就会结束了。还好,爬虫要结束时,我们能捕捉到信号,我们可以再去取地址。如果有就进行抓取,没有的话,我们可以告知爬虫引擎挂起一会儿,待到连续一定时间都没有地址后,爬虫就可以正常结束了。因此,我们只在爬虫要结束时,进行读取地址相关的操作就行了。

Scrapy 要结束时会发起 spider_idle 信号,我们可以捕捉它进行相关操作

class DemoSpider(scrapy.Spider):

    name = 'demo_spider'
    redis_key_start_urls = 'demo_spider:start_urls'

    # ...

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # 用于保证同时只有一个地方在调用 generate_requests
        # 理解为一个方法锁就行
        self.is_generating_requests = False
        # 用于判断是否超时
        self.oldest_time_continious_no_next_requests = None

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super().from_crawler(crawler, *args, **kwargs)
        crawler.signals.connect(spider.spider_idle, signal=signals.spider_idle)
        return spider

    def spider_idle(self):
        is_timeout, time_waited = self.is_timeout()
        self.logger.info(f'Waiting for urls...{time_waited}s')
        # 连续超过 timeout_to_wait 秒没有多余的域名,则退出
        if is_timeout:
            self.logger.info(f'Exit from spider_idle')
            return
        if not self.is_generating_requests:
            for req in self.generate_requests():
                self.crawler.engine.crawl(req, spider=self)
        raise DontCloseSpider

    def generate_requests(self):
        self.is_generating_requests = True

        try:
            while True:
                url_binary = self.server.lpop(self.redis_key_start_urls)
                if not url_binary:
                    if self.oldest_time_continious_no_next_requests is None:
                        self.oldest_time_continious_no_next_requests = int(time.time())

                    break
                else:
                    self.oldest_time_continious_no_next_requests = None

                    try:
                        req = scrapy.Request(url=f'http://{url_binary.decode("utf-8")}', callback=self.parse)
                    except ValueError:
                        continue
                    yield req
        finally:
            self.is_generating_requests = False

    def is_timeout(self):
        # 连续超过 60s 没有请求,则判定超时
        time_to_wait = 60
        time_waited = int(time.time()) - self.oldest_time_continious_no_next_requests \
            if self.oldest_time_continious_no_next_requests else 0
        return time_waited > time_to_wait, time_waited

看着代码有点长,不过理解了上面的思路话,很容易能看懂的。

对 Scrapy 比较熟悉的伙伴,可能会看出,上面的代码其实有坑。是的,在 generate_requests 方法中,如果 Redis 队列中一直有地址,则爬虫会一直读地址并生成请求,这样请求队列会无限增大。如果请求队列是存到内存的话,那么爬虫会被直接 kill 掉;如果是存到硬盘的话,这会导致爬虫一直在读取地址,而不进行抓取操作。

这个时候,我们可以自定义一个请求队列最大长度,每次生成请求的时候,都校验下,看当前请求队列是否已经满了,不满我们再继续生成。

我们在 generate_requests 加个判断就行

class DemoSpider(scrapy.Spider):

    name = 'demo_spider'
    redis_key_start_urls = 'domo_spider:start_urls'
    # New added
    max_scheduler_queue_size = 1000

    # ...

    def generate_requests(self):
        self.is_generating_requests = True

        try:
            # New added
            # 队列中的请求数量取磁盘队列和内存队列长度较大的那个
            dqs_len = len(self.crawler.engine.slot.scheduler.dqs or [])
            mqs_len = len(self.crawler.engine.slot.scheduler.mqs)
            req_num_in_queue = dqs_len if dqs_len > mqs_len else mqs_len
            # 需要新生成的请求的数量
            req_num_to_yield = self.max_scheduler_queue_size - req_num_in_queue
            if req_num_to_yield <= 0:
                self.logger.info(f'Requests is enough in queue'
                                 f'({len(self.crawler.engine.slot.scheduler.mqs)} >= {self.max_scheduler_queue_size})')
                return

            while True:
                # New added
                if req_num_to_yield <= 0:
                    break

                url_binary = self.server.lpop(self.redis_key_start_urls)
                if not url_binary:
                    if self.oldest_time_continious_no_next_requests is None:
                        self.oldest_time_continious_no_next_requests = int(time.time())

                    break
                else:
                    self.oldest_time_continious_no_next_requests = None

                    try:
                        req = scrapy.Request(url=f'http://{url_binary.decode("utf-8")}', callback=self.parse)
                    except ValueError:
                        continue
                    yield req

                    # New added
                    req_num_to_yield -= 1

        finally:
            self.is_generating_requests = False

方面你查看,我在新加的语句上面,加了 New Added 的注释。

到这里,爬虫感觉已经 OK 了。但是在实践中,还是发现个地方需要优化下。

假设并发数(CONCURRENT_REQUESTS)我们设置的是 300,在爬虫快要结束时,我们会一次性读入 1000 个地址进入请求队列。爬虫工作时,请求会被慢慢消耗,待请求队列被消耗完毕时,这时下载器中有 300 个请求在等待响应,然后慢慢减少,299、293...,但是由于我们捕捉的是爬虫结束的信号,这个时候不会从 Redis 读入地址生成请求放入到请求队列,导致下载器达不到 300 个并发。如果设置的超时较长或者有重试的时候,这个需要等待很久下载器的请求才能被消耗完,才会发起爬虫结束的信号,然后才进行下一波的抓取。

所以我们不一定非要在爬虫请求消耗完的时候再 generate_requests,可以捕捉下爬虫的其它信号触发下 generate_requests,使得请求队列的数量一直保持一个值,下载器并发一直能打满,这样抓取效率就更高了。

class DemoSpider(scrapy.Spider):
    # ...

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super().from_crawler(crawler, *args, **kwargs)
        crawler.signals.connect(spider.spider_idle, signal=signals.spider_idle)

        # New added
        crawler.signals.connect(spider.response_received, signal=signals.response_received)

        return spider

    def response_received(self):
        is_timeout, _ = self.is_timeout()
        if is_timeout:
            return

        if not self.is_generating_requests:
            for req in self.generate_requests():
                self.crawler.engine.crawl(req, spider=self)

这里我们捕捉了 response_received 信号,这个信号会在爬虫引擎收到下载器的响应后被发起,这个说明请求已经被消耗了一个了,在这个时候触发一次 generate_requests 是可以的。当然,你可以根据自己的实际情况选择捕捉什么信号,具体有哪些信号可以参考官方文档 EXTENDING SCRAPY -> Signals -> Built-in signals reference。

上面描述的过程中,如果对 Scrapy 的数据流不了解的话,可能不大好理解。如果觉得不好理解,可以先看下官方文档的 EXTENDING SCRAPY -> Architecture overview。

为了文章的完整性,针对本小节的第 2 个问题,我写了个简单的脚本 demo_spider_feeder.py,有需要的可以看下

# coding=utf-8

import logging
import os
import time

REDIS_KEY_MAX_URL_ID_PUSHED = 'demo_spider:max_url_id_pushed'
REDIS_KEY_START_URLS = 'demo_spider:start_urls'

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    redis_client = RedisUtil.get_redis_client()
    mongo_db = MongoUtil.get_mongo_db()

    while True:
        start_urls_len = redis_client.llen(REDIS_KEY_START_URLS)
        threshold_num = 10000
        if start_urls_len >= threshold_num:  # 队列中还有 threshold_num+ 数据时就不填充了
            logger.info(f'There are enough urls(>={threshold_num}) in start_urls, sleep 5s...')
            time.sleep(5)
            continue

        max_url_id_pushed = int(redis_client.get(REDIS_KEY_MAX_URL_ID_PUSHED)) or 0
        url_infos = list(mongo_db.url
                            .find({'_id': {'$gt': int(max_url_id_pushed)}})
                            .sort([('_id', pymongo.ASCENDING)])
                            .limit(threshold_num))
        if not url_infos:
            logger.info(f'No more urls from mongo, sleep 600s...')
            time.sleep(600)
            continue

        logger.info(f'Begin to feed start_urls from mongo...')
        for url_info in url_infos:
            redis_client.rpush(REDIS_KEY_START_URLS, url_info['url'])
        logger.info(f'Feed {len(url_infos)} start_urls'
                    f'(from {max_url_id_pushed + 1} to {url_infos[-1]["_id"]}) '
                    f'from mongo, sleep 10s...')
        redis_client.set(REDIS_KEY_MAX_URL_ID_PUSHED, url_infos[-1]['_id'])
        time.sleep(5)

if __name__ == '__main__':
    main()

小结

本文介绍了 2 种简单的 Scrapy 分布式方案,在原有的爬虫上做些简单的修改就能实现,希望能给有需要的朋友多个思考方向。

个人经验有限,如果文中有错误的地方,还请指出。

参考

Comments are closed.