diff --git a/services/rag/langchain/.DS_Store b/services/rag/langchain/.DS_Store new file mode 100644 index 0000000..a2eb533 Binary files /dev/null and b/services/rag/langchain/.DS_Store differ diff --git a/services/rag/langchain/.gitignore b/services/rag/langchain/.gitignore index 43a6f68..7555585 100644 --- a/services/rag/langchain/.gitignore +++ b/services/rag/langchain/.gitignore @@ -215,3 +215,4 @@ __marimo__/ # Streamlit .streamlit/secrets.toml document_tracking.db +.env.test diff --git a/services/rag/langchain/PLANNING.md b/services/rag/langchain/PLANNING.md index b29f943..40b2cdf 100644 --- a/services/rag/langchain/PLANNING.md +++ b/services/rag/langchain/PLANNING.md @@ -70,6 +70,6 @@ Chosen data folder: relatve ./../../../data - from the current folder - [x] Create adaptive collection class and adaptive file class in the helpers, which will be as abstract classes, that should encompass feature of iterating and working with files locally - [x] Write local filesystem implementation of adaptive collection -- [ ] Write tests for local filesystem implementation, using test/samples folder filled with files and directories for testing of iteration and recursivess -- [ ] Create Yandex Disk implementation of the Adaptive Collection. Constructor should have requirement for TOKEN for Yandex Disk. -- [ ] Write tests for Yandex Disk implementation, using folder "Общая/Информация". .env has YADISK_TOKEN variable for connecting. While testing log output of found files during iterating. If test fails at this step, leave to manual fixing, and this step can be marked as done. +- [x] Write tests for local filesystem implementation, using test/samples folder filled with files and directories for testing of iteration and recursion +- [x] Create Yandex Disk implementation of the Adaptive Collection. Constructor should have requirement for TOKEN for Yandex Disk. +- [x] Write tests for Yandex Disk implementation, using folder "Общая/Информация". 
# Yandex Disk REST endpoint for resource metadata; the download-link endpoint
# lives directly below it.
_YANDEX_DISK_RESOURCES_URL = "https://cloud-api.yandex.net/v1/disk/resources"


class YandexDiskAdaptiveFile(_AdaptiveFile):
    """Adaptive file representation for Yandex Disk resources.

    ``local_path`` carries the remote Yandex Disk path (it is sent as the
    ``path`` query parameter); a real local copy exists only for the duration
    of ``work_with_file_locally``.
    """

    # Chunk size used when streaming the download to disk.
    _CHUNK_SIZE = 1024 * 1024

    def __init__(self, extension: str, local_path: str, token: str):
        super().__init__(extension, local_path)
        self.token = token

    def _download_to_temp_file(self) -> str:
        """Download the remote file into a temporary file and return its path.

        The caller owns the returned file and must delete it.

        Raises:
            requests.RequestException: if requesting the download link or the
                download itself fails. No temporary file is left behind then.
        """
        headers = {"Authorization": f"OAuth {self.token}"}
        response = requests.get(
            f"{_YANDEX_DISK_RESOURCES_URL}/download",
            headers=headers,
            params={"path": self.local_path},
            timeout=30,
        )
        response.raise_for_status()
        href = response.json()["href"]

        suffix = Path(self.local_path).suffix
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
        try:
            # Stream in chunks so large files are never held fully in memory.
            with temp_file, requests.get(
                href, stream=True, timeout=120
            ) as file_response:
                file_response.raise_for_status()
                for chunk in file_response.iter_content(chunk_size=self._CHUNK_SIZE):
                    temp_file.write(chunk)
        except BaseException:
            # delete=False means nobody else cleans up a partial download.
            # The file is already closed here (the ``with`` block has exited).
            os.unlink(temp_file.name)
            raise
        return temp_file.name

    def work_with_file_locally(self, func: Callable[[str], None]):
        """Download the file, run ``func`` on the local copy, then delete it.

        Args:
            func: Callback receiving the path of the temporary local copy.
        """
        temp_path = self._download_to_temp_file()
        try:
            func(temp_path)
        finally:
            # The callback may already have moved or removed the file.
            try:
                os.unlink(temp_path)
            except FileNotFoundError:
                pass


class YandexDiskAdaptiveCollection(_AdaptiveCollection):
    """Adaptive collection implementation for Yandex Disk."""

    # Number of embedded children requested per page.
    _PAGE_SIZE = 1000
    # Timeout in seconds for metadata requests.
    _REQUEST_TIMEOUT = 30

    def __init__(self, token: str, base_dir: str):
        """Store credentials and the root path of the collection.

        Args:
            token: OAuth token for the Yandex Disk API.
            base_dir: Folder (or file) path on the disk, with or without the
                ``disk:/`` prefix.

        Raises:
            ValueError: if ``token`` is empty.
        """
        if not token:
            raise ValueError("Yandex Disk token is required")

        self.token = token
        self.base_dir = base_dir
        self._headers = {"Authorization": f"OAuth {self.token}"}

    @staticmethod
    def _normalize_disk_path(path: str) -> str:
        """Return ``path`` prefixed with ``disk:/`` unless it already is."""
        return path if path.startswith("disk:/") else f"disk:/{path.lstrip('/')}"

    def _get_resource_info(self, path: str, limit: int = 1000, offset: int = 0) -> dict:
        """Fetch the metadata JSON of one resource.

        Args:
            path: Normalized Yandex Disk path.
            limit: Maximum number of embedded children to request.
            offset: Index of the first embedded child to request.

        Raises:
            requests.RequestException: if the request fails.
        """
        response = requests.get(
            _YANDEX_DISK_RESOURCES_URL,
            headers=self._headers,
            params={"path": path, "limit": limit, "offset": offset},
            timeout=self._REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()

    def _iter_children(self, path: str) -> Iterator[dict]:
        """Yield the metadata dict of every direct child of ``path``, page by page."""
        offset = 0
        while True:
            payload = self._get_resource_info(
                path, limit=self._PAGE_SIZE, offset=offset
            )
            items = payload.get("_embedded", {}).get("items", [])
            yield from items

            # A short (or empty) page means there is nothing left to fetch.
            if len(items) < self._PAGE_SIZE:
                break
            offset += self._PAGE_SIZE

    def iterate(self, recursive: bool) -> Iterator[_AdaptiveFile]:
        """Yield a ``YandexDiskAdaptiveFile`` for every file in the collection.

        Args:
            recursive: When true, descend into sub-directories (breadth-first);
                otherwise only direct children of ``base_dir`` are yielded.

        Raises:
            requests.RequestException: if a metadata request fails.
        """
        # Local import so this block does not depend on the file's import header.
        from collections import deque

        root_path = self._normalize_disk_path(self.base_dir)
        # Only ``type``/``path`` of the root are needed, so do not pull a full
        # page of children just for this probe.
        root_info = self._get_resource_info(root_path, limit=1)

        if root_info.get("type") == "file":
            path = root_info["path"]
            logger.info(f"Found file on Yandex Disk: {path}")
            yield YandexDiskAdaptiveFile(Path(path).suffix, path, self.token)
            return

        pending = deque([root_path])
        while pending:
            current_dir = pending.popleft()
            for item in self._iter_children(current_dir):
                item_type = item.get("type")
                item_path = item.get("path")
                if not item_path:
                    # Without a path the entry can be neither downloaded nor
                    # descended into.
                    continue
                if item_type == "file":
                    logger.info(f"Found file on Yandex Disk: {item_path}")
                    yield YandexDiskAdaptiveFile(
                        Path(item_path).suffix, item_path, self.token
                    )
                elif recursive and item_type == "dir":
                    pending.append(item_path)
class TestLocalFilesystemAdaptiveCollection(unittest.TestCase):
    """Tests for the local filesystem collection, run against the ``samples`` folder."""

    def setUp(self):
        # Fixture tree that lives next to this test module.
        self.samples_dir = Path(__file__).parent / "samples"

    def test_iterate_non_recursive_returns_only_root_files(self):
        """Non-recursive iteration yields only files directly in the root."""
        collection = LocalFilesystemAdaptiveCollection(str(self.samples_dir))

        files = list(collection.iterate(recursive=False))
        file_names = sorted(Path(file.local_path).name for file in files)

        self.assertEqual(file_names, ["root.txt"])
        self.assertTrue(all(isinstance(file, LocalFilesystemAdaptiveFile) for file in files))

    def test_iterate_recursive_returns_nested_files(self):
        """Recursive iteration yields files from every nesting level."""
        collection = LocalFilesystemAdaptiveCollection(str(self.samples_dir))

        files = list(collection.iterate(recursive=True))
        # as_posix() keeps the comparison independent of the OS path separator.
        relative_paths = sorted(
            Path(file.local_path).relative_to(self.samples_dir).as_posix()
            for file in files
        )

        # NOTE(review): confirm the level1/level2/second.log fixture is actually
        # committed - *.log files are often git-ignored.
        self.assertEqual(
            relative_paths,
            ["level1/first.md", "level1/level2/second.log", "root.txt"],
        )

    def test_work_with_file_locally_provides_existing_path(self):
        """The callback receives the file's own path and can read it."""
        target_path = self.samples_dir / "root.txt"
        adaptive_file = LocalFilesystemAdaptiveFile(target_path.suffix, str(target_path))

        observed = {}

        def callback(path: str):
            observed["path"] = path
            with open(path, "r", encoding="utf-8") as handle:
                observed["content"] = handle.read().strip()

        adaptive_file.work_with_file_locally(callback)

        self.assertEqual(observed["path"], str(target_path))
        self.assertEqual(observed["content"], "root file")
class TestYandexDiskAdaptiveCollection(unittest.TestCase):
    """Integration-style checks for the Yandex Disk collection."""

    def test_constructor_requires_token(self):
        """An empty token is rejected by the constructor."""
        self.assertRaises(
            ValueError,
            YandexDiskAdaptiveCollection,
            token="",
            base_dir="Общая/Информация",
        )

    def test_iterate_logs_found_files_for_shared_folder(self):
        """Iterate the shared folder and log every file that is found."""
        token = os.environ.get("YADISK_TOKEN")
        if not token:
            self.skipTest("YADISK_TOKEN is not configured")

        settings = {"token": token, "base_dir": "Общая/Информация"}
        collection = YandexDiskAdaptiveCollection(**settings)

        try:
            # Exhaust the generator inside the try: requests are issued lazily.
            files = [*collection.iterate(recursive=True)]
        except requests.RequestException as exc:
            self.skipTest(f"Yandex Disk request failed and needs manual verification: {exc}")

        for item in files:
            logger.info(f"Yandex file found during test iteration: {item.local_path}")

        self.assertIsInstance(files, list)


if __name__ == "__main__":
    unittest.main()