Skip to content

Async Tasks Example

This example demonstrates background task processing and asynchronous operations in FastOpenAPI.

Background Tasks with Async Frameworks

Using Starlette

import uvicorn
import asyncio
from datetime import datetime
from starlette.applications import Starlette
from pydantic import BaseModel, EmailStr
from fastopenapi import Body, Depends
from fastopenapi.routers import StarletteRouter
from fastopenapi.errors import ResourceNotFoundError

app = Starlette()
router = StarletteRouter(
    app=app,
    title="Async Tasks API",
    version="1.0.0"
)

# Task storage (in production, use database)
tasks_db = {}
task_counter = 0

class EmailTask(BaseModel):
    to: EmailStr
    subject: str
    body: str

class TaskStatus(BaseModel):
    task_id: str
    status: str
    created_at: datetime
    completed_at: datetime | None = None
    result: dict | None = None

async def send_email_async(to: str, subject: str, body: str):
    """Simulate sending email"""
    print(f"Sending email to {to}: {subject}")
    await asyncio.sleep(5)  # Simulate slow operation
    print(f"Email sent to {to}")
    return {"sent_to": to, "sent_at": datetime.now().isoformat()}

async def process_task_async(task_id: str, email_data: dict):
    """Background task processor"""
    tasks_db[task_id]["status"] = "processing"

    try:
        result = await send_email_async(
            email_data["to"],
            email_data["subject"],
            email_data["body"]
        )

        tasks_db[task_id]["status"] = "completed"
        tasks_db[task_id]["completed_at"] = datetime.now()
        tasks_db[task_id]["result"] = result
    except Exception as e:
        tasks_db[task_id]["status"] = "failed"
        tasks_db[task_id]["result"] = {"error": str(e)}

@router.post(
    "/tasks/send-email",
    response_model=dict,
    status_code=202,
    tags=["Tasks"]
)
async def create_email_task(email: EmailTask = Body(...)):
    """Create background email task"""
    global task_counter
    task_counter += 1
    task_id = f"task_{task_counter}"

    # Store task
    tasks_db[task_id] = {
        "task_id": task_id,
        "status": "pending",
        "created_at": datetime.now(),
        "completed_at": None,
        "result": None
    }

    # Start background task
    asyncio.create_task(
        process_task_async(task_id, email.model_dump())
    )

    return {
        "task_id": task_id,
        "status": "pending",
        "message": "Email task created",
        "status_url": f"/tasks/{task_id}"
    }

@router.get(
    "/tasks/{task_id}",
    response_model=TaskStatus,
    tags=["Tasks"]
)
def get_task_status(task_id: str):
    """Get task status"""
    if task_id not in tasks_db:
        raise ResourceNotFoundError(f"Task {task_id} not found")

    return tasks_db[task_id]

if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)

Testing

# Create email task
curl -X POST http://localhost:8000/tasks/send-email \
  -H "Content-Type: application/json" \
  -d '{
    "to": "user@example.com",
    "subject": "Hello",
    "body": "This is a test email"
  }'
# Returns: {"task_id": "task_1", "status": "pending", "status_url": "/tasks/task_1"}

# Check task status (immediately)
curl http://localhost:8000/tasks/task_1
# Returns: {"task_id": "task_1", "status": "processing", ...}

# Check task status (after 5+ seconds)
curl http://localhost:8000/tasks/task_1
# Returns: {"task_id": "task_1", "status": "completed", "result": {...}}

Task Queue with Celery

from celery import Celery
from pydantic import BaseModel
from fastopenapi import Body
from fastopenapi.routers import FlaskRouter
from flask import Flask

# Initialize Celery
celery_app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

# Celery tasks
@celery_app.task(name='send_email')
def send_email_task(to: str, subject: str, body: str):
    """Celery task for sending emails"""
    import time
    print(f"Sending email to {to}")
    time.sleep(5)  # Simulate slow operation
    print(f"Email sent to {to}")
    return {"sent_to": to, "subject": subject}

@celery_app.task(name='generate_report')
def generate_report_task(report_type: str, filters: dict):
    """Generate report in background"""
    import time
    print(f"Generating {report_type} report...")
    time.sleep(10)
    return {
        "report_type": report_type,
        "file_url": f"/reports/{report_type}.pdf"
    }

# Flask API
app = Flask(__name__)
router = FlaskRouter(
    app=app,
    title="Celery Tasks API",
    version="1.0.0"
)

class EmailRequest(BaseModel):
    to: str
    subject: str
    body: str

class ReportRequest(BaseModel):
    report_type: str
    filters: dict = {}

@router.post("/tasks/send-email", status_code=202, tags=["Tasks"])
def create_email_task(email: EmailRequest = Body(...)):
    """Queue email task"""
    task = send_email_task.delay(email.to, email.subject, email.body)

    return {
        "task_id": task.id,
        "status": "pending",
        "status_url": f"/tasks/{task.id}"
    }

@router.post("/tasks/generate-report", status_code=202, tags=["Tasks"])
def create_report_task(report: ReportRequest = Body(...)):
    """Queue report generation task"""
    task = generate_report_task.delay(report.report_type, report.filters)

    return {
        "task_id": task.id,
        "status": "pending",
        "status_url": f"/tasks/{task.id}"
    }

@router.get("/tasks/{task_id}", tags=["Tasks"])
def get_task_status(task_id: str):
    """Get task status"""
    from celery.result import AsyncResult

    task = AsyncResult(task_id, app=celery_app)

    response = {
        "task_id": task_id,
        "status": task.state,
        "result": None
    }

    if task.state == "SUCCESS":
        response["result"] = task.result
    elif task.state == "FAILURE":
        response["error"] = str(task.info)

    return response

if __name__ == "__main__":
    app.run(debug=True)

Async Database Operations

import uvicorn
from starlette.applications import Starlette
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
from pydantic import BaseModel
from fastopenapi import Depends
from fastopenapi.routers import StarletteRouter
from fastopenapi.errors import ResourceNotFoundError

app = Starlette()
router = StarletteRouter(app=app)

# Async database setup
engine = create_async_engine("sqlite+aiosqlite:///./test.db")
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

async def get_async_db():
    """Async database session dependency"""
    async with AsyncSessionLocal() as session:
        yield session

class User(BaseModel):
    id: int
    username: str
    email: str

@router.get("/users", response_model=list[User], tags=["Users"])
async def list_users(db: AsyncSession = Depends(get_async_db)):
    """List users (async database query)"""
    result = await db.execute(select(User))
    users = result.scalars().all()
    return users

@router.get("/users/{user_id}", response_model=User, tags=["Users"])
async def get_user(
    user_id: int,
    db: AsyncSession = Depends(get_async_db)
):
    """Get user by ID (async)"""
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalar_one_or_none()

    if not user:
        raise ResourceNotFoundError(f"User {user_id} not found")

    return user

if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)

Parallel Async Operations

import asyncio
import httpx
from starlette.applications import Starlette
from fastopenapi.routers import StarletteRouter

app = Starlette()
router = StarletteRouter(app=app)

async def fetch_user(user_id: int):
    """Fetch user from external API"""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{user_id}")
        return response.json()

async def fetch_posts(user_id: int):
    """Fetch user's posts from external API"""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{user_id}/posts")
        return response.json()

async def fetch_comments(user_id: int):
    """Fetch user's comments"""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{user_id}/comments")
        return response.json()

@router.get("/users/{user_id}/profile", tags=["Users"])
async def get_user_profile(user_id: int):
    """Get complete user profile (parallel async requests)"""

    # Fetch all data in parallel
    user, posts, comments = await asyncio.gather(
        fetch_user(user_id),
        fetch_posts(user_id),
        fetch_comments(user_id)
    )

    return {
        "user": user,
        "posts": posts,
        "comments": comments
    }

if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)

Streaming Responses

import asyncio
from starlette.responses import StreamingResponse
from fastopenapi.routers import StarletteRouter

router = StarletteRouter()

async def generate_data():
    """Generate data stream"""
    for i in range(100):
        await asyncio.sleep(0.1)
        yield f"data: {i}\n\n"

@router.get("/stream", tags=["Streaming"])
async def stream_data():
    """Server-Sent Events (SSE) endpoint"""
    return StreamingResponse(
        generate_data(),
        media_type="text/event-stream"
    )

async def generate_csv():
    """Generate CSV data stream"""
    yield "id,name,email\n"
    for i in range(1000):
        await asyncio.sleep(0.01)
        yield f"{i},User{i},user{i}@example.com\n"

@router.get("/export/users.csv", tags=["Export"])
async def export_users():
    """Stream large CSV export"""
    return StreamingResponse(
        generate_csv(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=users.csv"}
    )

WebSocket Support

import uvicorn
from starlette.applications import Starlette
from starlette.websockets import WebSocket
from fastopenapi.routers import StarletteRouter

app = Starlette()
router = StarletteRouter(app=app)

# Active WebSocket connections
active_connections: list[WebSocket] = []

@router.get("/chat", tags=["WebSocket"])
async def chat_info():
    """Get chat info"""
    return {
        "active_connections": len(active_connections),
        "websocket_url": "ws://localhost:8000/ws/chat"
    }

# WebSocket endpoint (not handled by router, added directly to app)
@app.websocket_route("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    """WebSocket chat endpoint"""
    await websocket.accept()
    active_connections.append(websocket)

    try:
        while True:
            # Receive message
            data = await websocket.receive_text()

            # Broadcast to all connections
            for connection in active_connections:
                await connection.send_text(f"Message: {data}")
    except:
        pass
    finally:
        active_connections.remove(websocket)

if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)

Scheduled Tasks

import asyncio
from datetime import datetime
from starlette.applications import Starlette
from fastopenapi.routers import StarletteRouter

app = Starlette()
router = StarletteRouter(app=app)

# Task scheduler
async def cleanup_task():
    """Runs every hour"""
    while True:
        print(f"Running cleanup at {datetime.now()}")
        # Perform cleanup
        await asyncio.sleep(3600)  # 1 hour

async def stats_update_task():
    """Updates stats every 5 minutes"""
    while True:
        print(f"Updating stats at {datetime.now()}")
        # Update statistics
        await asyncio.sleep(300)  # 5 minutes

@app.on_event("startup")
async def startup_tasks():
    """Start background tasks on startup"""
    asyncio.create_task(cleanup_task())
    asyncio.create_task(stats_update_task())

@router.get("/health", tags=["Health"])
async def health_check():
    """Health check"""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat()
    }

if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)

Rate Limiting with Async

import asyncio
from datetime import datetime, timedelta
from collections import defaultdict
from fastopenapi import Depends, Header
from fastopenapi.errors import AuthorizationError

class AsyncRateLimiter:
    """Async rate limiter"""

    def __init__(self, max_requests: int = 100, window: int = 3600):
        self.max_requests = max_requests
        self.window = window
        self.requests = defaultdict(list)
        self.lock = asyncio.Lock()

    async def check_rate_limit(self, key: str):
        """Check if request is within rate limit"""
        async with self.lock:
            now = datetime.now()
            window_start = now - timedelta(seconds=self.window)

            # Remove old requests
            self.requests[key] = [
                req_time for req_time in self.requests[key]
                if req_time > window_start
            ]

            # Check limit
            if len(self.requests[key]) >= self.max_requests:
                raise AuthorizationError("Rate limit exceeded")

            # Add current request
            self.requests[key].append(now)

rate_limiter = AsyncRateLimiter(max_requests=100, window=3600)

async def check_rate_limit(user_id: str = Header(..., alias="X-User-ID")):
    """Rate limit dependency"""
    await rate_limiter.check_rate_limit(user_id)

@router.get("/api/data")
async def get_data(_: None = Depends(check_rate_limit)):
    """Rate-limited endpoint"""
    return {"data": "..."}

Best Practices

1. Use Async for I/O-Bound Operations

# Good - async for network/database operations
@router.get("/users")
async def list_users(db = Depends(get_async_db)):
    users = await db.fetch_users()
    return users

# Avoid - sync for I/O operations in async framework
@router.get("/users")
def list_users():
    users = database.fetch_users()  # Blocking!
    return users

2. Background Tasks for Long Operations

# Good - return immediately, process in background
@router.post("/reports", status_code=202)
async def create_report(report_type: str):
    task_id = await queue_report_task(report_type)
    return {"task_id": task_id, "status": "pending"}

# Avoid - blocking the request
@router.post("/reports")
async def create_report(report_type: str):
    report = await generate_report(report_type)  # Takes 10 minutes!
    return report

3. Use Task Status Endpoints

# Create task endpoint
@router.post("/tasks", status_code=202)
async def create_task():
    task_id = create_background_task()
    return {"task_id": task_id, "status_url": f"/tasks/{task_id}"}

# Status endpoint
@router.get("/tasks/{task_id}")
def get_task_status(task_id: str):
    return {"status": "completed", "result": {...}}

4. Handle Task Failures Gracefully

try:
    result = await process_task()
    task["status"] = "completed"
    task["result"] = result
except Exception as e:
    task["status"] = "failed"
    task["error"] = str(e)

5. Use Proper Async Dependencies

# Good - async dependency
async def get_async_db():
    async with AsyncSessionLocal() as session:
        yield session

# Avoid - sync dependency in async framework
def get_db():
    db = SessionLocal()  # Blocking!
    yield db

See Also