# rag-solution/services/rag/langchain/enrichment.py
"""Document enrichment module for loading documents into vector storage."""
import os
import hashlib
from pathlib import Path
from typing import List

from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader, TextLoader
# Dynamically import other loaders to handle optional dependencies
try:
    from langchain_community.document_loaders import UnstructuredWordDocumentLoader
except ImportError:
    UnstructuredWordDocumentLoader = None
try:
    from langchain_community.document_loaders import UnstructuredPowerPointLoader
except ImportError:
    UnstructuredPowerPointLoader = None
try:
    from langchain_community.document_loaders import UnstructuredExcelLoader
except ImportError:
    UnstructuredExcelLoader = None
try:
    from langchain_community.document_loaders import UnstructuredImageLoader
except ImportError:
    UnstructuredImageLoader = None
try:
    from langchain_community.document_loaders import UnstructuredODTLoader
except ImportError:
    UnstructuredODTLoader = None
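# The Unstructured* loaders require the optional "unstructured" package
# (for example `pip install "unstructured[all-docs]"`; the exact extras are an
# assumption about this project's setup). The try/except guards above let the
# module import cleanly when those dependencies are missing.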

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker
from loguru import logger

# Define the paths to the data directory and the document-tracking database.
# Both resolve relative to the current working directory, so the module is
# expected to run from its own directory (as in the __main__ block below).
DATA_DIR = Path("../../../data").resolve()
DB_PATH = Path("document_tracking.db").resolve()

Base = declarative_base()


class ProcessedDocument(Base):
    """Database model for tracking processed documents."""

    __tablename__ = "processed_documents"

    id = Column(Integer, primary_key=True)
    file_path = Column(String, unique=True, nullable=False)
    file_hash = Column(String, nullable=False)
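
# For reference, Base.metadata.create_all() emits roughly the following DDL
# for this model (SQLite dialect; exact output varies by SQLAlchemy version):
#   CREATE TABLE processed_documents (
#       id INTEGER NOT NULL,
#       file_path VARCHAR NOT NULL,
#       file_hash VARCHAR NOT NULL,
#       PRIMARY KEY (id),
#       UNIQUE (file_path)
#   )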


class DocumentEnricher:
    """Class responsible for enriching documents and loading them to vector storage."""

    def __init__(self, vector_store):
        self.vector_store = vector_store
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )
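        # Chunking rationale (an assumption; the original values are not
        # documented): 1000-character chunks with a 200-character overlap are a
        # common RAG starting point, and the overlap keeps text that straddles
        # a chunk boundary retrievable from both neighbouring chunks.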
        # Initialize database for tracking processed documents
        self._init_db()

    def _init_db(self):
        """Initialize the SQLite database for tracking processed documents."""
        self.engine = create_engine(f"sqlite:///{DB_PATH}")
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    def _get_file_hash(self, file_path: str) -> str:
        """Calculate the SHA-256 hash of a file."""
        hash_sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            # Read the file in 4 KiB chunks to handle large files
            for chunk in iter(lambda: f.read(4096), b""):
                hash_sha256.update(chunk)
        return hash_sha256.hexdigest()
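
    # Hashing file contents (rather than the path or mtime) means a renamed or
    # copied file is still recognised as already processed, while any byte-level
    # change yields a new hash and makes the file eligible for re-ingestion.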

    def _is_document_processed(self, file_path: str) -> bool:
        """Check if a document has already been processed."""
        file_hash = self._get_file_hash(file_path)
        existing = self.session.query(ProcessedDocument).filter_by(
            file_hash=file_hash
        ).first()
        return existing is not None

    def _mark_document_processed(self, file_path: str):
        """Mark a document as processed in the database."""
        file_hash = self._get_file_hash(file_path)
        doc_record = ProcessedDocument(
            file_path=file_path,
            file_hash=file_hash,
        )
        self.session.add(doc_record)
        self.session.commit()
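
    # Caveat: file_path carries a UNIQUE constraint, so marking a file whose
    # content changed since an earlier run raises an IntegrityError here; an
    # upsert (or delete-then-insert) would be needed to support re-processing
    # modified files.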

    def _get_loader_for_extension(self, file_path: str):
        """Get the appropriate loader for a given file extension, or None if unavailable."""
        ext = Path(file_path).suffix.lower()

        if ext == ".pdf":
            return PyPDFLoader(file_path)
        elif ext in [".docx", ".doc"]:
            if UnstructuredWordDocumentLoader is None:
                logger.warning(f"UnstructuredWordDocumentLoader not available for {file_path}. Skipping.")
                return None
            return UnstructuredWordDocumentLoader(file_path, strategy="hi_res", languages=["rus"])
        elif ext == ".pptx":
            if UnstructuredPowerPointLoader is None:
                logger.warning(f"UnstructuredPowerPointLoader not available for {file_path}. Skipping.")
                return None
            return UnstructuredPowerPointLoader(file_path, strategy="hi_res", languages=["rus"])
        elif ext in [".xlsx", ".xls"]:
            if UnstructuredExcelLoader is None:
                logger.warning(f"UnstructuredExcelLoader not available for {file_path}. Skipping.")
                return None
            return UnstructuredExcelLoader(file_path, strategy="hi_res", languages=["rus"])
        elif ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
            if UnstructuredImageLoader is None:
                logger.warning(f"UnstructuredImageLoader not available for {file_path}. Skipping.")
                return None
            # Use the OCR-only strategy for images to extract text
            return UnstructuredImageLoader(file_path, strategy="ocr_only", languages=["rus"])
        elif ext == ".odt":
            if UnstructuredODTLoader is None:
                logger.warning(f"UnstructuredODTLoader not available for {file_path}. Skipping.")
                return None
            return UnstructuredODTLoader(file_path, strategy="hi_res", languages=["rus"])
        else:
            # For text files and unsupported formats, try to load as plain text
            try:
                # Probe that the file decodes as UTF-8 before handing it to TextLoader
                with open(file_path, "r", encoding="utf-8") as f:
                    f.read()
                return TextLoader(file_path, encoding="utf-8")
            except UnicodeDecodeError:
                logger.warning(f"Could not decode file as text: {file_path}")
                return None
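
    # Note on the Unstructured loader kwargs: "strategy" and "languages" are
    # forwarded to unstructured's partition functions; "hi_res" selects the
    # model-based layout parser and languages=["rus"] points OCR at Russian
    # (Tesseract language codes). This assumes a reasonably recent unstructured
    # release with the relevant OCR extras installed.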

    def load_and_split_documents(self, file_paths: List[str]) -> List[Document]:
        """Load documents from file paths and split them appropriately."""
        all_docs = []

        for file_path in file_paths:
            if self._is_document_processed(file_path):
                logger.info(f"Skipping already processed document: {file_path}")
                continue

            logger.info(f"Processing document: {file_path}")

            # Get the appropriate loader for the file extension
            loader = self._get_loader_for_extension(file_path)

            if loader is None:
                # Unsupported format or unavailable loader; already logged above
                continue

            try:
                # Load the document(s)
                docs = loader.load()

                # Add metadata to each document
                for doc in docs:
                    doc.metadata["source"] = file_path
                    doc.metadata["filename"] = Path(file_path).name
                    doc.metadata["file_path"] = file_path
                    doc.metadata["file_size"] = os.path.getsize(file_path)

                    # Preserve the page number if the loader provided one
                    if "page" in doc.metadata:
                        doc.metadata["page_number"] = doc.metadata["page"]

                    # Add the file extension as metadata
                    doc.metadata["file_extension"] = Path(file_path).suffix

                # Split documents if they are too large
                split_docs = self.text_splitter.split_documents(docs)

                # Add to the collection
                all_docs.extend(split_docs)

            except Exception as e:
                logger.error(f"Error processing {file_path}: {str(e)}")
                continue

        return all_docs

    def enrich_and_store(self, file_paths: List[str]):
        """Load, enrich, and store documents in the vector store."""
        logger.info(f"Starting enrichment process for {len(file_paths)} files...")

        # Load and split documents
        documents = self.load_and_split_documents(file_paths)

        if not documents:
            logger.info("No new documents to process.")
            return

        logger.info(f"Loaded and split {len(documents)} document chunks, adding to vector store...")

        # Add documents to the vector store
        try:
            self.vector_store.add_documents(documents)

            # Only mark documents as processed after successful insertion into the vector store
            processed_file_paths = set()
            for doc in documents:
                if "source" in doc.metadata:
                    processed_file_paths.add(doc.metadata["source"])
            for file_path in processed_file_paths:
                self._mark_document_processed(file_path)

            logger.info(
                f"Successfully added {len(documents)} document chunks to vector store "
                f"and marked {len(processed_file_paths)} files as processed."
            )
        except Exception as e:
            logger.error(f"Error adding documents to vector store: {str(e)}")
            raise
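
    # Failure semantics: if add_documents raises, no file is marked processed
    # and the whole batch is retried on the next run. A partial insert inside
    # add_documents can therefore leave duplicate chunks in the vector store;
    # deduplication relies entirely on the tracking database, not on the store.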


def get_all_documents_from_data_dir(data_dir: str = str(DATA_DIR)) -> List[str]:
    """Get all supported document file paths from the data directory."""
    supported_extensions = {
        ".pdf", ".docx", ".doc", ".pptx", ".xlsx", ".xls",
        ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff",
        ".webp", ".odt",
    }
    file_paths = []
    for root, _dirs, files in os.walk(data_dir):
        for file in files:
            if Path(file).suffix.lower() in supported_extensions:
                file_paths.append(os.path.join(root, file))
    return file_paths
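
# Note: supported_extensions omits plain-text formats such as .txt and .md, so
# the plain-text fallback in _get_loader_for_extension is only exercised when
# callers pass such paths to DocumentEnricher directly.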


def run_enrichment_process(vector_store, data_dir: str = str(DATA_DIR)):
    """Run the full enrichment process."""
    logger.info(f"Starting document enrichment from directory: {data_dir}")

    # Get all supported documents from the data directory
    file_paths = get_all_documents_from_data_dir(data_dir)
    if not file_paths:
        logger.warning(f"No supported documents found in {data_dir}")
        return

    logger.info(f"Found {len(file_paths)} documents to process")

    # Initialize the document enricher and run the enrichment process
    enricher = DocumentEnricher(vector_store)
    enricher.enrich_and_store(file_paths)

    logger.info("Document enrichment process completed!")


if __name__ == "__main__":
    # Example usage
    from vector_storage import initialize_vector_store

    # Initialize the vector store and run the enrichment process
    vector_store = initialize_vector_store()
    run_enrichment_process(vector_store)
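
    # Expected behaviour of a run (assuming vector_storage.initialize_vector_store
    # is importable as above): every supported file under DATA_DIR is ingested
    # once; re-running skips files whose content hash is already recorded in
    # document_tracking.db.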