🧠 AI System Design

Day 8: Data Pipelines

📂 Data & Training 📖 15 min read Needs expansion

Learning Objectives

  • Understand the ETL pattern applied to AI (chunk, embed, index)
  • Know when batch vs streaming pipelines are appropriate
  • Build a pipeline that watches a directory, chunks documents, computes embeddings, and upserts to a vector store

Theory (15 min)

The Data Flywheel

Every AI system starts with data. The quality of your data pipeline determines the ceiling of your system's performance.

Source ──▶ Extract ──▶ Transform ──▶ Load ──▶ Serve
           │            │              │         │
         Files,      Clean,         Vector     Search /
         APIs,       Chunk,        DB /       Retrieval
         DBs         Embed         Object Store
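The Extract → Transform → Load stages above can be sketched as chained Python generators. This is a minimal, dependency-free sketch under stated assumptions: `extract`, `clean`, `transform`, and `load` are hypothetical names, files are the only source, "clean" only normalises whitespace, embedding is omitted, and a plain dict stands in for the vector DB / object store.

```python
import re
from pathlib import Path
from typing import Dict, Iterable, Iterator, Tuple

Record = Tuple[str, str]  # (record id, text)

def extract(paths: Iterable[Path]) -> Iterator[Record]:
    """Extract: read raw text from each file (APIs and DBs would be other extractors)."""
    for path in paths:
        yield str(path), path.read_text(encoding="utf-8", errors="replace")

def clean(text: str) -> str:
    """Clean: collapse runs of spaces/tabs and squeeze 3+ newlines down to one blank line."""
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()

def transform(records: Iterable[Record]) -> Iterator[Record]:
    """Transform: clean each document, then split it into paragraph chunks with stable ids."""
    for source, text in records:
        paragraphs = [p.strip() for p in clean(text).split("\n\n") if p.strip()]
        for i, para in enumerate(paragraphs):
            yield f"{source}#{i}", para

def load(records: Iterable[Record], store: Dict[str, str]) -> int:
    """Load: write each chunk into the store and return how many were written."""
    count = 0
    for chunk_id, chunk in records:
        store[chunk_id] = chunk
        count += 1
    return count
```

Because each stage is a generator, documents stream through one at a time, e.g. `load(transform(extract(Path("./data/inbox").glob("*.txt"))), store)`, and an embed step could be slotted in between `transform` and `load` without touching the other stages.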

Batch vs Streaming

Characteristic   Batch                              Streaming
Frequency        Hourly/Daily                       Real-time
Latency          High (minutes+)                    Low (seconds)
Infrastructure   Cron + script                      Kafka + Flink
Complexity       Low                                High
Best for         Document ingestion, training data  Chat logs, clickstream, monitoring

For RAG pipelines, batch is usually sufficient. A document corpus doesn't change every second.
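One way to keep a scheduled batch job cheap is to re-process only the files whose content changed since the previous run. A minimal sketch, assuming a JSON manifest of per-file SHA-256 digests stored outside the corpus; `run_batch`, `file_digest`, and the `process` callback (which would chunk, embed, and upsert) are hypothetical names.

```python
import hashlib
import json
from pathlib import Path
from typing import Callable, Dict, List

def file_digest(path: Path) -> str:
    """SHA-256 of the file's bytes: unchanged content gives an unchanged digest."""
    return hashlib.sha256(path.read_bytes()).hexdigest()

def run_batch(corpus_dir: Path, manifest_path: Path,
              process: Callable[[Path], None]) -> List[Path]:
    """Process only files that are new or changed since the last run; return them."""
    manifest: Dict[str, str] = {}
    if manifest_path.exists():
        manifest = json.loads(manifest_path.read_text(encoding="utf-8"))

    changed: List[Path] = []
    for path in sorted(corpus_dir.rglob("*")):
        if not path.is_file() or path == manifest_path:
            continue  # skip directories and the manifest itself
        key = str(path.relative_to(corpus_dir))
        digest = file_digest(path)
        if manifest.get(key) != digest:
            process(path)  # chunk + embed + upsert, supplied by the caller
            manifest[key] = digest
            changed.append(path)

    # Persist digests so the next run skips everything processed here.
    manifest_path.write_text(json.dumps(manifest, indent=2), encoding="utf-8")
    return changed
```

Scheduled with a crontab entry such as `0 * * * * python3 ingest.py` (hourly; `ingest.py` is a hypothetical script that calls `run_batch`), this is the "Cron + script" column of the table. Deleted files are not detected here; a fuller job would also remove their chunks from the index.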

The RAG Ingestion Pipeline

                    ┌─────────────┐
Raw Document ──▶    │  Splitter   │──▶ Chunks (256-1024 tokens)
                    └─────────────┘
                           │
                           ▼
                    ┌─────────────┐
                    │  Embedder   │──▶ Vector (384-3072 dims)
                    └─────────────┘
                           │
                           ▼
                    ┌─────────────┐
                    │  Indexer    │──▶ Vector DB (Qdrant, Chroma, Pinecone)
                    └─────────────┘
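To make the three boxes concrete, here is a dependency-free sketch of Splitter → Embedder → Indexer. It is illustrative only: `embed` is a toy hashed bag-of-words stand-in for a real model such as BAAI/bge-small-en-v1.5 (it borrows that model's 384-dim width but captures word overlap, not meaning), and `InMemoryIndex` does brute-force cosine search where Qdrant, Chroma, or Pinecone would use an approximate index. All names are hypothetical.

```python
import math
import re
import zlib
from typing import Dict, List, Tuple

DIM = 384  # same width as bge-small-en-v1.5, but the toy embedder below is NOT that model

def split(text: str, max_words: int = 50) -> List[str]:
    """Splitter: fixed windows of whitespace-separated words (a stand-in for tokens)."""
    words = text.split()
    return [" ".join(words[i:i + max_words]) for i in range(0, len(words), max_words)]

def embed(text: str, dim: int = DIM) -> List[float]:
    """Embedder stand-in: hashed bag-of-words, L2-normalised. Word overlap, not meaning."""
    vec = [0.0] * dim
    for token in re.findall(r"\w+", text.lower()):
        # crc32 is stable across runs, unlike Python's randomised hash()
        vec[zlib.crc32(token.encode("utf-8")) % dim] += 1.0
    norm = math.sqrt(sum(v * v for v in vec)) or 1.0
    return [v / norm for v in vec]

class InMemoryIndex:
    """Indexer stand-in: stores vectors and does brute-force cosine search (no HNSW/IVF)."""

    def __init__(self) -> None:
        self.vectors: Dict[str, List[float]] = {}
        self.texts: Dict[str, str] = {}

    def upsert(self, chunk_id: str, text: str, vector: List[float]) -> None:
        # Upsert semantics: an existing id is overwritten rather than duplicated.
        self.vectors[chunk_id] = vector
        self.texts[chunk_id] = text

    def search(self, query_vector: List[float], k: int = 3) -> List[Tuple[str, float]]:
        # Non-empty vectors are unit length, so the dot product equals cosine similarity.
        scores = [(cid, sum(a * b for a, b in zip(query_vector, vec)))
                  for cid, vec in self.vectors.items()]
        return sorted(scores, key=lambda item: item[1], reverse=True)[:k]
```

Swapping in a real embedding model and vector DB changes the implementations but not the shape: split, embed each chunk, upsert by ID, then query with the embedded question.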

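Key decisions:

  • Chunk size: 256-1024 tokens (too small loses context; too large mixes unrelated text into each chunk and adds noise to retrieval)
  • Chunk overlap: 10-20% of the chunk size, so text near a boundary appears in both neighbouring chunks and a concept split at the boundary is less likely to be cut in half
  • Embedding model: BAAI/bge-small-en-v1.5 (384 dims, fast, runs locally) vs OpenAI text-embedding-3-large (3072 dims, paid API, more expensive)
  • Index type: HNSW (fast queries, high recall, memory-hungry) vs IVF (partitions vectors into clusters; scales to billions)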
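Chunk size and overlap combine into a sliding window: each chunk starts `size - overlap` tokens after the previous one. A minimal sketch, assuming whitespace-separated words as a stand-in for model tokens (a real pipeline would count with the embedding model's own tokenizer); `chunk_tokens` is a hypothetical helper. With 512-token chunks and 64 tokens of overlap (12.5%, inside the 10-20% range above), the stride is 448 tokens, so a 1,000-token document yields chunks of 512, 512, and 104 tokens.

```python
from typing import List

def chunk_tokens(tokens: List[str], size: int, overlap: int) -> List[List[str]]:
    """Split tokens into windows of `size`, each sharing `overlap` tokens with the previous."""
    if size <= 0:
        raise ValueError("size must be positive")
    if not 0 <= overlap < size:
        raise ValueError("overlap must be in [0, size)")
    stride = size - overlap
    chunks: List[List[str]] = []
    for start in range(0, len(tokens), stride):
        chunks.append(tokens[start:start + size])
        if start + size >= len(tokens):
            break  # this window already reaches the end; avoid a redundant tail chunk
    return chunks
```

In use: `[" ".join(c) for c in chunk_tokens(text.split(), size=512, overlap=64)]`.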


Hands-on (15 min)

Build a File Watcher → Embedder Pipeline

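#!/usr/bin/env python3
"""data-pipeline-watcher.py: watch a directory, chunk new files, then (TODO) embed and index."""
# Prerequisites: pip install watchdog chromadb sentence-transformers
# (Ollama is an alternative for computing embeddings locally.)
import os
import time
import hashlib  # reserved for the content-hash dedup step (see TODO list below)
from pathlib import Path

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# Stub: Ayva will expand with:
# - PDF/text/markdown parsing
# - configurable chunking (by paragraph, by token count)
# - embedding via sentence-transformers or local llama.cpp
# - upsert to ChromaDB / Qdrant
# - dedup by content hash

WATCH_DIR = Path(os.getenv("WATCH_DIR", "./data/inbox"))
# NOTE: this stub measures CHUNK_SIZE in characters (len()), not tokens.
CHUNK_SIZE = int(os.getenv("CHUNK_SIZE", "512"))
# NOTE: CHUNK_OVERLAP is read but not yet applied by chunk_text().
CHUNK_OVERLAP = int(os.getenv("CHUNK_OVERLAP", "64"))
# Plain-text formats only; reading a PDF as text would produce garbage (PDF parsing is a TODO).
TEXT_SUFFIXES = {".txt", ".md", ".markdown"}

class DocumentHandler(FileSystemEventHandler):
    def on_created(self, event):
        # Fires as soon as the file appears; a large file may still be mid-write.
        if event.is_directory:
            return
        path = Path(event.src_path)
        if path.suffix.lower() not in TEXT_SUFFIXES:
            print(f"   Skipping unsupported file type: {path}")
            return
        print(f"📄 New file detected: {path}")
        try:
            self.process_file(path)
        except OSError as exc:
            # Don't let one unreadable or vanished file kill the observer thread.
            print(f"   Could not process {path}: {exc}")

    def process_file(self, path: Path):
        # 1. Read file (undecodable bytes are replaced rather than raising)
        text = path.read_text(encoding="utf-8", errors="replace")
        # 2. Chunk
        chunks = self.chunk_text(text)
        # 3. Embed + index: TODO
        print(f"   → {len(chunks)} chunks generated")
        for i, chunk in enumerate(chunks[:2]):
            print(f"      Chunk {i}: {chunk[:60]}...")
        if len(chunks) > 2:
            print(f"      ... and {len(chunks) - 2} more")

    def chunk_text(self, text: str) -> list[str]:
        """Greedily merge paragraphs while the merged chunk stays within CHUNK_SIZE characters.

        A single paragraph longer than CHUNK_SIZE is kept whole as its own chunk.
        """
        paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
        chunks = []
        current = ""
        for p in paragraphs:
            # Count the "\n\n" separator too when checking the size limit.
            candidate = f"{current}\n\n{p}" if current else p
            if len(candidate) <= CHUNK_SIZE:
                current = candidate
            else:
                if current:
                    chunks.append(current)
                current = p
        if current:
            chunks.append(current)
        return chunks

if __name__ == "__main__":
    WATCH_DIR.mkdir(parents=True, exist_ok=True)
    print(f"👀 Watching {WATCH_DIR} for new files...")
    observer = Observer()
    observer.schedule(DocumentHandler(), str(WATCH_DIR), recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()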
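The "dedup by content hash" item in the stub's TODO list can be approached by deriving each chunk's ID from its content, so re-processing an unchanged file overwrites the same records instead of adding duplicates. A minimal sketch; `chunk_id` and `dedupe_chunks` are hypothetical helpers, and hashing the source path together with the text is a design assumption.

```python
import hashlib
from typing import Dict, List

def chunk_id(source: str, chunk: str) -> str:
    """Deterministic ID: the same source + the same text always yields the same ID."""
    h = hashlib.sha256()
    h.update(source.encode("utf-8"))
    h.update(b"\x00")  # separator so ("ab", "c") and ("a", "bc") hash differently
    h.update(chunk.encode("utf-8"))
    return h.hexdigest()

def dedupe_chunks(source: str, chunks: List[str]) -> Dict[str, str]:
    """Map ID -> chunk text; repeated identical chunks collapse into one entry."""
    return {chunk_id(source, c): c for c in chunks}
```

The IDs can then be passed to an upsert call, for example ChromaDB's `collection.upsert(ids=..., documents=..., embeddings=...)`. Including the source path keeps identical passages from different files as separate records; hashing the text alone would dedupe across files instead. Chunks from an older version of an edited file keep their old IDs, so they still need to be deleted separately.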

Key questions for Ayva to research:

  • Best chunking strategy for markdown vs PDF vs code
  • Embedding model comparison (speed vs quality trade-offs)
  • Metadata extraction (title, date, source URL) for better retrieval
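For the markdown question, one common starting point is to split on headings so each chunk stays inside one section, and to keep the heading path as metadata for retrieval and citation. A minimal sketch, assuming ATX-style `#` headings only (setext `===` underlines are not handled) and treating lines between triple-backtick fences as code; `split_markdown` is a hypothetical name, and this is one option to evaluate, not a settled answer.

```python
import re
from typing import Dict, List

# ATX heading: 1-6 '#', whitespace, the title, and an optional closing run of '#'.
HEADING = re.compile(r"^(#{1,6})\s+(.*?)(?:\s+#+)?\s*$")

def split_markdown(text: str, source: str) -> List[Dict[str, str]]:
    """Return one chunk per markdown section, tagged with its heading path."""
    sections: List[Dict[str, str]] = []
    path: List[str] = []   # heading stack, e.g. ["Guide", "Install"]
    buf: List[str] = []    # lines of the section currently being built
    in_fence = False

    def flush() -> None:
        body = "\n".join(buf).strip()
        if body:
            sections.append({"source": source, "section": " > ".join(path), "text": body})
        buf.clear()

    for line in text.splitlines():
        if line.lstrip().startswith("```"):
            in_fence = not in_fence  # '#' lines inside code fences are not headings
        match = None if in_fence else HEADING.match(line)
        if match:
            flush()
            level = len(match.group(1))
            del path[level - 1:]  # pop headings at this level or deeper
            path.append(match.group(2))
        else:
            buf.append(line)
    flush()
    return sections
```

Each dict could map onto a vector store record: `text` is embedded, while `source` and `section` go into metadata (for ChromaDB, the `metadatas=` argument of `upsert`). Very long sections would still need a size-based split such as the paragraph merger in the stub above.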


Key Takeaways

  • Data pipelines are the foundation of any RAG system
  • Batch ingestion is simpler and sufficient for most use cases
  • Chunk size and overlap are usually the most impactful knobs to tune
  • The pipeline pattern (watch → process → load) is reusable across most data sources
