From 1e6ab247b99a98f5224e927b7cfa0e88ad2eabfb Mon Sep 17 00:00:00 2001
From: idchlife
Date: Tue, 10 Feb 2026 22:19:27 +0300
Subject: [PATCH] Phase 12 done... loading via adaptive collection, yadisk or local

---
 services/rag/langchain/PLANNING.md             |  12 +-
 services/rag/langchain/enrichment.py           | 237 ++++++++++--------
 services/rag/langchain/helpers.py              |  10 +-
 ...st_local_filesystem_adaptive_collection.py  |   7 +-
 .../test_yandex_disk_adaptive_collection.py    |   1 +
 5 files changed, 154 insertions(+), 113 deletions(-)

diff --git a/services/rag/langchain/PLANNING.md b/services/rag/langchain/PLANNING.md
index 30d2ec4..ff5df17 100644
--- a/services/rag/langchain/PLANNING.md
+++ b/services/rag/langchain/PLANNING.md
@@ -78,9 +78,9 @@ Chosen data folder: relatve ./../../../data - from the current folder

 During enrichment, we should use adaptive collection from the helpers, for loading documents. We should not use directly local filesystem, but use adaptive collection as a wrapper.

-- [ ] Adaptive file in helper now has filename in it, so tests should be adjusted for this
-- [ ] Add conditional usage of adaptive collection in the enrichment stage. .env has now variable ENRICHMENT_SOURCE with 2 possible values: yadisk, local
-- [ ] With local source, use env variable for local filesystem adaptive collection: ENRICHMENT_LOCAL_PATH
-- [ ] With yadisk source, use env variable for YADISK_TOKEN for token for auth within Yandex Disk, ENRICHMENT_YADISK_PATH for path on the Yandex Disk system
-- [ ] We still will need filetypes that we will need to skip, so while iterating over files we need to check their extension and skip them.
-- [ ] Adaptive files has filename in them, so it should be used when extracting metadata
+- [x] Adaptive file in helper now has filename in it, so tests should be adjusted for this
+- [x] Add conditional usage of adaptive collection in the enrichment stage. .env has now variable ENRICHMENT_SOURCE with 2 possible values: yadisk, local
+- [x] With local source, use env variable for local filesystem adaptive collection: ENRICHMENT_LOCAL_PATH
+- [x] With yadisk source, use env variable for YADISK_TOKEN for token for auth within Yandex Disk, ENRICHMENT_YADISK_PATH for path on the Yandex Disk system
+- [x] We still will need filetypes that we will need to skip, so while iterating over files we need to check their extension and skip them.
+- [x] Adaptive files has filename in them, so it should be used when extracting metadata
diff --git a/services/rag/langchain/enrichment.py b/services/rag/langchain/enrichment.py
index 9607085..dd45435 100644
--- a/services/rag/langchain/enrichment.py
+++ b/services/rag/langchain/enrichment.py
@@ -3,7 +3,7 @@
 import os
 import hashlib
 from pathlib import Path
-from typing import List, Dict, Any
+from typing import List, Tuple
 from dotenv import load_dotenv
 from langchain_core.documents import Document
 from langchain_text_splitters import RecursiveCharacterTextSplitter
@@ -37,9 +37,15 @@
 from sqlalchemy import create_engine, Column, Integer, String
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import sessionmaker
 from loguru import logger
-import sqlite3
-from helpers import extract_russian_event_names, extract_years_from_text
+from helpers import (
+    LocalFilesystemAdaptiveCollection,
+    YandexDiskAdaptiveCollection,
+    _AdaptiveCollection,
+    _AdaptiveFile,
+    extract_russian_event_names,
+    extract_years_from_text,
+)

 # Load environment variables
 load_dotenv()
@@ -48,6 +54,27 @@ load_dotenv()
 # Define the path to the data directory
 DATA_DIR = Path("../../../data").resolve()
 DB_PATH = Path("document_tracking.db").resolve()
+ENRICHMENT_SOURCE = os.getenv("ENRICHMENT_SOURCE", "local").lower()
+ENRICHMENT_LOCAL_PATH = os.getenv("ENRICHMENT_LOCAL_PATH")
+ENRICHMENT_YADISK_PATH = os.getenv("ENRICHMENT_YADISK_PATH")
+YADISK_TOKEN = os.getenv("YADISK_TOKEN")
+
+SUPPORTED_EXTENSIONS = {
+    ".pdf",
+    ".docx",
+    ".doc",
+    ".pptx",
+    ".xlsx",
+    ".xls",
+    ".jpg",
+    ".jpeg",
+    ".png",
+    ".gif",
+    ".bmp",
+    ".tiff",
+    ".webp",
+    ".odt",
+}

 Base = declarative_base()

@@ -91,19 +118,17 @@ class DocumentEnricher:
                 hash_sha256.update(chunk)
         return hash_sha256.hexdigest()

-    def _is_document_processed(self, file_path: str) -> bool:
-        """Check if a document has already been processed."""
-        file_hash = self._get_file_hash(file_path)
+    def _is_document_hash_processed(self, file_hash: str) -> bool:
+        """Check if a document hash has already been processed."""
         existing = self.session.query(ProcessedDocument).filter_by(
             file_hash=file_hash
         ).first()
         return existing is not None

-    def _mark_document_processed(self, file_path: str):
+    def _mark_document_processed(self, file_identifier: str, file_hash: str):
         """Mark a document as processed in the database."""
-        file_hash = self._get_file_hash(file_path)
         doc_record = ProcessedDocument(
-            file_path=file_path,
+            file_path=file_identifier,
             file_hash=file_hash
         )
         self.session.add(doc_record)
@@ -142,77 +167,88 @@ class DocumentEnricher:
                 return None
             return UnstructuredODTLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
         else:
-            # For text files and unsupported formats, try to load as text
-            try:
-                with open(file_path, 'r', encoding='utf-8') as f:
-                    content = f.read()
-                return None, content  # Return content directly for text processing
-            except UnicodeDecodeError:
-                logger.warning(f"Could not decode file as text: {file_path}")
-                return None, None
+            return None

-    def load_and_split_documents(self, file_paths: List[str]) -> List[Document]:
-        """Load documents from file paths and split them appropriately."""
-        all_docs = []
+    def _load_one_adaptive_file(
+        self, adaptive_file: _AdaptiveFile
+    ) -> Tuple[List[Document], str | None]:
+        """Load and split one adaptive file by using its local working callback."""
+        loaded_docs: List[Document] = []
+        file_hash: str | None = None
+        source_identifier = adaptive_file.local_path
+        extension = adaptive_file.extension.lower()

-        for file_path in file_paths:
-            if self._is_document_processed(file_path):
-                logger.info(f"Skipping already processed document: {file_path}")
-                continue
+        def process_local_file(local_file_path: str):
+            nonlocal loaded_docs, file_hash

-            logger.info(f"Processing document: {file_path}")
-
-            # Get the appropriate loader for the file extension
-            loader = self._get_loader_for_extension(file_path)
+            file_hash = self._get_file_hash(local_file_path)
+            if self._is_document_hash_processed(file_hash):
+                logger.info(f"Skipping already processed document hash for: {source_identifier}")
+                return

+            loader = self._get_loader_for_extension(local_file_path)
             if loader is None:
-                # For unsupported formats that we tried to load as text
+                logger.warning(f"No loader available for file: {source_identifier}")
+                return
+
+            docs = loader.load()
+            for doc in docs:
+                doc.metadata["source"] = source_identifier
+                doc.metadata["filename"] = adaptive_file.filename
+                doc.metadata["file_path"] = source_identifier
+                doc.metadata["file_size"] = os.path.getsize(local_file_path)
+                doc.metadata["file_extension"] = extension
+
+                if "page" in doc.metadata:
+                    doc.metadata["page_number"] = doc.metadata["page"]
+
+            split_docs = self.text_splitter.split_documents(docs)
+            for chunk in split_docs:
+                years = extract_years_from_text(chunk.page_content)
+                events = extract_russian_event_names(chunk.page_content)
+                chunk.metadata["years"] = years
+                chunk.metadata["events"] = events
+
+            loaded_docs = split_docs
+
+        adaptive_file.work_with_file_locally(process_local_file)
+        return loaded_docs, file_hash
+
+    def load_and_split_documents(
+        self, adaptive_collection: _AdaptiveCollection, recursive: bool = True
+    ) -> Tuple[List[Document], List[Tuple[str, str]]]:
+        """Load documents from adaptive collection and split them appropriately."""
+        all_docs: List[Document] = []
+        processed_file_records: dict[str, str] = {}
+
+        for adaptive_file in adaptive_collection.iterate(recursive=recursive):
+            if adaptive_file.extension.lower() not in SUPPORTED_EXTENSIONS:
+                logger.debug(
+                    f"Skipping unsupported file extension for {adaptive_file.filename}: {adaptive_file.extension}"
+                )
                 continue

+            logger.info(f"Processing document: {adaptive_file.local_path}")
             try:
-                # Load the document(s)
-                docs = loader.load()
-
-                # Add metadata to each document
-                for doc in docs:
-                    # Extract metadata from the original file
-                    doc.metadata["source"] = file_path
-                    doc.metadata["filename"] = Path(file_path).name
-                    doc.metadata["file_path"] = file_path
-                    doc.metadata["file_size"] = os.path.getsize(file_path)
-
-                    # Add page number if available in original metadata
-                    if "page" in doc.metadata:
-                        doc.metadata["page_number"] = doc.metadata["page"]
-
-                    # Add file extension as metadata
-                    doc.metadata["file_extension"] = Path(file_path).suffix
-
-                # Split documents if they are too large
-                split_docs = self.text_splitter.split_documents(docs)
-
-                # Extract additional metadata from each chunk.
-                for chunk in split_docs:
-                    years = extract_years_from_text(chunk.page_content)
-                    events = extract_russian_event_names(chunk.page_content)
-                    chunk.metadata["years"] = years
-                    chunk.metadata["events"] = events
-
-                # Add to the collection
-                all_docs.extend(split_docs)
-
+                split_docs, file_hash = self._load_one_adaptive_file(adaptive_file)
+                if split_docs:
+                    all_docs.extend(split_docs)
+                if file_hash:
+                    processed_file_records[adaptive_file.local_path] = file_hash
             except Exception as e:
-                logger.error(f"Error processing {file_path}: {str(e)}")
+                logger.error(f"Error processing {adaptive_file.local_path}: {str(e)}")
                 continue

-        return all_docs
+        return all_docs, list(processed_file_records.items())

-    def enrich_and_store(self, file_paths: List[str]):
+    def enrich_and_store(self, adaptive_collection: _AdaptiveCollection):
         """Load, enrich, and store documents in the vector store."""
-        logger.info(f"Starting enrichment process for {len(file_paths)} files...")
+        logger.info("Starting enrichment process...")

         # Load and split documents
-        documents = self.load_and_split_documents(file_paths)
+        documents, processed_file_records = self.load_and_split_documents(
+            adaptive_collection
+        )

         if not documents:
             logger.info("No new documents to process.")
@@ -225,55 +261,58 @@ class DocumentEnricher:
             self.vector_store.add_documents(documents)

             # Only mark documents as processed after successful insertion to vector store
-            processed_file_paths = set()
-            for doc in documents:
-                if 'source' in doc.metadata:
-                    processed_file_paths.add(doc.metadata['source'])
+            for file_identifier, file_hash in processed_file_records:
+                self._mark_document_processed(file_identifier, file_hash)

-            for file_path in processed_file_paths:
-                self._mark_document_processed(file_path)
-
-            logger.info(f"Successfully added {len(documents)} document chunks to vector store and marked {len(processed_file_paths)} files as processed.")
+            logger.info(
+                f"Successfully added {len(documents)} document chunks to vector store and marked {len(processed_file_records)} files as processed."
+            )
         except Exception as e:
             logger.error(f"Error adding documents to vector store: {str(e)}")
             raise


-def get_all_documents_from_data_dir(data_dir: str = str(DATA_DIR)) -> List[str]:
-    """Get all supported document file paths from the data directory."""
-    supported_extensions = {
-        '.pdf', '.docx', '.doc', '.pptx', '.xlsx', '.xls',
-        '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff',
-        '.webp', '.odt'
-    }
-
-    file_paths = []
-    for root, dirs, files in os.walk(data_dir):
-        for file in files:
-            if Path(file).suffix.lower() in supported_extensions:
-                file_paths.append(os.path.join(root, file))
-
-    return file_paths
+def get_enrichment_adaptive_collection(
+    data_dir: str = str(DATA_DIR),
+) -> _AdaptiveCollection:
+    """Create adaptive collection based on environment source configuration."""
+    source = ENRICHMENT_SOURCE
+    if source == "local":
+        local_path = ENRICHMENT_LOCAL_PATH or data_dir
+        logger.info(f"Using local adaptive collection from path: {local_path}")
+        return LocalFilesystemAdaptiveCollection(local_path)
+
+    if source == "yadisk":
+        if not YADISK_TOKEN:
+            raise ValueError("YADISK_TOKEN must be set when ENRICHMENT_SOURCE=yadisk")
+        if not ENRICHMENT_YADISK_PATH:
+            raise ValueError(
+                "ENRICHMENT_YADISK_PATH must be set when ENRICHMENT_SOURCE=yadisk"
+            )
+        logger.info(
+            f"Using Yandex Disk adaptive collection from path: {ENRICHMENT_YADISK_PATH}"
+        )
+        return YandexDiskAdaptiveCollection(
+            token=YADISK_TOKEN,
+            base_dir=ENRICHMENT_YADISK_PATH,
+        )
+
+    raise ValueError(
+        f"Unsupported ENRICHMENT_SOURCE='{source}'. Allowed values: local, yadisk"
+    )


 def run_enrichment_process(vector_store, data_dir: str = str(DATA_DIR)):
     """Run the full enrichment process."""
-    logger.info(f"Starting document enrichment from directory: {data_dir}")
-
-    # Get all supported documents from the data directory
-    file_paths = get_all_documents_from_data_dir(data_dir)
-
-    if not file_paths:
-        logger.warning(f"No supported documents found in {data_dir}")
-        return
-
-    logger.info(f"Found {len(file_paths)} documents to process")
+    logger.info("Starting document enrichment process")
+
+    adaptive_collection = get_enrichment_adaptive_collection(data_dir=data_dir)

     # Initialize the document enricher
     enricher = DocumentEnricher(vector_store)

     # Run the enrichment process
-    enricher.enrich_and_store(file_paths)
+    enricher.enrich_and_store(adaptive_collection)

     logger.info("Document enrichment process completed!")
diff --git a/services/rag/langchain/helpers.py b/services/rag/langchain/helpers.py
index 1965f76..1471cd2 100644
--- a/services/rag/langchain/helpers.py
+++ b/services/rag/langchain/helpers.py
@@ -118,9 +118,10 @@ class _AdaptiveFile(ABC):
     local_path: str
     filename: str

-    def __init__(self, filename: str, extension: str):
+    def __init__(self, filename: str, extension: str, local_path: str):
         self.filename = filename
         self.extension = extension
+        self.local_path = local_path

     # This method allows to work with file locally, and lambda should be provided for this.
     # Why separate method? For possible cleanup after work is done. And to download file, if needed
@@ -138,11 +139,8 @@ class _AdaptiveCollection(ABC):


 class LocalFilesystemAdaptiveFile(_AdaptiveFile):
-    local_path: str
-
     def __init__(self, filename: str, extension: str, local_path: str):
-        super().__init__(filename, extension)
-        self.local_path = local_path
+        super().__init__(filename, extension, local_path)

     def work_with_file_locally(self, func: Callable[[str], None]):
         func(self.local_path)
@@ -173,7 +171,7 @@ class YandexDiskAdaptiveFile(_AdaptiveFile):
     remote_path: str

     def __init__(self, filename: str, extension: str, remote_path: str, token: str):
-        super().__init__(filename, extension)
+        super().__init__(filename, extension, remote_path)
         self.token = token
         self.remote_path = remote_path

diff --git a/services/rag/langchain/test/test_local_filesystem_adaptive_collection.py b/services/rag/langchain/test/test_local_filesystem_adaptive_collection.py
index 0da5a8b..f85d2cb 100644
--- a/services/rag/langchain/test/test_local_filesystem_adaptive_collection.py
+++ b/services/rag/langchain/test/test_local_filesystem_adaptive_collection.py
@@ -13,7 +13,7 @@ class TestLocalFilesystemAdaptiveCollection(unittest.TestCase):
         collection = LocalFilesystemAdaptiveCollection(str(self.samples_dir))
         files = list(collection.iterate(recursive=False))

-        file_names = sorted(Path(file.local_path).name for file in files)
+        file_names = sorted(file.filename for file in files)

         self.assertEqual(file_names, ["root.txt"])
         self.assertTrue(all(isinstance(file, LocalFilesystemAdaptiveFile) for file in files))
@@ -33,7 +33,9 @@ class TestLocalFilesystemAdaptiveCollection(unittest.TestCase):
     def test_work_with_file_locally_provides_existing_path(self):
         target_path = self.samples_dir / "root.txt"

-        adaptive_file = LocalFilesystemAdaptiveFile(target_path.suffix, str(target_path))
+        adaptive_file = LocalFilesystemAdaptiveFile(
+            target_path.name, target_path.suffix, str(target_path)
+        )

         observed = {}

@@ -44,6 +46,7 @@ class TestLocalFilesystemAdaptiveCollection(unittest.TestCase):

         adaptive_file.work_with_file_locally(callback)

+        self.assertEqual(adaptive_file.filename, "root.txt")
         self.assertEqual(observed["path"], str(target_path))
         self.assertEqual(observed["content"], "root file")

diff --git a/services/rag/langchain/test/test_yandex_disk_adaptive_collection.py b/services/rag/langchain/test/test_yandex_disk_adaptive_collection.py
index b423d4f..ab5e6f4 100644
--- a/services/rag/langchain/test/test_yandex_disk_adaptive_collection.py
+++ b/services/rag/langchain/test/test_yandex_disk_adaptive_collection.py
@@ -31,6 +31,7 @@ class TestYandexDiskAdaptiveCollection(unittest.TestCase):
             self.skipTest(f"Yandex Disk request failed and needs manual verification: {exc}")

         for item in files:
+            self.assertTrue(item.filename)
             logger.info(f"Yandex file found during test iteration: {item.local_path}")

         self.assertIsInstance(files, list)
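
Usage sketch for the configuration this patch introduces (not part of the patch itself): ENRICHMENT_SOURCE selects the backend, ENRICHMENT_LOCAL_PATH points the local backend at a folder, and YADISK_TOKEN plus ENRICHMENT_YADISK_PATH configure the Yandex Disk backend. The snippet below assumes it is run from services/rag/langchain so that enrichment.py imports as enrichment, and the inline paths are placeholder values; in normal use the variables would live in the service's .env file, which load_dotenv() reads when the module is imported.

# Sketch only: placeholder values set in code for illustration; real values belong in .env.
import os

os.environ.setdefault("ENRICHMENT_SOURCE", "local")              # or "yadisk"
os.environ.setdefault("ENRICHMENT_LOCAL_PATH", "../../../data")  # placeholder path
# For the yadisk source you would instead set YADISK_TOKEN and ENRICHMENT_YADISK_PATH.

# Imported after the variables are set, because enrichment.py reads them at module import time.
from enrichment import SUPPORTED_EXTENSIONS, get_enrichment_adaptive_collection

collection = get_enrichment_adaptive_collection()
for adaptive_file in collection.iterate(recursive=True):
    if adaptive_file.extension.lower() not in SUPPORTED_EXTENSIONS:
        continue  # same extension filter the enricher applies
    print(adaptive_file.filename, adaptive_file.local_path)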