Day 8: Data Pipelines
Learning Objectives
- Understand the ETL pattern applied to AI (chunk, embed, index)
- Know when batch vs streaming pipelines are appropriate
- Build a pipeline that watches a directory, chunks documents, computes embeddings, and upserts to a vector store
Theory (15 min)
The Data Flywheel
Every AI system starts with data. The quality of your data pipeline determines the ceiling of your system's performance.
```
Source ──▶ Extract ──▶ Transform ──▶ Load ──────────▶ Serve
              │            │           │                │
            Files,       Clean,   Vector DB /       Search /
            APIs,        Chunk,   Object Store      Retrieval
            DBs          Embed
```
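Here is a minimal sketch of the Extract → Transform → Load stages as plain Python functions chained over iterators. The `Record` type, the function names, and the dict that stands in for a vector store are illustrative assumptions, not part of any library. Chunking and embedding would slot into `transform`.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable, Iterator


@dataclass
class Record:
    source: str                              # where the text came from (path, URL, ...)
    text: str
    meta: dict = field(default_factory=dict)


def extract(paths: Iterable[Path]) -> Iterator[Record]:
    """Extract: read raw text from each source file."""
    for p in paths:
        yield Record(source=str(p), text=p.read_text(encoding="utf-8", errors="replace"))


def transform(records: Iterable[Record]) -> Iterator[Record]:
    """Transform: collapse whitespace and drop empty documents."""
    for r in records:
        cleaned = " ".join(r.text.split())
        if cleaned:
            yield Record(r.source, cleaned, r.meta)


def load(records: Iterable[Record], store: dict) -> int:
    """Load: write into a dict keyed by source (a vector DB upsert in a real pipeline)."""
    count = 0
    for r in records:
        store[r.source] = r
        count += 1
    return count
```

Because each stage only consumes and yields records, the whole run is `load(transform(extract(paths)), store)`, and any stage can be swapped without touching the others.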
Batch vs Streaming
| Characteristic | Batch | Streaming |
|---|---|---|
| Frequency | Hourly/Daily | Real-time |
| Latency | High (minutes+) | Low (seconds) |
| Infrastructure | Cron + script | Kafka + Flink |
| Complexity | Low | High |
| Best for | Document ingestion, training data | Chat logs, clickstream, monitoring |
For RAG pipelines, batch is usually sufficient. A document corpus doesn't change every second.
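A batch pipeline can be as small as a script that cron runs on a schedule. The sketch below uses hypothetical `scan` and `run_batch` helpers plus a JSON state file of modification times (an assumed format). It re-ingests only files that are new or changed since the last run and saves the new state only after every file was processed, so a crash means the next run retries instead of skipping.

```python
import json
from pathlib import Path
from typing import Callable


def scan(root: Path, state: dict[str, float]) -> tuple[list[Path], dict[str, float]]:
    """Find files under `root` whose mtime differs from the recorded one."""
    new_state = dict(state)
    changed = []
    for p in sorted(root.rglob("*")):
        if p.is_file():
            mtime = p.stat().st_mtime
            if state.get(str(p)) != mtime:
                changed.append(p)
            new_state[str(p)] = mtime
    return changed, new_state


def run_batch(root: Path, state_file: Path, ingest: Callable[[Path], None]) -> list[Path]:
    """One batch run: ingest changed files, then persist the new state."""
    state = json.loads(state_file.read_text()) if state_file.exists() else {}
    changed, new_state = scan(root, state)
    for path in changed:
        ingest(path)  # chunk + embed + upsert would happen here
    # Only reached if every ingest() call succeeded; an exception leaves the old state.
    state_file.write_text(json.dumps(new_state))
    return changed
```

Scheduled hourly (for example a crontab entry like `0 * * * * python3 ingest.py`), this covers most document-corpus needs. Deleted files are not handled; removing their chunks from the index would need a separate pass.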
The RAG Ingestion Pipeline
```
                 ┌─────────────┐
Raw Document ──▶ │  Splitter   │──▶ Chunks (512-1024 tokens)
                 └─────────────┘
                        │
                        ▼
                 ┌─────────────┐
                 │  Embedder   │──▶ Vector (384-3072 dims)
                 └─────────────┘
                        │
                        ▼
                 ┌─────────────┐
                 │   Indexer   │──▶ Vector DB (Qdrant, Chroma, Pinecone)
                 └─────────────┘
```
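To see the Splitter → Embedder → Indexer flow end to end without any services, here is a toy version. `split`, `embed`, and `InMemoryIndex` are hypothetical stand-ins: the "embedding" is a hashing-trick bag of words with no semantic meaning, and the index does exact brute-force cosine search, which is what HNSW or IVF approximate at scale. Swap in a real embedding model and vector DB for actual use.

```python
import hashlib
import math
import re

DIM = 64  # toy dimensionality; real embedding models use hundreds to thousands


def split(text: str, max_words: int = 50) -> list[str]:
    """Splitter: fixed-size chunks of whitespace-separated words."""
    words = text.split()
    return [" ".join(words[i:i + max_words]) for i in range(0, len(words), max_words)]


def embed(text: str) -> list[float]:
    """Embedder stand-in: hash each word into a bucket, then L2-normalize (NOT semantic)."""
    vec = [0.0] * DIM
    for tok in re.findall(r"\w+", text.lower()):
        bucket = int(hashlib.md5(tok.encode()).hexdigest(), 16) % DIM
        vec[bucket] += 1.0
    norm = math.sqrt(sum(x * x for x in vec)) or 1.0
    return [x / norm for x in vec]


class InMemoryIndex:
    """Indexer stand-in: upsert by id, exact cosine search over all vectors."""

    def __init__(self) -> None:
        self.items: dict[str, tuple[list[float], str]] = {}

    def upsert(self, item_id: str, vector: list[float], text: str) -> None:
        self.items[item_id] = (vector, text)  # an existing id is overwritten, not duplicated

    def query(self, vector: list[float], k: int = 3) -> list[tuple[str, str]]:
        # Vectors are unit length, so the dot product equals cosine similarity.
        scored = [(sum(a * b for a, b in zip(vector, v)), i, t) for i, (v, t) in self.items.items()]
        scored.sort(reverse=True)
        return [(i, t) for _, i, t in scored[:k]]
```

A document flows through as `for n, chunk in enumerate(split(text)): index.upsert(f"{doc_id}:{n}", embed(chunk), chunk)`.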
Key decisions:
- Chunk size: 256-1024 tokens (too small = lost context, too large = noise)
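- Chunk overlap: 10-20% of the chunk size, so text near a boundary appears in both neighboring chunks and an idea split at the boundary is not lost
- Embedding model: BAAI/bge-small-en-v1.5 (384 dims, fast, runs locally) vs OpenAI's text-embedding-3-large (3072 dims, paid API, 8x the storage per vector)
- Index type: HNSW (fast, high recall, but memory-hungry) vs IVF (lower memory; combined with quantization it scales to billions of vectors)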
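Overlap is easiest to see as a sliding window. This sketch (a hypothetical `chunk_tokens` helper) assumes the text is already tokenized; `text.split()` is only a rough proxy for model tokens, so use the embedding model's own tokenizer when exact counts matter. With `chunk_size=512, overlap=64` the overlap is 12.5%, inside the 10-20% range above.

```python
def chunk_tokens(tokens: list[str], chunk_size: int = 512, overlap: int = 64) -> list[list[str]]:
    """Sliding-window chunking: each chunk repeats the last `overlap` tokens of the previous one."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("need 0 <= overlap < chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):  # this window reached the end of the text
            break
    return chunks
```

For example, ten tokens with `chunk_size=4, overlap=1` yield windows starting at positions 0, 3 and 6, each sharing one token with its neighbor.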
Hands-on (15 min)
Build a File Watcher → Embedder Pipeline
```python
#!/usr/bin/env python3
"""data-pipeline-watcher.py: watch a directory, chunk new files, embed, and index."""
# Prerequisites: pip install watchdog chromadb sentence-transformers
# (or use Ollama for embeddings instead of sentence-transformers)
import os
import time
import hashlib  # reserved for dedup by content hash (not used yet)
from pathlib import Path

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# Stub: Ayva will expand with:
# - PDF/text/markdown parsing
# - configurable chunking (by paragraph, by token count)
# - embedding via sentence-transformers or local llama.cpp
# - upsert to ChromaDB / Qdrant
# - dedup by content hash

WATCH_DIR = Path(os.getenv("WATCH_DIR", "./data/inbox"))
# NOTE: chunk_text() below measures size in characters, not tokens.
CHUNK_SIZE = int(os.getenv("CHUNK_SIZE", "512"))
CHUNK_OVERLAP = int(os.getenv("CHUNK_OVERLAP", "64"))  # not applied by chunk_text() yet


class DocumentHandler(FileSystemEventHandler):
    def on_created(self, event):
        # Only files are ingested; new subdirectories are ignored.
        if event.is_directory:
            return
        print(f"📄 New file detected: {event.src_path}")
        # Caveat: the event can fire before the writer has finished, so a large
        # file may be read partially.
        try:
            self.process_file(Path(event.src_path))
        except OSError as exc:
            # e.g. the file was deleted before we read it; keep the watcher running
            print(f"   ⚠️ Could not process {event.src_path}: {exc}")

    def process_file(self, path: Path):
        # 1. Read file (undecodable bytes are replaced instead of raising)
        text = path.read_text(encoding="utf-8", errors="replace")
        # 2. Chunk
        chunks = self.chunk_text(text)
        # 3. Embed + index: TODO
        print(f"   → {len(chunks)} chunks generated")
        for i, chunk in enumerate(chunks[:2]):
            print(f"   Chunk {i}: {chunk[:60]}...")
        if len(chunks) > 2:
            print(f"   ... and {len(chunks) - 2} more")

    def chunk_text(self, text: str) -> list[str]:
        """Split on blank lines, then greedily merge paragraphs while under CHUNK_SIZE characters.

        A single paragraph longer than CHUNK_SIZE still becomes one oversized chunk.
        """
        paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
        chunks = []
        current = ""
        for p in paragraphs:
            if len(current) + len(p) < CHUNK_SIZE:
                current = current + "\n\n" + p if current else p
            else:
                if current:
                    chunks.append(current)
                current = p
        if current:
            chunks.append(current)
        return chunks


if __name__ == "__main__":
    WATCH_DIR.mkdir(parents=True, exist_ok=True)
    print(f"👀 Watching {WATCH_DIR} for new files...")
    observer = Observer()
    observer.schedule(DocumentHandler(), str(WATCH_DIR), recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
```
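The stub imports `hashlib` for dedup but never uses it. One possible shape for that TODO, sketched with hypothetical `content_hash`, `Deduper`, and `chunk_ids` helpers: skip files whose exact content was already seen, and give chunks deterministic IDs so re-ingesting a file overwrites its chunks through an upsert instead of duplicating them.

```python
import hashlib


def content_hash(text: str) -> str:
    """SHA-256 of the UTF-8 bytes: identical content gives an identical hash."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


class Deduper:
    """Remembers content hashes seen in this process (in memory only)."""

    def __init__(self) -> None:
        self.seen: set[str] = set()

    def is_new(self, text: str) -> bool:
        h = content_hash(text)
        if h in self.seen:
            return False
        self.seen.add(h)
        return True


def chunk_ids(source: str, chunks: list[str]) -> list[str]:
    """Deterministic IDs from the source path and chunk position."""
    prefix = content_hash(source)[:12]
    return [f"{prefix}-{i}" for i in range(len(chunks))]
```

In `process_file`, you might call `dedup.is_new(text)` before chunking and pass `chunk_ids(str(path), chunks)` as the `ids` of the vector store's upsert (ChromaDB collections expose `upsert(ids=..., documents=...)`). If an edited file yields fewer chunks than before, its old trailing IDs are not overwritten and need deleting separately; the seen-hash set also resets when the watcher restarts.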
Key questions for Ayva to research:
- Best chunking strategy for markdown vs PDF vs code
- Embedding model comparison (speed vs quality tradeoffs)
- Metadata extraction (title, date, source URL) for better retrieval
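For markdown, one common strategy is to split at headings and keep the heading trail as metadata, which also touches the metadata question. The sketch below is a hypothetical `split_markdown` helper, not a library API. It stores the heading path as a flat string because some vector stores restrict metadata values to scalars, and it does not understand fenced code blocks, so a `# comment` line inside a fence would be mistaken for a heading.

```python
import re

HEADING = re.compile(r"^(#{1,6})\s+(.*)$")  # ATX headings: "# Title" through "###### Title"


def split_markdown(text: str, source: str) -> list[dict]:
    """Split markdown at headings; attach source and heading path as metadata."""
    sections: list[dict] = []
    path: list[str] = []   # current heading hierarchy, e.g. ["Guide", "Install"]
    buf: list[str] = []    # body lines of the section being collected

    def flush() -> None:
        body = "\n".join(buf).strip()
        if body:
            sections.append({"text": body, "meta": {"source": source, "headings": " > ".join(path)}})
        buf.clear()

    for line in text.splitlines():
        m = HEADING.match(line)
        if m:
            flush()  # close the previous section under the old heading path
            level = len(m.group(1))
            path[:] = path[:level - 1] + [m.group(2).strip()]
        else:
            buf.append(line)
    flush()
    return sections
```

Each section can then go through the same chunk → embed → upsert steps, with its `meta` dict stored alongside the vector for filtering or citing sources.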
Key Takeaways
- Data pipelines are the foundation of any RAG system
- Batch ingestion is simpler and sufficient for most use cases
- Chunk size and overlap are the most impactful knobs to tune
- The pipeline pattern (watch → process → load) is reusable across data sources: swap the source, keep the processing and loading stages