Compare commits: effbc7d00f...main
12 Commits
| SHA1 |
|---|
| 93d538ecc6 |
| f5659675ec |
| 7b52887558 |
| 1e6ab247b9 |
| e9dd28ad55 |
| 06a3155b6b |
| 63c3e2c5c7 |
| 447ecaba39 |
| ce62fd50ed |
| 2cb9b39bf2 |
| f9c47c772f |
| 0adbc29692 |
2 .gitignore vendored Normal file
@@ -0,0 +1,2 @@
+data-unpacked-archives
+data-broken-archives
BIN services/rag/.DS_Store vendored
Binary file not shown.
BIN services/rag/langchain/.DS_Store vendored Normal file
Binary file not shown.
@@ -6,3 +6,11 @@ CHAT_MODEL_STRATEGY=ollama
 QDRANT_HOST=HOST
 QDRANT_REST_PORT=PORT
 QDRANT_GRPC_PORT=PORT
+YADISK_TOKEN=TOKEN
+ENRICHMENT_SOURCE=local/yadisk
+ENRICHMENT_LOCAL_PATH=path
+ENRICHMENT_YADISK_PATH=path
+ENRICHMENT_PROCESSING_MODE=async/sync
+ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT=5
+ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS=4
+ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS=4
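The block above only lists the new configuration keys. As a rough, hedged sketch (not part of the diff), they could be consumed at startup like this, with the thread counts parsed as integers and defaults matching the sample values:

```python
import os

from dotenv import load_dotenv

load_dotenv()

ENRICHMENT_SOURCE = os.getenv("ENRICHMENT_SOURCE", "local").lower()  # "local" or "yadisk"
QUEUE_LIMIT = int(os.getenv("ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT", "5"))
PROCESS_THREADS = int(os.getenv("ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS", "4"))
UPLOAD_THREADS = int(os.getenv("ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS", "4"))
```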
1 services/rag/langchain/.gitignore vendored
@@ -215,3 +215,4 @@ __marimo__/
 # Streamlit
 .streamlit/secrets.toml
 document_tracking.db
+.env.test
4 services/rag/langchain/FOOD_FOR_THOUGHT.md Normal file
@@ -0,0 +1,4 @@
+# Things to remember but not to implement right away: wanna-have, nice-to-have, look-into
+
+- [x] Year. Extract the year from each chunk, store it in the metadata, then add a filter when retrieving if the query also contains a year.
+- [ ] Rerankers. Should we use rerankers in our pipeline? What would that mean: get a lot more results from the vector storage than needed, rerank them with a reranker model, then feed the limited rest further into our pipeline.
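The reranker item above is only an idea; a minimal sketch of what it could look like, assuming a cross-encoder from sentence-transformers (a hypothetical extra dependency, not something this change introduces):

```python
from sentence_transformers import CrossEncoder  # assumption: not yet a dependency of this repo


def rerank(query: str, candidates: list[str], keep: int = 5) -> list[str]:
    """Fetch more candidates than needed upstream, then keep only the best-scored ones."""
    model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")  # illustrative model choice
    scores = model.predict([(query, text) for text in candidates])
    ranked = sorted(zip(candidates, scores), key=lambda pair: pair[1], reverse=True)
    return [text for text, _ in ranked[:keep]]
```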
@@ -56,3 +56,48 @@ Chosen data folder: relative ./../../../data - from the current folder
 - [x] After accepting the API endpoint address, it should be used to send requests and process responses to imitate chat with the agent via the provided API endpoint.
 - [x] Show the API endpoint in the header of the chat.
 - [x] If there is an error connecting to the API, imitate the bot sending a message about the connection error and a suggestion to reload the page to provide a new API endpoint.
+
+# Phase 10 (extracting additional metadata from chunks, and filtering with it where possible)
+
+- [x] Create a separate function in the helpers module (create it if it does not exist) for retrieving years from text. It should return the found years.
+- [x] During vector storage enrichment, when loading and splitting documents, extract years from each chunk and add them as numbers to the metadata field "years" (an array of numbers, or the Qdrant type best suited to searching by year). The helper function for retrieving years from text can be used.
+- [x] Update VectorStoreRetriever._get_relevant_documents: when the user mentions a year in the query (in Russian), we must search vectors whose metadata contains that year in the "years" array. The helper function for retrieving years can be used to filter documents by year.
+- [x] Create a heuristic regex function in the helpers module for extracting event names in Russian, using regex and likely words before and after the event name.
+- [x] During vector storage enrichment, try to extract the event name from each chunk and save it in the metadata field "events", a list of strings with possible events. Using the helper function is advised.
+- [x] In VectorStoreRetriever._get_relevant_documents, add a similarity search for the event name if one is present in the query. The helper function should be used to try to extract the event name.
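A minimal sketch of the Phase 10 year-extraction idea (simplified compared to the helpers module added later in this diff; the regex and field name here are illustrative only):

```python
import re
from typing import List

_YEAR_RE = re.compile(r"(?<!\d)(19\d{2}|20\d{2})(?!\d)")


def years_in(text: str) -> List[int]:
    """Return the distinct 4-digit years mentioned in a chunk or a query."""
    return sorted({int(m.group(0)) for m in _YEAR_RE.finditer(text or "")})


# During enrichment: chunk.metadata["years"] = years_in(chunk.page_content)
# During retrieval: keep candidates whose metadata["years"] intersects years_in(query)
```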
+
+# Phase 11 (adaptive collection, to attach different filesystems in the future)
+
+- [x] Create an adaptive collection class and an adaptive file class in helpers, as abstract classes that encompass iterating over files and working with them locally.
+- [x] Write a local filesystem implementation of the adaptive collection.
+- [x] Write tests for the local filesystem implementation, using a test/samples folder filled with files and directories to test iteration and recursion.
+- [x] Create a Yandex Disk implementation of the adaptive collection. The constructor should require a TOKEN for Yandex Disk.
+- [x] Write tests for the Yandex Disk implementation, using the folder "Общая/Информация". .env.test has a YADISK_TOKEN variable for connecting. While testing, log the files found during iteration. If the test fails at this step, leave it to manual fixing; this step can still be marked as done.
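For orientation, a hypothetical usage sketch of the Phase 11 abstraction, matching the interface that helpers.py introduces further down in this diff (iteration is uniform, and file access always goes through a local-path callback):

```python
from helpers import LocalFilesystemAdaptiveCollection

collection = LocalFilesystemAdaptiveCollection("./test/samples")  # assumed sample folder
for adaptive_file in collection.iterate(recursive=True):
    print(adaptive_file.filename, adaptive_file.extension)
    # The callback receives a local path; remote backends download to a temp file first.
    adaptive_file.work_with_file_locally(lambda local_path: print("available at", local_path))
```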
+
+# Phase 12 (using the local file system or Yandex Disk)
+
+During enrichment we should use the adaptive collection from helpers for loading documents. We should not use the local filesystem directly, but go through the adaptive collection as a wrapper.
+
+- [x] The adaptive file in helpers now carries a filename, so tests should be adjusted for this.
+- [x] Add conditional usage of the adaptive collection in the enrichment stage. .env now has the variable ENRICHMENT_SOURCE with 2 possible values: yadisk, local.
+- [x] With the local source, use the env variable ENRICHMENT_LOCAL_PATH for the local filesystem adaptive collection.
+- [x] With the yadisk source, use the env variable YADISK_TOKEN for auth with Yandex Disk and ENRICHMENT_YADISK_PATH for the path on the Yandex Disk side.
+- [x] We still need to skip certain file types, so while iterating over files we check their extension and skip unsupported ones.
+- [x] Adaptive files carry a filename, so it should be used when extracting metadata.
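A condensed, assumption-laden sketch of the Phase 12 source selection (the full version is get_enrichment_adaptive_collection in the enrichment diff below):

```python
import os

from helpers import LocalFilesystemAdaptiveCollection, YandexDiskAdaptiveCollection


def pick_collection():
    # Condensed from get_enrichment_adaptive_collection later in this diff.
    if os.getenv("ENRICHMENT_SOURCE", "local").lower() == "yadisk":
        return YandexDiskAdaptiveCollection(
            token=os.environ["YADISK_TOKEN"],
            base_dir=os.environ["ENRICHMENT_YADISK_PATH"],
        )
    return LocalFilesystemAdaptiveCollection(os.getenv("ENRICHMENT_LOCAL_PATH", "../../../data"))
```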
+
+# Phase 13 (async processing of files)
+
+During this phase we create an asynchronous enrichment process, utilizing async/await.
+
+- [x] Prepare enrichment to be an async process: adjust the needed libraries, etc.
+- [x] Create a queue for adaptive files. It will store the adaptive files that need to be processed.
+- [x] Create a queue for documents produced from the adaptive files.
+- [x] Create a function that iterates through the adaptive collection and adds files to the adaptive files queue ADAPTIVE_FILES_QUEUE. Let's call it insert_adaptive_files_queue.
+- [x] Create a function that takes an adaptive file from the adaptive files queue and processes it by splitting it into chunks of documents, which go into PROCESSED_DOCUMENTS_QUEUE. Let's call it process_adaptive_files_queue.
+- [x] Create a function that takes a chunk of documents from the processed documents queue and sends them into the vector storage. It marks the document those chunks came from as processed in the local database (the existing feature adapted here). Let's call it upload_processed_documents_from_queue.
+- [x] Utilize Python threading machinery to run several of these functions in threads. There will be environment variables: ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT (default 5), ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS (default 4), ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS (default 4).
+- [x] Function insert_adaptive_files_queue will not run in a thread. It will iterate through the adaptive collection and wait whenever the queue already holds ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT items.
+- [x] Function process_adaptive_files_queue should be started in a number of threads defined in .env by ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS.
+- [x] Function upload_processed_documents_from_queue should be started in a number of threads defined in .env by ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS.
+- [x] The program should control the threads. After the adaptive collection ends, insert_adaptive_files_queue should wait until all threads finish. What does "finish" mean? When insert_adaptive_files_queue realizes there are no adaptive files left in the collection, it sets a variable shared between threads marking the collection as finished. When the functions running in threads see that this variable became true, they drain their queues, do not loop back to wait for new items, and simply finish. This eventually ends the program: each thread finishes, and the main program exits as usual after processing everything.
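A stripped-down, runnable sketch of the Phase 13 shutdown scheme described in the last item (the real pipeline is DocumentEnricher._run_threaded_pipeline below; this only shows the bounded queue plus shared "finished" flag):

```python
import queue
import threading

files_q: queue.Queue = queue.Queue(maxsize=5)  # ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT
collection_finished = threading.Event()


def process(item) -> None:
    print("processing", item)  # stand-in for chunking + uploading work


def producer(items) -> None:
    for item in items:
        files_q.put(item)  # blocks while the queue is already full
    collection_finished.set()  # shared flag: the collection is exhausted


def worker() -> None:
    while True:
        try:
            item = files_q.get(timeout=0.2)
        except queue.Empty:
            if collection_finished.is_set():
                return  # queue drained and nothing more coming: finish this thread
            continue
        try:
            process(item)
        finally:
            files_q.task_done()


workers = [threading.Thread(target=worker) for _ in range(4)]
for thread in workers:
    thread.start()
producer(range(20))  # runs on the main thread, as in the plan above
files_q.join()
for thread in workers:
    thread.join()
```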
@@ -21,7 +21,9 @@ from vector_storage import initialize_vector_store
 load_dotenv()
 
 
-def get_llm_model_info(llm_model: str = None) -> Tuple[str, str, str, str, str]:
+def get_llm_model_info(
+    llm_model: Optional[str] = None,
+) -> Tuple[str, str, str, str, str]:
     """
     Get LLM model information based on environment configuration.
 
@@ -121,7 +123,7 @@ class DocumentRetrievalTool(BaseTool):
 
 
 def create_chat_agent(
-    collection_name: str = "documents_langchain", llm_model: str = None
+    collection_name: str = "documents_langchain", llm_model: Optional[str] = None
 ) -> Any:
     """
     Create a chat agent with document retrieval capabilities.
@@ -177,7 +179,7 @@ def create_chat_agent(
 def chat_with_agent(
     query: str,
     collection_name: str = "documents_langchain",
-    llm_model: str = None,
+    llm_model: Optional[str] = None,
     history: List[BaseMessage] = None,
 ) -> Dict[str, Any]:
     """
@@ -1,8 +1,8 @@
 import os
 from pathlib import Path
-from dotenv import load_dotenv
 
 import click
+from dotenv import load_dotenv
 from loguru import logger
 
 # Load environment variables
@@ -37,15 +37,16 @@ def ping():
     name="enrich",
     help="Load documents from data directory and store in vector database",
 )
-@click.option("--data-dir", default="../../../data", help="Path to the data directory")
 @click.option(
     "--collection-name",
     default="documents_langchain",
     help="Name of the vector store collection",
 )
-def enrich(data_dir, collection_name):
+def enrich(collection_name):
     """Load documents from data directory and store in vector database"""
-    logger.info(f"Starting enrichment process for directory: {data_dir}")
+    logger.info(
+        f"Starting enrichment process. Enrichment source: {os.getenv('ENRICHMENT_SOURCE')}"
+    )
 
     try:
         # Import here to avoid circular dependencies
@@ -56,7 +57,7 @@ def enrich(data_dir, collection_name):
         vector_store = initialize_vector_store(collection_name=collection_name)
 
         # Run enrichment process
-        run_enrichment_process(vector_store, data_dir=data_dir)
+        run_enrichment_process(vector_store)
 
         logger.info("Enrichment process completed successfully!")
         click.echo("Documents have been successfully loaded into the vector store.")
@@ -85,36 +86,9 @@ def retrieve(query, collection_name, top_k):
     """Retrieve documents from vector database based on a query"""
     logger.info(f"Starting retrieval process for query: {query}")
 
-    try:
-        # Import here to avoid circular dependencies
-        from retrieval import search_documents_with_metadata
-
-        # Perform retrieval
-        results = search_documents_with_metadata(
-            query=query,
-            collection_name=collection_name,
-            top_k=top_k
-        )
-
-        if not results:
-            click.echo("No relevant documents found for the query.")
-            return
-
-        click.echo(f"Found {len(results)} relevant documents:\n")
-
-        for i, result in enumerate(results, 1):
-            click.echo(f"{i}. Source: {result['source']}")
-            click.echo(f"   Filename: {result['filename']}")
-            click.echo(f"   Page: {result['page_number']}")
-            click.echo(f"   File Extension: {result['file_extension']}")
-            click.echo(f"   Content Preview: {result['content'][:200]}...")
-            click.echo(f"   Metadata: {result['metadata']}\n")
-
-        logger.info("Retrieval process completed successfully!")
-
-    except Exception as e:
-        logger.error(f"Error during retrieval process: {str(e)}")
-        click.echo(f"Error: {str(e)}")
+    click.echo(
+        "WARNING: Retrieval disabled, since it is no longer relevant for the testing of the retrieving feature. Use chat with agent instead. xoxo"
+    )
 
 
 @cli.command(
@@ -143,10 +117,7 @@ def chat(collection_name, model):
     click.echo("Type 'quit' or 'exit' to end the conversation.\n")
 
     # Run the interactive chat loop
-    run_chat_loop(
-        collection_name=collection_name,
-        llm_model=model
-    )
+    run_chat_loop(collection_name=collection_name, llm_model=model)
 
     logger.info("Chat session ended")
 
@@ -1,13 +1,21 @@
 """Document enrichment module for loading documents into vector storage."""
 
-import os
 import hashlib
+import os
+import queue
+import threading
 from pathlib import Path
-from typing import List, Dict, Any
+from typing import List, Optional, Tuple
 
 from dotenv import load_dotenv
+from langchain_community.document_loaders import PyPDFLoader
 from langchain_core.documents import Document
 from langchain_text_splitters import RecursiveCharacterTextSplitter
-from langchain_community.document_loaders import PyPDFLoader
+from loguru import logger
+from sqlalchemy import Column, Integer, String, create_engine
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import sessionmaker
 
 # Dynamically import other loaders to handle optional dependencies
 try:
     from langchain_community.document_loaders import UnstructuredWordDocumentLoader
@@ -33,25 +41,63 @@ try:
     from langchain_community.document_loaders import UnstructuredODTLoader
 except ImportError:
     UnstructuredODTLoader = None
-from sqlalchemy import create_engine, Column, Integer, String
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import sessionmaker
-from loguru import logger
-import sqlite3
+
+from helpers import (
+    LocalFilesystemAdaptiveCollection,
+    YandexDiskAdaptiveCollection,
+    YandexDiskAdaptiveFile,
+    _AdaptiveCollection,
+    _AdaptiveFile,
+    extract_russian_event_names,
+    extract_years_from_text,
+)
 
 # Load environment variables
 load_dotenv()
 
 
 # Define the path to the data directory
 DATA_DIR = Path("../../../data").resolve()
 DB_PATH = Path("document_tracking.db").resolve()
+ENRICHMENT_SOURCE = os.getenv("ENRICHMENT_SOURCE", "local").lower()
+ENRICHMENT_LOCAL_PATH = os.getenv("ENRICHMENT_LOCAL_PATH")
+ENRICHMENT_YADISK_PATH = os.getenv("ENRICHMENT_YADISK_PATH")
+YADISK_TOKEN = os.getenv("YADISK_TOKEN")
+
+ENRICHMENT_PROCESSING_MODE = os.getenv("ENRICHMENT_PROCESSING_MODE", "async").lower()
+ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT = int(
+    os.getenv("ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT", "5")
+)
+ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS = int(
+    os.getenv("ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS", "4")
+)
+ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS = int(
+    os.getenv("ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS", "4")
+)
+
+SUPPORTED_EXTENSIONS = {
+    ".pdf",
+    ".docx",
+    ".doc",
+    ".pptx",
+    ".xlsx",
+    ".xls",
+    ".jpg",
+    ".jpeg",
+    ".png",
+    ".gif",
+    ".bmp",
+    ".tiff",
+    ".webp",
+    ".odt",
+    ".txt",  # this one is obvious but was unexpected to see in data lol
+}
 
 Base = declarative_base()
 
 
 class ProcessedDocument(Base):
     """Database model for tracking processed documents."""
 
     __tablename__ = "processed_documents"
 
     id = Column(Integer, primary_key=True)
@@ -59,6 +105,25 @@ class ProcessedDocument(Base):
     file_hash = Column(String, nullable=False)
 
 
+# to guess the filetype in russian language, for searching it
+def try_guess_file_type(extension: str) -> str:
+    if extension in [".xlsx", "xls"]:
+        return "таблица"
+    elif extension in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
+        return "изображение"
+    elif extension in [".pptx"]:
+        return "презентация"
+    else:
+        return "документ"
+
+
+def identify_adaptive_file_source(adaptive_file: _AdaptiveFile) -> str:
+    if isinstance(adaptive_file, YandexDiskAdaptiveFile):
+        return "Яндекс Диск"
+    else:
+        return "Локальный Файл"
+
+
 class DocumentEnricher:
     """Class responsible for enriching documents and loading them to vector storage."""
 
@@ -70,6 +135,34 @@ class DocumentEnricher:
             length_function=len,
         )
 
+        # In sync mode we force minimal concurrency values.
+        if ENRICHMENT_PROCESSING_MODE == "sync":
+            self.adaptive_files_queue_limit = 1
+            self.file_process_threads_count = 1
+            self.document_upload_threads_count = 1
+        else:
+            self.adaptive_files_queue_limit = max(
+                1, ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT
+            )
+            self.file_process_threads_count = max(
+                1, ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS
+            )
+            self.document_upload_threads_count = max(
+                1, ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS
+            )
+
+        # Phase 13 queues
+        self.ADAPTIVE_FILES_QUEUE: queue.Queue = queue.Queue(
+            maxsize=self.adaptive_files_queue_limit
+        )
+        self.PROCESSED_DOCUMENTS_QUEUE: queue.Queue = queue.Queue(
+            maxsize=max(1, self.adaptive_files_queue_limit * 2)
+        )
+
+        # Shared state for thread lifecycle
+        self.collection_finished = threading.Event()
+        self.processing_finished = threading.Event()
+
         # Initialize database for tracking processed documents
         self._init_db()
 
@@ -77,35 +170,45 @@ class DocumentEnricher:
         """Initialize the SQLite database for tracking processed documents."""
         self.engine = create_engine(f"sqlite:///{DB_PATH}")
         Base.metadata.create_all(self.engine)
-        Session = sessionmaker(bind=self.engine)
-        self.session = Session()
+        self.SessionLocal = sessionmaker(bind=self.engine)
 
     def _get_file_hash(self, file_path: str) -> str:
         """Calculate SHA256 hash of a file."""
         hash_sha256 = hashlib.sha256()
-        with open(file_path, "rb") as f:
-            # Read file in chunks to handle large files
-            for chunk in iter(lambda: f.read(4096), b""):
+        with open(file_path, "rb") as file_handle:
+            for chunk in iter(lambda: file_handle.read(4096), b""):
                 hash_sha256.update(chunk)
         return hash_sha256.hexdigest()
 
-    def _is_document_processed(self, file_path: str) -> bool:
-        """Check if a document has already been processed."""
-        file_hash = self._get_file_hash(file_path)
-        existing = self.session.query(ProcessedDocument).filter_by(
-            file_hash=file_hash
-        ).first()
-        return existing is not None
+    def _is_document_hash_processed(self, file_hash: str) -> bool:
+        """Check if a document hash has already been processed."""
+        session = self.SessionLocal()
+        try:
+            existing = (
+                session.query(ProcessedDocument).filter_by(file_hash=file_hash).first()
+            )
+            return existing is not None
+        finally:
+            session.close()
 
-    def _mark_document_processed(self, file_path: str):
+    def _mark_document_processed(self, file_identifier: str, file_hash: str):
         """Mark a document as processed in the database."""
-        file_hash = self._get_file_hash(file_path)
-        doc_record = ProcessedDocument(
-            file_path=file_path,
-            file_hash=file_hash
-        )
-        self.session.add(doc_record)
-        self.session.commit()
+        session = self.SessionLocal()
+        try:
+            existing = (
+                session.query(ProcessedDocument)
+                .filter_by(file_path=file_identifier)
+                .first()
+            )
+            if existing is not None:
+                existing.file_hash = file_hash
+            else:
+                session.add(
+                    ProcessedDocument(file_path=file_identifier, file_hash=file_hash)
+                )
+            session.commit()
+        finally:
+            session.close()
 
     def _get_loader_for_extension(self, file_path: str):
         """Get the appropriate loader for a given file extension."""
@@ -113,168 +216,275 @@ class DocumentEnricher:
 
         if ext == ".pdf":
             return PyPDFLoader(file_path)
-        elif ext in [".docx", ".doc"]:
+        if ext in [".docx", ".doc"]:
             if UnstructuredWordDocumentLoader is None:
-                logger.warning(f"UnstructuredWordDocumentLoader not available for {file_path}. Skipping.")
+                logger.warning(
+                    f"UnstructuredWordDocumentLoader not available for {file_path}. Skipping."
+                )
                 return None
-            return UnstructuredWordDocumentLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
-        elif ext == ".pptx":
+            return UnstructuredWordDocumentLoader(
+                file_path, **{"strategy": "hi_res", "languages": ["rus"]}
+            )
+        if ext == ".pptx":
             if UnstructuredPowerPointLoader is None:
-                logger.warning(f"UnstructuredPowerPointLoader not available for {file_path}. Skipping.")
+                logger.warning(
+                    f"UnstructuredPowerPointLoader not available for {file_path}. Skipping."
+                )
                 return None
-            return UnstructuredPowerPointLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
-        elif ext in [".xlsx", ".xls"]:
+            return UnstructuredPowerPointLoader(
+                file_path, **{"strategy": "hi_res", "languages": ["rus"]}
+            )
+        if ext in [".xlsx", ".xls"]:
             if UnstructuredExcelLoader is None:
-                logger.warning(f"UnstructuredExcelLoader not available for {file_path}. Skipping.")
+                logger.warning(
+                    f"UnstructuredExcelLoader not available for {file_path}. Skipping."
+                )
                 return None
-            return UnstructuredExcelLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
-        elif ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
+            return UnstructuredExcelLoader(
+                file_path, **{"strategy": "hi_res", "languages": ["rus"]}
+            )
+        if ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
             if UnstructuredImageLoader is None:
-                logger.warning(f"UnstructuredImageLoader not available for {file_path}. Skipping.")
+                logger.warning(
+                    f"UnstructuredImageLoader not available for {file_path}. Skipping."
+                )
                 return None
-            # Use OCR strategy for images to extract text
-            return UnstructuredImageLoader(file_path, **{"strategy": "ocr_only", "languages": ["rus"]})
-        elif ext == ".odt":
+            return UnstructuredImageLoader(
+                file_path, **{"strategy": "ocr_only", "languages": ["rus"]}
+            )
+        if ext == ".odt":
             if UnstructuredODTLoader is None:
-                logger.warning(f"UnstructuredODTLoader not available for {file_path}. Skipping.")
+                logger.warning(
+                    f"UnstructuredODTLoader not available for {file_path}. Skipping."
+                )
                 return None
-            return UnstructuredODTLoader(file_path, **{"strategy": "hi_res", "languages": ["rus"]})
-        else:
-            # For text files and unsupported formats, try to load as text
-            try:
-                with open(file_path, 'r', encoding='utf-8') as f:
-                    content = f.read()
-                return None, content  # Return content directly for text processing
-            except UnicodeDecodeError:
-                logger.warning(f"Could not decode file as text: {file_path}")
-                return None, None
+            return UnstructuredODTLoader(
+                file_path, **{"strategy": "hi_res", "languages": ["rus"]}
+            )
+        return None
 
-    def load_and_split_documents(self, file_paths: List[str]) -> List[Document]:
-        """Load documents from file paths and split them appropriately."""
-        all_docs = []
-
-        for file_path in file_paths:
-            if self._is_document_processed(file_path):
-                logger.info(f"Skipping already processed document: {file_path}")
-                continue
-
-            logger.info(f"Processing document: {file_path}")
-
-            # Get the appropriate loader for the file extension
-            loader = self._get_loader_for_extension(file_path)
-
-            if loader is None:
-                # For unsupported formats that we tried to load as text
-                continue
-
-            try:
-                # Load the document(s)
-                docs = loader.load()
-
-                # Add metadata to each document
-                for doc in docs:
-                    # Extract metadata from the original file
-                    doc.metadata["source"] = file_path
-                    doc.metadata["filename"] = Path(file_path).name
-                    doc.metadata["file_path"] = file_path
-                    doc.metadata["file_size"] = os.path.getsize(file_path)
-
-                    # Add page number if available in original metadata
-                    if "page" in doc.metadata:
-                        doc.metadata["page_number"] = doc.metadata["page"]
-
-                    # Add file extension as metadata
-                    doc.metadata["file_extension"] = Path(file_path).suffix
-
-                # Split documents if they are too large
-                split_docs = self.text_splitter.split_documents(docs)
-
-                # Add to the collection
-                all_docs.extend(split_docs)
-
-            except Exception as e:
-                logger.error(f"Error processing {file_path}: {str(e)}")
-                continue
-
-        return all_docs
-
-    def enrich_and_store(self, file_paths: List[str]):
-        """Load, enrich, and store documents in the vector store."""
-        logger.info(f"Starting enrichment process for {len(file_paths)} files...")
-
-        # Load and split documents
-        documents = self.load_and_split_documents(file_paths)
-
-        if not documents:
-            logger.info("No new documents to process.")
-            return
-
-        logger.info(f"Loaded and split {len(documents)} document chunks, adding to vector store...")
-
-        # Add documents to vector store
-        try:
-            self.vector_store.add_documents(documents)
-
-            # Only mark documents as processed after successful insertion to vector store
-            processed_file_paths = set()
-            for doc in documents:
-                if 'source' in doc.metadata:
-                    processed_file_paths.add(doc.metadata['source'])
-
-            for file_path in processed_file_paths:
-                self._mark_document_processed(file_path)
-
-            logger.info(f"Successfully added {len(documents)} document chunks to vector store and marked {len(processed_file_paths)} files as processed.")
-        except Exception as e:
-            logger.error(f"Error adding documents to vector store: {str(e)}")
-            raise
+    def _load_one_adaptive_file(
+        self, adaptive_file: _AdaptiveFile
+    ) -> Tuple[List[Document], Optional[Tuple[str, str]]]:
+        """Load and split one adaptive file by using its local working callback."""
+        loaded_docs: List[Document] = []
+        processed_record: Optional[Tuple[str, str]] = None
+        source_identifier = identify_adaptive_file_source(adaptive_file)
+        extension = adaptive_file.extension.lower()
+        file_type = try_guess_file_type(extension)
+
+        def process_local_file(local_file_path: str):
+            nonlocal loaded_docs, processed_record
+
+            file_hash = self._get_file_hash(local_file_path)
+            if self._is_document_hash_processed(file_hash):
+                logger.info(
+                    f"SKIPPING already processed document hash for: {source_identifier}"
+                )
+                return
+            else:
+                logger.info("Document is not processed! Doing it")
+
+            loader = self._get_loader_for_extension(local_file_path)
+            if loader is None:
+                logger.warning(f"No loader available for file: {source_identifier}")
+                return
+
+            docs = loader.load()
+            for doc in docs:
+                doc.metadata["file_type"] = file_type
+                doc.metadata["source"] = source_identifier
+                doc.metadata["filename"] = adaptive_file.filename
+                doc.metadata["file_path"] = source_identifier
+                doc.metadata["file_size"] = os.path.getsize(local_file_path)
+                doc.metadata["file_extension"] = extension
+
+                if "page" in doc.metadata:
+                    doc.metadata["page_number"] = doc.metadata["page"]
+
+            split_docs = self.text_splitter.split_documents(docs)
+            for chunk in split_docs:
+                chunk.metadata["years"] = extract_years_from_text(chunk.page_content)
+                chunk.metadata["events"] = extract_russian_event_names(
+                    chunk.page_content
+                )
+
+            loaded_docs = split_docs
+            processed_record = (source_identifier, file_hash)
+
+        adaptive_file.work_with_file_locally(process_local_file)
+        return loaded_docs, processed_record
+
+    # Phase 13 API: inserts adaptive files into ADAPTIVE_FILES_QUEUE
+    def insert_adaptive_files_queue(
+        self, adaptive_collection: _AdaptiveCollection, recursive: bool = True
+    ):
+        for adaptive_file in adaptive_collection.iterate(recursive=recursive):
+            if adaptive_file.extension.lower() not in SUPPORTED_EXTENSIONS:
+                logger.debug(
+                    f"Skipping unsupported file extension for {adaptive_file.filename}: {adaptive_file.extension}"
+                )
+                continue
+
+            self.ADAPTIVE_FILES_QUEUE.put(adaptive_file)
+
+        logger.debug("ADAPTIVE COLLECTION DEPLETED!")
+        self.collection_finished.set()
+
+    # Phase 13 API: reads adaptive files and writes processed docs into PROCESSED_DOCUMENTS_QUEUE
+    def process_adaptive_files_queue(self):
+        while True:
+            try:
+                adaptive_file = self.ADAPTIVE_FILES_QUEUE.get(timeout=0.2)
+            except queue.Empty:
+                if self.collection_finished.is_set():
+                    return
+                continue
+
+            try:
+                split_docs, processed_record = self._load_one_adaptive_file(
+                    adaptive_file
+                )
+                if split_docs:
+                    self.PROCESSED_DOCUMENTS_QUEUE.put((split_docs, processed_record))
+            except Exception as error:
+                logger.error(f"Error processing {adaptive_file.filename}: {error}")
+            finally:
+                self.ADAPTIVE_FILES_QUEUE.task_done()
+
+    # Phase 13 API: uploads chunked docs and marks file processed
+    def upload_processed_documents_from_queue(self):
+        while True:
+            try:
+                payload = self.PROCESSED_DOCUMENTS_QUEUE.get(timeout=0.2)
+            except queue.Empty:
+                if self.processing_finished.is_set():
+                    return
+                continue
+
+            try:
+                documents, processed_record = payload
+                self.vector_store.add_documents(documents)
+
+                if processed_record is not None:
+                    self._mark_document_processed(
+                        processed_record[0], processed_record[1]
+                    )
+            except Exception as error:
+                logger.error(
+                    f"Error uploading processed documents: {error}. But swallowing error. NOT raising."
+                )
+            finally:
+                self.PROCESSED_DOCUMENTS_QUEUE.task_done()
+
+    def _run_threaded_pipeline(self, adaptive_collection: _AdaptiveCollection):
+        """Run Phase 13 queue/thread pipeline."""
+        process_threads = [
+            threading.Thread(
+                target=self.process_adaptive_files_queue,
+                name=f"adaptive-file-processor-{index}",
+                daemon=True,
+            )
+            for index in range(self.file_process_threads_count)
+        ]
+        upload_threads = [
+            threading.Thread(
+                target=self.upload_processed_documents_from_queue,
+                name=f"document-uploader-{index}",
+                daemon=True,
+            )
+            for index in range(self.document_upload_threads_count)
+        ]
+
+        for thread in process_threads:
+            thread.start()
+        for thread in upload_threads:
+            thread.start()
+
+        # This one intentionally runs on main thread per Phase 13 requirement.
+        self.insert_adaptive_files_queue(adaptive_collection, recursive=True)
+
+        # Wait file queue completion and processing threads end.
+        self.ADAPTIVE_FILES_QUEUE.join()
+        for thread in process_threads:
+            thread.join()
+
+        # Signal upload workers no more payload is expected.
+        self.processing_finished.set()
+
+        # Wait upload completion and upload threads end.
+        self.PROCESSED_DOCUMENTS_QUEUE.join()
+        for thread in upload_threads:
+            thread.join()
+
+    def _run_sync_pipeline(self, adaptive_collection: _AdaptiveCollection):
+        """Sequential pipeline for sync mode."""
+        logger.info("Running enrichment in sync mode")
+        self.insert_adaptive_files_queue(adaptive_collection, recursive=True)
+        self.process_adaptive_files_queue()
+        self.processing_finished.set()
+        self.upload_processed_documents_from_queue()
+
+    def enrich_and_store(self, adaptive_collection: _AdaptiveCollection):
+        """Load, enrich, and store documents in the vector store."""
+        logger.info("Starting enrichment process...")
+
+        if ENRICHMENT_PROCESSING_MODE == "sync":
+            logger.info("Document enrichment process starting in SYNC mode")
+            self._run_sync_pipeline(adaptive_collection)
+            return
+
+        logger.info("Document enrichment process starting in ASYNC/THREAD mode")
+        self._run_threaded_pipeline(adaptive_collection)
 
 
-def get_all_documents_from_data_dir(data_dir: str = str(DATA_DIR)) -> List[str]:
-    """Get all supported document file paths from the data directory."""
-    supported_extensions = {
-        '.pdf', '.docx', '.doc', '.pptx', '.xlsx', '.xls',
-        '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff',
-        '.webp', '.odt'
-    }
-
-    file_paths = []
-    for root, dirs, files in os.walk(data_dir):
-        for file in files:
-            if Path(file).suffix.lower() in supported_extensions:
-                file_paths.append(os.path.join(root, file))
-
-    return file_paths
+def get_enrichment_adaptive_collection(
+    data_dir: str = str(DATA_DIR),
+) -> _AdaptiveCollection:
+    """Create adaptive collection based on environment source configuration."""
+    source = ENRICHMENT_SOURCE
+    if source == "local":
+        local_path = ENRICHMENT_LOCAL_PATH or data_dir
+        logger.info(f"Using local adaptive collection from path: {local_path}")
+        return LocalFilesystemAdaptiveCollection(local_path)
+
+    if source == "yadisk":
+        if not YADISK_TOKEN:
+            raise ValueError("YADISK_TOKEN must be set when ENRICHMENT_SOURCE=yadisk")
+        if not ENRICHMENT_YADISK_PATH:
+            raise ValueError(
+                "ENRICHMENT_YADISK_PATH must be set when ENRICHMENT_SOURCE=yadisk"
+            )
+        logger.info(
+            f"Using Yandex Disk adaptive collection from path: {ENRICHMENT_YADISK_PATH}"
+        )
+        return YandexDiskAdaptiveCollection(
+            token=YADISK_TOKEN,
+            base_dir=ENRICHMENT_YADISK_PATH,
+        )
+
+    raise ValueError(
+        f"Unsupported ENRICHMENT_SOURCE='{source}'. Allowed values: local, yadisk"
+    )
 
 
 def run_enrichment_process(vector_store, data_dir: str = str(DATA_DIR)):
     """Run the full enrichment process."""
-    logger.info(f"Starting document enrichment from directory: {data_dir}")
-
-    # Get all supported documents from the data directory
-    file_paths = get_all_documents_from_data_dir(data_dir)
-
-    if not file_paths:
-        logger.warning(f"No supported documents found in {data_dir}")
-        return
-
-    logger.info(f"Found {len(file_paths)} documents to process")
+    logger.info("Starting document enrichment process")
+
+    adaptive_collection = get_enrichment_adaptive_collection(data_dir=data_dir)
 
     # Initialize the document enricher
     enricher = DocumentEnricher(vector_store)
 
     # Run the enrichment process
-    enricher.enrich_and_store(file_paths)
+    enricher.enrich_and_store(adaptive_collection)
 
     logger.info("Document enrichment process completed!")
 
 
 if __name__ == "__main__":
-    # Example usage
     from vector_storage import initialize_vector_store
 
-    # Initialize vector store
     vector_store = initialize_vector_store()
 
-    # Run enrichment process
     run_enrichment_process(vector_store)
280 services/rag/langchain/helpers.py Normal file
@@ -0,0 +1,280 @@
+"""Helper utilities for metadata extraction from Russian text."""
+
+import os
+import re
+import tempfile
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Callable, Iterator, List
+
+import requests
+from loguru import logger
+
+_YEAR_PATTERN = re.compile(r"(?<!\d)(1\d{3}|20\d{2}|2100)(?!\d)")
+
+_EVENT_KEYWORDS = (
+    "конференц",
+    "форум",
+    "выставк",
+    "фестивал",
+    "саммит",
+    "чемпионат",
+    "олимпиад",
+    "кубок",
+    "конкурс",
+    "вебинар",
+    "семинар",
+    "лекци",
+    "презентаци",
+    "хакатон",
+    "митап",
+    "встреч",
+    "съезд",
+    "конгресс",
+)
+
+_EVENT_PHRASE_PATTERN = re.compile(
+    r"\b("
+    r"конференц(?:ия|ии|ию|ией)?|"
+    r"форум(?:а|е|у|ом)?|"
+    r"выставк(?:а|и|е|у|ой)?|"
+    r"фестивал(?:ь|я|е|ю|ем)?|"
+    r"саммит(?:а|е|у|ом)?|"
+    r"чемпионат(?:а|е|у|ом)?|"
+    r"олимпиад(?:а|ы|е|у|ой)?|"
+    r"кубок(?:а|е|у|ом)?|"
+    r"конкурс(?:а|е|у|ом)?|"
+    r"вебинар(?:а|е|у|ом)?|"
+    r"семинар(?:а|е|у|ом)?|"
+    r"лекци(?:я|и|ю|ей)?|"
+    r"презентаци(?:я|и|ю|ей)?|"
+    r"хакатон(?:а|е|у|ом)?|"
+    r"митап(?:а|е|у|ом)?|"
+    r"встреч(?:а|и|е|у|ей)?|"
+    r"съезд(?:а|е|у|ом)?|"
+    r"конгресс(?:а|е|у|ом)?"
+    r")\b(?:\s+[A-Za-zА-Яа-я0-9][A-Za-zА-Яа-я0-9\-_/.]{1,40}){0,6}",
+    flags=re.IGNORECASE,
+)
+
+_QUOTED_EVENT_PATTERN = re.compile(
+    r"(?:мероприят(?:ие|ия|ию|ием)|событ(?:ие|ия|ию|ием)|"
+    r"конференц(?:ия|ии|ию|ией)?|форум(?:а|е|у|ом)?|"
+    r"выставк(?:а|и|е|у|ой)?|фестивал(?:ь|я|е|ю|ем)?)"
+    r"[^\n\"«»]{0,40}[«\"]([^»\"\n]{3,120})[»\"]",
+    flags=re.IGNORECASE,
+)
+
+
+def _normalize_event(value: str) -> str:
+    normalized = " ".join(value.strip().split()).strip(".,;:!?()[]{}")
+    return normalized.lower()
+
+
+def extract_years_from_text(text: str) -> List[int]:
+    """Extract unique years from text as integers."""
+    if not text:
+        return []
+
+    years = {int(match.group(0)) for match in _YEAR_PATTERN.finditer(text)}
+    return sorted(years)
+
+
+def extract_russian_event_names(text: str) -> List[str]:
+    """
+    Extract likely Russian event names from text using heuristic regex rules.
+
+    Returns normalized event phrases in lowercase.
+    """
+    if not text:
+        return []
+
+    events: List[str] = []
+    seen = set()
+
+    for match in _EVENT_PHRASE_PATTERN.finditer(text):
+        candidate = _normalize_event(match.group(0))
+        if len(candidate) < 6:
+            continue
+        if not any(keyword in candidate for keyword in _EVENT_KEYWORDS):
+            continue
+        if candidate not in seen:
+            events.append(candidate)
+            seen.add(candidate)
+
+    for match in _QUOTED_EVENT_PATTERN.finditer(text):
+        quoted = _normalize_event(match.group(1))
+        if len(quoted) < 3:
+            continue
+        if quoted not in seen:
+            events.append(quoted)
+            seen.add(quoted)
+
+    return events
+
+
+class _AdaptiveFile(ABC):
+    extension: str  # Format: .jpg
+    filename: str
+
+    def __init__(self, filename: str, extension: str):
+        self.filename = filename
+        self.extension = extension
+
+    # This method allows to work with file locally, and lambda should be provided for this.
+    # Why separate method? For possible cleanup after work is done. And to download file, if needed
+    # Lambda: first argument is a local path
+    @abstractmethod
+    def work_with_file_locally(self, func: Callable[[str], None]):
+        """Run callback with a local path to the file."""
+
+
+class _AdaptiveCollection(ABC):
+    # Generator method with yield
+    @abstractmethod
+    def iterate(self, recursive: bool) -> Iterator[_AdaptiveFile]:
+        """Iterate files in collection."""
+
+
+class LocalFilesystemAdaptiveFile(_AdaptiveFile):
+    local_path: str
+
+    def __init__(self, filename: str, extension: str, local_path: str):
+        super().__init__(filename, extension)
+        self.local_path = local_path
+
+    def work_with_file_locally(self, func: Callable[[str], None]):
+        func(self.local_path)
+
+
+class LocalFilesystemAdaptiveCollection(_AdaptiveCollection):
+    base_dir: str
+
+    def __init__(self, base_dir: str):
+        super().__init__()
+
+        self.base_dir = base_dir
+
+    def iterate(self, recursive: bool) -> Iterator[_AdaptiveFile]:
+        for root, dirs, files in os.walk(self.base_dir):
+            for file in files:
+                full_path = os.path.join(root, file)
+                p = Path(full_path)
+                yield LocalFilesystemAdaptiveFile(p.name, p.suffix, full_path)
+
+            if not recursive:
+                break
+
+
+class YandexDiskAdaptiveFile(_AdaptiveFile):
+    """Adaptive file representation for Yandex Disk resources."""
+
+    remote_path: str
+
+    def __init__(self, filename: str, extension: str, remote_path: str, token: str):
+        super().__init__(filename, extension)
+        self.token = token
+        self.remote_path = remote_path
+
+    def _download_to_temp_file(self) -> str:
+        headers = {"Authorization": f"OAuth {self.token}"}
+        response = requests.get(
+            "https://cloud-api.yandex.net/v1/disk/resources/download",
+            headers=headers,
+            params={"path": self.remote_path},
+            timeout=30,
+        )
+        response.raise_for_status()
+        href = response.json()["href"]
+
+        file_response = requests.get(href, timeout=120)
+        file_response.raise_for_status()
+
+        p = Path(self.remote_path)
+        suffix = p.suffix
+        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
+            temp_file.write(file_response.content)
+            return temp_file.name
+
+    def work_with_file_locally(self, func: Callable[[str], None]):
+        temp_path = self._download_to_temp_file()
+        try:
+            func(temp_path)
+        finally:
+            if os.path.exists(temp_path):
+                os.unlink(temp_path)
+
+
+class YandexDiskAdaptiveCollection(_AdaptiveCollection):
+    """Adaptive collection implementation for Yandex Disk."""
+
+    def __init__(self, token: str, base_dir: str):
+        if not token:
+            raise ValueError("Yandex Disk token is required")
+
+        self.token = token
+        self.base_dir = base_dir
+        self._headers = {"Authorization": f"OAuth {self.token}"}
+
+    @staticmethod
+    def _normalize_disk_path(path: str) -> str:
+        return path if path.startswith("disk:/") else f"disk:/{path.lstrip('/')}"
+
+    def _get_resource_info(self, path: str) -> dict:
+        response = requests.get(
+            "https://cloud-api.yandex.net/v1/disk/resources",
+            headers=self._headers,
+            params={"path": path, "limit": 1000},
+            timeout=30,
+        )
+        response.raise_for_status()
+        return response.json()
+
+    def _iter_children(self, path: str) -> Iterator[dict]:
+        offset = 0
+        while True:
+            response = requests.get(
+                "https://cloud-api.yandex.net/v1/disk/resources",
+                headers=self._headers,
+                params={"path": path, "limit": 1000, "offset": offset},
+                timeout=30,
+            )
+            response.raise_for_status()
+            payload = response.json()
+            embedded = payload.get("_embedded", {})
+            items = embedded.get("items", [])
+            if not items:
+                break
+
+            for item in items:
+                yield item
+
+            if len(items) < 1000:
+                break
+            offset += 1000
+
+    def iterate(self, recursive: bool) -> Iterator[_AdaptiveFile]:
+        root_path = self._normalize_disk_path(self.base_dir)
+        root_info = self._get_resource_info(root_path)
+
+        if root_info.get("type") == "file":
+            path = root_info["path"]
+            logger.info(f"Found file on Yandex Disk: {path}")
+            p = Path(path)
+            yield YandexDiskAdaptiveFile(p.name, p.suffix, path, self.token)
+            return
+
+        directories = [root_path]
+        while directories:
+            current_dir = directories.pop(0)
+            for item in self._iter_children(current_dir):
+                item_type = item.get("type")
+                item_path = str(item.get("path"))
+                if item_type == "file":
+                    logger.info(f"Found file on Yandex Disk: {item_path}")
+                    p = Path(item_path)
+                    yield YandexDiskAdaptiveFile(
+                        p.name, p.suffix, item_path, self.token
+                    )
+                elif recursive and item_type == "dir":
+                    directories.append(item_path)
@@ -1,13 +1,16 @@
 """Retrieval module for querying vector storage and returning relevant documents with metadata."""
 
 import os
-from typing import List, Optional
+from typing import List
 
 from dotenv import load_dotenv
-from langchain_core.retrievers import BaseRetriever
 from langchain_core.callbacks import CallbackManagerForRetrieverRun
 from langchain_core.documents import Document
+from langchain_core.retrievers import BaseRetriever
 from loguru import logger
+from qdrant_client.http.models import FieldCondition, Filter, MatchAny
+
+from helpers import extract_russian_event_names, extract_years_from_text
 from vector_storage import initialize_vector_store
 
 # Load environment variables
@@ -22,6 +25,91 @@ class VectorStoreRetriever(BaseRetriever):
     vector_store: object  # Qdrant vector store instance
     top_k: int = 5  # Number of documents to retrieve
 
+    def _build_qdrant_filter(
+        self, years: List[int], events: List[str]
+    ) -> Filter | None:
+        """Build a Qdrant payload filter for extracted years and events."""
+        conditions: List[FieldCondition] = []
+
+        if years:
+            conditions.extend(
+                [
+                    FieldCondition(
+                        key="metadata.years",
+                        match=MatchAny(any=years),
+                    ),
+                    FieldCondition(
+                        key="years",
+                        match=MatchAny(any=years),
+                    ),
+                ]
+            )
+
+        if events:
+            conditions.extend(
+                [
+                    FieldCondition(
+                        key="metadata.events",
+                        match=MatchAny(any=events),
+                    ),
+                    FieldCondition(
+                        key="events",
+                        match=MatchAny(any=events),
+                    ),
+                ]
+            )
+
+        if not conditions:
+            return None
+
+        return Filter(should=conditions)
+
+    @staticmethod
+    def _post_filter_documents(
+        documents: List[Document], years: List[int], events: List[str]
+    ) -> List[Document]:
+        """Fallback filter in Python in case vector DB filter cannot be applied."""
+        if not years and not events:
+            return documents
+
+        year_set = set(years)
+        event_set = set(events)
+        filtered: List[Document] = []
+
+        for doc in documents:
+            metadata = doc.metadata or {}
+            doc_years = {
+                int(year)
+                for year in metadata.get("years", [])
+                if isinstance(year, int) or (isinstance(year, str) and year.isdigit())
+            }
+            doc_events = {str(event).lower() for event in metadata.get("events", [])}
+
+            year_match = not year_set or bool(doc_years.intersection(year_set))
+            event_match = not event_set or bool(doc_events.intersection(event_set))
+
+            if year_match and event_match:
+                filtered.append(doc)
+
+        return filtered
+
+    @staticmethod
+    def _merge_unique_documents(documents: List[Document]) -> List[Document]:
+        """Deduplicate documents while preserving order."""
+        unique_docs: List[Document] = []
+        seen = set()
+        for doc in documents:
+            dedup_key = (
+                doc.metadata.get("source", ""),
+                doc.metadata.get("page_number", doc.metadata.get("page", "")),
+                doc.page_content[:200],
+            )
+            if dedup_key in seen:
+                continue
+            seen.add(dedup_key)
+            unique_docs.append(doc)
+        return unique_docs
+
     def _get_relevant_documents(
         self, query: str, *, run_manager: CallbackManagerForRetrieverRun
     ) -> List[Document]:
@@ -38,8 +126,54 @@ class VectorStoreRetriever(BaseRetriever):
|
|||||||
logger.info(f"Searching for documents related to query: {query[:50]}...")
|
logger.info(f"Searching for documents related to query: {query[:50]}...")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Perform similarity search on the vector store
|
years_in_query = extract_years_from_text(query)
|
||||||
results = self.vector_store.similarity_search(query, k=self.top_k)
|
events_in_query = extract_russian_event_names(query)
|
||||||
|
search_filter = self._build_qdrant_filter(years_in_query, events_in_query)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"Extracted query metadata for retrieval: years={years_in_query}, events={events_in_query}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Main search by original user query.
|
||||||
|
search_k = max(self.top_k * 3, self.top_k)
|
||||||
|
if search_filter is not None:
|
||||||
|
try:
|
||||||
|
results = self.vector_store.similarity_search(
|
||||||
|
query, k=search_k, filter=search_filter
|
||||||
|
)
|
||||||
|
except Exception as filter_error:
|
||||||
|
logger.warning(
|
||||||
|
f"Vector store filter failed, fallback to unfiltered search: {filter_error}"
|
||||||
|
)
|
||||||
|
results = self.vector_store.similarity_search(query, k=search_k)
|
||||||
|
results = self._post_filter_documents(
|
||||||
|
results, years_in_query, events_in_query
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
results = self.vector_store.similarity_search(query, k=search_k)
|
||||||
|
|
||||||
|
# Additional event-focused similarity search if event names are present.
|
||||||
|
if events_in_query:
|
||||||
|
event_results: List[Document] = []
|
||||||
|
for event_name in events_in_query:
|
||||||
|
try:
|
||||||
|
if search_filter is not None:
|
||||||
|
event_docs = self.vector_store.similarity_search(
|
||||||
|
event_name, k=self.top_k, filter=search_filter
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
event_docs = self.vector_store.similarity_search(
|
||||||
|
event_name, k=self.top_k
|
||||||
|
)
|
||||||
|
except Exception as event_search_error:
|
||||||
|
logger.warning(
|
||||||
|
f"Event-focused search failed for '{event_name}': {event_search_error}"
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
event_results.extend(event_docs)
|
||||||
|
results.extend(event_results)
|
||||||
|
|
||||||
|
results = self._merge_unique_documents(results)[: self.top_k]
|
||||||
|
|
||||||
logger.info(f"Found {len(results)} relevant documents")
|
logger.info(f"Found {len(results)} relevant documents")
|
||||||
|
|
||||||
@@ -60,7 +194,9 @@ def create_retriever(collection_name: str = "documents_langchain", top_k: int =
|
|||||||
Returns:
|
Returns:
|
||||||
VectorStoreRetriever instance
|
VectorStoreRetriever instance
|
||||||
"""
|
"""
|
||||||
logger.info(f"Initializing vector store for retrieval from collection: {collection_name}")
|
logger.info(
|
||||||
|
f"Initializing vector store for retrieval from collection: {collection_name}"
|
||||||
|
)
|
||||||
|
|
||||||
# Initialize the vector store
|
# Initialize the vector store
|
||||||
vector_store = initialize_vector_store(collection_name=collection_name)
|
vector_store = initialize_vector_store(collection_name=collection_name)
|
||||||
@@ -71,35 +207,8 @@ def create_retriever(collection_name: str = "documents_langchain", top_k: int =
|
|||||||
return retriever
|
return retriever
|
||||||
|
|
||||||
|
|
||||||
def search_documents(query: str, collection_name: str = "documents_langchain", top_k: int = 5) -> List[Document]:
|
|
||||||
"""
|
|
||||||
Search for documents in the vector store based on the query.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
query: The query string to search for
|
|
||||||
collection_name: Name of the Qdrant collection to use
|
|
||||||
top_k: Number of documents to retrieve
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of documents with metadata
|
|
||||||
"""
|
|
||||||
logger.info(f"Starting document search for query: {query}")
|
|
||||||
|
|
||||||
# Create the retriever
|
|
||||||
retriever = create_retriever(collection_name=collection_name, top_k=top_k)
|
|
||||||
|
|
||||||
# Perform the search
|
|
||||||
results = retriever.invoke(query)
|
|
||||||
|
|
||||||
logger.info(f"Search completed, returned {len(results)} documents")
|
|
||||||
|
|
||||||
return results
|
|
||||||
|
|
||||||
|
|
||||||
def search_documents_with_metadata(
|
def search_documents_with_metadata(
|
||||||
query: str,
|
query: str, collection_name: str = "documents_langchain", top_k: int = 5
|
||||||
collection_name: str = "documents_langchain",
|
|
||||||
top_k: int = 5
|
|
||||||
) -> List[dict]:
|
) -> List[dict]:
|
||||||
"""
|
"""
|
||||||
Search for documents and return them with detailed metadata.
|
Search for documents and return them with detailed metadata.
|
||||||
@@ -129,30 +238,20 @@ def search_documents_with_metadata(
|
|||||||
"metadata": doc.metadata,
|
"metadata": doc.metadata,
|
||||||
"source": doc.metadata.get("source", "Unknown"),
|
"source": doc.metadata.get("source", "Unknown"),
|
||||||
"filename": doc.metadata.get("filename", "Unknown"),
|
"filename": doc.metadata.get("filename", "Unknown"),
|
||||||
"page_number": doc.metadata.get("page_number", doc.metadata.get("page", "N/A")),
|
"page_number": doc.metadata.get(
|
||||||
|
"page_number", doc.metadata.get("page", "N/A")
|
||||||
|
),
|
||||||
"file_extension": doc.metadata.get("file_extension", "N/A"),
|
"file_extension": doc.metadata.get("file_extension", "N/A"),
|
||||||
"file_size": doc.metadata.get("file_size", "N/A")
|
"file_size": doc.metadata.get("file_size", "N/A"),
|
||||||
}
|
}
|
||||||
formatted_results.append(formatted_result)
|
formatted_results.append(formatted_result)
|
||||||
|
|
||||||
logger.info(f"Metadata search completed, returned {len(formatted_results)} documents")
|
logger.info(
|
||||||
|
f"Metadata search completed, returned {len(formatted_results)} documents"
|
||||||
|
)
|
||||||
|
|
||||||
return formatted_results
|
return formatted_results
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error during document search with metadata: {str(e)}")
|
logger.error(f"Error during document search with metadata: {str(e)}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
# Example usage
|
|
||||||
query = "What is the main topic discussed in the documents?"
|
|
||||||
results = search_documents_with_metadata(query, top_k=5)
|
|
||||||
|
|
||||||
print(f"Found {len(results)} documents:")
|
|
||||||
for i, result in enumerate(results, 1):
|
|
||||||
print(f"\n{i}. Source: {result['source']}")
|
|
||||||
print(f" Filename: {result['filename']}")
|
|
||||||
print(f" Page: {result['page_number']}")
|
|
||||||
print(f" Content preview: {result['content'][:200]}...")
|
|
||||||
print(f" Metadata: {result['metadata']}")
|
|
||||||
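Note: the `extract_years_from_text` and `extract_russian_event_names` helpers imported above live in `helpers.py` and are not part of this diff. Purely as an assumed illustration (the regex and return shape below are guesses, not the project's actual implementation), the year helper is expected to behave roughly like this:

```python
import re
from typing import List


def extract_years_from_text(text: str) -> List[int]:
    # Assumed behaviour: collect four-digit years (1900-2099) found in free text.
    return [int(match) for match in re.findall(r"\b(?:19|20)\d{2}\b", text)]


print(extract_years_from_text("Отчёт за 2021 и 2022 годы"))  # -> [2021, 2022]
```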
1 services/rag/langchain/test/samples/level1/first.md Normal file
@@ -0,0 +1 @@
first level

1 services/rag/langchain/test/samples/root.txt Normal file
@@ -0,0 +1 @@
root file
@@ -0,0 +1,55 @@
import os
import unittest
from pathlib import Path

from helpers import LocalFilesystemAdaptiveCollection, LocalFilesystemAdaptiveFile


class TestLocalFilesystemAdaptiveCollection(unittest.TestCase):
    def setUp(self):
        self.samples_dir = Path(__file__).parent / "samples"

    def test_iterate_non_recursive_returns_only_root_files(self):
        collection = LocalFilesystemAdaptiveCollection(str(self.samples_dir))

        files = list(collection.iterate(recursive=False))
        file_names = sorted(file.filename for file in files)

        self.assertEqual(file_names, ["root.txt"])
        self.assertTrue(all(isinstance(file, LocalFilesystemAdaptiveFile) for file in files))

    def test_iterate_recursive_returns_nested_files(self):
        collection = LocalFilesystemAdaptiveCollection(str(self.samples_dir))

        files = list(collection.iterate(recursive=True))
        relative_paths = sorted(
            str(Path(file.local_path).relative_to(self.samples_dir)) for file in files
        )

        self.assertEqual(
            relative_paths,
            ["level1/first.md", "level1/level2/second.log", "root.txt"],
        )

    def test_work_with_file_locally_provides_existing_path(self):
        target_path = self.samples_dir / "root.txt"
        adaptive_file = LocalFilesystemAdaptiveFile(
            target_path.name, target_path.suffix, str(target_path)
        )

        observed = {}

        def callback(path: str):
            observed["path"] = path
            with open(path, "r", encoding="utf-8") as handle:
                observed["content"] = handle.read().strip()

        adaptive_file.work_with_file_locally(callback)

        self.assertEqual(adaptive_file.filename, "root.txt")
        self.assertEqual(observed["path"], str(target_path))
        self.assertEqual(observed["content"], "root file")


if __name__ == "__main__":
    unittest.main()
@@ -0,0 +1,41 @@
import os
import unittest
from pathlib import Path

import requests
from loguru import logger
from dotenv import load_dotenv
from helpers import YandexDiskAdaptiveCollection

load_dotenv(dotenv_path=Path(__file__).resolve().parent.parent / ".env.test")


class TestYandexDiskAdaptiveCollection(unittest.TestCase):
    def test_constructor_requires_token(self):
        with self.assertRaises(ValueError):
            YandexDiskAdaptiveCollection(token="", base_dir="Общая/Информация")

    def test_iterate_logs_found_files_for_shared_folder(self):
        token = os.getenv("YADISK_TOKEN")
        if not token:
            self.skipTest("YADISK_TOKEN is not configured")

        collection = YandexDiskAdaptiveCollection(
            token=token,
            base_dir="Общая/Информация",
        )

        try:
            files = list(collection.iterate(recursive=True))
        except requests.RequestException as exc:
            self.skipTest(f"Yandex Disk request failed and needs manual verification: {exc}")

        for item in files:
            self.assertTrue(item.filename)
            logger.info(f"Yandex file found during test iteration: {item.local_path}")

        self.assertIsInstance(files, list)


if __name__ == "__main__":
    unittest.main()
@@ -1,3 +1,14 @@
+# Model Strategy Configuration
+CHAT_STRATEGY=ollama
+EMBEDDING_STRATEGY=ollama
+
 # Ollama Configuration
 OLLAMA_EMBEDDING_MODEL=MODEL
 OLLAMA_CHAT_MODEL=MODEL
+
+# OpenAI Configuration (for reference - uncomment and configure when using OpenAI strategy)
+# OPENAI_CHAT_URL=https://api.openai.com/v1
+# OPENAI_CHAT_KEY=your_openai_api_key_here
+# OPENAI_EMBEDDING_MODEL=text-embedding-3-small
+# OPENAI_EMBEDDING_BASE_URL=https://api.openai.com/v1
+# OPENAI_EMBEDDING_API_KEY=your_openai_api_key_here
@@ -35,8 +35,20 @@ Chosen data folder: relatve ./../../../data - from the current folder

 - [x] Create file `retrieval.py` with the configuration for chosen RAG framework, that will retrieve data from the vector storage based on the query. Use retrieving library/plugin, that supports chosen vector storage within the chosen RAG framework. Retrieving configuration should search for the provided text in the query as argument in the function and return found information with the stored meta data, like paragraph, section, page etc. Important: if for chosen RAG framework, there is no need in separation of search, separation of retrieving from the chosen vector storage, this step may be skipped and marked done.

-# Phase 6 (chat feature, as agent, for usage in the cli)
+# Phase 6 (models strategy, loading env and update on using openai models)

-- [ ] Create file `agent.py`, which will incorporate into itself agent, powered by the chat model. It should use integration with ollama, model specified in .env in property: OLLAMA_CHAT_MODEL
-- [ ] Integrate this agent with the existing solution for retrieving, with retrieval.py
+- [x] Add `CHAT_STRATEGY`, `EMBEDDING_STRATEGY` fields to .env, possible values are "openai" or "ollama".
+- [x] Add `OPENAI_CHAT_URL`, `OPENAI_CHAT_KEY`, `OPENAI_EMBEDDING_MODEL`, `OPENAI_EMBEDDING_BASE_URL`, `OPENAI_EMBEDDING_API_KEY` values to .env.dist with dummy values and to .env with dummy values.
+- [x] Load the .env file in every place in the code that relies on its variables
+- [x] Create a reusable function that returns the model configuration. It checks CHAT_STRATEGY, loads the environment variables accordingly, and returns the config for usage.
+- [x] Use this function everywhere in the codebase where chat or embedding model configuration is needed
+
+# Phase 7 (explicit logging and progressbar)
+
+- [x] Add a log of how many files are currently being processed in enrichment. We need to see the total to process and how many have been processed, each time a new document is processed. If possible, also add a progress bar showing the percentage and those numbers on top of the logs.
+
+# Phase 8 (chat feature, as agent, for usage in the cli)
+
+- [ ] Create file `agent.py`, which will incorporate an agent powered by the chat model. It should use the OpenAI integration; the env variables are already configured (a rough sketch follows after this list)
+- [ ] Integrate this agent with the existing solution for retrieving, with retrieval.py, if it's possible in the chosen RAG framework
 - [ ] Integrate this agent with the cli, as command to start chatting with the agent. If there is a built-in solution for console communication with the agent, initiate this on cli command.
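Note: `agent.py` is not implemented in these commits. Below is only a minimal sketch of how it could wire the newly added `config.setup_global_models()` and `vector_storage.get_vector_store_and_index()` helpers into LlamaIndex's built-in chat engine; the function name, chat mode, and loop shown here are assumptions, not the final design:

```python
# Hypothetical sketch only - agent.py does not exist in this change set.
from config import setup_global_models
from vector_storage import get_vector_store_and_index


def chat_loop():
    setup_global_models()  # select Ollama or OpenAI models per CHAT_STRATEGY/EMBEDDING_STRATEGY
    _, index = get_vector_store_and_index()  # reuse the Qdrant-backed index
    chat_engine = index.as_chat_engine(chat_mode="context")  # retrieval-augmented chat

    while True:
        user_input = input("You: ").strip()
        if user_input.lower() in {"exit", "quit"}:
            break
        print(f"Agent: {chat_engine.chat(user_input)}")


if __name__ == "__main__":
    chat_loop()
```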
@@ -16,6 +16,7 @@ The system has been enhanced to properly handle Russian language documents with

 ### Architecture Components
 - CLI entry point (`cli.py`)
+- Configuration module (`config.py`) - manages model strategies and environment variables
 - Document enrichment module (`enrichment.py`)
 - Vector storage configuration (`vector_storage.py`)
 - Retrieval module (`retrieval.py`)
@@ -57,9 +58,15 @@ The system has been enhanced to properly handle Russian language documents with
 - Use appropriate log levels (DEBUG, INFO, WARNING, ERROR)

 ### Environment Variables
+- `CHAT_STRATEGY`: Strategy for chat models ("ollama" or "openai")
+- `EMBEDDING_STRATEGY`: Strategy for embedding models ("ollama" or "openai")
 - `OLLAMA_EMBEDDING_MODEL`: Name of the Ollama model to use for embeddings
 - `OLLAMA_CHAT_MODEL`: Name of the Ollama model to use for chat functionality
-- API keys for external services (OpenRouter option available but commented out)
+- `OPENAI_CHAT_URL`: URL for OpenAI-compatible chat API (when using OpenAI strategy)
+- `OPENAI_CHAT_KEY`: API key for OpenAI-compatible chat API (when using OpenAI strategy)
+- `OPENAI_EMBEDDING_MODEL`: Name of the OpenAI embedding model (when using OpenAI strategy)
+- `OPENAI_EMBEDDING_BASE_URL`: Base URL for OpenAI-compatible embedding API (when using OpenAI strategy)
+- `OPENAI_EMBEDDING_API_KEY`: API key for OpenAI-compatible embedding API (when using OpenAI strategy)

 ### Document Processing
 - Support multiple file formats based on EXTENSIONS.md
@@ -105,7 +112,19 @@ The system has been enhanced to properly handle Russian language documents with
 - [x] Query processing with metadata retrieval
 - [x] Russian language/Cyrillic text encoding support

-### Phase 6: Chat Agent
+### Phase 6: Model Strategy
+- [x] Add `CHAT_STRATEGY` and `EMBEDDING_STRATEGY` environment variables
+- [x] Add OpenAI configuration options to .env files
+- [x] Create reusable model configuration function
+- [x] Update all modules to use the new configuration system
+- [x] Ensure proper .env loading across all modules
+
+### Phase 7: Enhanced Logging and Progress Tracking
+- [x] Added progress bar using tqdm to show processing progress
+- [x] Added logging to show total files and processed count during document enrichment
+- [x] Enhanced user feedback during document processing with percentage and counts
+
+### Phase 8: Chat Agent
 - [ ] Agent module with Ollama integration
 - [ ] Integration with retrieval module
 - [ ] CLI command for chat functionality
@@ -115,9 +134,10 @@ The system has been enhanced to properly handle Russian language documents with
 llamaindex/
 ├── venv/               # Python virtual environment
 ├── cli.py              # CLI entry point
+├── config.py           # Configuration module for model strategies
 ├── vector_storage.py   # Vector storage configuration
-├── enrichment.py       # Document loading and processing (to be created)
-├── retrieval.py        # Search and retrieval functionality (to be created)
+├── enrichment.py       # Document loading and processing
+├── retrieval.py        # Search and retrieval functionality
 ├── agent.py            # Chat agent implementation (to be created)
 ├── EXTENSIONS.md       # Supported file extensions and loaders
 ├── .env.dist           # Environment variable template
@@ -141,3 +161,7 @@ The system expects documents to be placed in `./../../../data` relative to the p
 - Check that the data directory contains supported file types
 - Review logs in `logs/dev.log` for detailed error information
 - For Russian/Cyrillic text issues, ensure proper encoding handling is configured in both enrichment and retrieval modules
+
+## Important Notes
+- Do not test long-running or heavy system scripts during development as they can consume significant system resources and take hours to complete
+- The enrich command processes all files in the data directory and may require substantial memory and processing time
@@ -7,6 +7,10 @@ import click
 from loguru import logger
 import sys
 from pathlib import Path
+from dotenv import load_dotenv
+
+# Load environment variables from .env file
+load_dotenv()


 def setup_logging():
144 services/rag/llamaindex/config.py Normal file
@@ -0,0 +1,144 @@
"""
Configuration module for managing model strategies in the RAG solution.

This module provides functions to get appropriate model configurations
based on environment variables for both embeddings and chat models.
"""

import os

from dotenv import load_dotenv
from loguru import logger

# Load environment variables from .env file
load_dotenv()


def get_embedding_model():
    """
    Get the appropriate embedding model based on the EMBEDDING_STRATEGY environment variable.

    Returns:
        An embedding model instance based on the selected strategy
    """
    strategy = os.getenv("EMBEDDING_STRATEGY", "ollama").lower()

    if strategy == "ollama":
        from llama_index.embeddings.ollama import OllamaEmbedding

        ollama_embed_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "qwen3-embedding:4b")
        ollama_base_url = "http://localhost:11434"

        logger.info(f"Initializing Ollama embedding model: {ollama_embed_model}")

        embed_model = OllamaEmbedding(
            model_name=ollama_embed_model, base_url=ollama_base_url
        )

        return embed_model

    elif strategy == "openai":
        from llama_index.embeddings.openai_like import OpenAILikeEmbedding

        openai_base_url = os.getenv(
            "OPENAI_EMBEDDING_BASE_URL", "https://api.openai.com/v1"
        )
        openai_api_key = os.getenv("OPENAI_EMBEDDING_API_KEY", "dummy_key_for_template")
        openai_embed_model = os.getenv(
            "OPENAI_EMBEDDING_MODEL", "text-embedding-3-small"
        )

        # Set the API key in environment for OpenAI
        os.environ["OPENAI_API_KEY"] = openai_api_key

        logger.info(f"Initializing OpenAI embedding model: {openai_embed_model}")

        embed_model = OpenAILikeEmbedding(
            model_name=openai_embed_model,
            api_base=openai_base_url,
            api_key=openai_api_key,
        )

        return embed_model

    else:
        raise ValueError(
            f"Unsupported EMBEDDING_STRATEGY: {strategy}. Supported values are 'ollama' and 'openai'"
        )


def get_llm_model():
    """
    Get the appropriate LLM model based on the CHAT_STRATEGY environment variable.

    Returns:
        An LLM model instance based on the selected strategy
    """
    strategy = os.getenv("CHAT_STRATEGY", "ollama").lower()

    if strategy == "ollama":
        from llama_index.llms.ollama import Ollama

        ollama_chat_model = os.getenv("OLLAMA_CHAT_MODEL", "nemotron-mini:4b")
        ollama_base_url = "http://localhost:11434"

        logger.info(f"Initializing Ollama chat model: {ollama_chat_model}")

        llm = Ollama(
            model=ollama_chat_model,
            base_url=ollama_base_url,
            request_timeout=120.0,  # Increase timeout for longer responses
        )

        return llm

    elif strategy == "openai":
        from llama_index.llms.openai import OpenAI

        openai_chat_url = os.getenv("OPENAI_CHAT_URL", "https://api.openai.com/v1")
        openai_chat_key = os.getenv("OPENAI_CHAT_KEY", "dummy_key_for_template")
        openai_chat_model = os.getenv("OPENAI_CHAT_MODEL", "gpt-3.5-turbo")

        # Set the API key in environment for OpenAI
        os.environ["OPENAI_API_KEY"] = openai_chat_key

        logger.info(f"Initializing OpenAI chat model: {openai_chat_model}")

        llm = OpenAI(model=openai_chat_model, api_base=openai_chat_url)

        return llm

    else:
        raise ValueError(
            f"Unsupported CHAT_STRATEGY: {strategy}. Supported values are 'ollama' and 'openai'"
        )


def get_model_configurations():
    """
    Get both embedding and LLM model configurations based on environment variables.

    Returns:
        A tuple of (embedding_model, llm_model)
    """
    embed_model = get_embedding_model()
    llm_model = get_llm_model()

    return embed_model, llm_model


def setup_global_models():
    """
    Set up the global models in LlamaIndex Settings to prevent defaulting to OpenAI.
    """
    from llama_index.core import Settings

    embed_model, llm_model = get_model_configurations()

    # Set as the global embedding model
    Settings.embed_model = embed_model

    # Set as the global LLM
    Settings.llm = llm_model

    logger.info("Global models configured successfully based on environment variables")
@@ -13,6 +13,7 @@ from typing import List, Dict, Any
 from datetime import datetime
 import sqlite3
 from loguru import logger
+from tqdm import tqdm

 from llama_index.core import SimpleDirectoryReader, Document
 from llama_index.core.node_parser import SentenceSplitter, CodeSplitter
@@ -20,6 +21,9 @@ from llama_index.core.node_parser import SentenceSplitter, CodeSplitter

 from vector_storage import get_vector_store_and_index

+# Import the new configuration module
+from config import get_embedding_model
+

 class DocumentTracker:
     """Class to handle tracking of processed documents to avoid re-processing."""
@@ -259,13 +263,18 @@ def process_documents_from_data_folder(data_path: str = "../../../data", recursi
     processed_count = 0
     skipped_count = 0

+    # Initialize progress bar
+    pbar = tqdm(total=len(all_files), desc="Processing documents", unit="file")
+
     for file_path in all_files:
-        logger.info(f"Processing file: {file_path}")
+        logger.info(f"Processing file: {file_path} ({processed_count + skipped_count + 1}/{len(all_files)})")

         # Check if document has already been processed
         if tracker.is_document_processed(file_path):
             logger.info(f"Skipping already processed file: {file_path}")
             skipped_count += 1
+            pbar.set_postfix({"Processed": processed_count, "Skipped": skipped_count})
+            pbar.update(1)
             continue

         try:
@@ -344,11 +353,15 @@ def process_documents_from_data_folder(data_path: str = "../../../data", recursi
             # Mark document as processed only after successful insertion
             tracker.mark_document_processed(file_path, {"nodes_count": len(documents)})
             processed_count += 1
+            pbar.set_postfix({"Processed": processed_count, "Skipped": skipped_count})

         except Exception as e:
             logger.error(f"Error processing file {file_path}: {str(e)}")
-            continue
+
+        # Update progress bar regardless of success or failure
+        pbar.update(1)

+    pbar.close()
     logger.info(f"Document enrichment completed. Processed: {processed_count}, Skipped: {skipped_count}")
71 services/rag/llamaindex/helpers/embedding.py Normal file
@@ -0,0 +1,71 @@
from typing import List

import requests
from llama_index.core.embeddings import BaseEmbedding
from pydantic import Field


class OpenAICompatibleEmbedding(BaseEmbedding):
    model: str = Field(...)
    api_key: str = Field(...)
    api_base: str = Field(...)
    timeout: int = Field(default=60)

    def __init__(
        self,
        model: str,
        api_key: str,
        api_base: str,
        timeout: int = 60,
    ):
        self.model = model
        self.api_key = api_key
        self.api_base = api_base.rstrip("/")
        self.timeout = timeout

    # ---------- low-level call ----------

    def _embed(self, texts: List[str]) -> List[List[float]]:
        url = f"{self.api_base}/embeddings"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": self.model,
            "input": texts,
        }

        resp = requests.post(
            url,
            headers=headers,
            json=payload,
            timeout=self.timeout,
        )
        resp.raise_for_status()
        data = resp.json()

        return [item["embedding"] for item in data["data"]]

    # ---------- document embeddings ----------

    def _get_text_embedding(self, text: str) -> List[float]:
        return self._embed([text])[0]

    def _get_text_embeddings(self, texts: List[str]) -> List[List[float]]:
        return self._embed(texts)

    async def _aget_text_embedding(self, text: str) -> List[float]:
        return self._get_text_embedding(text)

    async def _aget_text_embeddings(self, texts: List[str]) -> List[List[float]]:
        return self._get_text_embeddings(texts)

    # ---------- query embeddings (REQUIRED) ----------

    def _get_query_embedding(self, query: str) -> List[float]:
        # bge-m3 uses same embedding for query & doc
        return self._embed([query])[0]

    async def _aget_query_embedding(self, query: str) -> List[float]:
        return self._get_query_embedding(query)
@@ -16,33 +16,8 @@ from pathlib import Path

 from vector_storage import get_vector_store_and_index

-from llama_index.embeddings.ollama import OllamaEmbedding
-import os
-
-
-def setup_global_models():
-    """Set up the global models to prevent defaulting to OpenAI."""
-    # Set up the embedding model
-    ollama_embed_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "qwen3-embedding:4b")
-    ollama_base_url = "http://localhost:11434"
-
-    embed_model = OllamaEmbedding(
-        model_name=ollama_embed_model,
-        base_url=ollama_base_url
-    )
-
-    # Set as the global embedding model
-    Settings.embed_model = embed_model
-
-    # Set up the LLM model
-    ollama_chat_model = os.getenv("OLLAMA_CHAT_MODEL", "nemotron-mini:4b")
-
-    from llama_index.llms.ollama import Ollama
-    llm = Ollama(model=ollama_chat_model, base_url=ollama_base_url)
-
-    # Set as the global LLM
-    Settings.llm = llm
-
+# Import the new configuration module
+from config import setup_global_models


 def initialize_retriever(
@@ -3,18 +3,20 @@ Vector storage configuration for the RAG solution using LlamaIndex and Qdrant.

 This module provides initialization and configuration for:
 - Qdrant vector storage connection
-- Ollama embedding model
+- Embedding model based on configured strategy
 - Automatic collection creation
 """

 import os
 from typing import Optional

 from llama_index.core import VectorStoreIndex
 from llama_index.vector_stores.qdrant import QdrantVectorStore
-from llama_index.embeddings.ollama import OllamaEmbedding
-from llama_index.llms.ollama import Ollama
-from qdrant_client import QdrantClient
 from loguru import logger
+from qdrant_client import QdrantClient
+
+# Import the new configuration module
+from config import get_embedding_model


 def initialize_vector_storage(
@@ -22,41 +24,29 @@ def initialize_vector_storage(
     host: str = "localhost",
     port: int = 6333,
     grpc_port: int = 6334,
-    ollama_base_url: str = "http://localhost:11434",
-    ollama_embed_model: Optional[str] = None
 ) -> tuple[QdrantVectorStore, VectorStoreIndex]:
     """
-    Initialize Qdrant vector storage with Ollama embeddings.
+    Initialize Qdrant vector storage with embedding model based on configured strategy.

     Args:
         collection_name: Name of the Qdrant collection
         host: Qdrant host address
         port: Qdrant REST API port
         grpc_port: Qdrant gRPC API port
-        ollama_base_url: Base URL for Ollama API
-        ollama_embed_model: Name of the Ollama embedding model

     Returns:
         Tuple of (QdrantVectorStore, VectorStoreIndex)
     """
     logger.info(f"Initializing vector storage with collection: {collection_name}")

-    # Get embedding model from environment if not provided
-    if ollama_embed_model is None:
-        ollama_embed_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "qwen3-embedding:4b")
-
-    logger.info(f"Using Ollama embedding model: {ollama_embed_model}")
-
     try:
         # Initialize Qdrant client
         client = QdrantClient(host=host, port=port)

-        # Initialize the embedding model first to get the correct dimensions
-        embed_model = OllamaEmbedding(
-            model_name=ollama_embed_model,
-            base_url=ollama_base_url
-        )
-        # Get a test embedding to determine the correct size
+        # Get the embedding model based on the configured strategy
+        embed_model = get_embedding_model()
+
+        # Get a test embedding to determine the correct dimensions
         test_embedding = embed_model.get_text_embedding("test")
         embedding_dimension = len(test_embedding)
         logger.info(f"Detected embedding dimension: {embedding_dimension}")
@@ -71,55 +61,66 @@
                 collection_name=collection_name,
                 vectors_config={
                     "size": embedding_dimension,  # Use the actual embedding size
-                    "distance": "Cosine"  # Cosine distance is commonly used
-                }
+                    "distance": "Cosine",  # Cosine distance is commonly used
+                },
+            )
+            logger.info(
+                f"Collection '{collection_name}' created successfully with dimension {embedding_dimension}"
             )
-            logger.info(f"Collection '{collection_name}' created successfully with dimension {embedding_dimension}")
         else:
             logger.info(f"Collection '{collection_name}' already exists")
             # Get the actual collection config to determine the vector size
             collection_info = client.get_collection(collection_name)
             # Access the vector configuration properly - handle different possible structures
-            if hasattr(collection_info.config.params, 'vectors') and collection_info.config.params.vectors is not None:
+            if (
+                hasattr(collection_info.config.params, "vectors")
+                and collection_info.config.params.vectors is not None
+            ):
                 existing_dimension = collection_info.config.params.vectors.size
                 if existing_dimension != embedding_dimension:
-                    logger.warning(f"Existing collection dimension ({existing_dimension}) doesn't match embedding dimension ({embedding_dimension}), recreating...")
+                    logger.warning(
+                        f"Existing collection dimension ({existing_dimension}) doesn't match embedding dimension ({embedding_dimension}), recreating..."
+                    )
                     # Delete and recreate the collection with the correct dimensions
                     client.delete_collection(collection_name)
                     client.create_collection(
                         collection_name=collection_name,
                         vectors_config={
                             "size": embedding_dimension,  # Use the detected size
-                            "distance": "Cosine"
-                        }
+                            "distance": "Cosine",
+                        },
+                    )
+                    logger.info(
+                        f"Collection '{collection_name}' recreated with dimension {embedding_dimension}"
                     )
-                    logger.info(f"Collection '{collection_name}' recreated with dimension {embedding_dimension}")
                 else:
-                    logger.info(f"Using existing collection with matching dimension: {embedding_dimension}")
+                    logger.info(
+                        f"Using existing collection with matching dimension: {embedding_dimension}"
+                    )
             else:
                 # Last resort: recreate the collection with the correct dimensions
-                logger.warning(f"Could not determine vector dimension for existing collection, recreating...")
+                logger.warning(
+                    f"Could not determine vector dimension for existing collection, recreating..."
+                )
                 # Delete and recreate the collection with the correct dimensions
                 client.delete_collection(collection_name)
                 client.create_collection(
                     collection_name=collection_name,
                     vectors_config={
                         "size": embedding_dimension,  # Use the detected size
-                        "distance": "Cosine"
-                    }
+                        "distance": "Cosine",
+                    },
+                )
+                logger.info(
+                    f"Collection '{collection_name}' recreated with dimension {embedding_dimension}"
                 )
-                logger.info(f"Collection '{collection_name}' recreated with dimension {embedding_dimension}")

         # Initialize the Qdrant vector store
-        vector_store = QdrantVectorStore(
-            client=client,
-            collection_name=collection_name
-        )
+        vector_store = QdrantVectorStore(client=client, collection_name=collection_name)

         # Create index from vector store with the embedding model we already created
         index = VectorStoreIndex.from_vector_store(
-            vector_store=vector_store,
-            embed_model=embed_model
+            vector_store=vector_store, embed_model=embed_model
         )

         logger.info("Vector storage initialized successfully")
@@ -130,21 +131,6 @@ def initialize_vector_storage(
         raise


-# Optional: Alternative embedding configuration using OpenAI via OpenRouter
-# Uncomment and configure as needed for future use
-# from llama_index.embeddings.openai import OpenAIEmbedding
-#
-# def initialize_openai_embeddings():
-#     # Use OpenRouter API key from environment
-#     os.environ["OPENAI_API_KEY"] = os.getenv("OPENROUTER_API_KEY", "")
-#
-#     embed_model = OpenAIEmbedding(
-#         model="openai/text-embedding-3-small",  # Or another suitable model
-#         api_base="https://openrouter.ai/api/v1"  # OpenRouter endpoint
-#     )
-#     return embed_model
-
-
 def get_vector_store_and_index() -> tuple[QdrantVectorStore, VectorStoreIndex]:
     """
     Convenience function to get the initialized vector store and index.
@@ -152,9 +138,7 @@ def get_vector_store_and_index() -> tuple[QdrantVectorStore, VectorStoreIndex]:
     Returns:
         Tuple of (QdrantVectorStore, VectorStoreIndex)
     """
-    # Get the embedding model name from environment variables
-    embed_model_name = os.getenv("OLLAMA_EMBEDDING_MODEL", "qwen3-embedding:4b")
-    return initialize_vector_storage(ollama_embed_model=embed_model_name)
+    return initialize_vector_storage()


 if __name__ == "__main__":
145 unzip_archives.sh Executable file
@@ -0,0 +1,145 @@
#!/bin/bash

# Script to recursively unzip archives in the data folder
# Valid archives are extracted in place, then moved to data-unpacked-archives
# Invalid/broken archives are moved to data-broken-archives

set -e  # Exit on any error

DATA_DIR="./data"
UNPACKED_DIR="./data-unpacked-archives"
BROKEN_DIR="./data-broken-archives"

# Create destination directories if they don't exist
mkdir -p "$UNPACKED_DIR"
mkdir -p "$BROKEN_DIR"

# Find all zip files recursively in the data directory
find "$DATA_DIR" -type f -name "*.zip" | while read -r archive; do
    echo "Processing: $archive"

    # Check if the zip file is valid and not password protected
    if unzip -t "$archive" >/dev/null 2>&1; then
        echo "  Archive is valid, extracting..."

        # Extract the archive in the same directory where it's located
        ARCHIVE_DIR=$(dirname "$archive")
        unzip -o "$archive" -d "$ARCHIVE_DIR"

        # Move the processed archive to the unpacked directory
        mv "$archive" "$UNPACKED_DIR/"
        echo "  Successfully extracted and moved to $UNPACKED_DIR"
    else
        echo "  Archive is invalid, password-protected, or in unsupported format"

        # Move the broken archive to the broken directory
        mv "$archive" "$BROKEN_DIR/"
        echo "  Moved to $BROKEN_DIR"
    fi
done

# Also handle other common archive formats that might be present
for ext in rar 7z tar.gz tar.xz tar.bz2 gz xz bz2 tar; do
    find "$DATA_DIR" -type f -name "*.$ext" | while read -r archive; do
        echo "Processing: $archive (non-zip format)"

        case $ext in
            rar)
                if command -v unrar >/dev/null 2>&1; then
                    if unrar l "$archive" >/dev/null 2>&1; then
                        ARCHIVE_DIR=$(dirname "$archive")
                        unrar x "$archive" "$ARCHIVE_DIR"/
                        mv "$archive" "$UNPACKED_DIR/"
                        echo "  Successfully extracted RAR and moved to $UNPACKED_DIR"
                    else
                        mv "$archive" "$BROKEN_DIR/"
                        echo "  Could not process RAR, moved to $BROKEN_DIR"
                    fi
                else
                    mv "$archive" "$BROKEN_DIR/"
                    echo "  unrar not available, moved to $BROKEN_DIR"
                fi
                ;;
            7z)
                if command -v 7z >/dev/null 2>&1; then
                    if 7z l "$archive" >/dev/null 2>&1; then
                        ARCHIVE_DIR=$(dirname "$archive")
                        7z x "$archive" -o"$ARCHIVE_DIR"/
                        mv "$archive" "$UNPACKED_DIR/"
                        echo "  Successfully extracted 7z and moved to $UNPACKED_DIR"
                    else
                        mv "$archive" "$BROKEN_DIR/"
                        echo "  Could not process 7z, moved to $BROKEN_DIR"
                    fi
                else
                    mv "$archive" "$BROKEN_DIR/"
                    echo "  7z not available, moved to $BROKEN_DIR"
                fi
                ;;
            tar.gz|tgz|gz)
                if gunzip -t "$archive" >/dev/null 2>&1 || tar -tzf "$archive" >/dev/null 2>&1; then
                    ARCHIVE_DIR=$(dirname "$archive")
                    if [[ "$ext" == "gz" ]]; then
                        # For gz files, we need to decompress in place
                        cp "$archive" "$ARCHIVE_DIR/"
                        gzip -d "$ARCHIVE_DIR/$(basename "$archive")"
                    else
                        tar -xzf "$archive" -C "$ARCHIVE_DIR"/
                    fi
                    mv "$archive" "$UNPACKED_DIR/"
                    echo "  Successfully extracted $ext and moved to $UNPACKED_DIR"
                else
                    mv "$archive" "$BROKEN_DIR/"
                    echo "  Could not process $ext, moved to $BROKEN_DIR"
                fi
                ;;
            tar.bz2|bz2)
                if bzip2 -t "$archive" >/dev/null 2>&1 || tar -tjf "$archive" >/dev/null 2>&1; then
                    ARCHIVE_DIR=$(dirname "$archive")
                    if [[ "$ext" == "bz2" ]]; then
                        # For bz2 files, we need to decompress in place
                        cp "$archive" "$ARCHIVE_DIR/"
                        bzip2 -d "$ARCHIVE_DIR/$(basename "$archive")"
                    else
                        tar -xjf "$archive" -C "$ARCHIVE_DIR"/
                    fi
                    mv "$archive" "$UNPACKED_DIR/"
                    echo "  Successfully extracted $ext and moved to $UNPACKED_DIR"
                else
                    mv "$archive" "$BROKEN_DIR/"
                    echo "  Could not process $ext, moved to $BROKEN_DIR"
                fi
                ;;
            tar.xz|xz)
                if xz -t "$archive" >/dev/null 2>&1 || tar -tJf "$archive" >/dev/null 2>&1; then
                    ARCHIVE_DIR=$(dirname "$archive")
                    if [[ "$ext" == "xz" ]]; then
                        # For xz files, we need to decompress in place
                        cp "$archive" "$ARCHIVE_DIR/"
                        xz -d "$ARCHIVE_DIR/$(basename "$archive")"
                    else
                        tar -xJf "$archive" -C "$ARCHIVE_DIR"/
                    fi
                    mv "$archive" "$UNPACKED_DIR/"
                    echo "  Successfully extracted $ext and moved to $UNPACKED_DIR"
                else
                    mv "$archive" "$BROKEN_DIR/"
                    echo "  Could not process $ext, moved to $BROKEN_DIR"
                fi
                ;;
            tar)
                if tar -tf "$archive" >/dev/null 2>&1; then
                    ARCHIVE_DIR=$(dirname "$archive")
                    tar -xf "$archive" -C "$ARCHIVE_DIR"/
                    mv "$archive" "$UNPACKED_DIR/"
                    echo "  Successfully extracted TAR and moved to $UNPACKED_DIR"
                else
                    mv "$archive" "$BROKEN_DIR/"
                    echo "  Could not process TAR, moved to $BROKEN_DIR"
                fi
                ;;
        esac
    done
done

echo "Processing complete!"