Greetings, reader! Welcome to your 35th tutorial on async/await so far, but I promise this one is worth it…at least if you’re part of a specific audience that wants to know how coroutines actually work under the hood.
Today we’re going to build minio, a tiny asyncio clone from scratch for
teaching purposes. First things first, here
is the full source code for your reference.
async and why we have it#
You have probably written asynchronous code using the async and await
keywords before:
import asyncio

async def serve(reader, writer):
    # Echo received data back to the client.
    data = await reader.read(100)  # (2)
    writer.write(data)
    await writer.drain()

    # Close the client connection when we're done.
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(serve, "127.0.0.1", 8888)
    async with server:
        await server.serve_forever()  # (1)

asyncio.run(main())
Seems easy enough, right? Some function calls require that you await them, and any await has to happen inside an async def. Apart from the extra syntax, the program still feels very similar to a non-async one.
But it’s called asynchronous for a reason! Our echo server above is perfectly capable of serving many client connections at the same time, similar to a multithreaded program. But here’s the twist: This program is running on a single thread.
To understand why that is, you have to realize most of the time in this program is spent waiting:
(1) waits for new clients to connect to the server. When they do, the handling is delegated to serve.
(2) waits for a client to send some data, which is then echoed back.
Threads are a poor fit for this kind of concurrency. They’re expensive to create, context switches cost time, and most of the time the threads would just be lazing around while they wait for clients. On top of that, we’d quickly hit resource limits if we created a thread per connection.
This is where coroutines shine. They are much lighter to create than a full-blown thread, and every await marks a potential suspension point. If a coroutine can’t make progress because it needs to wait, it suspends and lets something else run in the meantime. The flip side is that this scheduling is cooperative: it’s the programmer’s responsibility to reach an await within a few milliseconds of execution so that other coroutines get their turn.
Anatomy of a coroutine#
With that out of the way, let’s investigate a bit:
>>> async def test():
...     print("I am a coroutine")
...
>>> c = test()
>>> c
<coroutine object test at 0x7ff3253c50c0>
A coroutine function produces a coroutine object when called? Who
would’ve guessed! But since we don’t see the print, it is safe to
say that coroutines are lazy beasts. Now how do we run them?
It turns out that coroutines use the implementation of generators as their backbone, which means they share a similar API:
>>> c = test()
>>> c.send(None)
I am a coroutine
Traceback (most recent call last):
  File "<python-input-3>", line 1, in <module>
    c.send(None)
    ~~~~~~^^^^^^
StopIteration
There it is! We got our print and a StopIteration exception to signal that the coroutine has finished.
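By the way, that StopIteration is more than just a completion signal: when a coroutine returns a value, the value travels along on the exception’s value attribute. A quick sketch:

>>> async def answer():
...     return 42
...
>>> c = answer()
>>> try:
...     c.send(None)
... except StopIteration as e:
...     print(e.value)
...
42

Our runtime will later use exactly this trick to collect the results of finished coroutines.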
But that isn’t all, we can also use the throw method from the generator
API to inject an exception at the current suspension point:
>>> c = test()
>>> c.throw(ValueError("Oops"))
Traceback (most recent call last):
  File "<python-input-9>", line 1, in <module>
    c.throw(ValueError("Oops"))
    ~~~~~~~^^^^^^^^^^^^^^^^^^^^
  File "<python-input-0>", line 1, in test
    async def test():
ValueError: Oops
Now what happens if we try the last function from the generator trio?
>>> c = test()
>>> c.close()
>>> c.send(None)
Traceback (most recent call last):
  File "<python-input-12>", line 1, in <module>
    c.send(None)
    ~~~~~~^^^^^^
RuntimeError: cannot reuse already awaited coroutine
Makes sense.
But now that we know how to run a coroutine, how do we make it suspend? Well, we could try treating it like a generator again and simply yield.
>>> async def test2():
...     yield "suspend?"
...
>>> c = test2()
>>> c.send(None)
Traceback (most recent call last):
  File "<python-input-28>", line 1, in <module>
    c.send(None)
    ^^^^^^
AttributeError: 'async_generator' object has no attribute 'send'. Did you mean: 'asend'?
…Not what I expected. Looks like you can’t actually yield in a coroutine
because then it’s not a coroutine anymore but an async_generator. 🤔
Oh, I know. We could await something. But what exactly? We cannot pull in
asyncio because our goal is to make something of our own.
Awaitables to the rescue#
Luckily, coroutines have yet another important building block besides the actual
coroutine objects - awaitables.
An awaitable is anything that returns an iterator from its __await__ method.
Yes, that applies to coroutine objects too:
>>> for _ in test().__await__():
...     pass
...
I am a coroutine
Let’s experiment a bit:
>>> class MyAwaitable:
...     def __await__(self):
...         yield
...
>>> async def test():
...     print("entered test()")
...     await MyAwaitable()
...     print("resumed test()")
...
>>> c = test()
>>> c.send(None)
entered test()
>>> c.send(None)
resumed test()
Traceback (most recent call last):
  File "<python-input-36>", line 1, in <module>
    c.send(None)
    ~~~~~~^^^^^^
StopIteration
And there it is! Notice how it takes two send calls to drive this coroutine
to completion? That’s because the first call only executes the coroutine up
to the point where it suspends at await thanks to the yield.
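This channel works in both directions, by the way: whatever the yield inside __await__ yields pops out of the send() call, and whatever we send() next becomes the value of that yield expression. A little sketch with a made-up Echo awaitable:

>>> class Echo:
...     def __await__(self):
...         reply = yield "ping"
...         return reply
...
>>> async def test3():
...     print(await Echo())
...
>>> c = test3()
>>> c.send(None)
'ping'
>>> c.send("pong")
pong
Traceback (most recent call last):
  ...
StopIteration

This two-way street is exactly what our runtime will rely on later: tasks yield requests out to the event loop and receive results back at their suspension point.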
But that is not all. There is another way to create awaitables that do not have
an __await__ method - by decorating a generator function with @types.coroutine
which grants a regular generator object some superpowers from the Python gods:
>>> import types
>>>
>>> @types.coroutine
... def a():
...     yield
...
>>> async def b():
...     await a()
...
>>> c = b()
>>> c.send(None)
>>> c.send(None)
Traceback (most recent call last):
  File "<python-input-50>", line 1, in <module>
    c.send(None)
    ~~~~~~^^^^^^
StopIteration
Just take a look at what it takes to replicate this magic by hand.
>>> def a():
...     yield
...
>>> await a()
Traceback (most recent call last):
  File "/nix/store/xphki1psg7lc0ixpm80n9l866cdfcy3k-python3-3.14.0rc3/lib/python3.14/concurrent/futures/_base.py", line 450, in result
    return self.__get_result()
           ~~~~~~~~~~~~~~~~~^^
  File "/nix/store/xphki1psg7lc0ixpm80n9l866cdfcy3k-python3-3.14.0rc3/lib/python3.14/concurrent/futures/_base.py", line 395, in __get_result
    raise self._exception
  File "<python-input-1>", line 1, in <module>
    await a()
TypeError: 'generator' object can't be awaited
>>> # Here comes the magic:
>>> a.__code__ = a.__code__.replace(co_flags=a.__code__.co_flags | 0x100)
>>> await a()
Yep, it’s really just an internal flag (CO_ITERABLE_COROUTINE, if you want to look it up) that prevents generators and coroutines from being used interchangeably. No promises that this trick works with whatever Python version you happen to be using at the time of reading though.
But the important takeaway here is: Every await may be suspended by a yield at some point down the line.
Baby’s first run loop#
With some coroutine theory out of the way, let’s start laying our foundation. We’re going to implement some important cornerstones:

- A Task abstraction which represents coroutines that don’t have a direct awaiter. This would be the coroutine passed to minio.run() but also the background tasks we’re going to implement later.
- A mechanism for our coroutines to call into the event loop without needing access to the runtime’s internal state. We use the _runtime_call helper for this, which suspends execution and passes a message of the form (event, arg) to the event loop.
- The event loop itself. It drains a queue of Tasks ready to run, executes them until suspension, and processes the runtime call that was made.

When any of the subsequent sections introduce a new type of runtime call, assume the handler method is registered in the handlers dictionary.
import types
from collections import deque

class Task:
    def __init__(self, coro):
        self.coro = coro
        self.call_result = None
        self.result = None

    def wake(self):
        # Resume our coro until next suspension.
        return self.coro.send(self.call_result)

@types.coroutine
def _runtime_call(event, value):
    call_result = yield event, value
    return call_result

class Runtime:
    def __init__(self):
        self.handlers = {}
        self.run_queue = deque()

    def run(self, coro):
        # Make a Task for the initial coroutine.
        root_task = Task(coro)
        # Run the event loop while there's still work left.
        self.run_queue.append(root_task)
        while self.run_queue:
            task = self.run_queue.popleft()
            try:
                event, arg = task.wake()
            except StopIteration as e:
                task.result = e.value
            except Exception as e:
                # Raise only exceptions from the main task.
                # Background tasks will die with a print.
                if task is root_task:
                    raise
                else:
                    print(e)
            else:
                handler = self.handlers[event]
                handler(task, arg)
        # Return the result of coro when we're done.
        return root_task.result

def run(coro):
    rt = Runtime()
    return rt.run(coro)
This is not doing a lot for now, but let’s give it a shot:
>>> import minio
>>>
>>> async def a():
...     raise ValueError("moo")
...
>>> async def b():
...     return 2
...
>>> minio.run(a())
Traceback (most recent call last):
  File "<python-input-4>", line 1, in <module>
    minio.run(a())
    ~~~~~~~~~^^^^^
  File "/tmp/minio.py", line 53, in run
    return rt.run(coro)
           ~~~~~~^^^^^^
  File "/tmp/minio.py", line 32, in run
    event, arg = task.coro.send(task.call_result)
                 ~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
  File "<python-input-2>", line 2, in a
    raise ValueError("moo")
ValueError: moo
>>> minio.run(b())
2
Seems good!
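Another thing we get for free: awaiting another coroutine needs no support from our runtime at all, because a coroutine transparently delegates send() and throw() to whatever it is currently awaiting, much like yield from. A quick sanity check:

>>> async def inner():
...     return 21
...
>>> async def outer():
...     return 2 * await inner()
...
>>> minio.run(outer())
42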
Background tasks#
At the start I said that the true power of coroutines is concurrency without requiring multithreading. So that is our next hurdle. We need the ability to start background tasks and we need to execute them.
First, we need a public spawn function which tells the runtime we
want to spawn a coroutine in the background.
import inspect

async def spawn(coro):
    if not inspect.iscoroutine(coro):
        raise TypeError("coro must be a coroutine")
    return await _runtime_call("spawn", coro)
That’s it! The runtime call will briefly suspend execution so that
the event loop gets to process the "spawn" event. We need a handler
for that in the Runtime class:
def handle_spawn(self, spawning_task, spawn_coro):
    spawn_task = Task(spawn_coro)
    # Append the task for the newly spawned coroutine
    # to the run queue so the event loop sees it.
    self.run_queue.append(spawn_task)
    # And since the spawning task is immediately ready
    # to make progress again, we put it to the front
    # of the run queue so it gets resumed immediately.
    spawning_task.call_result = None
    self.run_queue.appendleft(spawning_task)
Let’s try it:
>>> import minio
>>> async def bg(i):
...     print(f"I am background task {i}")
...
>>> async def main():
...     print("entering main()")
...     for i in range(10):
...         await minio.spawn(bg(i))
...     print("main() done")
...
>>> minio.run(main())
entering main()
main() done
I am background task 0
I am background task 1
I am background task 2
I am background task 3
I am background task 4
I am background task 5
I am background task 6
I am background task 7
I am background task 8
I am background task 9
Nice!
Bonus: joining tasks#
Now wouldn’t it be useful if we could also wait for coroutines we spawned to finish? Set your peepers on the changes we are making to support this:
class Task:
    def __init__(self, coro):
        self.coro = coro
        self.call_result = None
        self.result = None
        # NEW: Completion flag and the set of Tasks
        # waiting for this Task to finish.
        self.done = False
        self.waiters = set()

# NEW:
class JoinHandle:
    def __init__(self, task):
        self.task = task

    def __await__(self):
        # "I want to wait until self.task finishes"
        yield from _runtime_call("join", self.task)
        return self.task.result
We introduce a new "join" event type which waits for a certain
Task object to complete. When that has happened, the coroutine
awaiting the JoinHandle will be resumed and can access the
result of the Task.
As for the event loop itself, we must support the new event type and
also inject a JoinHandle as the call result to spawn():
def handle_spawn(self, spawning_task, spawn_coro):
    # ...
    spawning_task.call_result = JoinHandle(spawn_task)
    # ...

def handle_join(self, waiting_task, join_task):
    waiting_task.call_result = None
    if join_task.done:
        # join_task has already finished, so the waiter can
        # be resumed right away instead of parking forever.
        self.run_queue.append(waiting_task)
    else:
        # Register interest in being notified when join_task
        # completes. Until then, waiting_task will not be put
        # in the run queue again.
        join_task.waiters.add(waiting_task)
Another subtle change is needed in the handling of StopIteration so a completing task is marked as done and its waiters get added back to the run queue:
except StopIteration as e:
    task.result = e.value
    task.done = True  # !
    self.run_queue.extend(task.waiters)  # !
    task.waiters.clear()
Now let’s test our changes:
>>> import minio
>>>
>>> async def bg(num):
...     print(f"I am background task {num}")
...     return num
...
>>> async def main():
...     print("entering main()")
...     res = 0
...     for i in range(10):
...         jh = await minio.spawn(bg(i))
...         res += await jh
...     print(f"{res=}")
...
>>> minio.run(main())
entering main()
I am background task 0
I am background task 1
I am background task 2
I am background task 3
I am background task 4
I am background task 5
I am background task 6
I am background task 7
I am background task 8
I am background task 9
res=45
See the difference? Now we don’t have main() completing before the
background tasks anymore. Pretty cool, huh?
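One caveat: the loop above joins each task right after spawning it, so the tasks still run strictly one after another. To actually overlap them, spawn everything first and collect the results afterwards. A sketch of the reshuffled main() (reusing bg from above):

>>> async def main():
...     handles = [await minio.spawn(bg(i)) for i in range(10)]
...     res = sum([await jh for jh in handles])
...     print(f"{res=}")

This way, all ten tasks sit in the run queue before we block on the first JoinHandle.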
Handling time#
Let’s add another feature that requires a Task to wait before it can
run again - timers. Specifically, we’re going to add minio.sleep()
which lets us suspend a task for a given number of seconds before it
is woken again.
First, we’re going to need a data structure to store timers in. Every timer is a tuple (deadline, count, task), where count is a steadily increasing tiebreaker so that two timers with the same deadline never fall through to comparing Task objects (which don’t support ordering). We keep the timers in a min heap so we always know the timer that expires next.
import time
from heapq import heappush, heappop
from itertools import count

class TimerHeap:
    def __init__(self):
        self.timers = []
        # Tiebreaker for timers that share the same deadline.
        self.counter = count()

    def __len__(self):
        return len(self.timers)

    def add(self, task, secs):
        deadline = time.monotonic() + secs
        heappush(self.timers, (deadline, next(self.counter), task))

    def next_deadline(self):
        # Clamped to zero so the result is always safe to
        # pass to time.sleep() or selector.select().
        return max(0.0, self.timers[0][0] - time.monotonic())

    def get_elapsed(self):
        now = time.monotonic()
        while self and self.timers[0][0] <= now:
            _, _, task = heappop(self.timers)
            yield task
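A quick standalone check of the ordering, using plain strings in place of Tasks since the heap never inspects them:

>>> import time
>>> th = TimerHeap()
>>> th.add("second", 0.2)
>>> th.add("first", 0.1)
>>> time.sleep(0.3)
>>> list(th.get_elapsed())
['first', 'second']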
Now we need to integrate it into the Runtime. This is done by adding
an instance of TimerHeap to __init__ and adapting the event loop
slightly:
# ...
while self.run_queue or self.timers:
    # When no Tasks are left, pause the thread until the next
    # timer elapses and reap ready tasks into the run queue.
    if not self.run_queue:
        time.sleep(self.timers.next_deadline())
        for task in self.timers.get_elapsed():
            self.run_queue.append(task)

    # Same as before from here...
    task = self.run_queue.popleft()
    # ...
Now the last step is a sleep() function which suspends the Task until its timer elapses:
async def sleep(delay):
    return await _runtime_call("sleep", delay)

# ...

# ...And the corresponding event handler in Runtime
def handle_sleep(self, task, delay):
    self.timers.add(task, delay)
    task.call_result = None
Let’s test our change:
>>> import minio
>>>
>>> async def main():
...     for _ in range(5):
...         await minio.sleep(2)
...         print("sleepy")
...
>>> minio.run(main())
sleepy
sleepy
sleepy
sleepy
sleepy
Works like a charm. 🥳
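Combined with the task machinery from before, sleeping also makes the interleaving between tasks visible. Here are two tickers running at different rates (a sketch, the exact output ordering depends on wall-clock timing):

>>> async def ticker(name, delay):
...     for _ in range(2):
...         await minio.sleep(delay)
...         print(name)
...
>>> async def main():
...     t1 = await minio.spawn(ticker("tick", 1.0))
...     t2 = await minio.spawn(ticker("tock", 1.5))
...     await t1
...     await t2
...
>>> minio.run(main())
tick
tock
tick
tock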
I/O support#
Here comes the real juice. A coroutine runtime which doesn’t handle I/O is completely useless because I/O is the main source of all the waiting we’ll need to do.
Nonblocking I/O and Selectors#
Before we write some more code for minio, we must first discuss
how the pieces will fit together. We want to use networking code
from the builtin socket module because that is the low-level
building block from the operating system.
But wait, those APIs are blocking. And we dislike blocking code.
So for our next trick we’ll just make them…not blocking. And I
mean it quite literally, we can call the .setblocking(False)
method on a socket and the world will be healed.
That is only half of the story though. Look what happens now if we try to accept a client connection on a server socket when nobody is trying to connect.
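The gist of the failing code is nothing more than calling accept() on a nonblocking socket while no connection is pending. A minimal standalone sketch, no minio involved:

import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 8888))
server.listen()
server.setblocking(False)

# Nobody is connecting, so this cannot succeed without waiting.
conn, _ = server.accept()  # raises BlockingIOError

Inside our runtime, the same situation looks like this: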
Traceback (most recent call last):
  File "/tmp/minio_echo_server.py", line 27, in <module>
    minio.run(main())
    ~~~~~~~~~^^^^^^^^
  File "/tmp/minio.py", line 160, in run
    return rt.run(coro)
           ~~~~~~^^^^^^
  File "/tmp/minio.py", line 124, in run
    event, arg = task.coro.send(task.call_result)
                 ~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
  File "/tmp/minio_echo_server.py", line 23, in main
    conn, _ = server.accept()
              ~~~~~~~~~~~~~^^
  File "/nix/store/xphki1psg7lc0ixpm80n9l866cdfcy3k-python3-3.14.0rc3/lib/python3.14/socket.py", line 298, in accept
    fd, addr = self._accept()
               ~~~~~~~~~~~~^^
BlockingIOError: [Errno 11] Resource temporarily unavailable
We will just get a BlockingIOError instead to let us know that
blocking would be required here to fulfill the operation. 😕
Are we just supposed to test the same call over and over again until
we don’t get the BlockingIOError anymore? Surely there has to be
a better way.
Thankfully we are loved by the makers of our operating systems because
they bless us with APIs where we can register interest in a socket
becoming readable or writable and get notified when that is the case!
They carry funny names like epoll or kqueue.
And most importantly, we are also loved by the Python gods themselves
because they bless us with the builtin selectors module as a portable
wrapper for these APIs.
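To get a feel for the API before we wire it into minio, here is a small standalone snippet (independent of everything we have built so far) that waits for a listening socket to receive a connection:

import selectors
import socket

sel = selectors.DefaultSelector()

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 8888))
server.listen()
server.setblocking(False)

# Register interest in the socket becoming readable. The data
# argument is an arbitrary payload handed back to us with the event.
sel.register(server, selectors.EVENT_READ, data="server is readable")

# Blocks until a client connects (or 5 seconds pass).
for key, _ in sel.select(timeout=5):
    print(key.data)

A listening socket becoming “readable” means that a client connection is ready to be accepted.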
Adding Runtime support#
The idea behind selectors is relatively straightforward: we keep an instance of a selector object in Runtime and register our socket objects with it to say we’re waiting for them to become either readable or writable. Then we call selector.select(), which informs us when any of the events we registered for occur.
Let’s introduce some new APIs which allow us to wait for a nonblocking socket to become readable and writable. We turn to none other than our runtime call system for this.
from selectors import EVENT_READ, EVENT_WRITE

async def wait_readable(fileobj):
    return await _runtime_call("io", (EVENT_READ, fileobj))

async def wait_writable(fileobj):
    return await _runtime_call("io", (EVENT_WRITE, fileobj))
To say this is barebones is an understatement, but we will see how this is applied to do useful things. And the corresponding event handler:
def handle_io(self, task, data):
    # Register interest in the fileobj becoming readable
    # or writable. We attach the waiting Task to the selector
    # key so that the event loop can associate every
    # notification with the Task it is meant for.
    event, fileobj = data
    self.selector.register(fileobj, event, task)
    task.call_result = None
Sweet. Now that we can register interest in a socket becoming readable or writable, we also need to listen for these events to occur.
from selectors import DefaultSelector

class Runtime:
    def __init__(self):
        # ...
        self.selector = DefaultSelector()

    def run(self, coro):
        # ...
        while self.run_queue or self.timers or self.selector.get_map():
            # When no Tasks are left, pause the thread while waiting
            # for I/O or timers. We use the timer closest to expiring
            # as the deadline for I/O waits.
            if not self.run_queue:
                to = self.timers.next_deadline() if self.timers else None
                for key, _ in self.selector.select(to):
                    self.run_queue.append(key.data)
                    self.selector.unregister(key.fileobj)

                # Put elapsed timers in the run queue.
                for task in self.timers.get_elapsed():
                    self.run_queue.append(task)

            # Same as before from here...
            task = self.run_queue.popleft()
This is quite something. Our selector maintains a mapping
of registered file objects to selector keys. So as long as this
mapping is non-empty, there will be Tasks blocked on I/O
(selector.get_map()).
If the run queue is empty, we need to wait for I/O or timers.
We do that with a selector.select() call using the timer closest
to expiring as a deadline.
If we do get completed I/O notifications, we can schedule these tasks for execution and unregister our interest in notifications (because now we can perform the operation without needing to wait anymore).
Whispers in the Network#
Last but not least, we want our I/O abstraction to do something. What better way is there to wrap this journey up than to go back to what it started with - the classic echo server.
import minio
import socket

async def serve(conn):
    conn.setblocking(False)
    await minio.wait_readable(conn)
    data = conn.recv(100)
    while len(data) > 0:
        await minio.wait_writable(conn)
        sent = conn.send(data)
        data = data[sent:]
    conn.close()

async def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 8888))
    server.listen()
    server.setblocking(False)
    with server:
        while True:
            await minio.wait_readable(server)
            conn, _ = server.accept()
            await minio.spawn(serve(conn))

minio.run(main())
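If you want to see it breathe, here is a throwaway client to run in a second terminal while the server is up (a sketch, any TCP client like netcat works just as well):

import socket

with socket.create_connection(("127.0.0.1", 8888)) as sock:
    sock.sendall(b"hello echo server")
    print(sock.recv(100))  # b'hello echo server'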
The power of convenient abstractions, huh? But that is it for this post.
Conclusion#
We learned a lot today. Of course a proper async runtime has a lot
of other niceties, such as cancellation support, better abstractions
and all the other things you know and love from asyncio.
This is by no means complete, nor a reference for the real deal. But it provides a solid foundation to build upon and has taught you most of the relevant concepts that also appear in real runtimes.
Maybe this async stuff isn’t all that magical and complicated after all. Until next time!