Testing Celery Tasks
Recently I came across the problem of writing unit tests for Celery tasks. Writing them can be painful because the tasks run asynchronously and can take a long time to finish. In this tutorial I will explain how to write unit tests for Celery tasks and cover a few different cases. We will use Python’s built-in unittest library.
Prerequisites
- You should know the basics of Celery to follow the tutorial.
- Knowing the basics of writing tests helps, but it is not mandatory for this tutorial.
Environment Setup
- This tutorial is written for Python 2.x and should also work on Python 3.x.
- You must have a working installation of Celery, RabbitMQ and Redis (Redis because we want to store the results).
That’s all, so let’s dive into the tutorial.
We will create a very simple Celery task that adds two numbers.
Open your favourite text editor, paste the following code and save it as tasks.py
from celery import Celery
from time import sleep

BROKER_URL = 'amqp://guest@localhost:5672//'
REDIS = 'redis://localhost/0'

app = Celery('test', broker=BROKER_URL, backend=REDIS)


@app.task(name="add")
def add(x, y):
    return x + y
Now let’s write some tests for this task.
We have to test whether our task returns the correct value for two given numbers. So we will write one test case class that holds all the tests for this task.
Now create a file named tests.py in the same directory as tasks.py and paste the following code.
from tasks import add, app
import unittest


class TestAddTask(unittest.TestCase):
    def setUp(self):
        self.task = add.apply_async(args=[3, 5])
        self.results = self.task.get()
Let’s see what we did up there. We inherited from the unittest.TestCase class and defined a setUp method. This method is used to initialize the tests and set the variables they need. Here we called our add task with the arguments 3 and 5, and then called the get method on it to fetch the result. The get call waits until the task finishes. Once the task has finished we have the result and test initialization is complete. Now let’s test the output of this task.
Let’s check our task’s state, that is, whether it ended in SUCCESS or FAILURE.
In the TestAddTask class above, we now define a method to check the state of our task:
    def test_task_state(self):
        self.assertEqual(self.task.state, 'SUCCESS')
In the same TestAddTask class, we now define a method to test our output:
    def test_addition(self):
        self.assertEqual(self.results, 8)
Complete test case class
from tasks import add, app
import unittest


class TestAddTask(unittest.TestCase):
    def setUp(self):
        self.task = add.apply_async(args=[3, 5])
        self.results = self.task.get()

    def test_task_state(self):
        self.assertEqual(self.task.state, 'SUCCESS')

    def test_addition(self):
        self.assertEqual(self.results, 8)
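One detail of the class above is worth keeping in mind: unittest calls setUp before every test method, so with two tests the add task is submitted twice. The sketch below illustrates this using only the standard library. The names fake_add, calls and run_demo are hypothetical stand-ins introduced for illustration; fake_add runs locally and is not a Celery task.

```python
import unittest

# Counts how many times setUp has run (illustrative only).
calls = {"setUp": 0}


def fake_add(x, y):
    """Hypothetical stand-in for add.apply_async(...).get(); runs locally."""
    return x + y


class TestSetUpRunsPerTest(unittest.TestCase):
    def setUp(self):
        # unittest invokes this before *each* test method below.
        calls["setUp"] += 1
        self.results = fake_add(3, 5)

    def test_first(self):
        self.assertEqual(self.results, 8)

    def test_second(self):
        self.assertEqual(self.results, 8)


def run_demo():
    """Run the two tests and report (all_passed, number_of_setUp_calls)."""
    calls["setUp"] = 0
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestSetUpRunsPerTest)
    result = unittest.TextTestRunner(verbosity=0).run(suite)
    return result.wasSuccessful(), calls["setUp"]


if __name__ == "__main__":
    print(run_demo())
```

If a task is slow, submitting it once in the setUpClass class method and sharing the result between tests is one possible alternative, at the cost of the tests no longer being fully independent.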
Now let’s run the tests.
Start the Celery worker: inside the directory where tasks.py is, run the following command.
celery -A tasks worker --loglevel=INFO
Run the tests: inside the directory where tests.py is, run the following command.
python -m unittest discover
You should see output something like this:
➜ python -m unittest discover
..
----------------------------------------------------------------------
Ran 2 tests in 5.353s
OK
Voila, we successfully ran the tests on Celery tasks.
What if my tasks are on a remote machine and I can’t import them?
Don’t worry, just replace the apply_async call with the following statement, which sends the task by its registered name:
self.task = app.send_task('add', args=[3, 5])
Now let’s take an example where the task does not return any results
Let’s write a task which downloads an image and writes it to a file. Update tasks.py with the following task
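import requests


# Note: no ignore_result option is set here. The test for this task calls
# get() and checks the task state, and both need the state to be stored in
# the result backend.
@app.task(name='download-image')
def download_image(url):
    r = requests.get(url)
    with open('image.jpg', 'wb') as f:
        f.write(r.content)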
Let’s see how we should test this task. Add the following code to tests.py
import os


class TestDownloadTask(unittest.TestCase):
    def setUp(self):
        self.task = app.send_task(
            'download-image',
            args=['https://www.math.ust.hk/~masyleung/Teaching/CAS/MATLAB/image/images/cameraman.jpg'])
        self.results = self.task.get()

    def test_task_state(self):
        self.assertEqual(self.task.state, 'SUCCESS')

    def test_download(self):
        # The task returns nothing, so we check its side effect instead.
        self.assertTrue(os.path.exists('image.jpg'))
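Here we use the os library to check whether our task has written the image successfully. Note that image.jpg is written relative to the worker’s working directory, so this check only passes when the worker and the tests are run from the same directory.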
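The same idea, testing a task through its side effect on the file system, can be sketched without a worker or network access. The example below is a minimal illustration under stated assumptions: save_bytes and run_file_demo are hypothetical helpers that are not part of the tutorial’s tasks.py, and save_bytes only mimics the file-writing half of download_image. It also writes into a temporary directory that is removed after each test, so a leftover image.jpg from an earlier run cannot make the check pass by accident.

```python
import os
import shutil
import tempfile
import unittest


def save_bytes(content, path):
    """Hypothetical stand-in for the file-writing part of download_image."""
    with open(path, "wb") as f:
        f.write(content)


class TestFileSideEffect(unittest.TestCase):
    def setUp(self):
        # Fresh directory per test; removed again even if the test fails.
        self.tmpdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmpdir)
        self.path = os.path.join(self.tmpdir, "image.jpg")
        save_bytes(b"placeholder bytes, not a real image", self.path)

    def test_file_exists(self):
        self.assertTrue(os.path.exists(self.path))

    def test_file_not_empty(self):
        self.assertGreater(os.path.getsize(self.path), 0)


def run_file_demo():
    """Run the tests above and return True if all of them passed."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestFileSideEffect)
    result = unittest.TextTestRunner(verbosity=0).run(suite)
    return result.wasSuccessful()


if __name__ == "__main__":
    print(run_file_demo())
```

Applying this to the real task would mean giving download_image a way to receive the output path, which is a design change the tutorial’s version does not make.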
Check out more about testing Celery tasks in the official docs. Please comment if you find something wrong.