When using a web framework not supports to write concurrent code using the async/await syntax, want to use concurrent to speed the connection up with another service, like connecting Redis, making a lot of requests, etc. So it’s why we need to run the asyncio event loop in another thread.
Create a thread that runs asyncio event loop forever
Attention, This example is not fully tested.
import asyncio
import threading
class AsyncioEventLoopThread(threading.Thread):
def __init__(self, *args, loop=None, **kwargs):
super().__init__(*args, **kwargs)
self.loop = loop or asyncio.new_event_loop()
self.running = False
def run(self):
self.running = True
self.loop.run_forever()
def run_coro(self, coro):
return asyncio.run_coroutine_threadsafe(coro, loop=self.loop).result()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
self.join()
self.running = False
Due to the asyncio.run_coroutine_threadsafe
function returns a concurrent.futures.Future
object, so we just execute the result
method to get the coroutine result. It will wait for the coroutine finished.
There is a simple example below to demonstrate how to use it.
async def hello_world():
print("hello world")
async def make_request():
await asyncio.sleep(1)
thr = AsyncioEventLoopThread()
thr.start()
try:
thr.run_coro(hello_world())
thr.run_coro(make_request())
finally:
thr.stop()
Attention, don’t run the same coroutine in two different event loops.
Share objects between coroutines
you should inherit the AsyncioEventLoopThread
or just modify it to hold the objects you need to share between coroutines.
Using the contextvars
to share objects between coroutines,
shared values must be stored into contextvars.Context
before running coroutines.
import contextvars
import aiohttp
var_session = contextvars.ContextVar('session')
class FetcherThread(AsyncioEventLoopThread):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._session = None
self._event_session_created = threading.Event()
def run_coro(self, coro):
self._event_session_created.wait()
var_session.set(self.session)
return super().run_coro(coro)
async def _create_session(self):
self.session = aiohttp.ClientSession()
async def _close_session(self):
await self.session.close()
def run(self):
fut = asyncio.run_coroutine_threadsafe(self._create_session(), loop=self.loop)
fut.add_done_callback(lambda _: self._event_session_created.set())
super().run()
def stop(self):
self.run_coro(self._close_session())
super().stop()
There is a simple example below to fetch the source code of website github.com.
async def make_request():
session = var_session.get()
async with session.get("https://github.com") as resp:
resp.raise_for_status()
return await resp.text()
thr = FetcherThread()
thr.start()
try:
text = thr.run_coro(make_request())
print(text)
finally:
thr.stop()