LLMOps: A Practical Guide to Deploying LLMs in Production
Comprehensive LLMOps guide covering model serving with Ollama and vLLM, caching strategies, prompt versioning, monitoring with LangFuse, cost tracking, A/B testing prompts, and rollback patterns.
LLMOps Is Not MLOps
If you come from the MLOps world, you might assume deploying LLMs is similar to deploying traditional ML models. It is not. LLMs bring unique challenges: nondeterministic outputs, massive memory footprints, prompt sensitivity, per-token billing, and the constant churn of new models.
LLMOps is the discipline of deploying, monitoring, and managing LLM-powered applications in production. This guide covers the full stack — from model serving to prompt versioning to cost optimization — with patterns I use across every production LLM deployment.
Model Serving: Choosing Your Runtime
The first decision is how to serve models. Three runtimes dominate production deployments:
Comparison Table
| Runtime | Best For | GPU Memory Efficiency | Throughput | Setup Complexity |
|---|---|---|---|---|
| Ollama | Dev, small-scale prod, multiple models | Good (quantized) | Medium | Very Low |
| vLLM | High-throughput production | Excellent (PagedAttention) | Very High | Medium |
| TGI (Text Generation Inference) | HuggingFace ecosystem | Good | High | Medium |
Ollama for Development and Small-Scale Production
Ollama is unbeatable for rapid iteration and small-scale deployments:
```python
import httpx


class OllamaService:
    """Thin async client for a local Ollama server."""

    def __init__(self, base_url: str = "http://localhost:11434"):
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=120)

    async def generate(self, model: str, prompt: str, system: str = "", **kwargs) -> dict:
        response = await self.client.post(
            f"{self.base_url}/api/generate",
            json={
                "model": model,
                "prompt": prompt,
                "system": system,
                "stream": False,
                "options": {
                    "temperature": kwargs.get("temperature", 0.7),
                    "num_ctx": kwargs.get("num_ctx", 8192),
                    "top_p": kwargs.get("top_p", 0.9),
                },
            },
        )
        response.raise_for_status()  # surface HTTP errors instead of parsing an error body
        return response.json()

    async def chat(self, model: str, messages: list[dict], **kwargs) -> dict:
        # Extra keyword arguments are passed through unchanged as Ollama "options".
        response = await self.client.post(
            f"{self.base_url}/api/chat",
            json={"model": model, "messages": messages, "stream": False, "options": kwargs},
        )
        response.raise_for_status()
        return response.json()
```
vLLM for High-Throughput Production
When you need maximum throughput, vLLM's PagedAttention can deliver 2-4x the throughput of naive serving implementations:
```bash
python -m vllm.entrypoints.openai.api_server \
  --model meta-llama/Llama-3.1-70B-Instruct \
  --tensor-parallel-size 2 \
  --max-model-len 8192 \
  --gpu-memory-utilization 0.9 \
  --port 8000 \
  --api-key "$VLLM_API_KEY"
```
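Before launching a model of this size, it helps to sanity-check whether the weights fit. The sketch below is a back-of-the-envelope estimate only. It assumes roughly 70 billion parameters stored as 16-bit values, weights sharded evenly across the tensor-parallel GPUs, and 80 GB cards (the GPU size is an assumption, not something the command specifies). It ignores activation memory and runtime overhead, so real headroom will be smaller.

```python
def weight_memory_gb(params_billion: float, bytes_per_param: float = 2.0) -> float:
    """Approximate weight memory in GB (1 GB = 1e9 bytes): parameters x bytes each."""
    return params_billion * bytes_per_param


def per_gpu_headroom_gb(
    params_billion: float,
    tensor_parallel_size: int,
    gpu_memory_gb: float,
    gpu_memory_utilization: float,
    bytes_per_param: float = 2.0,
) -> float:
    """Memory left per GPU for KV cache and activations after loading weights."""
    weights_per_gpu = weight_memory_gb(params_billion, bytes_per_param) / tensor_parallel_size
    budget = gpu_memory_gb * gpu_memory_utilization  # what vLLM is allowed to claim
    return budget - weights_per_gpu


if __name__ == "__main__":
    # Hypothetical hardware: 80 GB GPUs, matching --gpu-memory-utilization 0.9.
    for tp in (2, 4):
        headroom = per_gpu_headroom_gb(70, tp, gpu_memory_gb=80, gpu_memory_utilization=0.9)
        print(f"tensor-parallel-size={tp}: {headroom:.1f} GB headroom per GPU")
```

Under these assumptions, two GPUs leave very little room for the KV cache, which is the kind of result that argues for more GPUs, a smaller `--max-model-len`, or a quantized checkpoint.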
vLLM exposes an OpenAI-compatible API, so switching from OpenAI to self-hosted mostly comes down to changing the base URL, the API key, and the model name:
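```python
import os

from openai import AsyncOpenAI

# Same client class as for OpenAI; only the endpoint and credentials differ.
vllm_client = AsyncOpenAI(
    base_url="http://gpu-server:8000/v1",
    api_key=os.environ["VLLM_API_KEY"],
)


async def summarize() -> str:
    response = await vllm_client.chat.completions.create(
        model="meta-llama/Llama-3.1-70B-Instruct",
        messages=[{"role": "user", "content": "Summarize this document..."}],
        temperature=0.3,
        max_tokens=2048,
    )
    return response.choices[0].message.content
```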
Caching Strategies
LLM calls are expensive and often redundant. A good caching strategy can cut costs by 40-60%, depending on how often queries repeat.
Layer 1: Exact Match Cache
For deterministic queries (temperature=0), cache the exact prompt-response pair:
```python
import hashlib
import json


class ExactMatchCache:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.ttl = 3600 * 24

    def _hash_key(self, model: str, messages: list[dict], **kwargs) -> str:
        content = json.dumps({"model": model, "messages": messages, **kwargs}, sort_keys=True)
        return f"llm:exact:{hashlib.sha256(content.encode()).hexdigest()}"

    async def get(self, model: str, messages: list[dict], **kwargs) -> str | None:
        key = self._hash_key(model, messages, **kwargs)
        cached = await self.redis.get(key)
        if cached:
            await self.redis.hincrby("llm:cache:stats", "hits", 1)
            return json.loads(cached)
        await self.redis.hincrby("llm:cache:stats", "misses", 1)
        return None

    async def set(self, model: str, messages: list[dict], response: str, **kwargs):
        key = self._hash_key(model, messages, **kwargs)
        await self.redis.setex(key, self.ttl, json.dumps(response))
```
Layer 2: Semantic Cache
For similar but not identical queries, use embedding-based semantic caching:
```python
from datetime import datetime, timezone
from uuid import uuid4

from qdrant_client.models import PointStruct

# Assumes an async Qdrant client and an embed(text) -> list[float] helper
# defined elsewhere in the application.


class SemanticCache:
    def __init__(self, qdrant_client, threshold: float = 0.95):
        self.qdrant = qdrant_client
        self.threshold = threshold
        self.collection = "llm_semantic_cache"

    async def get(self, query: str) -> str | None:
        results = await self.qdrant.search(
            collection_name=self.collection,
            query_vector=embed(query),
            score_threshold=self.threshold,
            limit=1,
        )
        if results:
            return results[0].payload["response"]
        return None

    async def set(self, query: str, response: str, metadata: dict | None = None):
        await self.qdrant.upsert(
            collection_name=self.collection,
            points=[
                PointStruct(
                    id=str(uuid4()),
                    vector=embed(query),
                    payload={
                        "query": query,
                        "response": response,
                        "cached_at": datetime.now(timezone.utc).isoformat(),
                        **(metadata or {}),
                    },
                )
            ],
        )
```
Prompt Versioning and Management
Prompts are code. Treat them with the same rigor as application code — version control, testing, and rollback.
Prompt Registry Pattern
```python
from langfuse import Langfuse

langfuse = Langfuse()


class PromptRegistry:
    """Manage versioned prompts via LangFuse."""

    @staticmethod
    async def get_prompt(name: str, version: int | None = None) -> str:
        """Fetch a prompt by name and optional version."""
        prompt = langfuse.get_prompt(name, version=version)
        return prompt.compile()

    @staticmethod
    async def get_prompt_with_variables(name: str, variables: dict) -> str:
        """Fetch and compile a prompt with variables."""
        prompt = langfuse.get_prompt(name)
        return prompt.compile(**variables)
```
Prompt Version Table
Track prompt performance across versions:
| Prompt | Version | Model | Faithfulness | Latency (P50) | Cost/Call | Status |
|---|---|---|---|---|---|---|
| summarizer | v3 | gpt-4o | 0.94 | 1.2s | $0.012 | Active |
| summarizer | v2 | gpt-4o | 0.91 | 1.4s | $0.015 | Archived |
| classifier | v5 | claude-sonnet-4 | 0.97 | 0.8s | $0.008 | Active |
| classifier | v4 | gpt-4o-mini | 0.89 | 0.3s | $0.002 | Rolled back |
| extractor | v2 | gpt-4o | 0.92 | 2.1s | $0.018 | Active |
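A table like this is easier to act on when the rows are structured records. As a minimal sketch, the code below loads the two summarizer rows above into a hypothetical `PromptMetrics` dataclass and computes the deltas between versions. The field names and the `compare` helper are illustrative choices, not an established schema.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PromptMetrics:
    prompt: str
    version: int
    model: str
    faithfulness: float
    latency_p50_s: float
    cost_per_call_usd: float


def compare(candidate: PromptMetrics, baseline: PromptMetrics) -> dict[str, float]:
    """Return candidate minus baseline for each metric, rounded for display."""
    return {
        "faithfulness": round(candidate.faithfulness - baseline.faithfulness, 4),
        "latency_p50_s": round(candidate.latency_p50_s - baseline.latency_p50_s, 4),
        "cost_per_call_usd": round(candidate.cost_per_call_usd - baseline.cost_per_call_usd, 4),
    }


# Rows taken from the table above.
SUMMARIZER_V2 = PromptMetrics("summarizer", 2, "gpt-4o", 0.91, 1.4, 0.015)
SUMMARIZER_V3 = PromptMetrics("summarizer", 3, "gpt-4o", 0.94, 1.2, 0.012)

if __name__ == "__main__":
    for metric, delta in compare(SUMMARIZER_V3, SUMMARIZER_V2).items():
        print(f"{metric}: {delta:+}")
```

Positive faithfulness deltas and negative latency and cost deltas are the direction you want; anything else deserves a closer look before promotion.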
Monitoring with LangFuse
LangFuse gives you full observability into your LLM application — traces, costs, latencies, and prompt analytics:
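```python
from langfuse.decorators import langfuse_context, observe  # Langfuse Python SDK v2 decorators

# retrieve_documents and generate_answer are application functions defined elsewhere.


@observe(name="retrieval")
async def retrieve(query: str, tenant_id: str):
    docs = await retrieve_documents(query, tenant_id)
    langfuse_context.update_current_observation(metadata={"num_docs": len(docs)})
    return docs


@observe(name="generation")
async def generate(query: str, docs):
    response = await generate_answer(query, docs)
    langfuse_context.update_current_observation(
        metadata={"model": "gpt-4o", "tokens": response.usage.total_tokens}
    )
    return response


@observe(name="rag-pipeline")
async def rag_pipeline(query: str, tenant_id: str):
    langfuse_context.update_current_trace(
        user_id=tenant_id,
        metadata={"pipeline_version": "v3"},
        tags=["production", "rag"],
    )
    # Nested @observe-decorated calls are recorded as child spans of this trace.
    docs = await retrieve(query, tenant_id)
    return await generate(query, docs)
```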
Key Dashboards to Build
1. Cost dashboard: Daily/weekly LLM spend by model, tenant, and feature
2. Latency dashboard: P50/P95/P99 latencies by endpoint
3. Quality dashboard: Faithfulness and relevance scores over time
4. Error dashboard: Tool call failures, timeouts, rate limits
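The cost and latency dashboards boil down to two small computations over call records: summing per-token cost by some dimension, and taking latency percentiles. The sketch below assumes a simple record shape and uses made-up model names and prices (`PRICE_PER_1M_TOKENS_USD` is hypothetical; substitute your provider's actual rates). The percentile uses the nearest-rank method, which is one of several common definitions.

```python
import math
from collections import defaultdict

# Hypothetical prices in USD per 1M tokens: (input, output). Not real provider rates.
PRICE_PER_1M_TOKENS_USD = {
    "model-small": (0.10, 0.40),
    "model-large": (2.00, 8.00),
}


def call_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost of one call: tokens times the per-million price for each direction."""
    input_price, output_price = PRICE_PER_1M_TOKENS_USD[model]
    return (input_tokens * input_price + output_tokens * output_price) / 1_000_000


def spend_by(calls: list[dict], dimension: str) -> dict[str, float]:
    """Total spend grouped by a record field such as 'model' or 'tenant'."""
    totals: dict[str, float] = defaultdict(float)
    for call in calls:
        totals[call[dimension]] += call_cost(
            call["model"], call["input_tokens"], call["output_tokens"]
        )
    return dict(totals)


def percentile(values: list[float], pct: int) -> float:
    """Nearest-rank percentile: smallest value with at least pct% of data at or below it."""
    if not values:
        raise ValueError("percentile of empty list")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


if __name__ == "__main__":
    calls = [
        {"tenant": "acme", "model": "model-large", "input_tokens": 1200, "output_tokens": 300},
        {"tenant": "acme", "model": "model-small", "input_tokens": 800, "output_tokens": 150},
        {"tenant": "globex", "model": "model-small", "input_tokens": 500, "output_tokens": 90},
    ]
    print(spend_by(calls, "tenant"))
    latencies_s = [0.4, 0.5, 0.6, 0.7, 0.9, 1.1, 1.3, 2.0, 2.4, 5.0]
    print({p: percentile(latencies_s, p) for p in (50, 95, 99)})
```

Tracking P95 and P99 alongside P50 matters because LLM latency distributions tend to have long tails that an average hides.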
A/B Testing Prompts
Run controlled experiments on prompt changes before rolling out to all users:
```python
import hashlib


class PromptExperiment:
    def __init__(self, name: str, control_version: int, treatment_version: int,
                 traffic_split: float = 0.1):
        self.name = name
        self.control = control_version
        self.treatment = treatment_version
        self.traffic_split = traffic_split

    def _bucket(self, user_id: str) -> int:
        # Built-in hash() is salted per process for strings, so it would not bucket
        # users consistently across workers or restarts. Use a stable digest instead.
        digest = hashlib.sha256(f"{self.name}:{user_id}".encode()).hexdigest()
        return int(digest, 16) % 100

    async def get_prompt(self, user_id: str) -> tuple[str, str]:
        """Return (prompt_text, variant) based on consistent user bucketing."""
        if self._bucket(user_id) < self.traffic_split * 100:
            variant = "treatment"
            version = self.treatment
        else:
            variant = "control"
            version = self.control

        prompt = await PromptRegistry.get_prompt(self.name, version=version)

        langfuse.trace(
            name=f"experiment:{self.name}",
            metadata={"variant": variant, "version": version},
            user_id=user_id,
        )
        return prompt, variant
```
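One caveat for user bucketing: Python's built-in `hash()` on strings is randomized per process unless `PYTHONHASHSEED` is fixed, so a digest-based bucket is a safer basis for consistent assignment. The sketch below uses hypothetical helper names (`bucket_for`, `assign_variant`, `treatment_share`) to show a stable bucketing function and a quick check that the observed treatment share lands near the configured split.

```python
import hashlib


def bucket_for(experiment: str, user_id: str, buckets: int = 100) -> int:
    """Map (experiment, user) to a stable bucket in [0, buckets)."""
    digest = hashlib.sha256(f"{experiment}:{user_id}".encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % buckets


def assign_variant(experiment: str, user_id: str, traffic_split: float = 0.1) -> str:
    """Users in the lowest traffic_split fraction of buckets get the treatment."""
    return "treatment" if bucket_for(experiment, user_id) < traffic_split * 100 else "control"


def treatment_share(experiment: str, user_ids: list[str], traffic_split: float = 0.1) -> float:
    """Fraction of the given users assigned to treatment."""
    treated = sum(assign_variant(experiment, uid, traffic_split) == "treatment" for uid in user_ids)
    return treated / len(user_ids)


if __name__ == "__main__":
    users = [f"user-{i}" for i in range(10_000)]  # synthetic IDs for the check
    print(f"treatment share: {treatment_share('summarizer-v3', users):.3f}")
    # The same user always lands in the same variant:
    print(assign_variant("summarizer-v3", "user-42") == assign_variant("summarizer-v3", "user-42"))
```

Including the experiment name in the hashed string means a user who is in the treatment group for one experiment is not automatically in it for every other one.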
Rollback Patterns
When a prompt or model change goes wrong, you need to roll back in seconds, not minutes:
```python
import json


class LLMConfigManager:
    """Manage model and prompt configurations with instant rollback."""

    def __init__(self, redis_client):
        # Assumes a redis.asyncio client created with decode_responses=True,
        # so hash fields and values come back as str rather than bytes.
        self.redis = redis_client

    async def get_active_config(self, feature: str) -> dict:
        config = await self.redis.hgetall(f"llm:config:{feature}")
        return {
            "model": config.get("model", "gpt-4o"),
            "prompt_version": int(config.get("prompt_version", 1)),
            "temperature": float(config.get("temperature", 0.7)),
            "max_tokens": int(config.get("max_tokens", 2048)),
        }

    async def rollback(self, feature: str, to_version: int):
        """Instant rollback to a previous configuration."""
        backup = await self.redis.hgetall(f"llm:config:{feature}:v{to_version}")
        if not backup:
            raise ValueError(f"No backup found for version {to_version}")
        # hset(mapping=...) replaces the deprecated hmset.
        await self.redis.hset(f"llm:config:{feature}", mapping=backup)
        await self.redis.publish(
            "config-updates", json.dumps({"feature": feature, "action": "rollback"})
        )
```
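Rollback only works if every configuration change leaves a snapshot behind. The class above reads versioned backups but does not show how they get written, so here is a minimal in-memory sketch of the full cycle. `InMemoryConfigStore` is a hypothetical stand-in for the Redis-backed manager; the snapshot-on-update policy is an assumption about how the backups are produced.

```python
import copy


class InMemoryConfigStore:
    """Snapshot every update so any earlier version can be restored."""

    def __init__(self) -> None:
        self._active: dict[str, dict] = {}
        self._history: dict[str, list[dict]] = {}

    def update(self, feature: str, config: dict) -> int:
        """Activate a new config and return the version number of its snapshot."""
        snapshots = self._history.setdefault(feature, [])
        snapshots.append(copy.deepcopy(config))
        self._active[feature] = copy.deepcopy(config)
        return len(snapshots)

    def get_active(self, feature: str) -> dict:
        return copy.deepcopy(self._active[feature])

    def rollback(self, feature: str, to_version: int) -> dict:
        """Make a previous snapshot active again; history is left untouched."""
        snapshots = self._history.get(feature, [])
        if not 1 <= to_version <= len(snapshots):
            raise ValueError(f"No backup found for version {to_version}")
        self._active[feature] = copy.deepcopy(snapshots[to_version - 1])
        return self.get_active(feature)


if __name__ == "__main__":
    store = InMemoryConfigStore()
    store.update("summarizer", {"model": "gpt-4o", "prompt_version": 2, "temperature": 0.3})
    store.update("summarizer", {"model": "gpt-4o", "prompt_version": 3, "temperature": 0.3})
    print(store.get_active("summarizer"))
    print(store.rollback("summarizer", to_version=1))
```

Because the rollback is just a pointer change to an existing snapshot, it does not depend on a deploy pipeline, which is what keeps it in the seconds range.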
Conclusion
LLMOps is still a young discipline, but the patterns are crystallizing. Model serving, caching, prompt versioning, monitoring, and rollback are the five pillars that separate production LLM applications from demos.
The tools are there — LangFuse for observability, Ollama/vLLM for serving, Redis/Qdrant for caching. What matters is putting them together with operational discipline.
If you need help setting up a production LLMOps pipeline, [contact us](/contact) for a consultation. We have deployed LLM infrastructure across healthcare, SaaS, and enterprise — see our [services page](/services) for details on our infrastructure offerings.