From f5659675ec01def8a415b3452136595e83a34c2b Mon Sep 17 00:00:00 2001
From: idchlife
Date: Wed, 11 Feb 2026 15:46:54 +0300
Subject: [PATCH] main feat: adaptation for async enrichment; added file_type,
 which will hold the "таблица", "презентация" and similar types; file source
 metadata is now taken either from the local source or Yandex Disk.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 services/rag/langchain/.env.dist     |   4 +
 services/rag/langchain/PLANNING.md   |  17 ++
 services/rag/langchain/enrichment.py | 328 ++++++++++++++++++---------
 3 files changed, 242 insertions(+), 107 deletions(-)

diff --git a/services/rag/langchain/.env.dist b/services/rag/langchain/.env.dist
index 11cfba4..9ca9d7c 100644
--- a/services/rag/langchain/.env.dist
+++ b/services/rag/langchain/.env.dist
@@ -10,3 +10,7 @@ YADISK_TOKEN=TOKEN
 ENRICHMENT_SOURCE=local/yadisk
 ENRICHMENT_LOCAL_PATH=path
 ENRICHMENT_YADISK_PATH=path
+ENRICHMENT_PROCESSING_MODE=async/sync
+ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT=5
+ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS=4
+ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS=4
diff --git a/services/rag/langchain/PLANNING.md b/services/rag/langchain/PLANNING.md
index ff5df17..dd65f8f 100644
--- a/services/rag/langchain/PLANNING.md
+++ b/services/rag/langchain/PLANNING.md
@@ -84,3 +84,20 @@ During enrichment, we should use adaptive collection from the helpers, for loadi
 - [x] With yadisk source, use env variable for YADISK_TOKEN for token for auth within Yandex Disk, ENRICHMENT_YADISK_PATH for path on the Yandex Disk system
 - [x] We still will need filetypes that we will need to skip, so while iterating over files we need to check their extension and skip them.
 - [x] Adaptive files has filename in them, so it should be used when extracting metadata
+
+
+# Phase 13 (async processing of files)
+
+During this Phase we make the enrichment process asynchronous, using queues and worker threads
+
+- [x] Prepare enrichment to run as an asynchronous process: adjust the needed libraries and anything else required for concurrent processing.
+- [x] Create a queue for adaptive files. It will store adaptive files that need to be processed
+- [x] Create a queue for the documents that were extracted from the adaptive files.
+- [x] Create a function that iterates through the adaptive collection and adds the files to the adaptive files queue ADAPTIVE_FILES_QUEUE. Let's call it insert_adaptive_files_queue
+- [x] Create a function that takes an adaptive file from the adaptive files queue (ADAPTIVE_FILES_QUEUE) and processes it by splitting it into chunks of documents. Let's call it process_adaptive_files_queue
+- [x] Create a function that takes a chunk of documents from the processed documents queue and sends it to the vector storage. It marks the document these chunks came from as processed in the local database (an existing feature adapted here). Let's call it upload_processed_documents_from_queue
+- [x] Utilize the Python threading machinery to create threads for several of our functions. There will be environment variables: ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT (default 5), ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS (default 4), ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS (default 4)
+- [x] Function insert_adaptive_files_queue will not run in a thread. It will iterate through the adaptive collection and wait whenever the queue already holds ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT items.
+- [x] Function process_adaptive_files_queue should be started in a number of threads (defined in .env as ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS)
+- [x] Function upload_processed_documents_from_queue should be started in a number of threads (defined in .env as ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS)
+- [x] The program should control the threads. Once the adaptive collection is exhausted, insert_adaptive_files_queue should wait until all threads finish. What does "finish" mean? When insert_adaptive_files_queue sees that there are no adaptive files left in the collection, it sets a variable shared between the threads marking the collection as finished. When the functions running in threads see that this variable has become true, they drain their queues, do not loop back to wait for new items, and simply return. This eventually ends the program: each thread exits, and then the main program exits as usual once everything has been processed.
diff --git a/services/rag/langchain/enrichment.py b/services/rag/langchain/enrichment.py
index 22ca1c5..1e6f43b 100644
--- a/services/rag/langchain/enrichment.py
+++ b/services/rag/langchain/enrichment.py
@@ -2,13 +2,19 @@
 
 import hashlib
 import os
+import queue
+import threading
 from pathlib import Path
-from typing import Iterator, List, Tuple
+from typing import List, Optional, Tuple
 
 from dotenv import load_dotenv
 from langchain_community.document_loaders import PyPDFLoader
 from langchain_core.documents import Document
 from langchain_text_splitters import RecursiveCharacterTextSplitter
+from loguru import logger
+from sqlalchemy import Column, Integer, String, create_engine
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import sessionmaker
 
 # Dynamically import other loaders to handle optional dependencies
 try:
@@ -35,14 +41,11 @@ try:
     from langchain_community.document_loaders import UnstructuredODTLoader
 except ImportError:
     UnstructuredODTLoader = None
-from loguru import logger
-from sqlalchemy import Column, Integer, String, create_engine
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import sessionmaker
 
 from helpers import (
     LocalFilesystemAdaptiveCollection,
     YandexDiskAdaptiveCollection,
+    YandexDiskAdaptiveFile,
     _AdaptiveCollection,
     _AdaptiveFile,
     extract_russian_event_names,
@@ -52,7 +55,6 @@ from helpers import (
 
 # Load environment variables
 load_dotenv()
-
 # Define the path to the data directory
 DATA_DIR = Path("../../../data").resolve()
 DB_PATH = Path("document_tracking.db").resolve()
@@ -61,6 +63,17 @@ ENRICHMENT_LOCAL_PATH = os.getenv("ENRICHMENT_LOCAL_PATH")
 ENRICHMENT_YADISK_PATH = os.getenv("ENRICHMENT_YADISK_PATH")
 YADISK_TOKEN = os.getenv("YADISK_TOKEN")
 
+ENRICHMENT_PROCESSING_MODE = os.getenv("ENRICHMENT_PROCESSING_MODE", "async").lower()
+ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT = int(
+    os.getenv("ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT", "5")
+)
+ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS = int(
+    os.getenv("ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS", "4")
+)
+ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS = int(
+    os.getenv("ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS", "4")
+)
+
 SUPPORTED_EXTENSIONS = {
     ".pdf",
     ".docx",
@@ -76,20 +89,9 @@ SUPPORTED_EXTENSIONS = {
     ".tiff",
     ".webp",
     ".odt",
+    ".txt",  # plain text was not expected in the data, but it is trivial to support
 }
 
-
-def try_guess_source(extension: str) -> str:
-    if extension in [".xlsx", "xls"]:
-        return "таблица"
-    elif extension in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
-        return "изображение"
-    elif extension in [".pptx"]:
-        return "презентация"
-    else:
-        return "документ"
-
-
 Base = declarative_base()
 
 
@@ -103,6 +105,25 @@ class ProcessedDocument(Base):
     file_hash = Column(String, nullable=False)
 
 
+# Guess the file type name in Russian, so it can be used when searching
+def try_guess_file_type(extension: str) -> str:
+    if extension in [".xlsx", ".xls"]:
+        return "таблица"
+    elif extension in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
+        return "изображение"
+    elif extension in [".pptx"]:
+        return "презентация"
+    else:
+        return "документ"
+
+
+def identify_adaptive_file_source(adaptive_file: _AdaptiveFile) -> str:
+    if isinstance(adaptive_file, YandexDiskAdaptiveFile):
+        return "Яндекс Диск"
+    else:
+        return "Локальный Файл"
+
+
 class DocumentEnricher:
     """Class responsible for enriching documents and loading them to vector storage."""
 
@@ -114,6 +135,34 @@ class DocumentEnricher:
             length_function=len,
         )
 
+        # In sync mode the stages run sequentially, so the queues must stay unbounded.
+        if ENRICHMENT_PROCESSING_MODE == "sync":
+            self.adaptive_files_queue_limit = 0  # 0 means an unbounded queue
+            self.file_process_threads_count = 1
+            self.document_upload_threads_count = 1
+        else:
+            self.adaptive_files_queue_limit = max(
+                1, ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT
+            )
+            self.file_process_threads_count = max(
+                1, ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS
+            )
+            self.document_upload_threads_count = max(
+                1, ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS
+            )
+
+        # Phase 13 queues (maxsize=0 creates an unbounded queue)
+        self.ADAPTIVE_FILES_QUEUE: queue.Queue = queue.Queue(
+            maxsize=self.adaptive_files_queue_limit
+        )
+        self.PROCESSED_DOCUMENTS_QUEUE: queue.Queue = queue.Queue(
+            maxsize=self.adaptive_files_queue_limit * 2
+        )
+
+        # Shared state for thread lifecycle
+        self.collection_finished = threading.Event()
+        self.processing_finished = threading.Event()
+
         # Initialize database for tracking processed documents
         self._init_db()
 
@@ -121,30 +170,45 @@ class DocumentEnricher:
         """Initialize the SQLite database for tracking processed documents."""
         self.engine = create_engine(f"sqlite:///{DB_PATH}")
         Base.metadata.create_all(self.engine)
-        Session = sessionmaker(bind=self.engine)
-        self.session = Session()
+        self.SessionLocal = sessionmaker(bind=self.engine)
 
     def _get_file_hash(self, file_path: str) -> str:
         """Calculate SHA256 hash of a file."""
         hash_sha256 = hashlib.sha256()
-        with open(file_path, "rb") as f:
-            # Read file in chunks to handle large files
-            for chunk in iter(lambda: f.read(4096), b""):
+        with open(file_path, "rb") as file_handle:
+            for chunk in iter(lambda: file_handle.read(4096), b""):
                 hash_sha256.update(chunk)
         return hash_sha256.hexdigest()
 
     def _is_document_hash_processed(self, file_hash: str) -> bool:
         """Check if a document hash has already been processed."""
-        existing = (
-            self.session.query(ProcessedDocument).filter_by(file_hash=file_hash).first()
-        )
-        return existing is not None
+        session = self.SessionLocal()
+        try:
+            existing = (
+                session.query(ProcessedDocument).filter_by(file_hash=file_hash).first()
+            )
+            return existing is not None
+        finally:
+            session.close()
 
     def _mark_document_processed(self, file_identifier: str, file_hash: str):
         """Mark a document as processed in the database."""
-        doc_record = ProcessedDocument(file_path=file_identifier, file_hash=file_hash)
-        self.session.add(doc_record)
-        self.session.commit()
+        session = self.SessionLocal()
+        try:
+            existing = (
+                session.query(ProcessedDocument)
+                .filter_by(file_path=file_identifier)
.first() + ) + if existing is not None: + existing.file_hash = file_hash + else: + session.add( + ProcessedDocument(file_path=file_identifier, file_hash=file_hash) + ) + session.commit() + finally: + session.close() def _get_loader_for_extension(self, file_path: str): """Get the appropriate loader for a given file extension.""" @@ -152,7 +216,7 @@ class DocumentEnricher: if ext == ".pdf": return PyPDFLoader(file_path) - elif ext in [".docx", ".doc"]: + if ext in [".docx", ".doc"]: if UnstructuredWordDocumentLoader is None: logger.warning( f"UnstructuredWordDocumentLoader not available for {file_path}. Skipping." @@ -161,7 +225,7 @@ class DocumentEnricher: return UnstructuredWordDocumentLoader( file_path, **{"strategy": "hi_res", "languages": ["rus"]} ) - elif ext == ".pptx": + if ext == ".pptx": if UnstructuredPowerPointLoader is None: logger.warning( f"UnstructuredPowerPointLoader not available for {file_path}. Skipping." @@ -170,7 +234,7 @@ class DocumentEnricher: return UnstructuredPowerPointLoader( file_path, **{"strategy": "hi_res", "languages": ["rus"]} ) - elif ext in [".xlsx", ".xls"]: + if ext in [".xlsx", ".xls"]: if UnstructuredExcelLoader is None: logger.warning( f"UnstructuredExcelLoader not available for {file_path}. Skipping." @@ -179,17 +243,16 @@ class DocumentEnricher: return UnstructuredExcelLoader( file_path, **{"strategy": "hi_res", "languages": ["rus"]} ) - elif ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]: + if ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]: if UnstructuredImageLoader is None: logger.warning( f"UnstructuredImageLoader not available for {file_path}. Skipping." ) return None - # Use OCR strategy for images to extract text return UnstructuredImageLoader( file_path, **{"strategy": "ocr_only", "languages": ["rus"]} ) - elif ext == ".odt": + if ext == ".odt": if UnstructuredODTLoader is None: logger.warning( f"UnstructuredODTLoader not available for {file_path}. Skipping." 
@@ -198,20 +261,20 @@ class DocumentEnricher: return UnstructuredODTLoader( file_path, **{"strategy": "hi_res", "languages": ["rus"]} ) - else: - return None + return None def _load_one_adaptive_file( self, adaptive_file: _AdaptiveFile - ) -> Tuple[List[Document], str | None]: + ) -> Tuple[List[Document], Optional[Tuple[str, str]]]: """Load and split one adaptive file by using its local working callback.""" loaded_docs: List[Document] = [] - file_hash: str | None = None - source_identifier = try_guess_source(adaptive_file.extension) + processed_record: Optional[Tuple[str, str]] = None + source_identifier = identify_adaptive_file_source(adaptive_file) extension = adaptive_file.extension.lower() + file_type = try_guess_file_type(extension) def process_local_file(local_file_path: str): - nonlocal loaded_docs, file_hash + nonlocal loaded_docs, processed_record file_hash = self._get_file_hash(local_file_path) if self._is_document_hash_processed(file_hash): @@ -227,6 +290,7 @@ class DocumentEnricher: docs = loader.load() for doc in docs: + doc.metadata["file_type"] = file_type doc.metadata["source"] = source_identifier doc.metadata["filename"] = adaptive_file.filename doc.metadata["file_path"] = source_identifier @@ -238,91 +302,145 @@ class DocumentEnricher: split_docs = self.text_splitter.split_documents(docs) for chunk in split_docs: - years = extract_years_from_text(chunk.page_content) - events = extract_russian_event_names(chunk.page_content) - chunk.metadata["years"] = years - chunk.metadata["events"] = events + chunk.metadata["years"] = extract_years_from_text(chunk.page_content) + chunk.metadata["events"] = extract_russian_event_names( + chunk.page_content + ) loaded_docs = split_docs + processed_record = (source_identifier, file_hash) adaptive_file.work_with_file_locally(process_local_file) - return loaded_docs, file_hash + return loaded_docs, processed_record - def load_and_split_documents( + # Phase 13 API: inserts adaptive files into ADAPTIVE_FILES_QUEUE + def insert_adaptive_files_queue( self, adaptive_collection: _AdaptiveCollection, recursive: bool = True - ) -> Iterator[Tuple[List[Document], List[Tuple[str, str]]]]: - """Load documents from adaptive collection and split them appropriately.""" - docs_chunk: List[Document] = [] - processed_file_records: dict[str, str] = {} - + ): for adaptive_file in adaptive_collection.iterate(recursive=recursive): - if len(processed_file_records) >= 2: - yield docs_chunk, list(processed_file_records.items()) - docs_chunk = [] - processed_file_records = {} - if adaptive_file.extension.lower() not in SUPPORTED_EXTENSIONS: logger.debug( f"Skipping unsupported file extension for {adaptive_file.filename}: {adaptive_file.extension}" ) continue - logger.info(f"Processing document: {adaptive_file.filename}") + self.ADAPTIVE_FILES_QUEUE.put(adaptive_file) + + self.collection_finished.set() + + # Phase 13 API: reads adaptive files and writes processed docs into PROCESSED_DOCUMENTS_QUEUE + def process_adaptive_files_queue(self): + while True: try: - split_docs, file_hash = self._load_one_adaptive_file(adaptive_file) - if split_docs: - docs_chunk.extend(split_docs) - if file_hash: - processed_file_records[adaptive_file.filename] = file_hash - except Exception as e: - logger.error(f"Error processing {adaptive_file.filename}: {str(e)}") + adaptive_file = self.ADAPTIVE_FILES_QUEUE.get(timeout=0.2) + except queue.Empty: + if self.collection_finished.is_set(): + return continue + try: + split_docs, processed_record = self._load_one_adaptive_file( + adaptive_file 
+                )
+                if split_docs:
+                    self.PROCESSED_DOCUMENTS_QUEUE.put((split_docs, processed_record))
+            except Exception as error:
+                logger.error(f"Error processing {adaptive_file.filename}: {error}")
+            finally:
+                self.ADAPTIVE_FILES_QUEUE.task_done()
+
+    # Phase 13 API: uploads chunked docs and marks the source file as processed
+    def upload_processed_documents_from_queue(self):
+        while True:
+            try:
+                payload = self.PROCESSED_DOCUMENTS_QUEUE.get(timeout=0.2)
+            except queue.Empty:
+                if self.processing_finished.is_set():
+                    return
+                continue
+
+            try:
+                documents, processed_record = payload
+                self.vector_store.add_documents(documents)
+
+                if processed_record is not None:
+                    self._mark_document_processed(
+                        processed_record[0], processed_record[1]
+                    )
+            except Exception as error:
+                logger.error(f"Error uploading processed documents: {error}")
+                raise
+            finally:
+                self.PROCESSED_DOCUMENTS_QUEUE.task_done()
+
+    def _run_threaded_pipeline(self, adaptive_collection: _AdaptiveCollection):
+        """Run Phase 13 queue/thread pipeline."""
+        process_threads = [
+            threading.Thread(
+                target=self.process_adaptive_files_queue,
+                name=f"adaptive-file-processor-{index}",
+                daemon=True,
+            )
+            for index in range(self.file_process_threads_count)
+        ]
+        upload_threads = [
+            threading.Thread(
+                target=self.upload_processed_documents_from_queue,
+                name=f"document-uploader-{index}",
+                daemon=True,
+            )
+            for index in range(self.document_upload_threads_count)
+        ]
+
+        for thread in process_threads:
+            thread.start()
+        for thread in upload_threads:
+            thread.start()
+
+        # This intentionally runs on the main thread, per the Phase 13 requirement.
+        self.insert_adaptive_files_queue(adaptive_collection, recursive=True)
+
+        # Wait for the adaptive files queue to drain and the processing threads to finish.
+        self.ADAPTIVE_FILES_QUEUE.join()
+        for thread in process_threads:
+            thread.join()
+
+        # Signal the upload workers that no more payloads are coming.
+        self.processing_finished.set()
+
+        # Wait for the documents queue to drain and the upload threads to finish.
+        self.PROCESSED_DOCUMENTS_QUEUE.join()
+        for thread in upload_threads:
+            thread.join()
+
+    def _run_sync_pipeline(self, adaptive_collection: _AdaptiveCollection):
+        """Sequential pipeline for sync mode."""
+        logger.info("Running enrichment in sync mode")
+        self.insert_adaptive_files_queue(adaptive_collection, recursive=True)
+        self.process_adaptive_files_queue()
+        self.processing_finished.set()
+        self.upload_processed_documents_from_queue()
+
     def enrich_and_store(self, adaptive_collection: _AdaptiveCollection):
         """Load, enrich, and store documents in the vector store."""
         logger.info("Starting enrichment process...")
 
-        # Load and split documents
-        for documents, processed_file_records in self.load_and_split_documents(
-            adaptive_collection
-        ):
-            if not documents:
-                logger.info("No new documents to process.")
-                return
+        if ENRICHMENT_PROCESSING_MODE == "sync":
+            logger.info("Document enrichment process starting in SYNC mode")
+            self._run_sync_pipeline(adaptive_collection)
+            return
 
-            logger.info(
-                f"Loaded and split {len(documents)} document chunks, adding to vector store..."
- ) - logger.debug( - f"Documents len: {len(documents)}, processed_file_records len: {len(processed_file_records)}" - ) - - # Add documents to vector store - try: - self.vector_store.add_documents(documents) - - # Only mark documents as processed after successful insertion to vector store - for file_identifier, file_hash in processed_file_records: - self._mark_document_processed(file_identifier, file_hash) - - logger.info( - f"Successfully added {len(documents)} document chunks to vector store and marked {len(processed_file_records)} files as processed." - ) - except Exception as e: - logger.error(f"Error adding documents to vector store: {str(e)}") - raise + logger.info("Document enrichment process starting in ASYNC/THREAD mode") + self._run_threaded_pipeline(adaptive_collection) -def get_enrichment_adaptive_collection() -> _AdaptiveCollection: +def get_enrichment_adaptive_collection( + data_dir: str = str(DATA_DIR), +) -> _AdaptiveCollection: """Create adaptive collection based on environment source configuration.""" source = ENRICHMENT_SOURCE if source == "local": - local_path = ENRICHMENT_LOCAL_PATH - if local_path is None: - raise RuntimeError( - "Enrichment strategy is local, but no ENRICHMENT_LOCAL_PATH is defined!" - ) - + local_path = ENRICHMENT_LOCAL_PATH or data_dir logger.info(f"Using local adaptive collection from path: {local_path}") return LocalFilesystemAdaptiveCollection(local_path) @@ -346,11 +464,11 @@ def get_enrichment_adaptive_collection() -> _AdaptiveCollection: ) -def run_enrichment_process(vector_store): +def run_enrichment_process(vector_store, data_dir: str = str(DATA_DIR)): """Run the full enrichment process.""" logger.info("Starting document enrichment process") - adaptive_collection = get_enrichment_adaptive_collection() + adaptive_collection = get_enrichment_adaptive_collection(data_dir=data_dir) # Initialize the document enricher enricher = DocumentEnricher(vector_store) @@ -362,11 +480,7 @@ def run_enrichment_process(vector_store): if __name__ == "__main__": - # Example usage from vector_storage import initialize_vector_store - # Initialize vector store vector_store = initialize_vector_store() - - # Run enrichment process run_enrichment_process(vector_store)
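
Note for reviewers (illustrative only, not part of the diff): the Phase 13 design in
PLANNING.md boils down to a standard producer/consumer shutdown pattern: a bounded
queue.Queue for backpressure, a threading.Event set by the producer once the collection
is exhausted, and task_done()/join() to drain each stage before signalling the next.
A minimal standalone sketch of that pattern follows; the names (insert_items,
process_stage, upload_stage, FILES_QUEUE, RESULTS_QUEUE) are invented for the sketch
and are not part of enrichment.py.

import queue
import threading

FILES_QUEUE: queue.Queue = queue.Queue(maxsize=5)  # plays the role of ADAPTIVE_FILES_QUEUE
RESULTS_QUEUE: queue.Queue = queue.Queue()         # plays the role of PROCESSED_DOCUMENTS_QUEUE
collection_finished = threading.Event()
processing_finished = threading.Event()


def insert_items(items):
    """Producer: stays on the main thread; put() blocks while the bounded queue is full."""
    for item in items:
        FILES_QUEUE.put(item)
    collection_finished.set()


def process_stage():
    """Worker: drains FILES_QUEUE and exits once it is empty and the producer is done."""
    while True:
        try:
            item = FILES_QUEUE.get(timeout=0.2)
        except queue.Empty:
            if collection_finished.is_set():
                return
            continue
        try:
            RESULTS_QUEUE.put(f"processed {item}")
        finally:
            FILES_QUEUE.task_done()


def upload_stage():
    """Worker: drains RESULTS_QUEUE until the processing stage signals completion."""
    while True:
        try:
            result = RESULTS_QUEUE.get(timeout=0.2)
        except queue.Empty:
            if processing_finished.is_set():
                return
            continue
        try:
            print(result)
        finally:
            RESULTS_QUEUE.task_done()


if __name__ == "__main__":
    processors = [threading.Thread(target=process_stage, daemon=True) for _ in range(2)]
    uploaders = [threading.Thread(target=upload_stage, daemon=True) for _ in range(2)]
    for worker in processors + uploaders:
        worker.start()

    insert_items(range(20))    # producer runs on the main thread, as in Phase 13

    FILES_QUEUE.join()         # every file item has been task_done()
    for worker in processors:
        worker.join()
    processing_finished.set()  # only now tell the uploaders nothing more is coming
    RESULTS_QUEUE.join()
    for worker in uploaders:
        worker.join()

The ordering at the end is the important part: the files queue is joined and the
processing threads are joined before processing_finished is set, otherwise an uploader
could exit while a processor still has chunks left to hand over.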