You can't improve what you can't measure.
API observability tracks latency and error rates. LLM observability tracks whether the model is actually answering correctly — faithfulness, citation rate, format compliance, prohibited phrase rate, and token cost per prompt layer.
This is Part 4 — the operational layer: how to observe prompt behavior in production, govern costs with caching and token budgets, run A/B tests with statistical rigor, use feature flags to enable/disable prompt components per tenant, enforce structured output, and the complete open source vs Azure tooling comparison.
Part 4 covers:
- LLM observability — what to measure and how
- Prompt-level token accounting — cost per layer
- Prompt caching — Azure OpenAI and Anthropic
- Token budget governance
- Prompt compression (LLMLingua)
- A/B testing with statistical significance
- Feature flags for prompt components
- Structured output enforcement (JSON mode, Pydantic)
- Context window management
- Complete open source vs Azure tooling reference
LLM Observability — What to Measure
Traditional API monitoring measures latency, error rate, and throughput. LLM monitoring adds a quality dimension — the model can return HTTP 200 with a completely wrong answer.
The Observability Event
Every LLM call emits a structured observability event — not just a log line:
# observability/event.py
from dataclasses import dataclass, asdict
import time
@dataclass
class LLMObservabilityEvent:
# Identity
event_id: str
request_id: str
session_id: str
tenant_id: str
user_role: str
environment: str
# Prompt
prompt_name: str
prompt_version: str
model: str
model_version: str
# Latency (milliseconds)
retrieval_latency_ms: int
reranker_latency_ms: int
llm_ttft_ms: int # time to first token
llm_total_ms: int
total_request_ms: int
# Token accounting — per layer
system_prompt_tokens: int
few_shot_tokens: int
rag_context_tokens: int
user_message_tokens: int
completion_tokens: int
total_tokens: int
# Quality signals
faithfulness_score: float # 0.0–1.0 (LLM-as-judge or RAGAS)
citation_present: bool
format_compliant: bool
prohibited_phrase_found: bool
# Operational
fallback_used: bool
fallback_level: int
safety_filtered: bool
cache_hit: bool # prompt cache hit
# Cost
cost_usd: float # calculated from token usage + model pricing
def to_metric_tags(self) -> dict:
return {
"tenant": self.tenant_id,
"role": self.user_role,
"prompt": self.prompt_name,
"prompt_version": self.prompt_version,
"model": self.model,
"env": self.environment,
"fallback": str(self.fallback_used),
"cache_hit": str(self.cache_hit)
}
class ObservabilityEmitter:
def __init__(self, app_insights_conn: str, metric_client):
from azure.monitor.opentelemetry import configure_azure_monitor
configure_azure_monitor(connection_string=app_insights_conn)
self.metrics = metric_client
def emit(self, event: LLMObservabilityEvent):
tags = event.to_metric_tags()
# Latency metrics
self.metrics.histogram("llm.latency.total_ms", event.total_request_ms, tags)
self.metrics.histogram("llm.latency.ttft_ms", event.llm_ttft_ms, tags)
self.metrics.histogram("llm.latency.retrieval_ms", event.retrieval_latency_ms, tags)
# Token metrics — per layer
self.metrics.histogram("llm.tokens.system", event.system_prompt_tokens, tags)
self.metrics.histogram("llm.tokens.few_shot", event.few_shot_tokens, tags)
self.metrics.histogram("llm.tokens.rag_context", event.rag_context_tokens, tags)
self.metrics.histogram("llm.tokens.user_message", event.user_message_tokens, tags)
self.metrics.histogram("llm.tokens.completion", event.completion_tokens, tags)
self.metrics.histogram("llm.cost.usd", event.cost_usd, tags)
# Quality metrics
self.metrics.gauge("llm.quality.faithfulness", event.faithfulness_score, tags)
self.metrics.increment("llm.quality.citation_present", int(event.citation_present), tags)
self.metrics.increment("llm.quality.format_compliant", int(event.format_compliant), tags)
self.metrics.increment("llm.safety.prohibited_phrase", int(event.prohibited_phrase_found), tags)
# Operational
self.metrics.increment("llm.ops.fallback_used", int(event.fallback_used), tags)
self.metrics.increment("llm.ops.cache_hit", int(event.cache_hit), tags)
LLM-as-Judge — Automated Quality Evaluation
# quality/llm_judge.py
import json
async def evaluate_faithfulness(
question: str,
context_chunks: list[str],
answer: str
) -> float:
"""
Uses a separate LLM call to evaluate whether the answer
is faithful to the retrieved context.
Sample 10% of production requests to control cost.
"""
judge_prompt = f"""You are an expert evaluator.
Rate whether the ANSWER is faithful to the CONTEXT — meaning every claim
in the answer can be supported by the context.
CONTEXT:
{chr(10).join(context_chunks)}
QUESTION: {question}
ANSWER: {answer}
Rate faithfulness from 0.0 (completely unfaithful) to 1.0 (perfectly faithful).
Respond with ONLY a JSON object: {{"score": 0.0, "reason": "brief reason"}}"""
response = await openai_client.chat.completions.create(
model="gpt-4o-mini", # cheaper model for evaluation
messages=[{"role": "user", "content": judge_prompt}],
response_format={"type": "json_object"},
temperature=0
)
result = json.loads(response.choices[0].message.content)
return float(result["score"])
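The 10% sampling suggested in the docstring can live in a thin wrapper around the judge. A minimal sketch, assuming the request path calls this instead of the judge directly (the wrapper name and sample rate are illustrative, not part of the MortgageIQ code):

# quality/sampling.py — sketch: score only a sampled fraction of production traffic
import random

JUDGE_SAMPLE_RATE = 0.10  # evaluate roughly 10% of requests to control judge cost

async def maybe_evaluate_faithfulness(question: str, context_chunks: list[str], answer: str) -> float | None:
    """Return a faithfulness score for sampled requests, None for the rest."""
    if random.random() > JUDGE_SAMPLE_RATE:
        return None  # skip the judge call for this request
    return await evaluate_faithfulness(question, context_chunks, answer)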
Token Accounting — Cost Per Layer
Understanding where tokens go — by prompt layer — is the foundation of cost governance.
A representative MortgageIQ request totals roughly 1,362 tokens:
- At GPT-4o pricing ($2.50/1M input, $10/1M output): ~$0.0038 per request
- At 1M requests/month: $3,800/month — just for one assistant
- RAG context (520 tokens, 38% of input) is the biggest controllable variable
# token_accounting.py
import tiktoken
encoder = tiktoken.encoding_for_model("gpt-4o")
def count_tokens(text: str) -> int:
return len(encoder.encode(text))
def account_tokens(messages: list[dict]) -> dict:
"""Break down token usage by message role / layer."""
accounting = {
"system": 0,
"few_shot": 0,
"rag_context": 0,
"user_message": 0,
"overhead": 4 # per-message overhead
}
for msg in messages:
tokens = count_tokens(msg["content"])
role = msg["role"]
layer = msg.get("layer", role) # custom "layer" field for breakdown
accounting[layer] = accounting.get(layer, 0) + tokens
return accounting
def calculate_cost(token_accounting: dict, completion_tokens: int) -> float:
INPUT_COST_PER_M = 2.50 # GPT-4o input
OUTPUT_COST_PER_M = 10.00 # GPT-4o output
input_tokens = sum(token_accounting.values())
return (
(input_tokens / 1_000_000) * INPUT_COST_PER_M +
(completion_tokens / 1_000_000) * OUTPUT_COST_PER_M
)
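account_tokens relies on a custom "layer" key on each message dict. A minimal sketch of how a composer might tag messages and feed the result into the cost calculation; the message contents, the completion count, and the strip step are illustrative assumptions:

# usage sketch — tag each message with its layer so account_tokens can attribute it
messages = [
    {"role": "system", "content": STATIC_SYSTEM_PROMPT, "layer": "system"},
    {"role": "user", "content": few_shot_examples, "layer": "few_shot"},
    {"role": "user", "content": rag_context, "layer": "rag_context"},
    {"role": "user", "content": user_query, "layer": "user_message"},
]
layer_tokens = account_tokens(messages)                      # e.g. {"system": 280, "few_shot": 170, ...}
cost = calculate_cost(layer_tokens, completion_tokens=350)
# strip the custom key before calling the API; message objects generally accept only role/content/name
api_messages = [{k: v for k, v in m.items() if k != "layer"} for m in messages]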
Prompt Caching — The Most Impactful Cost Optimization
Prompt caching stores the KV (key-value) cache of a prefix of your prompt on the model provider's servers. Subsequent requests that share the same prefix reuse the cached computation — significantly reducing cost and latency.
Azure OpenAI — Automatic Prompt Caching (available on gpt-4o and gpt-4o-mini):
# Azure OpenAI handles caching automatically for repeated prefixes
# No code change needed — just ensure the static prefix is stable
# ✓ Cacheable prefix — system prompt is identical across requests
messages = [
{"role": "system", "content": STATIC_SYSTEM_PROMPT}, # cached after first call
{"role": "user", "content": few_shot_examples}, # cached after first call
    {"role": "assistant", "content": "Understood."},  # static acknowledgement, still part of the cacheable prefix
    # Dynamic content below is NOT cached (changes every request)
    {"role": "user", "content": f"{rag_context}\n\nQuestion: {user_query}"}
]
response = openai_client.chat.completions.create(
model="gpt-4o",
messages=messages
)
# Check if prefix was cached
cached_tokens = response.usage.prompt_tokens_details.cached_tokens
print(f"Cached: {cached_tokens} tokens (~90% discount on those)")
Anthropic Claude — Explicit Cache Control:
import anthropic
client = anthropic.Anthropic()
# Explicitly mark cacheable content with cache_control
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1500,
system=[
{
"type": "text",
"text": STATIC_SYSTEM_PROMPT,
"cache_control": {"type": "ephemeral"} # cache this prefix
}
],
messages=[
{
"role": "user",
"content": [
{
"type": "text",
"text": few_shot_examples,
"cache_control": {"type": "ephemeral"} # cache few-shot too
},
{
"type": "text",
"text": f"{rag_context}\n\nQuestion: {user_query}"
# No cache_control — dynamic content, not cached
}
]
}
]
)
# Cost saving: cache reads are billed at 10% of the normal input price (cache writes at 125%)
Cache hit rate optimization:
- Keep the system prompt and few-shot examples identical across all requests (no dynamic content in the cacheable prefix)
- Place all dynamic content (RAG context, user message) AFTER the cacheable prefix
- Minimum cacheable prefix length: 1,024 tokens on Azure OpenAI; on Anthropic, 1,024 tokens for Opus/Sonnet models and 2,048 for Haiku
Expected savings at 1M requests/month with an 80% cache hit rate on a 417-token prefix (system prompt + few-shot):
- Without caching: 417 tokens/request × $2.50/1M × 1M requests ≈ $1,042/month (system + few-shot input only)
- With caching, assuming cached tokens billed at ~10% of the input price (Anthropic cache reads): 417 × (0.20 + 0.80 × 0.10) × $2.50/1M × 1M ≈ $292/month
- Saving: ~$750/month on the system prompt + few-shot layers alone (with Azure OpenAI's ~50% cached-token discount the same prefix costs ~$625/month, a ~$417/month saving)
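The same estimate as a tiny cost model, so the assumptions (prefix size, hit rate, cached-token price) are explicit and easy to re-run with your own numbers. The helper is hypothetical, not part of the MortgageIQ stack:

# sketch — monthly cost of the cacheable prefix under a prompt cache
def cached_prefix_cost(prefix_tokens: int, requests: int, hit_rate: float,
                       cached_price_factor: float, input_price_per_m: float = 2.50) -> float:
    """cached_price_factor: ~0.10 for Anthropic cache reads, ~0.50 for Azure OpenAI cached input."""
    full_price = prefix_tokens * requests * (1 - hit_rate) * input_price_per_m / 1_000_000
    cached_price = prefix_tokens * requests * hit_rate * cached_price_factor * input_price_per_m / 1_000_000
    return full_price + cached_price

print(cached_prefix_cost(417, 1_000_000, 0.80, 0.10))  # ≈ 292 (USD/month)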
Token Budget Governance
Set hard limits per tier to prevent runaway costs:
# token_budget.py
from dataclasses import dataclass
@dataclass
class TokenBudget:
system_prompt_max: int
few_shot_max: int
rag_context_max: int
user_message_max: int
completion_max: int
@property
def total_input_max(self) -> int:
return (self.system_prompt_max + self.few_shot_max +
self.rag_context_max + self.user_message_max)
BUDGETS = {
"standard": TokenBudget(
system_prompt_max=300,
few_shot_max=200,
rag_context_max=600,
user_message_max=100,
completion_max=800
),
"premium": TokenBudget(
system_prompt_max=400,
few_shot_max=300,
rag_context_max=1200,
user_message_max=200,
completion_max=1500
),
"enterprise": TokenBudget(
system_prompt_max=500,
few_shot_max=400,
rag_context_max=2000,
user_message_max=300,
completion_max=2000
)
}
def enforce_budget(layer: str, tokens: int, tier: str) -> int:
budget = BUDGETS[tier]
max_tokens = getattr(budget, f"{layer}_max")
if tokens > max_tokens:
logger.warning(f"Token budget exceeded: {layer}={tokens} > {max_tokens} (tier={tier})")
return min(tokens, max_tokens)
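enforce_budget only clamps the count; the clamp still has to be applied when the prompt is assembled. A sketch for the RAG layer, assuming whole chunks are dropped rather than cut mid-chunk (the helper is illustrative):

# sketch — fit retrieved chunks into the tier's rag_context budget
def fit_rag_context(chunks: list[str], tier: str) -> str:
    budget = BUDGETS[tier].rag_context_max
    selected, used = [], 0
    for chunk in chunks:                 # chunks assumed ordered by retrieval relevance
        tokens = count_tokens(chunk)
        if used + tokens > budget:
            break                        # drop lower-ranked chunks that no longer fit
        selected.append(chunk)
        used += tokens
    return "\n\n".join(selected)

Dropping whole chunks keeps each remaining chunk's citation metadata intact, which matters for the citation_present quality signal.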
Prompt Compression — LLMLingua
For long system prompts or large few-shot sets, compression reduces token count while preserving semantic content:
# LLMLingua — open source prompt compression
from llmlingua import PromptCompressor
compressor = PromptCompressor(
model_name="microsoft/llmlingua-2-bert-base-multilingual-cased-meetingbank",
use_llmlingua2=True,
device_map="cpu"
)
def compress_few_shot(few_shot_text: str, target_ratio: float = 0.5) -> str:
"""
Compress few-shot examples to target_ratio of original length.
0.5 = 50% compression. Test quality impact before deploying.
"""
result = compressor.compress_prompt(
few_shot_text,
rate=target_ratio,
force_tokens=["\n", "Answer:", "Source:"] # preserve formatting tokens
)
return result["compressed_prompt"]
# Usage — compress few-shot at indexing time, store compressed version
compressed = compress_few_shot(raw_few_shot, target_ratio=0.6)
token_savings = count_tokens(raw_few_shot) - count_tokens(compressed)
print(f"Saved {token_savings} tokens per request ({token_savings * 0.0000025:.4f} USD/request)")
LLMLingua results on MortgageIQ few-shot examples:
- Original: 180 tokens
- Compressed (60% ratio): 108 tokens
- Quality impact: faithfulness 0.91 → 0.89 (acceptable)
- Saving: 72 tokens × $2.50/1M × 1M req/month = $180/month
A/B Testing Prompts
Part 2 covered blue/green traffic splitting. Here's how to evaluate the results with statistical rigor.
# ab_test/evaluator.py
from scipy import stats
import numpy as np
class PromptABEvaluator:
def __init__(self, metrics_store):
self.store = metrics_store
async def evaluate_experiment(
self,
experiment_id: str,
min_samples: int = 500,
confidence: float = 0.95
) -> dict:
green_metrics = await self.store.get_variant_metrics(experiment_id, "green")
blue_metrics = await self.store.get_variant_metrics(experiment_id, "blue")
if len(green_metrics) < min_samples or len(blue_metrics) < min_samples:
return {
"status": "insufficient_data",
"green_n": len(green_metrics),
"blue_n": len(blue_metrics),
"required_n": min_samples
}
results = {}
primary_metric = "faithfulness_score"
for metric in ["faithfulness_score", "citation_rate", "format_compliance", "total_tokens"]:
green_vals = [m[metric] for m in green_metrics]
blue_vals = [m[metric] for m in blue_metrics]
# Welch's t-test — does not assume equal variance
t_stat, p_value = stats.ttest_ind(green_vals, blue_vals, equal_var=False)
green_mean = np.mean(green_vals)
blue_mean = np.mean(blue_vals)
relative_change = (blue_mean - green_mean) / green_mean * 100
results[metric] = {
"green_mean": round(green_mean, 4),
"blue_mean": round(blue_mean, 4),
"relative_change_pct": round(relative_change, 2),
"p_value": round(p_value, 4),
"significant": p_value < (1 - confidence),
"winner": "blue" if blue_mean > green_mean and p_value < (1 - confidence)
else "green" if green_mean > blue_mean and p_value < (1 - confidence)
else "no_significant_difference"
}
# Primary metric determines overall winner
primary = results[primary_metric]
return {
"status": "complete",
"experiment_id": experiment_id,
"green_n": len(green_metrics),
"blue_n": len(blue_metrics),
"metrics": results,
"recommendation": primary["winner"],
"confidence": confidence,
"action": (
"promote_blue" if primary["winner"] == "blue" else
"rollback_blue" if primary["winner"] == "green" else
"extend_experiment"
)
}
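A sketch of how the evaluator's recommendation might drive the Part 2 deployment machinery from a scheduled job; promote_blue and rollback_blue are hypothetical stand-ins for that code:

# sketch — act on the experiment result from a scheduled review job
async def review_experiment(evaluator: PromptABEvaluator, experiment_id: str) -> dict:
    result = await evaluator.evaluate_experiment(experiment_id)
    if result["status"] != "complete":
        return result                       # not enough samples yet, keep collecting
    if result["action"] == "promote_blue":
        await promote_blue(experiment_id)   # shift 100% of traffic to the new prompt version
    elif result["action"] == "rollback_blue":
        await rollback_blue(experiment_id)
    return result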
Feature Flags for Prompt Components
Feature flags let you enable/disable specific prompt behaviors per tenant, role, or percentage of traffic — without deploying a new prompt version.
# feature_flags/prompt_flags.py
from azure.appconfiguration.provider import load
from featuremanagement.aio import FeatureManager

class PromptFeatureFlags:
    def __init__(self, app_config_conn: str):
        # Load feature flags from Azure App Configuration, then hand them to FeatureManager
        config = load(connection_string=app_config_conn, feature_flag_enabled=True)
        self.feature_manager = FeatureManager(config)
async def resolve(self, ctx: "PromptContext") -> dict[str, bool]:
return {
"chain_of_thought": await self._check(
"prompt.chain_of_thought",
ctx,
allowed_roles=["underwriter", "compliance_auditor"]
),
"spanish_language_mode": await self._check(
"prompt.spanish_language",
ctx,
allowed_tenants=["acme-mortgage", "quicklend"]
),
"strict_citation_mode": await self._check(
"prompt.strict_citation",
ctx,
allowed_roles=["compliance_auditor"]
),
"extended_context": await self._check(
"prompt.extended_context",
ctx,
allowed_tiers=["premium", "enterprise"]
),
}
async def _check(
self, flag_name: str, ctx,
allowed_roles=None, allowed_tenants=None, allowed_tiers=None
) -> bool:
if not await self.feature_manager.is_enabled(flag_name):
return False
if allowed_roles and ctx.role not in allowed_roles:
return False
if allowed_tenants and ctx.tenant_id not in allowed_tenants:
return False
if allowed_tiers and ctx.tier not in allowed_tiers:
return False
return True
# Apply flags to compose final prompt
class FlaggedPromptComposer:
APPENDICES = {
"chain_of_thought": "\n\nThink through this step by step before answering.",
"spanish_language_mode": "\n\nIf the user writes in Spanish, respond in Spanish.",
"strict_citation_mode": "\n\nEvery factual claim MUST include a citation: [Doc Title, Section X.X].",
"extended_context": None, # handled at retrieval layer, not prompt text
}
def compose(self, base_template: str, flags: dict[str, bool]) -> str:
composed = base_template
for flag, enabled in flags.items():
if enabled and self.APPENDICES.get(flag):
composed += self.APPENDICES[flag]
return composed
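Putting the two classes together for a single request; APP_CONFIG_CONN, ctx, and base_template are assumed to come from the surrounding application:

# sketch — per-request flag resolution feeding prompt composition
flag_service = PromptFeatureFlags(APP_CONFIG_CONN)
composer = FlaggedPromptComposer()

async def build_system_prompt(ctx: "PromptContext", base_template: str) -> str:
    flags = await flag_service.resolve(ctx)
    return composer.compose(base_template, flags)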
Structured Output Enforcement
LLMs return freeform text by default. Production systems need parseable, validated output — especially when downstream code reads the response.
JSON Mode
# Force JSON output — Azure OpenAI
response = openai_client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": """You are a mortgage assistant.
IMPORTANT: Always respond with valid JSON in exactly this structure:
{
"summary": "2-sentence summary",
"details": ["bullet 1", "bullet 2", "bullet 3"],
"citations": [{"doc": "document name", "section": "section reference", "version": "v2025-Q1"}],
"confidence": 0.0-1.0,
"requires_human_review": true/false
}"""},
{"role": "user", "content": user_query}
],
response_format={"type": "json_object"} # enforced JSON mode
)
import json
structured = json.loads(response.choices[0].message.content)
Pydantic Validation
import json
from pydantic import BaseModel, Field, ValidationError, field_validator, model_validator

class Citation(BaseModel):
    doc: str
    section: str
    version: str

class MortgageAssistantResponse(BaseModel):
    summary: str = Field(..., max_length=500)
    details: list[str] = Field(..., min_length=1, max_length=10)
    citations: list[Citation]
    confidence: float = Field(..., ge=0.0, le=1.0)
    requires_human_review: bool

    @field_validator("summary")
    @classmethod
    def summary_not_empty(cls, v: str) -> str:
        if not v.strip():
            raise ValueError("Summary cannot be empty")
        return v

    @model_validator(mode="after")
    def citations_required(self):
        # High-confidence answers must cite their sources
        if self.confidence > 0.5 and not self.citations:
            raise ValueError("High-confidence responses must include citations")
        return self

def parse_and_validate(raw_response: str) -> MortgageAssistantResponse:
    try:
        data = json.loads(raw_response)
        return MortgageAssistantResponse(**data)
    except (json.JSONDecodeError, ValidationError) as e:
        logger.error(f"Response validation failed: {e}")
        # Retry with explicit format correction prompt (see sketch below)
        raise ResponseValidationError(str(e))
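The retry mentioned in the except block, as a minimal sketch: one re-ask with the validation error appended, then give up. The retry count and correction wording are assumptions:

# sketch — one retry with an explicit format-correction instruction
async def complete_with_validation(messages: list[dict], max_retries: int = 1) -> MortgageAssistantResponse:
    for attempt in range(max_retries + 1):
        response = await openai_client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            response_format={"type": "json_object"}
        )
        raw = response.choices[0].message.content
        try:
            return parse_and_validate(raw)
        except ResponseValidationError as e:
            if attempt == max_retries:
                raise
            # feed the validation error back so the model can correct its output
            messages = messages + [
                {"role": "assistant", "content": raw},
                {"role": "user", "content": f"Your previous response failed validation: {e}. "
                                            "Return only corrected JSON in the required structure."}
            ]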
OpenAI Structured Outputs (Strict Mode)
from openai import AzureOpenAI
from pydantic import BaseModel
# Strict structured outputs — model MUST conform to the schema
# No hallucinated fields, no missing required fields
response = openai_client.beta.chat.completions.parse(
model="gpt-4o-2024-08-06", # structured outputs require this version+
messages=[...],
response_format=MortgageAssistantResponse # Pydantic model directly
)
# Schema conformance is enforced by the API; check message.refusal for safety refusals
result: MortgageAssistantResponse = response.choices[0].message.parsed
Context Window Management
Long conversations and large RAG contexts can exceed the context window. Three strategies: slide a window over the history, summarize older turns, and trim the RAG context to its token budget (covered under token budgets above). The first two are shown below:
# context_manager.py
MAX_CONTEXT_TOKENS = 8000 # leave headroom for completion
def manage_context_sliding_window(
messages: list[dict],
system_tokens: int,
completion_budget: int = 1500
) -> list[dict]:
available = MAX_CONTEXT_TOKENS - system_tokens - completion_budget
# Always keep system message — slice from history
history = [m for m in messages if m["role"] != "system"]
current_tokens = sum(count_tokens(m["content"]) for m in history)
# Drop oldest turns until within budget
while current_tokens > available and len(history) > 2:
dropped = history.pop(0) # remove oldest
current_tokens -= count_tokens(dropped["content"])
return history
async def summarize_history(history: list[dict]) -> str:
"""Compress conversation history when it gets too long."""
history_text = "\n".join(
f"{m['role'].upper()}: {m['content']}" for m in history
)
response = await openai_client.chat.completions.create(
model="gpt-4o-mini", # cheap model for summarization
messages=[{
"role": "user",
"content": f"Summarize this mortgage consultation conversation in 3-5 bullet points, "
f"preserving all loan details, decisions, and open questions:\n\n{history_text}"
}],
max_tokens=300
)
return response.choices[0].message.content
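A sketch tying the two strategies together: slide the window first and, if the surviving history is still heavy, replace the older turns with a summary. The thresholds are illustrative:

# sketch — combine sliding window and summarization
async def prepare_history(messages: list[dict], system_tokens: int) -> list[dict]:
    history = manage_context_sliding_window(messages, system_tokens)
    history_tokens = sum(count_tokens(m["content"]) for m in history)
    if history_tokens > 3000 and len(history) > 6:         # illustrative thresholds
        summary = await summarize_history(history[:-2])    # keep the last exchange verbatim
        history = [{"role": "user", "content": f"Summary of the conversation so far:\n{summary}"}] + history[-2:]
    return history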
Complete Open Source vs Azure Tooling Reference
| Capability | Azure Stack | Open Source Stack |
|---|---|---|
| Prompt Storage | Cosmos DB (runtime) + Azure DevOps (Git) | MongoDB (runtime) + GitHub (Git) |
| Prompt SDK | Custom + Semantic Kernel PromptTemplates | Custom + LangChain PromptTemplate |
| Prompt Versioning | Cosmos DB + Git tags | MongoDB + Git tags |
| CI/CD for Prompts | Azure DevOps Pipelines | GitHub Actions |
| Environment Promotion | Azure App Config + Cosmos DB per env | MongoDB per env |
| Feature Flags | Azure App Configuration | LaunchDarkly / Flagsmith / Unleash |
| A/B Testing | Custom + Azure Monitor | Custom + Prometheus |
| Observability | Azure Monitor + App Insights + Log Analytics | Langfuse / LangSmith / Helicone |
| Quality Eval | Azure AI Evaluation SDK | RAGAS / DeepEval |
| Content Safety | Azure Content Safety | LlamaGuard / NeMo Guardrails / Guardrails AI |
| PII Detection | Azure AI Language | Microsoft Presidio |
| Prompt Caching | Azure OpenAI (automatic) | Anthropic (explicit) / GPTCache |
| Token Counting | tiktoken | tiktoken |
| Prompt Compression | LLMLingua (open source, use in Azure) | LLMLingua |
| Structured Output | OpenAI JSON mode / Structured Outputs | Instructor / Outlines / Guidance |
| Drift Detection | Custom + Azure Monitor alerts | Custom + Prometheus/Grafana |
| Audit Logging | Cosmos DB append-only + Azure Monitor | PostgreSQL append-only / OpenSearch |
| Compliance Archiving | Cosmos DB Analytical Store (7yr) | S3/Blob cold storage |
Recommended Open Source Stack (Full)
- Prompt Storage: MongoDB (Atlas or self-hosted)
- Git Versioning: GitHub + GitHub Actions CI
- Feature Flags: Unleash (self-hosted, open source)
- A/B Testing: Custom + Prometheus + Grafana
- Observability: Langfuse (open source, self-host or cloud)
- Quality Eval: RAGAS + DeepEval
- Safety: LlamaGuard + NeMo Guardrails
- PII: Microsoft Presidio
- Caching: GPTCache (Redis backend)
- Structured Output: Instructor (wraps any LLM)
- Compression: LLMLingua
Langfuse — Open Source LLM Observability
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
langfuse = Langfuse(
public_key=LANGFUSE_PUBLIC_KEY,
secret_key=LANGFUSE_SECRET_KEY,
host="https://your-langfuse.selfhosted.com"
)
@observe()
async def answer_loan_question(
query: str, user_id: str, tenant_id: str
) -> str:
# Langfuse automatically captures:
# - Input / output
# - Latency
# - Token usage
# - Model name
    langfuse_context.update_current_trace(
        user_id=user_id,
        metadata={
            "tenant_id": tenant_id,
            "prompt_version": prompt.version
        },
        tags=["production", tenant_id]
    )
response = await llm_complete(query)
# Log quality score
langfuse.score(
trace_id=langfuse_context.get_current_trace_id(),
name="faithfulness",
value=await evaluate_faithfulness(query, rag_chunks, response)
)
return response
What We Run at MortgageIQ — Full Stack
Production numbers:
- Prompt cache hit rate: 84% (system prompt + few-shot, ~$820/month savings)
- Average token cost per request: $0.0031 (down from $0.0058 before caching + compression)
- Drift detection: catches model version updates within 24 hours
- Injection blocks: ~0.3% of requests — mostly automated testing tools, not real attacks
- Feature flag operations: 12 active flags across 4 business units
Key Takeaways — Part 4
- Measure quality, not just latency — faithfulness, citation rate, format compliance, and prohibited phrase rate are the metrics that tell you if your prompt is working. HTTP 200 tells you nothing about answer quality.
- Prompt caching is the highest-ROI optimization — static system prompts + few-shot examples are re-billed at a steep discount (roughly 50% on Azure OpenAI, 90% on Anthropic cache reads). Requires no code change on Azure OpenAI. Delivers immediate cost reduction.
- Token budget governance by tier — set hard limits per layer (system, few-shot, RAG, user, completion) and enforce them at runtime. The RAG context layer is the largest variable and the easiest to control.
- A/B test prompts with statistical rigor — Welch's t-test, 500+ samples per variant, 95% confidence before declaring a winner. Gut feel is not a prompt evaluation metric.
- Feature flags decouple prompt behavior from deployment — enable chain-of-thought for underwriters, strict citation mode for compliance auditors, extended context for premium tenants — without a new prompt version for each combination.
- Structured output is not optional for production — JSON mode + Pydantic validation ensures downstream code can parse responses. Strict structured outputs (OpenAI) guarantee schema compliance at the model level.
The Complete Series
- Part 1: Anatomy, Storage, and Versioning — prompt layers, Git + Cosmos DB storage, Prompt SDK, environment promotion, rollback
- Part 2: Multi-User, Multi-Tenant, and Organizational Management — role routing, tenant isolation, org hierarchy, approval workflows, fallback chains, blue/green deployment
- Part 3: Security, Governance, and Compliance — injection, jailbreaking, extraction, indirect injection, audit trails, drift detection
- Part 4 (this post): Observability, cost governance, A/B testing, feature flags, structured output, full tooling reference