Prevent duplicate Celery task execution using Redis locks.
When the same task is called multiple times with the same arguments, celery-once-task makes sure only one instance gets queued and only one instance runs at a time. It uses Redis to coordinate locks across workers.
The library provides two independent locks:
- Queue lock: acquired when
apply_async()/delay()is called. If a lock already exists for that task + arguments combination, the call is silently dropped (returnsNone). Released when the worker picks up the task. - Running lock: acquired when the worker starts executing the task. If another worker is already running the same task with the same arguments, the new execution is rejected. Released when the task finishes (success, failure, or revocation).
Both locks use Redis keys with a TTL, so they expire automatically if something goes wrong.
Lock keys are built from the task name and a SHA-256 hash of the arguments, so two calls with different arguments are treated as separate tasks.
pip install celery-once-taskFor Django integration:
pip install celery-once-task[django]- Python 3.9+
- Celery 5.0+
- Redis 4.0+ (Python client)
- A running Redis server
Using Django? Skip to Django Integration, it handles configuration and signals for you automatically.
Call this once at app startup, before any tasks run. Add it to your Celery app module (e.g., celery.py or wherever you create your Celery() instance).
# celery.py
from celery_once_task import configure
configure(
redis_url="redis://localhost:6379/3",
queue_lock_timeout=3600,
running_lock_timeout=3600,
)All three settings are optional. These are the defaults.
Add this right after your configure() call in the same file (e.g., celery.py).
# celery.py
from celery_once_task import setup_once_task_signals
setup_once_task_signals()This hooks into Celery's task_revoked and task_internal_error signals to release locks when tasks are revoked or hit internal errors.
Also in celery.py, after creating your Celery app instance:
# celery.py
from celery_once_task import OnceTaskUnlockBootStep
app.steps["worker"].add(OnceTaskUnlockBootStep)This releases running locks for any active tasks when a worker shuts down.
In your task modules (e.g., tasks.py):
# tasks.py
from celery import shared_task
from celery_once_task import OnceTask
@shared_task(base=OnceTask)
def my_task(taskArg1, taskArg2):
...That's it. Calling my_task.delay(42) multiple times will only queue one instance. If a worker is already running my_task(42), a second worker won't start another one.
If you use Django, the library provides an AppConfig that handles configuration and signal setup automatically. You only need two steps: add it to INSTALLED_APPS and register the boot step.
In your Django settings file (e.g., settings.py):
# settings.py
INSTALLED_APPS = [
...
"celery_once_task.django.OnceTaskAppConfig",
]In the same settings file (e.g., settings.py):
# settings.py
CELERY_ONCE_REDIS_URL = "redis://localhost:6379/3"
CELERY_ONCE_QUEUE_LOCK_TIMEOUT = 3600 # seconds, default: 3600
CELERY_ONCE_RUNNING_LOCK_TIMEOUT = 3600 # seconds, default: 3600All three are optional. The defaults are shown above.
In your Celery app module (e.g., celery.py):
# celery.py
from celery_once_task import OnceTaskUnlockBootStep
app.steps["worker"].add(OnceTaskUnlockBootStep)In your task modules (e.g., tasks.py):
# tasks.py
from celery import shared_task
from celery_once_task import OnceTask
@shared_task(base=OnceTask)
def my_task(taskArg1, taskArg2):
...| Setting | Type | Default | Description |
|---|---|---|---|
redis_url |
str |
redis://localhost:6379/3 |
Redis server URL for storing locks |
queue_lock_timeout |
int |
3600 |
TTL in seconds for queue locks |
running_lock_timeout |
int |
3600 |
TTL in seconds for running locks |
When using Django, prefix these with CELERY_ONCE_ and set them in your Django settings (e.g., CELERY_ONCE_REDIS_URL).
You can enable or disable each lock per task by passing queue_lock and running_lock directly in the decorator:
from celery import shared_task
from celery_once_task import OnceTask
@shared_task(base=OnceTask, queue_lock=False, running_lock=True)
def my_task():
...| Option | Type | Default | Description |
|---|---|---|---|
queue_lock |
bool |
True |
Enable/disable the queue lock for this task |
running_lock |
bool |
True |
Enable/disable the running lock for this task |
Examples:
# Only prevent concurrent execution, allow multiple queued instances
@shared_task(base=OnceTask, queue_lock=False)
def allow_queue_duplicates():
...
# Only prevent duplicate queueing, allow concurrent execution
@shared_task(base=OnceTask, running_lock=False)
def allow_parallel_runs():
...Set the global configuration. Call this once at startup before any tasks run.
Connect Celery signals for lock cleanup on task revocation and internal errors.
Disconnect the signals. Useful in tests.
Celery Task subclass. Use as base=OnceTask in your task decorators.
Exception raised (subclass of celery.exceptions.Reject) when a task is rejected because a running lock already exists.
Celery worker boot step that releases running locks on shutdown.
Lock keys follow this pattern:
once_task:{task_name}:{hash}:{lock_type}
task_name: the full dotted task name (e.g.,myapp.tasks.my_task)hash: first 16 characters of a SHA-256 hash of the serialized argumentslock_type: eitherqueueorrunning
Two calls with different arguments get different lock keys and run independently.
MIT