A RAG system that takes 30 seconds is not necessarily broken. A RAG system that makes users stare at a blank screen for 30 seconds is.
The distinction matters. Time to first token and time to final answer are different problems. Conflating them is why teams spend months optimizing retrieval latency when the real fix is a frontend architecture change that takes a week.
This post answers a question that comes up in every senior AI architect interview: your RAG pipeline takes 30 seconds — how do you design streaming responses so users start seeing tokens while the rest of the pipeline continues running?
Why RAG Takes 30 Seconds — And Why Waiting Is Not the Fix
A production RAG pipeline does a lot before the LLM generates a single token: query rewriting, embedding and vector retrieval, reranking, prompt assembly, generation itself, then citation verification and safety checks.
Add up the serial dependencies and you are easily at 20-30 seconds before the user sees anything. The instinct is to optimize each step. That helps — but the faster win is to stop making the user wait for all steps before showing anything.
The correct architectural goal:
Separate the pipeline into what must complete before streaming and what can continue in parallel after streaming starts.
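As a toy sketch of that goal (the stage names and sleep durations here are invented placeholders, not measurements from any real pipeline), overlapping the slow work with the fast path means the user's wait is governed by the fast path alone:

```python
import asyncio
import time

async def fast_path():
    await asyncio.sleep(0.05)   # stands in for query rewrite + top-k retrieval
    return "first grounded tokens"

async def slow_path():
    await asyncio.sleep(0.20)   # stands in for reranking + verification

async def serve_query():
    start = time.monotonic()
    slow = asyncio.create_task(slow_path())    # launched, not awaited
    first = await fast_path()                  # user starts reading here
    ttft = time.monotonic() - start
    await slow                                 # full pipeline still completes
    total = time.monotonic() - start
    return ttft, total

ttft, total = asyncio.run(serve_query())
```

Time to first token tracks the fast path (~0.05s) while total latency tracks the slow path (~0.2s): the slow work still happens, but no longer in front of the user.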
The Design — Progressive Streaming Architecture
The key shift: the API gateway returns a streaming channel immediately. The orchestrator runs two parallel tracks. The user sees progress events within milliseconds — and grounded tokens within 2-3 seconds.
The Fast Path vs. Slow Path Split
Not every RAG step needs to complete before the user can see something useful. The split:
| Step | Path | Why |
|---|---|---|
| Query rewrite | Fast | Lightweight, deterministic, sub-500ms |
| Initial retrieval (top-k) | Fast | Minimum viable context for first draft |
| Prompt assembly | Fast | Template render — near zero |
| LLM token generation | Fast (starts streaming) | Stream tokens as generated |
| Deep reranking | Slow | Additional retrieval passes, richer evidence |
| Citation verification | Slow | Cross-reference sources, attach metadata |
| Safety / content check | Slow | Responsible AI gate on complete answer |
| Answer refinement | Slow | Improve with better evidence if available |
The gate before streaming starts: do not stream LLM tokens until the fast path meets a minimum grounding threshold, and never show raw model output before any retrieval has run. At minimum:
- Retrieval returned at least N chunks with confidence score above threshold
- Prompt is properly assembled with context
- No hard policy block triggered on the query
For high-risk domains (financial, medical, legal), the gate is stricter: stream status updates only until the slow path completes at least one safety check.
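The gate conditions above can be sketched as a single predicate. `passes_grounding_gate` and its parameters are illustrative names, not part of any framework; the prompt-assembly and policy checks are reduced to booleans for brevity:

```python
def passes_grounding_gate(
    chunk_scores: list[float],
    prompt_assembled: bool,
    policy_blocked: bool,
    min_chunks: int = 3,
    min_score: float = 0.72,
) -> bool:
    """True only when the fast path is grounded enough to start streaming."""
    if policy_blocked or not prompt_assembled:
        return False
    # Count only chunks above the confidence threshold
    confident = [s for s in chunk_scores if s >= min_score]
    return len(confident) >= min_chunks
```

A failed gate routes to the weak-retrieval fallback rather than streaming ungrounded tokens.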
The Streaming Event Protocol
Raw token streaming is not enough. Production streaming needs a structured event protocol so the frontend can manage state properly.
```python
# streaming_events.py
from enum import Enum
from dataclasses import dataclass, asdict
from datetime import datetime
import json


class EventType(str, Enum):
    STATUS_UPDATE = "status_update"
    PARTIAL_ANSWER = "partial_answer"
    TOKEN = "token"
    CITATION_ATTACHED = "citation_attached"
    REFINEMENT_UPDATE = "refinement_update"
    FINAL_ANSWER = "final_answer"
    ERROR = "error"
    PIPELINE_METRIC = "pipeline_metric"


@dataclass
class StreamEvent:
    event_type: EventType
    request_id: str
    timestamp: str
    payload: dict

    def to_sse(self) -> str:
        """Format as a Server-Sent Event frame."""
        data = json.dumps(asdict(self))
        return f"data: {data}\n\n"


# Example events emitted during a request
def make_status(request_id: str, message: str, stage: str) -> StreamEvent:
    return StreamEvent(
        event_type=EventType.STATUS_UPDATE,
        request_id=request_id,
        timestamp=datetime.utcnow().isoformat(),
        payload={"message": message, "stage": stage},
    )


def make_token(request_id: str, token: str, is_grounded: bool) -> StreamEvent:
    return StreamEvent(
        event_type=EventType.TOKEN,
        request_id=request_id,
        timestamp=datetime.utcnow().isoformat(),
        payload={"token": token, "grounded": is_grounded},
    )


def make_citation(request_id: str, source: str, section: str, score: float) -> StreamEvent:
    return StreamEvent(
        event_type=EventType.CITATION_ATTACHED,
        request_id=request_id,
        timestamp=datetime.utcnow().isoformat(),
        payload={"source": source, "section": section, "confidence": score},
    )
```
The event sequence for a typical query:
```
t=0ms      status_update      "Understanding your question..."
t=80ms     status_update      "Searching relevant documents..."
t=1200ms   status_update      "Drafting answer from 6 sources..."
t=1400ms   token              "Based"
t=1420ms   token              " on"
t=1440ms   token              " the"
...        (tokens stream at LLM generation rate)
t=8000ms   citation_attached  {source: "FHA Guide 4000.1", section: "II.A.1.b"}
t=9200ms   citation_attached  {source: "Fannie Mae B3-6-02", section: "DTI limits"}
t=12000ms  refinement_update  "Additional evidence found — updating answer..."
t=14000ms  final_answer       {complete: true, groundedness: 0.94, citations: [...]}
t=14000ms  pipeline_metric    {ttft: 1400, retrieval_ms: 1100, total_ms: 14000}
```
The user sees their first token at 1.4 seconds. The 14-second full pipeline runs behind the stream they are already reading.
Server-Sent Events Implementation — Azure OpenAI + FastAPI
```python
# rag_streaming_service.py
import asyncio
import time
from datetime import datetime

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncAzureOpenAI
from azure.search.documents.aio import SearchClient  # used inside retrieve_chunks (not shown)
from azure.core.credentials import AzureKeyCredential

from streaming_events import EventType, StreamEvent, make_status, make_token, make_citation

# settings, QueryRequest, generate_request_id, rewrite_query, retrieve_chunks,
# build_prompt, and run_slow_path are defined elsewhere in the service.

app = FastAPI()

openai_client = AsyncAzureOpenAI(
    azure_endpoint=settings.AZURE_OPENAI_ENDPOINT,
    api_key=settings.AZURE_OPENAI_API_KEY,  # or AAD credential
    api_version="2024-12-01",
)


@app.post("/query")
async def query_stream(request: QueryRequest):
    return StreamingResponse(
        run_rag_pipeline(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable nginx buffering
        },
    )


async def run_rag_pipeline(request: QueryRequest):
    request_id = generate_request_id()
    start = time.monotonic()

    # Emit immediately — user sees the response channel open
    yield make_status(request_id, "Understanding your question...", "query_rewrite").to_sse()

    # Fast path: query rewrite
    rewritten_query = await rewrite_query(request.query)
    yield make_status(request_id, "Searching relevant documents...", "retrieval").to_sse()

    # Fast path: initial retrieval (top-5 for speed)
    fast_chunks = await retrieve_chunks(rewritten_query, top_k=5)

    if len(fast_chunks) < 2 or max(c.score for c in fast_chunks) < 0.65:
        # Retrieval confidence too low — don't stream ungrounded content
        yield make_status(request_id, "Still searching trusted sources...", "retrieval_retry").to_sse()
        fast_chunks = await retrieve_chunks(rewritten_query, top_k=10, fallback=True)

    yield make_status(request_id, f"Drafting answer from {len(fast_chunks)} sources...", "generation").to_sse()

    # Build prompt from fast-path chunks
    prompt = build_prompt(rewritten_query, fast_chunks)

    # Launch slow path in parallel — do not await yet
    slow_path_task = asyncio.create_task(
        run_slow_path(request_id, rewritten_query, fast_chunks)
    )

    # Stream LLM tokens as they are generated
    token_buffer = []
    ttft_ms = None
    stream = await openai_client.chat.completions.create(
        model="gpt-4o",
        messages=prompt,
        temperature=0.1,
        max_tokens=1000,
        stream=True,
    )
    async for chunk in stream:
        if not chunk.choices:
            continue
        token = chunk.choices[0].delta.content or ""
        if token:
            if ttft_ms is None:
                ttft_ms = int((time.monotonic() - start) * 1000)
            token_buffer.append(token)
            yield make_token(request_id, token, is_grounded=True).to_sse()

    full_answer = "".join(token_buffer)

    # Emit fast-path citations as they become available
    for chunk in fast_chunks:
        yield make_citation(request_id, chunk.source, chunk.section, chunk.score).to_sse()

    # Await slow path — emit refinements if the answer improved
    slow_result = await slow_path_task
    if slow_result.improved:
        yield StreamEvent(
            event_type=EventType.REFINEMENT_UPDATE,
            request_id=request_id,
            timestamp=datetime.utcnow().isoformat(),
            payload={
                "message": "Additional evidence found — updating answer",
                "refined_answer": slow_result.answer,
                "additional_citations": slow_result.citations,
            },
        ).to_sse()

    # Final answer with full metadata
    yield StreamEvent(
        event_type=EventType.FINAL_ANSWER,
        request_id=request_id,
        timestamp=datetime.utcnow().isoformat(),
        payload={
            "complete": True,
            "groundedness_score": slow_result.groundedness,
            "safety_pass": slow_result.safety_pass,
            "citations": slow_result.citations,
            "answer": slow_result.answer if slow_result.improved else full_answer,
        },
    ).to_sse()

    # Emit pipeline metrics for observability
    yield StreamEvent(
        event_type=EventType.PIPELINE_METRIC,
        request_id=request_id,
        timestamp=datetime.utcnow().isoformat(),
        payload={
            "time_to_first_token_ms": ttft_ms,
            "total_pipeline_ms": int((time.monotonic() - start) * 1000),
            "retrieval_ms": slow_result.retrieval_ms,
            "chunks_used": len(slow_result.citations),
        },
    ).to_sse()
```
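One caveat with `asyncio.create_task` inside a streaming generator: if the client disconnects mid-stream, FastAPI closes the generator, and without cleanup the slow-path task keeps running. A minimal sketch of the guard — `guarded_pipeline`, the stand-in sleep, and the `background_tasks` list are all illustrative:

```python
import asyncio

background_tasks = []  # exposed only so the demo can inspect the task


async def guarded_pipeline():
    # Stands in for run_slow_path; sleeps long enough to outlive the stream
    slow_task = asyncio.create_task(asyncio.sleep(60))
    background_tasks.append(slow_task)
    try:
        yield "data: event-1\n\n"
        yield "data: event-2\n\n"
    finally:
        # Runs when the consumer stops iterating early (client disconnect)
        if not slow_task.done():
            slow_task.cancel()


async def demo():
    gen = guarded_pipeline()
    first = await gen.__anext__()  # client reads one event...
    await gen.aclose()             # ...then disconnects
    try:
        await background_tasks[0]
    except asyncio.CancelledError:
        pass
    return first, background_tasks[0].cancelled()

first, cancelled = asyncio.run(demo())
```

The `finally` block fires when the generator is closed, so abandoned requests do not leak background work.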
The Slow Path — Running in Parallel
```python
# slow_path.py
import asyncio
import time
from dataclasses import dataclass

# retrieve_chunks, run_safety_check, semantic_rerank, build_prompt,
# generate_answer, and evaluate_groundedness are defined elsewhere.


@dataclass
class SlowPathResult:
    answer: str
    citations: list[dict]
    groundedness: float
    safety_pass: bool
    improved: bool
    ttft_ms: int      # time to first token is measured by the orchestrator
    total_ms: int
    retrieval_ms: int


async def _timed_deep_retrieve(query: str, top_k: int):
    """Run deep retrieval and report how long it took."""
    t0 = time.monotonic()
    chunks = await retrieve_chunks(query, top_k=top_k)
    return chunks, int((time.monotonic() - t0) * 1000)


async def run_slow_path(
    request_id: str,
    query: str,
    fast_chunks: list,
) -> SlowPathResult:
    start = time.monotonic()

    # Run deeper retrieval and the safety check in parallel
    (deep_chunks, retrieval_ms), safety_result = await asyncio.gather(
        _timed_deep_retrieve(query, top_k=15),
        run_safety_check(query),
    )

    # Rerank with the full chunk set
    reranked = await semantic_rerank(query, deep_chunks, top_k=8)

    # Check whether the slow path found meaningfully better evidence
    fast_max_score = max(c.score for c in fast_chunks)
    slow_max_score = max(c.score for c in reranked) if reranked else 0
    improved = slow_max_score > fast_max_score + 0.05  # 0.05 absolute improvement threshold

    if improved:
        # Generate a refined answer with the better context
        refined_prompt = build_prompt(query, reranked)
        refined_answer = await generate_answer(refined_prompt)
    else:
        refined_answer = None

    # Groundedness is scored against the best available evidence; when there is
    # no refinement, the streamed answer (held by the orchestrator) is final.
    final_answer_text = refined_answer or "(original streamed answer)"
    groundedness = await evaluate_groundedness(final_answer_text, reranked or fast_chunks)

    return SlowPathResult(
        answer=refined_answer or "",
        citations=[
            {"source": c.source, "section": c.section, "score": c.score}
            for c in reranked
        ],
        groundedness=groundedness,
        safety_pass=safety_result.passed,
        improved=improved,
        ttft_ms=0,  # set by orchestrator
        total_ms=int((time.monotonic() - start) * 1000),
        retrieval_ms=retrieval_ms,
    )
```
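`evaluate_groundedness` is left undefined above. A minimal lexical-overlap sketch follows, just to make the contract concrete; production systems typically use an LLM judge or an entailment model, and the real function is async and takes chunk objects rather than plain strings:

```python
import re

def evaluate_groundedness(answer: str, chunk_texts: list[str]) -> float:
    """Fraction of answer sentences whose content words mostly appear in the evidence."""
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", answer.strip()) if s]
    if not sentences:
        return 0.0
    corpus = " ".join(chunk_texts).lower()
    grounded = 0
    for sentence in sentences:
        # Content words only: lowercase alphabetic tokens longer than 3 chars
        words = [w for w in re.findall(r"[a-z]+", sentence.lower()) if len(w) > 3]
        if words and sum(w in corpus for w in words) / len(words) >= 0.5:
            grounded += 1
    return grounded / len(sentences)
```

Crude as it is, this kind of scorer is enough to flag answers that drifted entirely away from the retrieved evidence.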
Frontend Event Handling
The frontend must handle structured events, not a raw text stream. A frontend that only handles token events will miss citation attachments, refinements, and final answer validation.
```typescript
// rag-stream-client.ts
interface StreamEvent {
  event_type:
    | 'status_update'
    | 'partial_answer'
    | 'token'
    | 'citation_attached'
    | 'refinement_update'
    | 'final_answer'
    | 'pipeline_metric'
    | 'error';
  request_id: string;
  timestamp: string;
  payload: Record<string, unknown>;
}

async function streamQuery(query: string, onEvent: (e: StreamEvent) => void) {
  const response = await fetch('/query', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ query })
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });

    // SSE frames are separated by a blank line; keep any partial frame buffered
    const frames = buffer.split('\n\n');
    buffer = frames.pop() || '';

    for (const frame of frames) {
      if (frame.startsWith('data: ')) {
        const event: StreamEvent = JSON.parse(frame.slice(6));
        onEvent(event);
      }
    }
  }
}
```
```typescript
// Usage — React component state management
function handleStreamEvent(event: StreamEvent, setState: Function) {
  switch (event.event_type) {
    case 'status_update':
      setState((s: any) => ({ ...s, status: event.payload.message }));
      break;
    case 'token':
      setState((s: any) => ({ ...s, answer: s.answer + event.payload.token }));
      break;
    case 'citation_attached':
      setState((s: any) => ({ ...s, citations: [...s.citations, event.payload] }));
      break;
    case 'refinement_update':
      // Replace the streamed answer with the refined version
      setState((s: any) => ({
        ...s,
        answer: event.payload.refined_answer,
        citations: event.payload.additional_citations,
        refined: true
      }));
      break;
    case 'final_answer':
      setState((s: any) => ({
        ...s,
        complete: true,
        groundedness: event.payload.groundedness_score,
        safetyPass: event.payload.safety_pass
      }));
      break;
    case 'pipeline_metric':
      // Log to observability — do not show to the user
      logPipelineMetrics(event.payload);
      break;
  }
}
```
The Grounding Gate — What Not to Stream
This is the most important architecture decision in the entire design.
Never stream LLM tokens before minimum grounding conditions are met. The temptation is to start streaming immediately to minimize time-to-first-token. The risk is streaming a confident-sounding wrong answer — which in regulated domains (mortgage, healthcare, financial services) is worse than a slower correct answer.
Domain-specific gate thresholds at MortgageIQ:
| Domain | Min chunks | Min score | Safety pre-check |
|---|---|---|---|
| Loan guidelines | 3 | 0.72 | Query classification first |
| Balance inquiries | 1 | 0.60 | None — low risk |
| Underwriting decisions | 5 | 0.80 | Full slow path first |
| Rate quotes | — | — | Never stream — always human review |
For the underwriting domain, we stream status updates only until the slow path's safety check completes. The 5-second wait is acceptable because the alternative — streaming a wrong underwriting answer — is not.
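The threshold table can live as plain config so the gate stays auditable. This is a sketch: `GateRule`, the domain keys, and `may_stream` are illustrative names mirroring the table above, and the safety pre-check column is assumed to be handled separately by the orchestrator:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class GateRule:
    min_chunks: int
    min_score: float
    streamable: bool  # False means never stream, e.g. rate quotes

GATE_RULES = {
    "loan_guidelines": GateRule(min_chunks=3, min_score=0.72, streamable=True),
    "balance_inquiries": GateRule(min_chunks=1, min_score=0.60, streamable=True),
    "underwriting": GateRule(min_chunks=5, min_score=0.80, streamable=True),
    "rate_quotes": GateRule(min_chunks=0, min_score=1.0, streamable=False),
}

def may_stream(domain: str, chunk_scores: list[float]) -> bool:
    """Apply the per-domain grounding gate; unknown domains fail closed."""
    rule = GATE_RULES.get(domain)
    if rule is None or not rule.streamable:
        return False
    qualifying = [s for s in chunk_scores if s >= rule.min_score]
    return len(qualifying) >= rule.min_chunks
```

Failing closed on unknown domains matters: a new query category should default to the strictest behavior until someone deliberately adds a rule for it.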
Production Metrics — What to Measure
A system that optimizes only for total latency will still feel slow. Measure what users actually experience:
| Metric | What It Measures | Target |
|---|---|---|
| Time to first token (TTFT) | Perceived responsiveness — how quickly does something useful appear? | <2s |
| Time to first useful answer | First grounded sentence visible | <5s |
| Final answer latency | Complete, validated response | <30s |
| Groundedness before first token | Did retrieval pass the gate before streaming? | 100% |
| Refinement rate | How often does slow path improve the fast draft? | Track — if >40%, fast retrieval is weak |
| User abandonment rate | Did users leave before final answer? | <5% |
| Citation coverage | Fraction of claims with attached source | >0.85 |
The single most important metric: time to first token — not total latency. A system that delivers a first grounded sentence in 2 seconds and completes in 25 seconds feels fast. The same system with no streaming feels broken at 25 seconds.
Handling Bad Retrieval Gracefully
When retrieval fails the grounding gate, do not leave the user with nothing:
```python
from collections.abc import AsyncGenerator
from datetime import datetime


async def handle_weak_retrieval(request_id: str, query: str) -> AsyncGenerator:
    yield make_status(request_id, "Still searching trusted sources...", "retrieval_retry").to_sse()

    # Try an expanded search
    expanded_chunks = await retrieve_chunks(
        query,
        top_k=20,
        expand_keywords=True,
        fallback_index=True,
    )

    if not expanded_chunks or max(c.score for c in expanded_chunks) < 0.50:
        # Genuinely cannot ground an answer
        yield StreamEvent(
            event_type=EventType.FINAL_ANSWER,
            request_id=request_id,
            timestamp=datetime.utcnow().isoformat(),
            payload={
                "complete": True,
                "answer": (
                    "I was unable to find specific guidance on this in the current "
                    "documentation. Please review this with your team or contact support."
                ),
                "groundedness_score": 0.0,
                "citations": [],
                "safe_fallback": True,
            },
        ).to_sse()
        return

    # Otherwise continue with the expanded chunks
    async for event in continue_pipeline(request_id, query, expanded_chunks):
        yield event
```
A safe, honest fallback is always better than a confident ungrounded answer.
Key Takeaways
- Time to first token and time to final answer are different problems — optimizing retrieval latency while making users wait for all steps is the wrong fix. Streaming architecture is the right fix.
- Split the pipeline into fast path and slow path — fast path runs the minimum viable retrieval and starts token streaming; slow path runs deeper reranking, citation verification, and safety checks in parallel.
- Use a structured event protocol, not raw text streaming — `status_update`, `token`, `citation_attached`, `refinement_update`, and `final_answer` events let the frontend handle state properly and the backend improve the response as more pipeline steps complete.
- Never stream before minimum grounding conditions are met — streaming ungrounded content in regulated domains is architecturally worse than a slower correct answer. The grounding gate is non-negotiable.
- Measure time to first token, not just total latency — a 25-second pipeline that delivers grounded tokens in 2 seconds feels fast; the same pipeline with no streaming feels broken.
- Design safe fallbacks for weak retrieval — show honest "still searching" status, expand the search scope, and return a safe acknowledged response rather than a confident wrong answer.