# File: rag-solution/services/rag/llamaindex/vector_storage.py
# (Python, 168 lines, 6.9 KiB)

"""
Vector storage configuration for the RAG solution using LlamaIndex and Qdrant.
This module provides initialization and configuration for:
- Qdrant vector storage connection
- Ollama embedding model
- Automatic collection creation
"""
import os
from typing import Optional
from llama_index.core import VectorStoreIndex
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.llms.ollama import Ollama
from qdrant_client import QdrantClient
from loguru import logger
def _create_cosine_collection(
    client: QdrantClient,
    collection_name: str,
    dimension: int,
) -> None:
    """Create ``collection_name`` with a single cosine-distance vector config of ``dimension``."""
    client.create_collection(
        collection_name=collection_name,
        vectors_config={
            "size": dimension,  # Use the actual embedding size
            "distance": "Cosine",  # Cosine distance is commonly used
        },
    )


def _recreate_cosine_collection(
    client: QdrantClient,
    collection_name: str,
    dimension: int,
) -> None:
    """Delete ``collection_name`` and create it again with ``dimension``."""
    client.delete_collection(collection_name)
    _create_cosine_collection(client, collection_name, dimension)
    logger.info(f"Collection '{collection_name}' recreated with dimension {dimension}")


def initialize_vector_storage(
    collection_name: str = "documents_llamaindex",
    host: str = "localhost",
    port: int = 6333,
    grpc_port: int = 6334,
    ollama_base_url: str = "http://localhost:11434",
    ollama_embed_model: Optional[str] = None,
) -> tuple[QdrantVectorStore, VectorStoreIndex]:
    """
    Initialize Qdrant vector storage with Ollama embeddings.

    The embedding dimension is detected by embedding the string "test" with
    the configured Ollama model. The collection is then created if it is
    missing, or checked against that dimension if it already exists.

    WARNING: this function is destructive. An existing collection is deleted
    and recreated (dropping everything stored in it) when its vector size
    differs from the detected embedding dimension, or when a single vector
    size cannot be read from its configuration.

    Args:
        collection_name: Name of the Qdrant collection
        host: Qdrant host address
        port: Qdrant REST API port
        grpc_port: Qdrant gRPC API port (forwarded to the Qdrant client)
        ollama_base_url: Base URL for Ollama API
        ollama_embed_model: Name of the Ollama embedding model. When None, the
            OLLAMA_EMBEDDING_MODEL environment variable is used, falling back
            to "qwen3-embedding:4b".

    Returns:
        Tuple of (QdrantVectorStore, VectorStoreIndex)

    Raises:
        Exception: Any error raised while talking to Qdrant or Ollama is
            logged and re-raised unchanged.
    """
    logger.info(f"Initializing vector storage with collection: {collection_name}")

    # Get embedding model from environment if not provided
    if ollama_embed_model is None:
        ollama_embed_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "qwen3-embedding:4b")
    logger.info(f"Using Ollama embedding model: {ollama_embed_model}")

    try:
        # Forward grpc_port as well: it used to be accepted but silently ignored.
        client = QdrantClient(host=host, port=port, grpc_port=grpc_port)

        # Initialize the embedding model first to get the correct dimensions
        embed_model = OllamaEmbedding(
            model_name=ollama_embed_model,
            base_url=ollama_base_url,
        )

        # Get a test embedding to determine the correct size
        embedding_dimension = len(embed_model.get_text_embedding("test"))
        logger.info(f"Detected embedding dimension: {embedding_dimension}")

        # Check if collection exists, create if it doesn't
        existing_names = {coll.name for coll in client.get_collections().collections}
        if collection_name not in existing_names:
            logger.info(f"Collection '{collection_name}' does not exist, creating...")
            _create_cosine_collection(client, collection_name, embedding_dimension)
            logger.info(f"Collection '{collection_name}' created successfully with dimension {embedding_dimension}")
        else:
            logger.info(f"Collection '{collection_name}' already exists")
            collection_info = client.get_collection(collection_name)

            # `vectors` may be missing/None, or an object without a `.size`
            # attribute (e.g. a mapping of named vectors). Reading `.size`
            # directly raised AttributeError in the latter case; use getattr
            # so every "cannot tell" shape reaches the same fallback.
            vectors = getattr(collection_info.config.params, "vectors", None)
            existing_dimension = getattr(vectors, "size", None)

            if existing_dimension is None:
                # Last resort: recreate the collection with the correct dimensions
                logger.warning("Could not determine vector dimension for existing collection, recreating...")
                _recreate_cosine_collection(client, collection_name, embedding_dimension)
            elif existing_dimension != embedding_dimension:
                logger.warning(f"Existing collection dimension ({existing_dimension}) doesn't match embedding dimension ({embedding_dimension}), recreating...")
                _recreate_cosine_collection(client, collection_name, embedding_dimension)
            else:
                logger.info(f"Using existing collection with matching dimension: {embedding_dimension}")

        # Initialize the Qdrant vector store
        vector_store = QdrantVectorStore(
            client=client,
            collection_name=collection_name,
        )

        # Create index from vector store with the embedding model we already created
        index = VectorStoreIndex.from_vector_store(
            vector_store=vector_store,
            embed_model=embed_model,
        )

        logger.info("Vector storage initialized successfully")
        return vector_store, index
    except Exception as e:
        # Boundary handler: log for operators, then let the caller decide.
        logger.error(f"Failed to initialize vector storage: {str(e)}")
        raise
# Optional: Alternative embedding configuration using OpenAI via OpenRouter
# Uncomment and configure as needed for future use
# from llama_index.embeddings.openai import OpenAIEmbedding
#
# def initialize_openai_embeddings():
# # Use OpenRouter API key from environment
# os.environ["OPENAI_API_KEY"] = os.getenv("OPENROUTER_API_KEY", "")
#
# embed_model = OpenAIEmbedding(
# model="openai/text-embedding-3-small", # Or another suitable model
# api_base="https://openrouter.ai/api/v1" # OpenRouter endpoint
# )
# return embed_model
def get_vector_store_and_index() -> tuple[QdrantVectorStore, VectorStoreIndex]:
    """
    Build the vector store and index using default connection settings.

    The embedding model name is read from the OLLAMA_EMBEDDING_MODEL
    environment variable, defaulting to "qwen3-embedding:4b", and handed to
    initialize_vector_storage; every other argument keeps its default.

    Returns:
        Tuple of (QdrantVectorStore, VectorStoreIndex)
    """
    return initialize_vector_storage(
        ollama_embed_model=os.getenv("OLLAMA_EMBEDDING_MODEL", "qwen3-embedding:4b"),
    )
if __name__ == "__main__":
    # Manual smoke test: try to bring the vector storage up and report the outcome.
    logger.info("Testing vector storage initialization...")
    try:
        vector_store, index = get_vector_store_and_index()
    except Exception as exc:
        # Failure is only logged; the script does not re-raise.
        logger.error(f"Vector storage test failed: {exc}")
    else:
        logger.info("Vector storage test successful!")