Series: Self-Hosted AI · Part 2 of 4
FastAPI Production Patterns: From Prototype to Enterprise
The architecture patterns that take FastAPI from a quick prototype to a production-grade enterprise API — dependency injection, background tasks, streaming responses, multi-tenant middleware, and deployment.
From Prototype to Production
FastAPI is deceptively easy to start with: a working API fits in about 10 lines of Python. Enterprise production systems, however, need structure, observability, and safety that the basic examples don't show.
These are the patterns that have actually mattered across 15+ production FastAPI projects at Hureka Technologies.
Project Structure That Scales
app/
    api/
        v1/
            endpoints/
                agents.py
                knowledge.py
            router.py
    core/
        config.py       # Pydantic settings
        security.py     # Auth helpers
        database.py     # SQLAlchemy setup
    models/             # DB models
    schemas/            # Pydantic request/response schemas
    services/           # Business logic (NOT in endpoints)
    workers/            # Celery tasks
    middleware/         # Custom middleware
    main.py
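The `services/` entry in the layout says business logic stays out of endpoints. Here is a minimal sketch of what that separation can look like, using only the standard library so it runs without a database. `Document`, `KnowledgeRepo`, `InMemoryKnowledgeRepo`, and `KnowledgeService` are hypothetical names chosen for illustration, not taken from the original project.

```python
from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class Document:
    id: str
    tenant_id: str
    title: str


class KnowledgeRepo(Protocol):
    """Storage interface the service depends on (hypothetical)."""

    def list_for_tenant(self, tenant_id: str) -> list[Document]: ...


class InMemoryKnowledgeRepo:
    """Stand-in for a SQLAlchemy-backed repository, for illustration and tests."""

    def __init__(self, docs: list[Document]) -> None:
        self._docs = docs

    def list_for_tenant(self, tenant_id: str) -> list[Document]:
        return [d for d in self._docs if d.tenant_id == tenant_id]


class KnowledgeService:
    """Business logic lives here; it knows nothing about HTTP."""

    def __init__(self, repo: KnowledgeRepo) -> None:
        self._repo = repo

    def search_titles(self, tenant_id: str, query: str) -> list[Document]:
        # Case-insensitive substring match, scoped to one tenant.
        needle = query.strip().lower()
        docs = self._repo.list_for_tenant(tenant_id)
        return [d for d in docs if needle in d.title.lower()]
```

With this split, an endpoint in `api/v1/endpoints/knowledge.py` only parses the request, obtains the service (for example through `Depends`), and returns the result, while the service can be unit-tested without starting the app.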
Tenant-Aware Middleware
```python
from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

# extract_tenant_id() and get_tenant() are application helpers defined elsewhere.


class TenantMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Extract tenant from JWT or subdomain
        tenant_id = extract_tenant_id(request)
        if not tenant_id:
            return JSONResponse({"error": "Invalid tenant"}, status_code=401)

        # Attach to request state so it is accessible anywhere downstream
        request.state.tenant_id = tenant_id
        request.state.tenant = await get_tenant(tenant_id)

        response = await call_next(request)
        return response
```
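The middleware relies on an `extract_tenant_id` helper that the article does not show. As one possible building block for the subdomain case, the sketch below derives a tenant slug from a `Host` header value. The function name `tenant_from_host`, the reserved-name set, and the `example.com` base domain are all assumptions made for illustration.

```python
from typing import Optional

# Assumed list of subdomains that are never tenants; adjust per deployment.
RESERVED_SUBDOMAINS = {"www", "api", "admin"}


def tenant_from_host(host: str, base_domain: str) -> Optional[str]:
    """Return the tenant slug from a host like 'acme.example.com', else None."""
    # Drop an optional port, normalize case, and strip a trailing dot.
    hostname = host.split(":", 1)[0].strip().lower().rstrip(".")
    suffix = "." + base_domain.lower()
    if not hostname.endswith(suffix):
        return None
    slug = hostname[: -len(suffix)]
    # Reject empty slugs, nested subdomains, and reserved names.
    if not slug or "." in slug or slug in RESERVED_SUBDOMAINS:
        return None
    return slug
```

Inside the middleware this could be called as `tenant_from_host(request.headers.get("host", ""), "example.com")`. A JWT-based variant would read the tenant from a verified token claim instead.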
Streaming LLM Responses
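```python
import json

from anthropic import AsyncAnthropic
from fastapi import Depends
from fastapi.responses import StreamingResponse

client = AsyncAnthropic()  # reads ANTHROPIC_API_KEY from the environment


@router.post("/chat/stream")
async def chat_stream(request: ChatRequest, tenant=Depends(get_tenant)):
    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=1024,  # required by the Messages API; the value is an assumption
            messages=request.messages,
            system=tenant.system_prompt,
        ) as stream:
            # Emit each text delta as one Server-Sent Events frame
            async for text in stream.text_stream:
                yield f"data: {json.dumps({'text': text})}\n\n"
        # Sentinel frame so clients know the stream is complete
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
```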
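The endpoint frames each chunk as a `data:` line carrying a JSON object and ends with a `data: [DONE]` sentinel. To make that wire format concrete, here is a small sketch of how a consumer might decode it. `iter_sse_text` is a hypothetical helper, and it assumes the caller already has the response split into lines.

```python
import json
from typing import Iterable, Iterator


def iter_sse_text(lines: Iterable[str]) -> Iterator[str]:
    """Yield the 'text' field of each event until the [DONE] sentinel."""
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line.startswith("data:"):
            continue  # skip blank separators, comments, and other SSE fields
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return  # server signalled end of stream
        yield json.loads(payload)["text"]
```

Because `json.dumps` escapes newlines, each payload stays on a single line, which is what makes this line-by-line decoding safe.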
Background Tasks with Celery
```python
from celery import Celery
from fastapi import Depends, UploadFile

celery = Celery("app", broker="redis://localhost:6379/0")


@celery.task(bind=True, max_retries=3, default_retry_delay=30)
def process_document(self, document_id: str, tenant_id: str):
    try:
        doc = fetch_document(document_id)
        chunks = chunk_document(doc.content)
        embeddings = embed_chunks(chunks)
        store_in_qdrant(embeddings, tenant_id)
    except Exception as exc:
        # Retry up to 3 times, 30 seconds apart
        raise self.retry(exc=exc)


# Trigger from endpoint
@router.post("/documents/upload")
async def upload_document(file: UploadFile, tenant=Depends(get_tenant)):
    doc_id = await save_document(file, tenant.id)
    process_document.delay(doc_id, tenant.id)  # enqueue only; does not wait for the task
    return {"id": doc_id, "status": "processing"}
```
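The task calls a `chunk_document` helper whose strategy the article does not specify. One common and simple approach is fixed-size character windows with overlap. The sketch below assumes that strategy, and its `chunk_size` and `overlap` defaults are illustrative rather than taken from the original project.

```python
def chunk_document(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into character windows of chunk_size that overlap by `overlap`."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in [0, chunk_size)")

    step = chunk_size - overlap
    chunks: list[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once a window reaches the end, to avoid a trailing fragment
        # that is fully contained in the previous chunk.
        if start + chunk_size >= len(text):
            break
    return chunks
```

The overlap keeps sentences that straddle a boundary retrievable from at least one chunk. Token-based or sentence-aware splitting may suit embedding models better.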
Health Check and Readiness Probe
@router.get("/health")
async def health_check():
    checks = {
        "database": await check_db(),
        "redis": await check_redis(),
        "qdrant": await check_qdrant(),
    }
    healthy = all(checks.values())
    return JSONResponse(
        content={"status": "healthy" if healthy else "degraded", "checks": checks},
        status_code=200 if healthy else 503
    )
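The handler above awaits each dependency in turn, so one hung connection can stall the probe, and an exception in any check would surface as a 500 rather than a 503. One way to harden it is to run the checks concurrently with a per-check timeout and treat any failure as unhealthy. `run_checks` and `_safe` are hypothetical helpers, and the 2-second default is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def _safe(check: Callable[[], Awaitable[bool]], timeout: float) -> bool:
    """Run one check; a timeout or exception counts as unhealthy."""
    try:
        return bool(await asyncio.wait_for(check(), timeout=timeout))
    except Exception:
        return False


async def run_checks(
    checks: Dict[str, Callable[[], Awaitable[bool]]], timeout: float = 2.0
) -> Dict[str, bool]:
    """Run all checks concurrently and map each name to its result."""
    results = await asyncio.gather(*(_safe(c, timeout) for c in checks.values()))
    return dict(zip(checks.keys(), results))
```

In the endpoint, the `checks` dictionary could then be built with `await run_checks({"database": check_db, "redis": check_redis, "qdrant": check_qdrant})`, passing the functions uncalled.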