Django Concurrency, Database Locking and Refreshing Model Objects
What we really needed to do was lock rows in the database, where a row corresponds to a particular model instance, so that no other operation could modify it whilst a change was taking place.
Database row locking is typically done with SELECT ... FOR UPDATE, and unfortunately Django has no built-in support for this. Hopefully it will be added in Django 1.4.
Our solution to this is to manually modify a sql query to make it FOR UPDATE. This obtains a lock on the row, and any other query against that row will be blocked until the lock is released. This must be done inside a transaction as the lock is released when the transaction completes (commit or rollback).
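For reference, the support that eventually landed in Django 1.4 is QuerySet.select_for_update(), which issues the same SELECT ... FOR UPDATE. A minimal sketch of what the lock looks like with it (the method name here is illustrative, and this is not the approach the code below takes):

from django.db import transaction

@transaction.commit_on_success
def do_something_with_builtin_locking(self):
    # select_for_update() appends FOR UPDATE to the query, so this call
    # blocks until any other transaction holding the row lock commits
    # or rolls back
    locked = SomeModel.objects.select_for_update().get(id=self.id)
    # do stuff with the locked, up-to-date row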
Our initial attempt created a query that would just fetch a single field from the particular model instance we wanted to lock. The query is then made into a FOR UPDATE query and executed to obtain the lock.
This query fetches the field we want to modify. Once we hold the lock we know that nothing else can modify the field whilst we are using it. We need to fetch the field again (and update it on our model instance) after obtaining the lock, because the value on the model object we already have may be stale: obtaining the lock may have blocked whilst another request modified the value. We always want the latest value at this point, so even if we weren't blocked we refresh the field after obtaining the lock. And since changes to this particular field are always guarded by the lock, in practice we would have been blocked if something else was changing it.
Our initial iteration was only interested in one field that may have changed due to a concurrent modification, so the code that obtains the lock updates that field:
@transaction.commit_on_success
def do_something(self):
    self._get_lock()
    # do stuff

def _mangle_sql_for_locking(self, sql):
    # yeah, it's really this difficult
    return sql + ' FOR UPDATE'

def _get_lock(self):
    query = SomeModel.objects.filter(id=self.id).values_list('field')
    sql, params = query._as_sql(connection=connection)
    cursor = connection.cursor()
    cursor.execute(self._mangle_sql_for_locking(sql), params)
    # acquire the lock and update the field
    field = cursor.fetchone()[0]
    self.field = field
This worked fine until we had more than one field we were interested in. Our naive attempt to modify the code looked like this:
def _get_lock(self):
    values = ('field', 'other_field')
    query = SomeModel.objects.filter(id=self.id).values_list(*values)
    sql, params = query._as_sql(connection=connection)

This blows up in the _as_sql call with this exception:

Cannot use a multi-field ValuesListQuerySet as a filter value.
There was no mention of this in the docs and Google didn't yield much, but then we are calling a private method directly.
Sooo... how about a method that will refresh all fields with the latest version from the db? I've always been slightly surprised that model instances don't have this capability built-in (maybe I've just missed it?), but maybe it's a bit of an anti-pattern outside of very specific use cases.
So just in case you're tempted to go down the same rabbit holes as me, here's a refresh method that fetches the latest version of all fields and updates the model instance.
def refresh(self):
    updated = SomeModel.objects.filter(id=self.id)[0]
    fields = [f.name for f in self._meta.fields
              if f.name != 'id']
    for field in fields:
        setattr(self, field, getattr(updated, field))
This allows us to obtain the lock (we still generate and execute the FOR UPDATE SQL) but discard the result, as all fields are then updated with a second query (with the additional overhead that implies, of course).
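In use the two fit together something like this (a sketch; do_something is the same method as before, now also refreshing after the lock is taken):

@transaction.commit_on_success
def do_something(self):
    self._get_lock()   # blocks until the row lock is ours
    self.refresh()     # discard stale values and re-read every field
    # do stuff with the up-to-date fields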
Note that for the code that does the locking there is a sane reason why the SQL mangling, trivial as it may be, lives in its own method. The actual code also has an additional method and call in it:
def _mangle_sql_for_locking(self, sql):
    # yeah, it's really this difficult
    return sql + ' FOR UPDATE'

def _concurrency_poison(self):
    pass

def _get_lock(self):
    values = ('field', 'other_field')
    query = SomeModel.objects.filter(id=self.id).values_list(*values)
    sql, params = query._as_sql(connection=connection)
    cursor = connection.cursor()
    cursor.execute(self._mangle_sql_for_locking(sql), params)
    self._concurrency_poison()
The _concurrency_poison method does nothing in production, but it enables us to write tests that both prove there is a race condition and prove that it is fixed. In our tests we patch out _mangle_sql_for_locking with a function that returns the sql unmodified. We additionally patch out _concurrency_poison with a function that makes a concurrent modification.
Without the locking the "concurrent change" will be overwritten and the final value will be incorrect (the concurrent change will be lost). We test that we get the wrong final result, which proves we have a race condition.
A second test leaves _mangle_sql_for_locking unchanged, but still patches _concurrency_poison to make a concurrent change. Because this should now block (_concurrency_poison is called after the lock has been obtained) the concurrent change must be made from a separate process or thread. A typical example (used for both tests) might look something like this:
import subprocess
import time
from textwrap import dedent

from django.conf import settings
from mock import patch

ENV = {
    'PGHOST': settings.DATABASES['default']['HOST'],
    'PGUSER': settings.DATABASES['default']['USER'],
    'PGPASSWORD': settings.DATABASES['default']['PASSWORD'],
    'PGPORT': settings.DATABASES['default']['PORT'],
}

@patch('appname.models.Model._concurrency_poison')
def concurrently_modify(self, concurrency_mock):
    # Poison: modify the database in an inconvenient way at an
    # inconvenient time.
    database_name = settings.DATABASES['default']['NAME']
    proc = subprocess.Popen(['psql', database_name],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, env=ENV)

    def poison():
        proc.stdin.write(dedent('''
            UPDATE some_table
            SET field = field - 3;\n'''))
        proc.stdin.flush()
        # give the database code a chance to execute
        time.sleep(1)

    concurrency_mock.side_effect = poison
    # call the code that obtains the lock here
    # it will automatically trigger the
    # concurrent operation
In the second test the concurrent change is blocked until the lock is released. Instead of being overwritten the concurrent change is executed after the logic in the model, so the result will be different from the first test and we can assert that the result has this different (correct) value.
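To make the shape of the two tests concrete, here is a rough sketch (the class name, test names, expected values and self.instance_id are placeholders, not the real test code; concurrently_modify is the helper above, written as a method of the test class, and it is what runs the code that obtains the lock while _concurrency_poison is patched):

from django.test import TransactionTestCase
from mock import patch

class ConcurrencyTests(TransactionTestCase):
    # TransactionTestCase rather than TestCase, so the separate psql
    # process can see and modify the same committed data.

    # Test one: patch out the mangling so no FOR UPDATE is issued and
    # the race condition is live.
    @patch('appname.models.Model._mangle_sql_for_locking',
           lambda self, sql: sql)
    def test_race_condition_exists(self):
        self.concurrently_modify()
        instance = SomeModel.objects.get(id=self.instance_id)
        # the concurrent "field = field - 3" was overwritten and lost,
        # so we assert the wrong (lost-update) value
        self.assertEqual(instance.field, VALUE_WITH_LOST_UPDATE)

    # Test two: leave the locking alone; the concurrent UPDATE blocks
    # until our transaction commits and is applied afterwards.
    def test_locking_prevents_lost_update(self):
        self.concurrently_modify()
        instance = SomeModel.objects.get(id=self.instance_id)
        self.assertEqual(instance.field, VALUE_WITH_BOTH_CHANGES)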
This testing pattern, which I think is pretty cool, was devised by David Owen who is the resident database expert on our team. He teaches me about databases whilst I encourage him that testing is useful...