🐍⚡ Using Celery With FastAPI: The “Async Inside Tasks” Event Loop Problem (and How Endpoints Save You)
When you start pairing FastAPI + Celery, everything feels smooth at first:
- FastAPI handles async APIs
- Celery handles heavy background work
- Redis / RabbitMQ handles queueing
…until one day, you do this inside a Celery task:

```python
@app.task
def process_report(user_id):
    data = await fetch_user_data(user_id)  # ❌ Problem!
```

Boom.

Python won't even compile this, because `await` is only legal inside an `async def`:

SyntaxError: 'await' outside async function

Declare the task `async def` instead and Celery simply receives a coroutine it never awaits. Try to drive a loop manually from code that's already inside a running loop, and you get:

RuntimeError: This event loop is already running

Or worse — the task hangs forever.
Why does this happen? And how do we fix it cleanly?
Let’s break it down.
❗ Why Async Code Fails Inside Celery Tasks
Celery workers are NOT async.
A prefork worker executes each task as a plain synchronous function call in its own process; there is no event loop sitting there waiting to schedule your coroutines.
So when your Celery task tries to run await something():
- Celery doesn’t know how to handle asyncio
- It thinks you’re trying to start or use an event loop
- Python complains
This is why code like:

```python
await send_email()
await fetch_data()
await some_async_call()
```

fails.
Sure, you can “hack” around this using asyncio.run() inside Celery, but it creates new event loops for every task — messy, dangerous, and unpredictable.
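For reference, that workaround looks like this. A minimal sketch, where `fetch_user_data` is a hypothetical stand-in for your real coroutine:

```python
import asyncio

# Hypothetical stand-in for a real async call (DB query, HTTP request, ...)
async def fetch_user_data(user_id):
    await asyncio.sleep(0)  # placeholder for real async I/O
    return {"user_id": user_id}

# The asyncio.run() workaround inside a (sync) Celery task body:
# every invocation spins up and tears down a brand-new event loop.
def process_report(user_id):
    return asyncio.run(fetch_user_data(user_id))
```

It does work for simple coroutines, but every task pays the cost of a fresh loop, and anything that caches a loop reference between calls (connection pools, shared async clients) breaks between runs.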
💡 The Correct Pattern: Use an Internal FastAPI Endpoint
When you must run async code inside a Celery task, don’t run the async code in Celery.
Instead, call an internal FastAPI endpoint from the Celery task.
FastAPI can run async properly.
Celery will only trigger it.
🔄 Example Flow
Celery Task → makes a request → FastAPI Endpoint → runs async logic safely
Architecture
Client → FastAPI → Celery Task → FastAPI (internal) → Async Function
This avoids the whole event loop issue.
🛠️ Let’s Build It
Internal FastAPI Endpoint
```python
from fastapi import APIRouter

router = APIRouter()

@router.post("/internal/fetch-user")
async def fetch_user_endpoint(payload: dict):
    user_id = payload["user_id"]
    data = await fetch_user_data(user_id)
    return data
```
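The endpoint assumes an async `fetch_user_data` coroutine already exists somewhere in your codebase. A minimal sketch, with the body simulated since the real data source isn't shown here:

```python
import asyncio

async def fetch_user_data(user_id: int) -> dict:
    # In a real app this would await an async DB driver or HTTP client;
    # asyncio.sleep(0) stands in for that I/O.
    await asyncio.sleep(0)
    return {"user_id": user_id, "status": "ok"}
```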
Sync Trigger Function
```python
import requests

def trigger_user_processing(user_id):
    url = "http://localhost:8000/internal/fetch-user"
    # Always set a timeout, so a stalled endpoint can't hang the worker forever
    response = requests.post(url, json={"user_id": user_id}, timeout=10)
    response.raise_for_status()
    return response.json()
```
Celery Task
```python
@app.task
def process_data(metadata):
    data = trigger_user_processing(metadata.get("user_id"))
    # You can also update DB, send notifications, etc.
    return data
```
✔ What Happened?
- Celery makes a normal sync HTTP request
- FastAPI runs async functions using its own event loop
Problem solved 🎉
✔ Why This Works
- Celery stays synchronous — no event loop issues
- FastAPI handles async logic using Uvicorn’s event loop
- Clean separation of concerns

Celery = trigger + orchestration
FastAPI = async operations
Worker = background heavy lifting
This pattern works extremely well in microservice architectures.
🏁 Final Thought
If you’re running FastAPI + Celery and hitting weird async/await errors:
- Don’t fight Celery’s event loop
- Let FastAPI handle the async
- Let Celery trigger it
Just expose an internal endpoint and call it using requests.
This keeps your architecture clean, stable, and production-safe.