Orchestrating AI Pipelines with Celery: Rate Limits, Retries, and Failure Handling

Feb 23, 2026

How I built a resilient job processing system for chained AI service calls

TL;DR:

  • I use Celery to orchestrate a multi-step AI pipeline: transcription (AssemblyAI) → analysis (OpenAI) → database storage → webhook delivery.
  • The configuration prioritizes reliability: late acknowledgment, automatic requeue on worker death, exponential backoff retries.
  • Graceful degradation means partial failures don't kill the whole job—if speaker identification fails, the transcript still saves.
  • Worker concurrency is intentionally low (2 tasks max) because AI API calls are I/O-bound and memory-intensive.

When building the Meeting Intelligence Platform, the core processing pipeline involves multiple AI services:

  1. AssemblyAI — Transcribes audio with speaker diarization (5-10 minutes for a 1-hour meeting)
  2. OpenAI — Generates summaries, extracts action items, identifies decisions (multiple API calls)
  3. Database — Stores structured results (meetings, speakers, segments, action items)
  4. Webhook — Notifies the originating CMS of completion

Each step can fail independently. AssemblyAI might time out. OpenAI might rate-limit. The database might be temporarily unavailable. The webhook endpoint might be down.

Celery handles this orchestration—managing retries, tracking state, and ensuring jobs eventually complete or fail gracefully.

For users, this means fewer failed jobs and fewer "please try again later" errors. For engineering teams, it means fewer late-night pages when external APIs misbehave.

Why Celery?

For Python async job processing, the main options are:

  • Celery — Battle-tested, Redis/RabbitMQ backends, rich configuration
  • RQ (Redis Queue) — Simpler, Redis-only, fewer features
  • Dramatiq — Modern alternative, good defaults
  • Custom asyncio — Maximum control, maximum maintenance burden

I chose Celery because:

  • Proven at scale (Instagram, Mozilla, Stripe use it)
  • Excellent retry and failure handling
  • Built-in result storage
  • Good monitoring tools (Flower)

The complexity is manageable for a single-service deployment, and the reliability features pay off immediately.

When is Celery overkill? If your workload is just a few quick background tasks, a simple queue like RQ is enough. Once you have multi-step AI calls, rate limits, and long-running jobs, Celery's reliability features start to pay off.

The Configuration

Here's the Celery app configuration:

# from celery_app.py
import os

from celery import Celery

# REDIS_URL comes from the environment (default shown for local development)
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")

celery_app = Celery(
    "meeting_intelligence",
    broker=REDIS_URL,
    backend=REDIS_URL,
    include=["worker.tasks"],
)

celery_app.conf.update(
    # Task settings
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    
    # Task execution settings
    task_acks_late=True,  # Acknowledge after task completes
    task_reject_on_worker_lost=True,  # Requeue if worker dies
    
    # Retry settings
    task_default_retry_delay=60,  # 1 minute
    task_max_retries=3,
    
    # Worker settings
    worker_prefetch_multiplier=1,  # One task at a time
    worker_concurrency=2,  # 2 concurrent tasks max
    
    # Result backend settings
    result_expires=86400,  # Results expire after 24 hours
)

Let me explain the key settings:

Late Acknowledgment (task_acks_late=True)

By default, Celery acknowledges tasks when they're received. If the worker crashes mid-task, the task is lost.

With task_acks_late=True, tasks are acknowledged only after completion. Combined with task_reject_on_worker_lost=True, a crashed worker's tasks return to the queue for another worker to pick up.

This is essential for long-running jobs—a 10-minute transcription shouldn't be lost because a container was recycled.

Low Concurrency (worker_concurrency=2)

AI API calls are I/O-bound but memory-intensive. Each transcription job might hold several megabytes of audio data and transcript text. Running 10 concurrent jobs would exhaust memory quickly.

Two concurrent tasks is a conservative choice that works well for a single worker container. Scale horizontally (more workers) rather than vertically (more concurrency per worker).

Prefetch Multiplier (worker_prefetch_multiplier=1)

By default, Celery workers prefetch multiple tasks to reduce latency. For short tasks, this is efficient. For 10-minute AI jobs, it creates problems:

  • Task A is prefetched and executing (10 minutes)
  • Tasks B, C, D are prefetched and waiting
  • Another worker sits idle because tasks are already claimed

Setting worker_prefetch_multiplier=1 means each worker only claims one task at a time, improving distribution across workers.

In practice, this ensures long jobs are distributed fairly across workers instead of piling up on one worker while others sit idle.
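Both of these worker settings can also be set (or overridden) on the command line when starting a worker. A sketch, assuming Celery can find the app in the celery_app module from the config snippet above:

```shell
# Start a worker that claims one task at a time and runs at most 2 concurrently
celery -A celery_app worker --loglevel=info --concurrency=2 --prefetch-multiplier=1
```

CLI flags take precedence over the values in celery_app.conf, which is handy when different deployments need different worker shapes.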

The Task: Processing a Job

Here's the main processing task:

# from tasks.py
@celery_app.task(bind=True, max_retries=3)
def process_job(self, job_id: str):
    """Process a transcription job."""
    logger.info(f"Starting job processing: {job_id}")
    
    db = get_db_session()
    
    try:
        # Get job from database
        job = db.execute(
            select(Job).where(Job.id == job_id)
        ).scalar_one_or_none()
        
        if not job:
            logger.error(f"Job not found: {job_id}")
            return {"error": "Job not found"}
        
        # Update status to processing
        job.status = JobStatus.PROCESSING
        job.started_at = datetime.utcnow()
        db.commit()
        
        # Get audio file path
        audio_path = os.path.join(upload_dir, job.audio_filename)
        
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"Audio file not found: {audio_path}")
        
        # Process the audio (this is where the AI magic happens)
        processor = MeetingProcessor(
            assemblyai_api_key=os.getenv("ASSEMBLYAI_API_KEY"),
            openai_api_key=os.getenv("OPENAI_API_KEY"),
        )
        
        result = processor.process_meeting_audio(audio_path)
        
        # Save to database (meeting, speakers, segments, action items, decisions)
        meeting = save_meeting_to_db(db, job, result)
        
        # Update job status
        job.status = JobStatus.COMPLETED
        job.completed_at = datetime.utcnow()
        db.commit()
        
        # Publish to Redis for real-time SSE updates
        publish_job_update(
            job_id=job_id,
            status="completed",
            results_url=f"/meeting-intelligence/meeting/{meeting.id}"
        )
        
        # Send webhook
        _send_webhook(job, meeting, db)
        
        return {"status": "completed", "job_id": job_id}
        
    except Exception as e:
        logger.error(f"Job failed: {job_id} - {str(e)}")
        handle_job_failure(db, job_id, self, e)
    finally:
        db.close()    

Key patterns:

  • bind=True — Gives access to self, needed for retry logic
  • Status tracking — Job moves through PENDING → PROCESSING → COMPLETED/FAILED
  • Real-time updates — Publishes to Redis so the SSE service can notify browsers
  • Webhook delivery — Notifies the originating CMS

Retry Logic with Exponential Backoff

When a job fails, I don't retry immediately. API services might be rate-limited or temporarily down. Hammering them makes things worse. Here's the failure-handling logic invoked from the task's except block above:

# from tasks.py
except Exception as e:
    logger.error(f"Job failed: {job_id} - {str(e)}")
    
    # Update job status
    job = db.execute(
        select(Job).where(Job.id == job_id)
    ).scalar_one_or_none()
    
    if job:
        job.status = JobStatus.FAILED
        job.error_message = str(e)
        job.retry_count = self.request.retries
        db.commit()
        
        # Publish failure for SSE
        publish_job_update(
            job_id=job_id,
            status="failed",
            error=str(e)
        )
        
        # Send failure webhook
        _send_webhook(job, None, db, error=str(e))
    
    # Retry with exponential backoff
    if self.request.retries < self.max_retries:
        raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))
    
    return {"status": "failed", "job_id": job_id, "error": str(e)}

The countdown formula 60 * (2 ** self.request.retries) gives:

  • Retry 1: 60 seconds (1 minute)
  • Retry 2: 120 seconds (2 minutes)
  • Retry 3: 240 seconds (4 minutes)

After 3 retries, the job is permanently failed. The user sees the error via SSE, and the webhook delivers failure details to the CMS.
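The schedule is easy to sanity-check in isolation. A minimal sketch of the same formula (the backoff_delay name is mine, not from the codebase):

```python
def backoff_delay(retries: int, base: int = 60) -> int:
    """Mirror of the countdown formula: base * 2^retries seconds."""
    return base * (2 ** retries)

# self.request.retries is 0 when the first retry is scheduled
schedule = [backoff_delay(r) for r in range(3)]
print(schedule)  # [60, 120, 240]
```

Raising the base or the exponent cap is a one-line change if your rate limits recover more slowly.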

Graceful Degradation in the Pipeline

The AI pipeline has multiple steps, and not all are equally critical:

# from audio_processor.py
def process_meeting_audio(self, audio_file_path: str) -> MeetingResult:
    """Process a meeting audio file."""
    
    # Step 1: Transcription (CRITICAL - must succeed)
    try:
        full_transcript, segments, speakers, language = self.transcriber.transcribe_audio(
            audio_file_path
        )
        
        if not full_transcript or len(full_transcript.strip()) < 10:
            raise Exception("Transcription produced insufficient content")
            
    except Exception as e:
        raise Exception(f"Audio transcription failed: {str(e)}")
    
    # Step 2: Speaker identification (OPTIONAL - graceful degradation)
    try:
        speakers = self.analyzer.identify_speaker_names(full_transcript, speakers)
    except Exception as e:
        print(f"Warning: Speaker name identification failed: {e}")
        # Keep original speakers with IDs if identification fails
    
    # Step 3: Summary generation (OPTIONAL - graceful degradation)
    try:
        summary = self.analyzer.generate_meeting_summary(full_transcript)
        if not summary or "failed" in summary.lower():
            summary = "Summary generation failed. Please review the transcript manually."
    except Exception as e:
        print(f"Warning: Summary generation failed: {e}")
        summary = "Summary generation failed. Please review the transcript manually."
    
    # Step 4: Action item extraction (OPTIONAL - graceful degradation)
    try:
        action_items = self.analyzer.extract_action_items(full_transcript)
    except Exception as e:
        print(f"Warning: Action item extraction failed: {e}")
        action_items = []

    # Step 5: Decision extraction (OPTIONAL - graceful degradation)
    try:
        decisions = self.analyzer.extract_decisions(full_transcript)
    except Exception as e:
        print(f"Warning: Decision extraction failed: {e}")
        decisions = []
    
    # ... build and return result

The philosophy:

  • Transcription is critical — If it fails, the whole job fails. No transcript = no value.
  • Everything else degrades gracefully — Speaker names, summary, action items, and decisions are nice-to-have. If OpenAI is down, return partial results rather than failing the entire job.

This means users get something even when AI services are flaky. A transcript with "Speaker A" and "Speaker B" is better than nothing.
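The repeated try/except pattern in steps 2-5 could be factored into a small helper. A sketch (run_optional is a hypothetical name, not from the codebase):

```python
import logging
from typing import Callable, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)

def run_optional(step_name: str, step: Callable[[], T], fallback: T) -> T:
    """Run a nice-to-have pipeline step; on any failure, log and fall back."""
    try:
        return step()
    except Exception as e:
        logger.warning("%s failed, using fallback: %s", step_name, e)
        return fallback
```

With this helper, step 4 above collapses to a single call: run_optional("action items", lambda: self.analyzer.extract_action_items(full_transcript), []).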

Webhook Delivery

After processing completes (success or failure), the CMS needs to know:

# from tasks.py
def _send_webhook(job: Job, meeting: Meeting | None, db: Session, error: str | None = None):
    """Send webhook callback to the configured URL."""
    
    payload = {
        "job_id": str(job.id),
        "status": job.status.value,
        "error": error,
    }
    
    if meeting:
        payload["meeting"] = {
            "id": str(meeting.id),
            "title": meeting.title,
            "summary": meeting.summary,
            # ... speakers, segments, action_items, decisions
        }
    
    headers = {
        "Content-Type": "application/json",
        "X-Webhook-Secret": job.webhook_secret,
    }
    
    try:
        with httpx.Client(timeout=30.0) as client:
            response = client.post(
                job.webhook_url,
                json=payload,
                headers=headers,
            )
            response.raise_for_status()
            logger.info(f"Webhook sent successfully: {job.webhook_url}")
    except Exception as e:
        logger.error(f"Webhook failed: {job.webhook_url} - {str(e)}")
        # Don't raise - webhook failure shouldn't fail the job

Key decisions:

  • 30-second timeout — Don't wait forever for slow endpoints
  • Secret header — The CMS provides a secret when submitting the job; we echo it back in the webhook header. This lets the CMS verify callbacks came from us, not from an attacker.
  • Failure is logged, not raised — The job succeeded; webhook delivery is best-effort

If webhooks need guaranteed delivery, I'd add a separate retry queue. For this use case, the SSE channel provides real-time updates, and webhooks are a backup.
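On the CMS side, verifying the echoed secret should use a constant-time comparison so an attacker can't recover it byte-by-byte from response timing. A minimal sketch, assuming the X-Webhook-Secret header shown above:

```python
import hmac

def verify_webhook_secret(received: str, expected: str) -> bool:
    """Constant-time comparison; avoids leaking the secret through timing."""
    return hmac.compare_digest(received, expected)

# In the CMS's webhook handler (framework-agnostic):
# if not verify_webhook_secret(request.headers.get("X-Webhook-Secret", ""), stored_secret):
#     return 401
```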

Monitoring and Observability

For production, I'd add:

  • Flower — Celery's web-based monitoring tool
  • Structured logging — JSON logs with job_id, status, duration
  • Metrics — Task duration histograms, failure rates, queue depth

For the demo platform, logging is sufficient:

logger.info(f"Starting job processing: {job_id}")
logger.info(f"Processing audio file: {audio_path}")
logger.info(f"Job completed successfully: {job_id}")
logger.error(f"Job failed: {job_id} - {str(e)}")

Every log line includes the job_id, making it easy to trace a single job through the system.
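One way to guarantee that without repeating the f-string prefix in every call is a stdlib LoggerAdapter. A sketch (JobLoggerAdapter is a hypothetical name):

```python
import logging

class JobLoggerAdapter(logging.LoggerAdapter):
    """Prepend the job_id to every message so any line can be traced."""
    def process(self, msg, kwargs):
        return f"[job={self.extra['job_id']}] {msg}", kwargs

log = JobLoggerAdapter(logging.getLogger("worker.tasks"), {"job_id": "abc-123"})
log.info("Starting job processing")  # message: [job=abc-123] Starting job processing
```

The same extra dict could instead feed a JSON formatter when moving to the structured logging mentioned above.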

Lessons Learned

1. Late acknowledgment is non-negotiable for long-running tasks

Losing a 10-minute job because a container restarted is unacceptable. task_acks_late=True should be the default for anything over a few seconds.

2. Exponential backoff respects external services

Retrying immediately after a rate limit just gets you rate-limited again. Give APIs time to recover.

3. Partial results beat total failure

Users would rather have a transcript without AI-generated summaries than nothing at all. Design pipelines to degrade gracefully.

4. Low concurrency, horizontal scaling

For memory-intensive AI workloads, running fewer tasks per worker and scaling workers horizontally is more predictable than running many concurrent tasks.

Wrapping Up

Celery is overkill for simple job queues but exactly right for orchestrating multi-step pipelines with external service dependencies. The configuration takes some thought, but once set up, it handles the hard problems—retries, failure tracking, and worker management—reliably.

This pattern works for any AI pipeline: document processing, image analysis, video transcription. The specific services change; the orchestration challenges remain the same.

You can see this pipeline in action on the live demo—upload a meeting recording and watch the job progress through transcription, analysis, and completion.

Part of a series on building the Meeting Intelligence Platform. Next up: Building a Multi-Platform API.

Building AI-powered features? I can help with:

  • Designing Celery-based pipelines for AI workloads
  • Tuning workers for long-running, memory-heavy jobs
  • Implementing graceful degradation and progress tracking
  • Integrating multiple AI services (transcription, LLMs, embeddings)

[Get in touch on Upwork] or [Get in Touch Directly →] to discuss your project.