当编写一个爬虫时,会使用 lxml 库来解析 HTML 文件。当爬取到了一个超大且复杂的 HTML 文件,解析起来十分耗费时间,进而影响了爬虫的正常运行
为了不影响爬虫的正常运行,尝试把解析任务交给线程池来处理
模拟爬虫
先随便准备一个 html 文件,就直接从 lxml 的文档上下载一个
wget https://lxml.de/tutorial.html -O example.html
举一个重解析的模拟爬虫例子,每次请求的 io 时间假定为 0.01s ,请求并发数无上限,总量为 2000 个。
import asyncio
import time
from pathlib import Path
from lxml import html
_html_text = None
_latest_fetched = 0
async def fetch_text():
# 模拟爬取
await asyncio.sleep(0.01)
# 记录最后的一个请求的结束时间
global _latest_fetched
_latest_fetched = time.perf_counter()
return _html_text
def get_title(text):
doc = html.fromstring(text)
return doc.xpath("//title/text()")[0]
async def create_task():
text = await fetch_text()
title = get_title(text)
return title
async def main():
futs = []
for _ in range(2000):
fut = asyncio.ensure_future(create_task())
futs.append(fut)
await asyncio.gather(*futs)
if __name__ == "__main__":
_html_text = Path("example.html").read_text()
t_start = time.perf_counter()
asyncio.run(main())
t_end = time.perf_counter()
print("total:", (t_end - t_start))
print("latest fetched:", _latest_fetched - t_start)
这段代码执行的结果如下
total: 46.338736165
latest fetched: 38.004939187
模拟爬虫网络请求的耗时为 0.01 s,而且没有设置并发数,按道理说应该在极短的时间内完成所有网络请求
可实际上最后一个请求的结束时间却十分的大,这说明该代码的解析处理已经大大影响了爬虫的正常工作
用 ThreadPoolExecutor 把解析任务交给线程池
@@ -22,16 +22,20 @@ def get_title(text):
return doc.xpath("//title/text()")[0]
-async def create_task():
+async def create_task(loop):
text = await fetch_text()
- title = get_title(text)
+ # 把解析任务交给线程池
+ title = await loop.run_in_executor(None, get_title, text)
return title
async def main():
+ loop = asyncio.get_event_loop()
+ # 可以用 loop.set_default_executor 来配置默认的线程池
+
futs = []
for _ in range(2000):
- fut = asyncio.ensure_future(create_task())
+ fut = asyncio.ensure_future(create_task(loop))
futs.append(fut)
await asyncio.gather(*futs)
照以上更改后,代码执行的结果如下
total: 46.996482637
latest fetched: 0.087005389
最后一次爬取的结束时间就如我们预料,十分的小。可以看出使用多线程来做解析,就不会影响到主线程的爬虫工作
但由于 GIL ,实际上相当于串行处理,所以解析的速度没有任何起色
在解析时,避免 GIL 的解决方案
而幸运的是如果使用的是包含 1.1 版本以后的 lxml,可以通过以下方法来避开 GIL
@@ -1,4 +1,5 @@
import asyncio
+import threading
import time
from pathlib import Path
@@ -6,6 +7,7 @@ from lxml import html
_html_text = None
_latest_fetched = 0
+_local = threading.local()
async def fetch_text():
@@ -18,7 +20,12 @@ async def fetch_text():
def get_title(text):
- doc = html.fromstring(text)
+ # 通过为各个线程,创建属于自己的解析器
+ # 这样当解析时,线程就会释放 GIL
+ if not hasattr(_local, "parser"):
+ _local.parser = html.HTMLParser()
+
+ doc = html.fromstring(text, parser=_local.parser)
return doc.xpath("//title/text()")[0]
照以上更改后,代码执行的结果如下下
total: 27.775925195000003
latest fetched: 0.425428249
这样的处理相比之前的,由于解析的过程中释放了 GIL,程序就能利用到多核 CPU 的优势,加快了处理速度,减少了运行时间
如果使用 Python 3.7 以上版本, 可以使用
ThreadPoolExecutor.initializer
来创建各个线程的解析器
使用 lxml 时,有哪些情况会释放掉 GIL
通过查看 lxml 的变更历史,看看具体哪些情况会释放掉 GIL
curl -sSL https://github.com/lxml/lxml/raw/master/CHANGES.txt | grep GIL
运行以上命令,得出的结果如下
* Parsing from a plain file object frees the GIL under Python 2.x.
* Running ``iterparse()`` on a plain file (or filename) frees the GIL
* Unlock the GIL for deep copying documents and for XPath()
* All in-memory operations (tostring, parse(StringIO), etc.) free the GIL
* File operations (on file names) free the GIL
* Reading from file-like objects frees the GIL and reacquires it for reading
用 ProcessPoolExecutor 把解析任务交给进程池
用多进程处理的话,会更好的利用多核 CPU 的优势,但为什么不直接用多进程去做处理呢?
这是因为使用多进程处理有以下的缺点
- 多进程开销大,比如 IPC 进程间通信会产生额外的开销
- lxml 的对象无法 pickle ,这样同个对象就无法与多个进程间共享了
但在合适的应用场景,如果正确使用多进程,就能更快地处理
在第一个多线程例子的代码中,把 executor 置换成 ProcessPoolExecutor 即可使用多进程处理
@@ -1,5 +1,6 @@
import asyncio
import time
+from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from lxml import html
@@ -22,20 +23,20 @@ def get_title(text):
return doc.xpath("//title/text()")[0]
-async def create_task(loop):
+async def create_task(loop, executor):
text = await fetch_text()
- # 把解析任务交给线程池
- title = await loop.run_in_executor(None, get_title, text)
+ # 把解析任务交给进程池
+ title = await loop.run_in_executor(executor, get_title, text)
return title
async def main():
loop = asyncio.get_event_loop()
- # 可以用 loop.set_default_executor 来配置默认的线程池
+ executor = ProcessPoolExecutor()
futs = []
for _ in range(2000):
- fut = asyncio.ensure_future(create_task(loop))
+ fut = asyncio.ensure_future(create_task(loop, executor))
futs.append(fut)
await asyncio.gather(*futs)
照以上更改后,代码执行的结果如下
total: 23.120031775
latest fetched: 0.16141446800000003
这种简单的解析处理,从完成速度来看,用多进程处理的效果比多线程的稍好些
但实际场景由于较复杂,多进程处理的收益可能反而不好
总结
多进程处理的劣势反过来就是多线程处理的优势了
- 多线程处理的开销小,收益较大
- lxml 的对象能在不同的线程间共享 (但解析的操作最好在同个线程内作处理,这样会更高效)
要结合实际的应用场景,采用合适的方案,才能得到最大的收益