Compare commits: 447ecaba39...main (7 commits)

| SHA1 |
|---|
| 93d538ecc6 |
| f5659675ec |
| 7b52887558 |
| 1e6ab247b9 |
| e9dd28ad55 |
| 06a3155b6b |
| 63c3e2c5c7 |
BIN services/rag/langchain/.DS_Store (vendored, new file)
Binary file not shown.
@@ -6,3 +6,11 @@ CHAT_MODEL_STRATEGY=ollama
QDRANT_HOST=HOST
QDRANT_REST_PORT=PORT
QDRANT_GRPC_PORT=PORT
YADISK_TOKEN=TOKEN
ENRICHMENT_SOURCE=local/yadisk
ENRICHMENT_LOCAL_PATH=path
ENRICHMENT_YADISK_PATH=path
ENRICHMENT_PROCESSING_MODE=async/sync
ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT=5
ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS=4
ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS=4
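The enrichment code reads these settings at import time. A minimal sketch of that pattern, mirroring the os.getenv calls and defaults added in enrichment.py later in this diff:

```python
import os

from dotenv import load_dotenv

load_dotenv()

# Defaults match the values documented above; the names come straight from .env.
ENRICHMENT_SOURCE = os.getenv("ENRICHMENT_SOURCE", "local").lower()
ENRICHMENT_PROCESSING_MODE = os.getenv("ENRICHMENT_PROCESSING_MODE", "async").lower()
ADAPTIVE_FILES_QUEUE_LIMIT = int(os.getenv("ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT", "5"))
FILE_PROCESS_THREADS = int(os.getenv("ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS", "4"))
DOCUMENT_UPLOAD_THREADS = int(os.getenv("ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS", "4"))
```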
1 services/rag/langchain/.gitignore (vendored)
@@ -215,3 +215,4 @@ __marimo__/
# Streamlit
.streamlit/secrets.toml
document_tracking.db
.env.test
@@ -65,3 +65,39 @@ Chosen data folder: relative ./../../../data - from the current folder
- [x] Create a heuristic regex function in the helpers module for extracting event names from Russian text (see the sketch below). Use regular expressions together with likely words appearing before and after the event name.
- [x] During enrichment of the vector storage, try to extract event names from each chunk and save them in the metadata field "events", a list of candidate event name strings. Using the helper function is advised.
- [x] In VectorStoreRetriever._get_relevant_documents, add a similarity search on the event name when one is present in the query. The helper function should be used here to extract the event name.

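The item above refers to helpers.extract_russian_event_names, whose body is not shown in this diff. The following is only an illustrative sketch of such a heuristic (quoted titles plus a few keyword cues); the keyword list and the function name extract_event_names_sketch are assumptions, not the actual helper:

```python
import re
from typing import List

# Candidate event names inside «...» or "..." quotes.
_QUOTED_EVENT = re.compile(r'[«"]([^»"]{3,80})[»"]')
# Candidate event names introduced by common Russian event keywords.
_KEYWORD_EVENT = re.compile(
    r'(?:конференция|форум|фестиваль|хакатон|конкурс)\s+[«"]?([А-ЯЁ][^\n».,"]{2,60})',
    re.IGNORECASE,
)


def extract_event_names_sketch(text: str) -> List[str]:
    """Collect unique candidate event names from quoted titles and keyword cues."""
    events: List[str] = []
    seen = set()
    for pattern in (_QUOTED_EVENT, _KEYWORD_EVENT):
        for match in pattern.finditer(text):
            candidate = match.group(1).strip()
            if candidate and candidate not in seen:
                seen.add(candidate)
                events.append(candidate)
    return events
```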
# Phase 11 (adaptive collection, to attach different filesystems in the future)

- [x] Create an adaptive collection class and an adaptive file class in the helpers module as abstract classes that cover iterating over files and working with them locally (see the condensed sketch after this list).
- [x] Write a local filesystem implementation of the adaptive collection.
- [x] Write tests for the local filesystem implementation, using a test/samples folder filled with files and directories to exercise iteration and recursion.
- [x] Create a Yandex Disk implementation of the adaptive collection. The constructor should require a Yandex Disk TOKEN.
- [x] Write tests for the Yandex Disk implementation, using the folder "Общая/Информация". .env.test has a YADISK_TOKEN variable for connecting. While testing, log the files found during iteration. If the test fails at this step, leave it to manual fixing; this step can then be marked as done.

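Condensed from the _AdaptiveFile and _AdaptiveCollection abstract classes added to helpers.py further down in this diff:

```python
from abc import ABC, abstractmethod
from typing import Callable, Iterator


class _AdaptiveFile(ABC):
    """A file that can be handed to local tooling regardless of where it lives."""

    def __init__(self, filename: str, extension: str):
        self.filename = filename
        self.extension = extension  # format: ".jpg"

    @abstractmethod
    def work_with_file_locally(self, func: Callable[[str], None]):
        """Run the callback with a local path (remote backends download first, then clean up)."""


class _AdaptiveCollection(ABC):
    """A source of adaptive files (local directory, Yandex Disk, ...)."""

    @abstractmethod
    def iterate(self, recursive: bool) -> Iterator["_AdaptiveFile"]:
        """Yield adaptive files, optionally descending into subdirectories."""
```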
# Phase 12 (using local file system or Yandex Disk)

During enrichment, we should load documents through the adaptive collection from the helpers module. We should not touch the local filesystem directly; the adaptive collection acts as a wrapper.

- [x] The adaptive file in helpers now carries a filename, so the tests should be adjusted for this.
- [x] Add conditional usage of the adaptive collection in the enrichment stage. .env now has the variable ENRICHMENT_SOURCE with two possible values: yadisk, local (see the selection sketch after this list).
- [x] With the local source, use the env variable ENRICHMENT_LOCAL_PATH for the local filesystem adaptive collection.
- [x] With the yadisk source, use the env variable YADISK_TOKEN for Yandex Disk authentication and ENRICHMENT_YADISK_PATH for the path on Yandex Disk.
- [x] There are still file types we need to skip, so while iterating over files we check their extension and skip unsupported ones.
- [x] Adaptive files carry a filename, so it should be used when extracting metadata.

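A condensed view of how the source switch works (see get_enrichment_adaptive_collection in the enrichment.py diff below). The function name build_enrichment_collection is a stand-in; the collection classes come from helpers.py in this diff:

```python
import os

from helpers import LocalFilesystemAdaptiveCollection, YandexDiskAdaptiveCollection


def build_enrichment_collection(default_local_path: str = "../../../data"):
    """Pick the adaptive collection implementation based on ENRICHMENT_SOURCE."""
    source = os.getenv("ENRICHMENT_SOURCE", "local").lower()
    if source == "local":
        return LocalFilesystemAdaptiveCollection(
            os.getenv("ENRICHMENT_LOCAL_PATH") or default_local_path
        )
    if source == "yadisk":
        token = os.getenv("YADISK_TOKEN")
        path = os.getenv("ENRICHMENT_YADISK_PATH")
        if not token or not path:
            raise ValueError("YADISK_TOKEN and ENRICHMENT_YADISK_PATH are required for yadisk")
        return YandexDiskAdaptiveCollection(token=token, base_dir=path)
    raise ValueError(f"Unsupported ENRICHMENT_SOURCE='{source}'. Allowed values: local, yadisk")
```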
# Phase 13 (async processing of files)

During this phase we make the enrichment process asynchronous (implemented with queues and Python threads, as the items below describe).

- [x] Prepare enrichment to run as an asynchronous process: adjust the required libraries and anything else that needs to change.
- [x] Create a queue for adaptive files. It stores adaptive files that need to be processed.
- [x] Create a queue for documents produced from the adaptive files.
- [x] Create a function that iterates through the adaptive collection and adds each file to the adaptive files queue ADAPTIVE_FILES_QUEUE. Let's call it insert_adaptive_files_queue.
- [x] Create a function that takes an adaptive file from the adaptive files queue (ADAPTIVE_FILES_QUEUE) and processes it by splitting it into document chunks. Let's call it process_adaptive_files_queue.
- [x] Create a function that takes document chunks from the processed documents queue and sends them to the vector storage. It marks the document the chunks came from as processed in the local database (an existing feature adapted here). Let's call it upload_processed_documents_from_queue.
- [x] Use Python's threading machinery to run several of these functions in threads. There are environment variables: ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT (default 5), ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS (default 4), ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS (default 4).
- [x] The function insert_adaptive_files_queue does not run in a worker thread. It iterates through the adaptive collection and blocks while the queue already holds ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT items.
- [x] The function process_adaptive_files_queue should be started in several threads (count defined in .env as ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS).
- [x] The function upload_processed_documents_from_queue should be started in several threads (count defined in .env as ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS).
- [x] The program should control the threads. Once the adaptive collection is exhausted, insert_adaptive_files_queue waits until all threads finish. What does "finish" mean? When insert_adaptive_files_queue sees that no adaptive files are left in the collection, it sets a variable shared between threads marking the collection as finished. When the worker functions see that this variable is set, they drain their queues, stop waiting for new items, and exit. Each thread finishes, and the main program then exits normally once everything has been processed. A minimal sketch of this queue/thread shape follows this list.

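A minimal sketch of the pipeline shape described above; the real logic lives in DocumentEnricher in the enrichment.py diff below, and split_into_chunks here is a hypothetical stand-in for the actual loader/splitter:

```python
import queue
import threading

ADAPTIVE_FILES_QUEUE: queue.Queue = queue.Queue(maxsize=5)  # ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT
PROCESSED_DOCUMENTS_QUEUE: queue.Queue = queue.Queue()
collection_finished = threading.Event()  # shared flag: "no more files are coming"


def split_into_chunks(adaptive_file):
    """Hypothetical stand-in for the real loader/splitter in enrichment.py."""
    return [adaptive_file]


def insert_adaptive_files_queue(adaptive_collection):
    # Runs on the main thread; put() blocks while the queue is already at its limit.
    for adaptive_file in adaptive_collection.iterate(recursive=True):
        ADAPTIVE_FILES_QUEUE.put(adaptive_file)
    collection_finished.set()


def process_adaptive_files_queue():
    # Worker thread: take files, split them, hand the chunks downstream.
    while True:
        try:
            adaptive_file = ADAPTIVE_FILES_QUEUE.get(timeout=0.2)
        except queue.Empty:
            if collection_finished.is_set():
                return  # producer finished and queue drained: let the thread exit
            continue
        try:
            PROCESSED_DOCUMENTS_QUEUE.put(split_into_chunks(adaptive_file))
        finally:
            ADAPTIVE_FILES_QUEUE.task_done()


# Start a few workers; the main thread would then feed them and wait for the queue to drain:
workers = [threading.Thread(target=process_adaptive_files_queue, daemon=True) for _ in range(4)]
for worker in workers:
    worker.start()
# insert_adaptive_files_queue(some_adaptive_collection)  # main-thread producer
# ADAPTIVE_FILES_QUEUE.join()                            # then wait for workers to drain it
```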
@@ -37,15 +37,16 @@ def ping():
name="enrich",
help="Load documents from data directory and store in vector database",
)
@click.option("--data-dir", default="../../../data", help="Path to the data directory")
@click.option(
"--collection-name",
default="documents_langchain",
help="Name of the vector store collection",
)
def enrich(data_dir, collection_name):
def enrich(collection_name):
"""Load documents from data directory and store in vector database"""
logger.info(f"Starting enrichment process for directory: {data_dir}")
logger.info(
f"Starting enrichment process. Enrichment source: {os.getenv('ENRICHMENT_SOURCE')}"
)

try:
# Import here to avoid circular dependencies
@@ -56,7 +57,7 @@ def enrich(data_dir, collection_name):
vector_store = initialize_vector_store(collection_name=collection_name)

# Run enrichment process
run_enrichment_process(vector_store, data_dir=data_dir)
run_enrichment_process(vector_store)

logger.info("Enrichment process completed successfully!")
click.echo("Documents have been successfully loaded into the vector store.")

@@ -1,13 +1,21 @@
|
||||
"""Document enrichment module for loading documents into vector storage."""
|
||||
|
||||
import os
|
||||
import hashlib
|
||||
import os
|
||||
import queue
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Any
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from langchain_community.document_loaders import PyPDFLoader
|
||||
from langchain_core.documents import Document
|
||||
from langchain_text_splitters import RecursiveCharacterTextSplitter
|
||||
from langchain_community.document_loaders import PyPDFLoader
|
||||
from loguru import logger
|
||||
from sqlalchemy import Column, Integer, String, create_engine
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
# Dynamically import other loaders to handle optional dependencies
|
||||
try:
|
||||
from langchain_community.document_loaders import UnstructuredWordDocumentLoader
|
||||
@@ -33,27 +41,63 @@ try:
|
||||
from langchain_community.document_loaders import UnstructuredODTLoader
|
||||
except ImportError:
|
||||
UnstructuredODTLoader = None
|
||||
from sqlalchemy import create_engine, Column, Integer, String
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from loguru import logger
|
||||
import sqlite3
|
||||
|
||||
from helpers import extract_russian_event_names, extract_years_from_text
|
||||
from helpers import (
|
||||
LocalFilesystemAdaptiveCollection,
|
||||
YandexDiskAdaptiveCollection,
|
||||
YandexDiskAdaptiveFile,
|
||||
_AdaptiveCollection,
|
||||
_AdaptiveFile,
|
||||
extract_russian_event_names,
|
||||
extract_years_from_text,
|
||||
)
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
|
||||
# Define the path to the data directory
|
||||
DATA_DIR = Path("../../../data").resolve()
|
||||
DB_PATH = Path("document_tracking.db").resolve()
|
||||
ENRICHMENT_SOURCE = os.getenv("ENRICHMENT_SOURCE", "local").lower()
|
||||
ENRICHMENT_LOCAL_PATH = os.getenv("ENRICHMENT_LOCAL_PATH")
|
||||
ENRICHMENT_YADISK_PATH = os.getenv("ENRICHMENT_YADISK_PATH")
|
||||
YADISK_TOKEN = os.getenv("YADISK_TOKEN")
|
||||
|
||||
ENRICHMENT_PROCESSING_MODE = os.getenv("ENRICHMENT_PROCESSING_MODE", "async").lower()
|
||||
ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT = int(
|
||||
os.getenv("ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT", "5")
|
||||
)
|
||||
ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS = int(
|
||||
os.getenv("ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS", "4")
|
||||
)
|
||||
ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS = int(
|
||||
os.getenv("ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS", "4")
|
||||
)
|
||||
|
||||
SUPPORTED_EXTENSIONS = {
|
||||
".pdf",
|
||||
".docx",
|
||||
".doc",
|
||||
".pptx",
|
||||
".xlsx",
|
||||
".xls",
|
||||
".jpg",
|
||||
".jpeg",
|
||||
".png",
|
||||
".gif",
|
||||
".bmp",
|
||||
".tiff",
|
||||
".webp",
|
||||
".odt",
|
||||
".txt",  # plain text; unexpectedly present in the source data
|
||||
}
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
class ProcessedDocument(Base):
|
||||
"""Database model for tracking processed documents."""
|
||||
|
||||
__tablename__ = "processed_documents"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
@@ -61,6 +105,25 @@ class ProcessedDocument(Base):
|
||||
file_hash = Column(String, nullable=False)
|
||||
|
||||
|
||||
# Guess a Russian-language file type label, used for search and metadata
|
||||
def try_guess_file_type(extension: str) -> str:
|
||||
if extension in [".xlsx", ".xls"]:
|
||||
return "таблица"
|
||||
elif extension in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
|
||||
return "изображение"
|
||||
elif extension in [".pptx"]:
|
||||
return "презентация"
|
||||
else:
|
||||
return "документ"
|
||||
|
||||
|
||||
def identify_adaptive_file_source(adaptive_file: _AdaptiveFile) -> str:
|
||||
if isinstance(adaptive_file, YandexDiskAdaptiveFile):
|
||||
return "Яндекс Диск"
|
||||
else:
|
||||
return "Локальный Файл"
|
||||
|
||||
|
||||
class DocumentEnricher:
|
||||
"""Class responsible for enriching documents and loading them to vector storage."""
|
||||
|
||||
@@ -72,6 +135,34 @@ class DocumentEnricher:
|
||||
length_function=len,
|
||||
)
|
||||
|
||||
# In sync mode we force minimal concurrency values.
|
||||
if ENRICHMENT_PROCESSING_MODE == "sync":
|
||||
self.adaptive_files_queue_limit = 1
|
||||
self.file_process_threads_count = 1
|
||||
self.document_upload_threads_count = 1
|
||||
else:
|
||||
self.adaptive_files_queue_limit = max(
|
||||
1, ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT
|
||||
)
|
||||
self.file_process_threads_count = max(
|
||||
1, ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS
|
||||
)
|
||||
self.document_upload_threads_count = max(
|
||||
1, ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS
|
||||
)
|
||||
|
||||
# Phase 13 queues
|
||||
self.ADAPTIVE_FILES_QUEUE: queue.Queue = queue.Queue(
|
||||
maxsize=self.adaptive_files_queue_limit
|
||||
)
|
||||
self.PROCESSED_DOCUMENTS_QUEUE: queue.Queue = queue.Queue(
|
||||
maxsize=max(1, self.adaptive_files_queue_limit * 2)
|
||||
)
|
||||
|
||||
# Shared state for thread lifecycle
|
||||
self.collection_finished = threading.Event()
|
||||
self.processing_finished = threading.Event()
|
||||
|
||||
# Initialize database for tracking processed documents
|
||||
self._init_db()
|
||||
|
||||
@@ -79,35 +170,45 @@ class DocumentEnricher:
|
||||
"""Initialize the SQLite database for tracking processed documents."""
|
||||
self.engine = create_engine(f"sqlite:///{DB_PATH}")
|
||||
Base.metadata.create_all(self.engine)
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
self.session = Session()
|
||||
self.SessionLocal = sessionmaker(bind=self.engine)
|
||||
|
||||
def _get_file_hash(self, file_path: str) -> str:
|
||||
"""Calculate SHA256 hash of a file."""
|
||||
hash_sha256 = hashlib.sha256()
|
||||
with open(file_path, "rb") as f:
|
||||
# Read file in chunks to handle large files
|
||||
for chunk in iter(lambda: f.read(4096), b""):
|
||||
with open(file_path, "rb") as file_handle:
|
||||
for chunk in iter(lambda: file_handle.read(4096), b""):
|
||||
hash_sha256.update(chunk)
|
||||
return hash_sha256.hexdigest()
|
||||
|
||||
def _is_document_processed(self, file_path: str) -> bool:
|
||||
"""Check if a document has already been processed."""
|
||||
file_hash = self._get_file_hash(file_path)
|
||||
existing = self.session.query(ProcessedDocument).filter_by(
|
||||
file_hash=file_hash
|
||||
).first()
|
||||
return existing is not None
|
||||
|
||||
def _mark_document_processed(self, file_path: str):
|
||||
"""Mark a document as processed in the database."""
|
||||
file_hash = self._get_file_hash(file_path)
|
||||
doc_record = ProcessedDocument(
|
||||
file_path=file_path,
|
||||
file_hash=file_hash
|
||||
def _is_document_hash_processed(self, file_hash: str) -> bool:
|
||||
"""Check if a document hash has already been processed."""
|
||||
session = self.SessionLocal()
|
||||
try:
|
||||
existing = (
|
||||
session.query(ProcessedDocument).filter_by(file_hash=file_hash).first()
|
||||
)
|
||||
self.session.add(doc_record)
|
||||
self.session.commit()
|
||||
return existing is not None
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def _mark_document_processed(self, file_identifier: str, file_hash: str):
|
||||
"""Mark a document as processed in the database."""
|
||||
session = self.SessionLocal()
|
||||
try:
|
||||
existing = (
|
||||
session.query(ProcessedDocument)
|
||||
.filter_by(file_path=file_identifier)
|
||||
.first()
|
||||
)
|
||||
if existing is not None:
|
||||
existing.file_hash = file_hash
|
||||
else:
|
||||
session.add(
|
||||
ProcessedDocument(file_path=file_identifier, file_hash=file_hash)
|
||||
)
|
||||
session.commit()
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def _get_loader_for_extension(self, file_path: str):
|
||||
"""Get the appropriate loader for a given file extension."""
|
||||
@@ -115,175 +216,275 @@ class DocumentEnricher:
|
||||
|
||||
if ext == ".pdf":
|
||||
return PyPDFLoader(file_path)
|
||||
elif ext in [".docx", ".doc"]:
|
||||
if ext in [".docx", ".doc"]:
|
||||
if UnstructuredWordDocumentLoader is None:
|
||||
logger.warning(f"UnstructuredWordDocumentLoader not available for {file_path}. Skipping.")
|
||||
logger.warning(
|
||||
f"UnstructuredWordDocumentLoader not available for {file_path}. Skipping."
|
||||
)
|
||||
return None
|
||||
return UnstructuredWordDocumentLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
|
||||
elif ext == ".pptx":
|
||||
return UnstructuredWordDocumentLoader(
|
||||
file_path, **{"strategy": "hi_res", "languages": ["rus"]}
|
||||
)
|
||||
if ext == ".pptx":
|
||||
if UnstructuredPowerPointLoader is None:
|
||||
logger.warning(f"UnstructuredPowerPointLoader not available for {file_path}. Skipping.")
|
||||
logger.warning(
|
||||
f"UnstructuredPowerPointLoader not available for {file_path}. Skipping."
|
||||
)
|
||||
return None
|
||||
return UnstructuredPowerPointLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
|
||||
elif ext in [".xlsx", ".xls"]:
|
||||
return UnstructuredPowerPointLoader(
|
||||
file_path, **{"strategy": "hi_res", "languages": ["rus"]}
|
||||
)
|
||||
if ext in [".xlsx", ".xls"]:
|
||||
if UnstructuredExcelLoader is None:
|
||||
logger.warning(f"UnstructuredExcelLoader not available for {file_path}. Skipping.")
|
||||
logger.warning(
|
||||
f"UnstructuredExcelLoader not available for {file_path}. Skipping."
|
||||
)
|
||||
return None
|
||||
return UnstructuredExcelLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
|
||||
elif ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
|
||||
return UnstructuredExcelLoader(
|
||||
file_path, **{"strategy": "hi_res", "languages": ["rus"]}
|
||||
)
|
||||
if ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
|
||||
if UnstructuredImageLoader is None:
|
||||
logger.warning(f"UnstructuredImageLoader not available for {file_path}. Skipping.")
|
||||
logger.warning(
|
||||
f"UnstructuredImageLoader not available for {file_path}. Skipping."
|
||||
)
|
||||
return None
|
||||
# Use OCR strategy for images to extract text
|
||||
return UnstructuredImageLoader(file_path, **{"strategy": "ocr_only", "languages": ["rus"]})
|
||||
elif ext == ".odt":
|
||||
return UnstructuredImageLoader(
|
||||
file_path, **{"strategy": "ocr_only", "languages": ["rus"]}
|
||||
)
|
||||
if ext == ".odt":
|
||||
if UnstructuredODTLoader is None:
|
||||
logger.warning(f"UnstructuredODTLoader not available for {file_path}. Skipping.")
|
||||
logger.warning(
|
||||
f"UnstructuredODTLoader not available for {file_path}. Skipping."
|
||||
)
|
||||
return None
|
||||
return UnstructuredODTLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
|
||||
return UnstructuredODTLoader(
|
||||
file_path, **{"strategy": "hi_res", "languages": ["rus"]}
|
||||
)
|
||||
return None
|
||||
|
||||
def _load_one_adaptive_file(
|
||||
self, adaptive_file: _AdaptiveFile
|
||||
) -> Tuple[List[Document], Optional[Tuple[str, str]]]:
|
||||
"""Load and split one adaptive file by using its local working callback."""
|
||||
loaded_docs: List[Document] = []
|
||||
processed_record: Optional[Tuple[str, str]] = None
|
||||
source_identifier = identify_adaptive_file_source(adaptive_file)
|
||||
extension = adaptive_file.extension.lower()
|
||||
file_type = try_guess_file_type(extension)
|
||||
|
||||
def process_local_file(local_file_path: str):
|
||||
nonlocal loaded_docs, processed_record
|
||||
|
||||
file_hash = self._get_file_hash(local_file_path)
|
||||
if self._is_document_hash_processed(file_hash):
|
||||
logger.info(
|
||||
f"SKIPPING already processed document hash for: {source_identifier}"
|
||||
)
|
||||
return
|
||||
else:
|
||||
# For text files and unsupported formats, try to load as text
|
||||
try:
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
return None, content # Return content directly for text processing
|
||||
except UnicodeDecodeError:
|
||||
logger.warning(f"Could not decode file as text: {file_path}")
|
||||
return None, None
|
||||
|
||||
def load_and_split_documents(self, file_paths: List[str]) -> List[Document]:
|
||||
"""Load documents from file paths and split them appropriately."""
|
||||
all_docs = []
|
||||
|
||||
for file_path in file_paths:
|
||||
if self._is_document_processed(file_path):
|
||||
logger.info(f"Skipping already processed document: {file_path}")
|
||||
continue
|
||||
|
||||
logger.info(f"Processing document: {file_path}")
|
||||
|
||||
# Get the appropriate loader for the file extension
|
||||
loader = self._get_loader_for_extension(file_path)
|
||||
logger.info("Document is not processed! Doing it")
|
||||
|
||||
loader = self._get_loader_for_extension(local_file_path)
|
||||
if loader is None:
|
||||
# For unsupported formats that we tried to load as text
|
||||
continue
|
||||
logger.warning(f"No loader available for file: {source_identifier}")
|
||||
return
|
||||
|
||||
try:
|
||||
# Load the document(s)
|
||||
docs = loader.load()
|
||||
|
||||
# Add metadata to each document
|
||||
for doc in docs:
|
||||
# Extract metadata from the original file
|
||||
doc.metadata["source"] = file_path
|
||||
doc.metadata["filename"] = Path(file_path).name
|
||||
doc.metadata["file_path"] = file_path
|
||||
doc.metadata["file_size"] = os.path.getsize(file_path)
|
||||
doc.metadata["file_type"] = file_type
|
||||
doc.metadata["source"] = source_identifier
|
||||
doc.metadata["filename"] = adaptive_file.filename
|
||||
doc.metadata["file_path"] = source_identifier
|
||||
doc.metadata["file_size"] = os.path.getsize(local_file_path)
|
||||
doc.metadata["file_extension"] = extension
|
||||
|
||||
# Add page number if available in original metadata
|
||||
if "page" in doc.metadata:
|
||||
doc.metadata["page_number"] = doc.metadata["page"]
|
||||
|
||||
# Add file extension as metadata
|
||||
doc.metadata["file_extension"] = Path(file_path).suffix
|
||||
|
||||
# Split documents if they are too large
|
||||
split_docs = self.text_splitter.split_documents(docs)
|
||||
|
||||
# Extract additional metadata from each chunk.
|
||||
for chunk in split_docs:
|
||||
years = extract_years_from_text(chunk.page_content)
|
||||
events = extract_russian_event_names(chunk.page_content)
|
||||
chunk.metadata["years"] = years
|
||||
chunk.metadata["events"] = events
|
||||
chunk.metadata["years"] = extract_years_from_text(chunk.page_content)
|
||||
chunk.metadata["events"] = extract_russian_event_names(
|
||||
chunk.page_content
|
||||
)
|
||||
|
||||
# Add to the collection
|
||||
all_docs.extend(split_docs)
|
||||
loaded_docs = split_docs
|
||||
processed_record = (source_identifier, file_hash)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing {file_path}: {str(e)}")
|
||||
adaptive_file.work_with_file_locally(process_local_file)
|
||||
return loaded_docs, processed_record
|
||||
|
||||
# Phase 13 API: inserts adaptive files into ADAPTIVE_FILES_QUEUE
|
||||
def insert_adaptive_files_queue(
|
||||
self, adaptive_collection: _AdaptiveCollection, recursive: bool = True
|
||||
):
|
||||
for adaptive_file in adaptive_collection.iterate(recursive=recursive):
|
||||
if adaptive_file.extension.lower() not in SUPPORTED_EXTENSIONS:
|
||||
logger.debug(
|
||||
f"Skipping unsupported file extension for {adaptive_file.filename}: {adaptive_file.extension}"
|
||||
)
|
||||
continue
|
||||
|
||||
return all_docs
|
||||
self.ADAPTIVE_FILES_QUEUE.put(adaptive_file)
|
||||
|
||||
def enrich_and_store(self, file_paths: List[str]):
|
||||
"""Load, enrich, and store documents in the vector store."""
|
||||
logger.info(f"Starting enrichment process for {len(file_paths)} files...")
|
||||
logger.debug("ADAPTIVE COLLECTION DEPLETED!")
|
||||
self.collection_finished.set()
|
||||
|
||||
# Load and split documents
|
||||
documents = self.load_and_split_documents(file_paths)
|
||||
|
||||
if not documents:
|
||||
logger.info("No new documents to process.")
|
||||
return
|
||||
|
||||
logger.info(f"Loaded and split {len(documents)} document chunks, adding to vector store...")
|
||||
|
||||
# Add documents to vector store
|
||||
# Phase 13 API: reads adaptive files and writes processed docs into PROCESSED_DOCUMENTS_QUEUE
|
||||
def process_adaptive_files_queue(self):
|
||||
while True:
|
||||
try:
|
||||
adaptive_file = self.ADAPTIVE_FILES_QUEUE.get(timeout=0.2)
|
||||
except queue.Empty:
|
||||
if self.collection_finished.is_set():
|
||||
return
|
||||
continue
|
||||
|
||||
try:
|
||||
split_docs, processed_record = self._load_one_adaptive_file(
|
||||
adaptive_file
|
||||
)
|
||||
if split_docs:
|
||||
self.PROCESSED_DOCUMENTS_QUEUE.put((split_docs, processed_record))
|
||||
except Exception as error:
|
||||
logger.error(f"Error processing {adaptive_file.filename}: {error}")
|
||||
finally:
|
||||
self.ADAPTIVE_FILES_QUEUE.task_done()
|
||||
|
||||
# Phase 13 API: uploads chunked docs and marks file processed
|
||||
def upload_processed_documents_from_queue(self):
|
||||
while True:
|
||||
try:
|
||||
payload = self.PROCESSED_DOCUMENTS_QUEUE.get(timeout=0.2)
|
||||
except queue.Empty:
|
||||
if self.processing_finished.is_set():
|
||||
return
|
||||
continue
|
||||
|
||||
try:
|
||||
documents, processed_record = payload
|
||||
self.vector_store.add_documents(documents)
|
||||
|
||||
# Only mark documents as processed after successful insertion to vector store
|
||||
processed_file_paths = set()
|
||||
for doc in documents:
|
||||
if 'source' in doc.metadata:
|
||||
processed_file_paths.add(doc.metadata['source'])
|
||||
if processed_record is not None:
|
||||
self._mark_document_processed(
|
||||
processed_record[0], processed_record[1]
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"Error uploading processed documents: {error}. But swallowing error. NOT raising."
|
||||
)
|
||||
finally:
|
||||
self.PROCESSED_DOCUMENTS_QUEUE.task_done()
|
||||
|
||||
for file_path in processed_file_paths:
|
||||
self._mark_document_processed(file_path)
|
||||
def _run_threaded_pipeline(self, adaptive_collection: _AdaptiveCollection):
|
||||
"""Run Phase 13 queue/thread pipeline."""
|
||||
process_threads = [
|
||||
threading.Thread(
|
||||
target=self.process_adaptive_files_queue,
|
||||
name=f"adaptive-file-processor-{index}",
|
||||
daemon=True,
|
||||
)
|
||||
for index in range(self.file_process_threads_count)
|
||||
]
|
||||
upload_threads = [
|
||||
threading.Thread(
|
||||
target=self.upload_processed_documents_from_queue,
|
||||
name=f"document-uploader-{index}",
|
||||
daemon=True,
|
||||
)
|
||||
for index in range(self.document_upload_threads_count)
|
||||
]
|
||||
|
||||
logger.info(f"Successfully added {len(documents)} document chunks to vector store and marked {len(processed_file_paths)} files as processed.")
|
||||
except Exception as e:
|
||||
logger.error(f"Error adding documents to vector store: {str(e)}")
|
||||
raise
|
||||
for thread in process_threads:
|
||||
thread.start()
|
||||
for thread in upload_threads:
|
||||
thread.start()
|
||||
|
||||
# This one intentionally runs on main thread per Phase 13 requirement.
|
||||
self.insert_adaptive_files_queue(adaptive_collection, recursive=True)
|
||||
|
||||
# Wait file queue completion and processing threads end.
|
||||
self.ADAPTIVE_FILES_QUEUE.join()
|
||||
for thread in process_threads:
|
||||
thread.join()
|
||||
|
||||
# Signal upload workers no more payload is expected.
|
||||
self.processing_finished.set()
|
||||
|
||||
# Wait upload completion and upload threads end.
|
||||
self.PROCESSED_DOCUMENTS_QUEUE.join()
|
||||
for thread in upload_threads:
|
||||
thread.join()
|
||||
|
||||
def _run_sync_pipeline(self, adaptive_collection: _AdaptiveCollection):
|
||||
"""Sequential pipeline for sync mode."""
|
||||
logger.info("Running enrichment in sync mode")
|
||||
self.insert_adaptive_files_queue(adaptive_collection, recursive=True)
|
||||
self.process_adaptive_files_queue()
|
||||
self.processing_finished.set()
|
||||
self.upload_processed_documents_from_queue()
|
||||
|
||||
def enrich_and_store(self, adaptive_collection: _AdaptiveCollection):
|
||||
"""Load, enrich, and store documents in the vector store."""
|
||||
logger.info("Starting enrichment process...")
|
||||
|
||||
if ENRICHMENT_PROCESSING_MODE == "sync":
|
||||
logger.info("Document enrichment process starting in SYNC mode")
|
||||
self._run_sync_pipeline(adaptive_collection)
|
||||
return
|
||||
|
||||
logger.info("Document enrichment process starting in ASYNC/THREAD mode")
|
||||
self._run_threaded_pipeline(adaptive_collection)
|
||||
|
||||
|
||||
def get_all_documents_from_data_dir(data_dir: str = str(DATA_DIR)) -> List[str]:
|
||||
"""Get all supported document file paths from the data directory."""
|
||||
supported_extensions = {
|
||||
'.pdf', '.docx', '.doc', '.pptx', '.xlsx', '.xls',
|
||||
'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff',
|
||||
'.webp', '.odt'
|
||||
}
|
||||
def get_enrichment_adaptive_collection(
|
||||
data_dir: str = str(DATA_DIR),
|
||||
) -> _AdaptiveCollection:
|
||||
"""Create adaptive collection based on environment source configuration."""
|
||||
source = ENRICHMENT_SOURCE
|
||||
if source == "local":
|
||||
local_path = ENRICHMENT_LOCAL_PATH or data_dir
|
||||
logger.info(f"Using local adaptive collection from path: {local_path}")
|
||||
return LocalFilesystemAdaptiveCollection(local_path)
|
||||
|
||||
file_paths = []
|
||||
for root, dirs, files in os.walk(data_dir):
|
||||
for file in files:
|
||||
if Path(file).suffix.lower() in supported_extensions:
|
||||
file_paths.append(os.path.join(root, file))
|
||||
if source == "yadisk":
|
||||
if not YADISK_TOKEN:
|
||||
raise ValueError("YADISK_TOKEN must be set when ENRICHMENT_SOURCE=yadisk")
|
||||
if not ENRICHMENT_YADISK_PATH:
|
||||
raise ValueError(
|
||||
"ENRICHMENT_YADISK_PATH must be set when ENRICHMENT_SOURCE=yadisk"
|
||||
)
|
||||
logger.info(
|
||||
f"Using Yandex Disk adaptive collection from path: {ENRICHMENT_YADISK_PATH}"
|
||||
)
|
||||
return YandexDiskAdaptiveCollection(
|
||||
token=YADISK_TOKEN,
|
||||
base_dir=ENRICHMENT_YADISK_PATH,
|
||||
)
|
||||
|
||||
return file_paths
|
||||
raise ValueError(
|
||||
f"Unsupported ENRICHMENT_SOURCE='{source}'. Allowed values: local, yadisk"
|
||||
)
|
||||
|
||||
|
||||
def run_enrichment_process(vector_store, data_dir: str = str(DATA_DIR)):
|
||||
"""Run the full enrichment process."""
|
||||
logger.info(f"Starting document enrichment from directory: {data_dir}")
|
||||
logger.info("Starting document enrichment process")
|
||||
|
||||
# Get all supported documents from the data directory
|
||||
file_paths = get_all_documents_from_data_dir(data_dir)
|
||||
|
||||
if not file_paths:
|
||||
logger.warning(f"No supported documents found in {data_dir}")
|
||||
return
|
||||
|
||||
logger.info(f"Found {len(file_paths)} documents to process")
|
||||
adaptive_collection = get_enrichment_adaptive_collection(data_dir=data_dir)
|
||||
|
||||
# Initialize the document enricher
|
||||
enricher = DocumentEnricher(vector_store)
|
||||
|
||||
# Run the enrichment process
|
||||
enricher.enrich_and_store(file_paths)
|
||||
enricher.enrich_and_store(adaptive_collection)
|
||||
|
||||
logger.info("Document enrichment process completed!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Example usage
|
||||
from vector_storage import initialize_vector_store
|
||||
|
||||
# Initialize vector store
|
||||
vector_store = initialize_vector_store()
|
||||
|
||||
# Run enrichment process
|
||||
run_enrichment_process(vector_store)
|
||||
|
||||
@@ -1,8 +1,14 @@
|
||||
"""Helper utilities for metadata extraction from Russian text."""
|
||||
|
||||
import os
|
||||
import re
|
||||
from typing import List
|
||||
import tempfile
|
||||
from abc import ABC, abstractmethod
|
||||
from pathlib import Path
|
||||
from typing import Callable, Iterator, List
|
||||
|
||||
import requests
|
||||
from loguru import logger
|
||||
|
||||
_YEAR_PATTERN = re.compile(r"(?<!\d)(1\d{3}|20\d{2}|2100)(?!\d)")
|
||||
|
||||
@@ -105,3 +111,170 @@ def extract_russian_event_names(text: str) -> List[str]:
|
||||
seen.add(quoted)
|
||||
|
||||
return events
|
||||
|
||||
|
||||
class _AdaptiveFile(ABC):
|
||||
extension: str # Format: .jpg
|
||||
filename: str
|
||||
|
||||
def __init__(self, filename: str, extension: str):
|
||||
self.filename = filename
|
||||
self.extension = extension
|
||||
|
||||
# This method allows working with the file locally; a callback must be provided.
# Why a separate method? To allow cleanup after the work is done, and to download the file first if needed.
# Callback: the first argument is a local path.
|
||||
@abstractmethod
|
||||
def work_with_file_locally(self, func: Callable[[str], None]):
|
||||
"""Run callback with a local path to the file."""
|
||||
|
||||
|
||||
class _AdaptiveCollection(ABC):
|
||||
# Generator method with yield
|
||||
@abstractmethod
|
||||
def iterate(self, recursive: bool) -> Iterator[_AdaptiveFile]:
|
||||
"""Iterate files in collection."""
|
||||
|
||||
|
||||
class LocalFilesystemAdaptiveFile(_AdaptiveFile):
|
||||
local_path: str
|
||||
|
||||
def __init__(self, filename: str, extension: str, local_path: str):
|
||||
super().__init__(filename, extension)
|
||||
self.local_path = local_path
|
||||
|
||||
def work_with_file_locally(self, func: Callable[[str], None]):
|
||||
func(self.local_path)
|
||||
|
||||
|
||||
class LocalFilesystemAdaptiveCollection(_AdaptiveCollection):
|
||||
base_dir: str
|
||||
|
||||
def __init__(self, base_dir: str):
|
||||
super().__init__()
|
||||
|
||||
self.base_dir = base_dir
|
||||
|
||||
def iterate(self, recursive: bool) -> Iterator[_AdaptiveFile]:
|
||||
for root, dirs, files in os.walk(self.base_dir):
|
||||
for file in files:
|
||||
full_path = os.path.join(root, file)
|
||||
p = Path(full_path)
|
||||
yield LocalFilesystemAdaptiveFile(p.name, p.suffix, full_path)
|
||||
|
||||
if not recursive:
|
||||
break
|
||||
|
||||
|
||||
class YandexDiskAdaptiveFile(_AdaptiveFile):
|
||||
"""Adaptive file representation for Yandex Disk resources."""
|
||||
|
||||
remote_path: str
|
||||
|
||||
def __init__(self, filename: str, extension: str, remote_path: str, token: str):
|
||||
super().__init__(filename, extension)
|
||||
self.token = token
|
||||
self.remote_path = remote_path
|
||||
|
||||
def _download_to_temp_file(self) -> str:
|
||||
headers = {"Authorization": f"OAuth {self.token}"}
|
||||
response = requests.get(
|
||||
"https://cloud-api.yandex.net/v1/disk/resources/download",
|
||||
headers=headers,
|
||||
params={"path": self.remote_path},
|
||||
timeout=30,
|
||||
)
|
||||
response.raise_for_status()
|
||||
href = response.json()["href"]
|
||||
|
||||
file_response = requests.get(href, timeout=120)
|
||||
file_response.raise_for_status()
|
||||
|
||||
p = Path(self.remote_path)
|
||||
suffix = p.suffix
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
|
||||
temp_file.write(file_response.content)
|
||||
return temp_file.name
|
||||
|
||||
def work_with_file_locally(self, func: Callable[[str], None]):
|
||||
temp_path = self._download_to_temp_file()
|
||||
try:
|
||||
func(temp_path)
|
||||
finally:
|
||||
if os.path.exists(temp_path):
|
||||
os.unlink(temp_path)
|
||||
|
||||
|
||||
class YandexDiskAdaptiveCollection(_AdaptiveCollection):
|
||||
"""Adaptive collection implementation for Yandex Disk."""
|
||||
|
||||
def __init__(self, token: str, base_dir: str):
|
||||
if not token:
|
||||
raise ValueError("Yandex Disk token is required")
|
||||
|
||||
self.token = token
|
||||
self.base_dir = base_dir
|
||||
self._headers = {"Authorization": f"OAuth {self.token}"}
|
||||
|
||||
@staticmethod
|
||||
def _normalize_disk_path(path: str) -> str:
|
||||
return path if path.startswith("disk:/") else f"disk:/{path.lstrip('/')}"
|
||||
|
||||
def _get_resource_info(self, path: str) -> dict:
|
||||
response = requests.get(
|
||||
"https://cloud-api.yandex.net/v1/disk/resources",
|
||||
headers=self._headers,
|
||||
params={"path": path, "limit": 1000},
|
||||
timeout=30,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def _iter_children(self, path: str) -> Iterator[dict]:
|
||||
offset = 0
|
||||
while True:
|
||||
response = requests.get(
|
||||
"https://cloud-api.yandex.net/v1/disk/resources",
|
||||
headers=self._headers,
|
||||
params={"path": path, "limit": 1000, "offset": offset},
|
||||
timeout=30,
|
||||
)
|
||||
response.raise_for_status()
|
||||
payload = response.json()
|
||||
embedded = payload.get("_embedded", {})
|
||||
items = embedded.get("items", [])
|
||||
if not items:
|
||||
break
|
||||
|
||||
for item in items:
|
||||
yield item
|
||||
|
||||
if len(items) < 1000:
|
||||
break
|
||||
offset += 1000
|
||||
|
||||
def iterate(self, recursive: bool) -> Iterator[_AdaptiveFile]:
|
||||
root_path = self._normalize_disk_path(self.base_dir)
|
||||
root_info = self._get_resource_info(root_path)
|
||||
|
||||
if root_info.get("type") == "file":
|
||||
path = root_info["path"]
|
||||
logger.info(f"Found file on Yandex Disk: {path}")
|
||||
p = Path(path)
|
||||
yield YandexDiskAdaptiveFile(p.name, p.suffix, path, self.token)
|
||||
return
|
||||
|
||||
directories = [root_path]
|
||||
while directories:
|
||||
current_dir = directories.pop(0)
|
||||
for item in self._iter_children(current_dir):
|
||||
item_type = item.get("type")
|
||||
item_path = str(item.get("path"))
|
||||
if item_type == "file":
|
||||
logger.info(f"Found file on Yandex Disk: {item_path}")
|
||||
p = Path(item_path)
|
||||
yield YandexDiskAdaptiveFile(
|
||||
p.name, p.suffix, item_path, self.token
|
||||
)
|
||||
elif recursive and item_type == "dir":
|
||||
directories.append(item_path)
|
||||
|
||||
1 services/rag/langchain/test/samples/level1/first.md (new file)
@@ -0,0 +1 @@
first level
1 services/rag/langchain/test/samples/root.txt (new file)
@@ -0,0 +1 @@
root file
@@ -0,0 +1,55 @@
|
||||
import os
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
from helpers import LocalFilesystemAdaptiveCollection, LocalFilesystemAdaptiveFile
|
||||
|
||||
|
||||
class TestLocalFilesystemAdaptiveCollection(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.samples_dir = Path(__file__).parent / "samples"
|
||||
|
||||
def test_iterate_non_recursive_returns_only_root_files(self):
|
||||
collection = LocalFilesystemAdaptiveCollection(str(self.samples_dir))
|
||||
|
||||
files = list(collection.iterate(recursive=False))
|
||||
file_names = sorted(file.filename for file in files)
|
||||
|
||||
self.assertEqual(file_names, ["root.txt"])
|
||||
self.assertTrue(all(isinstance(file, LocalFilesystemAdaptiveFile) for file in files))
|
||||
|
||||
def test_iterate_recursive_returns_nested_files(self):
|
||||
collection = LocalFilesystemAdaptiveCollection(str(self.samples_dir))
|
||||
|
||||
files = list(collection.iterate(recursive=True))
|
||||
relative_paths = sorted(
|
||||
str(Path(file.local_path).relative_to(self.samples_dir)) for file in files
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
relative_paths,
|
||||
["level1/first.md", "level1/level2/second.log", "root.txt"],
|
||||
)
|
||||
|
||||
def test_work_with_file_locally_provides_existing_path(self):
|
||||
target_path = self.samples_dir / "root.txt"
|
||||
adaptive_file = LocalFilesystemAdaptiveFile(
|
||||
target_path.name, target_path.suffix, str(target_path)
|
||||
)
|
||||
|
||||
observed = {}
|
||||
|
||||
def callback(path: str):
|
||||
observed["path"] = path
|
||||
with open(path, "r", encoding="utf-8") as handle:
|
||||
observed["content"] = handle.read().strip()
|
||||
|
||||
adaptive_file.work_with_file_locally(callback)
|
||||
|
||||
self.assertEqual(adaptive_file.filename, "root.txt")
|
||||
self.assertEqual(observed["path"], str(target_path))
|
||||
self.assertEqual(observed["content"], "root file")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,41 @@
|
||||
import os
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
from loguru import logger
|
||||
from dotenv import load_dotenv
|
||||
from helpers import YandexDiskAdaptiveCollection
|
||||
|
||||
load_dotenv(dotenv_path=Path(__file__).resolve().parent.parent / ".env.test")
|
||||
|
||||
|
||||
class TestYandexDiskAdaptiveCollection(unittest.TestCase):
|
||||
def test_constructor_requires_token(self):
|
||||
with self.assertRaises(ValueError):
|
||||
YandexDiskAdaptiveCollection(token="", base_dir="Общая/Информация")
|
||||
|
||||
def test_iterate_logs_found_files_for_shared_folder(self):
|
||||
token = os.getenv("YADISK_TOKEN")
|
||||
if not token:
|
||||
self.skipTest("YADISK_TOKEN is not configured")
|
||||
|
||||
collection = YandexDiskAdaptiveCollection(
|
||||
token=token,
|
||||
base_dir="Общая/Информация",
|
||||
)
|
||||
|
||||
try:
|
||||
files = list(collection.iterate(recursive=True))
|
||||
except requests.RequestException as exc:
|
||||
self.skipTest(f"Yandex Disk request failed and needs manual verification: {exc}")
|
||||
|
||||
for item in files:
|
||||
self.assertTrue(item.filename)
|
||||
logger.info(f"Yandex file found during test iteration: {item.local_path}")
|
||||
|
||||
self.assertIsInstance(files, list)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||