ragflow in the repository, with codex-created yandex disk plugin JUST IN CASE, also llamaindex enrichment with yandex disk predefined data
This commit is contained in:
@@ -6,9 +6,21 @@ EMBEDDING_STRATEGY=ollama
|
||||
OLLAMA_EMBEDDING_MODEL=MODEL
|
||||
OLLAMA_CHAT_MODEL=MODEL
|
||||
|
||||
# Qdrant Configuration
|
||||
QDRANT_HOST=localhost
|
||||
QDRANT_REST_PORT=6333
|
||||
QDRANT_GRPC_PORT=6334
|
||||
|
||||
# OpenAI Configuration (for reference - uncomment and configure when using OpenAI strategy)
|
||||
# OPENAI_CHAT_URL=https://api.openai.com/v1
|
||||
# OPENAI_CHAT_KEY=your_openai_api_key_here
|
||||
# OPENAI_EMBEDDING_MODEL=text-embedding-3-small
|
||||
# OPENAI_EMBEDDING_BASE_URL=https://api.openai.com/v1
|
||||
# OPENAI_EMBEDDING_API_KEY=your_openai_api_key_here
|
||||
|
||||
# Yandex Disk + Prefect (Phase 9)
|
||||
YADISK_TOKEN=your_yadisk_token_here
|
||||
PREFECT_API_URL=https://your-prefect-server.example/api
|
||||
QDRANT_HOST=HOST
|
||||
QDRANT_REST_PORT=PORT
|
||||
QDRANT_GRPC_PORT=PORT
|
||||
|
||||
@@ -49,10 +49,23 @@ Chosen data folder: relative ./../../../data - from the current folder
|
||||
|
||||
# Phase 8 (comment unsupported formats for now)
|
||||
|
||||
- [ ] Remove for now formats, extensions for images of any kind, archives of any kind, and add possible text documents, documents formats, like .txt, .xlsx, etc.
|
||||
- [x] Remove for now formats, extensions for images of any kind, archives of any kind, and add possible text documents, documents formats, like .txt, .xlsx, etc. in enrichment processes/functions.
|
||||
|
||||
# Phase 9 (integration of Prefect client, for creating flow and tasks on remote Prefect server)
|
||||
|
||||
- [ ] Install Prefect client library.
|
||||
- [ ] Add .env variable PREFECT_API_URL, that will be used for connecting client to the prefect server
|
||||
- [ ] Create
|
||||
- [x] Install Prefect client library.
|
||||
- [x] Add .env variable PREFECT_API_URL, that will be used for connecting client to the prefect server
|
||||
- [x] Create prefect client file in `prefect/01_yadisk_predefined_enrich.py`. This file will first load file from ./../../../yadisk_files.json into array of paths. After that, array of paths will be filtered, and only supported in enrichment extensions will be left. After that, code will iterate through each path in this filtered array, use yadisk library to download file, process it for enrichment, and then remove it after processing. There should be statistics for this, at runtime, with progressbar that shows how many files processed out of how many left. Also, near the progressbar there should be counter of errors. Yes, if there is an error, it should be swallowed, even if it is inside thread or async function.
|
||||
- [x] For yandex disk integration use library yadisk. In .env file there should be variable YADISK_TOKEN for accessing the needed connection
|
||||
- [x] Code for loading should be reflected upon, and then made it so it would be done in async way, with as many simultaneous tasks as possible. yadisk async integration should be used (async features can be checked here: https://pypi.org/project/yadisk/)
|
||||
- [x] No tests for code should be done at this phase, all tests will be done manually, because loading of documents can take a long time for automated test.
|
||||
|
||||
# Phase 10 (qdrant connection credentials in .env)
|
||||
|
||||
- [x] Add Qdrant connection variables to the .env file: QDRANT_HOST, QDRANT_REST_PORT, QDRANT_GRPC_PORT
|
||||
- [x] Replace hardcoded values everywhere the Qdrant connection is used with the Qdrant .env variables
|
||||
|
||||
# Phase 11 (http endpoint to retrieve data from the vector storage by query)
|
||||
|
||||
- [ ] Create file `server.py`, with web framework fastapi, for example
|
||||
- [ ] Add POST endpoint "/api/test-query" which will use agent, and retrieve response for query, sent in JSON format, field "query"
|
||||
|
||||
@@ -11,7 +11,7 @@ import os
|
||||
import sqlite3
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from llama_index.core import Document, SimpleDirectoryReader
|
||||
from llama_index.core.node_parser import CodeSplitter, SentenceSplitter
|
||||
@@ -25,6 +25,35 @@ from config import get_embedding_model
|
||||
from vector_storage import get_vector_store_and_index
|
||||
|
||||
|
||||
# File extensions the enrichment pipeline can parse. Images and archives are
# intentionally excluded (see the Phase 8 notes in the project TODO).
SUPPORTED_ENRICHMENT_EXTENSIONS = {
    ".csv",
    ".doc",
    ".docx",
    ".epub",
    ".htm",
    ".html",
    ".json",
    ".jsonl",
    ".md",
    ".odt",
    ".pdf",
    ".ppt",
    ".pptx",
    ".rtf",
    ".rst",
    ".tsv",
    ".txt",
    ".xls",
    ".xlsx",
    ".xml",
}


def get_supported_enrichment_extensions() -> set[str]:
    """Return a defensive copy of the extensions supported by enrichment."""
    # Copy so callers can mutate the result without touching the module constant.
    return SUPPORTED_ENRICHMENT_EXTENSIONS.copy()
|
||||
|
||||
|
||||
class DocumentTracker:
|
||||
"""Class to handle tracking of processed documents to avoid re-processing."""
|
||||
|
||||
@@ -251,24 +280,7 @@ def process_documents_from_data_folder(
|
||||
return
|
||||
|
||||
# Find all supported files in the data directory
|
||||
supported_extensions = {
|
||||
".pdf",
|
||||
".docx",
|
||||
".xlsx",
|
||||
".pptx",
|
||||
".odt",
|
||||
".txt",
|
||||
".png",
|
||||
".jpg",
|
||||
".jpeg",
|
||||
".gif",
|
||||
".bmp",
|
||||
".svg",
|
||||
".zip",
|
||||
".rar",
|
||||
".tar",
|
||||
".gz",
|
||||
}
|
||||
supported_extensions = get_supported_enrichment_extensions()
|
||||
|
||||
# Walk through the directory structure
|
||||
all_files = []
|
||||
@@ -285,10 +297,13 @@ def process_documents_from_data_folder(
|
||||
if file_ext in supported_extensions:
|
||||
all_files.append(str(file))
|
||||
|
||||
logger.info(f"Found {len(all_files)} files to process")
|
||||
logger.info(
|
||||
f"Found {len(all_files)} supported files to process (extensions: {', '.join(sorted(supported_extensions))})"
|
||||
)
|
||||
|
||||
processed_count = 0
|
||||
skipped_count = 0
|
||||
error_count = 0
|
||||
|
||||
# Initialize progress bar
|
||||
pbar = tqdm(total=len(all_files), desc="Processing documents", unit="file")
|
||||
@@ -298,113 +313,126 @@ def process_documents_from_data_folder(
|
||||
f"Processing file: {file_path} ({processed_count + skipped_count + 1}/{len(all_files)})"
|
||||
)
|
||||
|
||||
# Check if document has already been processed
|
||||
if tracker.is_document_processed(file_path):
|
||||
logger.info(f"Skipping already processed file: {file_path}")
|
||||
skipped_count += 1
|
||||
pbar.set_postfix({"Processed": processed_count, "Skipped": skipped_count})
|
||||
pbar.update(1)
|
||||
continue
|
||||
|
||||
try:
|
||||
# Load the document using SimpleDirectoryReader
|
||||
# This automatically selects the appropriate reader based on file extension
|
||||
def file_metadata_func(file_path_str):
|
||||
# Apply proper encoding to filename
|
||||
filename = ensure_proper_encoding(Path(file_path_str).name)
|
||||
return {"filename": filename}
|
||||
|
||||
reader = SimpleDirectoryReader(
|
||||
input_files=[file_path], file_metadata=file_metadata_func
|
||||
result = process_document_file(file_path, tracker=tracker, index=index)
|
||||
if result["status"] == "processed":
|
||||
processed_count += 1
|
||||
elif result["status"] == "skipped":
|
||||
skipped_count += 1
|
||||
else:
|
||||
error_count += 1
|
||||
pbar.set_postfix(
|
||||
{"Processed": processed_count, "Skipped": skipped_count, "Errors": error_count}
|
||||
)
|
||||
documents = reader.load_data()
|
||||
|
||||
# Process each document
|
||||
for doc in documents:
|
||||
# Extract additional metadata based on document type
|
||||
file_ext = Path(file_path).suffix
|
||||
|
||||
# Apply proper encoding to file path
|
||||
encoded_file_path = ensure_proper_encoding(file_path)
|
||||
|
||||
# Add additional metadata
|
||||
doc.metadata["file_path"] = encoded_file_path
|
||||
doc.metadata["processed_at"] = datetime.now().isoformat()
|
||||
|
||||
# Handle document-type-specific metadata
|
||||
if file_ext.lower() == ".pdf":
|
||||
# PDF-specific metadata
|
||||
doc.metadata["page_label"] = ensure_proper_encoding(
|
||||
doc.metadata.get("page_label", "unknown")
|
||||
)
|
||||
doc.metadata["file_type"] = "pdf"
|
||||
|
||||
elif file_ext.lower() in [".docx", ".odt"]:
|
||||
# Word document metadata
|
||||
doc.metadata["section"] = ensure_proper_encoding(
|
||||
doc.metadata.get("section", "unknown")
|
||||
)
|
||||
doc.metadata["file_type"] = "document"
|
||||
|
||||
elif file_ext.lower() == ".pptx":
|
||||
# PowerPoint metadata
|
||||
doc.metadata["slide_id"] = ensure_proper_encoding(
|
||||
doc.metadata.get("slide_id", "unknown")
|
||||
)
|
||||
doc.metadata["file_type"] = "presentation"
|
||||
|
||||
elif file_ext.lower() == ".xlsx":
|
||||
# Excel metadata
|
||||
doc.metadata["sheet_name"] = ensure_proper_encoding(
|
||||
doc.metadata.get("sheet_name", "unknown")
|
||||
)
|
||||
doc.metadata["file_type"] = "spreadsheet"
|
||||
|
||||
# Determine the appropriate text splitter based on file type
|
||||
splitter = get_text_splitter(file_ext)
|
||||
|
||||
# Split the document into nodes
|
||||
nodes = splitter.get_nodes_from_documents([doc])
|
||||
|
||||
# Insert nodes into the vector index
|
||||
nodes_with_enhanced_metadata = []
|
||||
for i, node in enumerate(nodes):
|
||||
# Enhance node metadata with additional information
|
||||
node.metadata["original_doc_id"] = ensure_proper_encoding(
|
||||
doc.doc_id
|
||||
)
|
||||
node.metadata["chunk_number"] = i
|
||||
node.metadata["total_chunks"] = len(nodes)
|
||||
node.metadata["file_path"] = encoded_file_path
|
||||
|
||||
# Ensure the text content is properly encoded
|
||||
node.text = ensure_proper_encoding(node.text)
|
||||
|
||||
nodes_with_enhanced_metadata.append(node)
|
||||
|
||||
# Add all nodes to the index at once
|
||||
if nodes_with_enhanced_metadata:
|
||||
index.insert_nodes(nodes_with_enhanced_metadata)
|
||||
|
||||
logger.info(f"Processed {len(nodes)} nodes from {encoded_file_path}")
|
||||
|
||||
# Mark document as processed only after successful insertion
|
||||
tracker.mark_document_processed(file_path, {"nodes_count": len(documents)})
|
||||
processed_count += 1
|
||||
pbar.set_postfix({"Processed": processed_count, "Skipped": skipped_count})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing file {file_path}: {str(e)}")
|
||||
error_count += 1
|
||||
pbar.set_postfix(
|
||||
{"Processed": processed_count, "Skipped": skipped_count, "Errors": error_count}
|
||||
)
|
||||
|
||||
# Update progress bar regardless of success or failure
|
||||
pbar.update(1)
|
||||
|
||||
pbar.close()
|
||||
logger.info(
|
||||
f"Document enrichment completed. Processed: {processed_count}, Skipped: {skipped_count}"
|
||||
f"Document enrichment completed. Processed: {processed_count}, Skipped: {skipped_count}, Errors: {error_count}"
|
||||
)
|
||||
|
||||
|
||||
def process_document_file(
    file_path: str,
    tracker: Optional[DocumentTracker] = None,
    index=None,
) -> Dict[str, Any]:
    """
    Process a single document file and store its chunks in the vector index.

    Args:
        file_path: Path to the local file to enrich.
        tracker: Optional DocumentTracker; a fresh one is created when omitted.
        index: Optional vector index; resolved via get_vector_store_and_index()
            when omitted.

    Returns:
        Dict with "status" (one of "processed", "skipped", "error"), an
        optional "reason", and "nodes" (number of chunks inserted).
    """
    file_ext = Path(file_path).suffix.lower()
    if file_ext not in get_supported_enrichment_extensions():
        logger.info(f"Skipping unsupported extension for file: {file_path}")
        return {"status": "skipped", "reason": "unsupported_extension", "nodes": 0}

    tracker = tracker or DocumentTracker()

    if tracker.is_document_processed(file_path):
        logger.info(f"Skipping already processed file: {file_path}")
        return {"status": "skipped", "reason": "already_processed", "nodes": 0}

    if index is None:
        _, index = get_vector_store_and_index()

    try:
        def file_metadata_func(file_path_str):
            # Normalize the filename encoding before storing it as metadata.
            filename = ensure_proper_encoding(Path(file_path_str).name)
            return {"filename": filename}

        reader = SimpleDirectoryReader(
            input_files=[file_path], file_metadata=file_metadata_func
        )
        documents = reader.load_data()

        # These depend only on file_path, so compute them once instead of
        # re-deriving them for every document produced by the reader.
        current_file_ext = Path(file_path).suffix
        encoded_file_path = ensure_proper_encoding(file_path)

        total_nodes_inserted = 0
        for doc in documents:
            doc.metadata["file_path"] = encoded_file_path
            doc.metadata["processed_at"] = datetime.now().isoformat()
            _apply_file_type_metadata(doc, current_file_ext)

            splitter = get_text_splitter(current_file_ext)
            nodes = splitter.get_nodes_from_documents([doc])

            nodes_with_enhanced_metadata = []
            for i, node in enumerate(nodes):
                node.metadata["original_doc_id"] = ensure_proper_encoding(doc.doc_id)
                node.metadata["chunk_number"] = i
                node.metadata["total_chunks"] = len(nodes)
                node.metadata["file_path"] = encoded_file_path
                node.text = ensure_proper_encoding(node.text)
                nodes_with_enhanced_metadata.append(node)

            if nodes_with_enhanced_metadata:
                index.insert_nodes(nodes_with_enhanced_metadata)
                total_nodes_inserted += len(nodes_with_enhanced_metadata)

            logger.info(f"Processed {len(nodes)} nodes from {encoded_file_path}")

        # Mark as processed only after every chunk was inserted successfully,
        # so a failed run will be retried next time.
        tracker.mark_document_processed(
            file_path,
            {"documents_count": len(documents), "nodes_count": total_nodes_inserted},
        )
        return {"status": "processed", "nodes": total_nodes_inserted}
    except Exception as e:
        # Errors are reported through the return value so batch callers and the
        # YaDisk flow can keep going (error swallowing is a stated requirement).
        logger.error(f"Error processing file {file_path}: {e}")
        return {"status": "error", "reason": str(e), "nodes": 0}


def _apply_file_type_metadata(doc, file_ext: str) -> None:
    """Attach document-type-specific metadata based on the file extension."""
    ext = file_ext.lower()
    if ext == ".pdf":
        doc.metadata["page_label"] = ensure_proper_encoding(
            doc.metadata.get("page_label", "unknown")
        )
        doc.metadata["file_type"] = "pdf"
    elif ext in [".docx", ".odt", ".doc", ".rtf"]:
        doc.metadata["section"] = ensure_proper_encoding(
            doc.metadata.get("section", "unknown")
        )
        doc.metadata["file_type"] = "document"
    elif ext in [".pptx", ".ppt"]:
        doc.metadata["slide_id"] = ensure_proper_encoding(
            doc.metadata.get("slide_id", "unknown")
        )
        doc.metadata["file_type"] = "presentation"
    elif ext in [".xlsx", ".xls", ".csv", ".tsv"]:
        doc.metadata["sheet_name"] = ensure_proper_encoding(
            doc.metadata.get("sheet_name", "unknown")
        )
        doc.metadata["file_type"] = "spreadsheet"
|
||||
|
||||
|
||||
def enrich_documents():
|
||||
"""Main function to run the document enrichment process."""
|
||||
logger.info("Starting document enrichment process")
|
||||
|
||||
268
services/rag/llamaindex/prefect/01_yadisk_predefined_enrich.py
Normal file
268
services/rag/llamaindex/prefect/01_yadisk_predefined_enrich.py
Normal file
@@ -0,0 +1,268 @@
|
||||
"""
|
||||
Prefect flow for enriching documents from a predefined YaDisk file list.
|
||||
|
||||
Flow steps:
|
||||
1. Load file paths from ../../../yadisk_files.json
|
||||
2. Filter them by supported enrichment extensions
|
||||
3. Download each file from YaDisk asynchronously
|
||||
4. Enrich each downloaded file into vector storage
|
||||
5. Remove downloaded temporary files after processing
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from tqdm import tqdm
|
||||
|
||||
ROOT_DIR = Path(__file__).resolve().parents[1]
|
||||
load_dotenv(ROOT_DIR / ".env")
|
||||
|
||||
if str(ROOT_DIR) not in sys.path:
|
||||
sys.path.insert(0, str(ROOT_DIR))
|
||||
|
||||
import yadisk
|
||||
|
||||
from enrichment import get_supported_enrichment_extensions, process_document_file
|
||||
from prefect import flow, task
|
||||
|
||||
DEFAULT_YADISK_LIST_PATH = (ROOT_DIR / "../../../yadisk_files.json").resolve()
|
||||
|
||||
|
||||
def setup_prefect_flow_logging() -> None:
    """Reset loguru and attach the file and stdout sinks for flow runs."""
    log_directory = ROOT_DIR / "logs"
    log_directory.mkdir(exist_ok=True)

    # Drop any default/previous handlers so sinks are not duplicated when the
    # flow is executed more than once in the same process.
    logger.remove()

    file_sink_options = dict(
        rotation="10 MB",
        retention="10 days",
        level="INFO",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {file}:{line} | {message}",
    )
    logger.add(str(log_directory / "dev.log"), **file_sink_options)

    logger.add(
        sys.stdout,
        level="INFO",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
        colorize=True,
    )
|
||||
|
||||
|
||||
def _normalize_yadisk_paths(payload: Any) -> list[str]:
|
||||
"""Extract a list of file paths from several common JSON shapes."""
|
||||
if isinstance(payload, list):
|
||||
return [str(item) for item in payload if isinstance(item, (str, Path))]
|
||||
|
||||
if isinstance(payload, dict):
|
||||
for key in ("paths", "files", "items"):
|
||||
value = payload.get(key)
|
||||
if isinstance(value, list):
|
||||
normalized: list[str] = []
|
||||
for item in value:
|
||||
if isinstance(item, str):
|
||||
normalized.append(item)
|
||||
elif isinstance(item, dict):
|
||||
for item_key in ("path", "remote_path", "file_path", "name"):
|
||||
if item_key in item and item[item_key]:
|
||||
normalized.append(str(item[item_key]))
|
||||
break
|
||||
return normalized
|
||||
|
||||
raise ValueError(
|
||||
"Unsupported yadisk_files.json structure. Expected list or dict with paths/files/items."
|
||||
)
|
||||
|
||||
|
||||
def _make_temp_local_path(base_dir: Path, remote_path: str) -> Path:
|
||||
"""Create a deterministic temp file path for a remote YaDisk path."""
|
||||
remote_name = Path(remote_path).name or "downloaded_file"
|
||||
suffix = Path(remote_name).suffix
|
||||
stem = Path(remote_name).stem or "file"
|
||||
digest = hashlib.md5(remote_path.encode("utf-8")).hexdigest()[:10]
|
||||
safe_stem = "".join(ch if ch.isalnum() or ch in ("-", "_") else "_" for ch in stem)
|
||||
return base_dir / f"{safe_stem}_{digest}{suffix}"
|
||||
|
||||
|
||||
@task(name="load_yadisk_paths")
|
||||
def load_yadisk_paths(json_file_path: str) -> list[str]:
|
||||
"""Load remote file paths from JSON file."""
|
||||
path = Path(json_file_path)
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(f"YaDisk paths JSON file not found: {path}")
|
||||
|
||||
with path.open("r", encoding="utf-8") as f:
|
||||
payload = json.load(f)
|
||||
|
||||
paths = _normalize_yadisk_paths(payload)
|
||||
logger.info(f"Loaded {len(paths)} paths from {path}")
|
||||
return paths
|
||||
|
||||
|
||||
@task(name="filter_supported_yadisk_paths")
|
||||
def filter_supported_yadisk_paths(paths: list[str]) -> list[str]:
|
||||
"""Keep only paths supported by enrichment extension filters."""
|
||||
supported_extensions = get_supported_enrichment_extensions()
|
||||
filtered = [p for p in paths if Path(str(p)).suffix.lower() in supported_extensions]
|
||||
logger.info(
|
||||
f"Filtered YaDisk paths: {len(filtered)}/{len(paths)} supported "
|
||||
f"(extensions: {', '.join(sorted(supported_extensions))})"
|
||||
)
|
||||
return filtered
|
||||
|
||||
|
||||
async def _download_and_enrich_one(
    client: yadisk.AsyncClient,
    remote_path: str,
    temp_dir: Path,
    semaphore: asyncio.Semaphore,
    stats: dict[str, int],
    pbar: tqdm,
    pbar_lock: asyncio.Lock,
) -> None:
    """Fetch one remote file, enrich it, delete the local copy, record the outcome.

    Per-file failures are logged and counted instead of raised, so a single
    bad file cannot abort the whole flow.
    """
    local_path = _make_temp_local_path(temp_dir, remote_path)
    status = "error"

    async with semaphore:
        try:
            local_path.parent.mkdir(parents=True, exist_ok=True)
            await client.download(remote_path, str(local_path))

            # Enrichment is synchronous; push it onto a worker thread so the
            # event loop stays free. Any exception is swallowed below.
            outcome = await asyncio.to_thread(process_document_file, str(local_path))
            status = str(outcome.get("status", "error"))

            logger.info(
                f"YaDisk processed: remote={remote_path}, local={local_path}, status={status}"
            )
        except Exception as error:
            # Deliberately swallow the error; only the counter records it.
            logger.error(f"YaDisk processing error for {remote_path}: {error}")
            status = "error"
        finally:
            # Best-effort removal of the downloaded temp file.
            try:
                if local_path.exists():
                    local_path.unlink()
            except Exception as cleanup_error:
                logger.warning(
                    f"Failed to remove temp file {local_path}: {cleanup_error}"
                )

    # Serialize stats/progress-bar updates across concurrent workers.
    async with pbar_lock:
        stats["completed"] += 1
        if status == "processed":
            stats["processed"] += 1
        elif status == "skipped":
            stats["skipped"] += 1
        else:
            stats["errors"] += 1

        pbar.update(1)
        pbar.set_postfix(
            processed=stats["processed"],
            skipped=stats["skipped"],
            errors=stats["errors"],
        )
|
||||
|
||||
|
||||
@flow(name="yadisk_predefined_enrich")
|
||||
async def yadisk_predefined_enrich_flow(
|
||||
yadisk_json_path: str = str(DEFAULT_YADISK_LIST_PATH),
|
||||
concurrency: int = 4,
|
||||
) -> dict[str, int]:
|
||||
"""
|
||||
Download and enrich YaDisk files listed in the JSON file using async YaDisk client.
|
||||
"""
|
||||
setup_prefect_flow_logging()
|
||||
|
||||
prefect_api_url = os.getenv("PREFECT_API_URL", "").strip()
|
||||
yadisk_token = os.getenv("YADISK_TOKEN", "").strip()
|
||||
|
||||
if not prefect_api_url:
|
||||
logger.warning("PREFECT_API_URL is not set in environment/.env")
|
||||
else:
|
||||
# Prefect reads this env var for API connectivity.
|
||||
os.environ["PREFECT_API_URL"] = prefect_api_url
|
||||
logger.info(f"Using Prefect API URL: {prefect_api_url}")
|
||||
|
||||
if not yadisk_token:
|
||||
raise ValueError("YADISK_TOKEN is required in .env to access Yandex Disk")
|
||||
|
||||
all_paths = load_yadisk_paths(yadisk_json_path)
|
||||
supported_paths = filter_supported_yadisk_paths(all_paths)
|
||||
|
||||
stats = {
|
||||
"total": len(supported_paths),
|
||||
"completed": 0,
|
||||
"processed": 0,
|
||||
"skipped": 0,
|
||||
"errors": 0,
|
||||
}
|
||||
|
||||
if not supported_paths:
|
||||
logger.info("No supported YaDisk paths to process")
|
||||
return stats
|
||||
|
||||
concurrency = max(1, int(concurrency))
|
||||
logger.info(
|
||||
f"Starting async YaDisk enrichment for {len(supported_paths)} files with concurrency={concurrency}"
|
||||
)
|
||||
|
||||
semaphore = asyncio.Semaphore(concurrency)
|
||||
pbar_lock = asyncio.Lock()
|
||||
|
||||
with tempfile.TemporaryDirectory(prefix="yadisk_enrich_") as temp_dir_str:
|
||||
temp_dir = Path(temp_dir_str)
|
||||
pbar = tqdm(total=len(supported_paths), desc="YaDisk enrich", unit="file")
|
||||
try:
|
||||
async with yadisk.AsyncClient(token=yadisk_token) as client:
|
||||
try:
|
||||
is_token_valid = await client.check_token()
|
||||
logger.info(f"YaDisk token validation result: {is_token_valid}")
|
||||
except Exception as token_check_error:
|
||||
# Token check issues should not block processing attempts.
|
||||
logger.warning(f"YaDisk token check failed: {token_check_error}")
|
||||
|
||||
tasks = [
|
||||
asyncio.create_task(
|
||||
_download_and_enrich_one(
|
||||
client=client,
|
||||
remote_path=remote_path,
|
||||
temp_dir=temp_dir,
|
||||
semaphore=semaphore,
|
||||
stats=stats,
|
||||
pbar=pbar,
|
||||
pbar_lock=pbar_lock,
|
||||
)
|
||||
)
|
||||
for remote_path in supported_paths
|
||||
]
|
||||
|
||||
# Worker function swallows per-file errors, but keep gather resilient too.
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
finally:
|
||||
pbar.close()
|
||||
|
||||
logger.info(
|
||||
"YaDisk enrichment flow finished. "
|
||||
f"Total={stats['total']}, Completed={stats['completed']}, "
|
||||
f"Processed={stats['processed']}, Skipped={stats['skipped']}, Errors={stats['errors']}"
|
||||
)
|
||||
return stats
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("SERVING PREFECT FLOW FOR YANDEX DISK ENRICHMENT OF PREDEFINED PATHS")
|
||||
yadisk_predefined_enrich_flow.serve()
|
||||
@@ -14,7 +14,7 @@ from llama_index.core.retrievers import VectorIndexRetriever
|
||||
from loguru import logger
|
||||
from pathlib import Path
|
||||
|
||||
from vector_storage import get_vector_store_and_index
|
||||
from vector_storage import get_qdrant_connection_config, get_vector_store_and_index
|
||||
|
||||
# Import the new configuration module
|
||||
from config import setup_global_models
|
||||
@@ -23,8 +23,9 @@ from config import setup_global_models
|
||||
def initialize_retriever(
|
||||
collection_name: str = "documents_llamaindex",
|
||||
similarity_top_k: int = 5,
|
||||
host: str = "localhost",
|
||||
port: int = 6333
|
||||
host: str | None = None,
|
||||
port: int | None = None,
|
||||
grpc_port: int | None = None,
|
||||
) -> RetrieverQueryEngine:
|
||||
"""
|
||||
Initialize the retriever query engine with the vector store.
|
||||
@@ -32,8 +33,9 @@ def initialize_retriever(
|
||||
Args:
|
||||
collection_name: Name of the Qdrant collection
|
||||
similarity_top_k: Number of top similar documents to retrieve
|
||||
host: Qdrant host address
|
||||
port: Qdrant REST API port
|
||||
host: Qdrant host address (defaults to QDRANT_HOST from .env)
|
||||
port: Qdrant REST API port (defaults to QDRANT_REST_PORT from .env)
|
||||
grpc_port: Qdrant gRPC API port (defaults to QDRANT_GRPC_PORT from .env)
|
||||
|
||||
Returns:
|
||||
RetrieverQueryEngine configured with the vector store
|
||||
@@ -44,8 +46,23 @@ def initialize_retriever(
|
||||
# Set up the global models to prevent defaulting to OpenAI
|
||||
setup_global_models()
|
||||
|
||||
qdrant_config = get_qdrant_connection_config()
|
||||
resolved_host = host or str(qdrant_config["host"])
|
||||
resolved_port = port or int(qdrant_config["port"])
|
||||
resolved_grpc_port = grpc_port or int(qdrant_config["grpc_port"])
|
||||
|
||||
logger.info(
|
||||
f"Retriever Qdrant connection: host={resolved_host}, "
|
||||
f"rest_port={resolved_port}, grpc_port={resolved_grpc_port}"
|
||||
)
|
||||
|
||||
# Get the vector store and index from the existing configuration
|
||||
vector_store, index = get_vector_store_and_index()
|
||||
vector_store, index = get_vector_store_and_index(
|
||||
collection_name=collection_name,
|
||||
host=resolved_host,
|
||||
port=resolved_port,
|
||||
grpc_port=resolved_grpc_port,
|
||||
)
|
||||
|
||||
# Create a retriever from the index
|
||||
retriever = VectorIndexRetriever(
|
||||
@@ -310,4 +327,4 @@ if __name__ == "__main__":
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in test run: {e}")
|
||||
print(f"Error: {e}")
|
||||
print(f"Error: {e}")
|
||||
|
||||
@@ -10,6 +10,7 @@ This module provides initialization and configuration for:
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from llama_index.core import VectorStoreIndex
|
||||
from llama_index.vector_stores.qdrant import QdrantVectorStore
|
||||
from loguru import logger
|
||||
@@ -18,12 +19,26 @@ from qdrant_client import QdrantClient
|
||||
# Import the new configuration module
|
||||
from config import get_embedding_model
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
def get_qdrant_connection_config() -> dict[str, int | str]:
|
||||
"""Load Qdrant connection settings from environment variables."""
|
||||
host = os.getenv("QDRANT_HOST", "localhost")
|
||||
rest_port = int(os.getenv("QDRANT_REST_PORT", "6333"))
|
||||
grpc_port = int(os.getenv("QDRANT_GRPC_PORT", "6334"))
|
||||
return {
|
||||
"host": host,
|
||||
"port": rest_port,
|
||||
"grpc_port": grpc_port,
|
||||
}
|
||||
|
||||
|
||||
def initialize_vector_storage(
|
||||
collection_name: str = "documents_llamaindex",
|
||||
host: str = "localhost",
|
||||
port: int = 6333,
|
||||
grpc_port: int = 6334,
|
||||
host: Optional[str] = None,
|
||||
port: Optional[int] = None,
|
||||
grpc_port: Optional[int] = None,
|
||||
) -> tuple[QdrantVectorStore, VectorStoreIndex]:
|
||||
"""
|
||||
Initialize Qdrant vector storage with embedding model based on configured strategy.
|
||||
@@ -37,11 +52,19 @@ def initialize_vector_storage(
|
||||
Returns:
|
||||
Tuple of (QdrantVectorStore, VectorStoreIndex)
|
||||
"""
|
||||
logger.info(f"Initializing vector storage with collection: {collection_name}")
|
||||
qdrant_config = get_qdrant_connection_config()
|
||||
host = host or str(qdrant_config["host"])
|
||||
port = port or int(qdrant_config["port"])
|
||||
grpc_port = grpc_port or int(qdrant_config["grpc_port"])
|
||||
|
||||
logger.info(
|
||||
f"Initializing vector storage with collection: {collection_name} "
|
||||
f"(host={host}, rest_port={port}, grpc_port={grpc_port})"
|
||||
)
|
||||
|
||||
try:
|
||||
# Initialize Qdrant client
|
||||
client = QdrantClient(host=host, port=port)
|
||||
client = QdrantClient(host=host, port=port, grpc_port=grpc_port)
|
||||
|
||||
# Get the embedding model based on the configured strategy
|
||||
embed_model = get_embedding_model()
|
||||
@@ -131,14 +154,24 @@ def initialize_vector_storage(
|
||||
raise
|
||||
|
||||
|
||||
def get_vector_store_and_index(
    collection_name: str = "documents_llamaindex",
    host: Optional[str] = None,
    port: Optional[int] = None,
    grpc_port: Optional[int] = None,
) -> tuple[QdrantVectorStore, VectorStoreIndex]:
    """
    Convenience wrapper that forwards to initialize_vector_storage().

    Args:
        collection_name: Name of the Qdrant collection.
        host: Qdrant host; falls back to QDRANT_HOST from .env when None.
        port: Qdrant REST port; falls back to QDRANT_REST_PORT when None.
        grpc_port: Qdrant gRPC port; falls back to QDRANT_GRPC_PORT when None.

    Returns:
        Tuple of (QdrantVectorStore, VectorStoreIndex).
    """
    return initialize_vector_storage(
        collection_name=collection_name,
        host=host,
        port=port,
        grpc_port=grpc_port,
    )
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user