86 lines
2.6 KiB
Python
86 lines
2.6 KiB
Python
|
|
"""Prefect flow for analyzing Yandex Disk files and storing file paths."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import json
|
||
|
|
import os
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import List
|
||
|
|
|
||
|
|
from dotenv import load_dotenv
|
||
|
|
from prefect.logging import get_run_logger
|
||
|
|
|
||
|
|
from prefect import flow, task
|
||
|
|
|
||
|
|
# Load variables from a .env file (if one is present) into the environment.
load_dotenv()

# Settings read from the environment; each is None when the variable is unset.
PREFECT_API_URL = os.environ.get("PREFECT_API_URL")
YADISK_TOKEN = os.environ.get("YADISK_TOKEN")

# Yandex Disk folder the traversal starts from (Russian for "common"/"shared").
YADISK_ROOT_PATH = "Общая"

# Destination of the JSON report: take the parent of this file's directory,
# climb three more levels, and normalize the resulting absolute path.
OUTPUT_JSON_PATH = (
    Path(__file__)
    .resolve()
    .parent.parent.joinpath("..", "..", "..", "yadisk_files.json")
).resolve()

if PREFECT_API_URL:
    # NOTE(review): the value was just read from os.environ, so this write
    # appears to be a no-op; confirm nothing depends on it before removing.
    os.environ["PREFECT_API_URL"] = PREFECT_API_URL
|
||
|
|
|
||
|
|
|
||
|
|
@task(name="iterate_yadisk_folder_and_store_file_paths")
def iterate_yadisk_folder_and_store_file_paths() -> List[str]:
    """Walk Yandex Disk from ``YADISK_ROOT_PATH`` and save every file path to JSON.

    Folders are visited depth-first, in the order ``listdir`` yields their
    entries. Each entry is read as an object with ``type``/``path`` attributes
    or, as a fallback, as a dict with ``"type"``/``"path"`` keys; entries with
    no path are skipped. The collected file paths are written to
    ``OUTPUT_JSON_PATH`` as a pretty-printed JSON array and returned.

    Returns:
        The collected file paths, in traversal order.

    Raises:
        ValueError: If ``YADISK_TOKEN`` is not configured.
        RuntimeError: If the ``yadisk`` package is not installed.
    """
    if not YADISK_TOKEN:
        raise ValueError("YADISK_TOKEN is required to analyze Yandex Disk")

    try:
        import yadisk
    except ImportError as error:
        raise RuntimeError(
            "yadisk package is required for this task. Install dependencies in venv first."
        ) from error

    logger = get_run_logger()
    file_paths: List[str] = []
    yandex_disk = yadisk.YaDisk(token=YADISK_TOKEN)

    try:
        # An explicit stack of directory iterators replaces the recursive walk:
        # the visiting order is unchanged, but deeply nested folders can no
        # longer exceed Python's recursion limit.
        pending = [iter(yandex_disk.listdir(YADISK_ROOT_PATH))]
        while pending:
            for item in pending[-1]:
                item_type = getattr(item, "type", None)
                item_path = getattr(item, "path", None)

                # Fallback for entries delivered as plain dicts.
                if item_path is None and isinstance(item, dict):
                    item_type = item.get("type")
                    item_path = item.get("path")

                if not item_path:
                    continue

                if item_type == "dir":
                    # Descend now; this folder's iterator resumes afterwards.
                    pending.append(iter(yandex_disk.listdir(item_path)))
                    break
                if item_type == "file":
                    file_paths.append(item_path)
                    # Log after appending so the count includes this file.
                    logger.info(
                        "Added %s (%d file(s) collected so far)",
                        item_path,
                        len(file_paths),
                    )
            else:
                # The current folder is exhausted without descending: go back up.
                pending.pop()
    finally:
        # Close the client if it supports it (guarded in case the installed
        # yadisk version has no close()), so its resources are not leaked.
        close_client = getattr(yandex_disk, "close", None)
        if callable(close_client):
            close_client()

    OUTPUT_JSON_PATH.parent.mkdir(parents=True, exist_ok=True)
    # Write to a sibling temp file and swap it into place, so an interrupted
    # run never leaves a truncated report at OUTPUT_JSON_PATH.
    tmp_path = OUTPUT_JSON_PATH.with_name(OUTPUT_JSON_PATH.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as output_file:
        json.dump(file_paths, output_file, ensure_ascii=False, indent=2)
    tmp_path.replace(OUTPUT_JSON_PATH)

    logger.info("Saved %d file path(s) to %s", len(file_paths), OUTPUT_JSON_PATH)
    return file_paths
|
||
|
|
|
||
|
|
|
||
|
|
@flow(name="analyze_yadisk_file_urls")
def analyze_yadisk_file_urls() -> List[str]:
    """Entry-point flow: collect every file path under the Yandex Disk root.

    Delegates to the ``iterate_yadisk_folder_and_store_file_paths`` task (which
    also writes the paths to the JSON report) and hands back its result.
    """
    collected_paths = iterate_yadisk_folder_and_store_file_paths()
    return collected_paths
|
||
|
|
|
||
|
|
|
||
|
|
def serve_analyze_yadisk_file_urls() -> None:
    """Expose ``analyze_yadisk_file_urls`` as a Prefect deployment.

    Calls Prefect's ``Flow.serve`` with the deployment name
    ``"analyze-yadisk-file-urls"`` so the flow can be run remotely.
    """
    deployment_name = "analyze-yadisk-file-urls"
    analyze_yadisk_file_urls.serve(name=deployment_name)


if __name__ == "__main__":
    # Executing the module directly starts serving the deployment.
    serve_analyze_yadisk_file_urls()
|