Friday, April 5, 2013

Using SQLAlchemy with Celery Tasks

If you are reading this, then you probably know what both SQLAlchemy and Celery are, but for the uninitiated, here is a (very) brief introduction. SQLAlchemy is a full-featured Python Object Relational Mapper (ORM) that lets one perform operations on a database using Python classes and methods instead of writing SQL. Celery is (in their own words) "an asynchronous task queue/job queue based on distributed message passing." If you haven't heard of, or haven't used, either of these, I highly suggest you give them a try.

Ok, and now on to the meat of this post. If you are using SQLAlchemy (or any other database connection method, for that matter) and want to execute Celery tasks that retrieve items from the database, you have to properly handle database sessions. Fortunately for us, the designers of both of these projects know what they are doing, and it isn't a complicated matter.

First, let's start with creating a simple task without any database requirements (from the Celery tutorial):
# file: proj/tasks.py
from __future__ import absolute_import

from proj.celery import celery

@celery.task
def add(x, y):
    return x + y
The one thing that is missing from this is setting up the celery object so that it is connected to the broker and backend. Many of the tutorials that you find on the web place the configuration of the celery object (and the database object) in the same file as the tasks being created. While that works fine for examples, it doesn't portray how it should be done in "real life." I find the best way to set up these resources is to create a separate module for each resource that you are going to use. As such, we would have one module for setting up the celery object and another for setting up the database connections. (The Celery tutorial has exactly such an example here.) Why separate modules instead of just one big resources.py module? Well, if we put everything in one file, then all of those resources are set up even when we may only require one of them. For example, if we write a script that only ever needs to connect to the database and never runs any Celery tasks, why should that script create a celery object? So, on that note, let's create another file that sets up the celery object:
# file: proj/celery.py
from __future__ import absolute_import

from celery import Celery

celery = Celery(
    'tasks',
    broker='amqp://',
    backend='amqp',
    include=['proj.tasks'],  # modules to import when the worker starts
)

if __name__ == '__main__':
    celery.start()
As you can see from that configuration, we tell Celery that all of our tasks live in a module called proj.tasks. Now that we have Celery set up, let's create a very similar module for our SQLAlchemy (or any other database) connections.
# file: proj/db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

# pool_recycle avoids stale connections; pool_size caps concurrent connections
engine = create_engine(
    'sqlite:///:memory:', convert_unicode=True,
    pool_recycle=3600, pool_size=10)
# scoped_session gives each thread its own session registry
db_session = scoped_session(sessionmaker(
    autocommit=False, autoflush=False, bind=engine))
Great, now we can connect to our database using SQLAlchemy, but how does this all work with Celery tasks? For the sake of the example, let's assume that we have some User model that contains some user information.
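Such a model might look something like this (the proj/models.py location and the columns here are just assumptions for the sake of the example):
# file: proj/models.py
# A minimal, hypothetical User model for the examples that follow.
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
With that in place, let's update our tasks module with a new task that gets something from the database.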
# file: proj/tasks.py
from __future__ import absolute_import

from proj.celery import celery
from proj.db import db_session
from proj.models import User  # or wherever your User model lives

@celery.task
def get_from_db(user_id):
    user = db_session.query(User).filter(User.id == user_id).one()
    # do something with the user
But wait! If we do this, what happens to the database connection? Don't we have to close it? Yes, we do! Since we are using a scoped_session, we'll want to make sure that we release our connection from the current thread and return it to the connection pool managed by SQLAlchemy. We could just place a db_session.remove() at the end of our task, but that seems a bit fragile. Fortunately for us, there is a way to subclass the default Celery Task object and make sure that all connections are returned to the pool auto-magically. Let's update our tasks module again.
# file: proj/tasks.py
from __future__ import absolute_import

from proj.celery import celery
from proj.db import db_session
from proj.models import User  # or wherever your User model lives

class SqlAlchemyTask(celery.Task):
    """An abstract Celery Task that ensures that the connection to the
    database is closed on task completion."""
    abstract = True

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        db_session.remove()


@celery.task(base=SqlAlchemyTask)
def get_from_db(user_id):
    user = db_session.query(User).filter(User.id == user_id).one()
    # do something with the user
As you can see, we created a SqlAlchemyTask that implements the after_return handler, which does the job of removing the session. The only change we had to make to our task was to set the appropriate base Task type. Neat, huh? By the way, you can check out the other Task handlers here.
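To see it all in action, here is a quick sketch of how you might queue up the task from your application code (the user id here is just a placeholder):
# somewhere in your application code
from proj.tasks import get_from_db

# queues the task for a worker to execute; after_return will
# remove the scoped session once the task completes
result = get_from_db.delay(42)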

UPDATE: There was a question in a comment about dealing with data not yet being in the database when the Celery task runs. There are two main ways you can handle this: delay the task or retry the task (or both).

If you want to delay the task, you simply need to call your task using the apply_async() method with the countdown parameter set, instead of the simpler delay() method. However, this is not ideal, since you will never know with 100% certainty how long to wait, and waiting, by definition, makes things run slower (from the user's point of view).
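For example (the 30-second countdown here is an arbitrary guess, not a recommendation):
# run the task as soon as a worker is available
get_from_db.delay(user_id)

# same call, but tell the worker to wait at least 30 seconds first
get_from_db.apply_async(args=(user_id,), countdown=30)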

The better way to deal with this is to retry the task on failure. This way, if all goes well, it runs the first time; if not, you fall back to trying again. Fortunately, this is easy to accomplish: update the task decorator slightly and make the task retry on error.
from sqlalchemy.orm.exc import NoResultFound

@celery.task(base=SqlAlchemyTask, max_retries=10, default_retry_delay=60)
def get_from_db(user_id):
    try:
        user = db_session.query(User).filter(User.id == user_id).one()
    except NoResultFound as exc:
        # the user isn't there yet; try again later
        raise get_from_db.retry(exc=exc)
    # do something with the user
In the example above, the task will retry up to 10 times, waiting 60 seconds between retries. For this scenario that is most certainly overkill (I sure hope it doesn't take your main app 600 seconds to create a user object!), but in certain situations it might be appropriate (e.g., waiting for some analytics computation to complete). That's it. Now you have a Celery task that gets data from a database and releases the connection after completion. Happy coding!