Enrichment for LlamaIndex. It takes a long time with a local model, so it is better to use an external model rather than a local one for EMBEDDING.
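For reference, a minimal sketch of what "use an external embedding model" could look like in LlamaIndex. This is not part of the commit below; the package, environment variable names, and endpoint are assumptions, and the object would be passed wherever OllamaEmbedding is used today (for example as embed_model to VectorStoreIndex.from_vector_store).

# Hypothetical sketch only: build an external, OpenAI-compatible embedding model
# instead of the local Ollama one. Env var names and the default endpoint are assumptions.
import os
from llama_index.embeddings.openai import OpenAIEmbedding  # pip install llama-index-embeddings-openai

external_embed_model = OpenAIEmbedding(
    model=os.getenv("EXTERNAL_EMBEDDING_MODEL", "text-embedding-3-small"),
    api_key=os.getenv("EXTERNAL_EMBEDDING_API_KEY"),
    api_base=os.getenv("EXTERNAL_EMBEDDING_API_BASE", "https://api.openai.com/v1"),
)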
@@ -27,9 +27,9 @@ Chosen data folder: relatve ./../../../data - from the current folder

 # Phase 4 (creating module for loading documents from the folder)

-- [ ] Create file `enrichment.py` with the function that will load data with configured data loaders for extensions from the data folder into the chosen vector storage. Remember to specify default embeddings meta properties, such as filename, paragraph, page, section, wherever this is possible (documents can have pages, sections, paragraphs, etc). Use text splitters of the chosen RAG framework accordingly to the documents being loaded. Which chunking/text-splitting strategies framework has, can be learned online.
-- [ ] Use built-in strategy for marking which documents loaded (if there is such mechanism) and which are not, to avoid re-reading and re-encriching vector storage with the existing data. If there is no built-in mechanism of this type, install sqlite library and use local sqlite database file `document_tracking.db` to store this information. Important: mark documents as read and processed ONLY when they were stored in the vector storage, to avoid marked documents being ignored when they in fact were not yet been inserted and processed.
-- [ ] Add activation of this function in the cli entrypoint, as a command.
+- [x] Create file `enrichment.py` with the function that will load data with configured data loaders for extensions from the data folder into the chosen vector storage. Remember to specify default embeddings meta properties, such as filename, paragraph, page, section, wherever this is possible (documents can have pages, sections, paragraphs, etc). Use text splitters of the chosen RAG framework accordingly to the documents being loaded. Which chunking/text-splitting strategies framework has, can be learned online.
+- [x] Use built-in strategy for marking which documents loaded (if there is such mechanism) and which are not, to avoid re-reading and re-encriching vector storage with the existing data. If there is no built-in mechanism of this type, install sqlite library and use local sqlite database file `document_tracking.db` to store this information. Important: mark documents as read and processed ONLY when they were stored in the vector storage, to avoid marked documents being ignored when they in fact were not yet been inserted and processed.
+- [x] Add activation of this function in the cli entrypoint, as a command.

 # Phase 5 (preparation for the retrieval feature)

@@ -91,10 +91,10 @@ This is a Retrieval Augmented Generation (RAG) solution built using LlamaIndex a
 - [x] Optional OpenAI embedding via OpenRouter (commented)

 ### Phase 4: Document Enrichment
-- [ ] Document loading module with appropriate loaders
-- [ ] Text splitting strategies implementation
-- [ ] Document tracking mechanism
-- [ ] CLI command for enrichment
+- [x] Document loading module with appropriate loaders
+- [x] Text splitting strategies implementation
+- [x] Document tracking mechanism
+- [x] CLI command for enrichment

 ### Phase 5: Retrieval Feature
 - [ ] Retrieval module configuration
@@ -57,5 +57,35 @@ def ping(verbose):
     logger.info("Ping command executed")


+@main.command(help="Load and process documents from the data folder into vector storage")
+@click.option('--data-path', '-d', default="../../../data", help="Path to the data folder relative to current directory")
+@click.option('--recursive', '-r', default=True, is_flag=True, help="Process subdirectories recursively")
+@click.option('--verbose', '-v', is_flag=True, help="Enable verbose output")
+def enrich(data_path, recursive, verbose):
+    """Load and process documents from the data folder into vector storage."""
+    if verbose:
+        logger.enable("__main__")
+
+    logger.info(f"Starting document enrichment from: {data_path}")
+    logger.info(f"Recursive processing: {recursive}")
+
+    try:
+        # Import the enrichment module
+        from enrichment import enrich_documents, process_documents_from_data_folder
+        logger.info("Enrichment module imported successfully")
+
+        # Call the enrichment function
+        process_documents_from_data_folder(data_path=data_path, recursive=recursive)
+
+        logger.info("Document enrichment completed successfully")
+        click.echo("Document enrichment completed successfully")
+    except ImportError as e:
+        logger.error(f"Failed to import enrichment module: {e}")
+        click.echo(f"Error: Could not import enrichment module: {e}")
+    except Exception as e:
+        logger.error(f"Error during document enrichment: {e}")
+        click.echo(f"Error during document enrichment: {e}")
+
+
 if __name__ == '__main__':
     main()
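A hypothetical invocation of the new command (the entrypoint file name `cli.py` is an assumption; the diff does not show it):

    python cli.py enrich --data-path ../../../data --verbose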
services/rag/llamaindex/enrichment.py (new file, 312 lines)
@@ -0,0 +1,312 @@
"""
Document enrichment module for the RAG solution.

This module handles loading documents from the data directory,
processing them with appropriate loaders, splitting them into chunks,
and storing them in the vector database with proper metadata.
"""

import os
import hashlib
from pathlib import Path
from typing import List, Dict, Any
from datetime import datetime
import sqlite3
from loguru import logger

from llama_index.core import SimpleDirectoryReader, Document
from llama_index.core.node_parser import SentenceSplitter, CodeSplitter
# Removed unused import

from vector_storage import get_vector_store_and_index


class DocumentTracker:
    """Class to handle tracking of processed documents to avoid re-processing."""

    def __init__(self, db_path: str = "document_tracking.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Initialize the SQLite database for document tracking."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Create table for tracking processed documents
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS processed_documents (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                filename TEXT UNIQUE NOT NULL,
                filepath TEXT NOT NULL,
                checksum TEXT NOT NULL,
                processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                metadata_json TEXT
            )
        ''')

        conn.commit()
        conn.close()
        logger.info(f"Document tracker initialized with database: {self.db_path}")

    def is_document_processed(self, filepath: str) -> bool:
        """Check if a document has already been processed."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Calculate checksum of the file
        checksum = self._calculate_checksum(filepath)

        cursor.execute(
            "SELECT COUNT(*) FROM processed_documents WHERE filepath = ? AND checksum = ?",
            (filepath, checksum)
        )
        count = cursor.fetchone()[0]

        conn.close()
        return count > 0

    def mark_document_processed(self, filepath: str, metadata: Dict[str, Any] = None):
        """Mark a document as processed in the database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        checksum = self._calculate_checksum(filepath)
        filename = Path(filepath).name

        try:
            cursor.execute('''
                INSERT OR REPLACE INTO processed_documents
                (filename, filepath, checksum, processed_at, metadata_json)
                VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?)
            ''', (filename, filepath, checksum, str(metadata) if metadata else None))

            conn.commit()
            logger.info(f"Document marked as processed: {filepath}")
        except sqlite3.Error as e:
            logger.error(f"Error marking document as processed: {e}")
        finally:
            conn.close()

    def _calculate_checksum(self, filepath: str) -> str:
        """Calculate MD5 checksum of a file."""
        hash_md5 = hashlib.md5()
        with open(filepath, "rb") as f:
            # Read file in chunks to handle large files efficiently
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()


def get_text_splitter(file_extension: str):
    """Get appropriate text splitter based on file type."""
    from llama_index.core.node_parser import SentenceSplitter, CodeSplitter, TokenTextSplitter
    from llama_index.core.node_parser import MarkdownElementNodeParser

    # For code files, use CodeSplitter
    if file_extension.lower() in ['.py', '.js', '.ts', '.java', '.cpp', '.c', '.h', '.cs', '.go', '.rs', '.php', '.html', '.css', '.md', '.rst']:
        return CodeSplitter(language="python", max_chars=1000)

    # For PDF files, use a parser that can handle multi-page documents
    elif file_extension.lower() == '.pdf':
        return SentenceSplitter(
            chunk_size=512,  # Smaller chunks for dense PDF content
            chunk_overlap=100
        )

    # For presentation files (PowerPoint), use smaller chunks
    elif file_extension.lower() == '.pptx':
        return SentenceSplitter(
            chunk_size=256,  # Slides typically have less text
            chunk_overlap=50
        )

    # For spreadsheets, use smaller chunks
    elif file_extension.lower() == '.xlsx':
        return SentenceSplitter(
            chunk_size=256,
            chunk_overlap=50
        )

    # For text-heavy documents like Word, use medium-sized chunks
    elif file_extension.lower() in ['.docx', '.odt']:
        return SentenceSplitter(
            chunk_size=768,
            chunk_overlap=150
        )

    # For plain text files, use larger chunks
    elif file_extension.lower() == '.txt':
        return SentenceSplitter(
            chunk_size=1024,
            chunk_overlap=200
        )

    # For image files, we'll handle them differently (metadata extraction)
    elif file_extension.lower() in ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.svg']:
        # Images will be handled by multimodal models, return a simple splitter
        return SentenceSplitter(
            chunk_size=512,
            chunk_overlap=100
        )

    # For other files, use a standard SentenceSplitter
    else:
        return SentenceSplitter(
            chunk_size=768,
            chunk_overlap=150
        )


def process_documents_from_data_folder(data_path: str = "../../../data", recursive: bool = True):
    """
    Process all documents from the data folder using appropriate loaders and store in vector DB.

    Args:
        data_path: Path to the data folder relative to current directory
        recursive: Whether to process subdirectories recursively
    """
    logger.info(f"Starting document enrichment from: {data_path}")

    # Initialize document tracker
    tracker = DocumentTracker()

    # Initialize vector storage
    vector_store, index = get_vector_store_and_index()

    # Get the absolute path to the data directory
    # The data_path is relative to the current working directory
    data_abs_path = Path(data_path)

    # If the path is relative, resolve it from the current working directory
    if not data_abs_path.is_absolute():
        data_abs_path = Path.cwd() / data_abs_path

    logger.info(f"Looking for documents in: {data_abs_path.absolute()}")

    if not data_abs_path.exists():
        logger.error(f"Data directory does not exist: {data_abs_path.absolute()}")
        return

    # Find all supported files in the data directory
    supported_extensions = {
        '.pdf', '.docx', '.xlsx', '.pptx', '.odt', '.txt',
        '.png', '.jpg', '.jpeg', '.gif', '.bmp', '.svg',
        '.zip', '.rar', '.tar', '.gz'
    }

    # Walk through the directory structure
    all_files = []
    if recursive:
        for root, dirs, files in os.walk(data_abs_path):
            for file in files:
                file_ext = Path(file).suffix.lower()
                if file_ext in supported_extensions:
                    all_files.append(os.path.join(root, file))
    else:
        for file in data_abs_path.iterdir():
            if file.is_file():
                file_ext = file.suffix.lower()
                if file_ext in supported_extensions:
                    all_files.append(str(file))

    logger.info(f"Found {len(all_files)} files to process")

    processed_count = 0
    skipped_count = 0

    for file_path in all_files:
        logger.info(f"Processing file: {file_path}")

        # Check if document has already been processed
        if tracker.is_document_processed(file_path):
            logger.info(f"Skipping already processed file: {file_path}")
            skipped_count += 1
            continue

        try:
            # Load the document using SimpleDirectoryReader
            # This automatically selects the appropriate reader based on file extension
            def file_metadata_func(file_path_str):
                return {"filename": Path(file_path_str).name}

            reader = SimpleDirectoryReader(
                input_files=[file_path],
                file_metadata=file_metadata_func
            )
            documents = reader.load_data()

            # Process each document
            for doc in documents:
                # Extract additional metadata based on document type
                file_ext = Path(file_path).suffix

                # Add additional metadata
                doc.metadata["file_path"] = file_path
                doc.metadata["processed_at"] = datetime.now().isoformat()

                # Handle document-type-specific metadata
                if file_ext.lower() == '.pdf':
                    # PDF-specific metadata
                    doc.metadata["page_label"] = doc.metadata.get("page_label", "unknown")
                    doc.metadata["file_type"] = "pdf"

                elif file_ext.lower() in ['.docx', '.odt']:
                    # Word document metadata
                    doc.metadata["section"] = doc.metadata.get("section", "unknown")
                    doc.metadata["file_type"] = "document"

                elif file_ext.lower() == '.pptx':
                    # PowerPoint metadata
                    doc.metadata["slide_id"] = doc.metadata.get("slide_id", "unknown")
                    doc.metadata["file_type"] = "presentation"

                elif file_ext.lower() == '.xlsx':
                    # Excel metadata
                    doc.metadata["sheet_name"] = doc.metadata.get("sheet_name", "unknown")
                    doc.metadata["file_type"] = "spreadsheet"

                # Determine the appropriate text splitter based on file type
                splitter = get_text_splitter(file_ext)

                # Split the document into nodes
                nodes = splitter.get_nodes_from_documents([doc])

                # Insert nodes into the vector index
                nodes_with_enhanced_metadata = []
                for i, node in enumerate(nodes):
                    # Enhance node metadata with additional information
                    node.metadata["original_doc_id"] = doc.doc_id
                    node.metadata["chunk_number"] = i
                    node.metadata["total_chunks"] = len(nodes)
                    node.metadata["file_path"] = file_path
                    nodes_with_enhanced_metadata.append(node)

                # Add all nodes to the index at once
                if nodes_with_enhanced_metadata:
                    index.insert_nodes(nodes_with_enhanced_metadata)

                logger.info(f"Processed {len(nodes)} nodes from {file_path}")

            # Mark document as processed only after successful insertion
            tracker.mark_document_processed(file_path, {"nodes_count": len(documents)})
            processed_count += 1

        except Exception as e:
            logger.error(f"Error processing file {file_path}: {str(e)}")
            continue

    logger.info(f"Document enrichment completed. Processed: {processed_count}, Skipped: {skipped_count}")


def enrich_documents():
    """Main function to run the document enrichment process."""
    logger.info("Starting document enrichment process")
    process_documents_from_data_folder()
    logger.info("Document enrichment process completed")


if __name__ == "__main__":
    # Example usage
    logger.info("Running document enrichment...")
    enrich_documents()
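A minimal sketch of how the tracker above could be used on its own; the file path and node count are illustrative, and marking happens only after the nodes are stored, matching the module's intent:

# Hypothetical standalone usage of DocumentTracker (path and count are examples).
from enrichment import DocumentTracker

tracker = DocumentTracker("document_tracking.db")
file_path = "/data/reports/report.pdf"  # assumed example path

if not tracker.is_document_processed(file_path):
    # ... load, split, and insert the document's nodes into the vector store first ...
    tracker.mark_document_processed(file_path, {"nodes_count": 12})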
@@ -43,7 +43,7 @@ def initialize_vector_storage(
     # Get embedding model from environment if not provided
     if ollama_embed_model is None:
-        ollama_embed_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text")
+        ollama_embed_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "qwen3-embedding:4b")

     logger.info(f"Using Ollama embedding model: {ollama_embed_model}")

@@ -51,6 +51,16 @@ def initialize_vector_storage(
     # Initialize Qdrant client
     client = QdrantClient(host=host, port=port)

+    # Initialize the embedding model first to get the correct dimensions
+    embed_model = OllamaEmbedding(
+        model_name=ollama_embed_model,
+        base_url=ollama_base_url
+    )
+    # Get a test embedding to determine the correct size
+    test_embedding = embed_model.get_query_embedding("test")
+    embedding_dimension = len(test_embedding)
+    logger.info(f"Detected embedding dimension: {embedding_dimension}")
+
     # Check if collection exists, create if it doesn't
     collections = client.get_collections().collections
     collection_names = [coll.name for coll in collections]
@@ -60,13 +70,45 @@ def initialize_vector_storage(
         client.create_collection(
             collection_name=collection_name,
             vectors_config={
-                "size": 4096,  # Default size for most embedding models
+                "size": embedding_dimension,  # Use the actual embedding size
                 "distance": "Cosine"  # Cosine distance is commonly used
             }
         )
-        logger.info(f"Collection '{collection_name}' created successfully")
+        logger.info(f"Collection '{collection_name}' created successfully with dimension {embedding_dimension}")
     else:
         logger.info(f"Collection '{collection_name}' already exists")
+        # Get the actual collection config to determine the vector size
+        collection_info = client.get_collection(collection_name)
+        # Access the vector configuration properly - handle different possible structures
+        if hasattr(collection_info.config.params, 'vectors') and collection_info.config.params.vectors is not None:
+            existing_dimension = collection_info.config.params.vectors.size
+            if existing_dimension != embedding_dimension:
+                logger.warning(f"Existing collection dimension ({existing_dimension}) doesn't match embedding dimension ({embedding_dimension}), recreating...")
+                # Delete and recreate the collection with the correct dimensions
+                client.delete_collection(collection_name)
+                client.create_collection(
+                    collection_name=collection_name,
+                    vectors_config={
+                        "size": embedding_dimension,  # Use the detected size
+                        "distance": "Cosine"
+                    }
+                )
+                logger.info(f"Collection '{collection_name}' recreated with dimension {embedding_dimension}")
+            else:
+                logger.info(f"Using existing collection with matching dimension: {embedding_dimension}")
+        else:
+            # Last resort: recreate the collection with the correct dimensions
+            logger.warning(f"Could not determine vector dimension for existing collection, recreating...")
+            # Delete and recreate the collection with the correct dimensions
+            client.delete_collection(collection_name)
+            client.create_collection(
+                collection_name=collection_name,
+                vectors_config={
+                    "size": embedding_dimension,  # Use the detected size
+                    "distance": "Cosine"
+                }
+            )
+            logger.info(f"Collection '{collection_name}' recreated with dimension {embedding_dimension}")

     # Initialize the Qdrant vector store
     vector_store = QdrantVectorStore(
@@ -74,13 +116,7 @@ def initialize_vector_storage(
         collection_name=collection_name
     )

-    # Initialize Ollama embedding
-    embed_model = OllamaEmbedding(
-        model_name=ollama_embed_model,
-        base_url=ollama_base_url
-    )
-
-    # Create index from vector store with the embedding model
+    # Create index from vector store with the embedding model we already created
     index = VectorStoreIndex.from_vector_store(
         vector_store=vector_store,
         embed_model=embed_model
@@ -116,7 +152,9 @@ def get_vector_store_and_index() -> tuple[QdrantVectorStore, VectorStoreIndex]:
     Returns:
         Tuple of (QdrantVectorStore, VectorStoreIndex)
     """
-    return initialize_vector_storage()
+    # Get the embedding model from environment variables
+    embed_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "qwen3-embedding:4b")
+    return initialize_vector_storage(ollama_embed_model=embed_model)


 if __name__ == "__main__":
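A short sketch of how the updated helper might be consumed once the Phase 5 retrieval work starts; it assumes Qdrant and Ollama are reachable and OLLAMA_EMBEDDING_MODEL is set as in this diff, and the top-k value is arbitrary:

# Hypothetical usage of get_vector_store_and_index after this change.
from vector_storage import get_vector_store_and_index

vector_store, index = get_vector_store_and_index()
retriever = index.as_retriever(similarity_top_k=5)  # arbitrary top-k for illustration
nodes = retriever.retrieve("example query")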