- main feat: adapt the enrichment pipeline for async processing

- added a file_type metadata field; it holds Russian type labels such as "таблица" and "презентация" (see the sketch below)
- file source metadata is now taken from either the local filesystem or Yandex Disk
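
To make the new metadata concrete, here is a minimal sketch (not part of the commit) of what a chunk's metadata is expected to carry after this change; the label values come from the helpers added in the diff, while the filename is a made-up example:

# Illustrative only, not part of the commit: metadata attached to every chunk.
# try_guess_file_type and identify_adaptive_file_source are the helpers added
# in this diff; "report.pptx" is a hypothetical filename.
chunk_metadata = {
    "file_type": "презентация",   # try_guess_file_type(".pptx")
    "source": "Яндекс Диск",      # identify_adaptive_file_source(adaptive_file)
    "filename": "report.pptx",    # hypothetical example
}
# Each chunk additionally receives "years" and "events" extracted from its own text.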
2026-02-11 15:46:54 +03:00
parent 7b52887558
commit f5659675ec
3 changed files with 242 additions and 107 deletions


@@ -2,13 +2,19 @@
import hashlib
import os
import queue
import threading
from pathlib import Path
from typing import Iterator, List, Tuple
from typing import List, Optional, Tuple
from dotenv import load_dotenv
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from loguru import logger
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Dynamically import other loaders to handle optional dependencies
try:
@@ -35,14 +41,11 @@ try:
from langchain_community.document_loaders import UnstructuredODTLoader
except ImportError:
UnstructuredODTLoader = None
from loguru import logger
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from helpers import (
LocalFilesystemAdaptiveCollection,
YandexDiskAdaptiveCollection,
YandexDiskAdaptiveFile,
_AdaptiveCollection,
_AdaptiveFile,
extract_russian_event_names,
@@ -52,7 +55,6 @@ from helpers import (
# Load environment variables
load_dotenv()
# Define the path to the data directory
DATA_DIR = Path("../../../data").resolve()
DB_PATH = Path("document_tracking.db").resolve()
@@ -61,6 +63,17 @@ ENRICHMENT_LOCAL_PATH = os.getenv("ENRICHMENT_LOCAL_PATH")
ENRICHMENT_YADISK_PATH = os.getenv("ENRICHMENT_YADISK_PATH")
YADISK_TOKEN = os.getenv("YADISK_TOKEN")
ENRICHMENT_PROCESSING_MODE = os.getenv("ENRICHMENT_PROCESSING_MODE", "async").lower()
ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT = int(
os.getenv("ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT", "5")
)
ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS = int(
os.getenv("ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS", "4")
)
ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS = int(
os.getenv("ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS", "4")
)
SUPPORTED_EXTENSIONS = {
".pdf",
".docx",
@@ -76,20 +89,9 @@ SUPPORTED_EXTENSIONS = {
".tiff",
".webp",
".odt",
".txt", # this one is obvious but was unexpected to see in data lol
}
def try_guess_source(extension: str) -> str:
if extension in [".xlsx", "xls"]:
return "таблица"
elif extension in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
return "изображение"
elif extension in [".pptx"]:
return "презентация"
else:
return "документ"
Base = declarative_base()
@@ -103,6 +105,25 @@ class ProcessedDocument(Base):
file_hash = Column(String, nullable=False)
# Guess the file type label in Russian so it can be used when searching
def try_guess_file_type(extension: str) -> str:
if extension in [".xlsx", "xls"]:
return "таблица"
elif extension in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
return "изображение"
elif extension in [".pptx"]:
return "презентация"
else:
return "документ"
def identify_adaptive_file_source(adaptive_file: _AdaptiveFile) -> str:
if isinstance(adaptive_file, YandexDiskAdaptiveFile):
return "Яндекс Диск"
else:
return "Локальный Файл"
class DocumentEnricher:
"""Class responsible for enriching documents and loading them to vector storage."""
@@ -114,6 +135,34 @@ class DocumentEnricher:
length_function=len,
)
# In sync mode the stages run one after another on the main thread, so the
# queues must be unbounded (maxsize=0): a bounded queue would fill up while
# the collection is being inserted, before any consumer runs. Worker counts stay at 1.
if ENRICHMENT_PROCESSING_MODE == "sync":
self.adaptive_files_queue_limit = 0
self.file_process_threads_count = 1
self.document_upload_threads_count = 1
else:
self.adaptive_files_queue_limit = max(
1, ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT
)
self.file_process_threads_count = max(
1, ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS
)
self.document_upload_threads_count = max(
1, ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS
)
# Phase 13 queues
self.ADAPTIVE_FILES_QUEUE: queue.Queue = queue.Queue(
maxsize=self.adaptive_files_queue_limit
)
self.PROCESSED_DOCUMENTS_QUEUE: queue.Queue = queue.Queue(
maxsize=self.adaptive_files_queue_limit * 2
)
# Shared state for thread lifecycle
self.collection_finished = threading.Event()
self.processing_finished = threading.Event()
# Initialize database for tracking processed documents
self._init_db()
@@ -121,30 +170,45 @@ class DocumentEnricher:
"""Initialize the SQLite database for tracking processed documents."""
self.engine = create_engine(f"sqlite:///{DB_PATH}")
Base.metadata.create_all(self.engine)
Session = sessionmaker(bind=self.engine)
self.session = Session()
self.SessionLocal = sessionmaker(bind=self.engine)
def _get_file_hash(self, file_path: str) -> str:
"""Calculate SHA256 hash of a file."""
hash_sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
# Read file in chunks to handle large files
for chunk in iter(lambda: f.read(4096), b""):
with open(file_path, "rb") as file_handle:
for chunk in iter(lambda: file_handle.read(4096), b""):
hash_sha256.update(chunk)
return hash_sha256.hexdigest()
def _is_document_hash_processed(self, file_hash: str) -> bool:
"""Check if a document hash has already been processed."""
existing = (
self.session.query(ProcessedDocument).filter_by(file_hash=file_hash).first()
)
return existing is not None
session = self.SessionLocal()
try:
existing = (
session.query(ProcessedDocument).filter_by(file_hash=file_hash).first()
)
return existing is not None
finally:
session.close()
def _mark_document_processed(self, file_identifier: str, file_hash: str):
"""Mark a document as processed in the database."""
doc_record = ProcessedDocument(file_path=file_identifier, file_hash=file_hash)
self.session.add(doc_record)
self.session.commit()
session = self.SessionLocal()
try:
existing = (
session.query(ProcessedDocument)
.filter_by(file_path=file_identifier)
.first()
)
if existing is not None:
existing.file_hash = file_hash
else:
session.add(
ProcessedDocument(file_path=file_identifier, file_hash=file_hash)
)
session.commit()
finally:
session.close()
def _get_loader_for_extension(self, file_path: str):
"""Get the appropriate loader for a given file extension."""
@@ -152,7 +216,7 @@ class DocumentEnricher:
if ext == ".pdf":
return PyPDFLoader(file_path)
elif ext in [".docx", ".doc"]:
if ext in [".docx", ".doc"]:
if UnstructuredWordDocumentLoader is None:
logger.warning(
f"UnstructuredWordDocumentLoader not available for {file_path}. Skipping."
@@ -161,7 +225,7 @@ class DocumentEnricher:
return UnstructuredWordDocumentLoader(
file_path, **{"strategy": "hi_res", "languages": ["rus"]}
)
elif ext == ".pptx":
if ext == ".pptx":
if UnstructuredPowerPointLoader is None:
logger.warning(
f"UnstructuredPowerPointLoader not available for {file_path}. Skipping."
@@ -170,7 +234,7 @@ class DocumentEnricher:
return UnstructuredPowerPointLoader(
file_path, **{"strategy": "hi_res", "languages": ["rus"]}
)
elif ext in [".xlsx", ".xls"]:
if ext in [".xlsx", ".xls"]:
if UnstructuredExcelLoader is None:
logger.warning(
f"UnstructuredExcelLoader not available for {file_path}. Skipping."
@@ -179,17 +243,16 @@ class DocumentEnricher:
return UnstructuredExcelLoader(
file_path, **{"strategy": "hi_res", "languages": ["rus"]}
)
elif ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
if ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
if UnstructuredImageLoader is None:
logger.warning(
f"UnstructuredImageLoader not available for {file_path}. Skipping."
)
return None
# Use OCR strategy for images to extract text
return UnstructuredImageLoader(
file_path, **{"strategy": "ocr_only", "languages": ["rus"]}
)
elif ext == ".odt":
if ext == ".odt":
if UnstructuredODTLoader is None:
logger.warning(
f"UnstructuredODTLoader not available for {file_path}. Skipping."
@@ -198,20 +261,20 @@ class DocumentEnricher:
return UnstructuredODTLoader(
file_path, **{"strategy": "hi_res", "languages": ["rus"]}
)
else:
return None
return None
def _load_one_adaptive_file(
self, adaptive_file: _AdaptiveFile
) -> Tuple[List[Document], str | None]:
) -> Tuple[List[Document], Optional[Tuple[str, str]]]:
"""Load and split one adaptive file by using its local working callback."""
loaded_docs: List[Document] = []
file_hash: str | None = None
source_identifier = try_guess_source(adaptive_file.extension)
processed_record: Optional[Tuple[str, str]] = None
source_identifier = identify_adaptive_file_source(adaptive_file)
extension = adaptive_file.extension.lower()
file_type = try_guess_file_type(extension)
def process_local_file(local_file_path: str):
nonlocal loaded_docs, file_hash
nonlocal loaded_docs, processed_record
file_hash = self._get_file_hash(local_file_path)
if self._is_document_hash_processed(file_hash):
@@ -227,6 +290,7 @@ class DocumentEnricher:
docs = loader.load()
for doc in docs:
doc.metadata["file_type"] = file_type
doc.metadata["source"] = source_identifier
doc.metadata["filename"] = adaptive_file.filename
doc.metadata["file_path"] = source_identifier
@@ -238,91 +302,145 @@ class DocumentEnricher:
split_docs = self.text_splitter.split_documents(docs)
for chunk in split_docs:
years = extract_years_from_text(chunk.page_content)
events = extract_russian_event_names(chunk.page_content)
chunk.metadata["years"] = years
chunk.metadata["events"] = events
chunk.metadata["years"] = extract_years_from_text(chunk.page_content)
chunk.metadata["events"] = extract_russian_event_names(
chunk.page_content
)
loaded_docs = split_docs
processed_record = (source_identifier, file_hash)
adaptive_file.work_with_file_locally(process_local_file)
return loaded_docs, file_hash
return loaded_docs, processed_record
def load_and_split_documents(
# Phase 13 API: inserts adaptive files into ADAPTIVE_FILES_QUEUE
def insert_adaptive_files_queue(
self, adaptive_collection: _AdaptiveCollection, recursive: bool = True
) -> Iterator[Tuple[List[Document], List[Tuple[str, str]]]]:
"""Load documents from adaptive collection and split them appropriately."""
docs_chunk: List[Document] = []
processed_file_records: dict[str, str] = {}
):
for adaptive_file in adaptive_collection.iterate(recursive=recursive):
if len(processed_file_records) >= 2:
yield docs_chunk, list(processed_file_records.items())
docs_chunk = []
processed_file_records = {}
if adaptive_file.extension.lower() not in SUPPORTED_EXTENSIONS:
logger.debug(
f"Skipping unsupported file extension for {adaptive_file.filename}: {adaptive_file.extension}"
)
continue
logger.info(f"Processing document: {adaptive_file.filename}")
self.ADAPTIVE_FILES_QUEUE.put(adaptive_file)
self.collection_finished.set()
# Phase 13 API: reads adaptive files and writes processed docs into PROCESSED_DOCUMENTS_QUEUE
def process_adaptive_files_queue(self):
while True:
try:
split_docs, file_hash = self._load_one_adaptive_file(adaptive_file)
if split_docs:
docs_chunk.extend(split_docs)
if file_hash:
processed_file_records[adaptive_file.filename] = file_hash
except Exception as e:
logger.error(f"Error processing {adaptive_file.filename}: {str(e)}")
adaptive_file = self.ADAPTIVE_FILES_QUEUE.get(timeout=0.2)
except queue.Empty:
if self.collection_finished.is_set():
return
continue
try:
split_docs, processed_record = self._load_one_adaptive_file(
adaptive_file
)
if split_docs:
self.PROCESSED_DOCUMENTS_QUEUE.put((split_docs, processed_record))
except Exception as error:
logger.error(f"Error processing {adaptive_file.filename}: {error}")
finally:
self.ADAPTIVE_FILES_QUEUE.task_done()
# Phase 13 API: uploads chunked docs and marks their files as processed
def upload_processed_documents_from_queue(self):
while True:
try:
payload = self.PROCESSED_DOCUMENTS_QUEUE.get(timeout=0.2)
except queue.Empty:
if self.processing_finished.is_set():
return
continue
try:
documents, processed_record = payload
self.vector_store.add_documents(documents)
if processed_record is not None:
self._mark_document_processed(
processed_record[0], processed_record[1]
)
except Exception as error:
logger.error(f"Error uploading processed documents: {error}")
raise
finally:
self.PROCESSED_DOCUMENTS_QUEUE.task_done()
def _run_threaded_pipeline(self, adaptive_collection: _AdaptiveCollection):
"""Run Phase 13 queue/thread pipeline."""
process_threads = [
threading.Thread(
target=self.process_adaptive_files_queue,
name=f"adaptive-file-processor-{index}",
daemon=True,
)
for index in range(self.file_process_threads_count)
]
upload_threads = [
threading.Thread(
target=self.upload_processed_documents_from_queue,
name=f"document-uploader-{index}",
daemon=True,
)
for index in range(self.document_upload_threads_count)
]
for thread in process_threads:
thread.start()
for thread in upload_threads:
thread.start()
# This intentionally runs on the main thread, per the Phase 13 requirement.
self.insert_adaptive_files_queue(adaptive_collection, recursive=True)
# Wait for the file queue to drain and the processing threads to finish.
self.ADAPTIVE_FILES_QUEUE.join()
for thread in process_threads:
thread.join()
# Signal the upload workers that no more payloads are expected.
self.processing_finished.set()
# Wait for the upload queue to drain and the upload threads to finish.
self.PROCESSED_DOCUMENTS_QUEUE.join()
for thread in upload_threads:
thread.join()
def _run_sync_pipeline(self, adaptive_collection: _AdaptiveCollection):
"""Sequential pipeline for sync mode."""
logger.info("Running enrichment in sync mode")
self.insert_adaptive_files_queue(adaptive_collection, recursive=True)
self.process_adaptive_files_queue()
self.processing_finished.set()
self.upload_processed_documents_from_queue()
def enrich_and_store(self, adaptive_collection: _AdaptiveCollection):
"""Load, enrich, and store documents in the vector store."""
logger.info("Starting enrichment process...")
# Load and split documents
for documents, processed_file_records in self.load_and_split_documents(
adaptive_collection
):
if not documents:
logger.info("No new documents to process.")
return
if ENRICHMENT_PROCESSING_MODE == "sync":
logger.info("Document enrichment process starting in SYNC mode")
self._run_sync_pipeline(adaptive_collection)
return
logger.info(
f"Loaded and split {len(documents)} document chunks, adding to vector store..."
)
logger.debug(
f"Documents len: {len(documents)}, processed_file_records len: {len(processed_file_records)}"
)
# Add documents to vector store
try:
self.vector_store.add_documents(documents)
# Only mark documents as processed after successful insertion to vector store
for file_identifier, file_hash in processed_file_records:
self._mark_document_processed(file_identifier, file_hash)
logger.info(
f"Successfully added {len(documents)} document chunks to vector store and marked {len(processed_file_records)} files as processed."
)
except Exception as e:
logger.error(f"Error adding documents to vector store: {str(e)}")
raise
logger.info("Document enrichment process starting in ASYNC/THREAD mode")
self._run_threaded_pipeline(adaptive_collection)
def get_enrichment_adaptive_collection() -> _AdaptiveCollection:
def get_enrichment_adaptive_collection(
data_dir: str = str(DATA_DIR),
) -> _AdaptiveCollection:
"""Create adaptive collection based on environment source configuration."""
source = ENRICHMENT_SOURCE
if source == "local":
local_path = ENRICHMENT_LOCAL_PATH
if local_path is None:
raise RuntimeError(
"Enrichment strategy is local, but no ENRICHMENT_LOCAL_PATH is defined!"
)
local_path = ENRICHMENT_LOCAL_PATH or data_dir
logger.info(f"Using local adaptive collection from path: {local_path}")
return LocalFilesystemAdaptiveCollection(local_path)
@@ -346,11 +464,11 @@ def get_enrichment_adaptive_collection() -> _AdaptiveCollection:
)
def run_enrichment_process(vector_store):
def run_enrichment_process(vector_store, data_dir: str = str(DATA_DIR)):
"""Run the full enrichment process."""
logger.info("Starting document enrichment process")
adaptive_collection = get_enrichment_adaptive_collection()
adaptive_collection = get_enrichment_adaptive_collection(data_dir=data_dir)
# Initialize the document enricher
enricher = DocumentEnricher(vector_store)
@@ -362,11 +480,7 @@ def run_enrichment_process(vector_store):
if __name__ == "__main__":
# Example usage
from vector_storage import initialize_vector_store
# Initialize vector store
vector_store = initialize_vector_store()
# Run enrichment process
run_enrichment_process(vector_store)
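
The new pipeline is configured entirely through environment variables read at import time (via load_dotenv). A minimal configuration sketch, assuming a local source; the path is a hypothetical placeholder and the numeric values mirror the defaults shown in the diff:

# Sketch only: export these (or put them in the .env consumed by load_dotenv)
# before the enrichment module is imported. The local path is a placeholder.
import os

os.environ["ENRICHMENT_PROCESSING_MODE"] = "async"  # or "sync" for the sequential pipeline
os.environ["ENRICHMENT_SOURCE"] = "local"  # the Yandex Disk value is defined in a hunk not shown here
os.environ["ENRICHMENT_LOCAL_PATH"] = "/path/to/data"  # hypothetical path
os.environ["ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT"] = "5"  # maxsize of the adaptive-files queue
os.environ["ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS"] = "4"  # loader/splitter workers
os.environ["ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS"] = "4"  # vector-store upload workers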