Django concurrency, database locking and refreshing model objects

Using expressions to make some of our model updates atomic (as discussed previously) was useful, but it wasn't sufficient to make all of our operations safe for concurrent database modifications. Some operations fetch values and then do work based on those values, and the values must not change whilst that work is taking place, because the end result is written back and would overwrite any other changes made in the meantime.
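To make the problem concrete, here is a minimal sketch of a lost update. It uses the standard library's sqlite3 module and a made-up account table rather than Django models, and it interleaves the two "requests" by hand on a single connection, so every name in it is an illustrative assumption.

```python
import sqlite3

# In-memory database with one hypothetical row to fight over.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE account (id INTEGER PRIMARY KEY, balance INTEGER)")
conn.execute("INSERT INTO account VALUES (1, 100)")


def read_balance():
    return conn.execute("SELECT balance FROM account WHERE id = 1").fetchone()[0]


def write_balance(value):
    conn.execute("UPDATE account SET balance = ? WHERE id = 1", (value,))


# Both requests read the balance before either of them writes.
seen_by_a = read_balance()
seen_by_b = read_balance()

write_balance(seen_by_a - 30)  # request A withdraws 30
write_balance(seen_by_b - 50)  # request B withdraws 50 from its stale value

# A's withdrawal has been overwritten: the balance is 50, not 20.
lost_update_result = read_balance()
```

Nothing here is specific to SQLite. Any read, compute, write-back sequence without a lock can interleave this way.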

What we really needed was to lock rows in the database, where a row corresponds to a particular model instance, so that no other operation could modify it whilst a change is taking place.

Database row locking is typically done with SELECT ... FOR UPDATE, and unfortunately Django has no built-in support for this. Hopefully it will be added in Django 1.4.

Our solution to this is to manually modify a SQL query to make it FOR UPDATE. This obtains a lock on the row, and any other query that tries to lock or modify that row will be blocked until the lock is released (in PostgreSQL, plain reads are not blocked). This must be done inside a transaction, as the lock is released when the transaction completes (commit or rollback).
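The lifetime of such a lock can be sketched with nothing but the standard library. SQLite has no SELECT ... FOR UPDATE, so this example uses BEGIN IMMEDIATE, which takes a database-wide write lock, as a much coarser stand-in for a row lock. The item table is made up, and timeout=0 is set so that the second connection fails immediately instead of waiting, which keeps the example from hanging.

```python
import os
import sqlite3
import tempfile

# A file-backed database so that two separate connections can share it.
path = os.path.join(tempfile.mkdtemp(), "lock_demo.db")


def connect():
    # isolation_level=None: we issue BEGIN and COMMIT ourselves.
    # timeout=0: raise straight away rather than wait for a lock.
    return sqlite3.connect(path, timeout=0, isolation_level=None)


first = connect()
first.execute("CREATE TABLE item (id INTEGER PRIMARY KEY, field INTEGER)")
first.execute("INSERT INTO item VALUES (1, 10)")
second = connect()

first.execute("BEGIN IMMEDIATE")  # take the lock inside a transaction

try:
    second.execute("BEGIN IMMEDIATE")  # the second writer cannot get it
    second_was_blocked = False
except sqlite3.OperationalError:
    second_was_blocked = True

first.execute("UPDATE item SET field = field + 1 WHERE id = 1")
first.execute("COMMIT")  # completing the transaction releases the lock

second.execute("BEGIN IMMEDIATE")  # now the lock is available
cursor = second.execute("SELECT field FROM item WHERE id = 1")
value_seen_by_second = cursor.fetchall()[0][0]
second.execute("COMMIT")

first.close()
second.close()
```

With a real FOR UPDATE query the second transaction would normally wait for the first to finish rather than raise an error, but the shape is the same: the lock lives exactly as long as the transaction that took it.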

Note

Since I started writing this blog entry (interrupted by my wife inconsiderately giving birth to our beautiful daughter Irina), support for this has been committed. New in the Django development version: the QuerySet select_for_update method.

Our initial attempt created a query that would just fetch a single field from the particular model instance we wanted to lock. The query is then made into a FOR UPDATE query and executed to obtain the lock.

This query fetches the field we want to modify. Once we hold the lock we know that nothing else can modify the field whilst we are using it. We need to fetch the field again, and update it on our model instance, after obtaining the lock: the model object we already have holds a value, but another request may have modified that value whilst we were blocked waiting for the lock. We always need the latest value at this point, so we update after obtaining the lock even if we weren't blocked. (As changes to this particular field are always guarded by the lock, in practice we would have been blocked if something else was changing it.)
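The "lock first, then re-read" ordering can be sketched as follows. This is again a standard-library approximation rather than the Django code: the Item class, its lock_and_refresh method and the item table are hypothetical, and SQLite's BEGIN IMMEDIATE stands in for the FOR UPDATE query.

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "refresh_demo.db")


def connect():
    # Explicit transactions, and no waiting on locks.
    return sqlite3.connect(path, timeout=0, isolation_level=None)


class Item:
    """Hypothetical stand-in for a model instance with one cached field."""

    def __init__(self, conn, item_id):
        self.conn = conn
        self.id = item_id
        self.field = self._fetch_field()

    def _fetch_field(self):
        cursor = self.conn.execute(
            "SELECT field FROM item WHERE id = ?", (self.id,))
        return cursor.fetchall()[0][0]

    def lock_and_refresh(self):
        self.conn.execute("BEGIN IMMEDIATE")  # obtain the lock first
        self.field = self._fetch_field()      # then re-read the value


other_request = connect()
other_request.execute("CREATE TABLE item (id INTEGER PRIMARY KEY, field INTEGER)")
other_request.execute("INSERT INTO item VALUES (1, 10)")

item = Item(connect(), 1)

# Another request changes the row after we loaded our instance.
other_request.execute("UPDATE item SET field = field - 3 WHERE id = 1")

stale_value = item.field  # the instance still holds the old value
item.lock_and_refresh()
fresh_value = item.field  # the value as it is now, under the lock

# Safe to compute and write back whilst the lock is held.
item.field = fresh_value + 5
item.conn.execute("UPDATE item SET field = ? WHERE id = ?", (item.field, item.id))
item.conn.execute("COMMIT")

cursor = other_request.execute("SELECT field FROM item WHERE id = 1")
final_value = cursor.fetchall()[0][0]
```

Had the write-back used the stale value, the other request's change would have been lost.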

Our initial iteration was only interested in one field that may have changed due to a concurrent modification, so the code that obtains the lock updates that field:

from django.db import connection, transaction

# these are methods on the model (SomeModel here)
@transaction.commit_on_success
def do_something(self):
    self._get_lock()
    # do stuff

def _mangle_sql_for_locking(self, sql):
    # yeah, it's really this difficult
    return sql + ' FOR UPDATE'

def _get_lock(self):
    query = SomeModel.objects.filter(id=self.id).values_list('field')
    sql, params = query._as_sql(connection=connection)

    cursor = connection.cursor()
    cursor.execute(self._mangle_sql_for_locking(sql), params)

    # acquire the lock and update the field
    field = cursor.fetchone()[0]
    self.field = field


This worked fine until we had more than one field we were interested in. Our naive attempt to modify the code looked like this:

def _get_lock(self):
    values = ('field', 'other_field')
    query = SomeModel.objects.filter(id=self.id).values_list(*values)
    sql, params = query._as_sql(connection=connection)

This blows up in the _as_sql call with this exception:

Cannot use a multi-field ValuesListQuerySet as a filter value.


There was no mention of this in the docs and Google didn't yield much, but then again we are calling a private method directly.

Sooo... how about a method that will refresh all fields with the latest version from the db? I've always been slightly surprised that model instances don't have this capability built-in (maybe I've just missed it?), but maybe it's a bit of an anti-pattern outside of very specific use cases.

So just in case you're tempted to go down the same rabbit holes as me, here's a refresh method that fetches the latest version of all fields and updates the model instance.

def refresh(self):
    # fetch a fresh copy of this row and copy every field except the key
    updated = SomeModel.objects.get(id=self.id)
    fields = [f.name for f in self._meta.fields
              if f.name != 'id']
    for field in fields:
        setattr(self, field, getattr(updated, field))

This allows us to obtain the lock (still generate and execute the FOR UPDATE SQL) but discard the result as all fields are updated with another query (with the additional overhead that implies of course).
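Stripped of the ORM, the refresh idea is just "fetch a fresh copy and copy its fields across". Here is a small sketch of that, assuming a hypothetical Record dataclass and a plain dict standing in for the database. (Later Django versions, 1.8 onwards, ship a built-in Model.refresh_from_db() that covers this use case.)

```python
from dataclasses import dataclass, fields


@dataclass
class Record:
    """Hypothetical stand-in for a model instance."""
    id: int
    field: int
    other_field: str


def refresh(instance, fetch_latest):
    """Copy every field except the key from a freshly fetched copy."""
    updated = fetch_latest(instance.id)
    for f in fields(instance):
        if f.name != "id":
            setattr(instance, f.name, getattr(updated, f.name))


# A dict plays the part of the database table.
database = {1: Record(1, 10, "old")}
stale = Record(1, 10, "old")

# Something else changes the stored row behind our back.
database[1] = Record(1, 7, "new")

refresh(stale, database.__getitem__)
```

After the call the existing object carries the latest values, so any other code holding a reference to it sees them too.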

Note that for our code that does the locking there is a sane reason that the SQL mangling, trivial as it may be, is in its own method. The actual code has an additional method and call in it:

def _mangle_sql_for_locking(self, sql):
    # yeah, it's really this difficult
    return sql + ' FOR UPDATE'

def _concurrency_poison(self):
    pass

def _get_lock(self):
    values = ('field', 'other_field')
    query = SomeModel.objects.filter(id=self.id).values_list(*values)
    sql, params = query._as_sql(connection=connection)

    cursor = connection.cursor()
    cursor.execute(self._mangle_sql_for_locking(sql), params)
    self._concurrency_poison()

The _concurrency_poison method does nothing in production, but it enables us to write tests that both prove there is a race condition and prove that it is fixed. In our tests we patch out _mangle_sql_for_locking with a function that returns the SQL unmodified. We additionally patch out _concurrency_poison with a function that makes a concurrent modification.

Without the locking the "concurrent change" will be overwritten and the final value will be incorrect (the concurrent change will be lost). We test that we get the wrong final result, which proves we have a race condition.

A second test leaves _mangle_sql_for_locking unchanged, but still patches _concurrency_poison to make a concurrent change. Because this should now block (_concurrency_poison is called after the lock has been obtained) the concurrent change must be made from a separate process or thread. A typical example (used for both tests) might look something like this:

import os
import subprocess
import time
from textwrap import dedent

from django.conf import settings
from mock import patch

# Start from the current environment so psql can still be found on PATH,
# then add the connection settings that psql reads.
ENV = dict(
    os.environ,
    PGHOST=settings.DATABASES['default']['HOST'],
    PGUSER=settings.DATABASES['default']['USER'],
    PGPASSWORD=settings.DATABASES['default']['PASSWORD'],
    PGPORT=str(settings.DATABASES['default']['PORT']),
)

@patch('appname.models.Model._concurrency_poison')
def concurrently_modify(self, concurrency_mock):
    # Poison: modify the database in an inconvenient way at an
    # inconvenient time.
    database_name = settings.DATABASES['default']['NAME']
    proc = subprocess.Popen(['psql', database_name],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
        stderr=subprocess.PIPE, env=ENV)

    def poison():
        proc.stdin.write(dedent('''
            UPDATE some_table
            SET field = field - 3;\n'''
        ))
        proc.stdin.flush()
        # give the database code a chance to execute
        time.sleep(1)

    concurrency_mock.side_effect = poison

    # call the code that obtains the lock here
    # it will automatically trigger the
    # concurrent operation

In the second test the concurrent change is blocked until the lock is released. Instead of being overwritten the concurrent change is executed after the logic in the model, so the result will be different from the first test and we can assert that the result has this different (correct) value.
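The same pair of tests can be tried out without a database. The sketch below is an in-process analogy, not the code from our project: a threading.Lock stands in for the row lock, a thread stands in for the psql subprocess, and the Account class, _take_lock and run_scenario are hypothetical names. It uses unittest.mock, the standard library version of the mock package.

```python
import contextlib
import threading
from unittest import mock


class Account:
    """In-memory stand-in for a model row; _row_lock plays the row lock."""

    def __init__(self, balance):
        self.balance = balance
        self._row_lock = threading.Lock()

    def _take_lock(self):
        # Stand-in for executing the FOR UPDATE query.
        self._row_lock.acquire()
        return True

    def _concurrency_poison(self):
        # Does nothing in production; the tests patch it.
        pass

    def deposit(self, amount):
        locked = self._take_lock()
        try:
            current = self.balance           # read
            self._concurrency_poison()       # hook, called after locking
            self.balance = current + amount  # write back
        finally:
            if locked:
                self._row_lock.release()


def run_scenario(use_lock):
    account = Account(10)
    workers = []

    def concurrent_update():
        # Like the UPDATE sent through psql: it needs the row lock.
        with account._row_lock:
            account.balance -= 3

    def poison():
        worker = threading.Thread(target=concurrent_update)
        worker.start()
        worker.join(timeout=0.2)  # give the concurrent change a chance to run
        workers.append(worker)

    with contextlib.ExitStack() as stack:
        stack.enter_context(mock.patch.object(
            Account, "_concurrency_poison", side_effect=poison))
        if not use_lock:
            # Same idea as patching out _mangle_sql_for_locking.
            stack.enter_context(mock.patch.object(
                Account, "_take_lock", return_value=False))
        account.deposit(5)

    for worker in workers:
        worker.join()  # let a blocked concurrent change finish
    return account.balance


result_without_lock = run_scenario(use_lock=False)  # concurrent change is lost
result_with_lock = run_scenario(use_lock=True)      # concurrent change survives
```

Without the lock the concurrent subtraction runs in the middle of the deposit and is then overwritten, giving 15. With the lock it has to wait until the deposit is finished, giving the correct 12.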

This testing pattern, which I think is pretty cool, was devised by David Owen, who is the resident database expert on our team. He teaches me about databases whilst I try to persuade him that testing is useful... :-)

Published at DZone with permission of Michael Foord, DZone MVB. See the original article here.
