Python Asynchronous Programming with the asyncio Library
I would not consider myself an experienced programmer, but I can code scripts and small programs, mostly for proofs of concept. I started about 20 years ago with Perl scripting, then C, PHP, Bash, C++, Java, Lua and finally Python. All that time, asynchronous programming was synonymous with threads and forked processes, and sharing memory was done through queues and locks.
I started a new project last September where I wanted to build a WebSocket server in Python for real-time data reporting. I decided to use the Python WebSocket library, which is built on the asyncio library introduced in Python 3.4.
I had heard very good things about the asyncio library and event-loop programming in general, but this project was going to be my first experience with it. Now that my WebSocket server project is going well, I decided to write down a few things I have learned while working with asyncio. The library was introduced in Python 3.4, and some keywords/functions changed in Python 3.6. This post uses Python 3.6 syntax only, and all the code has been tested with Python 3.6.3.
Asynchronous versus Parallel Programming
A lot of programming languages allow you to use threads and forked processes to execute code in parallel. Threaded functions can run simultaneously on different CPU cores, which can speed up your application's processing. Also, some threads can run while other threads are waiting for I/O such as network connections.
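For contrast, here is a minimal sketch of the thread-based approach just described (worker and the delays are illustrative only):

import threading
import time

def worker(delay):
    print(f'Start {delay}')
    time.sleep(delay)  # Blocks this thread only; the other threads keep running
    print(f'Stop {delay}')

# Each thread runs independently and may execute on a different CPU core
threads = [threading.Thread(target=worker, args=(d,)) for d in (3, 2, 1)]
for t in threads:
    t.start()
for t in threads:
    t.join()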
With asyncio programming, there is no such parallelism. All the functions attached to your event loop run within a single thread. However, when a function is waiting for something, it can let the Python interpreter run other functions and resume once it has everything it needs to continue its execution.
Asyncio basics
Let's start with a first example.
import asyncio

async def my_function(delay):
    print(f'Start {delay}')
    await asyncio.sleep(delay)
    print(f'Stop {delay}')

asyncio.ensure_future(my_function(3))
print('Scheduled 3')
asyncio.ensure_future(my_function(2))
print('Scheduled 2')
asyncio.ensure_future(my_function(1))
print('Scheduled 1')

loop = asyncio.get_event_loop()
loop.run_forever()
Let's take a look at my_function. You probably noticed the async keyword before the def. This tells you that the function will be executed asynchronously; such a function is called a coroutine. Once the coroutine is executed, it runs as a Task. The Task will run to completion unless it reaches an await call. In this example, the Task suspends its execution when it reaches the line await asyncio.sleep(delay). The asyncio.sleep coroutine is the asynchronous equivalent of the time.sleep function. After the delay, the event loop automatically resumes the execution of the Task.
The await keyword is used when executing another coroutine from a coroutine.
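For example, a coroutine can await another coroutine directly. A minimal sketch (fetch_value and main are illustrative names; run_until_complete is covered further below):

import asyncio

async def fetch_value():
    # A coroutine that simulates fetching a result
    await asyncio.sleep(1)
    return 42

async def main():
    # await suspends main() until fetch_value() returns
    value = await fetch_value()
    print(value)

asyncio.get_event_loop().run_until_complete(main())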
Later in the example, you will find the asyncio.ensure_future() function. This function schedules the execution of the coroutine and returns a Task. At this point, the Tasks are not executed yet.
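You can verify this yourself: in this small sketch, the Task object exists as soon as asyncio.ensure_future() returns, but done() stays False until the loop runs it.

import asyncio

async def my_function(delay):
    await asyncio.sleep(delay)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(my_function(1))
print(task.done())  # False: the Task is scheduled but has not run yet
loop.run_until_complete(task)  # run_until_complete is covered below
print(task.done())  # True: the loop has executed the Task to completion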
The asyncio.get_event_loop() function returns the loop object. The loop.run_forever() function starts the loop and runs forever... To exit this example, use CTRL+C. The output of this example should be:
Scheduled 3
Scheduled 2
Scheduled 1
Start 3
Start 2
Start 1
Stop 1
Stop 2
Stop 3
Stopping run_forever()
If you are using loop.run_forever(), you probably want to stop the loop at some point. Here is an example that stops the loop based on the Unix signals received (SIGINT is the signal received upon CTRL+C).
import asyncio
import signal

async def my_function(delay):
    print(f'Start {delay}')
    await asyncio.sleep(delay)
    print(f'Stop {delay}')

def stop_loop(loop):
    print('Stopping the loop')
    loop.stop()

asyncio.ensure_future(my_function(1))
print('Scheduled 1')

loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, stop_loop, loop)
loop.run_forever()
print('Exiting')
Wait for the execution of a coroutine
The loop.run_until_complete() function starts the loop and runs it until the coroutine has returned. After the execution, the loop is stopped (but not closed), and the pending Tasks will resume their execution during the second loop.run_until_complete().
import asyncio
import time

start_time = time.time()

def print_ts(txt):
    print(f'{time.time() - start_time:.2f} sec : {txt}')

async def my_function(delay):
    print_ts(f'Start {delay}')
    await asyncio.sleep(delay)
    print_ts(f'Stop {delay}')

loop = asyncio.get_event_loop()
asyncio.ensure_future(my_function(1))   # Will stop at 1 sec
asyncio.ensure_future(my_function(3))   # Will stop at 3 sec
asyncio.ensure_future(my_function(15))  # Will stop at 15 sec
asyncio.ensure_future(my_function(20))  # Will stop at 20 sec
loop.run_until_complete(my_function(2)) # Will stop at 2 sec
print_ts(f'Is loop running? {loop.is_running()}')
print_ts('Blocking sleep for 10 seconds - Should not use with asyncio.')
time.sleep(10)
loop.run_until_complete(my_function(4)) # Will stop at 16 sec (2 + 10 + 4)
print_ts('Exiting. 20 never finished.')
The output should be:
0.00 sec : Start 1
0.00 sec : Start 3
0.00 sec : Start 15
0.00 sec : Start 20
0.00 sec : Start 2
1.01 sec : Stop 1
2.01 sec : Stop 2
2.01 sec : Is loop running? False
2.01 sec : Blocking sleep for 10 seconds - Should not use with asyncio.
12.01 sec : Start 4
12.01 sec : Stop 3
15.01 sec : Stop 15
16.01 sec : Stop 4
16.01 sec : Exiting. 20 never finished.
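As the output shows, the blocking time.sleep(10) freezes the whole event loop: no Task makes any progress between 2.01 and 12.01 seconds. If you really have to call blocking code, one option is loop.run_in_executor(), which runs the function in a thread pool and returns an awaitable. A minimal sketch (blocking_work is an illustrative name):

import asyncio
import time

def blocking_work():
    # A regular blocking function, executed in a worker thread
    time.sleep(2)
    return 'done'

async def main(loop):
    # None selects the default ThreadPoolExecutor
    result = await loop.run_in_executor(None, blocking_work)
    print(result)

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))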
Closing the loop
In the previous example, the coroutine my_function(20) never finishes its execution. If we add loop.close() at the end of the previous example to explicitly close the loop, we should see the following output at the end:
Task was destroyed but it is pending!
task: <Task pending coro=<my_function() done, defined at example-3.py:11> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1068ac4c8>()]>>
This is normal since the Task is not done yet. The following example shows how to exit an asyncio loop gracefully.
import asyncio
from concurrent.futures import CancelledError

async def my_function(delay):
    print(f'Start {delay}')
    try:
        await asyncio.sleep(delay)
    except CancelledError:
        print(f'Cancelled {delay}')
        return
    print(f'Stop {delay}')

loop = asyncio.get_event_loop()
asyncio.ensure_future(my_function(10))
loop.run_until_complete(my_function(1))
for task in asyncio.Task.all_tasks():
    print(f'Cancelling {task}')
    task.cancel()
    loop.run_until_complete(task)
loop.close()
There are two reasons why the command loop.run_until_complete(task) is executed after the task.cancel():
- If we don't continue the loop execution, the cancellation will never be delivered to the Task and we will still get the same warning.
- By making sure we execute the Task, we have the opportunity to catch the CancelledError exception and gracefully clean up the task (a reusable version of this pattern is sketched below).
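This cancel-then-run pattern can be wrapped into a small helper that cancels every pending Task before closing the loop. A sketch based on the example above (shutdown is an illustrative name):

import asyncio

def shutdown(loop):
    # Cancel every Task the loop still knows about, then run each one
    # so it can process its cancellation before the loop is closed
    for task in asyncio.Task.all_tasks():
        if not task.done():
            task.cancel()
            try:
                loop.run_until_complete(task)
            except asyncio.CancelledError:
                # Raised here when the Task did not catch the cancellation itself
                pass
    loop.close()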
Wait for the execution of multiple tasks and catch exceptions
If you want to wait for the execution of multiple tasks, you can use the asyncio.wait coroutine. The return_when argument must be FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED. More details are in the documentation.
import asyncio
from concurrent.futures import CancelledError

async def my_function(delay):
    print(f'Start {delay}')
    try:
        await asyncio.sleep(delay)
    except CancelledError:
        print(f'Cancelled {delay}')
        return
    print(f'Stop {delay}')

loop = asyncio.get_event_loop()
task_a = asyncio.ensure_future(my_function(1))
task_b = asyncio.ensure_future(my_function(2))
task_c = asyncio.ensure_future(my_function('a'))
done, pending = loop.run_until_complete(asyncio.wait(
    [task_a, task_b, task_c],
    return_when=asyncio.ALL_COMPLETED,
))
for task in done:
    if task.exception():  # exception() must be called, not just referenced
        try:
            loop.run_until_complete(task)
        except Exception as e:
            print(f'Exception caught: {e}')
for task in pending:
    task.cancel()
    loop.run_until_complete(task)
loop.close()
To catch a Task's exception, we need to finish its execution. The output will be:
Start 1
Start 2
Start a
Stop 1
Stop 2
Exception caught: unsupported operand type(s) for +: 'float' and 'str'
Conclusion
Asynchronous programming with asyncio is not trivial, but once you understand how it works, it is a fantastic tool when you have multiple concurrent functions that are mostly I/O-bound.
It allows you to share the same memory between all the functions, with less need for queues or locks.
I hope this post is useful. If anything is not clear enough or you find some mistakes, please leave a comment below.
Comments
This was quite helpful. Thank you for taking the time to write this.
I get glimpses of the overall logic, but I still don’t see how to apply this to my problem.
I have a WebSocket that’s receiving a lot of data, and I need to process it and store it in real time. I am noticing that the connection often dies, and I suspect it is because the data processing takes too long and the socket is blocked from receiving data.
Roughly I do this.
It seems I need to turn the process() function into a Task, or somehow decouple its execution from the socket receiving. What would you suggest I try? Maybe process() should rather be a coroutine.
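One way to decouple the two is an asyncio.Queue: the reader only receives and enqueues messages, and a separate worker Task consumes them at its own pace. A minimal sketch with placeholder coroutines (fake_recv and slow_process stand in for the real socket receive and processing code):

import asyncio

async def fake_recv():
    # Placeholder for the real socket receive coroutine
    await asyncio.sleep(0.5)
    return 'message'

async def slow_process(message):
    # Placeholder for the real processing/storage work
    await asyncio.sleep(2)
    print(f'Processed {message}')

async def reader(queue):
    # Only receive and enqueue, so the socket is never blocked by processing
    while True:
        message = await fake_recv()
        await queue.put(message)

async def worker(queue):
    while True:
        message = await queue.get()
        await slow_process(message)

loop = asyncio.get_event_loop()
queue = asyncio.Queue()
asyncio.ensure_future(reader(queue))
asyncio.ensure_future(worker(queue))
loop.run_forever()  # CTRL+C to stop, as in the examples above

If the processing itself is blocking (CPU-bound) rather than a coroutine, it can be pushed to a thread pool with loop.run_in_executor(), as shown earlier.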
Great tutorial, mate! The only thing I am concerned about is: I have 5 functions which need to run independently in an asynchronous manner, and all of them return some output, so I am wondering how to get each of their independent outputs?
You need to call result() on the done tasks. Here is an example. Details on Futures: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future
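A sketch of what such an example can look like, reusing the asyncio.wait pattern from this post (five coroutines, each returning its own value):

import asyncio

async def my_function(delay):
    await asyncio.sleep(delay)
    return delay * 10  # Each coroutine returns its own output

loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(my_function(d)) for d in range(1, 6)]
done, pending = loop.run_until_complete(asyncio.wait(tasks))
for task in done:
    # result() returns the value the coroutine returned
    # (or re-raises its exception if it failed);
    # done is a set, so the order of results is not guaranteed
    print(task.result())
loop.close()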