前言
昨天(2017年7月15日)偶然看到这个视频,看完之后豁然开朗,忍不住要分享给大家。
并发Demo
视频中举了个例子,用socket写了个迷你服务,只有一个作用,计算Fibonacci……
# -*- coding: utf-8 -*-
# server.py
import socket


def fib(n):
    """Return the n-th Fibonacci number (fib(1) == fib(2) == 1).

    Deliberately exponential-time recursion: the article uses it as a
    CPU-bound workload.
    """
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def server(addr, port):
    """Listen on (addr, port) and serve clients one at a time (blocking)."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((addr, port))
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        handler(client)


def handler(client):
    """Read integer requests from the client and reply with fib(n) until EOF."""
    try:
        while True:
            req = client.recv(100)
            if not req:
                break
            n = int(req)
            result = fib(n)
            resp = str(result).encode('ascii') + b'\n'
            client.send(resp)
    finally:
        # Fix: the original never closed the client socket, leaking one
        # file descriptor per connection.
        client.close()
    print("Connection Closed")


if __name__ == '__main__':
    # Fix: guard the entry point so importing this module does not start
    # the server (the article's later listings do this too).
    server("", 25000)
当然目前这个只是一个阻塞版本,可以用以下代码测试一下
# -*- coding: utf-8 -*-
# nc.py — minimal netcat-style client: type a number, print the server's reply.
import socket

conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.connect(('localhost', 25000))

while True:
    # Forward one line of user input, then echo the server's response.
    line = input()
    conn.send(line.encode('ascii'))
    reply = conn.recv(100)
    print(reply.decode('ascii'), end='')
该迷你服务一次只能处理一个请求,我们只需要稍微修改一下代码即可以改变现状
# -*- coding: utf-8 -*-
# server.py
import socket
from threading import Thread


def fib(n):
    """Return the n-th Fibonacci number (fib(1) == fib(2) == 1).

    Deliberately exponential-time recursion used as a CPU-bound workload.
    """
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def server(addr, port):
    """Listen on (addr, port); spawn one daemon thread per connection."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((addr, port))
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        Thread(target=handler, args=(client,), daemon=True).start()


def handler(client):
    """Per-connection worker: reply with fib(n) for each request until EOF."""
    try:
        while True:
            req = client.recv(100)
            if not req:
                break
            n = int(req)
            result = fib(n)
            resp = str(result).encode('ascii') + b'\n'
            client.send(resp)
    finally:
        # Fix: the original leaked the client socket on disconnect.
        client.close()
    print("Connection Closed")


if __name__ == '__main__':
    # Fix: guard the entry point so importing this module does not start
    # the server.
    server("", 25000)
每次收到请求,就会开启一个新的线程来处理。用以下代码来测试其性能。
# -*- coding: utf-8 -*-
# perf.py — hammer the server with fib(1) requests and report throughput.
import socket
import time
from threading import Thread

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 25000))

counter = 0


def monitor():
    """Once per second, print the completed-request count and reset it."""
    global counter
    while True:
        time.sleep(1)
        print(counter, 'reqs/sec')
        counter = 0


Thread(target=monitor, daemon=True).start()

# Main loop: issue the cheapest possible request as fast as replies arrive.
while True:
    sock.send(b'1')
    _reply = sock.recv(100)
    counter += 1
每当多开启一个测试端,各个测试端每秒请求数量或多或少都会受到影响。而且当我们用nc.py作一个需要大量计算资源请求,比如 fib(40)
,结果更是惨不忍睹。若是我们把线程换成进程。
# -*- coding: utf-8 -*-
# server.py
import socket
from threading import Thread
from concurrent.futures import ProcessPoolExecutor as Pool

pool = Pool(10)


def fib(n):
    """Return the n-th Fibonacci number (fib(1) == fib(2) == 1).

    Deliberately exponential-time recursion used as a CPU-bound workload.
    """
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def server(addr, port):
    """Listen on (addr, port); spawn one daemon thread per connection."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((addr, port))
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        Thread(target=handler, args=(client, ), daemon=True).start()


def handler(client):
    """Per-connection worker: offload fib(n) to the process pool."""
    try:
        while True:
            req = client.recv(100)
            if not req:
                break
            n = int(req)
            # Run the CPU-bound work in a worker process, sidestepping the GIL.
            future = pool.submit(fib, n)
            result = future.result()
            resp = str(result).encode('ascii') + b'\n'
            client.send(resp)
    finally:
        # Fix: the original leaked the client socket on disconnect.
        client.close()
    print("Connection Closed")


# ProcessPoolExecutor requires the
#   if __name__ == '__main__':
#       main()
# guard: child processes re-import the __main__ module, and without the
# guard they would re-run the main process's code, causing strange results.
if __name__ == '__main__':
    server("", 25000)
当然,我们要使用 ProcessPoolExecutor 来执行 CPU密集型任务 ;用 ThreadPoolExecutor 更适合 I/O密集型任务 。这样突然来了一个需要 fib(40)
的请求,也不会造成很大的影响。不过各个测试端的每秒请求数量,会比之前降了一个数量级。
深入了解
David Beazley 讲 Python 圈子里有个特别火的观点,就是因为 GIL (Global Interpreter Lock)的存在,导致用 threading
实际表现起来十分糟糕,所以不要用!于是他把代码恢复到原来的版本。如果不用 threading
模块,如何做到并发呢?用Python中的生成器特性,使用yield关键字。
>>> def countdown(n):
... while n > 0:
... yield n
... n -= 1
>>> cd = countdown(3)
>>> next(cd)
3
>>> for i in cd:
... print(i)
2
1
>>> next(cd)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
当解释器运行到 yield 会返回结果,并挂起该函数,等待下次激活。利用这个特性,稍微修改一下代码
# -*- coding: utf-8 -*-
# server.py
import socket
import select
from collections import deque

tasks = deque()
recv_wait = dict()  # Mapping sockets -> tasks (generators)
send_wait = dict()


def run():
    """Round-robin scheduler for generator-based tasks.

    Drives each task until it yields ('recv'|'send', sock), then parks it
    in the matching wait table until select() reports the socket ready.
    """
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            # No runnable task: block until some parked socket is ready
            # for its pending I/O operation.
            can_recv, can_send, _ = select.select(recv_wait, send_wait, [])
            for sock in can_recv:
                tasks.append(recv_wait.pop(sock))
            for sock in can_send:
                tasks.append(send_wait.pop(sock))
        task = tasks.popleft()
        try:
            why, what = next(task)
            if why == 'recv':
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            else:
                raise RuntimeError('需要参数!')
        except StopIteration:
            print('Task Done!')


def fib(n):
    """Return the n-th Fibonacci number (fib(1) == fib(2) == 1)."""
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def server(addr, port):
    """Accept-loop task: yields before each accept, spawns a handler task."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((addr, port))
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()
        print('Connection', addr)
        tasks.append(handler(client))  # add a new task


def handler(client):
    """Per-connection task: yields before every recv/send so run() can park it."""
    while True:
        yield 'recv', client
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)
    # Fix: the original never closed the client socket, leaking one file
    # descriptor per connection.
    client.close()
    print("Connection Closed")


if __name__ == '__main__':
    tasks.append(server("", 25000))  # add the initial task
    run()
在需要进行I/O操作的地方使用 yield
,挂起该函数,用 select
查看socket的I/O操作是否准备好。准备好就唤醒该函数,执行 I/O 操作。使用perf.py或者nc.py测试一下,发现我们成功地实现了并发,没有借助 threading
模块。但我们遇到CPU密集型任务,该代码的表现就乏善可陈。我们尝试用 ProcessPoolExecutor
来解决这个问题
# -*- coding: utf-8 -*-
# server.py
import socket
import select
from collections import deque
from concurrent.futures import ProcessPoolExecutor as Pool

pool = Pool(10)
tasks = deque()
recv_wait = dict()  # Mapping sockets -> tasks (generators)
send_wait = dict()
future_wait = dict()  # Mapping futures -> tasks waiting on them

# A future is not a file descriptor, so select() cannot watch it.  The
# pool callback writes a byte to future_notify; select() sees future_event
# become readable and wakes the scheduler.
future_notify, future_event = socket.socketpair()


def future_done(future):
    """Pool callback: requeue the task that was waiting on this future."""
    tasks.append(future_wait.pop(future))
    future_notify.send(b'x')


def future_monitor():
    """Task that drains the wakeup bytes written by future_done()."""
    while True:
        yield 'recv', future_event
        future_event.recv(100)


def run():
    """Round-robin scheduler for generator-based tasks.

    Tasks yield ('recv'|'send', sock) to wait on socket I/O, or
    ('future', fut) to wait on a process-pool result.
    """
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            # No runnable task: block until some parked socket is ready.
            can_recv, can_send, _ = select.select(recv_wait, send_wait, [])
            for sock in can_recv:
                tasks.append(recv_wait.pop(sock))
            for sock in can_send:
                tasks.append(send_wait.pop(sock))
        task = tasks.popleft()
        try:
            why, what = next(task)
            if why == 'recv':
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            elif why == 'future':
                future_wait[what] = task
                what.add_done_callback(future_done)
            else:
                raise RuntimeError('需要参数!')
        except (StopIteration, ConnectionResetError):
            print('Task Done!')


def fib(n):
    """Return the n-th Fibonacci number (fib(1) == fib(2) == 1)."""
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def server(addr, port):
    """Accept-loop task: yields before each accept, spawns a handler task."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((addr, port))
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()
        print('Connection', addr)
        tasks.append(handler(client))


def handler(client):
    """Per-connection task: offloads fib(n) to the pool via a future yield."""
    while True:
        yield 'recv', client
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        future = pool.submit(fib, n)
        yield 'future', future
        result = future.result()
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)
    # Fix: the original never closed the client socket, leaking one file
    # descriptor per connection.
    client.close()
    print("Closed")


if __name__ == '__main__':
    tasks.append(server("", 25000))
    tasks.append(future_monitor())
    run()
因为 future
对象不是文件描述符,无法用 select
来查看任务是否完成。但有更好的方法,future
对象支持添加 callback
函数。用 socket.socketpair
配合 future.add_done_callback
函数来解决这个问题。接下来有个小问题, yield
在代码中很突兀,有什么办法把它包装起来?!?
# -*- coding: utf-8 -*-
# server.py
import socket
import select
from collections import deque
from concurrent.futures import ProcessPoolExecutor as Pool

pool = Pool(10)
tasks = deque()
recv_wait = dict()  # Mapping sockets -> tasks (generators)
send_wait = dict()
future_wait = dict()  # Mapping futures -> tasks waiting on them

# A future is not a file descriptor, so select() cannot watch it; the
# socketpair turns future completion into a selectable read event.
future_notify, future_event = socket.socketpair()


def future_done(future):
    """Pool callback: requeue the task that was waiting on this future."""
    tasks.append(future_wait.pop(future))
    future_notify.send(b'x')


def future_monitor():
    """Task that drains the wakeup bytes written by future_done()."""
    while True:
        yield 'recv', future_event
        future_event.recv(100)


def run():
    """Round-robin scheduler for generator-based tasks.

    Tasks yield ('recv'|'send', sock) to wait on socket I/O, or
    ('future', fut) to wait on a process-pool result.
    """
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            # No runnable task: block until some parked socket is ready.
            can_recv, can_send, _ = select.select(recv_wait, send_wait, [])
            for sock in can_recv:
                tasks.append(recv_wait.pop(sock))
            for sock in can_send:
                tasks.append(send_wait.pop(sock))
        task = tasks.popleft()
        try:
            why, what = next(task)
            if why == 'recv':
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            elif why == 'future':
                future_wait[what] = task
                what.add_done_callback(future_done)
            else:
                raise RuntimeError('需要参数!')
        except (StopIteration, ConnectionResetError):
            print('Task Done!')


def fib(n):
    """Return the n-th Fibonacci number (fib(1) == fib(2) == 1)."""
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


class AsyncSocket(object):
    """Socket wrapper whose I/O methods are generators.

    Each method first yields the scheduler instruction, then performs the
    real (now ready) socket call — callers use ``yield from`` and the bare
    ``yield`` disappears from their code.
    """

    def __init__(self, sock):
        self.sock = sock

    def recv(self, maxsize):
        yield 'recv', self.sock
        return self.sock.recv(maxsize)

    def send(self, data):
        yield 'send', self.sock
        return self.sock.send(data)

    def accept(self):
        yield 'recv', self.sock
        client, addr = self.sock.accept()
        return AsyncSocket(client), addr

    def __getattr__(self, name):
        # Everything else (bind, listen, close, ...) delegates to the
        # wrapped socket unchanged.
        return getattr(self.sock, name)


def server(addr, port):
    """Accept-loop task written with yield from instead of bare yields."""
    sock = AsyncSocket(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((addr, port))
    sock.listen(5)
    while True:
        client, addr = yield from sock.accept()
        print('Connection', addr)
        tasks.append(handler(client))


def handler(client):
    """Per-connection task: offloads fib(n) to the pool via a future yield."""
    while True:
        req = yield from client.recv(100)
        if not req:
            break
        n = int(req)
        future = pool.submit(fib, n)
        yield 'future', future
        result = future.result()
        resp = str(result).encode('ascii') + b'\n'
        yield from client.send(resp)
    # Fix: the original never closed the client socket, leaking one file
    # descriptor per connection (close is delegated via __getattr__).
    client.close()
    print("Connection Closed")


if __name__ == '__main__':
    tasks.append(server("", 25000))
    tasks.append(future_monitor())
    run()
yield from
是用来代理子生成器的,一种简单方式来控制子生成器的行为。在Python3.5之后,这种写法就可以用更简单的 async/await
来代替。
async/await
看了这个视频后,就看了 pycon 上关于 Asynchronous 的视频。研究了一下 3.5 后的协程语法,把代码修改成以下的样子。
# -*- coding: utf-8 -*-
# server.py
import asyncio
import logging
import sys
from concurrent.futures import ProcessPoolExecutor as Executor

logger = logging.getLogger('SERVER')
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

executor = Executor(4)


def fib(n: int) -> int:
    """Return the n-th Fibonacci number (fib(1) == fib(2) == 1)."""
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


async def handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """Serve one connection: read a number per line, reply with fib(n)."""
    addr = writer.get_extra_info('peername')
    logger.info('Connection<%s:%d> Created', *addr)
    loop = asyncio.get_event_loop()
    while True:
        req = await reader.readline()
        if not req.strip():
            break
        logger.debug('Connection<%s:%d> Recv %r', *addr, req)
        try:
            n = int(req)
            # CPU-bound work runs in a worker process, not the event loop.
            future = loop.run_in_executor(executor, fib, n)
            result = await asyncio.gather(future, return_exceptions=True)
            # Fix: gather() always returns a list, so the original
            # isinstance(result, Exception) check never fired; with
            # return_exceptions=True the exception is result[0].
            if isinstance(result[0], Exception):
                break
            resp = str(result[0]).encode('ascii') + b'\n'
            logger.debug('Connection<%s:%d> Send %r', *addr, resp)
            writer.write(resp)
            await writer.drain()
        except Exception as e:
            writer.write(b'[Error]\n')
            await writer.drain()
            logger.exception(e)
            continue
    writer.close()
    logger.info('Connection<%s:%d> Done', *addr)


def Server(host: str, port: int):
    """Run the asyncio server until interrupted, then shut everything down."""
    loop = asyncio.get_event_loop()
    # Fix: the loop= keyword was deprecated in 3.8 and removed in 3.10;
    # start_server uses the running/current loop automatically.
    coro = asyncio.start_server(handler, host, port)
    server = loop.run_until_complete(coro)
    print('Serving on http://%s:%d' % (host, port))
    try:
        loop.run_forever()
    finally:
        executor.shutdown()
        server.close()
        try:
            loop.run_until_complete(server.wait_closed())
        finally:
            loop.close()


def main():
    Server('localhost', 12345)


if __name__ == '__main__':
    main()
是不是看起来简洁了许多?刚接触协程,体会还不是很深,但觉得代码可读性变好了。
小结
| CPU密集型任务 | I/O密集型任务 |
|---|---|
| multiprocessing | threading |
| concurrent.futures.ProcessPoolExecutor | concurrent.futures.ThreadPoolExecutor |
| | async / await |
以上关于Fibonacci的例子是为了更好地理解并发,不是一个很好的算法。
def fib(n):
    """Return the n-th Fibonacci number iteratively in O(n).

    Fix: the original looped ``range(n)`` and so returned fib(n + 1)
    relative to the article's recursive definition (e.g. it gave 2 for
    n == 2 where the recursive version gives 1).  Looping n - 1 times
    makes both versions agree: fib(1) == fib(2) == 1.
    """
    a, b = 1, 1
    for _ in range(n - 1):
        a, b = b, a + b
    return a
使用以上版本大大加快了fib
的计算速度,就没必要用多进程了。而且多进程与多线程不要混搭使用,因为可能会出现死锁,导致程序出现不可预知的错误。必须慎重对待多进程和多线程混搭使用。
参考链接
- 17.4.3. ProcessPoolExecutor | Python Docs
- PYTHON: A QUICK INTRODUCTION TO THE CONCURRENT.FUTURES MODULE
- 6.2.9. Yield expressions | Python Docs
- Python3中的yield from语法
- PEP 380 — Syntax for Delegating to a Subgenerator
扩展阅读
Pycon Montreal, David's talk所出现的所有代码 codeAshu/concurrency | GitHub
协程语法async与await PEP 492 — Coroutines with async and await syntax
Python异步入门指南 Thinking about Concurrency, Raymond Hettinger, Python core developer | PyCon Russia 2016 Miguel Grinberg Asynchronous Python for the Complete Beginner PyCon 2017