TL;DR:
- I use Celery to orchestrate a multi-step AI pipeline: transcription (AssemblyAI) → analysis (OpenAI) → database storage → webhook delivery.
- The configuration prioritizes reliability: late acknowledgment, automatic requeue on worker death, exponential backoff retries.
- Graceful degradation means partial failures don't kill the whole job—if speaker identification fails, the transcript still saves.
- Worker concurrency is intentionally low (2 tasks max) because AI API calls are I/O-bound and memory-intensive.
When building the Meeting Intelligence Platform, the core processing pipeline involves multiple AI services:
- AssemblyAI — Transcribes audio with speaker diarization (5-10 minutes for a 1-hour meeting)
- OpenAI — Generates summaries, extracts action items, identifies decisions (multiple API calls)
- Database — Stores structured results (meetings, speakers, segments, action items)
- Webhook — Notifies the originating CMS of completion
Each step can fail independently. AssemblyAI might time out. OpenAI might rate-limit. The database might be temporarily unavailable. The webhook endpoint might be down.
Celery handles this orchestration—managing retries, tracking state, and ensuring jobs eventually complete or fail gracefully.
For users, this means fewer failed jobs and fewer "please try again later" errors. For engineering teams, it means fewer late-night pages when external APIs misbehave.
Why Celery?
For Python async job processing, the main options are:
- Celery — Battle-tested, Redis/RabbitMQ backends, rich configuration
- RQ (Redis Queue) — Simpler, Redis-only, fewer features
- Dramatiq — Modern alternative, good defaults
- Custom asyncio — Maximum control, maximum maintenance burden
I chose Celery because:
- Proven at scale (Instagram and Mozilla are among its well-known users)
- Excellent retry and failure handling
- Built-in result storage
- Good monitoring tools (Flower)
The complexity is manageable for a single-service deployment, and the reliability features pay off immediately.
When is Celery overkill? If your workload is just a few quick background tasks, a simple queue like RQ is enough. Once you have multi-step AI calls, rate limits, and long-running jobs, Celery's reliability features start to pay off.
The Configuration
Here's the Celery app configuration:
```python
# from celery_app.py
celery_app = Celery(
    "meeting_intelligence",
    broker=REDIS_URL,
    backend=REDIS_URL,
    include=["worker.tasks"],
)

celery_app.conf.update(
    # Task settings
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    # Task execution settings
    task_acks_late=True,              # Acknowledge after task completes
    task_reject_on_worker_lost=True,  # Requeue if worker dies
    # Retry settings
    task_default_retry_delay=60,      # 1 minute
    task_max_retries=3,
    # Worker settings
    worker_prefetch_multiplier=1,     # One task at a time
    worker_concurrency=2,             # 2 concurrent tasks max
    # Result backend settings
    result_expires=86400,             # Results expire after 24 hours
)
```
Let me explain the key settings:
Late Acknowledgment (task_acks_late=True)
By default, Celery acknowledges tasks as soon as a worker receives them, before execution. If the worker crashes mid-task, the message has already been acknowledged, so the task is lost.
With task_acks_late=True, tasks are acknowledged only after completion. Combined with task_reject_on_worker_lost=True, a crashed worker's tasks return to the queue for another worker to pick up.
This is essential for long-running jobs—a 10-minute transcription shouldn't be lost because a container was recycled.
Low Concurrency (worker_concurrency=2)
AI API calls are I/O-bound but memory-intensive. Each transcription job might hold several megabytes of audio data and transcript text. Running 10 concurrent jobs would exhaust memory quickly.
Two concurrent tasks is a conservative choice that works well for a single worker container. Scale horizontally (more workers) rather than vertically (more concurrency per worker).
Prefetch Multiplier (worker_prefetch_multiplier=1)
By default, Celery workers prefetch multiple tasks to reduce latency. For short tasks, this is efficient. For 10-minute AI jobs, it creates problems:
- Task A is prefetched and executing (10 minutes)
- Tasks B, C, D are prefetched and waiting
- Another worker sits idle because tasks are already claimed
Setting worker_prefetch_multiplier=1 means each worker only claims one task at a time, improving distribution across workers.
In practice, this makes sure long jobs are fairly distributed instead of piling up on one worker while others sit idle.
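For reference, the concurrency and prefetch settings can also be passed as flags when launching the worker. A sketch, assuming the app object lives in celery_app.py as shown above; check `celery worker --help` for your version's exact flag names:

```shell
# Start a worker mirroring worker_concurrency=2 and
# worker_prefetch_multiplier=1 from the app config.
celery -A celery_app worker \
  --concurrency=2 \
  --prefetch-multiplier=1 \
  --loglevel=info
```

Config set in code and flags set on the command line do the same job; keeping them in code makes the settings visible in review and identical across environments.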
The Task: Processing a Job
Here's the main processing task:
```python
# from tasks.py
@celery_app.task(bind=True, max_retries=3)
def process_job(self, job_id: str):
    """Process a transcription job."""
    logger.info(f"Starting job processing: {job_id}")
    db = get_db_session()
    try:
        # Get job from database
        job = db.execute(
            select(Job).where(Job.id == job_id)
        ).scalar_one_or_none()
        if not job:
            logger.error(f"Job not found: {job_id}")
            return {"error": "Job not found"}

        # Update status to processing
        job.status = JobStatus.PROCESSING
        job.started_at = datetime.utcnow()
        db.commit()

        # Get audio file path
        audio_path = os.path.join(upload_dir, job.audio_filename)
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"Audio file not found: {audio_path}")

        # Process the audio (this is where the AI magic happens)
        processor = MeetingProcessor(
            assemblyai_api_key=os.getenv("ASSEMBLYAI_API_KEY"),
            openai_api_key=os.getenv("OPENAI_API_KEY"),
        )
        result = processor.process_meeting_audio(audio_path)

        # Save to database (meeting, speakers, segments, action items, decisions)
        meeting = save_meeting_to_db(db, job, result)

        # Update job status
        job.status = JobStatus.COMPLETED
        job.completed_at = datetime.utcnow()
        db.commit()

        # Publish to Redis for real-time SSE updates
        publish_job_update(
            job_id=job_id,
            status="completed",
            results_url=f"/meeting-intelligence/meeting/{meeting.id}",
        )

        # Send webhook
        _send_webhook(job, meeting, db)
        return {"status": "completed", "job_id": job_id}
    except Exception as e:
        logger.error(f"Job failed: {job_id} - {str(e)}")
        handle_job_failure(db, job_id, self, e)
    finally:
        db.close()
```

Key patterns:
- bind=True — Gives access to self, needed for retry logic
- Status tracking — Job moves through PENDING → PROCESSING → COMPLETED/FAILED
- Real-time updates — Publishes to Redis so the SSE service can notify browsers
- Webhook delivery — Notifies the originating CMS
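The publish_job_update helper isn't shown in the source; here is a minimal sketch of what it might look like, assuming the SSE service subscribes to a Redis pub/sub channel. The channel name and payload shape are my assumptions, not from the codebase:

```python
import json

# Assumed channel name -- the real service may use something else.
JOB_UPDATES_CHANNEL = "job_updates"


def build_job_update(job_id: str, status: str, **extra: str) -> str:
    """Serialize a job update as the JSON payload pushed to subscribers."""
    return json.dumps({"job_id": job_id, "status": status, **extra})


def publish_job_update(job_id: str, status: str, **extra: str) -> None:
    """Publish a status update for SSE consumers (sketch)."""
    import redis  # deferred import so the payload builder is testable without a server
    client = redis.Redis.from_url("redis://localhost:6379/0")
    client.publish(JOB_UPDATES_CHANNEL, build_job_update(job_id, status, **extra))
```

Keeping the payload construction in a pure function makes the message format easy to unit-test without a running Redis instance.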
Retry Logic with Exponential Backoff
When a job fails, I don't retry immediately. API services might be rate-limited or temporarily down. Hammering them makes things worse.
```python
# from tasks.py
    except Exception as e:
        logger.error(f"Job failed: {job_id} - {str(e)}")

        # Update job status
        job = db.execute(
            select(Job).where(Job.id == job_id)
        ).scalar_one_or_none()
        if job:
            job.status = JobStatus.FAILED
            job.error_message = str(e)
            job.retry_count = self.request.retries
            db.commit()

            # Publish failure for SSE
            publish_job_update(
                job_id=job_id,
                status="failed",
                error=str(e),
            )

            # Send failure webhook
            _send_webhook(job, None, db, error=str(e))

        # Retry with exponential backoff
        if self.request.retries < self.max_retries:
            raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))
        return {"status": "failed", "job_id": job_id, "error": str(e)}
```
The countdown formula 60 * (2 ** self.request.retries) gives:
- Retry 1: 60 seconds (1 minute)
- Retry 2: 120 seconds (2 minutes)
- Retry 3: 240 seconds (4 minutes)
After 3 retries, the job is permanently failed. The user sees the error via SSE, and the webhook delivers failure details to the CMS.
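The schedule can be sanity-checked in a few lines. This is a throwaway illustration of the countdown formula, not code from the pipeline; self.request.retries is 0 on the first failure:

```python
def retry_countdown(retries: int, base: int = 60) -> int:
    """Delay in seconds before the next attempt: base * 2^retries."""
    return base * (2 ** retries)


# The three retries wait 60s, 120s, and 240s respectively.
schedule = [retry_countdown(r) for r in range(3)]
```

Recent Celery versions can also express this declaratively on the task decorator via autoretry_for, retry_backoff, and retry_jitter; the manual version above gives finer control over status updates between attempts.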
Graceful Degradation in the Pipeline
The AI pipeline has multiple steps, and not all are equally critical:
```python
# from audio_processor.py
def process_meeting_audio(self, audio_file_path: str) -> MeetingResult:
    """Process a meeting audio file."""
    # Step 1: Transcription (CRITICAL - must succeed)
    try:
        full_transcript, segments, speakers, language = self.transcriber.transcribe_audio(
            audio_file_path
        )
        if not full_transcript or len(full_transcript.strip()) < 10:
            raise Exception("Transcription produced insufficient content")
    except Exception as e:
        raise Exception(f"Audio transcription failed: {str(e)}")

    # Step 2: Speaker identification (OPTIONAL - graceful degradation)
    try:
        speakers = self.analyzer.identify_speaker_names(full_transcript, speakers)
    except Exception as e:
        print(f"Warning: Speaker name identification failed: {e}")
        # Keep original speakers with IDs if identification fails

    # Step 3: Summary generation (OPTIONAL - graceful degradation)
    try:
        summary = self.analyzer.generate_meeting_summary(full_transcript)
        if not summary or "failed" in summary.lower():
            summary = "Summary generation failed. Please review the transcript manually."
    except Exception as e:
        print(f"Warning: Summary generation failed: {e}")
        summary = "Summary generation failed. Please review the transcript manually."

    # Step 4: Action item extraction (OPTIONAL - graceful degradation)
    try:
        action_items = self.analyzer.extract_action_items(full_transcript)
    except Exception as e:
        print(f"Warning: Action item extraction failed: {e}")
        action_items = []

    # Step 5: Decision extraction (OPTIONAL - graceful degradation)
    try:
        decisions = self.analyzer.extract_decisions(full_transcript)
    except Exception as e:
        print(f"Warning: Decision extraction failed: {e}")
        decisions = []

    # ... build and return result
```
The philosophy:
- Transcription is critical — If it fails, the whole job fails. No transcript = no value.
- Everything else degrades gracefully — Speaker names, summary, action items, and decisions are nice-to-have. If OpenAI is down, return partial results rather than failing the entire job.
This means users get something even when AI services are flaky. A transcript with "Speaker A" and "Speaker B" is better than nothing.
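The repeated try/except blocks could be factored into a small helper. A sketch of one way to do it; run_optional is my name, not something from the codebase:

```python
import logging
from typing import Callable, TypeVar

logger = logging.getLogger(__name__)
T = TypeVar("T")


def run_optional(step_name: str, fn: Callable[[], T], fallback: T) -> T:
    """Run a non-critical pipeline step, returning a fallback on failure."""
    try:
        return fn()
    except Exception as e:
        logger.warning("%s failed, using fallback: %s", step_name, e)
        return fallback


# Hypothetical usage inside process_meeting_audio:
# summary = run_optional(
#     "summary",
#     lambda: self.analyzer.generate_meeting_summary(full_transcript),
#     "Summary generation failed. Please review the transcript manually.",
# )
```

This also centralizes the warning logging, so swapping the bare print calls for proper logger calls happens in one place.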
Webhook Delivery
After processing completes (success or failure), the CMS needs to know:
```python
# from tasks.py
def _send_webhook(job: Job, meeting: Meeting | None, db: Session, error: str | None = None):
    """Send webhook callback to the configured URL."""
    payload = {
        "job_id": str(job.id),
        "status": job.status.value,
        "error": error,
    }
    if meeting:
        payload["meeting"] = {
            "id": str(meeting.id),
            "title": meeting.title,
            "summary": meeting.summary,
            # ... speakers, segments, action_items, decisions
        }

    headers = {
        "Content-Type": "application/json",
        "X-Webhook-Secret": job.webhook_secret,
    }

    try:
        with httpx.Client(timeout=30.0) as client:
            response = client.post(
                job.webhook_url,
                json=payload,
                headers=headers,
            )
            response.raise_for_status()
        logger.info(f"Webhook sent successfully: {job.webhook_url}")
    except Exception as e:
        logger.error(f"Webhook failed: {job.webhook_url} - {str(e)}")
        # Don't raise - webhook failure shouldn't fail the job
```
Key decisions:
- 30-second timeout — Don't wait forever for slow endpoints
- Secret header — The CMS provides a secret when submitting the job; we echo it back in the webhook header. This lets the CMS verify callbacks came from us, not from an attacker.
- Failure is logged, not raised — The job succeeded; webhook delivery is best-effort
If webhooks need guaranteed delivery, I'd add a separate retry queue. For this use case, the SSE channel provides real-time updates, and webhooks are a backup.
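On the receiving side, the CMS can check the echoed secret with a constant-time comparison so timing differences don't leak information. A sketch using only the standard library; the header name matches the code above, everything else is illustrative:

```python
import hmac


def is_valid_webhook(headers: dict, expected_secret: str) -> bool:
    """Constant-time check of the X-Webhook-Secret header."""
    received = headers.get("X-Webhook-Secret", "")
    return hmac.compare_digest(received, expected_secret)
```

hmac.compare_digest avoids the early-exit behavior of `==` on strings, which is the standard precaution for comparing secrets.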
Monitoring and Observability
For production, I'd add:
- Flower — Celery's web-based monitoring tool
- Structured logging — JSON logs with job_id, status, duration
- Metrics — Task duration histograms, failure rates, queue depth
For the demo platform, logging is sufficient:
```python
logger.info(f"Starting job processing: {job_id}")
logger.info(f"Processing audio file: {audio_path}")
logger.info(f"Job completed successfully: {job_id}")
logger.error(f"Job failed: {job_id} - {str(e)}")
```
Every log line includes the job_id, making it easy to trace a single job through the system.
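A minimal structured-logging setup along those lines, using only the stdlib. The field names are illustrative, not what the platform actually emits:

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object, carrying job_id if present."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "message": record.getMessage(),
            "job_id": getattr(record, "job_id", None),
        })


# Attach via a handler, then pass job_id through `extra`:
# logger.info("Starting job processing", extra={"job_id": job_id})
```

Because `extra` attaches arbitrary attributes to the LogRecord, every log line can carry job_id without changing call sites beyond the one keyword argument.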
Lessons Learned
1. Late acknowledgment is non-negotiable for long-running tasks
Losing a 10-minute job because a container restarted is unacceptable. task_acks_late=True should be the default for anything over a few seconds.
2. Exponential backoff respects external services
Retrying immediately after a rate limit just gets you rate-limited again. Give APIs time to recover.
3. Partial results beat total failure
Users would rather have a transcript without AI-generated summaries than nothing at all. Design pipelines to degrade gracefully.
4. Low concurrency, horizontal scaling
For memory-intensive AI workloads, running fewer tasks per worker and scaling workers horizontally is more predictable than running many concurrent tasks.
Wrapping Up
Celery is overkill for simple job queues but exactly right for orchestrating multi-step pipelines with external service dependencies. The configuration takes some thought, but once set up, it handles the hard problems—retries, failure tracking, and worker management—reliably.
This pattern works for any AI pipeline: document processing, image analysis, video transcription. The specific services change; the orchestration challenges remain the same.
You can see this pipeline in action on the live demo—upload a meeting recording and watch the job progress through transcription, analysis, and completion.
Part of a series on building the Meeting Intelligence Platform. Next up: Building a Multi-Platform API.
Building AI-powered features? I can help with:
- Designing Celery-based pipelines for AI workloads
- Tuning workers for long-running, memory-heavy jobs
- Implementing graceful degradation and progress tracking
- Integrating multiple AI services (transcription, LLMs, embeddings)
[Get in touch on Upwork] | [Get in Touch Directly →] to discuss your project.