"""Document enrichment module for loading documents into vector storage."""

import hashlib
import os
from pathlib import Path
from typing import List

from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader, TextLoader

# Dynamically import other loaders to handle optional dependencies
try:
    from langchain_community.document_loaders import UnstructuredWordDocumentLoader
except ImportError:
    UnstructuredWordDocumentLoader = None

try:
    from langchain_community.document_loaders import UnstructuredPowerPointLoader
except ImportError:
    UnstructuredPowerPointLoader = None

try:
    from langchain_community.document_loaders import UnstructuredExcelLoader
except ImportError:
    UnstructuredExcelLoader = None

try:
    from langchain_community.document_loaders import UnstructuredImageLoader
except ImportError:
    UnstructuredImageLoader = None

try:
    from langchain_community.document_loaders import UnstructuredODTLoader
except ImportError:
    UnstructuredODTLoader = None

from loguru import logger
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

# Paths resolve against the current working directory, so run this module
# from its own directory (or adjust DATA_DIR) for the lookup to succeed.
DATA_DIR = Path("../../../data").resolve()
DB_PATH = Path("document_tracking.db").resolve()

Base = declarative_base()


class ProcessedDocument(Base):
    """Database model for tracking processed documents."""

    __tablename__ = "processed_documents"

    id = Column(Integer, primary_key=True)
    file_path = Column(String, unique=True, nullable=False)
    file_hash = Column(String, nullable=False)
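
    # Matching is done on file_hash (see _is_document_processed), so a renamed
    # copy of an ingested file is skipped, while an edited file (new hash) is
    # picked up and re-ingested.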


class DocumentEnricher:
    """Class responsible for enriching documents and loading them into vector storage."""

    def __init__(self, vector_store):
        self.vector_store = vector_store
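
        # Chunks of ~1000 characters with 200 characters of overlap keep each
        # piece small enough to embed while preserving context across boundaries.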
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )

        # Initialize the database for tracking processed documents
        self._init_db()

    def _init_db(self):
        """Initialize the SQLite database for tracking processed documents."""
        self.engine = create_engine(f"sqlite:///{DB_PATH}")
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    def _get_file_hash(self, file_path: str) -> str:
        """Calculate SHA256 hash of a file."""
        hash_sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            # Read file in chunks to handle large files
            for chunk in iter(lambda: f.read(4096), b""):
                hash_sha256.update(chunk)
        return hash_sha256.hexdigest()

    def _is_document_processed(self, file_path: str) -> bool:
        """Check if a document has already been processed."""
        file_hash = self._get_file_hash(file_path)
        existing = self.session.query(ProcessedDocument).filter_by(
            file_hash=file_hash
        ).first()
        return existing is not None

    def _mark_document_processed(self, file_path: str):
        """Record the document's path and current content hash in the tracking database."""
        file_hash = self._get_file_hash(file_path)
        # Update the existing row if this path was seen before (i.e. the file
        # was edited); a plain add() would violate the unique constraint on
        # file_path when a changed file is re-processed.
        doc_record = self.session.query(ProcessedDocument).filter_by(
            file_path=file_path
        ).first()
        if doc_record is None:
            doc_record = ProcessedDocument(file_path=file_path, file_hash=file_hash)
            self.session.add(doc_record)
        else:
            doc_record.file_hash = file_hash
        self.session.commit()

    def _get_loader_for_extension(self, file_path: str):
        """Get the appropriate loader for a given file extension."""
        ext = Path(file_path).suffix.lower()

        if ext == ".pdf":
            return PyPDFLoader(file_path)
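
        # In the branches below, strategy="hi_res" selects Unstructured's
        # layout-aware parsing path, and languages=["rus"] hints that the
        # documents contain Russian text.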
        elif ext in [".docx", ".doc"]:
            if UnstructuredWordDocumentLoader is None:
                logger.warning(f"UnstructuredWordDocumentLoader not available for {file_path}. Skipping.")
                return None
            return UnstructuredWordDocumentLoader(file_path, strategy="hi_res", languages=["rus"])
        elif ext == ".pptx":
            if UnstructuredPowerPointLoader is None:
                logger.warning(f"UnstructuredPowerPointLoader not available for {file_path}. Skipping.")
                return None
            return UnstructuredPowerPointLoader(file_path, strategy="hi_res", languages=["rus"])
        elif ext in [".xlsx", ".xls"]:
            if UnstructuredExcelLoader is None:
                logger.warning(f"UnstructuredExcelLoader not available for {file_path}. Skipping.")
                return None
            return UnstructuredExcelLoader(file_path, strategy="hi_res", languages=["rus"])
        elif ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
            if UnstructuredImageLoader is None:
                logger.warning(f"UnstructuredImageLoader not available for {file_path}. Skipping.")
                return None
            # Use the OCR strategy for images to extract text
            return UnstructuredImageLoader(file_path, strategy="ocr_only", languages=["rus"])
        elif ext == ".odt":
            if UnstructuredODTLoader is None:
                logger.warning(f"UnstructuredODTLoader not available for {file_path}. Skipping.")
                return None
            return UnstructuredODTLoader(file_path, strategy="hi_res", languages=["rus"])
        else:
            # For text files and unknown formats, fall back to a plain-text
            # loader; decode failures surface from loader.load() and are
            # caught by the caller.
            return TextLoader(file_path, encoding="utf-8")

    def load_and_split_documents(self, file_paths: List[str]) -> List[Document]:
        """Load documents from file paths and split them into chunks."""
        all_docs = []

        for file_path in file_paths:
            if self._is_document_processed(file_path):
                logger.info(f"Skipping already processed document: {file_path}")
                continue

            logger.info(f"Processing document: {file_path}")

            # Get the appropriate loader for the file extension
            loader = self._get_loader_for_extension(file_path)

            if loader is None:
                # No loader is available for this format; skip the file
                continue

            try:
                # Load the document(s)
                docs = loader.load()

                # Attach file-level metadata to each document
                for doc in docs:
                    doc.metadata["source"] = file_path
                    doc.metadata["filename"] = Path(file_path).name
                    doc.metadata["file_path"] = file_path
                    doc.metadata["file_size"] = os.path.getsize(file_path)

                    # Add page number if available in original metadata
                    if "page" in doc.metadata:
                        doc.metadata["page_number"] = doc.metadata["page"]

                    # Add file extension as metadata
                    doc.metadata["file_extension"] = Path(file_path).suffix

                # Split documents if they are too large
                split_docs = self.text_splitter.split_documents(docs)

                # Add to the collection
                all_docs.extend(split_docs)

            except Exception as e:
                logger.error(f"Error processing {file_path}: {str(e)}")
                continue

        return all_docs

    def enrich_and_store(self, file_paths: List[str]):
        """Load, enrich, and store documents in the vector store."""
        logger.info(f"Starting enrichment process for {len(file_paths)} files...")

        # Load and split documents
        documents = self.load_and_split_documents(file_paths)

        if not documents:
            logger.info("No new documents to process.")
            return

        logger.info(f"Loaded and split {len(documents)} document chunks, adding to vector store...")

        # Add documents to the vector store
        try:
            self.vector_store.add_documents(documents)

            # Only mark documents as processed after successful insertion into the vector store
            processed_file_paths = set()
            for doc in documents:
                if "source" in doc.metadata:
                    processed_file_paths.add(doc.metadata["source"])

            for file_path in processed_file_paths:
                self._mark_document_processed(file_path)

            logger.info(
                f"Successfully added {len(documents)} document chunks to the vector store "
                f"and marked {len(processed_file_paths)} files as processed."
            )
        except Exception as e:
            logger.error(f"Error adding documents to vector store: {str(e)}")
            raise


def get_all_documents_from_data_dir(data_dir: str = str(DATA_DIR)) -> List[str]:
    """Get all supported document file paths from the data directory."""
    supported_extensions = {
        ".pdf", ".docx", ".doc", ".pptx", ".xlsx", ".xls",
        ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff",
        ".webp", ".odt",
    }
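
    # Note: plain-text formats (.txt, .md, ...) are not scanned here; the
    # TextLoader fallback in _get_loader_for_extension only applies to paths
    # handed to DocumentEnricher directly.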
    file_paths = []
    for root, dirs, files in os.walk(data_dir):
        for file in files:
            if Path(file).suffix.lower() in supported_extensions:
                file_paths.append(os.path.join(root, file))

    return file_paths


def run_enrichment_process(vector_store, data_dir: str = str(DATA_DIR)):
    """Run the full enrichment process."""
    logger.info(f"Starting document enrichment from directory: {data_dir}")

    # Get all supported documents from the data directory
    file_paths = get_all_documents_from_data_dir(data_dir)

    if not file_paths:
        logger.warning(f"No supported documents found in {data_dir}")
        return

    logger.info(f"Found {len(file_paths)} documents to process")

    # Initialize the document enricher
    enricher = DocumentEnricher(vector_store)

    # Run the enrichment process
    enricher.enrich_and_store(file_paths)

    logger.info("Document enrichment process completed!")


if __name__ == "__main__":
    # Example usage
    from vector_storage import initialize_vector_store
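
    # vector_storage is assumed to be a sibling module whose
    # initialize_vector_store() returns a LangChain-compatible vector store.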

    # Initialize the vector store
    vector_store = initialize_vector_store()

    # Run the enrichment process
    run_enrichment_process(vector_store)