Compare commits: 06a3155b6b ... 7b52887558 (3 commits)

| SHA1 |
|---|
| 7b52887558 |
| 1e6ab247b9 |
| e9dd28ad55 |
BIN services/rag/langchain/.DS_Store (vendored). Binary file not shown.
@@ -7,3 +7,6 @@ QDRANT_HOST=HOST
 QDRANT_REST_PORT=PORT
 QDRANT_GRPC_PORT=PORT
 YADISK_TOKEN=TOKEN
+ENRICHMENT_SOURCE=local/yadisk
+ENRICHMENT_LOCAL_PATH=path
+ENRICHMENT_YADISK_PATH=path
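For context, a filled-in version of these new settings might look as follows. The values are illustrative only: the relative data path and the "Общая/Информация" folder come from the notes elsewhere in this comparison, not from the committed file.

```
# Local filesystem source
ENRICHMENT_SOURCE=local
ENRICHMENT_LOCAL_PATH=./../../../data

# Yandex Disk source
ENRICHMENT_SOURCE=yadisk
YADISK_TOKEN=<OAuth token>
ENRICHMENT_YADISK_PATH=Общая/Информация
```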
@@ -73,3 +73,14 @@ Chosen data folder: relative ./../../../data - from the current folder
 - [x] Write tests for the local filesystem implementation, using a test/samples folder filled with files and directories for testing iteration and recursion
 - [x] Create a Yandex Disk implementation of the Adaptive Collection. The constructor should require a TOKEN for Yandex Disk.
 - [x] Write tests for the Yandex Disk implementation, using the folder "Общая/Информация". .env.test has a YADISK_TOKEN variable for connecting. While testing, log the files found during iteration. If the test fails at this step, leave it for manual fixing; this step can still be marked as done.
+
+# Phase 12 (using local file system or Yandex Disk)
+
+During enrichment, we should use the adaptive collection from the helpers for loading documents. We should not use the local filesystem directly, but go through the adaptive collection as a wrapper.
+
+- [x] The adaptive file in the helpers now carries a filename, so tests should be adjusted for this
+- [x] Add conditional usage of the adaptive collection in the enrichment stage. .env now has a variable ENRICHMENT_SOURCE with two possible values: yadisk, local
+- [x] With the local source, use the env variable ENRICHMENT_LOCAL_PATH for the local filesystem adaptive collection
+- [x] With the yadisk source, use the env variable YADISK_TOKEN for authentication with Yandex Disk, and ENRICHMENT_YADISK_PATH for the path on Yandex Disk
+- [x] We still need some file types to be skipped, so while iterating over files we check their extension and skip unsupported ones.
+- [x] Adaptive files carry a filename, so it should be used when extracting metadata
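The checklist above describes the wrapper pattern that the enrichment changes below implement. As a condensed sketch (not code from this commit), a consumer picks a collection from the environment and never touches the filesystem directly; the class and method names match the helpers shown later in this comparison.

```python
import os

from helpers import LocalFilesystemAdaptiveCollection, YandexDiskAdaptiveCollection

# Choose the source exactly as the Phase 12 checklist describes.
if os.getenv("ENRICHMENT_SOURCE", "local").lower() == "yadisk":
    collection = YandexDiskAdaptiveCollection(
        token=os.environ["YADISK_TOKEN"],
        base_dir=os.environ["ENRICHMENT_YADISK_PATH"],
    )
else:
    collection = LocalFilesystemAdaptiveCollection(os.environ["ENRICHMENT_LOCAL_PATH"])

# Iterate adaptive files; each one hands the callback a usable local path.
for adaptive_file in collection.iterate(recursive=True):
    adaptive_file.work_with_file_locally(
        lambda path, name=adaptive_file.filename: print(name, path)
    )
```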
@@ -37,15 +37,16 @@ def ping():
     name="enrich",
     help="Load documents from data directory and store in vector database",
 )
-@click.option("--data-dir", default="../../../data", help="Path to the data directory")
 @click.option(
     "--collection-name",
     default="documents_langchain",
     help="Name of the vector store collection",
 )
-def enrich(data_dir, collection_name):
+def enrich(collection_name):
     """Load documents from data directory and store in vector database"""
-    logger.info(f"Starting enrichment process for directory: {data_dir}")
+    logger.info(
+        f"Starting enrichment process. Enrichment source: {os.getenv('ENRICHMENT_SOURCE')}"
+    )
 
     try:
         # Import here to avoid circular dependencies
@@ -56,7 +57,7 @@ def enrich(data_dir, collection_name):
         vector_store = initialize_vector_store(collection_name=collection_name)
 
         # Run enrichment process
-        run_enrichment_process(vector_store, data_dir=data_dir)
+        run_enrichment_process(vector_store)
 
         logger.info("Enrichment process completed successfully!")
         click.echo("Documents have been successfully loaded into the vector store.")
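Assuming the click group above is exposed as the service's CLI module (the module name is not part of this diff, so `cli.py` below is a placeholder), the reworked command is driven entirely by the environment; `--data-dir` is gone.

```
ENRICHMENT_SOURCE=local ENRICHMENT_LOCAL_PATH=./../../../data \
    python cli.py enrich --collection-name documents_langchain
```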
@@ -1,13 +1,15 @@
 """Document enrichment module for loading documents into vector storage."""
 
-import os
 import hashlib
+import os
 from pathlib import Path
-from typing import List, Dict, Any
+from typing import Iterator, List, Tuple
 
 from dotenv import load_dotenv
+from langchain_community.document_loaders import PyPDFLoader
 from langchain_core.documents import Document
 from langchain_text_splitters import RecursiveCharacterTextSplitter
-from langchain_community.document_loaders import PyPDFLoader
 
 # Dynamically import other loaders to handle optional dependencies
 try:
     from langchain_community.document_loaders import UnstructuredWordDocumentLoader
@@ -33,13 +35,19 @@ try:
     from langchain_community.document_loaders import UnstructuredODTLoader
 except ImportError:
     UnstructuredODTLoader = None
-from sqlalchemy import create_engine, Column, Integer, String
+from loguru import logger
+from sqlalchemy import Column, Integer, String, create_engine
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import sessionmaker
-from loguru import logger
-import sqlite3
 
-from helpers import extract_russian_event_names, extract_years_from_text
+from helpers import (
+    LocalFilesystemAdaptiveCollection,
+    YandexDiskAdaptiveCollection,
+    _AdaptiveCollection,
+    _AdaptiveFile,
+    extract_russian_event_names,
+    extract_years_from_text,
+)
 
 # Load environment variables
 load_dotenv()
@@ -48,14 +56,48 @@ load_dotenv()
 # Define the path to the data directory
 DATA_DIR = Path("../../../data").resolve()
 DB_PATH = Path("document_tracking.db").resolve()
+ENRICHMENT_SOURCE = os.getenv("ENRICHMENT_SOURCE", "local").lower()
+ENRICHMENT_LOCAL_PATH = os.getenv("ENRICHMENT_LOCAL_PATH")
+ENRICHMENT_YADISK_PATH = os.getenv("ENRICHMENT_YADISK_PATH")
+YADISK_TOKEN = os.getenv("YADISK_TOKEN")
+
+SUPPORTED_EXTENSIONS = {
+    ".pdf",
+    ".docx",
+    ".doc",
+    ".pptx",
+    ".xlsx",
+    ".xls",
+    ".jpg",
+    ".jpeg",
+    ".png",
+    ".gif",
+    ".bmp",
+    ".tiff",
+    ".webp",
+    ".odt",
+}
+
+
+def try_guess_source(extension: str) -> str:
+    if extension in [".xlsx", "xls"]:
+        return "таблица"
+    elif extension in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
+        return "изображение"
+    elif extension in [".pptx"]:
+        return "презентация"
+    else:
+        return "документ"
+
+
 Base = declarative_base()
 
 
 class ProcessedDocument(Base):
     """Database model for tracking processed documents."""
 
     __tablename__ = "processed_documents"
 
     id = Column(Integer, primary_key=True)
     file_path = Column(String, unique=True, nullable=False)
     file_hash = Column(String, nullable=False)
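try_guess_source maps a file extension to the coarse, human-readable source label stored in chunk metadata. A quick sanity check of the behavior as defined above (with that function in scope):

```python
assert try_guess_source(".xlsx") == "таблица"
assert try_guess_source(".png") == "изображение"
assert try_guess_source(".pptx") == "презентация"
assert try_guess_source(".pdf") == "документ"
# As committed, the first branch lists "xls" without a leading dot,
# so ".xls" currently falls through to "документ" as well.
```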
@@ -63,7 +105,7 @@ class ProcessedDocument(Base):
 
 class DocumentEnricher:
     """Class responsible for enriching documents and loading them to vector storage."""
 
     def __init__(self, vector_store):
         self.vector_store = vector_store
         self.text_splitter = RecursiveCharacterTextSplitter(
@@ -71,17 +113,17 @@ class DocumentEnricher:
             chunk_overlap=200,
             length_function=len,
         )
 
         # Initialize database for tracking processed documents
         self._init_db()
 
     def _init_db(self):
         """Initialize the SQLite database for tracking processed documents."""
         self.engine = create_engine(f"sqlite:///{DB_PATH}")
         Base.metadata.create_all(self.engine)
         Session = sessionmaker(bind=self.engine)
         self.session = Session()
 
     def _get_file_hash(self, file_path: str) -> str:
         """Calculate SHA256 hash of a file."""
         hash_sha256 = hashlib.sha256()
@@ -90,25 +132,20 @@ class DocumentEnricher:
             for chunk in iter(lambda: f.read(4096), b""):
                 hash_sha256.update(chunk)
         return hash_sha256.hexdigest()
 
-    def _is_document_processed(self, file_path: str) -> bool:
-        """Check if a document has already been processed."""
-        file_hash = self._get_file_hash(file_path)
-        existing = self.session.query(ProcessedDocument).filter_by(
-            file_hash=file_hash
-        ).first()
-        return existing is not None
-
-    def _mark_document_processed(self, file_path: str):
-        """Mark a document as processed in the database."""
-        file_hash = self._get_file_hash(file_path)
-        doc_record = ProcessedDocument(
-            file_path=file_path,
-            file_hash=file_hash
-        )
+    def _is_document_hash_processed(self, file_hash: str) -> bool:
+        """Check if a document hash has already been processed."""
+        existing = (
+            self.session.query(ProcessedDocument).filter_by(file_hash=file_hash).first()
+        )
+        return existing is not None
+
+    def _mark_document_processed(self, file_identifier: str, file_hash: str):
+        """Mark a document as processed in the database."""
+        doc_record = ProcessedDocument(file_path=file_identifier, file_hash=file_hash)
         self.session.add(doc_record)
         self.session.commit()
 
     def _get_loader_for_extension(self, file_path: str):
         """Get the appropriate loader for a given file extension."""
         ext = Path(file_path).suffix.lower()
@@ -117,173 +154,219 @@ class DocumentEnricher:
             return PyPDFLoader(file_path)
         elif ext in [".docx", ".doc"]:
             if UnstructuredWordDocumentLoader is None:
-                logger.warning(f"UnstructuredWordDocumentLoader not available for {file_path}. Skipping.")
+                logger.warning(
+                    f"UnstructuredWordDocumentLoader not available for {file_path}. Skipping."
+                )
                 return None
-            return UnstructuredWordDocumentLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
+            return UnstructuredWordDocumentLoader(
+                file_path, **{"strategy": "hi_res", "languages": ["rus"]}
+            )
         elif ext == ".pptx":
             if UnstructuredPowerPointLoader is None:
-                logger.warning(f"UnstructuredPowerPointLoader not available for {file_path}. Skipping.")
+                logger.warning(
+                    f"UnstructuredPowerPointLoader not available for {file_path}. Skipping."
+                )
                 return None
-            return UnstructuredPowerPointLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
+            return UnstructuredPowerPointLoader(
+                file_path, **{"strategy": "hi_res", "languages": ["rus"]}
+            )
         elif ext in [".xlsx", ".xls"]:
             if UnstructuredExcelLoader is None:
-                logger.warning(f"UnstructuredExcelLoader not available for {file_path}. Skipping.")
+                logger.warning(
+                    f"UnstructuredExcelLoader not available for {file_path}. Skipping."
+                )
                 return None
-            return UnstructuredExcelLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
+            return UnstructuredExcelLoader(
+                file_path, **{"strategy": "hi_res", "languages": ["rus"]}
+            )
         elif ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
             if UnstructuredImageLoader is None:
-                logger.warning(f"UnstructuredImageLoader not available for {file_path}. Skipping.")
+                logger.warning(
+                    f"UnstructuredImageLoader not available for {file_path}. Skipping."
+                )
                 return None
             # Use OCR strategy for images to extract text
-            return UnstructuredImageLoader(file_path, **{"strategy": "ocr_only", "languages": ["rus"]})
+            return UnstructuredImageLoader(
+                file_path, **{"strategy": "ocr_only", "languages": ["rus"]}
+            )
         elif ext == ".odt":
             if UnstructuredODTLoader is None:
-                logger.warning(f"UnstructuredODTLoader not available for {file_path}. Skipping.")
+                logger.warning(
+                    f"UnstructuredODTLoader not available for {file_path}. Skipping."
+                )
                 return None
-            return UnstructuredODTLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
+            return UnstructuredODTLoader(
+                file_path, **{"strategy": "hi_res", "languages": ["rus"]}
+            )
         else:
-            # For text files and unsupported formats, try to load as text
-            try:
-                with open(file_path, 'r', encoding='utf-8') as f:
-                    content = f.read()
-                return None, content  # Return content directly for text processing
-            except UnicodeDecodeError:
-                logger.warning(f"Could not decode file as text: {file_path}")
-                return None, None
-
-    def load_and_split_documents(self, file_paths: List[str]) -> List[Document]:
-        """Load documents from file paths and split them appropriately."""
-        all_docs = []
-
-        for file_path in file_paths:
-            if self._is_document_processed(file_path):
-                logger.info(f"Skipping already processed document: {file_path}")
-                continue
-
-            logger.info(f"Processing document: {file_path}")
-
-            # Get the appropriate loader for the file extension
-            loader = self._get_loader_for_extension(file_path)
-
-            if loader is None:
-                # For unsupported formats that we tried to load as text
-                continue
-
-            try:
-                # Load the document(s)
-                docs = loader.load()
-
-                # Add metadata to each document
-                for doc in docs:
-                    # Extract metadata from the original file
-                    doc.metadata["source"] = file_path
-                    doc.metadata["filename"] = Path(file_path).name
-                    doc.metadata["file_path"] = file_path
-                    doc.metadata["file_size"] = os.path.getsize(file_path)
-
-                    # Add page number if available in original metadata
-                    if "page" in doc.metadata:
-                        doc.metadata["page_number"] = doc.metadata["page"]
-
-                    # Add file extension as metadata
-                    doc.metadata["file_extension"] = Path(file_path).suffix
-
-                # Split documents if they are too large
-                split_docs = self.text_splitter.split_documents(docs)
-
-                # Extract additional metadata from each chunk.
-                for chunk in split_docs:
-                    years = extract_years_from_text(chunk.page_content)
-                    events = extract_russian_event_names(chunk.page_content)
-                    chunk.metadata["years"] = years
-                    chunk.metadata["events"] = events
-
-                # Add to the collection
-                all_docs.extend(split_docs)
-
-            except Exception as e:
-                logger.error(f"Error processing {file_path}: {str(e)}")
-                continue
-
-        return all_docs
-
-    def enrich_and_store(self, file_paths: List[str]):
-        """Load, enrich, and store documents in the vector store."""
-        logger.info(f"Starting enrichment process for {len(file_paths)} files...")
-
-        # Load and split documents
-        documents = self.load_and_split_documents(file_paths)
-
-        if not documents:
-            logger.info("No new documents to process.")
-            return
-
-        logger.info(f"Loaded and split {len(documents)} document chunks, adding to vector store...")
-
-        # Add documents to vector store
-        try:
-            self.vector_store.add_documents(documents)
-
-            # Only mark documents as processed after successful insertion to vector store
-            processed_file_paths = set()
-            for doc in documents:
-                if 'source' in doc.metadata:
-                    processed_file_paths.add(doc.metadata['source'])
-
-            for file_path in processed_file_paths:
-                self._mark_document_processed(file_path)
-
-            logger.info(f"Successfully added {len(documents)} document chunks to vector store and marked {len(processed_file_paths)} files as processed.")
-        except Exception as e:
-            logger.error(f"Error adding documents to vector store: {str(e)}")
-            raise
-
-
-def get_all_documents_from_data_dir(data_dir: str = str(DATA_DIR)) -> List[str]:
-    """Get all supported document file paths from the data directory."""
-    supported_extensions = {
-        '.pdf', '.docx', '.doc', '.pptx', '.xlsx', '.xls',
-        '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff',
-        '.webp', '.odt'
-    }
-
-    file_paths = []
-    for root, dirs, files in os.walk(data_dir):
-        for file in files:
-            if Path(file).suffix.lower() in supported_extensions:
-                file_paths.append(os.path.join(root, file))
-
-    return file_paths
-
-
-def run_enrichment_process(vector_store, data_dir: str = str(DATA_DIR)):
+            return None
+
+    def _load_one_adaptive_file(
+        self, adaptive_file: _AdaptiveFile
+    ) -> Tuple[List[Document], str | None]:
+        """Load and split one adaptive file by using its local working callback."""
+        loaded_docs: List[Document] = []
+        file_hash: str | None = None
+        source_identifier = try_guess_source(adaptive_file.extension)
+        extension = adaptive_file.extension.lower()
+
+        def process_local_file(local_file_path: str):
+            nonlocal loaded_docs, file_hash
+
+            file_hash = self._get_file_hash(local_file_path)
+            if self._is_document_hash_processed(file_hash):
+                logger.info(
+                    f"Skipping already processed document hash for: {source_identifier}"
+                )
+                return
+
+            loader = self._get_loader_for_extension(local_file_path)
+            if loader is None:
+                logger.warning(f"No loader available for file: {source_identifier}")
+                return
+
+            docs = loader.load()
+            for doc in docs:
+                doc.metadata["source"] = source_identifier
+                doc.metadata["filename"] = adaptive_file.filename
+                doc.metadata["file_path"] = source_identifier
+                doc.metadata["file_size"] = os.path.getsize(local_file_path)
+                doc.metadata["file_extension"] = extension
+
+                if "page" in doc.metadata:
+                    doc.metadata["page_number"] = doc.metadata["page"]
+
+            split_docs = self.text_splitter.split_documents(docs)
+            for chunk in split_docs:
+                years = extract_years_from_text(chunk.page_content)
+                events = extract_russian_event_names(chunk.page_content)
+                chunk.metadata["years"] = years
+                chunk.metadata["events"] = events
+
+            loaded_docs = split_docs
+
+        adaptive_file.work_with_file_locally(process_local_file)
+        return loaded_docs, file_hash
+
+    def load_and_split_documents(
+        self, adaptive_collection: _AdaptiveCollection, recursive: bool = True
+    ) -> Iterator[Tuple[List[Document], List[Tuple[str, str]]]]:
+        """Load documents from adaptive collection and split them appropriately."""
+        docs_chunk: List[Document] = []
+        processed_file_records: dict[str, str] = {}
+
+        for adaptive_file in adaptive_collection.iterate(recursive=recursive):
+            if len(processed_file_records) >= 2:
+                yield docs_chunk, list(processed_file_records.items())
+                docs_chunk = []
+                processed_file_records = {}
+
+            if adaptive_file.extension.lower() not in SUPPORTED_EXTENSIONS:
+                logger.debug(
+                    f"Skipping unsupported file extension for {adaptive_file.filename}: {adaptive_file.extension}"
+                )
+                continue
+
+            logger.info(f"Processing document: {adaptive_file.filename}")
+            try:
+                split_docs, file_hash = self._load_one_adaptive_file(adaptive_file)
+                if split_docs:
+                    docs_chunk.extend(split_docs)
+                if file_hash:
+                    processed_file_records[adaptive_file.filename] = file_hash
+            except Exception as e:
+                logger.error(f"Error processing {adaptive_file.filename}: {str(e)}")
+                continue
+
+    def enrich_and_store(self, adaptive_collection: _AdaptiveCollection):
+        """Load, enrich, and store documents in the vector store."""
+        logger.info("Starting enrichment process...")
+
+        # Load and split documents
+        for documents, processed_file_records in self.load_and_split_documents(
+            adaptive_collection
+        ):
+            if not documents:
+                logger.info("No new documents to process.")
+                return
+
+            logger.info(
+                f"Loaded and split {len(documents)} document chunks, adding to vector store..."
+            )
+            logger.debug(
+                f"Documents len: {len(documents)}, processed_file_records len: {len(processed_file_records)}"
+            )
+
+            # Add documents to vector store
+            try:
+                self.vector_store.add_documents(documents)
+
+                # Only mark documents as processed after successful insertion to vector store
+                for file_identifier, file_hash in processed_file_records:
+                    self._mark_document_processed(file_identifier, file_hash)
+
+                logger.info(
+                    f"Successfully added {len(documents)} document chunks to vector store and marked {len(processed_file_records)} files as processed."
+                )
+            except Exception as e:
+                logger.error(f"Error adding documents to vector store: {str(e)}")
+                raise
+
+
+def get_enrichment_adaptive_collection() -> _AdaptiveCollection:
+    """Create adaptive collection based on environment source configuration."""
+    source = ENRICHMENT_SOURCE
+    if source == "local":
+        local_path = ENRICHMENT_LOCAL_PATH
+        if local_path is None:
+            raise RuntimeError(
+                "Enrichment strategy is local, but no ENRICHMENT_LOCAL_PATH is defined!"
+            )
+        logger.info(f"Using local adaptive collection from path: {local_path}")
+        return LocalFilesystemAdaptiveCollection(local_path)
+
+    if source == "yadisk":
+        if not YADISK_TOKEN:
+            raise ValueError("YADISK_TOKEN must be set when ENRICHMENT_SOURCE=yadisk")
+        if not ENRICHMENT_YADISK_PATH:
+            raise ValueError(
+                "ENRICHMENT_YADISK_PATH must be set when ENRICHMENT_SOURCE=yadisk"
+            )
+        logger.info(
+            f"Using Yandex Disk adaptive collection from path: {ENRICHMENT_YADISK_PATH}"
+        )
+        return YandexDiskAdaptiveCollection(
+            token=YADISK_TOKEN,
+            base_dir=ENRICHMENT_YADISK_PATH,
+        )
+
+    raise ValueError(
+        f"Unsupported ENRICHMENT_SOURCE='{source}'. Allowed values: local, yadisk"
+    )
+
+
+def run_enrichment_process(vector_store):
     """Run the full enrichment process."""
-    logger.info(f"Starting document enrichment from directory: {data_dir}")
-
-    # Get all supported documents from the data directory
-    file_paths = get_all_documents_from_data_dir(data_dir)
-
-    if not file_paths:
-        logger.warning(f"No supported documents found in {data_dir}")
-        return
-
-    logger.info(f"Found {len(file_paths)} documents to process")
+    logger.info("Starting document enrichment process")
+
+    adaptive_collection = get_enrichment_adaptive_collection()
 
     # Initialize the document enricher
     enricher = DocumentEnricher(vector_store)
 
     # Run the enrichment process
-    enricher.enrich_and_store(file_paths)
+    enricher.enrich_and_store(adaptive_collection)
 
     logger.info("Document enrichment process completed!")
 
 
 if __name__ == "__main__":
     # Example usage
     from vector_storage import initialize_vector_store
 
     # Initialize vector store
     vector_store = initialize_vector_store()
 
     # Run enrichment process
     run_enrichment_process(vector_store)
@@ -115,11 +115,11 @@ def extract_russian_event_names(text: str) -> List[str]:
 
 class _AdaptiveFile(ABC):
     extension: str  # Format: .jpg
-    local_path: str
+    filename: str
 
-    def __init__(self, extension: str, local_path: str):
+    def __init__(self, filename: str, extension: str):
+        self.filename = filename
         self.extension = extension
-        self.local_path = local_path
 
     # This method allows to work with file locally, and lambda should be provided for this.
     # Why separate method? For possible cleanup after work is done. And to download file, if needed
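The two comments above describe the contract of work_with_file_locally: the implementation gets a chance to materialise the file locally before the callback runs and to clean up afterwards. A minimal self-contained sketch of that contract (not the repository's class, which follows below):

```python
import os
import tempfile
from typing import Callable


class SketchRemoteAdaptiveFile:
    """Illustrative stand-in for a remote-backed adaptive file."""

    def __init__(self, content: bytes, extension: str):
        self._content = content      # pretend this was fetched from a remote store
        self.extension = extension   # format: ".txt"
        self.filename = f"example{extension}"

    def work_with_file_locally(self, func: Callable[[str], None]) -> None:
        # Materialise the content in a temp file, hand the path to the callback,
        # then remove the temp file once the callback has finished.
        with tempfile.NamedTemporaryFile(delete=False, suffix=self.extension) as tmp:
            tmp.write(self._content)
            local_path = tmp.name
        try:
            func(local_path)
        finally:
            os.unlink(local_path)


SketchRemoteAdaptiveFile(b"hello", ".txt").work_with_file_locally(
    lambda p: print(p, os.path.getsize(p))
)
```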
@@ -137,6 +137,12 @@ class _AdaptiveCollection(ABC):
 
 
 class LocalFilesystemAdaptiveFile(_AdaptiveFile):
+    local_path: str
+
+    def __init__(self, filename: str, extension: str, local_path: str):
+        super().__init__(filename, extension)
+        self.local_path = local_path
+
     def work_with_file_locally(self, func: Callable[[str], None]):
         func(self.local_path)
 
@@ -153,7 +159,8 @@ class LocalFilesystemAdaptiveCollection(_AdaptiveCollection):
         for root, dirs, files in os.walk(self.base_dir):
             for file in files:
                 full_path = os.path.join(root, file)
-                yield LocalFilesystemAdaptiveFile(Path(full_path).suffix, full_path)
+                p = Path(full_path)
+                yield LocalFilesystemAdaptiveFile(p.name, p.suffix, full_path)
 
             if not recursive:
                 break
@@ -162,16 +169,19 @@ class LocalFilesystemAdaptiveCollection(_AdaptiveCollection):
 class YandexDiskAdaptiveFile(_AdaptiveFile):
     """Adaptive file representation for Yandex Disk resources."""
 
-    def __init__(self, extension: str, local_path: str, token: str):
-        super().__init__(extension, local_path)
+    remote_path: str
+
+    def __init__(self, filename: str, extension: str, remote_path: str, token: str):
+        super().__init__(filename, extension)
         self.token = token
+        self.remote_path = remote_path
 
     def _download_to_temp_file(self) -> str:
         headers = {"Authorization": f"OAuth {self.token}"}
         response = requests.get(
             "https://cloud-api.yandex.net/v1/disk/resources/download",
             headers=headers,
-            params={"path": self.local_path},
+            params={"path": self.remote_path},
             timeout=30,
         )
         response.raise_for_status()
@@ -180,7 +190,8 @@ class YandexDiskAdaptiveFile(_AdaptiveFile):
         file_response = requests.get(href, timeout=120)
         file_response.raise_for_status()
 
-        suffix = Path(self.local_path).suffix
+        p = Path(self.remote_path)
+        suffix = p.suffix
         with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
             temp_file.write(file_response.content)
             return temp_file.name
@@ -249,7 +260,8 @@ class YandexDiskAdaptiveCollection(_AdaptiveCollection):
         if root_info.get("type") == "file":
             path = root_info["path"]
             logger.info(f"Found file on Yandex Disk: {path}")
-            yield YandexDiskAdaptiveFile(Path(path).suffix, path, self.token)
+            p = Path(path)
+            yield YandexDiskAdaptiveFile(p.name, p.suffix, path, self.token)
             return
 
         directories = [root_path]
@@ -257,11 +269,12 @@ class YandexDiskAdaptiveCollection(_AdaptiveCollection):
             current_dir = directories.pop(0)
             for item in self._iter_children(current_dir):
                 item_type = item.get("type")
-                item_path = item.get("path")
+                item_path = str(item.get("path"))
                 if item_type == "file":
                     logger.info(f"Found file on Yandex Disk: {item_path}")
+                    p = Path(item_path)
                     yield YandexDiskAdaptiveFile(
-                        Path(item_path).suffix, item_path, self.token
+                        p.name, p.suffix, item_path, self.token
                     )
                 elif recursive and item_type == "dir":
                     directories.append(item_path)
@@ -13,7 +13,7 @@ class TestLocalFilesystemAdaptiveCollection(unittest.TestCase):
         collection = LocalFilesystemAdaptiveCollection(str(self.samples_dir))
 
         files = list(collection.iterate(recursive=False))
-        file_names = sorted(Path(file.local_path).name for file in files)
+        file_names = sorted(file.filename for file in files)
 
         self.assertEqual(file_names, ["root.txt"])
         self.assertTrue(all(isinstance(file, LocalFilesystemAdaptiveFile) for file in files))
@@ -33,7 +33,9 @@ class TestLocalFilesystemAdaptiveCollection(unittest.TestCase):
 
     def test_work_with_file_locally_provides_existing_path(self):
         target_path = self.samples_dir / "root.txt"
-        adaptive_file = LocalFilesystemAdaptiveFile(target_path.suffix, str(target_path))
+        adaptive_file = LocalFilesystemAdaptiveFile(
+            target_path.name, target_path.suffix, str(target_path)
+        )
 
         observed = {}
 
@@ -44,6 +46,7 @@ class TestLocalFilesystemAdaptiveCollection(unittest.TestCase):
 
         adaptive_file.work_with_file_locally(callback)
 
+        self.assertEqual(adaptive_file.filename, "root.txt")
         self.assertEqual(observed["path"], str(target_path))
         self.assertEqual(observed["content"], "root file")
 
@@ -31,6 +31,7 @@ class TestYandexDiskAdaptiveCollection(unittest.TestCase):
             self.skipTest(f"Yandex Disk request failed and needs manual verification: {exc}")
 
         for item in files:
+            self.assertTrue(item.filename)
             logger.info(f"Yandex file found during test iteration: {item.local_path}")
 
         self.assertIsInstance(files, list)