Scrapy 的并发处理
Jul 29, 2019
3 minute read

前言

看大型项目的源码时,经常摸不着头脑,耗时但收益少,所以也就不容易坚持下去。但如果带着问题来看的话,屏蔽掉问题无关的代码,专注于相关的,看看能不能真正的从源码中学习到好东西

问题: Scrapy 的并发处理

在 Scrapy 的文档中提到可以通过配置 DOWNLOAD_DELAY 来控制请求(request)间的时间间隔,配置 CONCURRENT_REQUESTS 来限制请求(request)的并发数(粒度可基于域名,或者 IP 地址)。甚至可以通过使用 Auto-Throttling 自动节流插件,来自动地且动态的调整这些配置项。

这样我们就有了以下这两个问题

  1. 如何并发地处理请求
  2. 如何动态地改变配置项

以下的源码分析基于版本号为 1.7.2scrapy 项目

如何并发地处理请求

scrapy/core/downloader/__init__.pyL138-L160 里的 Downloader 类的类方法 _process_queue 就负责处理 scrapy 的并发请求

为了更好的理解这函数的代码,首先要确定什么情况下 _process_queue 才会被调用

  1. 当新的请求进入队列时 L135
  2. 当请求完成网络处理时 L186
  3. 当不满足 DONWLOAD_DELAY 时的延迟调用 L148

以下就是该函数的代码,加上了些注释方便大家理解

def _process_queue(self, spider, slot):
    if slot.latercall and slot.latercall.active():
        # 由于同个 slot 会被 _process_queue 重复调用多次,情况 1,2
        # 当情况 3 在处理时,则不执行
        return


    # Delay queue processing if a download_delay is configured
    now = time()
    # 获取下载延迟,若是启用了 randomize_delay 则每次会获取到不同的下载延迟
    delay = slot.download_delay()
    if delay:
        # slot.lastseen 记录了最后一个 request 进行网络处理的时间
        penalty = delay - now + slot.lastseen
        if penalty > 0:
            # 当请求间隔不满足配置好的 DOWNLOAD_DELAY 时,则稍后再作处理
            slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
            return


    # Process enqueued requests if there are free slots to transfer for this slot
    while slot.queue and slot.free_transfer_slots() > 0:
        # 当实际并发数未达到限制时,则 request 进行网络处理
        slot.lastseen = now
        request, deferred = slot.queue.popleft()
        dfd = self._download(slot, request, spider)
        dfd.chainDeferred(deferred)
        # prevent burst if inter-request delays were configured
        if delay:
            # 当 DONWLOAD_DELAY > 0 时,要通过 _process_queue 进行延迟处理
            self._process_queue(spider, slot)
            break

        # 当 DOWNLOAD_DELAY == 0 时
        # 则一直进行 request 的网络处理,直到实际并发数达到限制,或者队列中没有新的 request

关于 slot 的类定义在 scrapy/core/downloader/__init__.py#L19-L58

如何动态地改变配置项

明白了如何并发地处理 request,可如何动态地改变配置项?我们来看看 Auto-Throttling 插件是怎么工作的,就能明白了。

通过绑定 signals.response_downloaded 信号,当 request 的网络处理完成时就会调用绑定的函数。scrapy/extensions/throttle.py#L19

crawler.signals.connect(self._response_downloaded, signal=signals.response_downloaded)

_response_downloaded 的定义如下 scrapy/extensions/throttle.py#L40-L48 ,同样,加上了些注释方便大家理解

def _response_downloaded(self, response, request, spider):
    key, slot = self._get_slot(request, spider)
    latency = request.meta.get('download_latency')
    # 获取 request 网络处理的耗时
    if latency is None or slot is None:
        return


    olddelay = slot.delay
    self._adjust_delay(slot, latency, response)
    # 调整 request 间的时间间隔
    if self.debug:
        ...

_adjust_delay 的定义如下 scrapy/extensions/throttle.py#L68-L93

def _adjust_delay(self, slot, latency, response):
    """Define delay adjustment policy"""


    # If a server needs `latency` seconds to respond then
    # we should send a request each `latency/N` seconds
    # to have N requests processed in parallel
    # 如果服务器需要 latency 秒来响应,
    # 则我们需要以 latency / N 秒的时间间隔
    # 来并发地处理 N 个 requests
    target_delay = latency / self.target_concurrency


    # Adjust the delay to make it closer to target_delay
    # 调整 delay 使其逼近目标延迟时间 target_delay
    new_delay = (slot.delay + target_delay) / 2.0


    # If target delay is bigger than old delay, then use it instead of mean.
    # It works better with problematic sites.
    # 如果目标延迟时间 target_delay 大于旧的,则直接使用 target_delay
    # 这样的话,能更好的处理一些有问题的网站
    new_delay = max(target_delay, new_delay)


    # Make sure self.mindelay <= new_delay <= self.max_delay
    # 保证新的延迟 new_delay 大于最小延迟,小于最大延迟限制
    new_delay = min(max(self.mindelay, new_delay), self.maxdelay)


    # Dont adjust delay if response status != 200 and new delay is smaller
    # than old one, as error pages (and redirections) are usually small and
    # so tend to reduce latency, thus provoking a positive feedback by
    # reducing delay instead of increase.
    # 当响应的状态不为 200,及新的延迟小于旧的时,则不调整延迟。
    # 这是因为错误页或者重定向的响应时间小,会趋向于降低延迟
    # 但应该做的是增加延迟(错误页响应有可能是因为高频率的请求导致的)
    if response.status != 200 and new_delay <= slot.delay:
        return


    slot.delay = new_delay

现今有些网站不正确地使用 HTTP 状态码(HTTP Status Codes),一律响应 200。从这个插件的源码来看,对于这样的网站不能很好的处理

Q&A

  1. 为什么配置的并发数很高,但爬虫的爬取速度却没有提高?

    发生这种情况一般是因为 request 网络处理的平均用时小于配置的 DOWNLOAD_DELAY

  2. 如何为不同的 request 配置不同的 CONCURRENT_REQUESTSDOWNLOAD_DELAY

    通过创建不同的 Slot 来配置,以 start_requests 中配置为例

    def start_requests(self):
        slot_key = "custom_slot_key"
        slot = Slot(concurrency=10, delay=1, randomize_delay=True)
        self.crawler.engine.downloader.slots[slot_key] = slot
        yield scrapy.Request(url="https://example.com", meta={"download_slot":slot_key})

参考

  1. 关于 CONCURRENT_REQUESTS 配置的文档
  2. 关于 download_latency 的文档
  3. 关于 Auto-Throttling 插件的文档



comments powered by Disqus