# rag-solution/services/rag/langchain/enrichment.py
"""Document enrichment module for loading documents into vector storage."""
import os
import hashlib
from pathlib import Path
from typing import List
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader, TextLoader
# Dynamically import other loaders to handle optional dependencies
try:
    from langchain_community.document_loaders import UnstructuredWordDocumentLoader
except ImportError:
    UnstructuredWordDocumentLoader = None
try:
    from langchain_community.document_loaders import UnstructuredPowerPointLoader
except ImportError:
    UnstructuredPowerPointLoader = None
try:
    from langchain_community.document_loaders import UnstructuredExcelLoader
except ImportError:
    UnstructuredExcelLoader = None
try:
    from langchain_community.document_loaders import UnstructuredImageLoader
except ImportError:
    UnstructuredImageLoader = None
try:
    from langchain_community.document_loaders import UnstructuredODTLoader
except ImportError:
    UnstructuredODTLoader = None
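# These wrappers require the `unstructured` package at runtime; a typical
# install (exact extras vary by format and unstructured version) is:
#
#     pip install langchain-community "unstructured[all-docs]"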
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker
from loguru import logger
# Anchor paths to this module's location so results do not depend on the CWD
_MODULE_DIR = Path(__file__).resolve().parent
DATA_DIR = (_MODULE_DIR / "../../../data").resolve()
DB_PATH = _MODULE_DIR / "document_tracking.db"
Base = declarative_base()
class ProcessedDocument(Base):
    """Database model for tracking processed documents."""

    __tablename__ = "processed_documents"

    id = Column(Integer, primary_key=True)
    file_path = Column(String, unique=True, nullable=False)
    file_hash = Column(String, nullable=False)
class DocumentEnricher:
    """Class responsible for enriching documents and loading them into vector storage."""

    def __init__(self, vector_store):
        self.vector_store = vector_store
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )
        # Initialize database for tracking processed documents
        self._init_db()

    def _init_db(self):
        """Initialize the SQLite database for tracking processed documents."""
        self.engine = create_engine(f"sqlite:///{DB_PATH}")
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    def _get_file_hash(self, file_path: str) -> str:
        """Calculate the SHA-256 hash of a file."""
        hash_sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            # Read the file in chunks to handle large files
            for chunk in iter(lambda: f.read(4096), b""):
                hash_sha256.update(chunk)
        return hash_sha256.hexdigest()

    def _is_document_processed(self, file_path: str) -> bool:
        """Check if a document has already been processed."""
        file_hash = self._get_file_hash(file_path)
        existing = self.session.query(ProcessedDocument).filter_by(
            file_hash=file_hash
        ).first()
        return existing is not None

    def _mark_document_processed(self, file_path: str):
        """Mark a document as processed in the database."""
        file_hash = self._get_file_hash(file_path)
        # Upsert on file_path: an edited file produces a new hash, and inserting
        # a second row for the same path would violate the unique constraint.
        existing = self.session.query(ProcessedDocument).filter_by(
            file_path=file_path
        ).first()
        if existing is not None:
            existing.file_hash = file_hash
        else:
            self.session.add(ProcessedDocument(file_path=file_path, file_hash=file_hash))
        self.session.commit()
    def _get_loader_for_extension(self, file_path: str):
        """Get the appropriate loader for a given file extension, or None if unavailable."""
        ext = Path(file_path).suffix.lower()
        # Passed through to unstructured; OCR-backed strategies also need the
        # matching Tesseract language pack (here, Russian) installed.
        unstructured_kwargs = {"strategy": "hi_res", "languages": ["rus"]}
        if ext == ".pdf":
            return PyPDFLoader(file_path)
        elif ext in [".docx", ".doc"]:
            if UnstructuredWordDocumentLoader is None:
                logger.warning(f"UnstructuredWordDocumentLoader not available for {file_path}. Skipping.")
                return None
            return UnstructuredWordDocumentLoader(file_path, **unstructured_kwargs)
        elif ext == ".pptx":
            if UnstructuredPowerPointLoader is None:
                logger.warning(f"UnstructuredPowerPointLoader not available for {file_path}. Skipping.")
                return None
            return UnstructuredPowerPointLoader(file_path, **unstructured_kwargs)
        elif ext in [".xlsx", ".xls"]:
            if UnstructuredExcelLoader is None:
                logger.warning(f"UnstructuredExcelLoader not available for {file_path}. Skipping.")
                return None
            return UnstructuredExcelLoader(file_path, **unstructured_kwargs)
        elif ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
            if UnstructuredImageLoader is None:
                logger.warning(f"UnstructuredImageLoader not available for {file_path}. Skipping.")
                return None
            # Use an OCR-only strategy for images to extract text
            return UnstructuredImageLoader(file_path, strategy="ocr_only", languages=["rus"])
        elif ext == ".odt":
            if UnstructuredODTLoader is None:
                logger.warning(f"UnstructuredODTLoader not available for {file_path}. Skipping.")
                return None
            return UnstructuredODTLoader(file_path, **unstructured_kwargs)
        else:
            # For text files and unknown formats, fall back to the plain-text
            # loader. Probe the file first so undecodable binaries are skipped
            # with a warning instead of raising later in load().
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    f.read(1024)
            except (UnicodeDecodeError, OSError):
                logger.warning(f"Could not decode file as text: {file_path}")
                return None
            return TextLoader(file_path, encoding="utf-8")
    def load_and_split_documents(self, file_paths: List[str]) -> List[Document]:
        """Load documents from file paths and split them appropriately."""
        all_docs = []
        for file_path in file_paths:
            if self._is_document_processed(file_path):
                logger.info(f"Skipping already processed document: {file_path}")
                continue
            logger.info(f"Processing document: {file_path}")
            # Get the appropriate loader for the file extension
            loader = self._get_loader_for_extension(file_path)
            if loader is None:
                # Unsupported or undecodable file; a warning was already logged
                continue
            try:
                # Load the document(s)
                docs = loader.load()
                # Add metadata from the original file to each document
                for doc in docs:
                    doc.metadata["source"] = file_path
                    doc.metadata["filename"] = Path(file_path).name
                    doc.metadata["file_path"] = file_path
                    doc.metadata["file_size"] = os.path.getsize(file_path)
                    # Add the page number if available in the original metadata
                    if "page" in doc.metadata:
                        doc.metadata["page_number"] = doc.metadata["page"]
                    # Add the file extension as metadata
                    doc.metadata["file_extension"] = Path(file_path).suffix
                # Split documents that exceed the chunk size
                split_docs = self.text_splitter.split_documents(docs)
                all_docs.extend(split_docs)
            except Exception as e:
                logger.error(f"Error processing {file_path}: {e}")
                continue
        return all_docs
    def enrich_and_store(self, file_paths: List[str]):
        """Load, enrich, and store documents in the vector store."""
        logger.info(f"Starting enrichment process for {len(file_paths)} files...")
        # Load and split documents
        documents = self.load_and_split_documents(file_paths)
        if not documents:
            logger.info("No new documents to process.")
            return
        logger.info(f"Loaded and split {len(documents)} document chunks, adding to vector store...")
        try:
            self.vector_store.add_documents(documents)
            # Mark files as processed only after the insertion succeeds, so a
            # failed run is retried in full on the next pass
            processed_file_paths = set()
            for doc in documents:
                if "source" in doc.metadata:
                    processed_file_paths.add(doc.metadata["source"])
            for file_path in processed_file_paths:
                self._mark_document_processed(file_path)
            logger.info(
                f"Successfully added {len(documents)} document chunks to the vector store "
                f"and marked {len(processed_file_paths)} files as processed."
            )
        except Exception as e:
            logger.error(f"Error adding documents to vector store: {e}")
            raise
def get_all_documents_from_data_dir(data_dir: str = str(DATA_DIR)) -> List[str]:
    """Get all supported document file paths from the data directory."""
    supported_extensions = {
        ".pdf", ".docx", ".doc", ".pptx", ".xlsx", ".xls",
        ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff",
        ".webp", ".odt",
    }
    # Plain-text formats are not scanned here; the text fallback in
    # _get_loader_for_extension only applies to paths passed in explicitly.
    file_paths = []
    for root, _dirs, files in os.walk(data_dir):
        for file in files:
            if Path(file).suffix.lower() in supported_extensions:
                file_paths.append(os.path.join(root, file))
    return file_paths
def run_enrichment_process(vector_store, data_dir: str = str(DATA_DIR)):
    """Run the full enrichment process."""
    logger.info(f"Starting document enrichment from directory: {data_dir}")
    # Collect all supported documents from the data directory
    file_paths = get_all_documents_from_data_dir(data_dir)
    if not file_paths:
        logger.warning(f"No supported documents found in {data_dir}")
        return
    logger.info(f"Found {len(file_paths)} documents to process")
    # Initialize the document enricher and run the enrichment process
    enricher = DocumentEnricher(vector_store)
    enricher.enrich_and_store(file_paths)
    logger.info("Document enrichment process completed!")
if __name__ == "__main__":
    # Example usage
    from vector_storage import initialize_vector_store

    # Initialize the vector store and run the enrichment process
    vector_store = initialize_vector_store()
    run_enrichment_process(vector_store)
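# `initialize_vector_store` lives in this repo's vector_storage module; the only
# contract this file relies on is LangChain's `add_documents` method. A
# hypothetical implementation (illustrative names, not this repo's actual code)
# might look like:
#
#     from langchain_chroma import Chroma
#     from langchain_huggingface import HuggingFaceEmbeddings
#
#     def initialize_vector_store():
#         return Chroma(
#             collection_name="documents",
#             embedding_function=HuggingFaceEmbeddings(),
#             persist_directory="./chroma_db",
#         )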