From 06a3155b6ba8f33db14c9e027f46dd6ecfea5a60 Mon Sep 17 00:00:00 2001 From: idchlife Date: Tue, 10 Feb 2026 20:42:07 +0300 Subject: [PATCH] Working Yandex Disk integration for loading files. Tests for local and Yandex --- services/rag/langchain/.DS_Store | Bin 0 -> 6148 bytes services/rag/langchain/.gitignore | 1 + services/rag/langchain/PLANNING.md | 6 +- services/rag/langchain/helpers.py | 133 ++++++++++++++++-- .../langchain/test/samples/level1/first.md | 1 + services/rag/langchain/test/samples/root.txt | 1 + ...st_local_filesystem_adaptive_collection.py | 52 +++++++ .../test_yandex_disk_adaptive_collection.py | 40 ++++++ 8 files changed, 222 insertions(+), 12 deletions(-) create mode 100644 services/rag/langchain/.DS_Store create mode 100644 services/rag/langchain/test/samples/level1/first.md create mode 100644 services/rag/langchain/test/samples/root.txt create mode 100644 services/rag/langchain/test/test_local_filesystem_adaptive_collection.py create mode 100644 services/rag/langchain/test/test_yandex_disk_adaptive_collection.py diff --git a/services/rag/langchain/.DS_Store b/services/rag/langchain/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..a2eb533abf551d042629249e6fe5fb378b3b5ee0 GIT binary patch literal 6148 zcmeH~u?oUK42Bc!Ah>jNyu}Cb4Gz&K=nFU~E>c0O^F6wMazU^xC+1b{XuyJ79K1T}(S!u1*@b}wNMJ-@TJzTK|1JE}{6A`8N&+PC zX9Tp_belC^D(=>|*R%RAs List[str]: return events -class _AdaptiveFile: +class _AdaptiveFile(ABC): extension: str # Format: .jpg local_path: str @@ -122,14 +126,14 @@ class _AdaptiveFile: # Lambda: first argument is a local path @abstractmethod def work_with_file_locally(self, func: Callable[[str], None]): - pass + """Run callback with a local path to the file.""" -class _AdaptiveCollection: +class _AdaptiveCollection(ABC): # Generator method with yield @abstractmethod - def iterate(self, recursive: bool): - pass + def iterate(self, recursive: bool) -> Iterator[_AdaptiveFile]: + """Iterate files in collection.""" class LocalFilesystemAdaptiveFile(_AdaptiveFile): @@ -145,8 +149,119 @@ class LocalFilesystemAdaptiveCollection(_AdaptiveCollection): self.base_dir = base_dir - def iterate(self, recursive: bool): + def iterate(self, recursive: bool) -> Iterator[_AdaptiveFile]: for root, dirs, files in os.walk(self.base_dir): for file in files: full_path = os.path.join(root, file) - yield _AdaptiveFile(Path(full_path).suffix, full_path) + yield LocalFilesystemAdaptiveFile(Path(full_path).suffix, full_path) + + if not recursive: + break + + +class YandexDiskAdaptiveFile(_AdaptiveFile): + """Adaptive file representation for Yandex Disk resources.""" + + def __init__(self, extension: str, local_path: str, token: str): + super().__init__(extension, local_path) + self.token = token + + def _download_to_temp_file(self) -> str: + headers = {"Authorization": f"OAuth {self.token}"} + response = requests.get( + "https://cloud-api.yandex.net/v1/disk/resources/download", + headers=headers, + params={"path": self.local_path}, + timeout=30, + ) + response.raise_for_status() + href = response.json()["href"] + + file_response = requests.get(href, timeout=120) + file_response.raise_for_status() + + suffix = Path(self.local_path).suffix + with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file: + temp_file.write(file_response.content) + return temp_file.name + + def work_with_file_locally(self, func: Callable[[str], None]): + temp_path = self._download_to_temp_file() + try: + func(temp_path) + finally: + if os.path.exists(temp_path): + os.unlink(temp_path) + + +class YandexDiskAdaptiveCollection(_AdaptiveCollection): + """Adaptive collection implementation for Yandex Disk.""" + + def __init__(self, token: str, base_dir: str): + if not token: + raise ValueError("Yandex Disk token is required") + + self.token = token + self.base_dir = base_dir + self._headers = {"Authorization": f"OAuth {self.token}"} + + @staticmethod + def _normalize_disk_path(path: str) -> str: + return path if path.startswith("disk:/") else f"disk:/{path.lstrip('/')}" + + def _get_resource_info(self, path: str) -> dict: + response = requests.get( + "https://cloud-api.yandex.net/v1/disk/resources", + headers=self._headers, + params={"path": path, "limit": 1000}, + timeout=30, + ) + response.raise_for_status() + return response.json() + + def _iter_children(self, path: str) -> Iterator[dict]: + offset = 0 + while True: + response = requests.get( + "https://cloud-api.yandex.net/v1/disk/resources", + headers=self._headers, + params={"path": path, "limit": 1000, "offset": offset}, + timeout=30, + ) + response.raise_for_status() + payload = response.json() + embedded = payload.get("_embedded", {}) + items = embedded.get("items", []) + if not items: + break + + for item in items: + yield item + + if len(items) < 1000: + break + offset += 1000 + + def iterate(self, recursive: bool) -> Iterator[_AdaptiveFile]: + root_path = self._normalize_disk_path(self.base_dir) + root_info = self._get_resource_info(root_path) + + if root_info.get("type") == "file": + path = root_info["path"] + logger.info(f"Found file on Yandex Disk: {path}") + yield YandexDiskAdaptiveFile(Path(path).suffix, path, self.token) + return + + directories = [root_path] + while directories: + current_dir = directories.pop(0) + for item in self._iter_children(current_dir): + item_type = item.get("type") + item_path = item.get("path") + if item_type == "file": + logger.info(f"Found file on Yandex Disk: {item_path}") + yield YandexDiskAdaptiveFile( + Path(item_path).suffix, item_path, self.token + ) + elif recursive and item_type == "dir": + directories.append(item_path) diff --git a/services/rag/langchain/test/samples/level1/first.md b/services/rag/langchain/test/samples/level1/first.md new file mode 100644 index 0000000..4bdc263 --- /dev/null +++ b/services/rag/langchain/test/samples/level1/first.md @@ -0,0 +1 @@ +first level diff --git a/services/rag/langchain/test/samples/root.txt b/services/rag/langchain/test/samples/root.txt new file mode 100644 index 0000000..3e5fd7f --- /dev/null +++ b/services/rag/langchain/test/samples/root.txt @@ -0,0 +1 @@ +root file diff --git a/services/rag/langchain/test/test_local_filesystem_adaptive_collection.py b/services/rag/langchain/test/test_local_filesystem_adaptive_collection.py new file mode 100644 index 0000000..0da5a8b --- /dev/null +++ b/services/rag/langchain/test/test_local_filesystem_adaptive_collection.py @@ -0,0 +1,52 @@ +import os +import unittest +from pathlib import Path + +from helpers import LocalFilesystemAdaptiveCollection, LocalFilesystemAdaptiveFile + + +class TestLocalFilesystemAdaptiveCollection(unittest.TestCase): + def setUp(self): + self.samples_dir = Path(__file__).parent / "samples" + + def test_iterate_non_recursive_returns_only_root_files(self): + collection = LocalFilesystemAdaptiveCollection(str(self.samples_dir)) + + files = list(collection.iterate(recursive=False)) + file_names = sorted(Path(file.local_path).name for file in files) + + self.assertEqual(file_names, ["root.txt"]) + self.assertTrue(all(isinstance(file, LocalFilesystemAdaptiveFile) for file in files)) + + def test_iterate_recursive_returns_nested_files(self): + collection = LocalFilesystemAdaptiveCollection(str(self.samples_dir)) + + files = list(collection.iterate(recursive=True)) + relative_paths = sorted( + str(Path(file.local_path).relative_to(self.samples_dir)) for file in files + ) + + self.assertEqual( + relative_paths, + ["level1/first.md", "level1/level2/second.log", "root.txt"], + ) + + def test_work_with_file_locally_provides_existing_path(self): + target_path = self.samples_dir / "root.txt" + adaptive_file = LocalFilesystemAdaptiveFile(target_path.suffix, str(target_path)) + + observed = {} + + def callback(path: str): + observed["path"] = path + with open(path, "r", encoding="utf-8") as handle: + observed["content"] = handle.read().strip() + + adaptive_file.work_with_file_locally(callback) + + self.assertEqual(observed["path"], str(target_path)) + self.assertEqual(observed["content"], "root file") + + +if __name__ == "__main__": + unittest.main() diff --git a/services/rag/langchain/test/test_yandex_disk_adaptive_collection.py b/services/rag/langchain/test/test_yandex_disk_adaptive_collection.py new file mode 100644 index 0000000..b423d4f --- /dev/null +++ b/services/rag/langchain/test/test_yandex_disk_adaptive_collection.py @@ -0,0 +1,40 @@ +import os +import unittest +from pathlib import Path + +import requests +from loguru import logger +from dotenv import load_dotenv +from helpers import YandexDiskAdaptiveCollection + +load_dotenv(dotenv_path=Path(__file__).resolve().parent.parent / ".env.test") + + +class TestYandexDiskAdaptiveCollection(unittest.TestCase): + def test_constructor_requires_token(self): + with self.assertRaises(ValueError): + YandexDiskAdaptiveCollection(token="", base_dir="Общая/Информация") + + def test_iterate_logs_found_files_for_shared_folder(self): + token = os.getenv("YADISK_TOKEN") + if not token: + self.skipTest("YADISK_TOKEN is not configured") + + collection = YandexDiskAdaptiveCollection( + token=token, + base_dir="Общая/Информация", + ) + + try: + files = list(collection.iterate(recursive=True)) + except requests.RequestException as exc: + self.skipTest(f"Yandex Disk request failed and needs manual verification: {exc}") + + for item in files: + logger.info(f"Yandex file found during test iteration: {item.local_path}") + + self.assertIsInstance(files, list) + + +if __name__ == "__main__": + unittest.main()