# Storage

ChromaDB vector store and PostgreSQL database modules.
The RAG system uses a dual-storage architecture: ChromaDB for vector embeddings and PostgreSQL for metadata.
## Storage Architecture
## VectorStore (`vector_store.py`)
ChromaDB wrapper supporting local and remote backends.
### Configuration
| Mode | Setting | Description |
|---|---|---|
| Local | `VECTOR_PERSIST_PATH=./data/chroma_db` | Persistent local storage |
| Remote | `VECTOR_HOST` + `VECTOR_PORT` | HTTP client to a ChromaDB server |
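As a sketch of how the two modes above might be resolved, the helper below prefers the remote settings when present and falls back to local persistence. The function name and default values are illustrative assumptions, not part of the module's API:

```python
import os

def resolve_vector_backend(env=os.environ):
    """Pick a ChromaDB backend from the settings in the table above.

    Hypothetical helper: VECTOR_HOST (plus VECTOR_PORT) selects the remote
    HTTP client; otherwise VECTOR_PERSIST_PATH selects local persistent
    storage. The default port and path here are assumptions.
    """
    if env.get("VECTOR_HOST"):
        return {
            "mode": "remote",
            "host": env["VECTOR_HOST"],
            "port": int(env.get("VECTOR_PORT", "8000")),
        }
    return {"mode": "local", "path": env.get("VECTOR_PERSIST_PATH", "./data/chroma_db")}
```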
### Operations

```python
from src.vector_store import VectorStore

store = VectorStore(persist_path="./data/chroma_db", collection_name="rag_documents")

# Add documents with embeddings
store.add(ids=["doc1"], embeddings=[[0.1, 0.2, ...]], documents=["text"], metadatas=[{...}])

# Query by vector similarity
results = store.query(query_embeddings=[[0.1, 0.2, ...]], n_results=10)

# Get documents by ID
docs = store.get(ids=["doc1"])

# Delete documents
store.delete(ids=["doc1"])

# Collection stats
count = store.count()
```

### Index Type
ChromaDB uses HNSW (Hierarchical Navigable Small World) with cosine similarity for fast approximate nearest-neighbor search.
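In a cosine-similarity space, the distance reported for a hit is `1 - cosine_similarity`, so vectors pointing the same direction score 0 and orthogonal vectors score 1. A minimal pure-Python illustration of that score (not the library's implementation, which runs inside the HNSW index):

```python
import math

def cosine_distance(a, b):
    """1 - cosine similarity, the score used by a cosine-space index."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

# Identical direction -> distance 0; orthogonal -> distance 1
assert abs(cosine_distance([1.0, 0.0], [2.0, 0.0])) < 1e-9
assert abs(cosine_distance([1.0, 0.0], [0.0, 1.0]) - 1.0) < 1e-9
```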
## DocumentManager (`document_manager.py`)
Unified CRUD interface that coordinates operations across both ChromaDB and PostgreSQL:
```python
from src import create_document_manager, Document

manager = create_document_manager(
    persist_path="./data/chroma_db",
    embedding_model="bge-m3:latest",
)

# Add a document
doc = Document(id="doc1", text="content", metadata={"source": "web"})
manager.add(doc)

# Upsert — only re-embeds if content changed
result = manager.upsert_many(documents)
print(f"Added: {result.added}, Updated: {result.updated}, Skipped: {result.skipped}")

# Delete operations
manager.delete("doc1")
manager.delete_by_source("https://example.com")
manager.delete_by_filter({"language": "en"})
```

### Upsert Logic
The upsert operation compares content hashes so that unchanged documents are not re-embedded.
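A minimal sketch of that decision, assuming MD5 content hashes as in the `documents` table below (the function and argument names are illustrative, not the manager's internals):

```python
import hashlib

def classify_upsert(incoming, existing_hashes):
    """Decide how an upsert should treat one document.

    incoming: {"id": ..., "text": ...}; existing_hashes maps document id
    to the stored content_hash. Returns "added", "updated", or "skipped".
    """
    new_hash = hashlib.md5(incoming["text"].encode("utf-8")).hexdigest()
    old_hash = existing_hashes.get(incoming["id"])
    if old_hash is None:
        return "added"      # never seen: embed and insert
    if old_hash == new_hash:
        return "skipped"    # unchanged content: no re-embedding needed
    return "updated"        # content changed: re-embed and replace
```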
## PostgreSQL Database (`database/`)
### Models (`models.py`)
Three SQLAlchemy ORM models:
**Source** — tracks document origins:
| Column | Type | Description |
|---|---|---|
| `id` | UUID | Primary key |
| `source_type` | string | `"url"` or `"file"` |
| `url` | text | Source URL (if scraped) |
| `url_hash` | string | MD5 hash of the URL |
| `filepath` | text | File path (if uploaded) |
| `filepath_hash` | string | MD5 hash of the filepath |
| `filename` | string | Original filename |
| `created_at` | timestamp | First ingestion time |
| `last_scraped_at` | timestamp | Last re-scrape time |
| `scrape_count` | int | Number of times ingested |
**Document** — tracks individual chunks:
| Column | Type | Description |
|---|---|---|
| `id` | string | Primary key (matches the ChromaDB ID) |
| `source_id` | UUID | Foreign key to `sources` |
| `title` | string | Document title |
| `section` | string | Category/section |
| `content` | text | Full text content |
| `content_hash` | string | MD5 hash of the content |
| `doc_metadata` | JSONB | Additional metadata |
| `scraped_at` | timestamp | Ingestion timestamp |
| `is_active` | boolean | Soft-delete flag |
**DuplicateLog** — records duplicate detection events:
| Column | Type | Description |
|---|---|---|
| `id` | int | Primary key |
| `duplicate_type` | string | `"url"`, `"content"`, or `"near_duplicate"` |
| `action_taken` | string | `"skipped"` or `"updated"` |
| `original_doc_id` | string | ID of the existing document |
| `attempted_url` | text | URL that was a duplicate |
| `attempted_filepath` | text | File that was a duplicate |
| `created_at` | timestamp | Detection timestamp |
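The decision these rows record can be sketched as follows. This is a simplification with a hypothetical helper name: only exact URL-hash and content-hash matches are shown, and the `near_duplicate` case, which would require a similarity comparison, is omitted:

```python
def detect_duplicate(url_hash, content_hash, known_url_hashes, known_content_hashes):
    """Return a DuplicateLog-shaped record, or None when the item is new.

    Mirrors the duplicate_type / action_taken values in the table above,
    using exact-match checks against previously seen hashes.
    """
    if url_hash in known_url_hashes:
        return {"duplicate_type": "url", "action_taken": "skipped"}
    if content_hash in known_content_hashes:
        return {"duplicate_type": "content", "action_taken": "skipped"}
    return None  # not a duplicate; ingest normally
```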
### Repository (`repository.py`)
CRUD operations with transaction support:
- `add_source()` / `get_source_by_url()` / `get_source_by_filepath()`
- `add_document()` / `get_document()` / `delete_document()`
- `log_duplicate()` / `get_duplicates()`
- `get_stats()` — aggregated statistics
### Session (`session.py`)
Connection management using SQLAlchemy:
- Connection pooling
- Session factory
- Automatic table creation
- Configurable via the `DB_URL` environment variable
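For illustration, a `DB_URL` in the usual SQLAlchemy form (`postgresql://user:pass@host:port/dbname`; the exact value is deployment-specific) breaks down like this. SQLAlchemy's `create_engine()` does this parsing itself, so the helper below is only a sketch:

```python
from urllib.parse import urlsplit

def parse_db_url(url):
    """Split a SQLAlchemy-style database URL into its components."""
    parts = urlsplit(url)
    return {
        "driver": parts.scheme,
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }
```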