The Django-Scalable package introduces an abstract base class, Scalable, inherited from the Model class, and a number of utilities for spreading the processing of a huge set of instances across any number of workers running in parallel.
Stable version from the PyPI package repository
pip install django-scalable
Latest development version from the GitHub source version control system
pip install git+git://github.com/nnseva/django-scalable.git
The package uses the select_for_update() call with the skip_locked parameter,
so it is compatible only with a restricted set of database backends:
- PostgreSQL
- Oracle
- MySQL 8.0.1+
- MariaDB 10.6+
See details in the Django documentation.
Include the scalable application in the INSTALLED_APPS list, for example:
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    ...
    'scalable',
    ...
]
Use the following available settings to tune the package behaviour:
- SCALABLE_ACQUIRE_TIMEOUT - the maximum time in seconds allowed for a single processing run, after which the acquired instance is unacquired automatically so that processing can be started again. Default is 600.
- SCALABLE_ACQUIRE_LIMIT - the maximum number of instances acquired simultaneously by one call of acquire. Default is 100.
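As an illustration, a project that prefers shorter timeouts and smaller batches might add something like the following to its settings module. The values 300 and 50 are arbitrary examples, not recommendations.

```python
# settings.py (fragment); the values below are illustrative only

# Treat an instance as abandoned after 5 minutes without a reacquire
SCALABLE_ACQUIRE_TIMEOUT = 300

# Let one acquire call take at most 50 instances
SCALABLE_ACQUIRE_LIMIT = 50
```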
Let's imagine that you have a huge table with a lot of instances to be processed by some algorithm.
The algorithm processes every instance separately and independently, so you can parallelize the processing across some number of workers. But how do you spread the instances among your workers to avoid:
- double processing, when two workers try to acquire the same instance simultaneously
- single worker overload, when too many instances to be processed appear unexpectedly and are all acquired by one worker
The package solves these problems.
The whole processing of one instance may be introduced as the following sequence of three steps:
- acquire an instance by the single worker, so other workers will not try to acquire it
- process the instance, evaluating an algorithm on the acquired instance
- unacquire (free) the instance
We acquire an instance (in fact some number of instances, determined by the SCALABLE_ACQUIRE_LIMIT setting)
by updating special fields, which allows the worker to select the acquired instances for processing. The
package provides methods to acquire and unacquire instances, select instances acquired by this worker, and
unacquire instances whose processing took too much time.
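The three steps above can be sketched as a small helper. This is only an illustration: `process_batch` and `handle` are hypothetical names, `model` is assumed to be a class inherited from Scalable, and passing the worker name to `unacquire()` positionally is an assumption based on the method descriptions in this document.

```python
def process_batch(model, worker_name, handle):
    """Acquire a batch, process each instance, then release the batch.

    `model` is assumed to be a Scalable subclass; `handle` is a
    hypothetical callable implementing the per-instance algorithm.
    Returns the number of instances processed.
    """
    # Step 1: acquire; other workers will not try to take these instances
    acquired = model.acquire(worker_name)
    processed = 0
    try:
        # Step 2: run the algorithm on every acquired instance
        for instance in acquired:
            handle(instance)
            processed += 1
    finally:
        # Step 3: release the instances, even if processing failed.
        # Passing the worker name positionally is an assumption.
        model.unacquire(worker_name)
    return processed
```

A worker would typically call such a helper in a loop, for example `process_batch(Person, worker_name, handle)`, sleeping for a while when it returns 0.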
An expired timeout may mean that the processing was stopped unexpectedly because of a worker crash and needs to be restarted. Set the timeout length to an appropriate value. While long-running processing is in progress, a special reacquire method may be called to renew the acquire timestamp, so that instances which are processed properly, but slowly, are not unacquired.
You should inherit your model from the scalable.Scalable abstract model to use the package.
This base model introduces two additional fields to the model:
- acquired_by
- acquired_at
The fields are indexed, and hold a non-null value only while the instance is acquired. They are marked as
editable=False, so they are not visible in the admin or API by default. However, you can make them visible
by explicitly declaring them as readonly fields in the admin or API.
Use model-wide methods to manipulate instances on the low level, as described in the chapter below.
The low-level model-wide methods allow you to directly:
- acquire() - acquire a set of instances
- reacquire() - reacquire this set when processing takes too long
- acquired() - get this set of instances as a queryset
- unacquire() - unacquire the set of instances acquired before
- unacquire_timed_out() - clean up the acquire fields of instances acquired too long ago
All these methods are model-wide, so you should use the class method call syntax to call them.
For example, if your model is called Person, you will use the following syntax to call acquire():
acquired = Person.acquire(worker_name)
The following class attributes tune the behaviour per model:
- acquire_limit determines the maximum number of model instances acquired by one acquire call
- acquire_timeout determines the maximum time for which acquired instances remain acquired without a reacquire call
While these attributes have a None value, the global settings are used instead:
- settings.SCALABLE_ACQUIRE_LIMIT for acquire_limit
- settings.SCALABLE_ACQUIRE_TIMEOUT for acquire_timeout
If no global settings are set, the following defaults are used:
- 100 for settings.SCALABLE_ACQUIRE_LIMIT
- 600 for settings.SCALABLE_ACQUIRE_TIMEOUT
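The fallback order described above (class attribute, then global setting, then built-in default) can be pictured with a small stand-alone sketch. The helper `effective_limit` is hypothetical and is not part of the package; it only mirrors the lookup order stated in the text, using plain objects in place of a real model and Django settings.

```python
from types import SimpleNamespace

DEFAULT_ACQUIRE_LIMIT = 100  # the package default stated in the text


def effective_limit(model, settings):
    """Illustrative lookup: class attribute, then setting, then default."""
    class_value = getattr(model, "acquire_limit", None)
    if class_value is not None:
        return class_value
    global_value = getattr(settings, "SCALABLE_ACQUIRE_LIMIT", None)
    if global_value is not None:
        return global_value
    return DEFAULT_ACQUIRE_LIMIT


# Stand-ins for a model class and for django.conf.settings
class Person:
    acquire_limit = None  # no per-model override


class Order:
    acquire_limit = 10  # per-model override


configured = SimpleNamespace(SCALABLE_ACQUIRE_LIMIT=50)
unconfigured = SimpleNamespace()

person_limit = effective_limit(Person, configured)     # global setting applies
order_limit = effective_limit(Order, configured)       # class attribute applies
fallback_limit = effective_limit(Person, unconfigured)  # built-in default applies
```

The same order applies to acquire_timeout with SCALABLE_ACQUIRE_TIMEOUT and the default of 600.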
The scalable fields are present in all models inherited from Scalable:
- acquired_at - timestamp of the acquire/reacquire action
- acquired_by - string value of the acquired_by parameter of the acquire function call
Instances have non-null values in these fields only while they are acquired.
acquire() acquires a set of model instances. The size of the set is limited by the limit parameter, which by default is
determined by the acquire_limit class-wide attribute or the global SCALABLE_ACQUIRE_LIMIT setting.
If neither of them is set, the value 100 is used as the default.
The acquired_by string parameter identifies the worker which acquires the instances.
Use any name that is unique among concurrent workers. Once these instances are unacquired, the name becomes
free and available for reuse. You can use the same names in models stored in different database tables.
All newly acquired instances will have the same value in the acquired_by field,
equal to the acquired_by parameter of the acquire() method call.
The acquired_at parameter may be used to manually set the acquired_at field of the acquired instances.
The default value is obtained by calling timezone.now(). This field matters only when unacquire_timed_out()
is called.
You can use the queryset parameter to make the source queryset differ from the objects member of the model.
Use queryset if you do not want to acquire just any available instances, but only a filtered
subset of the model instances.
For example, if your model is called Person, you can use the following syntax to call acquire() and acquire
only male persons:
acquired = Person.acquire(worker_name, queryset=Person.objects.filter(gender='male'))
The returned queryset is filtered by the acquired_by worker name, so it behaves somewhat like a snapshot.
A later scan of the acquired instances returns the same list of instances until they are unacquired, even if
the field values used by the filter of the original queryset change.
unacquire() unacquires the set of model instances identified by the acquired_by worker name. It unacquires all identified
instances by default, but can be restricted by a queryset explicitly passed as a parameter. Unacquiring just
sets the acquired_by and acquired_at fields of the filtered instances to null.
reacquire() reacquires a set of model instances by just updating acquired_at to the passed value, or to the
timezone.now() value by default.
The parameters and the return value have the same meaning as for the acquire() call.
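One possible way to keep a slow batch from timing out is to renew the acquire timestamp periodically while iterating. The sketch below is an assumption-laden illustration: `process_with_renewal`, `handle`, `renew_every` and `clock` are hypothetical names, `model` is assumed to be a class inherited from Scalable, and the worker name is passed to `reacquire()` and `unacquire()` positionally in the same way as to `acquire()`.

```python
import time


def process_with_renewal(model, worker_name, handle,
                         renew_every=60.0, clock=time.monotonic):
    """Process an acquired batch, renewing the acquire timestamp as needed.

    `renew_every` is a hypothetical interval in seconds; it should stay
    well below the configured acquire timeout.
    Returns the number of reacquire calls made.
    """
    acquired = model.acquire(worker_name)
    renewals = 0
    last_renewal = clock()
    try:
        for instance in acquired:
            handle(instance)
            # Renew the timestamp once enough time has passed
            if clock() - last_renewal >= renew_every:
                model.reacquire(worker_name)
                renewals += 1
                last_renewal = clock()
    finally:
        # Always release the batch, even if processing failed
        model.unacquire(worker_name)
    return renewals
```

The `clock` parameter exists only to make the helper easy to test with a fake time source.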
acquired() returns a queryset containing the current list of acquired instances. You can restrict this
queryset by passing a queryset parameter. It will be used instead of the objects member of the model
to create the queryset to be returned.
unacquire_timed_out() unacquires all instances whose acquired_at field has a value lower than now minus the acquire timeout.
The now parameter defaults to timezone.now().
The queryset parameter may be used to filter instances. The objects model attribute is
used by default if the queryset parameter is not passed.
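The timeout condition described above can be expressed in plain Python as follows. This is a stand-alone sketch of the comparison only, not the package's implementation; `is_timed_out` is a hypothetical helper, and the standard library clock stands in for Django's timezone.now().

```python
from datetime import datetime, timedelta, timezone


def is_timed_out(acquired_at, now=None, acquire_timeout=600):
    """Return True if acquired_at is lower than now minus the timeout.

    `acquire_timeout` is in seconds; 600 is the default stated in the text.
    An instance that is not acquired (acquired_at is None) never times out.
    """
    if acquired_at is None:
        return False
    if now is None:
        now = datetime.now(timezone.utc)
    cutoff = now - timedelta(seconds=acquire_timeout)
    return acquired_at < cutoff
```

With the default timeout of 600 seconds, an instance acquired 11 minutes before `now` counts as timed out, while one acquired 5 minutes before `now` does not.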
- auto-reacquire while acquired, in a separate thread
- async versions of low-level methods for Django>=4.2
- high-level scalable context manager/decorator, sync and async (async for Django>=4.2)