Files
rag-solution/services/rag/llamaindex/vector_storage.py

185 lines
7.0 KiB
Python
Raw Normal View History

"""
Vector storage configuration for the RAG solution using LlamaIndex and Qdrant.
This module provides initialization and configuration for:
- Qdrant vector storage connection
- Embedding model based on configured strategy
- Automatic collection creation
"""
import os
from typing import Optional
from dotenv import load_dotenv
from llama_index.core import VectorStoreIndex
from llama_index.vector_stores.qdrant import QdrantVectorStore
from loguru import logger
from qdrant_client import QdrantClient
# Import the new configuration module
from config import get_embedding_model
# Load variables from a local .env file (if one is found) into the process
# environment at import time, so the os.getenv() lookups in this module can
# see them.
load_dotenv()
def get_qdrant_connection_config() -> dict[str, int | str]:
"""Load Qdrant connection settings from environment variables."""
host = os.getenv("QDRANT_HOST", "localhost")
rest_port = int(os.getenv("QDRANT_REST_PORT", "6333"))
grpc_port = int(os.getenv("QDRANT_GRPC_PORT", "6334"))
return {
"host": host,
"port": rest_port,
"grpc_port": grpc_port,
}
def _create_cosine_collection(
    client: QdrantClient, collection_name: str, dimension: int
) -> None:
    """Create ``collection_name`` with a single cosine-distance vector of size ``dimension``."""
    client.create_collection(
        collection_name=collection_name,
        vectors_config={
            "size": dimension,  # Must equal the embedding model's output size
            "distance": "Cosine",
        },
    )


def _recreate_collection(
    client: QdrantClient, collection_name: str, dimension: int
) -> None:
    """Delete ``collection_name`` and create it again with size ``dimension``.

    The collection is deleted first, so whatever it held is not carried over.
    """
    client.delete_collection(collection_name)
    _create_cosine_collection(client, collection_name, dimension)
    logger.info(
        f"Collection '{collection_name}' recreated with dimension {dimension}"
    )


def _ensure_collection(
    client: QdrantClient, collection_name: str, embedding_dimension: int
) -> None:
    """Make sure ``collection_name`` exists with vectors of ``embedding_dimension``.

    Creates the collection when it is missing. When it exists with a different
    vector size, or with no vector configuration at all, it is deleted and
    recreated.

    Raises:
        ValueError: If the existing collection has a vector configuration that
            does not expose a single ``size`` (for example a mapping of named
            vectors). Nothing is deleted in that case.
    """
    existing_names = [coll.name for coll in client.get_collections().collections]
    if collection_name not in existing_names:
        logger.info(f"Collection '{collection_name}' does not exist, creating...")
        _create_cosine_collection(client, collection_name, embedding_dimension)
        logger.info(
            f"Collection '{collection_name}' created successfully with dimension {embedding_dimension}"
        )
        return

    logger.info(f"Collection '{collection_name}' already exists")
    collection_info = client.get_collection(collection_name)
    vectors = getattr(collection_info.config.params, "vectors", None)
    if vectors is None:
        # Last resort: no vector configuration to compare against.
        logger.warning(
            "Could not determine vector dimension for existing collection, recreating..."
        )
        _recreate_collection(client, collection_name, embedding_dimension)
        return

    existing_dimension = getattr(vectors, "size", None)
    if existing_dimension is None:
        # A named-vector configuration is a mapping and has no ``size``
        # attribute. Fail with a clear message rather than an AttributeError,
        # and do not delete a collection whose layout we do not understand.
        raise ValueError(
            f"Collection '{collection_name}' does not expose a single vector size "
            "(it may use named vectors); refusing to recreate it automatically"
        )

    if existing_dimension != embedding_dimension:
        logger.warning(
            f"Existing collection dimension ({existing_dimension}) doesn't match embedding dimension ({embedding_dimension}), recreating..."
        )
        _recreate_collection(client, collection_name, embedding_dimension)
    else:
        logger.info(
            f"Using existing collection with matching dimension: {embedding_dimension}"
        )


def initialize_vector_storage(
    collection_name: str = "documents_llamaindex",
    host: Optional[str] = None,
    port: Optional[int] = None,
    grpc_port: Optional[int] = None,
) -> tuple[QdrantVectorStore, VectorStoreIndex]:
    """
    Initialize Qdrant vector storage with embedding model based on configured strategy.

    The embedding dimension is detected by embedding the literal text "test"
    with the configured model. The collection is created when missing.

    WARNING: an existing collection whose vector size differs from the detected
    dimension (or which has no vector configuration) is deleted and recreated.

    Args:
        collection_name: Name of the Qdrant collection
        host: Qdrant host address; falls back to the environment configuration
        port: Qdrant REST API port; falls back to the environment configuration
        grpc_port: Qdrant gRPC API port; falls back to the environment configuration

    Returns:
        Tuple of (QdrantVectorStore, VectorStoreIndex)

    Raises:
        ValueError: If an environment port value is not an integer, or if the
            existing collection does not expose a single vector size.
        Exception: Any error raised while connecting, embedding, or managing
            the collection is logged and re-raised unchanged.
    """
    qdrant_config = get_qdrant_connection_config()
    # Falsy overrides (None, "", 0) fall back to the environment configuration.
    host = host or str(qdrant_config["host"])
    port = port or int(qdrant_config["port"])
    grpc_port = grpc_port or int(qdrant_config["grpc_port"])
    logger.info(
        f"Initializing vector storage with collection: {collection_name} "
        f"(host={host}, rest_port={port}, grpc_port={grpc_port})"
    )
    try:
        client = QdrantClient(host=host, port=port, grpc_port=grpc_port)

        # Embed a probe string so the collection is sized for whichever
        # embedding model the configuration selected.
        embed_model = get_embedding_model()
        embedding_dimension = len(embed_model.get_text_embedding("test"))
        logger.info(f"Detected embedding dimension: {embedding_dimension}")

        _ensure_collection(client, collection_name, embedding_dimension)

        vector_store = QdrantVectorStore(client=client, collection_name=collection_name)
        # Reuse the embedding model created above for the index.
        index = VectorStoreIndex.from_vector_store(
            vector_store=vector_store, embed_model=embed_model
        )
        logger.info("Vector storage initialized successfully")
        return vector_store, index
    except Exception as e:
        logger.error(f"Failed to initialize vector storage: {str(e)}")
        raise
def get_vector_store_and_index(
    collection_name: str = "documents_llamaindex",
    host: Optional[str] = None,
    port: Optional[int] = None,
    grpc_port: Optional[int] = None,
) -> tuple[QdrantVectorStore, VectorStoreIndex]:
    """
    Shortcut that forwards every argument to ``initialize_vector_storage``.

    Args:
        collection_name: Name of the Qdrant collection
        host: Qdrant host address
        port: Qdrant REST API port
        grpc_port: Qdrant gRPC API port

    Returns:
        Tuple of (QdrantVectorStore, VectorStoreIndex)
    """
    connection_overrides = {"host": host, "port": port, "grpc_port": grpc_port}
    return initialize_vector_storage(
        collection_name=collection_name, **connection_overrides
    )
if __name__ == "__main__":
    # Manual smoke test: initialize the storage with default settings and log
    # whether it worked. A failure is logged rather than propagated.
    logger.info("Testing vector storage initialization...")
    try:
        _store, _index = get_vector_store_and_index()
    except Exception as e:
        logger.error(f"Vector storage test failed: {e}")
    else:
        logger.info("Vector storage test successful!")