前言
看大型项目的源码时,经常摸不着头脑,耗时但收益少,所以也就不容易坚持下去。但如果带着问题来看的话,屏蔽掉问题无关的代码,专注于相关的,看看能不能真正的从源码中学习到好东西
问题: Scrapy 的并发处理
在 Scrapy 的文档中提到可以通过配置 DOWNLOAD_DELAY
来控制请求(request)间的时间间隔,配置 CONCURRENT_REQUESTS
来限制请求(request)的并发数(粒度可基于域名,或者 IP 地址)。甚至可以通过使用 Auto-Throttling 自动节流插件,来自动地且动态的调整这些配置项。
这样我们就有了以下这两个问题
- 如何并发地处理请求
- 如何动态地改变配置项
以下的源码分析基于版本号为 1.7.2
的 scrapy 项目
如何并发地处理请求
scrapy/core/downloader/__init__.pyL138-L160
里的 Downloader
类的类方法 _process_queue
就负责处理 scrapy 的并发请求
为了更好的理解这函数的代码,首先要确定什么情况下 _process_queue
才会被调用
以下就是该函数的代码,加上了些注释方便大家理解
def _process_queue(self, spider, slot):
if slot.latercall and slot.latercall.active():
# 由于同个 slot 会被 _process_queue 重复调用多次,情况 1,2
# 当情况 3 在处理时,则不执行
return
# Delay queue processing if a download_delay is configured
now = time()
# 获取下载延迟,若是启用了 randomize_delay 则每次会获取到不同的下载延迟
delay = slot.download_delay()
if delay:
# slot.lastseen 记录了最后一个 request 进行网络处理的时间
penalty = delay - now + slot.lastseen
if penalty > 0:
# 当请求间隔不满足配置好的 DOWNLOAD_DELAY 时,则稍后再作处理
slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
return
# Process enqueued requests if there are free slots to transfer for this slot
while slot.queue and slot.free_transfer_slots() > 0:
# 当实际并发数未达到限制时,则 request 进行网络处理
slot.lastseen = now
request, deferred = slot.queue.popleft()
dfd = self._download(slot, request, spider)
dfd.chainDeferred(deferred)
# prevent burst if inter-request delays were configured
if delay:
# 当 DONWLOAD_DELAY > 0 时,要通过 _process_queue 进行延迟处理
self._process_queue(spider, slot)
break
# 当 DOWNLOAD_DELAY == 0 时
# 则一直进行 request 的网络处理,直到实际并发数达到限制,或者队列中没有新的 request
关于 slot
的类定义在 scrapy/core/downloader/__init__.py#L19-L58
如何动态地改变配置项
明白了如何并发地处理 request,可如何动态地改变配置项?我们来看看 Auto-Throttling 插件是怎么工作的,就能明白了。
通过绑定 signals.response_downloaded
信号,当 request 的网络处理完成时就会调用绑定的函数。scrapy/extensions/throttle.py#L19
crawler.signals.connect(self._response_downloaded, signal=signals.response_downloaded)
_response_downloaded
的定义如下 scrapy/extensions/throttle.py#L40-L48
,同样,加上了些注释方便大家理解
def _response_downloaded(self, response, request, spider):
key, slot = self._get_slot(request, spider)
latency = request.meta.get('download_latency')
# 获取 request 网络处理的耗时
if latency is None or slot is None:
return
olddelay = slot.delay
self._adjust_delay(slot, latency, response)
# 调整 request 间的时间间隔
if self.debug:
...
_adjust_delay
的定义如下 scrapy/extensions/throttle.py#L68-L93
def _adjust_delay(self, slot, latency, response):
"""Define delay adjustment policy"""
# If a server needs `latency` seconds to respond then
# we should send a request each `latency/N` seconds
# to have N requests processed in parallel
# 如果服务器需要 latency 秒来响应,
# 则我们需要以 latency / N 秒的时间间隔
# 来并发地处理 N 个 requests
target_delay = latency / self.target_concurrency
# Adjust the delay to make it closer to target_delay
# 调整 delay 使其逼近目标延迟时间 target_delay
new_delay = (slot.delay + target_delay) / 2.0
# If target delay is bigger than old delay, then use it instead of mean.
# It works better with problematic sites.
# 如果目标延迟时间 target_delay 大于旧的,则直接使用 target_delay
# 这样的话,能更好的处理一些有问题的网站
new_delay = max(target_delay, new_delay)
# Make sure self.mindelay <= new_delay <= self.max_delay
# 保证新的延迟 new_delay 大于最小延迟,小于最大延迟限制
new_delay = min(max(self.mindelay, new_delay), self.maxdelay)
# Dont adjust delay if response status != 200 and new delay is smaller
# than old one, as error pages (and redirections) are usually small and
# so tend to reduce latency, thus provoking a positive feedback by
# reducing delay instead of increase.
# 当响应的状态不为 200,及新的延迟小于旧的时,则不调整延迟。
# 这是因为错误页或者重定向的响应时间小,会趋向于降低延迟
# 但应该做的是增加延迟(错误页响应有可能是因为高频率的请求导致的)
if response.status != 200 and new_delay <= slot.delay:
return
slot.delay = new_delay
现今有些网站不正确地使用 HTTP 状态码(HTTP Status Codes),一律响应 200。从这个插件的源码来看,对于这样的网站不能很好的处理
Q&A
为什么配置的并发数很高,但爬虫的爬取速度却没有提高?
发生这种情况一般是因为 request 网络处理的平均用时小于配置的
DOWNLOAD_DELAY
如何为不同的 request 配置不同的
CONCURRENT_REQUESTS
及DOWNLOAD_DELAY
通过创建不同的 Slot 来配置,以
start_requests
中配置为例def start_requests(self): slot_key = "custom_slot_key" slot = Slot(concurrency=10, delay=1, randomize_delay=True) self.crawler.engine.downloader.slots[slot_key] = slot yield scrapy.Request(url="https://example.com", meta={"download_slot":slot_key})