    Python Background Job Rules

    Comprehensive coding & operational rules for building, running, and maintaining Python background-job systems with Celery, FastAPI, and Redis.

    Stop Blocking Your APIs: Master Python Background Job Systems

    Your API shouldn't make users wait 30 seconds for a PDF to generate. Your web server shouldn't crash when processing thousands of email sends. Every time you handle a long-running task synchronously, you're creating bottlenecks that kill user experience and system reliability.

    The Hidden Cost of Synchronous Processing

    Most Python developers start with this pattern:

    @app.post("/generate-report")
    def create_report(request: ReportRequest):
        # This blocks for 15-45 seconds
        pdf_data = generate_complex_report(request.user_id)
        send_email_with_attachment(request.email, pdf_data)
        return {"status": "complete"}
    

    The problems are immediate and expensive:

    • HTTP timeouts kill user sessions
    • Server threads get exhausted under load
    • Memory usage spikes with concurrent long-running processes
    • Zero fault tolerance - one failure crashes everything
    • Impossible to scale processing independently from API serving

    Transform Your Architecture in 30 Minutes

    These Cursor Rules establish a production-grade background job system using Celery, FastAPI, and Redis that separates concerns cleanly and scales independently. You'll process tasks asynchronously, retry failures intelligently, and monitor everything that matters.

    What you get immediately:

    • HTTP responses in <200ms for any operation
    • Automatic retry logic with exponential backoff
    • Independent scaling of API servers and job workers
    • Built-in monitoring and failure alerting
    • Production-ready error handling and observability

    Key Benefits That Transform Your Development

    Immediate Response Times

    Your APIs return instantly while work happens in the background:

    @app.post("/generate-report")
    async def create_report(request: ReportRequest):
        # Returns in ~50ms
        task = generate_report_task.delay(request.user_id, request.email)
        return {"task_id": task.id, "status": "processing"}
    

    Bulletproof Reliability

    Tasks retry automatically with intelligent backoff (a minimal configuration sketch follows the list):

    • Network failures → retry up to 5 times with exponential backoff
    • Database locks → retry with jitter to prevent thundering herd
    • External service timeouts → fail fast, alert immediately
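
    A minimal sketch of how those policies map onto Celery task options; the helper fetch_from_partner_api and the Slack alert are illustrative placeholders, not part of the rule set:

    from app.celery_app import celery
    from requests.exceptions import ConnectionError, Timeout

    @celery.task(
        bind=True,
        autoretry_for=(ConnectionError,),  # transient network failures: retry
        retry_backoff=True,                # exponential backoff: 1s, 2s, 4s, ...
        retry_jitter=True,                 # add jitter to avoid a thundering herd
        max_retries=5,
    )
    def call_external_service(self, record_id: str) -> dict:
        try:
            return fetch_from_partner_api(record_id)
        except Timeout as exc:
            # External service timeouts fail fast and alert instead of retrying
            alert_to_slack(f"{self.name} timed out for {record_id}: {exc}")
            raise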

    Independent Scaling

    Scale your components based on actual demand (see the routing sketch after this list):

    • API servers scale with user traffic
    • CPU-intensive workers scale with processing load
    • Memory-intensive tasks get dedicated worker pools
    • Peak-hour processing runs on separate infrastructure
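
    One way to realize this split is per-domain queues driven by Celery task routing; a minimal sketch where queue names, module paths, and worker commands are illustrative:

    # Routing sketch: each domain gets its own queue and worker pool
    from celery import Celery

    celery = Celery("app", broker="redis://localhost:6379/0")

    celery.conf.task_routes = {
        "app.tasks.emails.*": {"queue": "emails"},        # I/O-bound: many lightweight workers
        "app.tasks.reports.*": {"queue": "reports_cpu"},  # CPU-bound: small dedicated pool
    }

    # Scale each pool independently of the API servers, e.g.:
    #   celery -A app.celery_app worker -Q emails --concurrency=50
    #   celery -A app.celery_app worker -Q reports_cpu --concurrency=3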

    Zero-Downtime Deployments

    Rolling updates keep your system running:

    • Deploy new worker code without dropping tasks
    • Drain old workers gracefully
    • Canary deployments catch issues before full rollout

    Real Developer Workflows

    Before: Blocked Development

    # This pattern kills productivity
    def process_user_signup(user_data):
        validate_email(user_data.email)        # 2s - external API
        generate_avatar(user_data.profile)     # 8s - image processing  
        send_welcome_email(user_data)          # 3s - email service
        sync_to_crm(user_data)                 # 5s - CRM integration
        return "Welcome!"  # User waits 18 seconds
    

    After: Async Excellence

    # API responds instantly, work happens in background
    @app.post("/signup")
    async def signup(user_data: UserSignup):
        # Validate immediately (fail fast)
        if not user_data.email:
            raise HTTPException(status_code=422, detail="Email required")
        
        # Queue background work
        process_signup_task.delay(user_data.dict())
        return {"status": "processing", "message": "Welcome! Check your email."}
    
    # Background task handles the heavy lifting
    @celery.task(bind=True, autoretry_for=(ConnectionError,), retry_backoff=True)
    def process_signup_task(self, user_data):
        args = UserSignupArgs.parse_obj(user_data)
        
        # Each step is idempotent and fault-tolerant
        validate_email(args.email)
        generate_avatar.delay(args.profile)  # Parallel processing
        send_welcome_email.delay(args)
        sync_to_crm.delay(args)
    

    Advanced Workflow: Report Generation Pipeline

    # Complex multi-stage processing with error isolation
    from celery import chord, group
    from app.celery_app import celery

    @celery.task
    def generate_monthly_reports(company_id: str):
        # Fan out to parallel workers
        job_group = group([
            compile_sales_data.s(company_id),
            analyze_user_metrics.s(company_id), 
            generate_financial_summary.s(company_id)
        ])
        
        # Combine results when all complete
        callback = compile_final_report.s(company_id)
        return chord(job_group)(callback)
    

    Implementation Guide

    1. Project Structure Setup

    app/
    ├── api/                 # FastAPI routers
    │   ├── jobs.py         # Job submission endpoints
    │   └── status.py       # Task status checking
    ├── tasks/              # Celery tasks by domain
    │   ├── reports.py      # Report generation
    │   ├── emails.py       # Email sending
    │   └── analytics.py    # Data processing
    ├── services/           # Pure business logic
    ├── schemas/            # Pydantic models
    └── settings.py         # Configuration
    

    2. Core Task Pattern

    Every task follows this bulletproof pattern:

    from app.celery_app import celery
    from app.schemas import TaskArgs, TaskResult
    
    @celery.task(
        bind=True,
        autoretry_for=(ConnectionError, TimeoutError),
        retry_backoff=True,
        max_retries=5,
        acks_late=True,
        time_limit=300
    )
    def process_data(self, payload: dict) -> dict:
        # 1. Validate input immediately
        args = TaskArgs.parse_obj(payload)
        
        # 2. Check if already processed (idempotency)
        if already_processed(args.record_id):
            return {"status": "skipped", "record_id": args.record_id}
        
        # 3. Do the work with proper resource management
        try:
            with database_connection() as conn:
                result = heavy_processing(args, conn)
            
            return TaskResult(
                record_id=args.record_id,
                status="complete",
                output_url=result.url
            ).dict()
            
        except RetryableError as e:
            # Celery handles retry automatically
            raise e
        except FatalError as e:
            # Don't retry, alert immediately
            alert_to_slack(f"Fatal error in {self.name}: {e}")
            raise
    
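
    The already_processed guard above is deliberately abstract. One possible implementation, sketched here with an assumed key format and a 24-hour TTL, uses a Redis SET NX marker:

    import redis

    redis_client = redis.Redis.from_url("redis://localhost:6379/1")

    def already_processed(record_id: str) -> bool:
        # SET ... NX succeeds only for the first caller; duplicates get None back
        first_time = redis_client.set(f"processed:{record_id}", 1, nx=True, ex=86_400)
        return not first_time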

    3. FastAPI Integration

    @app.post("/submit-job")
    async def submit_job(request: JobRequest):
        task = process_data.delay(request.dict())
        return {
            "task_id": task.id,
            "status_url": f"/tasks/{task.id}/status"
        }
    
    @app.get("/tasks/{task_id}/status")
    async def get_task_status(task_id: str):
        result = celery.AsyncResult(task_id)
        return {
            "status": result.status,
            "result": result.result if result.ready() else None
        }
    
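
    For completeness, a client-side sketch of the submit-then-poll flow; it assumes the API above is running locally and httpx is available, and the payload fields are illustrative:

    import time
    import httpx

    BASE_URL = "http://localhost:8000"

    task = httpx.post(f"{BASE_URL}/submit-job", json={"record_id": "42"}).json()

    while True:
        status = httpx.get(f"{BASE_URL}{task['status_url']}").json()
        if status["status"] in ("SUCCESS", "FAILURE"):
            break
        time.sleep(1)  # simple polling; use webhooks or SSE for production UIs

    print(status["result"])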

    4. Production Configuration

    # settings.py
    from pydantic_settings import BaseSettings
    
    class Settings(BaseSettings):
        redis_url: str = "redis://localhost:6379"
        # Note: these only share the default; if redis_url is overridden via env,
        # set CELERY_BROKER_URL / CELERY_RESULT_BACKEND explicitly as well
        celery_broker_url: str = redis_url
        celery_result_backend: str = redis_url
        
        # Worker tuning
        worker_concurrency: int = 4
        worker_prefetch_multiplier: int = 1
        task_time_limit: int = 300
        task_soft_time_limit: int = 240
    
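
    These values still have to reach Celery; here is a minimal sketch of a matching app/celery_app.py that wires the settings into standard Celery config keys (module paths follow the structure from step 1):

    # app/celery_app.py
    from celery import Celery
    from app.settings import Settings

    settings = Settings()

    celery = Celery(
        "app",
        broker=settings.celery_broker_url,
        backend=settings.celery_result_backend,
        include=["app.tasks.reports", "app.tasks.emails", "app.tasks.analytics"],
    )

    celery.conf.update(
        worker_concurrency=settings.worker_concurrency,
        worker_prefetch_multiplier=settings.worker_prefetch_multiplier,
        task_time_limit=settings.task_time_limit,
        task_soft_time_limit=settings.task_soft_time_limit,
        task_acks_late=True,
    )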

    Results & Impact

    Immediate Productivity Gains

    • 90% faster API responses: Users see instant feedback instead of waiting
    • Zero timeout errors: Long-running tasks never block HTTP threads
    • 50% fewer support tickets: Reliable background processing eliminates user-facing failures

    Scaling Breakthroughs

    • Independent deployment cycles: Update job logic without touching API code
    • Resource optimization: CPU-heavy tasks run on different infrastructure than API servers
    • Cost efficiency: Scale workers based on queue depth, not peak API traffic

    Operational Excellence

    • Automatic failure recovery: Transient errors retry themselves
    • Complete observability: Every task tracked with correlation IDs
    • Zero-downtime deployments: Rolling worker updates keep jobs running

    Development Velocity

    • Clear separation of concerns: Business logic stays in services, orchestration in tasks
    • Easy testing: Pure functions test independently of Celery infrastructure
    • Predictable debugging: Structured logging and monitoring catch issues immediately

    These rules eliminate the guesswork from background job systems. You'll ship features faster, handle failures gracefully, and scale components independently. Your users get responsive interfaces while complex work happens reliably behind the scenes.

    The architecture scales from startup MVPs to enterprise systems processing millions of jobs daily. Start with the core patterns, then layer on advanced features like task chaining, priority queues, and custom retry strategies as your needs grow.

    Python
    Background Jobs
    Celery
    FastAPI
    Backend Development
    Task Queues
    Redis

    Configuration

    You are an expert in Python, Celery, FastAPI, Redis, RabbitMQ, Docker, and Kubernetes.
    
    Technology Stack Declaration
    - Runtime: CPython ≥3.10, poetry for dependency management.
    - Queue brokers: Redis (default) or RabbitMQ for ordered, durable delivery.
    - Task runner: Celery 5 or newer with event-driven worker autoscaling.
    - API layer: FastAPI for job submission & status endpoints.
    - Containerization: Docker + Kubernetes with HPA driven by queue length metrics.
    
    Key Principles
    - Keep HTTP threads free: offload any task >200 ms to a background job.
    - Write idempotent, side-effect-safe tasks: they may be executed ≥1 time.
    - Model every task as a pure function + explicit I/O; never depend on global state.
    - Fail fast, retry cheap: throw early, retry with exponential back-off for transient errors.
    - Separate orchestration (API) from execution (workers). Never import Web code into workers.
    - Treat infrastructure as code: version all queues, schedules, and cron rules.
    
    Python
    - Use type hints everywhere; enforce with mypy strict.
    - Follow PEP8 + black formatting (line length 100).
    - Directory layout:
      """
      app/
        api/            # FastAPI routers
        tasks/          # Celery tasks (one file per domain area)
        services/       # pure logic shared by tasks & API
        schemas/        # pydantic models
        settings.py     # dynaconf / pydantic-settings
      tests/
      """
    - Name task functions with verb_noun(): send_email, crunch_numbers.
    - Return a TaskResult dataclass / pydantic model, never a raw dict (illustrative schema below).
    - Use context managers for external resources; close connections inside the task.
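    - Illustrative TaskResult schema (a sketch; field names are assumptions, not a prescribed API):
      """python
      from pydantic import BaseModel

      class TaskResult(BaseModel):
          record_id: str
          status: str                    # "complete" | "skipped" | "failed"
          output_url: str | None = None  # e.g. URL of the generated artifact
      """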
    
    Error Handling and Validation
    - At top of every task:
      1. Validate input with pydantic, raise ValidationError.
      2. Abort if pre-conditions fail (e.g., record already processed).
    - Wrap network / IO calls in specific exceptions and map to Celery retry():
      """python
      from celery import shared_task
      from requests.exceptions import HTTPError  # or your HTTP client's equivalent

      @shared_task(bind=True, autoretry_for=(HTTPError,), retry_backoff=True)
      def fetch_report(self, report_id: str):
          ...
      """
    - Configure a global "task_failure" signal → Sentry + Slack alert.
    - Distinguish between fatal (no retry) and transient (retry) errors via custom BaseError hierarchy.
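    - Illustrative BaseError hierarchy (class names are a sketch, not a prescribed API):
      """python
      class TaskError(Exception):
          pass  # base class for all task failures

      class TransientError(TaskError):
          pass  # safe to retry: network blips, lock contention

      class FatalError(TaskError):
          pass  # never retry: bad input or broken invariants; alert immediately
      """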
    
    Celery Rules
    - Always set acks_late=True, task_acks_on_failure_or_timeout=False to avoid lost work.
    - Set the hard time limit to ~90% of the task's SLA and the soft time limit to ~80%, so cleanup can run before the hard kill.
    - Use task routing keys per domain (emails.*, billing.*). Map to dedicated queues.
    - Enable the result backend only when the result is actually needed; prefer emitting an event to Kafka for heavy data.
    - Declare periodic tasks via the @celery_app.on_after_finalize signal rather than crontab files (sketch below).
    - Use chord/group for parallelizable work; never spawn >1000 child tasks in a single chord.
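    - Periodic-task registration sketch (module path and schedule are assumptions; the task name is borrowed from the report example above):
      """python
      from celery.schedules import crontab
      from app.celery_app import celery
      from app.tasks.reports import generate_monthly_reports  # assumed module path

      @celery.on_after_finalize.connect
      def setup_periodic_tasks(sender, **kwargs):
          # Nightly run at 03:00, replacing an external crontab entry
          sender.add_periodic_task(crontab(hour=3, minute=0), generate_monthly_reports.s("acme-corp"))
      """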
    
    Additional Sections
    Testing
    - Unit test tasks as pure functions with fixtures, not Celery integration (example sketch below).
    - Use celery.contrib.testing for end-to-end tests against an in-memory broker.
    - Include chaos tests: kill worker process mid-task, ensure replay works.
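    - Unit-test sketch for the generate_invoice example below (module path and payload fields are assumptions):
      """python
      from app.schemas import InvoiceTaskArgs
      from app.tasks.invoices import generate_invoice  # assumed module path

      def test_generate_invoice_skips_duplicates(monkeypatch):
          monkeypatch.setattr(InvoiceTaskArgs, "invoice_already_generated", lambda self: True)
          # Call the task body directly: no broker, no worker process involved
          result = generate_invoice.run({"invoice_id": "inv_1"})
          assert result == {"status": "skipped", "invoice_id": "inv_1"}
      """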
    
    Performance
    - Batch small tasks: combine ≤100 DB writes per job to reduce overhead.
    - Schedule CPU-bound jobs on isolated worker pool, concurrency = cores – 1, worker_prefetch_multiplier=1.
    - Run heavy jobs off-peak via celery-beat cron (e.g., "0 3 * * *").
    
    Security
    - Never put PII in task arguments; pass reference IDs.
    - Protect payloads in Redis: enable TLS in transit, require AUTH, and encrypt sensitive payloads at rest.
    - Run workers with minimal OS privileges; no root.
    
    Observability
    - Expose /metrics with celery-prometheus; alert on:
      • queue_latency_seconds p95 > 5m
      • task_failure_total > threshold per task type
    - Tag every task with correlation_id from incoming request for trace continuity.
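    - Correlation-ID sketch (passing the ID explicitly as a task argument; header-based propagation is an alternative):
      """python
      import logging
      from app.celery_app import celery

      logger = logging.getLogger(__name__)

      @celery.task(bind=True)
      def process_data(self, payload: dict, correlation_id: str | None = None):
          # The API forwards the incoming request ID, e.g.
          #   process_data.delay(payload, correlation_id=request.headers.get("X-Request-ID"))
          logger.info("processing", extra={"correlation_id": correlation_id, "task_id": self.request.id})
      """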
    
    Deployment & Operations
    - Use rolling updates: scale up new worker pool, drain old with --autoscale=0,0.
    - Keep at least one canary worker running new code before full rollout.
    - Backup broker data daily; store DLQ (dead-letter queue) messages ≥14 days.
    
    Common Pitfalls & Guardrails
    - DON’T nest Celery tasks inside tasks; prefer chords or callbacks.
    - DON’T rely on local file system; use object storage.
    - DON’T block in event loop; if using AsyncIO inside task, call loop.run_until_complete.
    
    Example Complete Task
    """python
    from app.celery_app import celery
    from app.services.pdf import build_invoice_pdf
    from app.schemas import InvoiceTaskArgs, InvoiceTaskResult
    
    @celery.task(bind=True, autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=5, acks_late=True, time_limit=300)
    def generate_invoice(self, payload: dict) -> dict:
        args = InvoiceTaskArgs.parse_obj(payload)
    
        if args.invoice_already_generated():
            return {"status": "skipped", "invoice_id": args.invoice_id}
    
        pdf_bytes = build_invoice_pdf(args)
        url = upload_to_s3(pdf_bytes, f"invoices/{args.invoice_id}.pdf")
        return InvoiceTaskResult(invoice_id=args.invoice_id, pdf_url=url).dict()
    """