Using Celery With FastAPI: Solving Async Event Loop Errors Cleanly
Python

Using Celery With FastAPI: Solving Async Event Loop Errors Cleanly

Why async/await breaks in Celery tasks — and the production-safe pattern that fixes i

TermTrix
TermTrix
Dec 20, 2025
3min read
Back to Blog

🐍⚡ Using Celery With FastAPI: The “Async Inside Tasks” Event Loop Problem (and How Endpoints Save You)

When you start pairing FastAPI + Celery, everything feels smooth at first:

  • FastAPI handles async APIs
  • Celery handles heavy background work
  • Redis / RabbitMQ handles queueing

…until one day, you do this inside a Celery task:

@app.task
def process_report(user_id):
    data = await fetch_user_data(user_id)  # ❌ Problem!

Boom.

You get:

RuntimeError: This event loop is already running

Or worse — the task hangs forever.

Why does this happen? And how do we fix it cleanly?

Let’s break it down.


❗ Why Async Code Fails Inside Celery Tasks

Celery workers are NOT async.
They run tasks inside a separate process with their own event loop logic.

So when your Celery task tries to run await something():

  • Celery doesn’t know how to handle asyncio
  • It thinks you’re trying to start or use an event loop
  • Python complains

This is why code like:

await send_email()
await fetch_data()
await some_async_call()

fails.

Sure, you can “hack” around this using asyncio.run() inside Celery, but it creates new event loops for every task — messy, dangerous, and unpredictable.


💡 The Correct Pattern: Use an Internal FastAPI Endpoint

When you must run async code inside a Celery task, don’t run the async code in Celery.

Instead, call an internal FastAPI endpoint from the Celery task.

FastAPI can run async properly.
Celery will only trigger it.


🔄 Example Flow

Celery Task → makes a request → FastAPI Endpoint → runs async logic safely

Architecture

Client → FastAPI → Celery Task → FastAPI (internal) → Async Function

This avoids the whole event loop issue.


🛠️ Let’s Build It

Internal FastAPI Endpoint

from fastapi import APIRouter

router = APIRouter()

@router.post("/internal/fetch-user")
async def fetch_user_endpoint(payload: dict):
    user_id = payload["user_id"]
    data = await fetch_user_data(user_id)
    return data

Sync Trigger Function

import requests

def trigger_user_processing(user_id):
    url = "http://localhost:8000/internal/fetch-user"
    response = requests.post(url, json={"user_id": user_id})
    return response.json()

Celery Task

@app.task
def process_data(metadata):
    data = trigger_user_processing(metadata.get("user_id"))
    # You can also update DB, send notifications, etc.

✔ What Happened?

  • Celery makes a normal sync HTTP request
  • FastAPI runs async functions using its own event loop

Problem solved 🎉


✔ Why This Works

  • Celery stays synchronous — no event loop issues

  • FastAPI handles async logic using Uvicorn’s event loop

  • Clean separation of concerns

    Celery = trigger + orchestration
    FastAPI = async operations
    Worker = background heavy lifting

This pattern works extremely well in microservice architectures.


🏁 Final Thought

If you’re running FastAPI + Celery and hitting weird async/await errors:

  • Don’t fight Celery’s event loop
  • Let FastAPI handle the async
  • Let Celery trigger it

Just expose an internal endpoint and call it using requests.

This keeps your architecture clean, stable, and production-safe.


#Python#FastAPI#Celery#Programming#Asynchronous Programming