Working enrichment
@@ -6,14 +6,32 @@ from pathlib import Path
 from typing import List, Dict, Any
 from langchain_core.documents import Document
 from langchain_text_splitters import RecursiveCharacterTextSplitter
-from langchain_community.document_loaders import (
-    PyPDFLoader,
-    UnstructuredWordDocumentLoader,
-    UnstructuredPowerPointLoader,
-    PandasExcelLoader,
-    UnstructuredImageLoader,
-    UnstructuredODTLoader,
-)
+from langchain_community.document_loaders import PyPDFLoader
+# Dynamically import other loaders to handle optional dependencies
+try:
+    from langchain_community.document_loaders import UnstructuredWordDocumentLoader
+except ImportError:
+    UnstructuredWordDocumentLoader = None
+
+try:
+    from langchain_community.document_loaders import UnstructuredPowerPointLoader
+except ImportError:
+    UnstructuredPowerPointLoader = None
+
+try:
+    from langchain_community.document_loaders import UnstructuredExcelLoader
+except ImportError:
+    UnstructuredExcelLoader = None
+
+try:
+    from langchain_community.document_loaders import UnstructuredImageLoader
+except ImportError:
+    UnstructuredImageLoader = None
+
+try:
+    from langchain_community.document_loaders import UnstructuredODTLoader
+except ImportError:
+    UnstructuredODTLoader = None
 from sqlalchemy import create_engine, Column, Integer, String
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import sessionmaker
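The hunk above swaps the single grouped import for per-loader try/except imports, so a missing optional dependency (the Unstructured loaders pull in the heavyweight `unstructured` package) no longer crashes the whole module at import time; each unavailable loader becomes a `None` sentinel that the dispatch code can check. It also drops the old grouped import's `PandasExcelLoader` in favor of `UnstructuredExcelLoader`. The repeated blocks could be factored into a helper; a minimal sketch, where `optional_import` is an illustrative name of my own, not something in this commit:

import importlib
from typing import Any, Optional

def optional_import(module: str, name: str) -> Optional[Any]:
    # Return the named attribute, or None when the dependency is absent.
    try:
        return getattr(importlib.import_module(module), name)
    except (ImportError, AttributeError):
        return None

# Equivalent to one of the try/except blocks above:
UnstructuredWordDocumentLoader = optional_import(
    "langchain_community.document_loaders", "UnstructuredWordDocumentLoader"
)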
@@ -88,19 +106,35 @@ class DocumentEnricher:
     def _get_loader_for_extension(self, file_path: str):
         """Get the appropriate loader for a given file extension."""
         ext = Path(file_path).suffix.lower()

         if ext == ".pdf":
             return PyPDFLoader(file_path)
         elif ext in [".docx", ".doc"]:
-            return UnstructuredWordDocumentLoader(file_path)
+            if UnstructuredWordDocumentLoader is None:
+                logger.warning(f"UnstructuredWordDocumentLoader not available for {file_path}. Skipping.")
+                return None
+            return UnstructuredWordDocumentLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
         elif ext == ".pptx":
-            return UnstructuredPowerPointLoader(file_path)
+            if UnstructuredPowerPointLoader is None:
+                logger.warning(f"UnstructuredPowerPointLoader not available for {file_path}. Skipping.")
+                return None
+            return UnstructuredPowerPointLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
         elif ext in [".xlsx", ".xls"]:
-            return PandasExcelLoader(file_path)
+            if UnstructuredExcelLoader is None:
+                logger.warning(f"UnstructuredExcelLoader not available for {file_path}. Skipping.")
+                return None
+            return UnstructuredExcelLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
         elif ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
-            return UnstructuredImageLoader(file_path)
+            if UnstructuredImageLoader is None:
+                logger.warning(f"UnstructuredImageLoader not available for {file_path}. Skipping.")
+                return None
+            # Use OCR strategy for images to extract text
+            return UnstructuredImageLoader(file_path, **{"strategy": "ocr_only", "languages": ["rus"]})
         elif ext == ".odt":
-            return UnstructuredODTLoader(file_path)
+            if UnstructuredODTLoader is None:
+                logger.warning(f"UnstructuredODTLoader not available for {file_path}. Skipping.")
+                return None
+            return UnstructuredODTLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
         else:
             # For text files and unsupported formats, try to load as text
             try:
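Each branch now degrades gracefully when its loader is unavailable, logging a warning and returning `None` instead of attempting to call a class that never imported, and forwards `strategy="hi_res"` plus `languages=["rus"]` to the underlying Unstructured partitioner (`ocr_only` for images, so text is extracted from scans via OCR). The `**{"strategy": ..., "languages": [...]}` spelling is just keyword arguments written through dict unpacking. The growing elif chain could also be table-driven; a sketch of that alternative, not the commit's code, with PDF and the plain-text fallback left special-cased:

from pathlib import Path

HI_RES = {"strategy": "hi_res", "languages": ["rus"]}
OCR = {"strategy": "ocr_only", "languages": ["rus"]}

# Loader classes may be None here when their optional import failed above.
LOADER_BY_EXT = {
    ".docx": (UnstructuredWordDocumentLoader, HI_RES),
    ".doc": (UnstructuredWordDocumentLoader, HI_RES),
    ".pptx": (UnstructuredPowerPointLoader, HI_RES),
    ".xlsx": (UnstructuredExcelLoader, HI_RES),
    ".xls": (UnstructuredExcelLoader, HI_RES),
    ".odt": (UnstructuredODTLoader, HI_RES),
    ".png": (UnstructuredImageLoader, OCR),
    ".jpg": (UnstructuredImageLoader, OCR),
}

def get_loader(file_path: str):
    # Returns None both for unmapped extensions and for missing optional deps.
    cls, kwargs = LOADER_BY_EXT.get(Path(file_path).suffix.lower(), (None, {}))
    return None if cls is None else cls(file_path, **kwargs)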
@@ -114,25 +148,25 @@ class DocumentEnricher:
     def load_and_split_documents(self, file_paths: List[str]) -> List[Document]:
         """Load documents from file paths and split them appropriately."""
         all_docs = []

         for file_path in file_paths:
             if self._is_document_processed(file_path):
                 logger.info(f"Skipping already processed document: {file_path}")
                 continue

             logger.info(f"Processing document: {file_path}")

             # Get the appropriate loader for the file extension
             loader = self._get_loader_for_extension(file_path)

             if loader is None:
                 # For unsupported formats that we tried to load as text
                 continue

             try:
                 # Load the document(s)
                 docs = loader.load()

                 # Add metadata to each document
                 for doc in docs:
                     # Extract metadata from the original file
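`load_and_split_documents` consults `_is_document_processed` before doing any work, and `_mark_document_processed` records completion; those tracking helpers are outside this diff, but the `sqlalchemy` imports at the top (`create_engine`, `declarative_base`, `sessionmaker`) suggest a small table keyed by file path. A minimal sketch of what such helpers could look like — the table name, column names, and SQLite URL are all assumptions, not the commit's actual schema:

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class ProcessedDocument(Base):  # hypothetical table, not shown in the diff
    __tablename__ = "processed_documents"
    id = Column(Integer, primary_key=True)
    file_path = Column(String, unique=True, nullable=False)

engine = create_engine("sqlite:///enrichment.db")  # assumed URL
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

def _is_document_processed(file_path: str) -> bool:
    # True once a row for this path exists.
    with Session() as session:
        return session.query(ProcessedDocument).filter_by(file_path=file_path).first() is not None

def _mark_document_processed(file_path: str) -> None:
    with Session() as session:
        session.add(ProcessedDocument(file_path=file_path))
        session.commit()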
@@ -140,46 +174,56 @@ class DocumentEnricher:
                     doc.metadata["filename"] = Path(file_path).name
                     doc.metadata["file_path"] = file_path
                     doc.metadata["file_size"] = os.path.getsize(file_path)

                     # Add page number if available in original metadata
                     if "page" in doc.metadata:
                         doc.metadata["page_number"] = doc.metadata["page"]

                     # Add file extension as metadata
                     doc.metadata["file_extension"] = Path(file_path).suffix

                 # Split documents if they are too large
                 split_docs = self.text_splitter.split_documents(docs)

                 # Add to the collection
                 all_docs.extend(split_docs)

-                # Mark document as processed
-                self._mark_document_processed(file_path)
-
             except Exception as e:
                 logger.error(f"Error processing {file_path}: {str(e)}")
                 continue

         return all_docs

     def enrich_and_store(self, file_paths: List[str]):
         """Load, enrich, and store documents in the vector store."""
         logger.info(f"Starting enrichment process for {len(file_paths)} files...")

         # Load and split documents
         documents = self.load_and_split_documents(file_paths)

         if not documents:
             logger.info("No new documents to process.")
             return

         logger.info(f"Loaded and split {len(documents)} document chunks, adding to vector store...")

         # Add documents to vector store
-        self.vector_store.add_documents(documents)
-
-        logger.info(f"Successfully added {len(documents)} document chunks to vector store.")
+        try:
+            self.vector_store.add_documents(documents)
+
+            # Only mark documents as processed after successful insertion to vector store
+            processed_file_paths = set()
+            for doc in documents:
+                if 'source' in doc.metadata:
+                    processed_file_paths.add(doc.metadata['source'])
+
+            for file_path in processed_file_paths:
+                self._mark_document_processed(file_path)
+
+            logger.info(f"Successfully added {len(documents)} document chunks to vector store and marked {len(processed_file_paths)} files as processed.")
+        except Exception as e:
+            logger.error(f"Error adding documents to vector store: {str(e)}")
+            raise


 def get_all_documents_from_data_dir(data_dir: str = str(DATA_DIR)) -> List[str]:
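The final hunk is the substantive fix of this commit: previously each file was marked processed inside `load_and_split_documents`, before its chunks ever reached the vector store, so a failure in `add_documents` would leave those files permanently skipped on the next run. Marking is now deferred to `enrich_and_store`, keyed by each chunk's `source` metadata (which the LangChain loaders set to the originating path), and happens only after `add_documents` returns; the exception is re-raised so the caller still sees the failure. A hypothetical end-to-end driver, with the constructor call assumed since its arguments are outside this diff:

# Hypothetical driver; DocumentEnricher's constructor is not shown in the diff.
enricher = DocumentEnricher()

file_paths = get_all_documents_from_data_dir()  # scans DATA_DIR (definition truncated above)
enricher.enrich_and_store(file_paths)

# Re-running is now safe: already-processed files are skipped via
# _is_document_processed, while files whose chunks never made it into the
# vector store are retried on the next invocation.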