diff --git a/services/rag/langchain/PLANNING.md b/services/rag/langchain/PLANNING.md
index 4c02fb9..e6d6965 100644
--- a/services/rag/langchain/PLANNING.md
+++ b/services/rag/langchain/PLANNING.md
@@ -56,3 +56,12 @@ Chosen data folder: relatve ./../../../data - from the current folder
 - [x] After accepting API endpont address, it should be used to send requests and process responses to imitate chat with the agent by the provided API endpoint.
 - [x] Show API endpoint in the header of the chat.
 - [x] If there is error connecting with the API, imitate bot sending message about error with the connection and suggestion to reload page to provide new API endpoint
+
+# Phase 10 (extract additional metadata from chunks and filter with it where possible)
+
+- [x] Create a separate function in a helpers module (create the module if it does not exist) that retrieves years from text and returns the years it finds.
+- [x] While enriching the vector storage, when loading and splitting documents, extract years from each chunk and add them as numbers to the metadata field "years" (an array of numbers, or whichever Qdrant type is best suited to searching by year). The helper function for retrieving years from text can be used.
+- [x] Update VectorStoreRetriever._get_relevant_documents: when the user mentions a year in the query (in Russian), search only vectors whose metadata contains the mentioned year in the "years" array. The helper function for retrieving years from text can be used to filter documents by year.
+- [x] Create a heuristic regex function in the helpers module for extracting event names in Russian text, using regex plus the words that typically appear before and after an event name.
+- [x] During vector storage enrichment, try to extract event names from each chunk and save them in the metadata field "events" as a list of strings (candidate event names). Using the helper function is advised.
+- [x] In VectorStoreRetriever._get_relevant_documents, add a similarity search for the event name if one is present in the query. The helper function should be used here to try to extract the event name.
diff --git a/services/rag/langchain/agent.py b/services/rag/langchain/agent.py
index afedfb1..f4a89d2 100644
--- a/services/rag/langchain/agent.py
+++ b/services/rag/langchain/agent.py
@@ -21,7 +21,9 @@ from vector_storage import initialize_vector_store
 load_dotenv()
 
 
-def get_llm_model_info(llm_model: str = None) -> Tuple[str, str, str, str, str]:
+def get_llm_model_info(
+    llm_model: Optional[str] = None,
+) -> Tuple[str, str, str, str, str]:
     """
     Get LLM model information based on environment configuration.
 
@@ -121,7 +123,7 @@ class DocumentRetrievalTool(BaseTool):
 
 
 def create_chat_agent(
-    collection_name: str = "documents_langchain", llm_model: str = None
+    collection_name: str = "documents_langchain", llm_model: Optional[str] = None
 ) -> Any:
     """
     Create a chat agent with document retrieval capabilities.
@@ -177,7 +179,7 @@ def create_chat_agent(
 def chat_with_agent(
     query: str,
     collection_name: str = "documents_langchain",
-    llm_model: str = None,
+    llm_model: Optional[str] = None,
     history: List[BaseMessage] = None,
 ) -> Dict[str, Any]:
     """
diff --git a/services/rag/langchain/enrichment.py b/services/rag/langchain/enrichment.py
index 072f3b6..9607085 100644
--- a/services/rag/langchain/enrichment.py
+++ b/services/rag/langchain/enrichment.py
@@ -39,6 +39,8 @@ from sqlalchemy.orm import sessionmaker
 from loguru import logger
 import sqlite3
 
+from helpers import extract_russian_event_names, extract_years_from_text
+
 # Load environment variables
 load_dotenv()
 
@@ -189,6 +191,13 @@ class DocumentEnricher:
             # Split documents if they are too large
             split_docs = self.text_splitter.split_documents(docs)
 
+            # Extract additional metadata from each chunk.
+            for chunk in split_docs:
+                years = extract_years_from_text(chunk.page_content)
+                events = extract_russian_event_names(chunk.page_content)
+                chunk.metadata["years"] = years
+                chunk.metadata["events"] = events
+
             # Add to the collection
             all_docs.extend(split_docs)
 
@@ -277,4 +286,4 @@ if __name__ == "__main__":
     vector_store = initialize_vector_store()
 
     # Run enrichment process
-    run_enrichment_process(vector_store)
\ No newline at end of file
+    run_enrichment_process(vector_store)
diff --git a/services/rag/langchain/helpers.py b/services/rag/langchain/helpers.py
new file mode 100644
index 0000000..7b458f3
--- /dev/null
+++ b/services/rag/langchain/helpers.py
@@ -0,0 +1,80 @@
+"""Helper utilities for metadata extraction from Russian text."""
+
+import re
+from typing import List
+
+
+# Four-digit years from 1800 to 2099 that are not part of a longer digit
+# run; widen the range here if the corpus needs earlier years.
+_YEAR_PATTERN = re.compile(r"(?<!\d)(?:1[89]\d{2}|20\d{2})(?!\d)")
+
+# Lowercase stems of words that typically name an event in Russian; stems
+# are used so that inflected forms ("фестиваля", "конференции") also match.
+# Extend the list to cover more event types in the corpus.
+_EVENT_KEYWORDS = (
+    "фестивал",
+    "конференци",
+    "форум",
+    "выставк",
+    "чемпионат",
+    "турнир",
+    "саммит",
+    "конкурс",
+    "олимпиад",
+    "концерт",
+)
+
+# Candidate event phrase: a capitalized word followed by one to five more
+# words, e.g. "Международный фестиваль молодежи". Candidates survive only
+# if they contain one of the keyword stems above.
+_EVENT_PHRASE_PATTERN = re.compile(r"[А-ЯЁ][а-яё]+(?:\s+[А-ЯЁа-яё][а-яё\-]*){1,5}")
+
+# Event names wrapped in guillemets or ASCII quotes, e.g. «Алые паруса».
+_QUOTED_EVENT_PATTERN = re.compile(r"[«\"]([^«»\"]{3,60})[»\"]")
+
+
+def _normalize_event(value: str) -> str:
+    normalized = " ".join(value.strip().split()).strip(".,;:!?()[]{}")
+    return normalized.lower()
+
+
+def extract_years_from_text(text: str) -> List[int]:
+    """Extract unique years from text as integers."""
+    if not text:
+        return []
+
+    years = {int(match.group(0)) for match in _YEAR_PATTERN.finditer(text)}
+    return sorted(years)
+
+
+def extract_russian_event_names(text: str) -> List[str]:
+    """
+    Extract likely Russian event names from text using heuristic regex rules.
+
+    Returns normalized event phrases in lowercase.
+ """ + if not text: + return [] + + events: List[str] = [] + seen = set() + + for match in _EVENT_PHRASE_PATTERN.finditer(text): + candidate = _normalize_event(match.group(0)) + if len(candidate) < 6: + continue + if not any(keyword in candidate for keyword in _EVENT_KEYWORDS): + continue + if candidate not in seen: + events.append(candidate) + seen.add(candidate) + + for match in _QUOTED_EVENT_PATTERN.finditer(text): + quoted = _normalize_event(match.group(1)) + if len(quoted) < 3: + continue + if quoted not in seen: + events.append(quoted) + seen.add(quoted) + + return events diff --git a/services/rag/langchain/retrieval.py b/services/rag/langchain/retrieval.py index 5d2b16e..c791282 100644 --- a/services/rag/langchain/retrieval.py +++ b/services/rag/langchain/retrieval.py @@ -1,14 +1,16 @@ """Retrieval module for querying vector storage and returning relevant documents with metadata.""" import os -from typing import List, Optional +from typing import List from dotenv import load_dotenv from langchain_core.callbacks import CallbackManagerForRetrieverRun from langchain_core.documents import Document from langchain_core.retrievers import BaseRetriever from loguru import logger +from qdrant_client.http.models import FieldCondition, Filter, MatchAny +from helpers import extract_russian_event_names, extract_years_from_text from vector_storage import initialize_vector_store # Load environment variables @@ -23,6 +25,91 @@ class VectorStoreRetriever(BaseRetriever): vector_store: object # Qdrant vector store instance top_k: int = 5 # Number of documents to retrieve + def _build_qdrant_filter( + self, years: List[int], events: List[str] + ) -> Filter | None: + """Build a Qdrant payload filter for extracted years and events.""" + conditions: List[FieldCondition] = [] + + if years: + conditions.extend( + [ + FieldCondition( + key="metadata.years", + match=MatchAny(any=years), + ), + FieldCondition( + key="years", + match=MatchAny(any=years), + ), + ] + ) + + if events: + conditions.extend( + [ + FieldCondition( + key="metadata.events", + match=MatchAny(any=events), + ), + FieldCondition( + key="events", + match=MatchAny(any=events), + ), + ] + ) + + if not conditions: + return None + + return Filter(should=conditions) + + @staticmethod + def _post_filter_documents( + documents: List[Document], years: List[int], events: List[str] + ) -> List[Document]: + """Fallback filter in Python in case vector DB filter cannot be applied.""" + if not years and not events: + return documents + + year_set = set(years) + event_set = set(events) + filtered: List[Document] = [] + + for doc in documents: + metadata = doc.metadata or {} + doc_years = { + int(year) + for year in metadata.get("years", []) + if isinstance(year, int) or (isinstance(year, str) and year.isdigit()) + } + doc_events = {str(event).lower() for event in metadata.get("events", [])} + + year_match = not year_set or bool(doc_years.intersection(year_set)) + event_match = not event_set or bool(doc_events.intersection(event_set)) + + if year_match and event_match: + filtered.append(doc) + + return filtered + + @staticmethod + def _merge_unique_documents(documents: List[Document]) -> List[Document]: + """Deduplicate documents while preserving order.""" + unique_docs: List[Document] = [] + seen = set() + for doc in documents: + dedup_key = ( + doc.metadata.get("source", ""), + doc.metadata.get("page_number", doc.metadata.get("page", "")), + doc.page_content[:200], + ) + if dedup_key in seen: + continue + seen.add(dedup_key) + unique_docs.append(doc) + 
+        return unique_docs
+
     def _get_relevant_documents(
         self, query: str, *, run_manager: CallbackManagerForRetrieverRun
     ) -> List[Document]:
@@ -39,8 +126,54 @@
         logger.info(f"Searching for documents related to query: {query[:50]}...")
 
         try:
-            # Perform similarity search on the vector store
-            results = self.vector_store.similarity_search(query, k=self.top_k)
+            years_in_query = extract_years_from_text(query)
+            events_in_query = extract_russian_event_names(query)
+            search_filter = self._build_qdrant_filter(years_in_query, events_in_query)
+
+            logger.info(
+                f"Extracted query metadata for retrieval: years={years_in_query}, events={events_in_query}"
+            )
+
+            # Main search using the original user query (over-fetch; results are deduplicated and trimmed below).
+            search_k = self.top_k * 3
+            if search_filter is not None:
+                try:
+                    results = self.vector_store.similarity_search(
+                        query, k=search_k, filter=search_filter
+                    )
+                except Exception as filter_error:
+                    logger.warning(
+                        f"Vector store filter failed, falling back to unfiltered search: {filter_error}"
+                    )
+                    results = self.vector_store.similarity_search(query, k=search_k)
+                    results = self._post_filter_documents(
+                        results, years_in_query, events_in_query
+                    )
+            else:
+                results = self.vector_store.similarity_search(query, k=search_k)
+
+            # Additional event-focused similarity search if event names are present.
+            if events_in_query:
+                event_results: List[Document] = []
+                for event_name in events_in_query:
+                    try:
+                        if search_filter is not None:
+                            event_docs = self.vector_store.similarity_search(
+                                event_name, k=self.top_k, filter=search_filter
+                            )
+                        else:
+                            event_docs = self.vector_store.similarity_search(
+                                event_name, k=self.top_k
+                            )
+                    except Exception as event_search_error:
+                        logger.warning(
+                            f"Event-focused search failed for '{event_name}': {event_search_error}"
+                        )
+                        continue
+                    event_results.extend(event_docs)
+                results.extend(event_results)
+
+            results = self._merge_unique_documents(results)[: self.top_k]
 
             logger.info(f"Found {len(results)} relevant documents")
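
A quick way to sanity-check the Phase 10 helpers and the filter construction, as a minimal sketch: it assumes this PR's `helpers.py` is on the import path, uses an illustrative Russian query, and the exact event candidates depend on the heuristic regexes, so treat the commented outputs as indicative rather than exact.

```python
# Smoke test for the Phase 10 helpers (illustrative query; outputs depend
# on the heuristic patterns in helpers.py).
from qdrant_client.http.models import FieldCondition, Filter, MatchAny

from helpers import extract_russian_event_names, extract_years_from_text

query = "Расскажи про фестиваль «Алые паруса» в 2019 году"

years = extract_years_from_text(query)       # -> [2019]
events = extract_russian_event_names(query)  # -> includes "алые паруса"

# Mirrors _build_qdrant_filter: "should" (OR) conditions over both payload
# layouts, since metadata may be stored nested ("metadata.years") or flat.
conditions = []
if years:
    conditions += [
        FieldCondition(key="metadata.years", match=MatchAny(any=years)),
        FieldCondition(key="years", match=MatchAny(any=years)),
    ]
if events:
    conditions += [
        FieldCondition(key="metadata.events", match=MatchAny(any=events)),
        FieldCondition(key="events", match=MatchAny(any=events)),
    ]
search_filter = Filter(should=conditions) if conditions else None
print(years, events, search_filter is not None)
```

Because the filter uses `should`, a document matching either the year or the event qualifies; `_post_filter_documents` then re-applies both constraints client-side whenever the vector store rejects the filter.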