Most RAG tutorials index PDFs. Most enterprise knowledge isn't in PDFs.
It's in your SQL database — loan records, product catalogs, customer profiles. It's in SharePoint — policy documents, SOPs, org charts. It's in email threads — decisions, escalations, institutional memory. It's in your APIs — real-time pricing, inventory, compliance checks. It's in Teams conversations — the informal knowledge that never makes it into documentation.
Each data source type has a different extraction strategy, a different chunking approach, and a different embedding pattern. Get one wrong and that entire knowledge domain is either missing or retrieves garbage.
This is a complete guide to every major enterprise data source — how to extract it, how to chunk it, how to embed it, and what the implementation looks like in open source and Azure stacks.
Part 2 of the Chunking series. Part 1 — Chunking Strategies for Documents covers fixed-size, recursive, semantic, document-aware, sentence window, late chunking, and hierarchical strategies for PDFs, Markdown, HTML, Word, and code files.
The Data Source Taxonomy
Each source category — structured databases, SharePoint, email, chat platforms, APIs, wikis, tickets, code repositories, and scanned documents — requires a different ingestion architecture. The mistake most teams make: treating every source like a document and trying to apply the same PDF chunking pipeline to everything.
Structured Databases — SQL / Cosmos DB / Data Warehouse
Structured data is the hardest data source to RAG correctly. The data has no natural "chunks." Every row is atomic. The meaning of a row depends entirely on its schema.
The Core Problem
Rule: never embed raw column values. Always convert rows to natural language via a template before embedding. The embedding model was trained on natural language — pipe-delimited column values produce meaningless vectors.
Chunking Strategy — Row-to-Text Templates
# Template-based row serialization
def row_to_text(row: dict) -> str:
    return f"""
Loan Record:
A {row['loan_type']} loan of ${row['amount']:,} was {row['status'].lower()}
on {row['decision_date']}. The interest rate is {row['rate']}%
for a {row['term_years']}-year term. Loan ID: {row['loan_id']}.
Borrower credit score: {row['credit_score']}.
Loan-to-value ratio: {row['ltv']}%.
""".strip()

# Each row = one chunk
# Metadata: table_name, primary_key, updated_at, schema_version
For wide tables (50+ columns): don't serialize all columns. Identify the columns that users actually query against — typically 8–15 columns — and serialize only those. Unused columns add noise and token cost without improving retrieval.
For related tables (JOINs): denormalize before embedding. A loan record without the borrower's name and property address is incomplete context. Pre-JOIN the relevant related data and embed the denormalized view.
-- Pre-JOIN view for embedding
CREATE VIEW vw_loan_embed AS
SELECT
    l.loan_id, l.amount, l.type, l.rate, l.status,
    b.first_name + ' ' + b.last_name AS borrower_name,
    p.address, p.city, p.state, p.zip,
    lt.description AS loan_type_description
FROM loans l
JOIN borrowers b ON l.borrower_id = b.id
JOIN properties p ON l.property_id = p.id
JOIN loan_types lt ON l.type = lt.code
Real-Time vs Batch Indexing
Structured data changes. You have two options:
Batch: simpler, lower cost, acceptable for data that changes daily or less (guidelines, product catalogs, reference data).
CDC (Change Data Capture): SQL Server CDC or Debezium for PostgreSQL → Kafka → embedding consumer → vector index. Use for data where freshness matters (loan status, pricing, inventory). At MortgageIQ, rate sheets are CDC-indexed — a rate change is in the vector index within 3 minutes of the SQL update.
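A minimal sketch of the embedding-consumer stage of that pipeline, assuming Debezium's default event envelope, a topic named dbserver.dbo.rate_sheets, and hypothetical embed_and_upsert / delete_from_index helpers (serialization follows the same row-to-text pattern as above):

import json
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "dbserver.dbo.rate_sheets",                    # Debezium topic: server.schema.table
    bootstrap_servers="kafka:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    event = message.value["payload"]               # Debezium envelope: op / before / after
    if event["op"] in ("c", "u", "r"):             # create, update, snapshot read
        row = event["after"]                       # post-change row image
        embed_and_upsert(row_to_text(row), key=row["rate_sheet_id"])   # hypothetical helpers
    elif event["op"] == "d":
        delete_from_index(event["before"]["rate_sheet_id"])            # keep index consistent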
Open Source Implementation
from sqlalchemy import text
from llama_index.core import Document

# LlamaIndex's DatabaseReader can load rows too, but it serializes columns
# generically — fetching rows directly keeps full control over the template.
with sql_engine.connect() as conn:
    rows = conn.execute(
        text("""
            SELECT loan_id, amount, type, rate, status,
                   decision_date, credit_score, ltv, borrower_name
            FROM vw_loan_embed
            WHERE updated_at > :last_indexed
        """),
        {"last_indexed": last_run_timestamp},
    ).mappings().all()

# Apply the text template to each row before indexing
docs_with_text = [
    Document(text=row_to_text(dict(row)), metadata=dict(row))
    for row in rows
]
Azure Implementation
Azure AI Search + Azure SQL / Cosmos DB — native indexer:
{
  "name": "loan-sql-indexer",
  "dataSourceName": "azure-sql-loans",
  "targetIndexName": "mortgage-rag-index",
  "schedule": { "interval": "PT1H" },
  "parameters": {
    "configuration": { "queryTimeout": "00:05:00" }
  },
  "fieldMappings": [
    { "sourceFieldName": "loan_embed_text", "targetFieldName": "content" },
    { "sourceFieldName": "loan_id", "targetFieldName": "chunk_id" },
    { "sourceFieldName": "updated_at", "targetFieldName": "last_modified" }
  ]
}
Azure AI Search's SQL indexer runs on a schedule, detects changed rows via a high-watermark column (updated_at), and upserts only changed documents. Built-in retry, monitoring, and error logging via Azure Monitor.
For Cosmos DB: use the Cosmos DB change feed → Azure Function → AI Search upsert pattern for near-real-time indexing.
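A sketch of that Function, assuming the v1 Python programming model (the Cosmos trigger is bound in function.json) and illustrative index and field names:

# __init__.py — fires on every change-feed batch
import azure.functions as func
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient

search_client = SearchClient(
    endpoint="https://<search-service>.search.windows.net",
    index_name="mortgage-rag-index",
    credential=AzureKeyCredential("<admin-key>"),
)

def main(documents: func.DocumentList) -> None:
    batch = []
    for doc in documents:                          # each changed Cosmos document
        batch.append({
            "chunk_id": doc["id"],
            "content": row_to_text(dict(doc)),     # same serialization template as SQL rows
            "last_modified": doc["_ts"],           # epoch seconds — convert if the field is DateTimeOffset
        })
    if batch:
        search_client.merge_or_upload_documents(documents=batch)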
SharePoint — The Enterprise Knowledge Silo
SharePoint is where most enterprise policy, procedure, and governance documentation lives. It's also the most structurally inconsistent data source — documents range from clean Markdown to complex Word files to scanned PDFs to embedded Excel sheets.
Chunking Strategy
SharePoint documents require document-aware chunking — respect headings, never split tables, preserve slide boundaries in PowerPoint.
Critical metadata to capture from SharePoint:
{
  "source": "SharePoint",
  "site_url": "https://contoso.sharepoint.com/sites/lending",
  "library": "Policies",
  "file_name": "FHA-Underwriting-Guidelines-2025.docx",
  "file_path": "/sites/lending/Policies/FHA-Underwriting-Guidelines-2025.docx",
  "last_modified": "2026-03-15T09:23:00Z",
  "modified_by": "jane.smith@contoso.com",
  "version": "3.2",
  "content_type": "Policy Document",
  "section": "Income Verification",
  "page": 12
}
last_modified + version enable delta indexing — only re-index documents that have changed since the last run. Without this, re-indexing 10,000 SharePoint documents every night is a significant embedding cost.
Open Source Implementation
from llama_index.readers.microsoft_sharepoint import SharePointReader

reader = SharePointReader(
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    tenant_id=TENANT_ID,
)

# Load from specific site and library
documents = reader.load_data(
    sharepoint_site_name="lending",
    sharepoint_folder_path="Policies",
    recursive=True,
    file_extns=[".docx", ".pdf", ".xlsx"],
)

# Apply document-type routing (from the chunking post)
from llama_index.core.node_parser import MarkdownNodeParser, HierarchicalNodeParser

nodes = route_and_chunk(documents)  # dispatch by file type
Azure Implementation
Azure AI Search SharePoint Online connector (native):
{
  "name": "sharepoint-datasource",
  "@odata.type": "#Microsoft.Azure.Search.DataSource",
  "type": "sharepoint",
  "credentials": {
    "connectionString": "SharePointOnlineEndpoint=https://contoso.sharepoint.com;ApplicationId=...;ApplicationSecret=..."
  },
  "container": {
    "name": "defaultSiteLibrary",
    "query": "/sites/lending/Policies"
  }
}
Azure AI Search's SharePoint connector handles OAuth, delta sync, and permission preservation natively. Permission-aware retrieval is a key enterprise feature: a user can only retrieve chunks from documents they have SharePoint read access to. Azure AI Search enforces this via security trimming on the index — open source stacks require custom implementation.
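In open source stacks, the usual approximation is query-time metadata filtering — a minimal sketch, assuming each chunk's metadata carries an allowed_groups list captured at ingestion time (resolving the user's group memberships via Graph API is left out):

def permission_filtered_search(query: str, user_groups: set[str], top_k: int = 5):
    # Over-fetch, then drop chunks the user cannot read
    candidates = vector_index.search(query, top_k=top_k * 4)   # vector_index: your store's API
    visible = [
        c for c in candidates
        if user_groups & set(c.metadata.get("allowed_groups", []))
    ]
    return visible[:top_k]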
Email — Outlook / Exchange
Email is the most underutilized enterprise knowledge source in RAG systems. Decisions, approvals, escalations, and institutional knowledge that never makes it into documentation live in email threads.
The Threading Problem
Individual emails are often meaningless without the thread context. The word "Yes" as an email body means nothing. "Yes, approved — proceed with the FHA exception at 50% DTI for borrower L-2048" is knowledge.
Chunking strategy: thread-based. Keep an entire email thread as one chunk (if under token limit). Split only at thread boundaries, not email boundaries. For long threads (>512 tokens), use a sliding window over the thread, preserving the first email (original question) in every chunk.
What to index vs. what to skip:
- Index: decisions, approvals, policy exceptions, escalations, client-facing summaries
- Skip: calendar invites, automated notifications, marketing, meeting scheduling
Use a subject-line classifier (a small LLM call or keyword filter) to route emails before indexing.
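The keyword filter makes a cheap first pass before any LLM call — a sketch; the patterns are illustrative and should be tuned to your mailboxes:

import re

SKIP_PATTERNS = re.compile(
    r"(out of office|automatic reply|accepted:|declined:|canceled:|unsubscribe)", re.I
)
INDEX_HINTS = re.compile(
    r"(approved|exception|escalat|decision|waiver|sign[- ]?off)", re.I
)

def should_index(subject: str, body: str) -> bool | None:
    if SKIP_PATTERNS.search(subject):
        return False
    if INDEX_HINTS.search(subject) or INDEX_HINTS.search(body[:500]):
        return True
    return None   # ambiguous — fall through to the small LLM classifier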
Open Source Implementation
# Microsoft Graph API — fetch threaded conversations
# (LlamaIndex also ships Microsoft 365 readers; here we call Graph directly)
import msal, requests

def get_email_threads(user_id: str, folder: str = "Inbox") -> list[dict]:
    url = f"https://graph.microsoft.com/v1.0/users/{user_id}/mailFolders/{folder}/messages"
    params = {
        "$select": "subject,from,toRecipients,body,receivedDateTime,conversationId",
        "$orderby": "receivedDateTime desc",
        "$top": 100,
    }
    # Group by conversationId to reconstruct threads
    ...

def thread_to_chunk(thread: list[dict]) -> str:
    parts = [f"Subject: {thread[0]['subject']}"]
    for email in thread:
        sender = email["from"]["emailAddress"]["name"]    # Graph nests sender details
        parts.append(
            f"\n[{email['receivedDateTime']}] {sender}:\n"
            f"{strip_html(email['body']['content'])}"     # strip_html: your HTML-to-text helper
        )
    return "\n".join(parts)
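For threads over the token limit, a sketch of the sliding window described earlier — the first email (the original question) is pinned into every chunk; format_email and count_tokens stand in for your per-email serializer and tokenizer:

def window_long_thread(thread: list[dict], max_tokens: int = 512) -> list[str]:
    anchor = format_email(thread[0])               # original question, kept in every chunk
    chunks, window = [], []
    for email in thread[1:]:
        window.append(format_email(email))
        if count_tokens("\n".join([anchor] + window)) > max_tokens and len(window) > 1:
            chunks.append("\n".join([anchor] + window[:-1]))   # emit without the overflow email
            window = window[-1:]                   # carry the last email forward as overlap
    chunks.append("\n".join([anchor] + window))
    return chunks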
Azure Implementation
Microsoft Graph + Azure AI Search for Exchange Online:
Microsoft Graph API → Azure Function (thread grouping + classification) → AI enrichment pipeline → AI Search index.
Key Azure advantage: Microsoft Purview integration — email indexing respects retention policies, legal holds, and sensitivity labels automatically. In regulated industries (lending, healthcare, finance), indexing emails without compliance guardrails is a liability.
Microsoft Teams / Slack — Conversational Knowledge
Teams and Slack conversations are dense with informal knowledge, real-time decisions, and domain expertise that never gets documented. They're also the noisiest data source.
Signal vs Noise
Chunking strategy: conversation-window. Group messages by channel + time window (30-minute sessions). A session = one chunk. Preserve: channel name, participants, timestamp, any @mentions of documents or decisions.
What makes a good Teams chunk:
- A question followed by a detailed answer
- A decision thread (even if informal)
- A channel pinned message (high signal by definition)
- Messages in dedicated knowledge channels (#architecture-decisions, #underwriting-qa)
What to skip:
- Reaction-only responses
- Single-word messages ("thanks", "ok", "+1")
- Status update bots
- Calendar/meeting bot notifications
Open Source Implementation
from datetime import datetime
from slack_sdk import WebClient   # Slack equivalent; below uses Teams via Graph

def extract_teams_channel(team_id: str, channel_id: str, since: datetime):
    # Microsoft Graph API (graph_client: your authenticated Graph client)
    messages = graph_client.get(
        f"/teams/{team_id}/channels/{channel_id}/messages",
        params={"$filter": f"lastModifiedDateTime gt {since.isoformat()}"},
    )

    # Group into 30-minute conversation windows
    sessions = group_by_time_window(messages, window_minutes=30)

    chunks = []
    for session in sessions:
        # Skip low-signal sessions
        if is_low_signal(session):
            continue
        text = format_session(session)   # "User1: question\nUser2: detailed answer\n..."
        chunks.append({
            "text": text,
            "metadata": {
                "source": "teams",
                "channel": channel_id,
                "participants": list_participants(session),
                "start_time": session[0]["createdDateTime"],
                "message_count": len(session),
            },
        })
    return chunks
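The two helpers doing the real work are sketched below — gap-based session grouping (one reasonable reading of the 30-minute window) and a low-signal check mirroring the skip list above; the thresholds are illustrative, and format_session / list_participants are plain formatting helpers, omitted:

from datetime import datetime, timedelta

def _ts(msg: dict) -> datetime:
    # Graph timestamps are ISO-8601 with a trailing Z
    return datetime.fromisoformat(msg["createdDateTime"].replace("Z", "+00:00"))

def group_by_time_window(messages: list[dict], window_minutes: int = 30) -> list[list[dict]]:
    # Start a new session when the gap to the previous message exceeds the window
    sessions: list[list[dict]] = []
    for msg in sorted(messages, key=_ts):
        if sessions and _ts(msg) - _ts(sessions[-1][-1]) <= timedelta(minutes=window_minutes):
            sessions[-1].append(msg)
        else:
            sessions.append([msg])
    return sessions

LOW_SIGNAL = {"thanks", "ok", "+1", "done", "np"}

def is_low_signal(session: list[dict]) -> bool:
    bodies = [m["body"]["content"].strip().lower() for m in session]
    substantive = [b for b in bodies if b not in LOW_SIGNAL and len(b.split()) > 3]
    return len(substantive) < 2   # no question-plus-answer pair worth indexing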
Azure Implementation
Microsoft Teams + Azure AI Search via Graph API connector:
Azure AI Search has a native Microsoft Teams connector (preview) that indexes channel messages and meeting transcripts. It respects Teams permissions — users only retrieve messages from channels they're members of.
Meeting transcripts are a high-value Teams data source. Teams automatically transcribes meetings — these transcripts are indexed as time-stamped conversation chunks with speaker attribution.
REST APIs — Real-Time and Cached
APIs present a fundamentally different challenge: the data doesn't exist until you call the API. You have two options: pre-cache (index API responses periodically) or live tool-call (the LLM calls the API at query time via tool-calling).
Pre-Cache Strategy
For relatively stable API data — product catalogs, loan program definitions, regulatory lookups:
import httpx
from datetime import datetime, timedelta
from typing import Callable

async def index_api_endpoint(
    url: str,
    response_to_text: Callable[[dict], str],
    refresh_interval_hours: int = 24,
):
    async with httpx.AsyncClient() as client:   # module-level httpx.get is sync — use the async client
        response = await client.get(url, headers=auth_headers)
    items = response.json()

    chunks = []
    for item in items:
        text = response_to_text(item)   # domain-specific serializer
        chunks.append({
            "text": text,
            "metadata": {
                "source": "api",
                "endpoint": url,
                "item_id": item["id"],
                "fetched_at": datetime.utcnow().isoformat(),
                "expires_at": (datetime.utcnow() + timedelta(hours=refresh_interval_hours)).isoformat(),
            },
        })

    # Upsert into vector index by item_id
    await upsert_chunks(chunks)
Live Tool-Call Strategy
For real-time data — don't index it. Expose it as an LLM tool:
# Semantic Kernel tool / LlamaIndex FunctionTool
from semantic_kernel.functions import kernel_function

@kernel_function(
    name="get_current_rates",
    description="Get current mortgage rates by loan type. Use when user asks about today's rates.",
)
def get_current_rates(loan_type: str) -> str:
    response = rates_api.get(f"/rates/{loan_type}")   # rates_api: your internal rates client
    return f"Current {loan_type} rate: {response['rate']}% (as of {response['as_of']})"
Decision rule (codified in the sketch after this list):
- Data changes less than once per day → pre-cache and index
- Data changes more than once per day OR is user/session-specific (PII) → live tool-call
- Data is real-time (stock prices, live inventory) → always tool-call, never index
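The same rule as a helper, for wiring into ingestion config reviews — a sketch using the thresholds above:

def api_strategy(changes_per_day: float, user_specific: bool, realtime: bool) -> str:
    if realtime:
        return "tool_call"        # never index live data
    if changes_per_day > 1 or user_specific:
        return "tool_call"
    return "pre_cache"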
Confluence / Notion — Wiki Knowledge Bases
Wiki platforms are structurally similar to SharePoint but with better API access and more consistent Markdown-like formatting. The chunking strategy is document-aware (heading-based), identical to Markdown chunking from the previous post — but the extraction layer is different.
Open Source Implementation
from llama_index.readers.confluence import ConfluenceReader

# Auth (API token / OAuth2) is read from env vars or passed to the constructor
reader = ConfluenceReader(base_url="https://yourcompany.atlassian.net/wiki")

documents = reader.load_data(
    space_key="LEND",              # Confluence space
    include_children=True,
    include_attachments=False,     # skip binary attachments
)

# Confluence pages arrive as HTML — parse heading structure
from llama_index.core.node_parser import HTMLNodeParser

parser = HTMLNodeParser(tags=["h1", "h2", "h3", "p", "table", "code"])
nodes = parser.get_nodes_from_documents(documents)
Notion:
from llama_index.readers.notion import NotionPageReader

reader = NotionPageReader(integration_token=NOTION_TOKEN)
documents = reader.load_data(page_ids=["page_id_1", "page_id_2"])

# Notion exports as Markdown-compatible blocks — use MarkdownNodeParser
Azure Implementation
Confluence and Notion don't have native Azure AI Search connectors. Use Logic Apps or Azure Functions to poll the Confluence/Notion APIs, detect changed pages (via version.when timestamp), and push to the AI enrichment pipeline.
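A sketch of the polling core, using the Confluence Cloud REST API's content listing with expand=version; last_run must be timezone-aware:

import requests
from datetime import datetime

CONTENT_API = "https://yourcompany.atlassian.net/wiki/rest/api/content"

def changed_pages(space_key: str, last_run: datetime, auth: tuple[str, str]) -> list[dict]:
    changed, start = [], 0
    while True:
        resp = requests.get(
            CONTENT_API,
            params={"spaceKey": space_key, "expand": "version", "start": start, "limit": 50},
            auth=auth,   # (email, API token)
        )
        results = resp.json().get("results", [])
        for page in results:
            modified = datetime.fromisoformat(page["version"]["when"].replace("Z", "+00:00"))
            if modified > last_run:
                changed.append(page)
        if len(results) < 50:
            return changed
        start += 50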
JIRA / Azure DevOps Tickets
Tickets are semi-structured: title, description, comments, status, labels, linked tickets. High-value for teams building developer-facing RAG systems (incident search, architecture decision lookup, sprint history).
Chunking Strategy
Keep ticket + all comments as one chunk (if under token limit). The comment thread is where the resolution lives — separating it from the original ticket destroys context.
def ticket_to_text(ticket: dict) -> str:
    parts = [
        f"Title: {ticket['summary']}",
        f"Type: {ticket['issuetype']['name']}",
        f"Status: {ticket['status']['name']}",
        f"Priority: {ticket['priority']['name']}",
        f"Labels: {', '.join(ticket.get('labels', []))}",
        f"\nDescription:\n{ticket['description']}",
    ]
    for comment in ticket.get("comments", []):
        parts.append(
            f"\n[{comment['created']}] {comment['author']['displayName']}:\n{comment['body']}"
        )
    if ticket.get("resolution"):
        parts.append(f"\nResolution: {ticket['resolution']['name']}")
    return "\n".join(parts)
High-value metadata:
{
  "source": "jira",
  "ticket_id": "LEND-2048",
  "type": "Bug",              # Bug, Story, Epic, Incident
  "status": "Resolved",
  "resolution": "Fixed",
  "components": ["underwriting-service", "rate-engine"],
  "labels": ["production-incident", "data-loss"],
  "created": "2026-01-15",
  "resolved": "2026-01-16",
  "sprint": "Sprint 42"
}
Azure DevOps: same pattern — Work Item API → serialize → chunk → embed. Azure AI Search has no native ADO connector; use Azure Functions triggered by ADO webhooks for near-real-time indexing.
Git Repositories / Source Code
Code is the highest-density knowledge source for engineering teams. The right code RAG system lets developers ask "how does the rate lock service work?" and get the actual implementation, not documentation.
Chunking Strategy — Language-Aware
Code must be chunked at semantic boundaries — never mid-function, never mid-class.
from llama_index.core.node_parser import CodeSplitter

# Language-specific — respects function/class boundaries
python_splitter = CodeSplitter(
    language="python",
    chunk_lines=50,           # max lines per chunk
    chunk_lines_overlap=5,    # small overlap for context
    max_chars=2000,
)
csharp_splitter = CodeSplitter(language="csharp", chunk_lines=60)

# Critical metadata for code chunks
# (detect_language, extract_function_name/extract_class_name, git_blame_date: your own helpers)
def code_chunk_metadata(file_path: str, chunk) -> dict:
    return {
        "source": "git",
        "repo": "mortgageiq/loan-service",
        "branch": "main",
        "file_path": file_path,
        "language": detect_language(file_path),
        "git_commit": current_commit_sha,
        "function_name": extract_function_name(chunk),
        "class_name": extract_class_name(chunk),
        "last_modified": git_blame_date(file_path),
    }
What to index:
- Business logic services (high-value)
- API controllers and endpoints
- Configuration files (appsettings, Terraform)
- Test files — test names describe business behavior
- README and inline documentation
What to skip:
- Auto-generated code (migrations, protobuf output)
- Vendored dependencies (node_modules, vendor/)
- Build artifacts
OpenAPI / Swagger specs are a special case — index each endpoint as its own chunk with the full path, method, parameters, and response schema. This enables "what API do I call to get loan status?" retrieval.
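A sketch of that per-endpoint chunking, assuming an OpenAPI 3 spec with inline (non-$ref) parameters:

import yaml

HTTP_METHODS = {"get", "post", "put", "patch", "delete"}

def spec_to_chunks(spec_path: str) -> list[dict]:
    with open(spec_path) as f:
        spec = yaml.safe_load(f)
    chunks = []
    for path, item in spec.get("paths", {}).items():
        for method, op in item.items():
            if method not in HTTP_METHODS:    # skip path-level keys like "parameters"
                continue
            params = ", ".join(p["name"] for p in op.get("parameters", []))
            text = (
                f"API endpoint: {method.upper()} {path}\n"
                f"Summary: {op.get('summary', '')}\n"
                f"Parameters: {params or 'none'}\n"
                f"Responses: {', '.join(op.get('responses', {}).keys())}"
            )
            chunks.append({
                "text": text,
                "metadata": {"source": "openapi", "path": path, "method": method.upper()},
            })
    return chunks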
Scanned Documents / Images — Multimodal
Scanned PDFs, photos of whiteboards, screenshots, and image-embedded text require OCR before chunking.
Azure Document Intelligence significantly outperforms open source OCR on complex layouts — multi-column documents, forms with checkboxes, tables with merged cells. For enterprise mortgage documents (loan applications, appraisals, title reports), the quality gap is material.
Confidence scores: Azure Document Intelligence returns a confidence score per word. Low-confidence regions should be flagged in chunk metadata — chunks retrieved from low-confidence OCR should be surfaced with an "OCR quality: low" warning rather than presented as ground truth.
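A sketch of that flagging with the azure-ai-formrecognizer SDK's prebuilt-read model — the 0.85 word threshold and 10% ratio are illustrative cutoffs, not Azure defaults:

from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.core.credentials import AzureKeyCredential

client = DocumentAnalysisClient(
    "https://<resource>.cognitiveservices.azure.com", AzureKeyCredential("<key>")
)

def ocr_with_quality_flag(pdf_bytes: bytes, word_threshold: float = 0.85) -> dict:
    result = client.begin_analyze_document("prebuilt-read", document=pdf_bytes).result()
    words = [w for page in result.pages for w in page.words]
    low = sum(1 for w in words if w.confidence < word_threshold)
    ratio = low / max(len(words), 1)
    return {
        "text": result.content,
        "ocr_quality": "low" if ratio > 0.10 else "high",   # surfaced as a retrieval warning
        "low_confidence_ratio": round(ratio, 3),
    }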
The Enterprise Ingestion Architecture
At scale, every data source feeds into a unified ingestion pipeline: extraction, strategy routing, chunking, PII screening, embedding, and index upsert.
The Strategy Router
The router is the brain of the ingestion pipeline — it dispatches each document to the right chunking strategy based on file type, source, and content signals:
from enum import Enum
from llama_index.core import Document

class ChunkingStrategy(Enum):
    FIXED_SIZE = "fixed_size"
    DOCUMENT_AWARE_MD = "document_aware_md"
    DOCUMENT_AWARE_HTML = "document_aware_html"
    HIERARCHICAL = "hierarchical"
    ROW_TO_TEXT = "row_to_text"
    THREAD_BASED = "thread_based"
    SESSION_WINDOW = "session_window"
    CODE_AWARE = "code_aware"
    OCR_FIXED = "ocr_fixed"

def route_document(doc: Document) -> ChunkingStrategy:
    ext = doc.metadata.get("file_extension", "")
    source = doc.metadata.get("source", "")

    if source in ("sql", "cosmos"):
        return ChunkingStrategy.ROW_TO_TEXT
    if source in ("email", "outlook"):
        return ChunkingStrategy.THREAD_BASED
    if source in ("teams", "slack"):
        return ChunkingStrategy.SESSION_WINDOW
    if ext in (".py", ".cs", ".ts", ".go", ".java"):
        return ChunkingStrategy.CODE_AWARE
    if ext in (".md", ".mdx"):
        return ChunkingStrategy.DOCUMENT_AWARE_MD
    if ext in (".html", ".htm"):
        return ChunkingStrategy.DOCUMENT_AWARE_HTML
    if ext in (".docx", ".pdf") and doc.metadata.get("has_structure"):
        return ChunkingStrategy.HIERARCHICAL
    if doc.metadata.get("is_scanned"):
        return ChunkingStrategy.OCR_FIXED
    return ChunkingStrategy.FIXED_SIZE  # fallback
PII Detection Before Indexing
Enterprise data sources — especially email, Teams, and SQL — contain PII. Indexing PII into a shared vector store is a compliance and security risk.
# Azure AI Language — PII detection before indexing
from azure.ai.textanalytics import TextAnalyticsClient

def detect_and_redact_pii(text: str) -> tuple[str, list]:
    # ai_language_client: a TextAnalyticsClient built from your endpoint + credential
    result = ai_language_client.recognize_pii_entities([text])[0]
    redacted = result.redacted_text   # PII characters masked in place
    entities_found = [(e.category, e.confidence_score) for e in result.entities]
    return redacted, entities_found

# Flag high-sensitivity chunks — don't index, or index into a restricted sub-index
SSN, CREDIT_CARD = "USSocialSecurityNumber", "CreditCardNumber"
high_sensitivity = [SSN, CREDIT_CARD, "BankAccountNumber"]
Open source alternative: Microsoft Presidio — open source PII detection and anonymization, supports 40+ entity types, runs locally.
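A minimal Presidio sketch — analyze, then anonymize; by default detected entities are replaced with placeholder tags like <US_SSN>:

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

def redact_with_presidio(text: str) -> tuple[str, list[tuple[str, float]]]:
    findings = analyzer.analyze(text=text, language="en")
    redacted = anonymizer.anonymize(text=text, analyzer_results=findings).text
    return redacted, [(f.entity_type, f.score) for f in findings]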
Open Source vs Azure — Full Data Source Map
| Data Source | Open Source Connector | Azure Connector | Notes |
|---|---|---|---|
| SQL Database | LlamaIndex DatabaseReader | AI Search SQL indexer | Azure handles delta sync natively |
| PostgreSQL | SQLAlchemy + LlamaIndex | AI Search (via REST) | No native Azure connector — use Functions |
| Cosmos DB | LlamaIndex CosmosReader | AI Search Cosmos indexer | Native change feed support |
| SharePoint | llama_index.readers.microsoft_sharepoint | AI Search SharePoint connector | Azure enforces SharePoint permissions |
| Outlook / Exchange | Microsoft Graph API + custom | Logic Apps → AI Search | Azure Purview for compliance |
| Teams | Microsoft Graph API + custom | AI Search Teams connector (preview) | Includes meeting transcripts |
| Confluence | llama_index.readers.confluence | Logic Apps + Functions | No native connector |
| Notion | llama_index.readers.notion | Functions + webhook | No native connector |
| JIRA | llama_index.readers.jira | Functions + JIRA webhook | No native connector |
| REST API | llama_index.readers.web + custom | API Management + Functions | Live tool-call for real-time data |
| Git / GitHub | llama_index.readers.github | Functions + GitHub webhooks | Code-aware chunking required |
| Scanned PDFs | Tesseract / EasyOCR + pymupdf | AI Document Intelligence | Azure quality significantly higher |
| Kafka streams | Custom Kafka consumer | Event Hubs + Stream Analytics | CDC pattern for SQL delta |
What We Ingest at MortgageIQ
| Source | Volume | Strategy | Refresh |
|---|---|---|---|
| Azure SQL (loan records) | ~50K rows | Row-to-text template, CDC via change feed | Real-time (3 min lag) |
| SharePoint (guidelines) | ~800 documents | Azure Doc Intelligence → hierarchical + parent-child | Nightly delta |
| SharePoint (rate sheets) | ~50 files | Row-level chunks, header repeated | Daily (6am) |
| Outlook (escalation threads) | ~200 threads/month | Thread-based, classifier pre-filter | Hourly |
| Teams (#underwriting-qa) | ~500 sessions/month | Session-window, low-signal filter | Hourly |
| Azure DevOps (tickets) | ~300 tickets | Ticket + comments, resolved only | Daily |
| GitHub (loan-service repo) | ~40 services | Code-aware, function-level chunks | On push (webhook) |
| External APIs (FHA/VA limits) | 12 endpoints | Pre-cached, natural language serialized | Weekly |
The most valuable source we almost didn't index: Teams #underwriting-qa channel. Loan officers post edge cases, underwriters answer with regulatory citations. This informal Q&A contains decision logic that exists nowhere in formal documentation. Indexing it raised answer accuracy on exception queries by 34%.
Key Takeaways
- Every data source type needs a different extraction and chunking strategy — applying PDF chunking to SQL rows or email threads produces meaningless embeddings.
- Structured data (SQL, Cosmos) must be serialized to natural language before embedding — pipe-delimited row values produce vectors with no semantic meaning; text templates produce rich, queryable embeddings.
- SharePoint and email are the highest-value underutilized sources — most enterprise knowledge lives here and most RAG systems ignore it.
- Real-time data belongs in tool-calls, not indexes — live rates, current loan status, and PII-sensitive queries should be served by LLM tool-calling at query time, not pre-indexed.
- PII detection before indexing is non-negotiable in regulated industries — email, Teams, and SQL sources all contain sensitive data; use Azure AI Language or Microsoft Presidio to detect and redact before any chunk reaches the vector store.
- The strategy router is your most important ingestion component — a document classifier that dispatches each source type to the right chunking strategy is what separates a RAG prototype from a production data layer.
Coming Up in This Series
- Day 5: Evaluation — RAGAS, context recall, answer faithfulness, and how to run a retrieval A/B test in production
- Day 6: Production Patterns — caching, index freshness, multi-tenant isolation, and cost governance