ai-ml · April 21, 2026 · rag · streaming · sse · websocket · llm · azure-openai · production · enterprise-ai · latency · observability

RAG Streaming Responses in Production — How to Fix the 30-Second Freeze

A RAG system that makes users wait 30 seconds before showing anything is not a latency problem — it's a UX architecture problem. Here's how to design progressive streaming so users see useful tokens in under 2 seconds while the full pipeline continues running.

A RAG system that takes 30 seconds is not necessarily broken. A RAG system that makes users stare at a blank screen for 30 seconds is.

The distinction matters. Time to first token and time to final answer are different problems. Conflating them is why teams spend months optimizing retrieval latency when the real fix is a frontend architecture change that takes a week.

This post answers a question that comes up in every senior AI architect interview: your RAG pipeline takes 30 seconds — how do you design streaming responses so users start seeing tokens while the rest of the pipeline continues running?


Why RAG Takes 30 Seconds — And Why Waiting Is Not the Fix

A production RAG pipeline does a lot before the LLM generates a single token:

  • Query classification and rewriting
  • Embedding the query and running vector / hybrid retrieval
  • Semantic reranking of the candidate chunks
  • Prompt assembly with the retrieved context
  • Safety and content pre-checks

Add up the serial dependencies and you are easily at 20-30 seconds before the user sees anything. The instinct is to optimize each step. That helps — but the faster win is to stop making the user wait for all steps before showing anything.

The correct architectural goal:

Separate the pipeline into what must complete before streaming and what can continue in parallel after streaming starts.
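
In code, that split is a pair of tracks sharing one response stream. Here is a minimal, self-contained sketch of the shape — the stage functions are stubs standing in for the real pipeline pieces built out later in this post:

# two_track_sketch.py — schematic of the fast/slow split; stage functions are stubs
import asyncio

async def fast_retrieve(query: str) -> list[str]:
    await asyncio.sleep(0.3)                      # stand-in for a small top-k retrieval
    return ["chunk-1", "chunk-2"]

async def run_slow_path(query: str, chunks: list[str]) -> dict:
    await asyncio.sleep(2.0)                      # stand-in for reranking, citations, safety
    return {"improved": False, "citations": chunks}

async def stream_tokens(query: str, chunks: list[str]):
    for tok in "Based on the retrieved guidance ...".split():
        await asyncio.sleep(0.05)                 # stand-in for LLM token generation
        yield tok + " "

async def answer(query: str):
    chunks = await fast_retrieve(query)                          # fast path: must finish first
    slow = asyncio.create_task(run_slow_path(query, chunks))     # slow path: launched, not awaited
    async for token in stream_tokens(query, chunks):             # user reads while slow path runs
        yield token
    result = await slow                                          # block on the slow path only now
    yield f"\n[final answer: {len(result['citations'])} citations]"

async def main():
    async for piece in answer("What are the FHA DTI limits?"):
        print(piece, end="", flush=True)

asyncio.run(main())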


The Design — Progressive Streaming Architecture

The key shift: the API gateway returns a streaming channel immediately. The orchestrator runs two parallel tracks. The user sees progress events within milliseconds — and grounded tokens within 2-3 seconds.


The Fast Path vs. Slow Path Split

Not every RAG step needs to complete before the user can see something useful. The split:

Step                        | Path                    | Why
Query rewrite               | Fast                    | Lightweight, deterministic, sub-500ms
Initial retrieval (top-k)   | Fast                    | Minimum viable context for first draft
Prompt assembly             | Fast                    | Template render — near zero
LLM token generation        | Fast (starts streaming) | Stream tokens as generated
Deep reranking              | Slow                    | Additional retrieval passes, richer evidence
Citation verification       | Slow                    | Cross-reference sources, attach metadata
Safety / content check      | Slow                    | Responsible AI gate on complete answer
Answer refinement           | Slow                    | Improve with better evidence if available

The gate before streaming starts: do not stream LLM tokens until the fast path meets a minimum grounding threshold — never raw model output before any retrieval has completed. At minimum:

  • Retrieval returned at least N chunks with confidence score above threshold
  • Prompt is properly assembled with context
  • No hard policy block triggered on the query

For high-risk domains (financial, medical, legal), the gate is stricter: stream status updates only until the slow path completes at least one safety check.
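
A minimal sketch of that gate check — RetrievedChunk, GatePolicy, and the field names here are illustrative assumptions, not the production types:

# grounding_gate.py — illustrative gate check; types, field names, and defaults are assumptions
from dataclasses import dataclass

@dataclass
class RetrievedChunk:
    source: str
    section: str
    score: float

@dataclass
class GatePolicy:
    min_chunks: int = 2
    min_score: float = 0.65
    require_slow_path_safety: bool = False     # high-risk domains: wait for the safety check

def passes_grounding_gate(
    chunks: list[RetrievedChunk],
    policy: GatePolicy,
    query_blocked: bool,
    slow_path_safety_passed: bool = False,
) -> bool:
    """True only when it is safe to start streaming LLM tokens."""
    if query_blocked:                                            # hard policy block on the query
        return False
    if len(chunks) < policy.min_chunks:                          # not enough grounding evidence
        return False
    if max((c.score for c in chunks), default=0.0) < policy.min_score:
        return False                                             # best chunk below confidence floor
    if policy.require_slow_path_safety and not slow_path_safety_passed:
        return False                                             # stream status updates only, for now
    return True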


The Streaming Event Protocol

Raw token streaming is not enough. Production streaming needs a structured event protocol — not just a text stream — so the frontend can handle state properly.

# streaming_events.py
from enum import Enum
from dataclasses import dataclass, asdict
import json
from datetime import datetime

class EventType(str, Enum):
    STATUS_UPDATE = "status_update"
    PARTIAL_ANSWER = "partial_answer"
    TOKEN = "token"
    CITATION_ATTACHED = "citation_attached"
    REFINEMENT_UPDATE = "refinement_update"
    FINAL_ANSWER = "final_answer"
    ERROR = "error"
    PIPELINE_METRIC = "pipeline_metric"

@dataclass
class StreamEvent:
    event_type: EventType
    request_id: str
    timestamp: str
    payload: dict

    def to_sse(self) -> str:
        """Format as Server-Sent Event"""
        data = json.dumps(asdict(self))
        return f"data: {data}\n\n"

# Example events emitted during a request
def make_status(request_id: str, message: str, stage: str) -> StreamEvent:
    return StreamEvent(
        event_type=EventType.STATUS_UPDATE,
        request_id=request_id,
        timestamp=datetime.utcnow().isoformat(),
        payload={"message": message, "stage": stage}
    )

def make_token(request_id: str, token: str, is_grounded: bool) -> StreamEvent:
    return StreamEvent(
        event_type=EventType.TOKEN,
        request_id=request_id,
        timestamp=datetime.utcnow().isoformat(),
        payload={"token": token, "grounded": is_grounded}
    )

def make_citation(request_id: str, source: str, section: str, score: float) -> StreamEvent:
    return StreamEvent(
        event_type=EventType.CITATION_ATTACHED,
        request_id=request_id,
        timestamp=datetime.utcnow().isoformat(),
        payload={"source": source, "section": section, "confidence": score}
    )
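
On the wire, every event is a single SSE data frame. For example, the token helper above (with an illustrative request id and timestamp) produces:

# Wire format of one event — the request id and timestamp here are illustrative
frame = make_token("req-123", "Based", is_grounded=True).to_sse()
print(frame)
# data: {"event_type": "token", "request_id": "req-123", "timestamp": "2026-04-21T14:02:11.123456", "payload": {"token": "Based", "grounded": true}}
#
# (the trailing blank line is the \n\n terminator that delimits SSE events)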

The event sequence for a typical query:

t=0ms     status_update  "Understanding your question..."
t=80ms    status_update  "Searching relevant documents..."
t=1200ms  status_update  "Drafting answer from 6 sources..."
t=1400ms  token          "Based"
t=1420ms  token          " on"
t=1440ms  token          " the"
...       (tokens stream at LLM generation rate)
t=8000ms  citation_attached  {source: "FHA Guide 4000.1", section: "II.A.1.b"}
t=9200ms  citation_attached  {source: "Fannie Mae B3-6-02", section: "DTI limits"}
t=12000ms refinement_update  "Additional evidence found — updating answer..."
t=14000ms final_answer    {complete: true, groundedness: 0.94, citations: [...]}
t=14000ms pipeline_metric {ttft: 1400, retrieval_ms: 1100, total_ms: 14000}

The user sees their first token at 1.4 seconds. The 14-second full pipeline runs behind the stream they are already reading.


Server-Sent Events Implementation — Azure OpenAI + FastAPI

# rag_streaming_service.py
import asyncio
import time
from datetime import datetime

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncAzureOpenAI
from azure.search.documents.aio import SearchClient
from azure.core.credentials import AzureKeyCredential
from pydantic import BaseModel

from streaming_events import EventType, StreamEvent, make_status, make_token, make_citation

class QueryRequest(BaseModel):
    query: str

app = FastAPI()
openai_client = AsyncAzureOpenAI(
    azure_endpoint=settings.AZURE_OPENAI_ENDPOINT,
    api_version="2024-12-01"
)

@app.post("/query")
async def query_stream(request: QueryRequest):
    return StreamingResponse(
        run_rag_pipeline(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no"   # disable nginx buffering
        }
    )

async def run_rag_pipeline(request: QueryRequest):
    request_id = generate_request_id()
    pipeline_start = time.monotonic()
    ttft_ms = None          # set when the first LLM token is emitted

    # Emit immediately — user sees response channel open
    yield make_status(request_id, "Understanding your question...", "query_rewrite").to_sse()

    # Fast path: query rewrite
    rewritten_query = await rewrite_query(request.query)

    yield make_status(request_id, "Searching relevant documents...", "retrieval").to_sse()

    # Fast path: initial retrieval (top-5 for speed)
    fast_chunks = await retrieve_chunks(rewritten_query, top_k=5)

    if len(fast_chunks) < 2 or max(c.score for c in fast_chunks) < 0.65:
        # Retrieval confidence too low — don't stream ungrounded content
        yield make_status(request_id, "Still searching trusted sources...", "retrieval_retry").to_sse()
        fast_chunks = await retrieve_chunks(rewritten_query, top_k=10, fallback=True)

    yield make_status(request_id, f"Drafting answer from {len(fast_chunks)} sources...", "generation").to_sse()

    # Build prompt from fast-path chunks
    prompt = build_prompt(rewritten_query, fast_chunks)

    # Launch slow path in parallel — do not await yet
    slow_path_task = asyncio.create_task(
        run_slow_path(request_id, rewritten_query, fast_chunks)
    )

    # Stream LLM tokens as generated
    token_buffer = []
    stream = await openai_client.chat.completions.create(
        model="gpt-4o",               # Azure OpenAI deployment name
        messages=prompt,
        temperature=0.1,
        max_tokens=1000,
        stream=True
    )
    async for chunk in stream:
        if not chunk.choices:         # Azure can emit choice-less frames (e.g. content filter results)
            continue
        token = chunk.choices[0].delta.content or ""
        if token:
            if ttft_ms is None:
                ttft_ms = int((time.monotonic() - pipeline_start) * 1000)
            token_buffer.append(token)
            yield make_token(request_id, token, is_grounded=True).to_sse()

    full_answer = "".join(token_buffer)

    # Emit fast-path citations as they become available
    for chunk in fast_chunks:
        yield make_citation(request_id, chunk.source, chunk.section, chunk.score).to_sse()

    # Await slow path — emit refinements if answer improved
    slow_result = await slow_path_task
    if slow_result.improved:
        yield StreamEvent(
            event_type=EventType.REFINEMENT_UPDATE,
            request_id=request_id,
            timestamp=datetime.utcnow().isoformat(),
            payload={
                "message": "Additional evidence found — updating answer",
                "refined_answer": slow_result.answer,
                "additional_citations": slow_result.citations
            }
        ).to_sse()

    # Final answer with full metadata
    yield StreamEvent(
        event_type=EventType.FINAL_ANSWER,
        request_id=request_id,
        timestamp=datetime.utcnow().isoformat(),
        payload={
            "complete": True,
            "groundedness_score": slow_result.groundedness,
            "safety_pass": slow_result.safety_pass,
            "citations": slow_result.citations,
            "answer": slow_result.answer if slow_result.improved else full_answer
        }
    ).to_sse()

    # Emit pipeline metrics for observability
    yield StreamEvent(
        event_type=EventType.PIPELINE_METRIC,
        request_id=request_id,
        timestamp=datetime.utcnow().isoformat(),
        payload={
            "time_to_first_token_ms": slow_result.ttft_ms,
            "total_pipeline_ms": slow_result.total_ms,
            "retrieval_ms": slow_result.retrieval_ms,
            "chunks_used": len(slow_result.citations)
        }
    ).to_sse()

The Slow Path — Running in Parallel

# slow_path.py
import asyncio
import time
from dataclasses import dataclass

@dataclass
class SlowPathResult:
    answer: str
    citations: list[dict]
    groundedness: float
    safety_pass: bool
    improved: bool
    ttft_ms: int
    total_ms: int
    retrieval_ms: int

async def run_slow_path(
    request_id: str,
    query: str,
    fast_chunks: list
) -> SlowPathResult:
    start = time.monotonic()

    # Run deeper retrieval and safety checks in parallel
    deep_chunks_task = asyncio.create_task(retrieve_chunks(query, top_k=15))
    safety_task = asyncio.create_task(run_safety_check(query))

    deep_chunks, safety_result = await asyncio.gather(
        deep_chunks_task, safety_task
    )
    retrieval_ms = int((time.monotonic() - start) * 1000)    # wall time for deep retrieval + safety

    # Rerank with full chunk set
    reranked = await semantic_rerank(query, deep_chunks, top_k=8)

    # Check if slow path found meaningfully better evidence
    fast_max_score = max(c.score for c in fast_chunks)
    slow_max_score = max(c.score for c in reranked) if reranked else 0

    improved = slow_max_score > fast_max_score + 0.05  # 5% improvement threshold

    if improved:
        # Generate refined answer with better context
        refined_prompt = build_prompt(query, reranked)
        refined_answer = await generate_answer(refined_prompt)
    else:
        refined_answer = None

    # Run groundedness check on the final answer
    final_answer_text = refined_answer or "(original streamed answer)"
    groundedness = await evaluate_groundedness(final_answer_text, reranked or fast_chunks)

    total_ms = int((time.monotonic() - start) * 1000)

    return SlowPathResult(
        answer=refined_answer or "",
        citations=[{"source": c.source, "section": c.section, "score": c.score} for c in reranked],
        groundedness=groundedness,
        safety_pass=safety_result.passed,
        improved=improved,
        ttft_ms=0,     # TTFT is measured by the orchestrator, not the slow path
        total_ms=total_ms,
        retrieval_ms=retrieval_ms
    )

Frontend Event Handling

The frontend must handle structured events, not a raw text stream. A frontend that only handles token events will miss citation attachments, refinements, and final answer validation.

// rag-stream-client.ts
interface StreamEvent {
  event_type: 'status_update' | 'token' | 'citation_attached' | 
              'refinement_update' | 'final_answer' | 'pipeline_metric' | 'error';
  request_id: string;
  timestamp: string;
  payload: Record<string, unknown>;
}

async function streamQuery(query: string, onEvent: (e: StreamEvent) => void) {
  const response = await fetch('/query', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ query })
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n\n');
    buffer = lines.pop() || '';

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const event: StreamEvent = JSON.parse(line.slice(6));
        onEvent(event);
      }
    }
  }
}

// Usage — React component state management
function handleStreamEvent(event: StreamEvent, setState: Function) {
  switch (event.event_type) {
    case 'status_update':
      setState(s => ({ ...s, status: event.payload.message }));
      break;
    case 'token':
      setState(s => ({ ...s, answer: s.answer + event.payload.token }));
      break;
    case 'citation_attached':
      setState(s => ({ ...s, citations: [...s.citations, event.payload] }));
      break;
    case 'refinement_update':
      // Replace streamed answer with refined version
      setState(s => ({
        ...s,
        answer: event.payload.refined_answer,
        citations: event.payload.additional_citations,
        refined: true
      }));
      break;
    case 'final_answer':
      setState(s => ({
        ...s,
        complete: true,
        groundedness: event.payload.groundedness_score,
        safetyPass: event.payload.safety_pass
      }));
      break;
    case 'pipeline_metric':
      // Log to observability — do not show to user
      logPipelineMetrics(event.payload);
      break;
  }
}

The Grounding Gate — What Not to Stream

This is the most important architecture decision in the entire design.

Never stream LLM tokens before minimum grounding conditions are met. The temptation is to start streaming immediately to minimize time-to-first-token. The risk is streaming a confident-sounding wrong answer — which in regulated domains (mortgage, healthcare, financial services) is worse than a slower correct answer.

Domain-specific gate thresholds at MortgageIQ:

Domain                  | Min chunks | Min score | Safety pre-check
Loan guidelines         | 3          | 0.72      | Query classification first
Balance inquiries       | 1          | 0.60      | None — low risk
Underwriting decisions  | 5          | 0.80      | Full slow path first
Rate quotes             | n/a        | n/a       | Never stream — always human review

For the underwriting domain, we stream status updates only until the slow path's safety check completes. The 5-second wait is acceptable because the alternative — streaming a wrong underwriting answer — is not.
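
These thresholds belong in configuration rather than scattered through code. A minimal sketch — the DomainGate structure, field names, and domain keys are illustrative, mirroring the table above:

# gate_policies.py — per-domain grounding-gate configuration (structure and names are illustrative)
from dataclasses import dataclass

@dataclass(frozen=True)
class DomainGate:
    min_chunks: int
    min_score: float
    pre_check: str | None = None     # e.g. "query_classification", "full_slow_path"
    streamable: bool = True          # False: never stream, route to human review

GATE_POLICIES: dict[str, DomainGate] = {
    "loan_guidelines":        DomainGate(min_chunks=3, min_score=0.72, pre_check="query_classification"),
    "balance_inquiries":      DomainGate(min_chunks=1, min_score=0.60),
    "underwriting_decisions": DomainGate(min_chunks=5, min_score=0.80, pre_check="full_slow_path"),
    "rate_quotes":            DomainGate(min_chunks=0, min_score=1.0, streamable=False),
}

def gate_for(domain: str) -> DomainGate:
    # Unknown domains get the strictest treatment: never stream
    return GATE_POLICIES.get(domain, DomainGate(min_chunks=0, min_score=1.0, streamable=False))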


Production Metrics — What to Measure

A system that optimizes only for total latency will still feel slow. Measure what users actually experience:

Metric                           | What It Measures                                                      | Target
Time to first token (TTFT)       | Perceived responsiveness — how quickly does something useful appear? | <2s
Time to first useful answer      | First grounded sentence visible                                       | <5s
Final answer latency             | Complete, validated response                                          | <30s
Groundedness before first token  | Did retrieval pass the gate before streaming?                         | 100%
Refinement rate                  | How often does slow path improve the fast draft?                      | Track — if >40%, fast retrieval is weak
User abandonment rate            | Did users leave before final answer?                                  | <5%
Citation coverage                | Fraction of claims with attached source                               | >0.85

The single most important metric: time to first token — not total latency. A system that delivers a first grounded sentence in 2 seconds and completes in 25 seconds feels fast. The same system with no streaming feels broken at 25 seconds.
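
The pipeline_metric events already emitted at the end of each stream cover most of this table. A minimal aggregation sketch over collected payloads — stdlib only; the payload keys match the event protocol above, everything else is illustrative:

# metrics_rollup.py — summarize pipeline_metric payloads into the targets above (illustrative)
from statistics import median, quantiles

def p95(values: list[int]) -> float:
    if len(values) >= 2:
        return quantiles(values, n=20)[-1]       # 95th-percentile cut point
    return float(values[0]) if values else 0.0

def rollup(metric_payloads: list[dict], abandoned: int = 0) -> dict:
    """Aggregate TTFT / total-latency percentiles and abandonment for a batch of requests."""
    ttfts  = [m["time_to_first_token_ms"] for m in metric_payloads if m.get("time_to_first_token_ms")]
    totals = [m["total_pipeline_ms"] for m in metric_payloads if m.get("total_pipeline_ms")]
    completed = len(metric_payloads)
    return {
        "ttft_p50_ms": median(ttfts) if ttfts else 0,
        "ttft_p95_ms": p95(ttfts),
        "total_p95_ms": p95(totals),
        "abandonment_rate": abandoned / (completed + abandoned) if (completed + abandoned) else 0.0,
    }

# Example: two completed requests, one user who left mid-stream
print(rollup(
    [{"time_to_first_token_ms": 1400, "total_pipeline_ms": 14000},
     {"time_to_first_token_ms": 1900, "total_pipeline_ms": 26000}],
    abandoned=1,
))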


Handling Bad Retrieval Gracefully

When retrieval fails the grounding gate, do not leave the user with nothing:

# Uses the event helpers from streaming_events.py and the retrieval helpers defined above
from collections.abc import AsyncGenerator
from datetime import datetime

from streaming_events import EventType, StreamEvent, make_status

async def handle_weak_retrieval(request_id: str, query: str) -> AsyncGenerator:
    yield make_status(request_id, "Still searching trusted sources...", "retrieval_retry").to_sse()

    # Try expanded search
    expanded_chunks = await retrieve_chunks(
        query,
        top_k=20,
        expand_keywords=True,
        fallback_index=True
    )

    if not expanded_chunks or max(c.score for c in expanded_chunks) < 0.50:
        # Genuinely cannot ground an answer
        yield StreamEvent(
            event_type=EventType.FINAL_ANSWER,
            request_id=request_id,
            timestamp=datetime.utcnow().isoformat(),
            payload={
                "complete": True,
                "answer": (
                    "I was unable to find specific guidance on this in the current "
                    "documentation. Please review this with your team or contact support."
                ),
                "groundedness_score": 0.0,
                "citations": [],
                "safe_fallback": True
            }
        ).to_sse()
        return

    # Otherwise continue with expanded chunks
    async for event in continue_pipeline(request_id, query, expanded_chunks):
        yield event

A safe, honest fallback is always better than a confident ungrounded answer.


Key Takeaways

  • Time to first token and time to final answer are different problems — optimizing retrieval latency while making users wait for all steps is the wrong fix. Streaming architecture is the right fix.
  • Split the pipeline into fast path and slow path — fast path runs the minimum viable retrieval and starts token streaming; slow path runs deeper reranking, citation verification, and safety checks in parallel.
  • Use a structured event protocol, not raw text streaming — status_update, token, citation_attached, refinement_update, and final_answer events let the frontend handle state properly and the backend improve the response as more pipeline steps complete.
  • Never stream before minimum grounding conditions are met — streaming ungrounded content in regulated domains is architecturally worse than a slower correct answer. The grounding gate is non-negotiable.
  • Measure time to first token, not just total latency — a 25-second pipeline that delivers grounded tokens in 2 seconds feels fast; the same pipeline with no streaming feels broken.
  • Design safe fallbacks for weak retrieval — show honest "still searching" status, expand the search scope, and return a safe acknowledged response rather than a confident wrong answer.