"""Vector storage module using Qdrant and configurable embeddings for the RAG solution."""
import os
from dotenv import load_dotenv
from langchain_core.documents import Document
from langchain_ollama import OllamaEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain_qdrant import QdrantVectorStore
from loguru import logger
from qdrant_client import QdrantClient

# Load environment variables
load_dotenv()

# Qdrant configuration
QDRANT_HOST = os.getenv("QDRANT_HOST", "localhost")
QDRANT_REST_PORT = int(os.getenv("QDRANT_REST_PORT", 6333))
QDRANT_GRPC_PORT = int(os.getenv("QDRANT_GRPC_PORT", 6334))

# Embedding model configuration
EMBEDDING_STRATEGY = os.getenv("EMBEDDING_STRATEGY", "ollama").lower()
OLLAMA_EMBEDDING_MODEL = os.getenv("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text")
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
OPENAI_EMBEDDING_MODEL = os.getenv("OPENAI_EMBEDDING_MODEL", "text-embedding-ada-002")
OPENAI_EMBEDDING_BASE_URL = os.getenv("OPENAI_EMBEDDING_BASE_URL")
OPENAI_EMBEDDING_API_KEY = os.getenv("OPENAI_EMBEDDING_API_KEY")
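
# Illustrative .env sketch (variable names match the os.getenv calls above;
# the values shown are examples only):
#
#     QDRANT_HOST=localhost
#     QDRANT_REST_PORT=6333
#     QDRANT_GRPC_PORT=6334
#     EMBEDDING_STRATEGY=ollama            # or "openai" / "none"
#     OLLAMA_EMBEDDING_MODEL=nomic-embed-text
#     OLLAMA_BASE_URL=http://localhost:11434
#     OPENAI_EMBEDDING_BASE_URL=https://api.openai.com/v1
#     OPENAI_EMBEDDING_API_KEY=sk-...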


def initialize_vector_store(
    collection_name: str = "documents_langchain", recreate_collection: bool = False
) -> QdrantVectorStore:
    """
    Initialize and return a Qdrant vector store with configurable embeddings.

    Args:
        collection_name: Name of the Qdrant collection to use
        recreate_collection: Whether to recreate the collection if it exists

    Returns:
        Initialized Qdrant vector store
    """
    # Determine which embedding strategy to use
    if EMBEDDING_STRATEGY == "openai":
        # Validate required OpenAI embedding variables
        if not OPENAI_EMBEDDING_API_KEY or not OPENAI_EMBEDDING_BASE_URL:
            raise ValueError(
                "OPENAI_EMBEDDING_API_KEY and OPENAI_EMBEDDING_BASE_URL must be set "
                "when using the OpenAI embedding strategy"
            )
        # Initialize OpenAI embeddings
        embeddings = OpenAIEmbeddings(
            model=OPENAI_EMBEDDING_MODEL,
            openai_api_base=OPENAI_EMBEDDING_BASE_URL,
            openai_api_key=OPENAI_EMBEDDING_API_KEY,
        )
    elif EMBEDDING_STRATEGY == "none":
        # No embedding model; text queries against the store will not work
        embeddings = None
        logger.warning("Embedding strategy for vector storage is set to NONE")
    else:  # Default to ollama
        # Initialize Ollama embeddings
        embeddings = OllamaEmbeddings(
            model=OLLAMA_EMBEDDING_MODEL,
            base_url=OLLAMA_BASE_URL,
        )

    # Check if collection exists and create if needed
    client = QdrantClient(
        host=QDRANT_HOST,
        port=QDRANT_REST_PORT,
    )

    collection_exists = False
    try:
        client.get_collection(collection_name)
        collection_exists = True
    except Exception:
        # Collection doesn't exist, we'll create it
        collection_exists = False

    if recreate_collection and collection_exists:
        client.delete_collection(collection_name)
        collection_exists = False

    # If the collection doesn't exist, create it using the client directly
    if not collection_exists:
        from qdrant_client.http.models import Distance, VectorParams

        # Guard against the "none" strategy: without an embedding model we
        # cannot derive the vector size for a new collection
        if embeddings is None:
            raise ValueError(
                "Cannot create a new collection without an embedding model: "
                "the vector size is derived from a sample embedding"
            )

        # Determine the embedding size by creating a sample embedding
        sample_embedding = embeddings.embed_query("sample text for dimension detection")
        vector_size = len(sample_embedding)

        # Create the collection with the appropriate vector size
        client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE),
        )

    # Create the QdrantVectorStore instance connected to the collection
    vector_store = QdrantVectorStore(
        client=client,
        collection_name=collection_name,
        embedding=embeddings,
    )
    return vector_store


def add_documents_to_vector_store(
    vector_store: QdrantVectorStore, documents: list[Document], batch_size: int = 10
) -> None:
    """
    Add documents to the vector store.

    Args:
        vector_store: Initialized Qdrant vector store
        documents: List of documents to add
        batch_size: Number of documents to add in each batch
    """
    # Add documents to the vector store in batches
    for i in range(0, len(documents), batch_size):
        batch = documents[i : i + batch_size]
        vector_store.add_documents(batch)
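
# Usage sketch (hedged): assumes chunked Documents were prepared upstream;
# the metadata key "source" is illustrative, not required by this module:
#
#     docs = [
#         Document(page_content="Qdrant stores vectors.", metadata={"source": "notes.md"}),
#         Document(page_content="Ollama serves local embeddings.", metadata={"source": "notes.md"}),
#     ]
#     add_documents_to_vector_store(vector_store, docs, batch_size=10)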


def search_vector_store(
    vector_store: QdrantVectorStore, query: str, top_k: int = 5
) -> list[Document]:
    """
    Search the vector store for similar documents.

    Args:
        vector_store: Initialized Qdrant vector store
        query: Query string to search for
        top_k: Number of top results to return

    Returns:
        List of similar documents
    """
    return vector_store.similarity_search(query, k=top_k)
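

# A minimal sketch of a scored variant, assuming LangChain's standard
# similarity_search_with_score API (returns (Document, score) tuples):
def search_vector_store_with_scores(
    vector_store: QdrantVectorStore, query: str, top_k: int = 5
) -> list[tuple[Document, float]]:
    """Search the vector store and return (document, similarity score) pairs."""
    return vector_store.similarity_search_with_score(query, k=top_k)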


# Optionally, connect via OpenAI-compatible embeddings using an OpenRouter API key.
# This section is kept commented out so it can be enabled in the future.
"""
# Alternative implementation using OpenAI embeddings via OpenRouter
# Uncomment and configure as needed

import os
from langchain_openai import OpenAIEmbeddings

OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
OPENROUTER_EMBEDDING_MODEL = os.getenv("OPENROUTER_EMBEDDING_MODEL", "openai/text-embedding-ada-002")


def initialize_vector_store_with_openrouter(
    collection_name: str = "documents_langchain"
) -> QdrantVectorStore:
    # Initialize Qdrant client
    client = QdrantClient(
        host=QDRANT_HOST,
        port=QDRANT_REST_PORT,
    )

    # Initialize OpenAI embeddings via OpenRouter
    embeddings = OpenAIEmbeddings(
        model=OPENROUTER_EMBEDDING_MODEL,
        openai_api_key=OPENROUTER_API_KEY,
        openai_api_base="https://openrouter.ai/api/v1",
    )

    # Create or get the vector store
    vector_store = QdrantVectorStore(
        client=client,
        collection_name=collection_name,
        embedding=embeddings,
    )
    return vector_store
"""


if __name__ == "__main__":
    # Example usage
    print(
        f"Initializing vector store with Ollama embedding model: {OLLAMA_EMBEDDING_MODEL}"
    )
    vector_store = initialize_vector_store()
    print("Vector store initialized successfully!")