Comprehensive coding & operational rules for building, running, and maintaining Python background-job systems with Celery, FastAPI, and Redis.
Your API shouldn't make users wait 30 seconds for a PDF to generate. Your web server shouldn't crash when processing thousands of email sends. Every time you handle a long-running task synchronously, you're creating bottlenecks that kill user experience and system reliability.
Most Python developers start with this pattern:
@app.post("/generate-report")
def create_report(request: ReportRequest):
    # This blocks for 15-45 seconds
    pdf_data = generate_complex_report(request.user_id)
    send_email_with_attachment(request.email, pdf_data)
    return {"status": "complete"}
The problems are immediate and expensive: request workers sit blocked for the full duration, clients hit timeouts, and a single slow dependency can cascade into failed requests across the whole API.
These Cursor Rules establish a production-grade background job system using Celery, FastAPI, and Redis that separates concerns cleanly and scales independently. You'll process tasks asynchronously, retry failures intelligently, and monitor everything that matters.
What you get immediately:
Your APIs return instantly while work happens in the background:
@app.post("/generate-report")
async def create_report(request: ReportRequest):
    # Returns in ~50ms
    task = generate_report_task.delay(request.user_id, request.email)
    return {"task_id": task.id, "status": "processing"}
- Tasks retry automatically with intelligent backoff (a minimal sketch follows below).
- Workers, the API, and Redis scale independently based on actual demand.
- Rolling updates keep your system running while you deploy new task code.
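Here is a minimal sketch of what that backoff looks like in task code; the task name, the external call, and the exception list are illustrative rather than prescribed:

"""python
@celery.task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),  # retry only transient failures
    retry_backoff=True,        # 1s, 2s, 4s, 8s, ...
    retry_backoff_max=600,     # cap the delay at 10 minutes
    retry_jitter=True,         # randomize delays to avoid thundering herds
    max_retries=5,
)
def sync_invoice(self, invoice_id: str):
    push_to_accounting_system(invoice_id)  # hypothetical external call
"""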
# This pattern kills productivity
def process_user_signup(user_data):
    validate_email(user_data.email)      # 2s - external API
    generate_avatar(user_data.profile)   # 8s - image processing
    send_welcome_email(user_data)        # 3s - email service
    sync_to_crm(user_data)               # 5s - CRM integration
    return "Welcome!"                    # User waits 18 seconds
# API responds instantly, work happens in background
@app.post("/signup")
async def signup(user_data: UserSignup):
    # Validate immediately (fail fast)
    if not user_data.email:
        raise ValidationError("Email required")

    # Queue background work
    process_signup_task.delay(user_data.dict())
    return {"status": "processing", "message": "Welcome! Check your email."}

# Background task handles the heavy lifting
@celery.task(bind=True, autoretry_for=(ConnectionError,), retry_backoff=True)
def process_signup_task(self, user_data):
    args = UserSignupArgs.parse_obj(user_data)

    # Each step is idempotent and fault-tolerant; pass plain dicts so the
    # arguments stay JSON-serializable for the broker
    validate_email(args.email)
    generate_avatar.delay(args.profile.dict())  # Parallel processing
    send_welcome_email.delay(args.dict())
    sync_to_crm.delay(args.dict())
# Complex multi-stage processing with error isolation
from celery import chord, group

@celery.task
def generate_monthly_reports(company_id: str):
    # Fan out to parallel workers
    job_group = group([
        compile_sales_data.s(company_id),
        analyze_user_metrics.s(company_id),
        generate_financial_summary.s(company_id),
    ])
    # Combine results when all complete
    callback = compile_final_report.s(company_id)
    return chord(job_group)(callback)
app/
├── api/                 # FastAPI routers
│   ├── jobs.py          # Job submission endpoints
│   └── status.py        # Task status checking
├── tasks/               # Celery tasks by domain
│   ├── reports.py       # Report generation
│   ├── emails.py        # Email sending
│   └── analytics.py     # Data processing
├── services/            # Pure business logic
├── schemas/             # Pydantic models
└── settings.py          # Configuration
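The snippets above import `celery` from `app/celery_app.py`, which the layout implies but does not show. A minimal sketch of that module, assuming the Settings class defined later in this document:

"""python
# app/celery_app.py
from celery import Celery
from app.settings import Settings

settings = Settings()

celery = Celery(
    "app",
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
    include=["app.tasks.reports", "app.tasks.emails", "app.tasks.analytics"],
)
"""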
Every task follows this bulletproof pattern:
from app.celery_app import celery
from app.schemas import TaskArgs, TaskResult

@celery.task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError, RetryableError),
    retry_backoff=True,
    max_retries=5,
    acks_late=True,
    time_limit=300,
)
def process_data(self, payload: dict) -> dict:
    # 1. Validate input immediately
    args = TaskArgs.parse_obj(payload)

    # 2. Check if already processed (idempotency)
    if already_processed(args.record_id):
        return {"status": "skipped", "record_id": args.record_id}

    # 3. Do the work with proper resource management
    try:
        with database_connection() as conn:
            result = heavy_processing(args, conn)
        return TaskResult(
            record_id=args.record_id,
            status="complete",
            output_url=result.url,
        ).dict()
    except RetryableError:
        # Listed in autoretry_for, so Celery retries automatically
        raise
    except FatalError as e:
        # Don't retry, alert immediately
        alert_to_slack(f"Fatal error in {self.name}: {e}")
        raise
from app.celery_app import celery

@app.post("/submit-job")
async def submit_job(request: JobRequest):
    task = process_data.delay(request.dict())
    return {
        "task_id": task.id,
        "status_url": f"/tasks/{task.id}/status",
    }

@app.get("/tasks/{task_id}/status")
async def get_task_status(task_id: str):
    result = celery.AsyncResult(task_id)
    return {
        "status": result.status,
        "result": result.result if result.ready() else None,
    }
# settings.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    redis_url: str = "redis://localhost:6379"
    celery_broker_url: str = redis_url        # same Redis instance by default
    celery_result_backend: str = redis_url

    # Worker tuning
    worker_concurrency: int = 4
    worker_prefetch_multiplier: int = 1
    task_time_limit: int = 300
    task_soft_time_limit: int = 240
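The worker-tuning fields only take effect once they are pushed into the Celery config; a minimal sketch, assumed to live in `app/celery_app.py` alongside the app definition:

"""python
celery.conf.update(
    worker_concurrency=settings.worker_concurrency,
    worker_prefetch_multiplier=settings.worker_prefetch_multiplier,
    task_time_limit=settings.task_time_limit,
    task_soft_time_limit=settings.task_soft_time_limit,
)
"""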
These rules eliminate the guesswork from background job systems. You'll ship features faster, handle failures gracefully, and scale components independently. Your users get responsive interfaces while complex work happens reliably behind the scenes.
The architecture scales from startup MVPs to enterprise systems processing millions of jobs daily. Start with the core patterns, then layer on advanced features like task chaining, priority queues, and custom retry strategies as your needs grow.
You are an expert in Python, Celery, FastAPI, Redis, RabbitMQ, Docker, and Kubernetes.
Technology Stack Declaration
- Runtime: CPython ≥3.10, poetry for dependency management.
- Queue brokers: Redis (default) or RabbitMQ for ordered, durable delivery.
- Task runner: Celery 5 or newer with event-driven worker autoscaling.
- API layer: FastAPI for job submission & status endpoints.
- Containerization: Docker + Kubernetes with HPA driven by queue length metrics.
Key Principles
- Keep HTTP threads free: offload any task >200 ms to a background job.
- Write idempotent, side-effect-safe tasks: they may be delivered and executed more than once (see the dedup sketch after this list).
- Model every task as a pure function + explicit I/O; never depend on global state.
- Fail fast, retry cheap: throw early, retry with exponential back-off for transient errors.
- Separate orchestration (API) from execution (workers). Never import Web code into workers.
- Treat infrastructure as code: version all queues, schedules, and cron rules.
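A minimal dedup sketch for the idempotency principle above, assuming a shared Redis client; the key scheme and TTL are illustrative:

"""python
import redis

r = redis.Redis.from_url("redis://localhost:6379")

def already_processed(record_id: str) -> bool:
    # SET NX returns None when the key already exists, so a redelivered task
    # sees the marker and skips its side effects. In production, set the
    # marker only after the work has actually committed.
    return r.set(f"processed:{record_id}", "1", nx=True, ex=86400) is None
"""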
Python
- Use type hints everywhere; enforce with mypy strict.
- Follow PEP8 + black formatting (line length 100).
- Directory layout:
"""
app/
    api/          # FastAPI routers
    tasks/        # Celery tasks (one file per domain area)
    services/     # pure logic shared by tasks & API
    schemas/      # pydantic models
    settings.py   # dynaconf / pydantic-settings
tests/
"""
- Name task functions with verb_noun(): send_email, crunch_numbers.
- Return a TaskResult dataclass / pydantic model, never a raw dict (see the sketch below).
- Use context managers for external resources; close connections inside the task.
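A minimal sketch of such a result model; the field names are illustrative:

"""python
from pydantic import BaseModel

class TaskResult(BaseModel):
    record_id: str
    status: str                     # "complete" | "skipped" | "failed"
    output_url: str | None = None   # requires Python >= 3.10, per the stack above
"""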
Error Handling and Validation
- At top of every task:
1. Validate input with pydantic, raise ValidationError.
2. Abort if pre-conditions fail (e.g., record already processed).
- Wrap network / IO calls so they raise specific exceptions that map onto Celery's retry machinery:
"""python
from celery import shared_task
from requests.exceptions import HTTPError

@shared_task(bind=True, autoretry_for=(HTTPError,), retry_backoff=True)
def fetch_report(self, report_id: str):
    ...
"""
- Configure a global "task_failure" signal → Sentry + Slack alert.
- Distinguish between fatal (no retry) and transient (retry) errors via custom BaseError hierarchy.
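A minimal sketch of the error hierarchy and the failure signal; the Slack helper is an assumption:

"""python
from celery.signals import task_failure

class BaseError(Exception): ...
class TransientError(BaseError): ...   # list in autoretry_for -> retried
class FatalError(BaseError): ...       # never retried, alert immediately

@task_failure.connect
def on_task_failure(sender=None, task_id=None, exception=None, **kwargs):
    # sender is the task instance; forward to Sentry / Slack here
    alert_to_slack(f"{sender.name}[{task_id}] failed: {exception!r}")  # assumed helper
"""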
Celery Rules
- Always set acks_late=True, task_acks_on_failure_or_timeout=False to avoid lost work.
- Set the hard time limit to ~90% of the task's SLA and the soft time limit to ~80%.
- Use task routing keys per domain (emails.*, billing.*). Map to dedicated queues.
- Enable the result backend only when callers actually need the result; for heavy payloads, prefer emitting an event (e.g., to Kafka) instead of storing results.
- Declare periodic tasks via the @celery_app.on_after_finalize.connect hook, not crontab files (see the sketch below).
- Use chord/group for parallelizable work; never spawn >1000 child tasks in a single chord.
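A minimal sketch of per-domain routing and in-code periodic registration; the queue names, schedule, and referenced task are illustrative:

"""python
from app.celery_app import celery
from app.tasks.analytics import refresh_exchange_rates  # hypothetical task

# Route by task-name pattern to dedicated queues
celery.conf.task_routes = {
    "app.tasks.emails.*": {"queue": "emails"},
    "app.tasks.billing.*": {"queue": "billing"},
}

# Register periodic work in code instead of crontab files
@celery.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(3600.0, refresh_exchange_rates.s(), name="hourly fx refresh")
"""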
Testing
- Unit test tasks as pure functions with fixtures rather than through full Celery integration (see the sketch after this list).
- Use celery.contrib.testing for end-to-end tests against an in-memory broker.
- Include chaos tests: kill worker process mid-task, ensure replay works.
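A minimal unit-test sketch; the module path, task, and mocked client are assumptions:

"""python
from unittest.mock import patch

from app.tasks.emails import send_receipt_email  # hypothetical module path

def test_send_receipt_email_calls_provider_once():
    with patch("app.tasks.emails.email_client") as client:
        # .run() executes the task body in-process; no broker or worker involved
        send_receipt_email.run(user_id="user-123")
        client.send.assert_called_once()
"""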
Performance
- Batch small tasks: combine up to ~100 DB writes per job to cut per-message overhead (see the batching sketch after this list).
- Run CPU-bound jobs on an isolated worker pool with concurrency = cores - 1 and worker_prefetch_multiplier=1.
- Run heavy jobs off-peak via celery-beat cron (e.g., "0 3 * * *").
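A minimal batching sketch for the first point above; write_rows is a hypothetical task that accepts a list of row dicts:

"""python
from app.tasks.analytics import write_rows  # hypothetical bulk-insert task

def enqueue_rows(rows: list[dict], batch_size: int = 100) -> None:
    # One message per batch instead of one per row keeps broker and DB
    # connection overhead proportional to batches, not records.
    for start in range(0, len(rows), batch_size):
        write_rows.delay(rows[start:start + batch_size])
"""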
Security
- Never put PII in task arguments; pass reference IDs and resolve them inside the worker (see the sketch after this list).
- Protect payloads in Redis: require AUTH, use TLS in transit, and encrypt sensitive payloads at rest.
- Run workers with minimal OS privileges; no root.
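A minimal sketch of the reference-ID rule; get_user and email_client are assumed helpers:

"""python
@celery.task
def send_password_reset(user_id: str) -> None:
    user = get_user(user_id)  # PII is resolved inside the worker, never queued
    email_client.send(to=user.email, template="password_reset")
"""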
Observability
- Expose /metrics with celery-prometheus; alert on:
  • queue_latency_seconds p95 > 5 minutes
  • task_failure_total above a per-task-type threshold
- Tag every task with correlation_id from incoming request for trace continuity.
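A minimal correlation-ID sketch, assuming the incoming X-Request-ID header is forwarded as an explicit task argument:

"""python
import logging
from fastapi import Header

logger = logging.getLogger(__name__)

@celery.task(bind=True)
def crunch_numbers(self, payload: dict, correlation_id: str = "") -> dict:
    logger.info("start", extra={"correlation_id": correlation_id, "task_id": self.request.id})
    ...

@app.post("/crunch")
async def crunch(request: JobRequest, x_request_id: str = Header(default="")):
    task = crunch_numbers.delay(request.dict(), correlation_id=x_request_id)
    return {"task_id": task.id}
"""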
Deployment & Operations
- Use rolling updates: scale up new worker pool, drain old with --autoscale=0,0.
- Keep at least one canary worker running new code before full rollout.
- Backup broker data daily; store DLQ (dead-letter queue) messages ≥14 days.
Common Pitfalls & Guardrails
- DON’T nest Celery tasks inside tasks; prefer chords or callbacks.
- DON’T rely on local file system; use object storage.
- DON’T block the event loop inside a task; when a task needs asyncio code, drive it to completion with loop.run_until_complete (or asyncio.run), as in the sketch below.
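A minimal sketch for the asyncio point; fetch_profile is an assumed coroutine:

"""python
import asyncio

@celery.task(bind=True, autoretry_for=(ConnectionError,), retry_backoff=True)
def refresh_profile(self, user_id: str) -> dict:
    async def _run() -> dict:
        return await fetch_profile(user_id)  # assumed async HTTP call

    # The worker process is synchronous, so drive the coroutine to completion here
    return asyncio.run(_run())
"""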
Example Complete Task
"""python
from app.celery_app import celery
from app.services.pdf import build_invoice_pdf
from app.services.storage import upload_to_s3   # assumed helper module
from app.schemas import InvoiceTaskArgs, InvoiceTaskResult

@celery.task(bind=True, autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=5, acks_late=True, time_limit=300)
def generate_invoice(self, payload: dict) -> dict:
    # Validate input, then guard against duplicate work (idempotency)
    args = InvoiceTaskArgs.parse_obj(payload)
    if args.invoice_already_generated():
        return {"status": "skipped", "invoice_id": args.invoice_id}

    # Build the PDF, persist it, and return a typed result
    pdf_bytes = build_invoice_pdf(args)
    url = upload_to_s3(pdf_bytes, f"invoices/{args.invoice_id}.pdf")
    return InvoiceTaskResult(invoice_id=args.invoice_id, pdf_url=url).dict()
"""