Friday, April 5, 2013

Using SQLAlchemy with Celery Tasks

If you are reading this, then you probably know what both SQLAlchemy and Celery are, but for the uninitiated, here is a (very) brief introduction. SQLAlchemy is a full-featured Python Object Relational Mapper (ORM) that lets one perform operations on a database using Python classes and methods instead of writing SQL. Celery is (in their own words) "an asynchronous task queue/job queue based on distributed message passing." If you haven't heard of, or haven't used, either of these, I highly suggest you give them a try.

Ok, and now on to the meat of this post. If you are using SQLAlchemy (or any other database connection method, for that matter) and want to execute Celery tasks that retrieve items from the database, you have to deal with properly handling database sessions. Fortunately for us, the designers of both of these projects know what they are doing and it isn't a complicated matter.

First, let's start with creating a simple task without any database requirements (from the Celery tutorial):
# file: proj/tasks.py
from __future__ import absolute_import
from proj.celery import celery

@celery.task
def add(x, y):
    return x + y
The one thing that is missing from this is setting up Celery so that it is connected to the broker and backend. Many of the tutorials that you find on the web show placing the configuration of the celery object (and the database object) in the same file as the tasks being created. While that works fine for examples, it doesn't portray how it should be done in "real life." I find the best way to set up these resources is to create a separate module for each resource that you are going to use. As such, we would have one module for setting up the celery object and another for setting up the database connections. (The Celery tutorial has exactly such an example here.) Why separate modules instead of just one big resources.py module? Well, if we put everything in one file, then all of those resources are set up even when we may only require one of them. For example, let's say we write a script that only ever needs to connect to the database and never calls any celery tasks; why should that script create a celery object? So, on that note, let's create another file that sets up the celery object:
# file: proj/celery.py
from __future__ import absolute_import
from celery import Celery

celery = Celery(
    'tasks',
    broker='amqp://',
    backend='amqp',
    include=['proj.tasks']  # modules to import when the worker starts
)

if __name__ == '__main__':
    celery.start()
As you can see from that configuration, we have to tell Celery that all of our tasks live in the proj.tasks module. Now that we have Celery set up, let's create a very similar module for our SQLAlchemy (or any other database) connections.
# file: proj/db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

engine = create_engine(
    'sqlite:///:memory:', convert_unicode=True,
    pool_recycle=3600, pool_size=10)
# scoped_session gives each thread its own session, backed by the
# engine's connection pool
db_session = scoped_session(sessionmaker(
    autocommit=False, autoflush=False, bind=engine))
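At this point, the project layout looks roughly like the following (the __init__.py is my assumption here, so that proj is importable as a package):
proj/
    __init__.py   # assumed, so that proj can be imported as a package
    celery.py     # sets up the celery object
    db.py         # sets up the SQLAlchemy engine and scoped_session
    tasks.py      # the tasks themselves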
Great, now we can connect to our database using SQLAlchemy, but how does this all work with Celery tasks? Let's update our tasks module with a new task that gets something from the database. For the sake of the example, let's assume that we have some User model that contains some user information.
# file: proj/tasks.py
from __future__ import absolute_import

from proj.celery import celery
from proj.db import db_session

@celery.task
def get_from_db(user_id):
    user = db_session.query(User).filter(User.id == user_id).one()
    # do something with the user
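As an aside, the User model itself is assumed rather than shown in this post. A minimal sketch (the module name and columns here are hypothetical) might look like this:
# file: proj/models.py  (hypothetical; the post only assumes some User model exists)
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(255))
    email = Column(String(255))
With a module like that, the task above would import it with from proj.models import User.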
But wait! If we do this, what happens to the database connection? Don't we have to close it? Yes, we do! Since we are using a scoped_session, we'll want to make sure that we release the session used by the current thread so that its connection is returned to the pool managed by SQLAlchemy. We could just place a db_session.remove() at the end of every task, but that seems a bit fragile. Fortunately for us, there is a way for us to subclass the default Celery Task object and make sure that all sessions are released auto-magically. Let's update our tasks module again.
# file: proj/tasks.py
from __future__ import absolute_import

from proj.celery import celery
from proj.db import db_session

class SqlAlchemyTask(celery.Task):
    """An abstract Celery Task that ensures that the connection to the
    database is closed on task completion."""
    abstract = True

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        db_session.remove()


@celery.task(base=SqlAlchemyTask)
def get_from_db(user_id):
    user = db_session.query(User).filter(User.id == user_id).one()
    # do something with the user
As you can see, we created a SqlAlchemyTask that implements the after_return handler that does the job of removing the session. The only change we had to make to our task was to make sure that we set the appropriate base Task type. Neat, huh? By the way, you can check out the other Task handlers here.
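For example, if you also wanted to roll back any uncommitted changes when a task fails, you could add an on_failure handler next to after_return (this is just a sketch of the idea, not something the task above requires):
class SqlAlchemyTask(celery.Task):
    """An abstract Celery Task that cleans up its database session."""
    abstract = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # called only when the task raises: throw away any uncommitted changes
        db_session.rollback()

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # called after every task, success or failure: release the session
        db_session.remove()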

UPDATE: There was a question in a comment regarding dealing with data not being in the database when the celery task runs. There are two main ways you can handle this: delay the task or re-try the task (or both).

If you want to delay the task, you simply need to call your task using the apply_async() method with the countdown parameter set instead of the simpler delay() method. However, this is not ideal since you will never know with 100% certainty how long to wait, and since waiting, by definition, makes things run slower (from the user's point of view).
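For example, with the task from above (the 10-second countdown is an arbitrary number):
# run as soon as a worker picks it up
get_from_db.delay(user_id)

# run no sooner than 10 seconds from now
get_from_db.apply_async((user_id,), countdown=10)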

The better way to deal with this is to retry the task on failure. This way, if all goes well, it runs the first time, but if not, you fall back to trying again. Fortunately, this is easy to accomplish by updating the task decorator slightly and making the task retry on error.
from sqlalchemy.orm.exc import NoResultFound

@celery.task(base=SqlAlchemyTask, max_retries=10, default_retry_delay=60)
def get_from_db(user_id):
    try:
        user = db_session.query(User).filter(User.id == user_id).one()
    except NoResultFound as exc:
        raise get_from_db.retry(exc=exc)
    # do something with the user
In the example above, this task will retry up to 10 times, waiting 60 seconds between retries. For this scenario that is most certainly overkill (I sure hope it doesn't take your main app 600 seconds to create a user object!), but in certain situations it might be appropriate (e.g. waiting for some analytics computation to complete). That's it. Now you have a Celery task that gets data from a database and releases its session after completion. Happy coding!

29 comments:

  1. Thanks, exactly what I was looking for.

  2. If you were to run multiple instances of get_from_db at the same time, does that mean that they would share the same session?

    Then, if one of the sessions got into an error state (e.g. an insert failed due to a foreign key error), then the other tasks would fail. Surely this isn't ideal behaviour?

    Replies
    1. Good question. If you take a look at the way that I set up the SQLAlchemy session, it makes use of a scoped_session (http://docs.sqlalchemy.org/en/rel_0_9/orm/session.html#contextual-thread-local-sessions). Under the hood this uses a thread-local variable, meaning that if there are concurrent calls to the database from different tasks, they are actually using different sessions (even though in code, it looks the same). In other words, if there is some error in one task, it doesn't affect the sessions in other tasks. If, however, I had not made use of scoped_session, and just a single database connection, then you would be correct... but why use a single session when SQLAlchemy makes it so easy to use session pools?

      As a matter of fact, passing an object to celery tasks is frowned upon, as it means that you may have a stale object when you start executing that task. For example, let's say you create a User and then put some task in the Celery queue. And, for the sake of argument, this user logs in and updates his/her name before that celery task gets executed. Now, if you send an email to the user using the object that you passed to the task, you are stuck with the old data, rather than with the new data that actually resides in the database.
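      In code, the difference looks something like this (send_welcome_email is just a made-up task name):
      # fragile: the serialized user may be stale by the time the task runs
      send_welcome_email.delay(user)

      # better: pass the id and load a fresh copy from the DB inside the task
      send_welcome_email.delay(user.id)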

  3. How do you like to handle synchronization between your "main" processing line (maybe a Flask server) and the creation of celery tasks? If you do the simple thing, then while you are in a db transaction, you will find yourself creating Celery tasks, and workers might even start working on them before the transaction that needed them done has completed. The Celery task might not even be able to see the resources you've created but not yet committed.

    I guess I'd like a simple way to delay() Celery tasks, but keep them out of the task queue until after I commit the transaction that wanted them done. But I'm open to other ideas if you have them.

    Replies
    1. Hi John. I just updated the post with some details on how to deal with that situation. Let me know if you have any other questions!

    2. To follow up, I ended up writing:

      from functools import wraps
      from celery import current_task

      def that_retries(task):
          @wraps(task)
          def retrying(*args, **kwargs):
              try:
                  task(*args, **kwargs)
              except Exception as e:
                  current_task.retry(exc=e, countdown=30)
          return retrying

      Which can be used like this:

      @celery.task
      @that_retries
      def dataset(ds_id):
          ds = app.models.Dataset.query.get(ds_id)
    3. Nice! The one recommendation I would make is to parameterize which exceptions trigger a retry. For example, let's say you do some validation in the task and it raises some error (e.g. ValueError). Since the assumption is that this condition will never be met, even in subsequent retries, you will needlessly fill up your queue with retries that are bound to fail. Alternatively, you could create a custom exception that is raised only when you actually want to trigger a retry. Using either of these approaches would allow you to keep a nice decorator, but it would also ensure no extra work is being done.
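      Roughly something like this (a sketch building on your decorator; RetryableError is just a made-up name):

      from functools import wraps
      from celery import current_task

      class RetryableError(Exception):
          """Raised only when a retry is actually wanted."""

      def that_retries(retry_on=(RetryableError,), countdown=30):
          def decorator(task):
              @wraps(task)
              def retrying(*args, **kwargs):
                  try:
                      return task(*args, **kwargs)
                  except retry_on as e:
                      # only the listed exceptions cause a retry
                      current_task.retry(exc=e, countdown=countdown)
              return retrying
          return decorator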

  4. Nice article! Just a quick question:
    I have a lot of tasks that all need to write their result to a database. Would it be wise/good practice to make a separate task that waits for all tasks to complete and then writes all the results into the db? That way the db doesn't get hammered on?

    Replies
    1. That all depends on what you mean by "a lot". Most modern databases can handle quite a lot of simultaneous connections, so, depending on your scenario, I wouldn't compute things in a task and then write them to your main DB later. (I presume that if you want to batch the writes to your main DB, you are storing these task-level results somewhere temporarily; otherwise each task would have to wait until all of its peer tasks complete, making it a memory problem instead of a DB problem.) Furthermore, doing this sort of synchronized work adds quite a lot of complexity and error handling. What happens if one of the tasks fails? What happens if the "write to DB" task fails? What happens to all of the results waiting to be written to the DB if the server crashes? So, I'd suggest that unless you have a clear need to do something more drastic (e.g. performance numbers from your DB suggesting that there is an issue), go with the simple solution. And with the simple solution, you can also just add the retry logic detailed above to have it try again on failure. Hope that helps!

  5. This is such a great article! Thanks!

    One nit pick:

    Should we change the following:
    user = db_session.query(User).filter(id=user_id).one()

    Should it be:
    user = db_session.query(User).filter(User.id==user_id).one()

    Replies
    1. You are correct! I've updated the examples above. Thanks for catching it.

  6. This is a great write-up. Thank you!

  7. Great post! I am still running into issues though. I am getting this error:

    ResourceClosedError: This result object does not return rows. It has been closed automatically.

    The error goes away if I set worker_concurrency = 1

  8. Not exactly sure what your issue is, but here is a StackOverflow article that covers the same error:
    http://stackoverflow.com/questions/14375666/sqlalchemy-prevent-automatic-closing
    Hope that helps!

    Replies
    1. Thanks, I saw that earlier. I think I have it working now, but have no idea what I did. I literally had to tear apart my application and build it up again piece-by-piece. Thanks for the help

    2. I have run into the same issue. I wonder if it was caused by SQLAlchemy session management. Have you figured it out?

    3. I think it's because celery doesn't create a new worker process for each execution of a task, but reuses some process from the pool. So it sometimes happens that a few tasks share the same db_session, and if you do db_session.remove() in one task, it's also removed in another one. I had a similar situation with concurrent.futures.ProcessPoolExecutor; there it helped to switch to concurrent.futures.ThreadPoolExecutor. I managed to get rid of this error based on https://stackoverflow.com/questions/3199926/how-to-setup-sqlalchemy-session-in-celery-tasks-with-no-global-variable

      Basically, I initialize db_session on the @celeryd_init.connect signal.
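
      Roughly like this (a sketch, not my exact code; whether celeryd_init or worker_process_init fits better depends on your worker pool):

      from celery.signals import celeryd_init
      from sqlalchemy import create_engine
      from sqlalchemy.orm import scoped_session, sessionmaker

      db_session = None

      @celeryd_init.connect
      def init_db_session(**kwargs):
          # build the session factory when the worker starts, not at import time
          global db_session
          engine = create_engine('sqlite:///:memory:', pool_recycle=3600)
          db_session = scoped_session(sessionmaker(bind=engine))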

  9. When we use celery to produce a task, I import proj/tasks.py and run get_from_db.delay().

    I believe it will end up creating a new engine and session scope manager.

    I don't want to create the engine object and session scope manager again.

    I want all workers to leverage the same engine.

    How can we do that?

    Replies
    1. If you make use of the scoped_session, as done in the example above, you'll only ever get one session manager. Take a look at http://docs.sqlalchemy.org/en/latest/orm/contextual.html for details on how it works. Good luck!

  10. Can we write more than one periodic task using the approach mentioned in the article? If yes, then how do we use scoped_session in those periodic tasks?

    Replies
    1. You sure can, and you don't need to do anything else to make it work. The beauty of scoped_session is that it is a registry of session objects that are "local to the thread" using them. Basically, what you are doing when using a scoped_session is letting each thread get its own session object from that registry. Take a look at the documentation on Contextual/Thread-Local Sessions. Good luck!

    2. Thanks a lot for getting back, really appreciate it. I will follow it and hope it will work.

  11. Is the pool_size of 10 separate for each worker, or shared between workers?

  12. Great work, thanks for writing this up!

  13. Thank you for this piece, you saved me from a 3-day headache.
