diff --git a/services/rag/langchain/.gitignore b/services/rag/langchain/.gitignore
index 42f20c5..43a6f68 100644
--- a/services/rag/langchain/.gitignore
+++ b/services/rag/langchain/.gitignore
@@ -214,3 +214,4 @@ __marimo__/
 
 # Streamlit
 .streamlit/secrets.toml
+document_tracking.db
diff --git a/services/rag/langchain/cli.py b/services/rag/langchain/cli.py
index b5511e3..1295564 100644
--- a/services/rag/langchain/cli.py
+++ b/services/rag/langchain/cli.py
@@ -1,8 +1,9 @@
-import click
-from loguru import logger
 import os
 from pathlib import Path
 
+import click
+from loguru import logger
+
 
 # Configure logging to output to both file and stdout as specified in requirements
 def setup_logging():
@@ -28,17 +29,24 @@ def ping():
     click.echo("pong")
 
 
-@cli.command(name="enrich", help="Load documents from data directory and store in vector database")
-@click.option('--data-dir', default="../../../data", help="Path to the data directory")
-@click.option('--collection-name', default="documents", help="Name of the vector store collection")
+@cli.command(
+    name="enrich",
+    help="Load documents from data directory and store in vector database",
+)
+@click.option("--data-dir", default="../../../data", help="Path to the data directory")
+@click.option(
+    "--collection-name",
+    default="documents_langchain",
+    help="Name of the vector store collection",
+)
 def enrich(data_dir, collection_name):
     """Load documents from data directory and store in vector database"""
     logger.info(f"Starting enrichment process for directory: {data_dir}")
 
     try:
         # Import here to avoid circular dependencies
-        from vector_storage import initialize_vector_store
         from enrichment import run_enrichment_process
+        from vector_storage import initialize_vector_store
 
         # Initialize vector store
         vector_store = initialize_vector_store(collection_name=collection_name)
@@ -55,4 +63,4 @@ def enrich(data_dir, collection_name):
 
 
 if __name__ == "__main__":
-    cli()
\ No newline at end of file
+    cli()
diff --git a/services/rag/langchain/enrichment.py b/services/rag/langchain/enrichment.py
index 5d7abd9..37efe7c 100644
--- a/services/rag/langchain/enrichment.py
+++ b/services/rag/langchain/enrichment.py
@@ -6,14 +6,32 @@ from pathlib import Path
 from typing import List, Dict, Any
 from langchain_core.documents import Document
 from langchain_text_splitters import RecursiveCharacterTextSplitter
-from langchain_community.document_loaders import (
-    PyPDFLoader,
-    UnstructuredWordDocumentLoader,
-    UnstructuredPowerPointLoader,
-    PandasExcelLoader,
-    UnstructuredImageLoader,
-    UnstructuredODTLoader,
-)
+from langchain_community.document_loaders import PyPDFLoader
+# Dynamically import other loaders to handle optional dependencies
+try:
+    from langchain_community.document_loaders import UnstructuredWordDocumentLoader
+except ImportError:
+    UnstructuredWordDocumentLoader = None
+
+try:
+    from langchain_community.document_loaders import UnstructuredPowerPointLoader
+except ImportError:
+    UnstructuredPowerPointLoader = None
+
+try:
+    from langchain_community.document_loaders import UnstructuredExcelLoader
+except ImportError:
+    UnstructuredExcelLoader = None
+
+try:
+    from langchain_community.document_loaders import UnstructuredImageLoader
+except ImportError:
+    UnstructuredImageLoader = None
+
+try:
+    from langchain_community.document_loaders import UnstructuredODTLoader
+except ImportError:
+    UnstructuredODTLoader = None
 from sqlalchemy import create_engine, Column, Integer, String
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import sessionmaker
@@ -88,19 +106,35 @@ class DocumentEnricher:
     def _get_loader_for_extension(self, file_path: str):
         """Get the appropriate loader for a given file extension."""
         ext = Path(file_path).suffix.lower()
-        
+
         if ext == ".pdf":
             return PyPDFLoader(file_path)
         elif ext in [".docx", ".doc"]:
-            return UnstructuredWordDocumentLoader(file_path)
+            if UnstructuredWordDocumentLoader is None:
+                logger.warning(f"UnstructuredWordDocumentLoader not available for {file_path}. Skipping.")
+                return None
+            return UnstructuredWordDocumentLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
         elif ext == ".pptx":
-            return UnstructuredPowerPointLoader(file_path)
+            if UnstructuredPowerPointLoader is None:
+                logger.warning(f"UnstructuredPowerPointLoader not available for {file_path}. Skipping.")
+                return None
+            return UnstructuredPowerPointLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
         elif ext in [".xlsx", ".xls"]:
-            return PandasExcelLoader(file_path)
+            if UnstructuredExcelLoader is None:
+                logger.warning(f"UnstructuredExcelLoader not available for {file_path}. Skipping.")
+                return None
+            return UnstructuredExcelLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
         elif ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
-            return UnstructuredImageLoader(file_path)
+            if UnstructuredImageLoader is None:
+                logger.warning(f"UnstructuredImageLoader not available for {file_path}. Skipping.")
+                return None
+            # Use OCR strategy for images to extract text
+            return UnstructuredImageLoader(file_path, **{"strategy": "ocr_only", "languages": ["rus"]})
         elif ext == ".odt":
-            return UnstructuredODTLoader(file_path)
+            if UnstructuredODTLoader is None:
+                logger.warning(f"UnstructuredODTLoader not available for {file_path}. Skipping.")
+                return None
+            return UnstructuredODTLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
         else:
             # For text files and unsupported formats, try to load as text
             try:
@@ -114,25 +148,25 @@
     def load_and_split_documents(self, file_paths: List[str]) -> List[Document]:
         """Load documents from file paths and split them appropriately."""
         all_docs = []
-        
+
         for file_path in file_paths:
             if self._is_document_processed(file_path):
                 logger.info(f"Skipping already processed document: {file_path}")
                 continue
-            
+
             logger.info(f"Processing document: {file_path}")
-            
+
             # Get the appropriate loader for the file extension
             loader = self._get_loader_for_extension(file_path)
-            
+
             if loader is None:
                 # For unsupported formats that we tried to load as text
                 continue
-            
+
             try:
                 # Load the document(s)
                 docs = loader.load()
-                
+
                 # Add metadata to each document
                 for doc in docs:
                     # Extract metadata from the original file
@@ -140,46 +174,56 @@
                     doc.metadata["filename"] = Path(file_path).name
                     doc.metadata["file_path"] = file_path
                     doc.metadata["file_size"] = os.path.getsize(file_path)
-                    
+
                     # Add page number if available in original metadata
                     if "page" in doc.metadata:
                         doc.metadata["page_number"] = doc.metadata["page"]
-                    
+
                     # Add file extension as metadata
                     doc.metadata["file_extension"] = Path(file_path).suffix
-                
+
                 # Split documents if they are too large
                 split_docs = self.text_splitter.split_documents(docs)
-                
+
                 # Add to the collection
                 all_docs.extend(split_docs)
-                
-                # Mark document as processed
-                self._mark_document_processed(file_path)
-                
+
             except Exception as e:
                 logger.error(f"Error processing {file_path}: {str(e)}")
                 continue
-        
+
         return all_docs
 
     def enrich_and_store(self, file_paths: List[str]):
         """Load, enrich, and store documents in the vector store."""
         logger.info(f"Starting enrichment process for {len(file_paths)} files...")
-        
+
         # Load and split documents
         documents = self.load_and_split_documents(file_paths)
-        
+
         if not documents:
             logger.info("No new documents to process.")
            return
-        
+
         logger.info(f"Loaded and split {len(documents)} document chunks, adding to vector store...")
-        
+
         # Add documents to vector store
-        self.vector_store.add_documents(documents)
-        
-        logger.info(f"Successfully added {len(documents)} document chunks to vector store.")
+        try:
+            self.vector_store.add_documents(documents)
+
+            # Only mark documents as processed after successful insertion to vector store
+            processed_file_paths = set()
+            for doc in documents:
+                if 'source' in doc.metadata:
+                    processed_file_paths.add(doc.metadata['source'])
+
+            for file_path in processed_file_paths:
+                self._mark_document_processed(file_path)
+
+            logger.info(f"Successfully added {len(documents)} document chunks to vector store and marked {len(processed_file_paths)} files as processed.")
+        except Exception as e:
+            logger.error(f"Error adding documents to vector store: {str(e)}")
+            raise
 
 
 def get_all_documents_from_data_dir(data_dir: str = str(DATA_DIR)) -> List[str]:
diff --git a/services/rag/langchain/vector_storage.py b/services/rag/langchain/vector_storage.py
index 2ae230a..1c22c6c 100644
--- a/services/rag/langchain/vector_storage.py
+++ b/services/rag/langchain/vector_storage.py
@@ -46,26 +46,47 @@
         base_url="http://localhost:11434",  # Default Ollama URL
     )
 
-    # Create or get the vector store
-    vector_store = Qdrant(
-        client=client,
-        collection_name=collection_name,
-        embeddings=embeddings,
-    )
+    # Check if collection exists, if not create it
+    collection_exists = False
+    try:
+        client.get_collection(collection_name)
+        collection_exists = True
+    except Exception:
+        # Collection doesn't exist, we'll create it
+        collection_exists = False
 
-    # If recreate_collection is True, we'll delete and recreate the collection
-    if recreate_collection and collection_name in [
-        col.name for col in client.get_collections().collections
-    ]:
+    if recreate_collection and collection_exists:
         client.delete_collection(collection_name)
+        collection_exists = False
 
-        # Recreate with proper configuration
-        vector_store = Qdrant.from_documents(
-            documents=[],
-            embedding=embeddings,
-            url=f"http://{QDRANT_HOST}:{QDRANT_REST_PORT}",
+    # If collection doesn't exist, create it using the client directly
+    if not collection_exists:
+        # Create collection using the Qdrant client directly
+        from qdrant_client.http.models import Distance, VectorParams
+        import numpy as np
+
+        # First, we need to determine the embedding size by creating a sample embedding
+        sample_embedding = embeddings.embed_query("sample text for dimension detection")
+        vector_size = len(sample_embedding)
+
+        # Create the collection with appropriate vector size
+        client.create_collection(
             collection_name=collection_name,
-            force_recreate=True,
+            vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE),
+        )
+
+        # Now create the Qdrant instance connected to the newly created collection
+        vector_store = Qdrant(
+            client=client,
+            collection_name=collection_name,
+            embeddings=embeddings,
+        )
+    else:
+        # Collection exists, just connect to it
+        vector_store = Qdrant(
+            client=client,
+            collection_name=collection_name,
+            embeddings=embeddings,
         )
 
     return vector_store
@@ -116,7 +137,7 @@ OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
 OPENROUTER_EMBEDDING_MODEL = os.getenv("OPENROUTER_EMBEDDING_MODEL", "openai/text-embedding-ada-002")
 
 def initialize_vector_store_with_openrouter(
-    collection_name: str = "documents"
+    collection_name: str = "documents_langchain"
 ) -> Qdrant:
     # Initialize Qdrant client
     client = QdrantClient(