prefect flow and task working inside langchain project
This commit is contained in:
@@ -3,6 +3,7 @@ OLLAMA_CHAT_MODEL=MODEL
|
||||
OPENAI_CHAT_URL=URL
|
||||
OPENAI_CHAT_KEY=KEY
|
||||
CHAT_MODEL_STRATEGY=ollama
|
||||
PREFECT_API_URL=URL
|
||||
QDRANT_HOST=HOST
|
||||
QDRANT_REST_PORT=PORT
|
||||
QDRANT_GRPC_PORT=PORT
|
||||
|
||||
@@ -105,9 +105,9 @@ During this Phase we create asynchronous process of enrichment, utilizing async/
|
||||
# Phase 14 (integration of Prefect client, for creating flow and tasks on remote Prefect server)
|
||||
|
||||
- [ ] Install Prefect client library.
|
||||
- [ ] Add .env variable PREFECT_API_URL, that will be used for connecting client to the prefect server
|
||||
- [ ] Create prefect client file in `prefect/01_yadisk_analyze.py`. In this file we will work with prefect flows and tasks for this phase.
|
||||
- [ ] Create prefect flow called "analyze_yadisk_file_urls"
|
||||
- [ ] Create prefect task "iterate_yadisk_folder_and_store_file_paths" that will connect to yandex disk with yadisk library, analyze everything inside folder `Общая` recursively and store file paths in the ./../../../yadisk_files.json, in array of strings.
|
||||
- [ ] In our Prefect file, add a function that serves the flow, as per the Prefect documentation on serving flows
|
||||
- [ ] Tests will be done manually, by executing this script and checking the Prefect dashboard. No automated tests are needed for this phase.
|
||||
- [x] Add .env variable PREFECT_API_URL, that will be used for connecting client to the prefect server
|
||||
- [x] Create prefect client file in `prefect/01_yadisk_analyze.py`. In this file we will work with prefect flows and tasks for this phase.
|
||||
- [x] Create prefect flow called "analyze_yadisk_file_urls"
|
||||
- [x] Create prefect task "iterate_yadisk_folder_and_store_file_paths" that will connect to yandex disk with yadisk library, analyze everything inside folder `Общая` recursively and store file paths in the ./../../../yadisk_files.json, in array of strings.
|
||||
- [x] In our Prefect file, add a function that serves the flow, as per the Prefect documentation on serving flows
|
||||
- [x] Tests will be done manually, by executing this script and checking the Prefect dashboard. No automated tests are needed for this phase.
|
||||
|
||||
85
services/rag/langchain/prefect/01_yadisk_analyze.py
Normal file
85
services/rag/langchain/prefect/01_yadisk_analyze.py
Normal file
@@ -0,0 +1,85 @@
|
||||
"""Prefect flow for analyzing Yandex Disk files and storing file paths."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from prefect.logging import get_run_logger
|
||||
|
||||
from prefect import flow, task
|
||||
|
||||
# Load variables from a .env file into the process environment before the
# os.getenv() calls below read them.
# NOTE(review): the prefect imports above run before load_dotenv(); confirm
# that Prefect still honours a PREFECT_API_URL that exists only in .env.
load_dotenv()

# Address of the Prefect API server; None when the variable is not set.
PREFECT_API_URL = os.getenv("PREFECT_API_URL")
# Token handed to the yadisk client; the task raises ValueError when missing.
YADISK_TOKEN = os.getenv("YADISK_TOKEN")
# Yandex Disk folder from which the recursive walk starts.
YADISK_ROOT_PATH = "Общая"
# Destination of the JSON array of file paths: starts at the parent of this
# file's directory, climbs three more levels, then is normalised by resolve().
OUTPUT_JSON_PATH = (
    Path(__file__).resolve().parent.parent / "../../../yadisk_files.json"
).resolve()

# Write the value back into os.environ when it is non-empty.
# NOTE(review): the value was just read from the environment, so this looks
# redundant — presumably kept to make the setting explicit for Prefect; confirm.
if PREFECT_API_URL:
    os.environ["PREFECT_API_URL"] = PREFECT_API_URL
|
||||
|
||||
|
||||
@task(name="iterate_yadisk_folder_and_store_file_paths")
def iterate_yadisk_folder_and_store_file_paths() -> List[str]:
    """Iterate Yandex Disk recursively from `Общая` and save file paths to JSON.

    Walks every folder below ``YADISK_ROOT_PATH`` depth-first, collects the
    path of each entry whose type is ``"file"``, writes the collected list to
    ``OUTPUT_JSON_PATH`` as an indented UTF-8 JSON array of strings, and
    returns the same list.

    Returns:
        The collected file paths, in traversal order.

    Raises:
        ValueError: If ``YADISK_TOKEN`` is not set.
        RuntimeError: If the ``yadisk`` package cannot be imported.
    """
    if not YADISK_TOKEN:
        raise ValueError("YADISK_TOKEN is required to analyze Yandex Disk")

    # Imported inside the task so a missing package surfaces as a clear
    # RuntimeError at run time rather than an ImportError at module import.
    try:
        import yadisk
    except ImportError as error:
        raise RuntimeError(
            "yadisk package is required for this task. Install dependencies in venv first."
        ) from error

    yandex_disk = yadisk.YaDisk(token=YADISK_TOKEN)
    file_paths: List[str] = []

    logger = get_run_logger()

    def walk_folder(folder_path: str) -> None:
        """Append the path of every file found below *folder_path*."""
        for item in yandex_disk.listdir(folder_path):
            item_type = getattr(item, "type", None)
            item_path = getattr(item, "path", None)

            # Fall back to mapping access when the entry is a plain dict
            # rather than an object exposing attributes.
            if item_path is None and isinstance(item, dict):
                item_type = item.get("type")
                item_path = item.get("path")

            # Entries without a usable path cannot be recorded or descended.
            if not item_path:
                continue

            if item_type == "dir":
                walk_folder(item_path)
            elif item_type == "file":
                # Append before logging so the reported count includes the
                # file that was just added (previously it was one short).
                file_paths.append(item_path)
                logger.info(f"Added {len(file_paths)} file into the list")

    walk_folder(YADISK_ROOT_PATH)

    # Make sure the destination directory exists before writing the result.
    OUTPUT_JSON_PATH.parent.mkdir(parents=True, exist_ok=True)
    with open(OUTPUT_JSON_PATH, "w", encoding="utf-8") as output_file:
        # ensure_ascii=False keeps non-ASCII path characters readable.
        json.dump(file_paths, output_file, ensure_ascii=False, indent=2)

    return file_paths
|
||||
|
||||
|
||||
@flow(name="analyze_yadisk_file_urls")
def analyze_yadisk_file_urls() -> List[str]:
    """Execute the Yandex Disk listing task and hand back its result.

    Returns:
        The list of file paths produced by
        ``iterate_yadisk_folder_and_store_file_paths``.
    """
    collected_paths = iterate_yadisk_folder_and_store_file_paths()
    return collected_paths
|
||||
|
||||
|
||||
def serve_analyze_yadisk_file_urls() -> None:
    """Start serving ``analyze_yadisk_file_urls`` under a fixed deployment name."""
    deployment_name = "analyze-yadisk-file-urls"
    analyze_yadisk_file_urls.serve(name=deployment_name)
|
||||
|
||||
|
||||
# Start serving the flow only when this file is executed as a script, not
# when it is imported as a module.
if __name__ == "__main__":
    serve_analyze_yadisk_file_urls()
|
||||
@@ -55,3 +55,4 @@ unstructured-pytesseract>=0.3.12
|
||||
|
||||
# System and utilities
|
||||
ollama>=0.3.0
|
||||
prefect>=2.19.0
|
||||
|
||||
@@ -4,8 +4,8 @@ import json
|
||||
import os
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Any, Dict
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from loguru import logger
|
||||
|
||||
Reference in New Issue
Block a user