"""Document enrichment module for loading documents into vector storage.""" import os import hashlib from pathlib import Path from typing import List, Dict, Any from dotenv import load_dotenv from langchain_core.documents import Document from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_community.document_loaders import PyPDFLoader # Dynamically import other loaders to handle optional dependencies try: from langchain_community.document_loaders import UnstructuredWordDocumentLoader except ImportError: UnstructuredWordDocumentLoader = None try: from langchain_community.document_loaders import UnstructuredPowerPointLoader except ImportError: UnstructuredPowerPointLoader = None try: from langchain_community.document_loaders import UnstructuredExcelLoader except ImportError: UnstructuredExcelLoader = None try: from langchain_community.document_loaders import UnstructuredImageLoader except ImportError: UnstructuredImageLoader = None try: from langchain_community.document_loaders import UnstructuredODTLoader except ImportError: UnstructuredODTLoader = None from sqlalchemy import create_engine, Column, Integer, String from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from loguru import logger import sqlite3 # Load environment variables load_dotenv() # Define the path to the data directory DATA_DIR = Path("../../../data").resolve() DB_PATH = Path("document_tracking.db").resolve() Base = declarative_base() class ProcessedDocument(Base): """Database model for tracking processed documents.""" __tablename__ = "processed_documents" id = Column(Integer, primary_key=True) file_path = Column(String, unique=True, nullable=False) file_hash = Column(String, nullable=False) class DocumentEnricher: """Class responsible for enriching documents and loading them to vector storage.""" def __init__(self, vector_store): self.vector_store = vector_store self.text_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200, length_function=len, ) # Initialize database for tracking processed documents self._init_db() def _init_db(self): """Initialize the SQLite database for tracking processed documents.""" self.engine = create_engine(f"sqlite:///{DB_PATH}") Base.metadata.create_all(self.engine) Session = sessionmaker(bind=self.engine) self.session = Session() def _get_file_hash(self, file_path: str) -> str: """Calculate SHA256 hash of a file.""" hash_sha256 = hashlib.sha256() with open(file_path, "rb") as f: # Read file in chunks to handle large files for chunk in iter(lambda: f.read(4096), b""): hash_sha256.update(chunk) return hash_sha256.hexdigest() def _is_document_processed(self, file_path: str) -> bool: """Check if a document has already been processed.""" file_hash = self._get_file_hash(file_path) existing = self.session.query(ProcessedDocument).filter_by( file_hash=file_hash ).first() return existing is not None def _mark_document_processed(self, file_path: str): """Mark a document as processed in the database.""" file_hash = self._get_file_hash(file_path) doc_record = ProcessedDocument( file_path=file_path, file_hash=file_hash ) self.session.add(doc_record) self.session.commit() def _get_loader_for_extension(self, file_path: str): """Get the appropriate loader for a given file extension.""" ext = Path(file_path).suffix.lower() if ext == ".pdf": return PyPDFLoader(file_path) elif ext in [".docx", ".doc"]: if UnstructuredWordDocumentLoader is None: logger.warning(f"UnstructuredWordDocumentLoader not available for 


class DocumentEnricher:
    """Class responsible for enriching documents and loading them to vector storage."""

    def __init__(self, vector_store):
        self.vector_store = vector_store
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )
        # Initialize database for tracking processed documents
        self._init_db()

    def _init_db(self):
        """Initialize the SQLite database for tracking processed documents."""
        self.engine = create_engine(f"sqlite:///{DB_PATH}")
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    def _get_file_hash(self, file_path: str) -> str:
        """Calculate the SHA-256 hash of a file."""
        hash_sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            # Read the file in chunks to handle large files
            for chunk in iter(lambda: f.read(4096), b""):
                hash_sha256.update(chunk)
        return hash_sha256.hexdigest()

    def _is_document_processed(self, file_path: str) -> bool:
        """Check whether a document with this content hash has already been processed."""
        file_hash = self._get_file_hash(file_path)
        existing = (
            self.session.query(ProcessedDocument)
            .filter_by(file_hash=file_hash)
            .first()
        )
        return existing is not None

    def _mark_document_processed(self, file_path: str):
        """Mark a document as processed in the database."""
        file_hash = self._get_file_hash(file_path)
        # file_path is unique, so update the hash on an existing record if this
        # path was seen before; otherwise insert a new record.
        record = (
            self.session.query(ProcessedDocument)
            .filter_by(file_path=file_path)
            .first()
        )
        if record is None:
            record = ProcessedDocument(file_path=file_path, file_hash=file_hash)
            self.session.add(record)
        else:
            record.file_hash = file_hash
        self.session.commit()

    def _get_loader_for_extension(self, file_path: str):
        """Get the appropriate loader for a given file extension."""
        ext = Path(file_path).suffix.lower()

        if ext == ".pdf":
            return PyPDFLoader(file_path)
        elif ext in (".docx", ".doc"):
            if UnstructuredWordDocumentLoader is None:
                logger.warning(f"UnstructuredWordDocumentLoader not available for {file_path}. Skipping.")
                return None
            return UnstructuredWordDocumentLoader(file_path, strategy="hi_res", languages=["rus"])
        elif ext == ".pptx":
            if UnstructuredPowerPointLoader is None:
                logger.warning(f"UnstructuredPowerPointLoader not available for {file_path}. Skipping.")
                return None
            return UnstructuredPowerPointLoader(file_path, strategy="hi_res", languages=["rus"])
        elif ext in (".xlsx", ".xls"):
            if UnstructuredExcelLoader is None:
                logger.warning(f"UnstructuredExcelLoader not available for {file_path}. Skipping.")
                return None
            return UnstructuredExcelLoader(file_path, strategy="hi_res", languages=["rus"])
        elif ext in (".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"):
            if UnstructuredImageLoader is None:
                logger.warning(f"UnstructuredImageLoader not available for {file_path}. Skipping.")
                return None
            # Use the OCR strategy for images to extract text
            return UnstructuredImageLoader(file_path, strategy="ocr_only", languages=["rus"])
        elif ext == ".odt":
            if UnstructuredODTLoader is None:
                logger.warning(f"UnstructuredODTLoader not available for {file_path}. Skipping.")
                return None
            return UnstructuredODTLoader(file_path, strategy="hi_res", languages=["rus"])
        else:
            # For any other extension, fall back to plain-text loading; decoding
            # or read errors are caught and logged by the caller.
            return TextLoader(file_path, encoding="utf-8")

    def load_and_split_documents(self, file_paths: List[str]) -> List[Document]:
        """Load documents from file paths and split them appropriately."""
        all_docs = []

        for file_path in file_paths:
            if self._is_document_processed(file_path):
                logger.info(f"Skipping already processed document: {file_path}")
                continue

            logger.info(f"Processing document: {file_path}")

            # Get the appropriate loader for the file extension
            loader = self._get_loader_for_extension(file_path)
            if loader is None:
                # Required loader dependency is missing; skip this file
                continue

            try:
                # Load the document(s)
                docs = loader.load()

                # Add metadata to each document
                for doc in docs:
                    doc.metadata["source"] = file_path
                    doc.metadata["filename"] = Path(file_path).name
                    doc.metadata["file_path"] = file_path
                    doc.metadata["file_size"] = os.path.getsize(file_path)
                    # Preserve the page number if the loader provided one
                    if "page" in doc.metadata:
                        doc.metadata["page_number"] = doc.metadata["page"]
                    # Add the file extension as metadata
                    doc.metadata["file_extension"] = Path(file_path).suffix

                # Split documents that are too large into overlapping chunks
                split_docs = self.text_splitter.split_documents(docs)
                all_docs.extend(split_docs)
            except Exception as e:
                logger.error(f"Error processing {file_path}: {str(e)}")
                continue

        return all_docs

    def enrich_and_store(self, file_paths: List[str]):
        """Load, enrich, and store documents in the vector store."""
        logger.info(f"Starting enrichment process for {len(file_paths)} files...")

        # Load and split documents
        documents = self.load_and_split_documents(file_paths)
        if not documents:
            logger.info("No new documents to process.")
            return

        logger.info(
            f"Loaded and split {len(documents)} document chunks, adding to vector store..."
        )

        # Add documents to the vector store
        try:
            self.vector_store.add_documents(documents)

            # Only mark documents as processed after successful insertion
            # into the vector store
            processed_file_paths = set()
            for doc in documents:
                if "source" in doc.metadata:
                    processed_file_paths.add(doc.metadata["source"])
            for file_path in processed_file_paths:
                self._mark_document_processed(file_path)

            logger.info(
                f"Successfully added {len(documents)} document chunks to vector store "
                f"and marked {len(processed_file_paths)} files as processed."
            )
        except Exception as e:
            logger.error(f"Error adding documents to vector store: {str(e)}")
            raise
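

# Illustration only (hypothetical helper, safe to delete): shows what the
# splitter configured in DocumentEnricher.__init__ does with a long
# document, namely chunks of roughly 1000 characters with a 200-character
# overlap, metadata copied onto every chunk.
def _demo_chunking() -> None:
    """Split one long synthetic document and log how many chunks result."""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
    )
    doc = Document(page_content="lorem ipsum " * 300, metadata={"source": "demo.txt"})
    chunks = splitter.split_documents([doc])
    # Every chunk carries the original metadata; neighbouring chunks share
    # up to ~200 characters of overlapping text.
    logger.info(f"Demo split produced {len(chunks)} chunks from one document")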


def get_all_documents_from_data_dir(data_dir: str = str(DATA_DIR)) -> List[str]:
    """Get all supported document file paths from the data directory."""
    supported_extensions = {
        ".pdf", ".docx", ".doc", ".pptx", ".xlsx", ".xls",
        ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp", ".odt",
    }

    file_paths = []
    for root, _dirs, files in os.walk(data_dir):
        for file in files:
            if Path(file).suffix.lower() in supported_extensions:
                file_paths.append(os.path.join(root, file))
    return file_paths


def run_enrichment_process(vector_store, data_dir: str = str(DATA_DIR)):
    """Run the full enrichment process."""
    logger.info(f"Starting document enrichment from directory: {data_dir}")

    # Get all supported documents from the data directory
    file_paths = get_all_documents_from_data_dir(data_dir)
    if not file_paths:
        logger.warning(f"No supported documents found in {data_dir}")
        return

    logger.info(f"Found {len(file_paths)} documents to process")

    # Initialize the document enricher and run the enrichment process
    enricher = DocumentEnricher(vector_store)
    enricher.enrich_and_store(file_paths)

    logger.info("Document enrichment process completed!")


if __name__ == "__main__":
    # Example usage
    from vector_storage import initialize_vector_store

    # Initialize the vector store and run the enrichment process
    vector_store = initialize_vector_store()
    run_enrichment_process(vector_store)