This document describes the asynchronous task processing and scheduling system used in the FastAPI Best Architecture. The system is built on Celery with a custom database-driven scheduler that enables persistent, timezone-aware task scheduling with distributed locking for high availability.
For information about the CLI commands used to manage Celery services, see page 4.1. For details on how tasks integrate with the main application lifecycle, see page 3.4.
This page provides a comprehensive overview of the background processing system; the sections below link to pages with detailed information on each component.
The background processing system consists of three primary components:
| Component | Purpose | Implementation |
|---|---|---|
| Celery Workers | Execute asynchronous tasks using gevent pool | backend/app/task/celery.py |
| Celery Beat | Schedule periodic tasks with database persistence | backend/app/task/utils/schedulers.py |
| Flower | Monitor task execution and worker status | Started via CLI or shell script |
The system supports both manual task execution (triggered via API) and scheduled execution (interval-based or cron-based), with all task definitions persisted in the database for dynamic management.
Sources: backend/app/task/celery.py, backend/app/task/utils/schedulers.py, backend/celery-start.sh
The Celery application is initialized in backend/app/task/celery.py with support for both Redis and RabbitMQ brokers, and a custom database-backed result storage.
Celery App Configuration Overview
Key Configuration:
| Configuration Parameter | Value | Purpose |
|---|---|---|
| beat_scheduler | DatabaseScheduler | Custom database-driven scheduler |
| task_cls | TaskBase | Custom task base class |
| task_track_started | True | Track when tasks start execution |
| enable_utc | False | Use the local timezone instead of UTC |
| timezone | settings.DATETIME_TIMEZONE | Application timezone |
The system supports both Redis and RabbitMQ as message brokers, automatically discovers task modules, and includes async task support via celery_aio_pool.
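A minimal sketch of this setup, assuming the module paths for TaskBase and DatabaseScheduler and a placeholder timezone value (the actual initialization lives in backend/app/task/celery.py):

```python
# Sketch only: module paths and the broker URL are assumptions; the
# timezone string stands in for settings.DATETIME_TIMEZONE.
from celery import Celery

celery_app = Celery(
    'fba_celery',
    broker='redis://127.0.0.1:6379/1',           # or an amqp:// URL for RabbitMQ
    task_cls='backend.app.task.tasks:TaskBase',  # custom task base class
)
celery_app.conf.update(
    beat_scheduler='backend.app.task.utils.schedulers:DatabaseScheduler',
    task_track_started=True,   # record the STARTED state for each task
    enable_utc=False,          # schedule in local time, not UTC
    timezone='Asia/Shanghai',  # placeholder for settings.DATETIME_TIMEZONE
)
```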
For detailed configuration including broker setup, worker pools, and Flower monitoring, see page 9.1.
Sources: backend/app/task/celery.py:21-59
The system uses a DatabaseScheduler that replaces Celery's default in-memory scheduler with a persistent database-backed implementation. This enables dynamic task creation and modification at runtime.
DatabaseScheduler Lifecycle
Distributed Locking: A Redis-based distributed lock prevents multiple beat instances from executing simultaneously. The lock is acquired on startup with a 25-second timeout and extended during each tick cycle (backend/app/task/utils/schedulers.py:349-355).
Schedule Change Detection: The scheduler monitors Redis for external schedule changes and invalidates its internal heap when modifications are detected (backend/app/task/utils/schedulers.py:383-396).
Persistent Storage: Task schedules are stored in the sys_task_scheduler table, allowing dynamic creation and modification via the API.
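A minimal sketch of the locking pattern described above, using redis-py's synchronous Lock; the key name and control flow here are assumptions (the scheduler actually drives an async lock through run_await, described below):

```python
# One beat instance acquires the lock at startup (25s timeout) and
# extends it on every tick; losers exit instead of double-scheduling.
import time

import redis

client = redis.Redis(db=1)  # broker database index per the settings table
lock = client.lock('fba_celery:beat_lock', timeout=25)  # hypothetical key name

if lock.acquire(blocking=False):  # only one beat instance wins
    try:
        while True:
            lock.extend(25, replace_ttl=True)  # renew ownership each cycle
            # ... run one scheduler tick here ...
            time.sleep(1)
    finally:
        lock.release()
else:
    print('Another beat instance holds the lock; exiting.')
```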
For complete implementation details including distributed locking mechanisms, crontab parsing, and task control routes, see page 9.2.
Sources: backend/app/task/utils/schedulers.py:34-460
The ModelEntry class wraps a TaskScheduler database model into a Celery ScheduleEntry (backend/app/task/utils/schedulers.py:42-269).
Schedule Type Parsing:
| Type | Fields Used | Celery Schedule Object |
|---|---|---|
| INTERVAL | interval_every, interval_period | schedules.schedule(timedelta) |
| CRONTAB | crontab | TzAwareCrontab |
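A hedged sketch of the INTERVAL branch of this mapping, using the field names from the table (the real ModelEntry logic is more involved):

```python
# Map a TaskScheduler row to a Celery schedule object (sketch).
from datetime import timedelta

from celery import schedules

def to_schedule(row):
    if row.type == 'INTERVAL':
        # e.g. interval_every=30, interval_period='seconds' -> every 30s
        return schedules.schedule(timedelta(**{row.interval_period: row.interval_every}))
    # CRONTAB rows are parsed into a TzAwareCrontab; see the parsing below
    raise NotImplementedError
```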
Crontab Format: The crontab string is split into 5 components (backend/app/task/utils/schedulers.py:59-66):
minute hour day_of_week day_of_month month_of_year
Example: "0 2 * * *" → Runs at 2:00 AM daily
The TzAwareCrontab class extends Celery's crontab schedule to be timezone-aware (backend/app/task/utils/tzcrontab.py:10-51).
The custom is_due() method (backend/app/task/utils/tzcrontab.py:24-37) uses the application's timezone utility to calculate task due times, ensuring consistent behavior across different server timezones.
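A hedged approximation of the idea; the real class uses the project's timezone utility rather than a hard-coded zone:

```python
# Evaluate "now" in the application timezone so is_due() behaves the
# same on every server. The hard-coded zone is a placeholder.
from datetime import datetime
from zoneinfo import ZoneInfo

from celery import schedules

APP_TZ = ZoneInfo('Asia/Shanghai')  # stands in for settings.DATETIME_TIMEZONE

class TzAwareCrontab(schedules.crontab):
    def __init__(self, *args, **kwargs):
        kwargs.setdefault('nowfun', lambda: datetime.now(APP_TZ))
        super().__init__(*args, **kwargs)
```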
Validation: The crontab_verify() function validates crontab expressions before saving (backend/app/task/utils/tzcrontab.py:53-66):
- Returns a celery.schedules.crontab object on success
- Raises RequestError if parsing fails

Sources: backend/app/task/utils/tzcrontab.py, backend/app/task/utils/schedulers.py:42-103
Workers are started with a gevent pool for concurrent execution:
| Option | Value | Purpose |
|---|---|---|
| -A | backend.app.task.celery | App location |
| -l | info | Log level |
| -P | gevent | Pool implementation (async) |
| -c | 100 | Concurrency (100 greenlets) |
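Combined, these options yield an invocation like the following, reconstructed from the table (the exact line lives in backend/celery-start.sh):

```bash
celery -A backend.app.task.celery worker -l info -P gevent -c 100
```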
For detailed worker configuration including pool types and performance tuning, see page 9.1.
Sources: backend/celery-start.sh:4
The scheduler runs in a synchronous context but needs to interact with async database operations. The run_await() utility bridges this gap (backend/utils/_await.py:56-82).
Key Concept: The run_await decorator wraps an async function so it can be called from synchronous code, driving the coroutine to completion and returning its result, as sketched below.
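A minimal sketch of such a sync-to-async bridge; the real implementation in backend/utils/_await.py may instead reuse a long-lived event loop rather than starting one per call:

```python
# Wrap an async callable so sync code can invoke it directly.
import asyncio
from collections.abc import Callable, Coroutine
from typing import Any, TypeVar

T = TypeVar('T')

def run_await(func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., T]:
    def wrapper(*args: Any, **kwargs: Any) -> T:
        # Run the coroutine to completion from synchronous code
        return asyncio.run(func(*args, **kwargs))
    return wrapper
```

With this shape, run_await(lock.acquire) returns a plain callable, so run_await(lock.acquire)() both wraps and invokes it.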
Usage Examples in Scheduler:
- run_await(lock.acquire)() (backend/app/task/utils/schedulers.py:458)
- run_await(self.lock.extend)(...) (backend/app/task/utils/schedulers.py:353)
- run_await(self.save)(save_fields) (backend/app/task/utils/schedulers.py:131)
- run_await(self.get_all_task_schedulers)() (backend/app/task/utils/schedulers.py:425)

Sources: backend/utils/_await.py, backend/app/task/utils/schedulers.py
The TaskSchedulerService provides CRUD operations for task schedules (backend/app/task/service/scheduler_service.py).
To execute a task immediately (backend/app/task/service/scheduler_service.py:119-141):
1. Verify workers are available via celery_app.control.ping() (backend/app/task/service/scheduler_service.py:128-130)
2. Call celery_app.send_task() to dispatch the task to the broker (backend/app/task/service/scheduler_service.py:140)
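A hedged sketch of that flow; the function name and the plain exception are placeholders for the service's actual code and the project error types listed below:

```python
# Ping for live workers, then dispatch by task name (sketch).
from backend.app.task.celery import celery_app  # app location per this page

def run_task_now(task_name: str, args: list | None = None, kwargs: dict | None = None) -> str:
    if not celery_app.control.ping(timeout=0.5):  # empty reply -> no workers
        raise RuntimeError('No Celery workers are available')  # -> ServerError
    result = celery_app.send_task(task_name, args=args or [], kwargs=kwargs or {})
    return result.id  # task ID for later status lookup
```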
Error Handling:

- ServerError if no workers are available
- NotFoundError if the task scheduler doesn't exist
- RequestError if task parameters are invalid JSON

Sources: backend/app/task/service/scheduler_service.py
Task results are stored in the database using a custom DatabaseBackend configured in backend/app/task/celery.py:37-39.
Connection String Format:
db+{DATABASE_TYPE}+{driver}://{user}:{password}@{host}:{port}/{schema}
Where driver is:
- pymysql for MySQL
- psycopg for PostgreSQL

Extended Results: Setting result_extended=True stores additional task metadata including arguments, name, and worker information (backend/app/task/celery.py:39).
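For example, a MySQL deployment with hypothetical credentials would produce:

db+mysql+pymysql://fba_user:fba_pass@127.0.0.1:3306/fba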
Sources: backend/app/task/celery.py:37-42
Flower provides a web-based interface for monitoring Celery workers and tasks. It is started alongside the workers with basic authentication enabled.
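A hypothetical invocation; the credentials are placeholders, the url_prefix mirrors the nginx /flower/ path mentioned below, and the actual command lives in backend/celery-start.sh:

```bash
celery -A backend.app.task.celery flower --basic_auth=admin:123456 --url_prefix=flower
```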
Key Monitoring Capabilities:
| Feature | Description |
|---|---|
| Task Monitoring | View active, scheduled, and completed tasks |
| Worker Status | Monitor worker health and availability |
| Rate Limiting | View and modify task rate limits |
| Task History | Browse historical task executions |
| Queue Stats | Monitor queue lengths and throughput |
When deployed with nginx, Flower is reverse-proxied at the /flower/ path. For detailed Flower configuration and monitoring setup, see page 9.1.
Sources: backend/celery-start.sh:9-10
Use the shell script backend/celery-start.sh, which launches the worker, beat, and Flower processes using commands like those shown in the sections above.
The fba_celery service in Docker Compose runs all three components using supervisord:
Service Definition:
- Environment: SERVER_TYPE=celery

Networking: the service's exposed ports and hostnames are defined in the Compose file.
For detailed deployment configurations, see page 10.1.
Sources: backend/celery-start.sh
| Setting | Default | Purpose |
|---|---|---|
| CELERY_BROKER | 'redis' | Broker type (redis/rabbitmq) |
| CELERY_BROKER_REDIS_DATABASE | 1 | Redis database for broker |
| CELERY_REDIS_PREFIX | 'fba_celery' | Redis key prefix |
| DATETIME_TIMEZONE | Application timezone | Timezone for schedules |
Interval Schedule: runs a task repeatedly at a fixed interval, defined by the interval_every and interval_period fields.

Crontab Schedule: runs a task according to a 5-field crontab expression stored in the crontab field, evaluated in the application timezone.
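Hypothetical schedule definitions using the fields above; the names, task paths, and exact payload shape are illustrative, not the project's actual API schema:

```python
# Example rows for the two schedule types (field names per this page).
interval_task = {
    'name': 'cleanup_expired_tokens',  # hypothetical schedule name
    'task': 'app.task.tasks.cleanup',  # hypothetical task path
    'type': 'INTERVAL',
    'interval_every': 30,
    'interval_period': 'seconds',      # run every 30 seconds
}

crontab_task = {
    'name': 'nightly_report',
    'task': 'app.task.tasks.report',   # hypothetical task path
    'type': 'CRONTAB',
    'crontab': '0 2 * * *',            # 02:00 daily, Celery field order
}
```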
ModelEntry Options (backend/app/task/utils/schedulers.py:82-93):
- queue: Route to a specific queue
- exchange: Use a specific exchange
- routing_key: Routing key for the message
- expires_: Task expiration time
- start_time: Delay first execution
- one_off: Execute only once

Sources: backend/core/conf.py, backend/app/task/utils/schedulers.py:82-102
Sources: backend/app/task/utils/schedulers.py, backend/app/task/service/scheduler_service.py, backend/celery-start.sh