From 7b52887558ad63a6f7a2cfbc896272995b1d6559 Mon Sep 17 00:00:00 2001
From: idchlife
Date: Wed, 11 Feb 2026 11:23:50 +0300
Subject: [PATCH] Process enrichment in chunks of 2 documents into the vector
 storage. Also guess the source type from the file extension

---
 services/rag/langchain/cli.py        |   9 +-
 services/rag/langchain/enrichment.py | 188 +++++++++++++++++----------
 services/rag/langchain/helpers.py    |  11 +-
 3 files changed, 127 insertions(+), 81 deletions(-)

diff --git a/services/rag/langchain/cli.py b/services/rag/langchain/cli.py
index f732c4e..bab1e82 100644
--- a/services/rag/langchain/cli.py
+++ b/services/rag/langchain/cli.py
@@ -37,15 +37,16 @@ def ping():
     name="enrich",
     help="Load documents from data directory and store in vector database",
 )
-@click.option("--data-dir", default="../../../data", help="Path to the data directory")
 @click.option(
     "--collection-name",
     default="documents_langchain",
     help="Name of the vector store collection",
 )
-def enrich(data_dir, collection_name):
+def enrich(collection_name):
     """Load documents from data directory and store in vector database"""
-    logger.info(f"Starting enrichment process for directory: {data_dir}")
+    logger.info(
+        f"Starting enrichment process. 
Enrichment source: {os.getenv('ENRICHMENT_SOURCE')}"
+    )
 
     try:
         # Import here to avoid circular dependencies
@@ -56,7 +57,7 @@ def enrich(data_dir, collection_name):
         vector_store = initialize_vector_store(collection_name=collection_name)
 
         # Run enrichment process
-        run_enrichment_process(vector_store, data_dir=data_dir)
+        run_enrichment_process(vector_store)
 
         logger.info("Enrichment process completed successfully!")
         click.echo("Documents have been successfully loaded into the vector store.")
diff --git a/services/rag/langchain/enrichment.py b/services/rag/langchain/enrichment.py
index dd45435..22ca1c5 100644
--- a/services/rag/langchain/enrichment.py
+++ b/services/rag/langchain/enrichment.py
@@ -1,13 +1,15 @@
 """Document enrichment module for loading documents into vector storage."""
 
-import os
 import hashlib
+import os
 from pathlib import Path
-from typing import List, Tuple
+from typing import Iterator, List, Tuple
+
 from dotenv import load_dotenv
+from langchain_community.document_loaders import PyPDFLoader
 from langchain_core.documents import Document
 from langchain_text_splitters import RecursiveCharacterTextSplitter
-from langchain_community.document_loaders import PyPDFLoader
+
 # Dynamically import other loaders to handle optional dependencies
 try:
     from langchain_community.document_loaders import UnstructuredWordDocumentLoader
@@ -33,10 +35,10 @@ try:
     from langchain_community.document_loaders import UnstructuredODTLoader
 except ImportError:
     UnstructuredODTLoader = None
-from sqlalchemy import create_engine, Column, Integer, String
+from loguru import logger
+from sqlalchemy import Column, Integer, String, create_engine
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import sessionmaker
-from loguru import logger
 
 from helpers import (
     LocalFilesystemAdaptiveCollection,
@@ -76,13 +78,26 @@ SUPPORTED_EXTENSIONS = {
     ".odt",
 }
 
+
+def try_guess_source(extension: str) -> str:
+    if extension in [".xlsx", ".xls"]:
+        return "таблица"
+    elif extension in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
+        return "изображение"
+    elif extension in [".pptx"]:
+        return "презентация"
+    else:
+        return "документ"
+
+
 Base = declarative_base()
 
 
 class ProcessedDocument(Base):
     """Database model for tracking processed documents."""
+
     __tablename__ = "processed_documents"
-    
+
     id = Column(Integer, primary_key=True)
     file_path = Column(String, unique=True, nullable=False)
     file_hash = Column(String, nullable=False)
@@ -90,7 +105,7 @@
 
 class DocumentEnricher:
     """Class responsible for enriching documents and loading them to vector storage."""
-    
+
     def __init__(self, vector_store):
         self.vector_store = vector_store
         self.text_splitter = RecursiveCharacterTextSplitter(
@@ -98,17 +113,17 @@ class DocumentEnricher:
             chunk_overlap=200,
             length_function=len,
         )
-    
+
         # Initialize database for tracking processed documents
         self._init_db()
-    
+
     def _init_db(self):
         """Initialize the SQLite database for tracking processed documents."""
         self.engine = create_engine(f"sqlite:///{DB_PATH}")
         Base.metadata.create_all(self.engine)
         Session = sessionmaker(bind=self.engine)
         self.session = Session()
-    
+
     def _get_file_hash(self, file_path: str) -> str:
         """Calculate SHA256 hash of a file."""
         hash_sha256 = hashlib.sha256()
@@ -117,23 +132,20 @@ class DocumentEnricher:
             for chunk in iter(lambda: f.read(4096), b""):
                 hash_sha256.update(chunk)
         return hash_sha256.hexdigest()
-    
+
     def _is_document_hash_processed(self, file_hash: str) -> bool:
         """Check if a document hash has already been 
processed.""" - existing = self.session.query(ProcessedDocument).filter_by( - file_hash=file_hash - ).first() + existing = ( + self.session.query(ProcessedDocument).filter_by(file_hash=file_hash).first() + ) return existing is not None - + def _mark_document_processed(self, file_identifier: str, file_hash: str): """Mark a document as processed in the database.""" - doc_record = ProcessedDocument( - file_path=file_identifier, - file_hash=file_hash - ) + doc_record = ProcessedDocument(file_path=file_identifier, file_hash=file_hash) self.session.add(doc_record) self.session.commit() - + def _get_loader_for_extension(self, file_path: str): """Get the appropriate loader for a given file extension.""" ext = Path(file_path).suffix.lower() @@ -142,40 +154,60 @@ class DocumentEnricher: return PyPDFLoader(file_path) elif ext in [".docx", ".doc"]: if UnstructuredWordDocumentLoader is None: - logger.warning(f"UnstructuredWordDocumentLoader not available for {file_path}. Skipping.") + logger.warning( + f"UnstructuredWordDocumentLoader not available for {file_path}. Skipping." + ) return None - return UnstructuredWordDocumentLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]}) + return UnstructuredWordDocumentLoader( + file_path, **{"strategy": "hi_res", "languages": ["rus"]} + ) elif ext == ".pptx": if UnstructuredPowerPointLoader is None: - logger.warning(f"UnstructuredPowerPointLoader not available for {file_path}. Skipping.") + logger.warning( + f"UnstructuredPowerPointLoader not available for {file_path}. Skipping." + ) return None - return UnstructuredPowerPointLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]}) + return UnstructuredPowerPointLoader( + file_path, **{"strategy": "hi_res", "languages": ["rus"]} + ) elif ext in [".xlsx", ".xls"]: if UnstructuredExcelLoader is None: - logger.warning(f"UnstructuredExcelLoader not available for {file_path}. Skipping.") + logger.warning( + f"UnstructuredExcelLoader not available for {file_path}. Skipping." + ) return None - return UnstructuredExcelLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]}) + return UnstructuredExcelLoader( + file_path, **{"strategy": "hi_res", "languages": ["rus"]} + ) elif ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]: if UnstructuredImageLoader is None: - logger.warning(f"UnstructuredImageLoader not available for {file_path}. Skipping.") + logger.warning( + f"UnstructuredImageLoader not available for {file_path}. Skipping." + ) return None # Use OCR strategy for images to extract text - return UnstructuredImageLoader(file_path, **{"strategy": "ocr_only", "languages": ["rus"]}) + return UnstructuredImageLoader( + file_path, **{"strategy": "ocr_only", "languages": ["rus"]} + ) elif ext == ".odt": if UnstructuredODTLoader is None: - logger.warning(f"UnstructuredODTLoader not available for {file_path}. Skipping.") + logger.warning( + f"UnstructuredODTLoader not available for {file_path}. Skipping." 
+                )
                 return None
-            return UnstructuredODTLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
+            return UnstructuredODTLoader(
+                file_path, **{"strategy": "hi_res", "languages": ["rus"]}
+            )
         else:
             return None
-    
+
     def _load_one_adaptive_file(
         self, adaptive_file: _AdaptiveFile
     ) -> Tuple[List[Document], str | None]:
         """Load and split one adaptive file by using its local working callback."""
         loaded_docs: List[Document] = []
         file_hash: str | None = None
-        source_identifier = adaptive_file.local_path
+        source_identifier = try_guess_source(adaptive_file.extension)
         extension = adaptive_file.extension.lower()
 
         def process_local_file(local_file_path: str):
@@ -183,7 +215,9 @@
             file_hash = self._get_file_hash(local_file_path)
 
             if self._is_document_hash_processed(file_hash):
-                logger.info(f"Skipping already processed document hash for: {source_identifier}")
+                logger.info(
+                    f"Skipping already processed document hash for: {source_identifier}"
+                )
                 return
 
             loader = self._get_loader_for_extension(local_file_path)
@@ -216,69 +250,83 @@ class DocumentEnricher:
     def load_and_split_documents(
         self, adaptive_collection: _AdaptiveCollection, recursive: bool = True
-    ) -> Tuple[List[Document], List[Tuple[str, str]]]:
+    ) -> Iterator[Tuple[List[Document], List[Tuple[str, str]]]]:
         """Load documents from adaptive collection and split them appropriately."""
-        all_docs: List[Document] = []
+        docs_chunk: List[Document] = []
         processed_file_records: dict[str, str] = {}
 
         for adaptive_file in adaptive_collection.iterate(recursive=recursive):
+            if len(processed_file_records) >= 2:
+                yield docs_chunk, list(processed_file_records.items())
+                docs_chunk = []
+                processed_file_records = {}
+
             if adaptive_file.extension.lower() not in SUPPORTED_EXTENSIONS:
                 logger.debug(
                     f"Skipping unsupported file extension for {adaptive_file.filename}: {adaptive_file.extension}"
                 )
                 continue
 
-            logger.info(f"Processing document: {adaptive_file.local_path}")
+            logger.info(f"Processing document: {adaptive_file.filename}")
 
             try:
                 split_docs, file_hash = self._load_one_adaptive_file(adaptive_file)
                 if split_docs:
-                    all_docs.extend(split_docs)
+                    docs_chunk.extend(split_docs)
                 if file_hash:
-                    processed_file_records[adaptive_file.local_path] = file_hash
+                    processed_file_records[adaptive_file.filename] = file_hash
             except Exception as e:
-                logger.error(f"Error processing {adaptive_file.local_path}: {str(e)}")
+                logger.error(f"Error processing {adaptive_file.filename}: {str(e)}")
                 continue
 
-        return all_docs, list(processed_file_records.items())
-
+        # Yield the final partial chunk (fewer than 2 files) so it is not dropped
+        if docs_chunk or processed_file_records:
+            yield docs_chunk, list(processed_file_records.items())
+
     def enrich_and_store(self, adaptive_collection: _AdaptiveCollection):
         """Load, enrich, and store documents in the vector store."""
         logger.info("Starting enrichment process...")
 
         # Load and split documents
-        documents, processed_file_records = self.load_and_split_documents(
+        for documents, processed_file_records in self.load_and_split_documents(
             adaptive_collection
-        )
-
-        if not documents:
-            logger.info("No new documents to process.")
-            return
-
-        logger.info(f"Loaded and split {len(documents)} document chunks, adding to vector store...")
-
-        # Add documents to vector store
-        try:
-            self.vector_store.add_documents(documents)
-
-            # Only mark documents as processed after successful insertion to vector store
-            for file_identifier, file_hash in processed_file_records:
-                self._mark_document_processed(file_identifier, file_hash)
+        ):
+            if not documents:
+                logger.info("No new documents to process.")
+                continue
 
             logger.info(
-                f"Successfully added {len(documents)} document chunks to vector store and 
marked {len(processed_file_records)} files as processed." + f"Loaded and split {len(documents)} document chunks, adding to vector store..." ) - except Exception as e: - logger.error(f"Error adding documents to vector store: {str(e)}") - raise + logger.debug( + f"Documents len: {len(documents)}, processed_file_records len: {len(processed_file_records)}" + ) + + # Add documents to vector store + try: + self.vector_store.add_documents(documents) + + # Only mark documents as processed after successful insertion to vector store + for file_identifier, file_hash in processed_file_records: + self._mark_document_processed(file_identifier, file_hash) + + logger.info( + f"Successfully added {len(documents)} document chunks to vector store and marked {len(processed_file_records)} files as processed." + ) + except Exception as e: + logger.error(f"Error adding documents to vector store: {str(e)}") + raise -def get_enrichment_adaptive_collection( - data_dir: str = str(DATA_DIR), -) -> _AdaptiveCollection: +def get_enrichment_adaptive_collection() -> _AdaptiveCollection: """Create adaptive collection based on environment source configuration.""" source = ENRICHMENT_SOURCE if source == "local": - local_path = ENRICHMENT_LOCAL_PATH or data_dir + local_path = ENRICHMENT_LOCAL_PATH + if local_path is None: + raise RuntimeError( + "Enrichment strategy is local, but no ENRICHMENT_LOCAL_PATH is defined!" + ) + logger.info(f"Using local adaptive collection from path: {local_path}") return LocalFilesystemAdaptiveCollection(local_path) @@ -302,27 +346,27 @@ def get_enrichment_adaptive_collection( ) -def run_enrichment_process(vector_store, data_dir: str = str(DATA_DIR)): +def run_enrichment_process(vector_store): """Run the full enrichment process.""" logger.info("Starting document enrichment process") - adaptive_collection = get_enrichment_adaptive_collection(data_dir=data_dir) - + adaptive_collection = get_enrichment_adaptive_collection() + # Initialize the document enricher enricher = DocumentEnricher(vector_store) - + # Run the enrichment process enricher.enrich_and_store(adaptive_collection) - + logger.info("Document enrichment process completed!") if __name__ == "__main__": # Example usage from vector_storage import initialize_vector_store - + # Initialize vector store vector_store = initialize_vector_store() - + # Run enrichment process run_enrichment_process(vector_store) diff --git a/services/rag/langchain/helpers.py b/services/rag/langchain/helpers.py index 1471cd2..7950bd6 100644 --- a/services/rag/langchain/helpers.py +++ b/services/rag/langchain/helpers.py @@ -115,13 +115,11 @@ def extract_russian_event_names(text: str) -> List[str]: class _AdaptiveFile(ABC): extension: str # Format: .jpg - local_path: str filename: str - def __init__(self, filename: str, extension: str, local_path: str): + def __init__(self, filename: str, extension: str): self.filename = filename self.extension = extension - self.local_path = local_path # This method allows to work with file locally, and lambda should be provided for this. # Why separate method? For possible cleanup after work is done. 
And to download file, if needed @@ -139,8 +137,11 @@ class _AdaptiveCollection(ABC): class LocalFilesystemAdaptiveFile(_AdaptiveFile): + local_path: str + def __init__(self, filename: str, extension: str, local_path: str): - super().__init__(filename, extension, local_path) + super().__init__(filename, extension) + self.local_path = local_path def work_with_file_locally(self, func: Callable[[str], None]): func(self.local_path) @@ -171,7 +172,7 @@ class YandexDiskAdaptiveFile(_AdaptiveFile): remote_path: str def __init__(self, filename: str, extension: str, remote_path: str, token: str): - super().__init__(filename, extension, remote_path) + super().__init__(filename, extension) self.token = token self.remote_path = remote_path
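
For reference, a minimal sketch of how the reworked enrichment flow is expected to be driven after this patch. It assumes the script runs from the services/rag/langchain directory so that enrichment.py and vector_storage are importable as flat modules, and that ENRICHMENT_SOURCE and ENRICHMENT_LOCAL_PATH are plain environment variables; the path value is a placeholder, not something defined by the patch.

import os

# Assumed environment for a local run; enrichment.py reads these names at
# module level, so they are set before it is imported (placeholder values).
os.environ.setdefault("ENRICHMENT_SOURCE", "local")
os.environ.setdefault("ENRICHMENT_LOCAL_PATH", "/path/to/documents")

from enrichment import run_enrichment_process
from vector_storage import initialize_vector_store

# load_and_split_documents() now yields batches of two processed files; each
# batch is added to the vector store and only then recorded in the SQLite
# tracking database, so an interrupted run can resume without re-embedding
# files whose hashes are already marked as processed.
vector_store = initialize_vector_store(collection_name="documents_langchain")
run_enrichment_process(vector_store)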