
Python Asynchronous Programming with asyncio library

Published Dec 12, 2017. Last updated Dec 30, 2018.

I would not consider myself an experienced programmer, but I can code scripts or small programs, mostly for proofs of concept. I started about 20 years ago with Perl scripting, then C, PHP, Bash, C++, Java, Lua and finally Python. All that time, asynchronous programming was synonymous with threads and forked processes. Sharing memory was done through queues and locks.

I started a new project last September where I wanted to build a Websocket Server in Python for real-time data reporting. I decided to use the Python WebSocket Library, which is built on the asyncio library introduced in Python 3.4.

I had heard very good things about the asyncio library and event-loop programming in general, but this project was going to be my first experience with it. Now that my Websocket Server project is going well, I decided to write down a few things I have learned working with asyncio. The library was introduced in Python 3.4; the async/await keywords were added in Python 3.5, and the f-strings used in the examples require Python 3.6. This post only uses Python 3.6 syntax, and all the code has been tested with Python 3.6.3.

Asynchronous versus Parallel Programming

A lot of programming languages allow you to use threads and forked processes to execute code in parallel. The threaded functions can run simultaneously on different CPU cores, which can speed up your application's processing. Also, some threads can run while other threads are waiting for I/O such as network connections.

With asyncio programming, there is no such parallelism. All the functions attached to your event loop run within a single thread. However, when a function is waiting for something else, it can let the Python interpreter run other functions, and it resumes once it has everything it needs to continue its execution.
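To make the difference concrete, here is a minimal sketch (not part of my project). The worker and main names are made up for the illustration, and I create the loop with asyncio.new_event_loop() so the snippet does not depend on an implicit current loop. It runs three coroutines concurrently and records which thread each one runs on.

```python
import asyncio
import threading
import time


async def worker(name, delay, thread_ids):
    # Record which OS thread this coroutine runs on.
    thread_ids.add(threading.get_ident())
    # While this coroutine waits, the event loop is free to run the others.
    await asyncio.sleep(delay)
    return name


async def main():
    thread_ids = set()
    start = time.monotonic()
    # Run the three coroutines concurrently and wait for all of them.
    results = await asyncio.gather(
        worker('a', 0.3, thread_ids),
        worker('b', 0.2, thread_ids),
        worker('c', 0.1, thread_ids),
    )
    elapsed = time.monotonic() - start
    return results, thread_ids, elapsed


loop = asyncio.new_event_loop()
try:
    results, thread_ids, elapsed = loop.run_until_complete(main())
finally:
    loop.close()

print(results)           # gather() keeps the order of its arguments
print(len(thread_ids))   # 1: every coroutine ran on the same thread
print(f'{elapsed:.2f}')  # close to the longest delay, not the sum of all three
```

The total time is close to the longest delay because the waits overlap, even though only one thread is involved.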

Asyncio basics

Let's start with a first example.

import asyncio

async def my_function(delay):
    print(f'Start {delay}')
    await asyncio.sleep(delay)
    print(f'Stop {delay}')

asyncio.ensure_future(my_function(3))
print('Scheduled 3')
asyncio.ensure_future(my_function(2))
print('Scheduled 2')
asyncio.ensure_future(my_function(1))
print('Scheduled 1')

loop = asyncio.get_event_loop()

loop.run_forever()

Let's take a look at my_function. You probably noticed the async keyword before the def. It makes my_function a coroutine function: calling it does not run the body, it returns a coroutine object that the event loop can run. Once the coroutine is scheduled, it is wrapped in a Task. The Task runs until it completes or reaches an await on something that is not ready yet. In this example, the Task suspends its execution at the line await asyncio.sleep(delay). The asyncio.sleep coroutine is the non-blocking equivalent of the time.sleep function: it pauses only the current Task and lets the loop run the others. After the delay, the event loop automatically resumes the execution of the Task.

The await keyword is used when executing another coroutine from a coroutine.
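As a small illustration of await, here is a sketch with two made-up coroutines, fetch_value and caller (neither exists in asyncio). It shows that calling a coroutine function only creates a coroutine object, and that await runs it and hands back its return value.

```python
import asyncio


async def fetch_value(delay):
    # Hypothetical helper: pretends to wait for I/O, then returns a value.
    await asyncio.sleep(delay)
    return 41


async def caller():
    # await runs the other coroutine to completion and returns its result.
    value = await fetch_value(0.1)
    return value + 1


coro = caller()
# Nothing has executed yet: we only hold a coroutine object.
is_coroutine = asyncio.iscoroutine(coro)

loop = asyncio.new_event_loop()
try:
    result = loop.run_until_complete(coro)
finally:
    loop.close()

print(is_coroutine, result)  # True 42
```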

Later in the example, you will find the asyncio.ensure_future() function. This function will schedule the execution of the coroutine and return a Task. At this point, the Tasks are not executed yet.
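The "scheduled but not executed yet" part can be checked with a short sketch. I use loop.create_task() on an explicitly created loop instead of asyncio.ensure_future(), only so the snippet does not rely on an implicit current event loop; the events list is an assumption added to observe what happened.

```python
import asyncio

events = []


async def my_function(delay):
    events.append(f'Start {delay}')
    await asyncio.sleep(delay)
    events.append(f'Stop {delay}')


loop = asyncio.new_event_loop()

# Scheduling returns a Task immediately, but the coroutine body has not run.
task = loop.create_task(my_function(0.1))
scheduled_only = (events == [] and not task.done())

# The Task only makes progress once the loop is running.
loop.run_until_complete(task)
loop.close()

print(scheduled_only)  # True
print(events)          # ['Start 0.1', 'Stop 0.1']
print(task.done())     # True
```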

The asyncio.get_event_loop() function will return the loop object. The loop.run_forever() function will start the loop and run forever... To exit this example, use CTRL+C. This example output should be:

Scheduled 3
Scheduled 2
Scheduled 1
Start 3
Start 2
Start 1
Stop 1
Stop 2
Stop 3

Stop the run_forever()

If you are using loop.run_forever(), you probably want to stop the loop at some time. Here is an example that is stopping the loop based on the Unix Signals received (SIGINT is the signal received upon CTRL+C).

import asyncio
import signal

async def my_function(delay):
    print(f'Start {delay}')
    await asyncio.sleep(delay)
    print(f'Stop {delay}')

def stop_loop(loop):
    print('Stopping the loop')
    loop.stop()

asyncio.ensure_future(my_function(1))
print('Scheduled 1')

loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, stop_loop, loop)
loop.run_forever()
print('Exiting')
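Note that loop.add_signal_handler() is only available on Unix. As a portable variation (a sketch, not what I use in my project), the loop can also be stopped from a callback scheduled with loop.call_later(). Here the 0.3 second timer simply stands in for the signal, and the log list is an assumption added to record the order of events.

```python
import asyncio

log = []


async def my_function(delay):
    log.append(f'Start {delay}')
    await asyncio.sleep(delay)
    log.append(f'Stop {delay}')


def stop_loop(loop):
    log.append('Stopping the loop')
    loop.stop()


loop = asyncio.new_event_loop()
loop.create_task(my_function(0.1))

# Stand-in for the signal: ask the loop to call stop_loop() after 0.3 seconds.
loop.call_later(0.3, stop_loop, loop)

loop.run_forever()  # Returns once stop_loop() has called loop.stop().
loop.close()

print(log)  # ['Start 0.1', 'Stop 0.1', 'Stopping the loop']
```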

Wait for the execution of a coroutine

The loop.run_until_complete() function starts the loop and runs it until the given coroutine has finished. After that, the loop is stopped (but not closed); tasks that are still pending stay suspended and continue their execution at the second loop.run_until_complete().

import asyncio
import time

start_time = time.time()

def print_ts(txt):
    print(f'{time.time() - start_time:.2f} sec : {txt}')

async def my_function(delay):
    print_ts(f'Start {delay}')
    await asyncio.sleep(delay)
    print_ts(f'Stop {delay}')

loop = asyncio.get_event_loop()

asyncio.ensure_future(my_function(1))    # Will stop at 1 sec
asyncio.ensure_future(my_function(3))    # Will stop at 3 sec
asyncio.ensure_future(my_function(15))   # Will stop at 15 sec
asyncio.ensure_future(my_function(20))   # Will stop at 20 sec

loop.run_until_complete(my_function(2))  # Will stop at 2 sec
print_ts(f'Is loop running? {loop.is_running()}')
print_ts('Blocking sleep for 10 seconds - Should not use with asyncio.')

time.sleep(10)

loop.run_until_complete(my_function(4))  # Will stop at 16 sec (2 + 10 + 4)
print_ts('Exiting. 20 never finished.')

The output should be:

0.00 sec : Start 1
0.00 sec : Start 3
0.00 sec : Start 15
0.00 sec : Start 20
0.00 sec : Start 2
1.01 sec : Stop 1
2.01 sec : Stop 2
2.01 sec : Is loop running? False
2.01 sec : Blocking sleep for 10 seconds - Should not use with asyncio.
12.01 sec : Start 4
12.01 sec : Stop 3
15.01 sec : Stop 15
16.01 sec : Stop 4
16.01 sec : Exiting. 20 never finished.
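One more detail worth knowing: loop.run_until_complete() also returns the result of the coroutine, and the same loop can be reused for several calls until it is closed. A minimal sketch, where add_later is a made-up coroutine:

```python
import asyncio


async def add_later(a, b, delay):
    # Hypothetical coroutine: waits, then returns a value.
    await asyncio.sleep(delay)
    return a + b


loop = asyncio.new_event_loop()

first = loop.run_until_complete(add_later(1, 2, 0.1))
# Between two calls the loop is stopped but still usable.
running_between_calls = loop.is_running()
closed_between_calls = loop.is_closed()

second = loop.run_until_complete(add_later(first, 10, 0.1))

loop.close()
closed_after = loop.is_closed()

print(first, second)                                # 3 13
print(running_between_calls, closed_between_calls)  # False False
print(closed_after)                                 # True
```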

Closing the loop

In the previous example, the coroutine my_function(20) never finishes its execution. If we add loop.close() at the end of the previous example to explicitly close the loop, we should have the following output at the end:

Task was destroyed but it is pending!
task: <Task pending coro=<my_function() done, defined at example-3.py:11> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1068ac4c8>()]>>

This is normal since the Task is not done yet. The following example describes how to gracefully exit an asyncio loop.

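import asyncio

async def my_function(delay):
    print(f'Start {delay}')
    try:
        await asyncio.sleep(delay)
    except asyncio.CancelledError:
        # asyncio.CancelledError also works on Python 3.8+, where it is no
        # longer the same class as concurrent.futures.CancelledError.
        print(f'Cancelled {delay}')
        return
    print(f'Stop {delay}')

loop = asyncio.get_event_loop()

asyncio.ensure_future(my_function(10))

loop.run_until_complete(my_function(1))

# Python 3.6 API; on Python 3.7+ use asyncio.all_tasks(loop) instead
# (asyncio.Task.all_tasks() was removed in Python 3.9).
for task in asyncio.Task.all_tasks():
    print(f'Cancelling {task}')
    task.cancel()
    # Run the loop so the task receives the cancellation and can clean up.
    loop.run_until_complete(task)

loop.close()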

There are two reasons why the command loop.run_until_complete(task) is executed after the task.cancel():

  • If we don't continue the loop execution, the cancel will never be executed on the Task and we will still have the same warning.
  • By making sure we execute the Task, we have the opportunity to catch the CancelledError exception and gracefully clean up the task.
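Here is a variation on the same idea, as a sketch rather than the way my project does it: keep references to the tasks you create, cancel them all, then run the loop once with asyncio.gather(..., return_exceptions=True) so every task gets a chance to handle the cancellation. Unlike the example above, this version re-raises CancelledError after cleaning up, so the tasks end up marked as cancelled. The log list and the long_tasks name are assumptions added for the illustration.

```python
import asyncio

log = []


async def my_function(delay):
    log.append(f'Start {delay}')
    try:
        await asyncio.sleep(delay)
    except asyncio.CancelledError:
        log.append(f'Cancelled {delay}')  # Clean-up would go here.
        raise  # Re-raise so the Task is reported as cancelled.
    log.append(f'Stop {delay}')


loop = asyncio.new_event_loop()
long_tasks = [loop.create_task(my_function(d)) for d in (10, 20)]

loop.run_until_complete(my_function(0.1))

for task in long_tasks:
    task.cancel()

# Run the loop until every cancelled task has processed its cancellation.
# return_exceptions=True collects the CancelledError objects instead of raising.
results = loop.run_until_complete(
    asyncio.gather(*long_tasks, return_exceptions=True)
)
loop.close()

print(log)
print([task.cancelled() for task in long_tasks])  # [True, True]
```

With this approach, loop.close() no longer reports "Task was destroyed but it is pending!" for these tasks, since none of them is pending anymore.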

Wait for the execution of multiple tasks and catch exceptions

If you want to wait for the execution of multiple tasks, you can use the asyncio.wait coroutine. The return_when argument must be FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED. More details in the documentation.

import asyncio

async def my_function(delay):
    print(f'Start {delay}')
    try:
        await asyncio.sleep(delay)
    except asyncio.CancelledError:
        print(f'Cancelled {delay}')
        return
    print(f'Stop {delay}')

loop = asyncio.get_event_loop()

task_a = asyncio.ensure_future(my_function(1))
task_b = asyncio.ensure_future(my_function(2))
task_c = asyncio.ensure_future(my_function('a'))

done, pending = loop.run_until_complete(asyncio.wait(
    [task_a, task_b, task_c],
    return_when=asyncio.ALL_COMPLETED,
))

for task in done:
    # exception() is a method: it returns None when the task finished normally.
    if task.exception() is not None:
        try:
            loop.run_until_complete(task)
        except Exception as e:
            print(f'Exception caught: {e}')

for task in pending:
    task.cancel()
    loop.run_until_complete(task)

loop.close()

To catch a Task's exception, we need to retrieve its result. Here this is done by passing the finished task to loop.run_until_complete(), which re-raises the exception. The output will be:

Start 1
Start 2
Start a
Stop 1
Stop 2
Exception caught: unsupported operand type(s) for +: 'float' and 'str'
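The example above uses ALL_COMPLETED, so the pending set is always empty. For comparison, here is a sketch with return_when=asyncio.FIRST_COMPLETED, where asyncio.wait returns as soon as one task is done and the slower one is still pending. The fast and slow names are made up, and the loop is created explicitly so the snippet does not depend on an implicit current loop.

```python
import asyncio


async def my_function(delay):
    await asyncio.sleep(delay)
    return delay


loop = asyncio.new_event_loop()
fast = loop.create_task(my_function(0.1))
slow = loop.create_task(my_function(5))

done, pending = loop.run_until_complete(asyncio.wait(
    [fast, slow],
    return_when=asyncio.FIRST_COMPLETED,
))

first_results = [task.result() for task in done]

# The slow task is still pending: cancel it and let the loop deliver the
# cancellation before closing.
for task in pending:
    task.cancel()
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
loop.close()

print(first_results)     # [0.1]
print(slow.cancelled())  # True
```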

Conclusion

Asynchronous programming with asyncio is not trivial, but once you understand how it works, it's a fantastic tool when you have multiple concurrent functions that are mostly I/O-bound.
It allows you to share the same memory between all the functions with less need for queues or locks.
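To illustrate the point about shared memory, here is a minimal sketch with a made-up counter dictionary and increment coroutine. Five coroutines update the same dictionary without any lock. This is safe here only because there is no await between reading and writing the value, so the event loop cannot switch to another coroutine in the middle of the update.

```python
import asyncio

counter = {'value': 0}


async def increment(times):
    for _ in range(times):
        # No await between the read and the write, so this update cannot be
        # interleaved with another coroutine.
        counter['value'] += 1
        # Explicit switch point: other coroutines get to run here.
        await asyncio.sleep(0)


async def main():
    await asyncio.gather(*(increment(1000) for _ in range(5)))


loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

print(counter['value'])  # 5000
```

If an update has to await something between the read and the write, the guarantee is gone and something like asyncio.Lock is needed again.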

I hope this post is useful. If anything is not clear enough or you find some mistakes, please leave a comment below.

Discover and read more posts from Jean-Francois Levesque
Comments: 4 replies
Auguste Moire
6 years ago

This was quite helpful. Thank you for taking the time to write this.
I get glimpses of the overall logic, but I still don’t see how to apply this to my problem.

I have a WebSocket that’s receiving a lot of data, and I need to process it and store it in real time. I am noticing that the connection often dies, and I suspect it is because the data processing takes too long and the socket is blocked from receiving data.

Roughly I do this.

while True:
    message = await websocket.recv()
    process(message)

It seems I need to turn the process() function into a Task. Or somehow decouple its execution from the socket receiving. What would you suggest I try?

Auguste Moire
6 years ago

Maybe process() should be a co-routine rather.

The Gr8 Adakron
7 years ago

Great tut! mate. The only thing I am concerned about is, I have 5 functions which needs to be run independently in an asynchronous manner. And all of them return certain output, So I am wondering how to get the each of their independent output?

Jean-Francois Levesque
7 years ago

You need to call result() on the done tasks. Here is an example:

import asyncio

async def my_function(p1):
    await asyncio.sleep(p1)
    return p1

loop = asyncio.get_event_loop()

task_a = asyncio.ensure_future(my_function(1))
task_b = asyncio.ensure_future(my_function(2))
task_c = asyncio.ensure_future(my_function(3))

done, pending = loop.run_until_complete(asyncio.wait(
    [task_a, task_b, task_c],
    return_when=asyncio.ALL_COMPLETED,
))

for task in done:
    print(task.result())

Details on Futures: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future
