diff --git a/services/rag/langchain/.env.dist b/services/rag/langchain/.env.dist index 9ca9d7c..b0349a4 100644 --- a/services/rag/langchain/.env.dist +++ b/services/rag/langchain/.env.dist @@ -3,6 +3,7 @@ OLLAMA_CHAT_MODEL=MODEL OPENAI_CHAT_URL=URL OPENAI_CHAT_KEY=KEY CHAT_MODEL_STRATEGY=ollama +PREFECT_API_URL=URL QDRANT_HOST=HOST QDRANT_REST_PORT=PORT QDRANT_GRPC_PORT=PORT diff --git a/services/rag/langchain/PLANNING.md b/services/rag/langchain/PLANNING.md index 81f8ac4..b05fd24 100644 --- a/services/rag/langchain/PLANNING.md +++ b/services/rag/langchain/PLANNING.md @@ -105,9 +105,9 @@ During this Phase we create asynchronous process of enrichment, utilizing async/ # Phase 14 (integration of Prefect client, for creating flow and tasks on remote Prefect server) - [ ] Install Prefect client library. -- [ ] Add .env variable PREFECT_API_URL, that will be used for connecting client to the prefect server -- [ ] Create prefect client file in `prefect/01_yadisk_analyze.py`. In this file we will work with prefect flows and tasks for this phase. -- [ ] Create prefect flow called "analyze_yadisk_file_urls" -- [ ] Create prefect task "iterate_yadisk_folder_and_store_file_paths" that will connect to yandex disk with yadisk library, analyze everything inside folder `Общая` recursively and store file paths in the ./../../../yadisk_files.json, in array of strings. -- [ ] In our pefect file add function for flow to serve, as per prefect documentation on serving flows -- [ ] Tests will be done manually by hand, by executing this script and checking prefect dashboard. No automatical tests needed for this phase. +- [x] Add .env variable PREFECT_API_URL, that will be used for connecting client to the prefect server +- [x] Create prefect client file in `prefect/01_yadisk_analyze.py`. In this file we will work with prefect flows and tasks for this phase. +- [x] Create prefect flow called "analyze_yadisk_file_urls" +- [x] Create prefect task "iterate_yadisk_folder_and_store_file_paths" that will connect to yandex disk with yadisk library, analyze everything inside folder `Общая` recursively and store file paths in the ./../../../yadisk_files.json, in array of strings. +- [x] In our pefect file add function for flow to serve, as per prefect documentation on serving flows +- [x] Tests will be done manually by hand, by executing this script and checking prefect dashboard. No automatical tests needed for this phase. diff --git a/services/rag/langchain/prefect/01_yadisk_analyze.py b/services/rag/langchain/prefect/01_yadisk_analyze.py new file mode 100644 index 0000000..dd19fd4 --- /dev/null +++ b/services/rag/langchain/prefect/01_yadisk_analyze.py @@ -0,0 +1,85 @@ +"""Prefect flow for analyzing Yandex Disk files and storing file paths.""" + +from __future__ import annotations + +import json +import os +from pathlib import Path +from typing import List + +from dotenv import load_dotenv +from prefect.logging import get_run_logger + +from prefect import flow, task + +load_dotenv() + +PREFECT_API_URL = os.getenv("PREFECT_API_URL") +YADISK_TOKEN = os.getenv("YADISK_TOKEN") +YADISK_ROOT_PATH = "Общая" +OUTPUT_JSON_PATH = ( + Path(__file__).resolve().parent.parent / "../../../yadisk_files.json" +).resolve() + +if PREFECT_API_URL: + os.environ["PREFECT_API_URL"] = PREFECT_API_URL + + +@task(name="iterate_yadisk_folder_and_store_file_paths") +def iterate_yadisk_folder_and_store_file_paths() -> List[str]: + """Iterate Yandex Disk recursively from `Общая` and save file paths to JSON.""" + if not YADISK_TOKEN: + raise ValueError("YADISK_TOKEN is required to analyze Yandex Disk") + + try: + import yadisk + except ImportError as error: + raise RuntimeError( + "yadisk package is required for this task. Install dependencies in venv first." + ) from error + + yandex_disk = yadisk.YaDisk(token=YADISK_TOKEN) + file_paths: List[str] = [] + + logger = get_run_logger() + + def walk_folder(folder_path: str) -> None: + for item in yandex_disk.listdir(folder_path): + item_type = getattr(item, "type", None) + item_path = getattr(item, "path", None) + + if item_path is None and isinstance(item, dict): + item_type = item.get("type") + item_path = item.get("path") + + if not item_path: + continue + + if item_type == "dir": + walk_folder(item_path) + elif item_type == "file": + logger.info(f"Added {len(file_paths)} file into the list") + file_paths.append(item_path) + + walk_folder(YADISK_ROOT_PATH) + + OUTPUT_JSON_PATH.parent.mkdir(parents=True, exist_ok=True) + with open(OUTPUT_JSON_PATH, "w", encoding="utf-8") as output_file: + json.dump(file_paths, output_file, ensure_ascii=False, indent=2) + + return file_paths + + +@flow(name="analyze_yadisk_file_urls") +def analyze_yadisk_file_urls() -> List[str]: + """Run Yandex Disk analysis task and return collected file paths.""" + return iterate_yadisk_folder_and_store_file_paths() + + +def serve_analyze_yadisk_file_urls() -> None: + """Serve the flow as a deployment for remote Prefect execution.""" + analyze_yadisk_file_urls.serve(name="analyze-yadisk-file-urls") + + +if __name__ == "__main__": + serve_analyze_yadisk_file_urls() diff --git a/services/rag/langchain/requirements.txt b/services/rag/langchain/requirements.txt index 812e316..68af25f 100644 --- a/services/rag/langchain/requirements.txt +++ b/services/rag/langchain/requirements.txt @@ -54,4 +54,5 @@ unstructured-inference>=0.7.0 unstructured-pytesseract>=0.3.12 # System and utilities -ollama>=0.3.0 \ No newline at end of file +ollama>=0.3.0 +prefect>=2.19.0 diff --git a/services/rag/langchain/server.py b/services/rag/langchain/server.py index 37cae8e..e9f751d 100644 --- a/services/rag/langchain/server.py +++ b/services/rag/langchain/server.py @@ -4,8 +4,8 @@ import json import os from contextlib import asynccontextmanager from typing import Any, Dict -from dotenv import load_dotenv +from dotenv import load_dotenv from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from loguru import logger