Files
rag-solution/services/rag/langchain/retrieval.py

258 lines
8.9 KiB
Python
Raw Normal View History

2026-02-03 23:25:24 +03:00
"""Retrieval module for querying vector storage and returning relevant documents with metadata."""
import os
2026-02-10 13:20:19 +03:00
from typing import List
2026-02-05 00:08:59 +03:00
from dotenv import load_dotenv
2026-02-03 23:25:24 +03:00
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
2026-02-03 23:25:24 +03:00
from loguru import logger
2026-02-10 13:20:19 +03:00
from qdrant_client.http.models import FieldCondition, Filter, MatchAny
2026-02-03 23:25:24 +03:00
2026-02-10 13:20:19 +03:00
from helpers import extract_russian_event_names, extract_years_from_text
2026-02-03 23:25:24 +03:00
from vector_storage import initialize_vector_store
2026-02-05 00:08:59 +03:00
# Load environment variables
load_dotenv()
2026-02-03 23:25:24 +03:00
class VectorStoreRetriever(BaseRetriever):
    """
    A custom retriever that uses the Qdrant vector store to retrieve relevant documents.

    Besides the plain similarity search, the retriever extracts years and
    event names from the query (via ``extract_years_from_text`` and
    ``extract_russian_event_names``), turns them into a payload filter and
    runs one additional similarity search per extracted event name. The
    combined results are de-duplicated and truncated to ``top_k``.
    """

    vector_store: object  # Qdrant vector store instance
    top_k: int = 5  # Number of documents to retrieve

    def _build_qdrant_filter(
        self, years: List[int], events: List[str]
    ) -> Filter | None:
        """
        Build a Qdrant payload filter for extracted years and events.

        Args:
            years: Years extracted from the query
            events: Event names extracted from the query

        Returns:
            A filter matching any of the given years/events, or None when
            there is nothing to filter on
        """
        conditions: List[FieldCondition] = []
        for field, values in (("years", years), ("events", events)):
            if not values:
                continue
            # Both the nested ("metadata.<field>") and the flat ("<field>")
            # payload keys are probed - presumably because the payload layout
            # differs between ingestion paths; verify against the indexer.
            for key in (f"metadata.{field}", field):
                conditions.append(
                    FieldCondition(key=key, match=MatchAny(any=values))
                )

        if not conditions:
            return None

        # NOTE(review): ``should`` matches a point when ANY condition matches,
        # so a query carrying both years and events is filtered with OR
        # semantics here, while _post_filter_documents requires both (AND).
        return Filter(should=conditions)

    @staticmethod
    def _post_filter_documents(
        documents: List[Document], years: List[int], events: List[str]
    ) -> List[Document]:
        """
        Fallback filter in Python in case vector DB filter cannot be applied.

        A document is kept when it matches at least one requested year (if
        any were requested) and at least one requested event (if any were
        requested). Event names are compared case-insensitively.

        Args:
            documents: Candidate documents to filter
            years: Years extracted from the query
            events: Event names extracted from the query

        Returns:
            The documents that satisfy the year and event constraints; the
            input list itself when there are no constraints
        """
        if not years and not events:
            return documents

        year_set = set(years)
        # Document events are lower-cased below, so the query side has to be
        # normalised the same way or mixed-case names would never intersect.
        event_set = {str(event).lower() for event in events}

        filtered: List[Document] = []
        for doc in documents:
            metadata = doc.metadata or {}
            # ``or []`` also covers keys that are present but stored as None.
            raw_years = metadata.get("years") or []
            raw_events = metadata.get("events") or []

            doc_years = {
                int(year)
                for year in raw_years
                # isdecimal() (unlike isdigit()) only accepts what int() parses.
                if isinstance(year, int)
                or (isinstance(year, str) and year.isdecimal())
            }
            doc_events = {str(event).lower() for event in raw_events}

            year_match = not year_set or bool(doc_years & year_set)
            event_match = not event_set or bool(doc_events & event_set)
            if year_match and event_match:
                filtered.append(doc)
        return filtered

    @staticmethod
    def _merge_unique_documents(documents: List[Document]) -> List[Document]:
        """
        Deduplicate documents while preserving order.

        Two documents are considered duplicates when they share the same
        source, page number and first 200 characters of content.

        Args:
            documents: Documents, possibly containing duplicates

        Returns:
            The documents with later duplicates removed
        """
        unique_docs: List[Document] = []
        seen = set()
        for doc in documents:
            # Guard against missing metadata, as _post_filter_documents does.
            metadata = doc.metadata or {}
            dedup_key = (
                metadata.get("source", ""),
                metadata.get("page_number", metadata.get("page", "")),
                doc.page_content[:200],
            )
            if dedup_key in seen:
                continue
            seen.add(dedup_key)
            unique_docs.append(doc)
        return unique_docs

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """
        Retrieve relevant documents based on the query.

        Args:
            query: The query string to search for
            run_manager: Callback manager for the run

        Returns:
            List of relevant documents with metadata; an empty list when the
            search fails
        """
        logger.info(f"Searching for documents related to query: {query[:50]}...")

        try:
            years_in_query = extract_years_from_text(query)
            events_in_query = extract_russian_event_names(query)
            search_filter = self._build_qdrant_filter(years_in_query, events_in_query)
            logger.info(
                f"Extracted query metadata for retrieval: years={years_in_query}, events={events_in_query}"
            )

            # Main search by original user query. Three times top_k candidates
            # are fetched because the list is de-duplicated and cut to top_k
            # at the end.
            search_k = max(self.top_k * 3, self.top_k)
            filter_failed = False
            if search_filter is not None:
                try:
                    results = self.vector_store.similarity_search(
                        query, k=search_k, filter=search_filter
                    )
                except Exception as filter_error:
                    logger.warning(
                        f"Vector store filter failed, fallback to unfiltered search: {filter_error}"
                    )
                    # Do not retry the same failing filter in the event
                    # searches below; filter in Python instead.
                    filter_failed = True
                    search_filter = None
                    results = self.vector_store.similarity_search(query, k=search_k)
                    results = self._post_filter_documents(
                        results, years_in_query, events_in_query
                    )
            else:
                results = self.vector_store.similarity_search(query, k=search_k)

            # Additional event-focused similarity search if event names are present.
            if events_in_query:
                event_results: List[Document] = []
                for event_name in events_in_query:
                    try:
                        if search_filter is not None:
                            event_docs = self.vector_store.similarity_search(
                                event_name, k=self.top_k, filter=search_filter
                            )
                        else:
                            event_docs = self.vector_store.similarity_search(
                                event_name, k=self.top_k
                            )
                    except Exception as event_search_error:
                        # Best effort: one failing event search must not
                        # discard the results collected so far.
                        logger.warning(
                            f"Event-focused search failed for '{event_name}': {event_search_error}"
                        )
                        continue
                    event_results.extend(event_docs)

                if filter_failed:
                    # These searches ran unfiltered, so apply the same Python
                    # fallback that was applied to the main results.
                    event_results = self._post_filter_documents(
                        event_results, years_in_query, events_in_query
                    )
                results.extend(event_results)

            results = self._merge_unique_documents(results)[: self.top_k]

            logger.info(f"Found {len(results)} relevant documents")

            return results
        except Exception as e:
            logger.error(f"Error during similarity search: {str(e)}")
            return []
def create_retriever(collection_name: str = "documents_langchain", top_k: int = 5):
    """
    Build a retriever bound to the vector store of the given collection.

    Args:
        collection_name: Name of the Qdrant collection to use
        top_k: Number of documents to retrieve

    Returns:
        VectorStoreRetriever instance
    """
    message = f"Initializing vector store for retrieval from collection: {collection_name}"
    logger.info(message)

    # The store is opened first; the retriever only wraps it.
    store = initialize_vector_store(collection_name=collection_name)
    return VectorStoreRetriever(vector_store=store, top_k=top_k)
2026-02-03 23:25:24 +03:00
def search_documents_with_metadata(
    query: str, collection_name: str = "documents_langchain", top_k: int = 5
) -> List[dict]:
    """
    Run a similarity search and return each hit as a flat dictionary.

    Args:
        query: The query string to search for
        collection_name: Name of the Qdrant collection to use
        top_k: Number of documents to retrieve

    Returns:
        List of dictionaries containing document content and metadata;
        an empty list when the search raises
    """
    logger.info(f"Starting document search with metadata for query: {query}")

    # Opened outside the try block: a failure here propagates to the caller.
    store = initialize_vector_store(collection_name=collection_name)

    def _as_record(doc) -> dict:
        """Flatten one document into the dictionary shape returned to callers."""
        meta = doc.metadata
        return {
            "content": doc.page_content,
            "metadata": meta,
            "source": meta.get("source", "Unknown"),
            "filename": meta.get("filename", "Unknown"),
            # "page" is used when "page_number" is absent
            "page_number": meta.get("page_number", meta.get("page", "N/A")),
            "file_extension": meta.get("file_extension", "N/A"),
            "file_size": meta.get("file_size", "N/A"),
        }

    try:
        # Standard similarity search, then one record per hit
        hits = store.similarity_search(query, k=top_k)
        records = [_as_record(hit) for hit in hits]
        logger.info(
            f"Metadata search completed, returned {len(records)} documents"
        )
        return records
    except Exception as e:
        logger.error(f"Error during document search with metadata: {str(e)}")
        return []