# rag-solution/services/rag/langchain/vector_storage.py
"""Vector storage module using Qdrant and configurable embeddings for the RAG solution."""
import os
from dotenv import load_dotenv
from langchain_qdrant import QdrantVectorStore
from langchain_core.documents import Document
from langchain_ollama import OllamaEmbeddings
from langchain_openai import OpenAIEmbeddings
from qdrant_client import QdrantClient

# Load environment variables
load_dotenv()

# Qdrant configuration
QDRANT_HOST = os.getenv("QDRANT_HOST", "localhost")
QDRANT_REST_PORT = int(os.getenv("QDRANT_REST_PORT", 6333))
QDRANT_GRPC_PORT = int(os.getenv("QDRANT_GRPC_PORT", 6334))

# Embedding model configuration
EMBEDDING_STRATEGY = os.getenv("EMBEDDING_STRATEGY", "ollama").lower()
OLLAMA_EMBEDDING_MODEL = os.getenv("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text")
OPENAI_EMBEDDING_MODEL = os.getenv("OPENAI_EMBEDDING_MODEL", "text-embedding-ada-002")
OPENAI_EMBEDDING_BASE_URL = os.getenv("OPENAI_EMBEDDING_BASE_URL")
OPENAI_EMBEDDING_API_KEY = os.getenv("OPENAI_EMBEDDING_API_KEY")
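
# Example .env for this module (illustrative values; the OpenAI entries are
# required only when EMBEDDING_STRATEGY=openai, and the base URL shown is an
# assumption, not a mandated endpoint):
#   QDRANT_HOST=localhost
#   QDRANT_REST_PORT=6333
#   QDRANT_GRPC_PORT=6334
#   EMBEDDING_STRATEGY=ollama
#   OLLAMA_EMBEDDING_MODEL=nomic-embed-text
#   OPENAI_EMBEDDING_MODEL=text-embedding-ada-002
#   OPENAI_EMBEDDING_BASE_URL=https://api.openai.com/v1
#   OPENAI_EMBEDDING_API_KEY=sk-...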


def initialize_vector_store(
    collection_name: str = "documents_langchain", recreate_collection: bool = False
) -> QdrantVectorStore:
    """
    Initialize and return a Qdrant vector store with configurable embeddings.

    Args:
        collection_name: Name of the Qdrant collection to use
        recreate_collection: Whether to recreate the collection if it exists

    Returns:
        Initialized Qdrant vector store
    """
    # Determine which embedding strategy to use
    if EMBEDDING_STRATEGY == "openai":
        # Validate required OpenAI embedding variables
        if not OPENAI_EMBEDDING_API_KEY or not OPENAI_EMBEDDING_BASE_URL:
            raise ValueError(
                "OPENAI_EMBEDDING_API_KEY and OPENAI_EMBEDDING_BASE_URL must be set "
                "when using the OpenAI embedding strategy"
            )
        # Initialize OpenAI embeddings
        embeddings = OpenAIEmbeddings(
            model=OPENAI_EMBEDDING_MODEL,
            openai_api_base=OPENAI_EMBEDDING_BASE_URL,
            openai_api_key=OPENAI_EMBEDDING_API_KEY,
        )
    else:  # Default to Ollama
        # Initialize Ollama embeddings
        embeddings = OllamaEmbeddings(
            model=OLLAMA_EMBEDDING_MODEL,
            base_url="http://localhost:11434",  # Default local Ollama URL
        )

    # Check if the collection exists and create it if needed
    client = QdrantClient(
        host=QDRANT_HOST,
        port=QDRANT_REST_PORT,
    )
    collection_exists = False
    try:
        client.get_collection(collection_name)
        collection_exists = True
    except Exception:
        # Collection doesn't exist; we'll create it below
        collection_exists = False

    if recreate_collection and collection_exists:
        client.delete_collection(collection_name)
        collection_exists = False

    # If the collection doesn't exist, create it using the client directly
    if not collection_exists:
        from qdrant_client.http.models import Distance, VectorParams

        # Determine the embedding size by embedding a sample string
        sample_embedding = embeddings.embed_query("sample text for dimension detection")
        vector_size = len(sample_embedding)

        # Create the collection with the detected vector size
        client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE),
        )

    # Create the QdrantVectorStore instance connected to the collection
    vector_store = QdrantVectorStore(
        client=client,
        collection_name=collection_name,
        embedding=embeddings,
    )
    return vector_store


def add_documents_to_vector_store(
    vector_store: QdrantVectorStore, documents: list[Document], batch_size: int = 10
) -> None:
    """
    Add documents to the vector store.

    Args:
        vector_store: Initialized Qdrant vector store
        documents: List of documents to add
        batch_size: Number of documents to add in each batch
    """
    # Add documents to the vector store in batches
    for i in range(0, len(documents), batch_size):
        batch = documents[i : i + batch_size]
        vector_store.add_documents(batch)


def search_vector_store(
    vector_store: QdrantVectorStore, query: str, top_k: int = 5
) -> list[Document]:
    """
    Search the vector store for similar documents.

    Args:
        vector_store: Initialized Qdrant vector store
        query: Query string to search for
        top_k: Number of top results to return

    Returns:
        List of similar documents
    """
    return vector_store.similarity_search(query, k=top_k)
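

# A minimal sketch of a scored search variant, assuming the standard LangChain
# `similarity_search_with_score` method on QdrantVectorStore; the helper name
# and the (Document, score) tuple return shape are illustrative and may need
# adjusting for your langchain-qdrant version.
def search_vector_store_with_scores(
    vector_store: QdrantVectorStore, query: str, top_k: int = 5
) -> list[tuple[Document, float]]:
    """Search the vector store and return (document, similarity score) pairs."""
    return vector_store.similarity_search_with_score(query, k=top_k)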


# Optional: connect via OpenAI-compatible embeddings using an OpenRouter API key.
# Kept commented out below so it can be enabled in the future.
"""
# Alternative implementation using OpenAI embeddings via OpenRouter
# Uncomment and configure as needed
import os
from langchain_openai import OpenAIEmbeddings
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
OPENROUTER_EMBEDDING_MODEL = os.getenv("OPENROUTER_EMBEDDING_MODEL", "openai/text-embedding-ada-002")
def initialize_vector_store_with_openrouter(
collection_name: str = "documents_langchain"
) -> QdrantVectorStore:
# Initialize Qdrant client
client = QdrantClient(
host=QDRANT_HOST,
port=QDRANT_REST_PORT,
)
# Initialize OpenAI embeddings via OpenRouter
embeddings = OpenAIEmbeddings(
model=OPENROUTER_EMBEDDING_MODEL,
openai_api_key=OPENROUTER_API_KEY,
openai_api_base="https://openrouter.ai/api/v1"
)
# Create or get the vector store
vector_store = QdrantVectorStore(
client=client,
collection_name=collection_name,
embeddings=embeddings,
)
return vector_store
"""


if __name__ == "__main__":
    # Example usage
    print(
        f"Initializing vector store with Ollama embedding model: {OLLAMA_EMBEDDING_MODEL}"
    )
    vector_store = initialize_vector_store()
    print("Vector store initialized successfully!")