langchain loading documents into vector storage
services/rag/langchain/enrichment.py (new file, 232 lines)
@@ -0,0 +1,232 @@
"""Document enrichment module for loading documents into vector storage."""

import os
import hashlib
from pathlib import Path
from typing import List

from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import (
    PyPDFLoader,
    TextLoader,
    UnstructuredWordDocumentLoader,
    UnstructuredPowerPointLoader,
    UnstructuredExcelLoader,
    UnstructuredImageLoader,
    UnstructuredODTLoader,
)
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker
from loguru import logger
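
# Note: PyPDFLoader requires the `pypdf` package, and the Unstructured*
# loaders require `unstructured` with the relevant format extras installed.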

# Resolve the data directory relative to this file so the path works
# regardless of the current working directory (../../../ from
# services/rag/langchain/ is the repository root).
DATA_DIR = (Path(__file__).resolve().parent / "../../../data").resolve()
DB_PATH = Path("document_tracking.db").resolve()

Base = declarative_base()


class ProcessedDocument(Base):
    """Database model for tracking processed documents."""

    __tablename__ = "processed_documents"

    id = Column(Integer, primary_key=True)
    file_path = Column(String, unique=True, nullable=False)
    file_hash = Column(String, nullable=False)


class DocumentEnricher:
    """Class responsible for enriching documents and loading them into vector storage."""

    def __init__(self, vector_store):
        self.vector_store = vector_store
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )

        # Initialize the database used to track processed documents
        self._init_db()

    def _init_db(self):
        """Initialize the SQLite database for tracking processed documents."""
        self.engine = create_engine(f"sqlite:///{DB_PATH}")
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    def _get_file_hash(self, file_path: str) -> str:
        """Calculate the SHA-256 hash of a file."""
        hash_sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            # Read the file in chunks so large files do not exhaust memory
            for chunk in iter(lambda: f.read(4096), b""):
                hash_sha256.update(chunk)
        return hash_sha256.hexdigest()

    def _is_document_processed(self, file_path: str) -> bool:
        """Check whether a document with the same content hash has already been processed."""
        file_hash = self._get_file_hash(file_path)
        existing = (
            self.session.query(ProcessedDocument)
            .filter_by(file_hash=file_hash)
            .first()
        )
        return existing is not None

    def _mark_document_processed(self, file_path: str):
        """Mark a document as processed, updating the hash if the path is already tracked."""
        file_hash = self._get_file_hash(file_path)
        existing = (
            self.session.query(ProcessedDocument)
            .filter_by(file_path=file_path)
            .first()
        )
        if existing:
            # A re-ingested file whose content changed: update the stored hash
            # instead of violating the unique constraint on file_path.
            existing.file_hash = file_hash
        else:
            self.session.add(ProcessedDocument(file_path=file_path, file_hash=file_hash))
        self.session.commit()

    def _get_loader_for_extension(self, file_path: str):
        """Return the appropriate document loader for a file's extension."""
        ext = Path(file_path).suffix.lower()

        if ext == ".pdf":
            return PyPDFLoader(file_path)
        elif ext in [".docx", ".doc"]:
            return UnstructuredWordDocumentLoader(file_path)
        elif ext == ".pptx":
            return UnstructuredPowerPointLoader(file_path)
        elif ext in [".xlsx", ".xls"]:
            return UnstructuredExcelLoader(file_path)
        elif ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
            return UnstructuredImageLoader(file_path)
        elif ext == ".odt":
            return UnstructuredODTLoader(file_path)
        else:
            # Fall back to plain-text loading for any other format; a binary
            # file raises a decode error in load(), which the caller catches.
            return TextLoader(file_path, encoding="utf-8")

    def load_and_split_documents(self, file_paths: List[str]) -> List[Document]:
        """Load documents from file paths and split them into chunks."""
        all_docs = []

        for file_path in file_paths:
            if self._is_document_processed(file_path):
                logger.info(f"Skipping already processed document: {file_path}")
                continue

            logger.info(f"Processing document: {file_path}")

            # Get the appropriate loader for the file extension
            loader = self._get_loader_for_extension(file_path)

            try:
                # Load the document(s)
                docs = loader.load()

                # Attach file-level metadata to each document
                for doc in docs:
                    doc.metadata["source"] = file_path
                    doc.metadata["filename"] = Path(file_path).name
                    doc.metadata["file_path"] = file_path
                    doc.metadata["file_size"] = os.path.getsize(file_path)
                    doc.metadata["file_extension"] = Path(file_path).suffix

                    # Preserve the page number if the loader provided one
                    if "page" in doc.metadata:
                        doc.metadata["page_number"] = doc.metadata["page"]

                # Split oversized documents into overlapping chunks
                split_docs = self.text_splitter.split_documents(docs)
                all_docs.extend(split_docs)

                # Record the document so it is not re-ingested next run
                self._mark_document_processed(file_path)

            except Exception as e:
                logger.error(f"Error processing {file_path}: {e}")
                continue

        return all_docs

    def enrich_and_store(self, file_paths: List[str]):
        """Load, enrich, and store documents in the vector store."""
        logger.info(f"Starting enrichment process for {len(file_paths)} files...")

        # Load and split documents
        documents = self.load_and_split_documents(file_paths)

        if not documents:
            logger.info("No new documents to process.")
            return

        logger.info(f"Loaded and split {len(documents)} document chunks, adding to vector store...")
        self.vector_store.add_documents(documents)
        logger.info(f"Successfully added {len(documents)} document chunks to the vector store.")


def get_all_documents_from_data_dir(data_dir: str = str(DATA_DIR)) -> List[str]:
    """Collect all supported document file paths under the data directory."""
    supported_extensions = {
        ".pdf", ".docx", ".doc", ".pptx", ".xlsx", ".xls",
        ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff",
        ".webp", ".odt",
        # Plain-text formats are handled by the TextLoader fallback
        ".txt", ".md",
    }

    file_paths = []
    for root, _dirs, files in os.walk(data_dir):
        for file in files:
            if Path(file).suffix.lower() in supported_extensions:
                file_paths.append(os.path.join(root, file))

    return file_paths


def run_enrichment_process(vector_store, data_dir: str = str(DATA_DIR)):
    """Run the full enrichment process."""
    logger.info(f"Starting document enrichment from directory: {data_dir}")

    # Gather all supported documents from the data directory
    file_paths = get_all_documents_from_data_dir(data_dir)
    if not file_paths:
        logger.warning(f"No supported documents found in {data_dir}")
        return

    logger.info(f"Found {len(file_paths)} documents to process")

    # Initialize the enricher and run the pipeline
    enricher = DocumentEnricher(vector_store)
    enricher.enrich_and_store(file_paths)

    logger.info("Document enrichment process completed!")


if __name__ == "__main__":
    # Example usage: build a vector store and ingest everything under DATA_DIR
    from vector_storage import initialize_vector_store

    vector_store = initialize_vector_store()
    run_enrichment_process(vector_store)
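
The `vector_storage` module imported in the `__main__` block is not part of this diff. As context only, here is a minimal sketch of what `initialize_vector_store` might look like, assuming a Chroma store via the `langchain_chroma` package and HuggingFace embeddings; the package choices, collection name, and model name are illustrative, not taken from this commit:

    # vector_storage.py (hypothetical sketch, not part of this commit)
    from langchain_chroma import Chroma
    from langchain_huggingface import HuggingFaceEmbeddings

    def initialize_vector_store():
        # Any LangChain vector store exposing add_documents() works here
        embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )
        return Chroma(
            collection_name="documents",
            embedding_function=embeddings,
            persist_directory="./chroma_db",
        )

Anything exposing `add_documents(documents)` satisfies what `DocumentEnricher` expects, so the backing store can be swapped without touching the enrichment code.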