Prefect client prep for langchain

This commit is contained in:
2026-02-16 15:12:44 +03:00
parent 93d538ecc6
commit 77c578c9e6
6 changed files with 148 additions and 94 deletions

Binary file not shown.


@@ -6,7 +6,7 @@ Use if possible logging, using library `loguru`, for steps. Use logrotation in f
Chosen RAG framework: Langchain
Chosen Vector Storage: Qdrant
Chosen data folder: relatve ./../../../data - from the current folder
Chosen data folder: relative ./../../../data - from the current folder
# Phase 1 (cli entrypoint)
@@ -101,3 +101,13 @@ During this Phase we create asynchronous process of enrichment, utilizing async/
- [x] Function process_adaptive_files_queue should be started in number of threads (defined in .env ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS)
- [x] Function upload_processed_documents_from_queue should be started in number of threads (defined in .env ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS)
- [x] Program should control threads. Function insert_adaptive_files_queue, after the adaptive collection ends, should wait until all threads finish. What does "finish" mean? When insert_adaptive_files_queue realizes there are no adaptive files left in the collection, it sets a variable shared between threads marking the collection as finished. When the other functions running in threads see that this variable became true, they deplete the queue and, instead of looping back to wait for new items, simply exit. This eventually finishes the program: each thread finishes, and the main program exits as usual after processing everything.
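The shutdown protocol above can be sketched with stdlib primitives. This is a minimal sketch, not the project's code: the function names mirror the checklist, a `threading.Event` stands in for the shared "collection finished" variable, and `item.upper()` is a placeholder for real processing:

```python
import queue
import threading

files_queue: "queue.Queue[str]" = queue.Queue()
collection_finished = threading.Event()  # shared flag between threads

def insert_adaptive_files_queue(paths):
    # Producer: enqueue every adaptive file, then signal that the
    # collection is exhausted and no more items will arrive.
    for path in paths:
        files_queue.put(path)
    collection_finished.set()

def process_adaptive_files_queue(results):
    # Worker: keep draining the queue; exit only once the queue is
    # empty AND the producer has marked the collection as finished.
    while True:
        try:
            item = files_queue.get(timeout=0.1)
        except queue.Empty:
            if collection_finished.is_set():
                return  # queue depleted, producer done: thread finishes
            continue
        results.append(item.upper())  # placeholder for real processing
        files_queue.task_done()

results = []
workers = [
    threading.Thread(target=process_adaptive_files_queue, args=(results,))
    for _ in range(2)
]
for w in workers:
    w.start()
insert_adaptive_files_queue(["a.pdf", "b.docx"])
for w in workers:
    w.join()  # main thread waits until every worker has drained and exited
```

Because workers poll with a timeout instead of blocking forever, no sentinel items are needed and the program terminates cleanly once the flag is set and the queue runs dry.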
# Phase 14 (integration of Prefect client, for creating flow and tasks on remote Prefect server)
- [ ] Install the Prefect client library.
- [ ] Add a .env variable PREFECT_API_URL that will be used to connect the client to the Prefect server
- [ ] Create a prefect client file in `prefect/01_yadisk_analyze.py`. In this file we will work with prefect flows and tasks for this phase.
- [ ] Create a prefect flow called "analyze_yadisk_file_urls"
- [ ] Create a prefect task "iterate_yadisk_folder_and_store_file_paths" that will connect to Yandex Disk with the yadisk library, recursively analyze everything inside the folder `Общая`, and store the file paths in ./../../../yadisk_files.json as an array of strings.
- [ ] In our prefect file, add a function to serve the flow, as per the Prefect documentation on serving flows
- [ ] Tests will be done manually by hand, by executing this script and checking the Prefect dashboard. No automated tests are needed for this phase.


@@ -81,13 +81,13 @@ SUPPORTED_EXTENSIONS = {
".pptx",
".xlsx",
".xls",
".jpg",
".jpeg",
".png",
".gif",
".bmp",
".tiff",
".webp",
# ".jpg",
# ".jpeg",
# ".png",
# ".gif",
# ".bmp",
# ".tiff",
# ".webp",
".odt",
".txt", # this one is obvious but was unexpected to see in data lol
}
@@ -273,7 +273,7 @@ class DocumentEnricher:
extension = adaptive_file.extension.lower()
file_type = try_guess_file_type(extension)
def process_local_file(local_file_path: str):
def process_local_file(original_path: str, local_file_path: str):
nonlocal loaded_docs, processed_record
file_hash = self._get_file_hash(local_file_path)
@@ -295,7 +295,7 @@ class DocumentEnricher:
doc.metadata["file_type"] = file_type
doc.metadata["source"] = source_identifier
doc.metadata["filename"] = adaptive_file.filename
doc.metadata["file_path"] = source_identifier
doc.metadata["file_path"] = original_path
doc.metadata["file_size"] = os.path.getsize(local_file_path)
doc.metadata["file_extension"] = extension
@@ -310,7 +310,7 @@ class DocumentEnricher:
)
loaded_docs = split_docs
processed_record = (source_identifier, file_hash)
processed_record = (original_path, file_hash)
adaptive_file.work_with_file_locally(process_local_file)
return loaded_docs, processed_record


@@ -123,9 +123,9 @@ class _AdaptiveFile(ABC):
# This method allows to work with file locally, and lambda should be provided for this.
# Why separate method? For possible cleanup after work is done. And to download file, if needed
# Lambda: first argument is a local path
# Lambda: the first argument is the original path, the second a local path. For plain local files these are the same
@abstractmethod
def work_with_file_locally(self, func: Callable[[str], None]):
def work_with_file_locally(self, func: Callable[[str, str], None]):
"""Run callback with a local path to the file."""
@@ -143,8 +143,8 @@ class LocalFilesystemAdaptiveFile(_AdaptiveFile):
super().__init__(filename, extension)
self.local_path = local_path
def work_with_file_locally(self, func: Callable[[str], None]):
func(self.local_path)
def work_with_file_locally(self, func: Callable[[str, str], None]):
func(self.local_path, self.local_path)
class LocalFilesystemAdaptiveCollection(_AdaptiveCollection):
@@ -196,10 +196,10 @@ class YandexDiskAdaptiveFile(_AdaptiveFile):
temp_file.write(file_response.content)
return temp_file.name
def work_with_file_locally(self, func: Callable[[str], None]):
def work_with_file_locally(self, func: Callable[[str, str], None]):
temp_path = self._download_to_temp_file()
try:
func(temp_path)
func(self.remote_path, temp_path)
finally:
if os.path.exists(temp_path):
os.unlink(temp_path)
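The new two-argument contract introduced by this commit can be illustrated with a stripped-down sketch. Class names mirror the diff, but the bodies are simplified stand-ins; in particular, the Yandex Disk download is faked with in-memory bytes rather than a real API call:

```python
import os
import tempfile
from typing import Callable

class LocalFilesystemAdaptiveFile:
    def __init__(self, local_path: str):
        self.local_path = local_path

    def work_with_file_locally(self, func: Callable[[str, str], None]):
        # Local files: the original path and the local path coincide.
        func(self.local_path, self.local_path)

class YandexDiskAdaptiveFile:
    def __init__(self, remote_path: str, content: bytes):
        self.remote_path = remote_path
        self._content = content  # stand-in for the downloaded response body

    def work_with_file_locally(self, func: Callable[[str, str], None]):
        # Remote files: download to a temp file, hand the callback both
        # the original remote path and the temp path, then clean up.
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            tmp.write(self._content)
            temp_path = tmp.name
        try:
            func(self.remote_path, temp_path)
        finally:
            os.unlink(temp_path)

seen = []
YandexDiskAdaptiveFile("disk:/Общая/report.pdf", b"data").work_with_file_locally(
    lambda original, local: seen.append((original, os.path.exists(local)))
)
```

This is what lets `process_local_file` record `original_path` in `doc.metadata["file_path"]` and in the processed-record tuple while still reading bytes from a path that actually exists on disk.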