聊聊 Scrapy 分布式的几种方案
Scrapy 是一款开源爬虫框架,但是官方对分布式支持并不多,本文聊聊几种简单可行的方案。
需要注意的是,本文的爬虫并非对几个网站的纵向爬取,而是爬取一批不同的地址。如果是其它类型的爬虫,可能不适用,但应该能让你多个思考方向。
网址分批处理
既然是一批不同的地址,那么最直接的方案就是把网址分几个部分,然后起几个进程同时处理就行了。
举个例子,我们要爬取的地址是放到 Mongo 中的,存储格式类似这样
{ "_id" : 1, "url" : "https://www.baidu.com" }
{ "_id" : 2, "url" : "https://www.google.com" }
{ "_id" : 3, "url" : "https://www.csdn.net" }
...
那我们就在编写爬虫的时候,接受 2 个参数(起始的地址主键和结束的主键),就能爬取指定的地址了。之后的工作,便是怎么分配地址了。
我们一步步实现下
class DemoSpider(scrapy.Spider):
name = 'demo_spider'
# ...
def start_requests(self):
url_start_id = int(getattr(self, 'url_start_id', 1))
url_end_id = int(getattr(self, 'url_end_id', 1))
mongo_db = MongoUtil.get_mongo_db()
# 这里直接使用将 cursor 转为列表
# 如果不转化,使用 cursor 迭代,有可能因为爬取的地址较多,占用时间太多,而导致 cursor 超时报错
url_infos = list(mongo_db.url
.find({'_id': {'$gte': url_start_id, '$lt': url_end_id}}))
for url_info in url_infos:
yield scrapy.Request(url=f'http://{url_info["url"]}', callback=self.parse)
这样,爬虫启动时可以指定需要爬取的范围
$ scrapy crawl demo_spider -a url_start_id=1 -a url_end_id=100000
如果要爬取 40W 个地址,那就可以开 4 个进程处理下
$ scrapy crawl demo_spider -a url_start_id=1 -a url_end_id=100000
$ scrapy crawl demo_spider -a url_start_id=100000 -a url_end_id=200000
$ scrapy crawl demo_spider -a url_start_id=200000 -a url_end_id=300000
$ scrapy crawl demo_spider -a url_start_id=300000 -a url_end_id=400000
至于是在单服务器还是多服务器上执行,就看你自己的情况了。
如果机器内存较小,建议启动爬虫时指定 JOBDIR 将请求队列等信息持久化到磁盘,类似这样
$ scrapy crawl demo_spider -a url_start_id=1 -a url_end_id=100000 -s JOBDIR=/path/to/jobdir
如果需要更方便的管理爬虫,可以使用 Scrapyd,这样在有多台机器的时候,就不用登陆到对应机器上去执行命令了。
同样的启动一个爬虫,直接在本地执行
$ curl http://server-ip:6800/schedule.json -d project=default -d spider=demo_spider -d url_start_id=1 -d url_end_id=100000
我们可能还会碰到一个问题,如果每个机器上要限制爬虫的进程个数怎么办?有了 Scrapyd 之后,就比较方便了,直接在对应的服务器上编辑 Scrapyd 的配置文件 scrapyd.conf 添加进程相关的配置
[scrapyd]
# ...
# 每个 CPU 上最大的进程数
max_proc_per_cpu = 2
# 最大进程数
max_proc = 1
重启 Scrapyd 之后,我们就可以直接 schedule 多个 job 了。如果超过指定的进程数,job 会自动排队,不用我们手动去处理。
当然,你也可以编写脚本,每次只 schedule 指定个数的 job,然后使用 listjobs 接口定时去获取 job 的状态,当数量不足时,启动下一个 job。看你的需要选择吧。
网址共享处理
上个方法有个不爽的地方,就是每个爬虫都需要手动指定爬取的范围,当进程数多了后,分配就显得麻烦了。所以,我们可以换个思路,每个机器我们单纯的去启动爬虫就好了,至于爬哪些地址,它自己去一个地方取就行了。
这就涉及到分布式场景中数据共享了,我们可以使用 Redis。
接下来还有 2 个方面的问题
1、爬虫什么时候读取 Redis?启动时?如果启动时 Redis 中还没有地址数据咋办?爬虫没有地址处理要结束时我们能捕捉到相应的信号吗?能的话我们直接从 Redis 读取地址生成请求 yield 可以吗?
2、Redis 是内存型数据库,爬取的 URL 数据量大了怎么办?
第 2 个问题好解决,编写个脚本,每次只从 Mongo 读取部分数据到 Redis 列表中,每隔几秒检测下列表中数据是否充足,不足的话进行补充就行。
至于爬虫读取地址的时机,我们可以在启动时就读取,但是可能启动时候 Redis 列表中还没有数据,这个时候读不到的话,爬虫就会结束了。还好,爬虫要结束时,我们能捕捉到信号,我们可以再去取地址。如果有就进行抓取,没有的话,我们可以告知爬虫引擎挂起一会儿,待到连续一定时间都没有地址后,爬虫就可以正常结束了。因此,我们只在爬虫要结束时,进行读取地址相关的操作就行了。
Scrapy 要结束时会发起 spider_idle
信号,我们可以捕捉它进行相关操作
class DemoSpider(scrapy.Spider):
name = 'demo_spider'
redis_key_start_urls = 'demo_spider:start_urls'
# ...
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 用于保证同时只有一个地方在调用 generate_requests
# 理解为一个方法锁就行
self.is_generating_requests = False
# 用于判断是否超时
self.oldest_time_continious_no_next_requests = None
@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
spider = super().from_crawler(crawler, *args, **kwargs)
crawler.signals.connect(spider.spider_idle, signal=signals.spider_idle)
return spider
def spider_idle(self):
is_timeout, time_waited = self.is_timeout()
self.logger.info(f'Waiting for urls...{time_waited}s')
# 连续超过 timeout_to_wait 秒没有多余的域名,则退出
if is_timeout:
self.logger.info(f'Exit from spider_idle')
return
if not self.is_generating_requests:
for req in self.generate_requests():
self.crawler.engine.crawl(req, spider=self)
raise DontCloseSpider
def generate_requests(self):
self.is_generating_requests = True
try:
while True:
url_binary = self.server.lpop(self.redis_key_start_urls)
if not url_binary:
if self.oldest_time_continious_no_next_requests is None:
self.oldest_time_continious_no_next_requests = int(time.time())
break
else:
self.oldest_time_continious_no_next_requests = None
try:
req = scrapy.Request(url=f'http://{url_binary.decode("utf-8")}', callback=self.parse)
except ValueError:
continue
yield req
finally:
self.is_generating_requests = False
def is_timeout(self):
# 连续超过 60s 没有请求,则判定超时
time_to_wait = 60
time_waited = int(time.time()) - self.oldest_time_continious_no_next_requests \
if self.oldest_time_continious_no_next_requests else 0
return time_waited > time_to_wait, time_waited
看着代码有点长,不过理解了上面的思路话,很容易能看懂的。
对 Scrapy 比较熟悉的伙伴,可能会看出,上面的代码其实有坑。是的,在 generate_requests
方法中,如果 Redis 队列中一直有地址,则爬虫会一直读地址并生成请求,这样请求队列会无限增大。如果请求队列是存到内存的话,那么爬虫会被直接 kill 掉;如果是存到硬盘的话,这会导致爬虫一直在读取地址,而不进行抓取操作。
这个时候,我们可以自定义一个请求队列最大长度,每次生成请求的时候,都校验下,看当前请求队列是否已经满了,不满我们再继续生成。
我们在 generate_requests
加个判断就行
class DemoSpider(scrapy.Spider):
name = 'demo_spider'
redis_key_start_urls = 'domo_spider:start_urls'
# New added
max_scheduler_queue_size = 1000
# ...
def generate_requests(self):
self.is_generating_requests = True
try:
# New added
# 队列中的请求数量取磁盘队列和内存队列长度较大的那个
dqs_len = len(self.crawler.engine.slot.scheduler.dqs or [])
mqs_len = len(self.crawler.engine.slot.scheduler.mqs)
req_num_in_queue = dqs_len if dqs_len > mqs_len else mqs_len
# 需要新生成的请求的数量
req_num_to_yield = self.max_scheduler_queue_size - req_num_in_queue
if req_num_to_yield <= 0:
self.logger.info(f'Requests is enough in queue'
f'({len(self.crawler.engine.slot.scheduler.mqs)} >= {self.max_scheduler_queue_size})')
return
while True:
# New added
if req_num_to_yield <= 0:
break
url_binary = self.server.lpop(self.redis_key_start_urls)
if not url_binary:
if self.oldest_time_continious_no_next_requests is None:
self.oldest_time_continious_no_next_requests = int(time.time())
break
else:
self.oldest_time_continious_no_next_requests = None
try:
req = scrapy.Request(url=f'http://{url_binary.decode("utf-8")}', callback=self.parse)
except ValueError:
continue
yield req
# New added
req_num_to_yield -= 1
finally:
self.is_generating_requests = False
方面你查看,我在新加的语句上面,加了 New Added 的注释。
到这里,爬虫感觉已经 OK 了。但是在实践中,还是发现个地方需要优化下。
假设并发数(CONCURRENT_REQUESTS
)我们设置的是 300,在爬虫快要结束时,我们会一次性读入 1000 个地址进入请求队列。爬虫工作时,请求会被慢慢消耗,待请求队列被消耗完毕时,这时下载器中有 300 个请求在等待响应,然后慢慢减少,299、293...,但是由于我们捕捉的是爬虫结束的信号,这个时候不会从 Redis 读入地址生成请求放入到请求队列,导致下载器达不到 300 个并发。如果设置的超时较长或者有重试的时候,这个需要等待很久下载器的请求才能被消耗完,才会发起爬虫结束的信号,然后才进行下一波的抓取。
所以我们不一定非要在爬虫请求消耗完的时候再 generate_requests
,可以捕捉下爬虫的其它信号触发下 generate_requests
,使得请求队列的数量一直保持一个值,下载器并发一直能打满,这样抓取效率就更高了。
class DemoSpider(scrapy.Spider):
# ...
@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
spider = super().from_crawler(crawler, *args, **kwargs)
crawler.signals.connect(spider.spider_idle, signal=signals.spider_idle)
# New added
crawler.signals.connect(spider.response_received, signal=signals.response_received)
return spider
def response_received(self):
is_timeout, _ = self.is_timeout()
if is_timeout:
return
if not self.is_generating_requests:
for req in self.generate_requests():
self.crawler.engine.crawl(req, spider=self)
这里我们捕捉了 response_received
信号,这个信号会在爬虫引擎收到下载器的响应后被发起,这个说明请求已经被消耗了一个了,在这个时候触发一次 generate_requests
是可以的。当然,你可以根据自己的实际情况选择捕捉什么信号,具体有哪些信号可以参考官方文档 EXTENDING SCRAPY -> Signals -> Built-in signals reference。
上面描述的过程中,如果对 Scrapy 的数据流不了解的话,可能不大好理解。如果觉得不好理解,可以先看下官方文档的 EXTENDING SCRAPY -> Architecture overview。
为了文章的完整性,针对本小节的第 2 个问题,我写了个简单的脚本 demo_spider_feeder.py
,有需要的可以看下
# coding=utf-8
import logging
import os
import time
REDIS_KEY_MAX_URL_ID_PUSHED = 'demo_spider:max_url_id_pushed'
REDIS_KEY_START_URLS = 'demo_spider:start_urls'
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def main():
redis_client = RedisUtil.get_redis_client()
mongo_db = MongoUtil.get_mongo_db()
while True:
start_urls_len = redis_client.llen(REDIS_KEY_START_URLS)
threshold_num = 10000
if start_urls_len >= threshold_num: # 队列中还有 threshold_num+ 数据时就不填充了
logger.info(f'There are enough urls(>={threshold_num}) in start_urls, sleep 5s...')
time.sleep(5)
continue
max_url_id_pushed = int(redis_client.get(REDIS_KEY_MAX_URL_ID_PUSHED)) or 0
url_infos = list(mongo_db.url
.find({'_id': {'$gt': int(max_url_id_pushed)}})
.sort([('_id', pymongo.ASCENDING)])
.limit(threshold_num))
if not url_infos:
logger.info(f'No more urls from mongo, sleep 600s...')
time.sleep(600)
continue
logger.info(f'Begin to feed start_urls from mongo...')
for url_info in url_infos:
redis_client.rpush(REDIS_KEY_START_URLS, url_info['url'])
logger.info(f'Feed {len(url_infos)} start_urls'
f'(from {max_url_id_pushed + 1} to {url_infos[-1]["_id"]}) '
f'from mongo, sleep 10s...')
redis_client.set(REDIS_KEY_MAX_URL_ID_PUSHED, url_infos[-1]['_id'])
time.sleep(5)
if __name__ == '__main__':
main()
小结
本文介绍了 2 种简单的 Scrapy 分布式方案,在原有的爬虫上做些简单的修改就能实现,希望能给有需要的朋友多个思考方向。
个人经验有限,如果文中有错误的地方,还请指出。