Compare commits

...

5 Commits

19 changed files with 781 additions and 196 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
data-unpacked-archives
data-broken-archives

BIN
services/rag/.DS_Store vendored

Binary file not shown.

View File

@@ -0,0 +1,4 @@
# Things to remember but not to implement right away. Wanna-have, nice-to-have, look-into
- [x] Year. Extract the year from the chunk, store it in metadata, then add a filter when retrieving if the query also contains a year.
- [ ] Rankers. Should we use rerankers in our pipeline? The idea: fetch many more results from the vector storage than needed, rerank them with a reranker model, then feed only the top slice further into our pipeline (see the sketch below).
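A minimal sketch of what that reranking step could look like if we pick this up later. It is not part of this changeset; the `sentence-transformers` dependency and the cross-encoder checkpoint are assumptions for illustration only.

```python
# Hypothetical reranking step: over-fetch from the vector store, rerank, keep top_k.
# The sentence-transformers package and the checkpoint name are assumptions only.
from typing import List, Sequence

from sentence_transformers import CrossEncoder


def rerank(query: str, passages: Sequence[str], top_k: int = 5) -> List[str]:
    """Score each (query, passage) pair with a cross-encoder and keep the best top_k."""
    reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")  # placeholder checkpoint
    scores = reranker.predict([(query, passage) for passage in passages])
    ranked = sorted(zip(passages, scores), key=lambda pair: pair[1], reverse=True)
    return [passage for passage, _ in ranked[:top_k]]


# Intended pipeline shape: over-fetch candidates (e.g. k=25) from the vector store,
# rerank them here, then feed only the top_k passages further into the pipeline.
```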

View File

@@ -56,3 +56,12 @@ Chosen data folder: relative ./../../../data - from the current folder
- [x] After accepting the API endpoint address, it should be used to send requests and process responses to imitate a chat with the agent via the provided API endpoint.
- [x] Show API endpoint in the header of the chat.
- [x] If there is an error connecting to the API, have the bot send a message about the connection error and suggest reloading the page to provide a new API endpoint
# Phase 10 (extracting additional metadata from chunks, and filtering where possible with it)
- [x] Create a separate function in the helpers module (create it if it does not exist) for retrieving years from text. It should return the years found (see the usage sketch after this list).
- [x] During vector-storage enrichment, when loading and splitting documents, extract years from each chunk and add them as numbers to the metadata field "years" (an array of numbers, or whichever Qdrant type is best suited to searching by year). The helper function for retrieving years from text can be used.
- [x] Update VectorStoreRetriever._get_relevant_documents: when the user mentions a year in the query (in Russian), we must ensure we search vectors whose metadata contains that year in the "years" array. The helper function for retrieving years from text can be used to filter documents by year.
- [x] Create a heuristic, regex-based function in the helpers module for extracting event names in Russian. It should rely on regex plus likely words before and after the event name, etc.
- [x] During vector-storage enrichment, try to extract the event name from each chunk and save it in the metadata field "events", which will contain a list of strings with possible events. Using the helper function is advised.
- [x] In VectorStoreRetriever._get_relevant_documents, add a similarity search for the event name if an event name is present in the query. The helper function should be used here to try to extract the event name.
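A quick usage sketch for the two helpers this phase introduces (`extract_years_from_text` and `extract_russian_event_names`, added in `helpers.py` further down this diff); the sample sentence and the shown outputs are illustrative of what the regexes as written should produce.

```python
# Illustrative only: expected behaviour of the new helpers on a sample Russian sentence.
from helpers import extract_russian_event_names, extract_years_from_text

text = "В 2024 году прошла конференция «Нейроинформатика» в Москве."

print(extract_years_from_text(text))
# -> [2024]  (unique years, returned sorted)

print(extract_russian_event_names(text))
# -> normalized, lowercased candidates, e.g. ['конференция', 'нейроинформатика']
```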

View File

@@ -21,7 +21,9 @@ from vector_storage import initialize_vector_store
load_dotenv()
def get_llm_model_info(llm_model: str = None) -> Tuple[str, str, str, str, str]:
def get_llm_model_info(
llm_model: Optional[str] = None,
) -> Tuple[str, str, str, str, str]:
"""
Get LLM model information based on environment configuration.
@@ -121,7 +123,7 @@ class DocumentRetrievalTool(BaseTool):
def create_chat_agent(
collection_name: str = "documents_langchain", llm_model: str = None
collection_name: str = "documents_langchain", llm_model: Optional[str] = None
) -> Any:
"""
Create a chat agent with document retrieval capabilities.
@@ -177,7 +179,7 @@ def create_chat_agent(
def chat_with_agent(
query: str,
collection_name: str = "documents_langchain",
llm_model: str = None,
llm_model: Optional[str] = None,
history: List[BaseMessage] = None,
) -> Dict[str, Any]:
"""

View File

@@ -1,8 +1,8 @@
import os
from pathlib import Path
from dotenv import load_dotenv
import click
from dotenv import load_dotenv
from loguru import logger
# Load environment variables
@@ -85,36 +85,9 @@ def retrieve(query, collection_name, top_k):
"""Retrieve documents from vector database based on a query"""
logger.info(f"Starting retrieval process for query: {query}")
try:
# Import here to avoid circular dependencies
from retrieval import search_documents_with_metadata
# Perform retrieval
results = search_documents_with_metadata(
query=query,
collection_name=collection_name,
top_k=top_k
)
if not results:
click.echo("No relevant documents found for the query.")
return
click.echo(f"Found {len(results)} relevant documents:\n")
for i, result in enumerate(results, 1):
click.echo(f"{i}. Source: {result['source']}")
click.echo(f" Filename: {result['filename']}")
click.echo(f" Page: {result['page_number']}")
click.echo(f" File Extension: {result['file_extension']}")
click.echo(f" Content Preview: {result['content'][:200]}...")
click.echo(f" Metadata: {result['metadata']}\n")
logger.info("Retrieval process completed successfully!")
except Exception as e:
logger.error(f"Error during retrieval process: {str(e)}")
click.echo(f"Error: {str(e)}")
click.echo(
"WARNING: Retrieval disabled, since it is no longer relevant for the testing of the retrieving feature. Use chat with agent instead. xoxo"
)
@cli.command(
@@ -143,10 +116,7 @@ def chat(collection_name, model):
click.echo("Type 'quit' or 'exit' to end the conversation.\n")
# Run the interactive chat loop
run_chat_loop(
collection_name=collection_name,
llm_model=model
)
run_chat_loop(collection_name=collection_name, llm_model=model)
logger.info("Chat session ended")

View File

@@ -39,6 +39,8 @@ from sqlalchemy.orm import sessionmaker
from loguru import logger
import sqlite3
from helpers import extract_russian_event_names, extract_years_from_text
# Load environment variables
load_dotenv()
@@ -189,6 +191,13 @@ class DocumentEnricher:
# Split documents if they are too large
split_docs = self.text_splitter.split_documents(docs)
# Extract additional metadata from each chunk.
for chunk in split_docs:
years = extract_years_from_text(chunk.page_content)
events = extract_russian_event_names(chunk.page_content)
chunk.metadata["years"] = years
chunk.metadata["events"] = events
# Add to the collection
all_docs.extend(split_docs)

View File

@@ -0,0 +1,107 @@
"""Helper utilities for metadata extraction from Russian text."""
import re
from typing import List
_YEAR_PATTERN = re.compile(r"(?<!\d)(1\d{3}|20\d{2}|2100)(?!\d)")
_EVENT_KEYWORDS = (
"конференц",
"форум",
"выставк",
"фестивал",
"саммит",
"чемпионат",
"олимпиад",
"кубок",
"конкурс",
"вебинар",
"семинар",
"лекци",
"презентаци",
"хакатон",
"митап",
"встреч",
"съезд",
"конгресс",
)
_EVENT_PHRASE_PATTERN = re.compile(
r"\b("
r"конференц(?:ия|ии|ию|ией)?|"
r"форум(?:а|е|у|ом)?|"
r"выставк(?:а|и|е|у|ой)?|"
r"фестивал(?:ь|я|е|ю|ем)?|"
r"саммит(?:а|е|у|ом)?|"
r"чемпионат(?:а|е|у|ом)?|"
r"олимпиад(?:а|ы|е|у|ой)?|"
r"кубок(?:а|е|у|ом)?|"
r"конкурс(?:а|е|у|ом)?|"
r"вебинар(?:а|е|у|ом)?|"
r"семинар(?:а|е|у|ом)?|"
r"лекци(?:я|и|ю|ей)?|"
r"презентаци(?:я|и|ю|ей)?|"
r"хакатон(?:а|е|у|ом)?|"
r"митап(?:а|е|у|ом)?|"
r"встреч(?:а|и|е|у|ей)?|"
r"съезд(?:а|е|у|ом)?|"
r"конгресс(?:а|е|у|ом)?"
r")\b(?:\s+[A-Za-zА-Яа-я0-9][A-Za-zА-Яа-я0-9\-_/.]{1,40}){0,6}",
flags=re.IGNORECASE,
)
_QUOTED_EVENT_PATTERN = re.compile(
r"(?:мероприят(?:ие|ия|ию|ием)|событ(?:ие|ия|ию|ием)|"
r"конференц(?:ия|ии|ию|ией)?|форум(?:а|е|у|ом)?|"
r"выставк(?:а|и|е|у|ой)?|фестивал(?:ь|я|е|ю|ем)?)"
r"[^\n\"«»]{0,40}[«\"]([^»\"\n]{3,120})[»\"]",
flags=re.IGNORECASE,
)
def _normalize_event(value: str) -> str:
normalized = " ".join(value.strip().split()).strip(".,;:!?()[]{}")
return normalized.lower()
def extract_years_from_text(text: str) -> List[int]:
"""Extract unique years from text as integers."""
if not text:
return []
years = {int(match.group(0)) for match in _YEAR_PATTERN.finditer(text)}
return sorted(years)
def extract_russian_event_names(text: str) -> List[str]:
"""
Extract likely Russian event names from text using heuristic regex rules.
Returns normalized event phrases in lowercase.
"""
if not text:
return []
events: List[str] = []
seen = set()
for match in _EVENT_PHRASE_PATTERN.finditer(text):
candidate = _normalize_event(match.group(0))
if len(candidate) < 6:
continue
if not any(keyword in candidate for keyword in _EVENT_KEYWORDS):
continue
if candidate not in seen:
events.append(candidate)
seen.add(candidate)
for match in _QUOTED_EVENT_PATTERN.finditer(text):
quoted = _normalize_event(match.group(1))
if len(quoted) < 3:
continue
if quoted not in seen:
events.append(quoted)
seen.add(quoted)
return events

View File

@@ -1,13 +1,16 @@
"""Retrieval module for querying vector storage and returning relevant documents with metadata."""
import os
from typing import List, Optional
from typing import List
from dotenv import load_dotenv
from langchain_core.retrievers import BaseRetriever
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
from loguru import logger
from qdrant_client.http.models import FieldCondition, Filter, MatchAny
from helpers import extract_russian_event_names, extract_years_from_text
from vector_storage import initialize_vector_store
# Load environment variables
@@ -22,6 +25,91 @@ class VectorStoreRetriever(BaseRetriever):
vector_store: object # Qdrant vector store instance
top_k: int = 5 # Number of documents to retrieve
def _build_qdrant_filter(
self, years: List[int], events: List[str]
) -> Filter | None:
"""Build a Qdrant payload filter for extracted years and events."""
conditions: List[FieldCondition] = []
if years:
conditions.extend(
[
FieldCondition(
key="metadata.years",
match=MatchAny(any=years),
),
FieldCondition(
key="years",
match=MatchAny(any=years),
),
]
)
if events:
conditions.extend(
[
FieldCondition(
key="metadata.events",
match=MatchAny(any=events),
),
FieldCondition(
key="events",
match=MatchAny(any=events),
),
]
)
if not conditions:
return None
return Filter(should=conditions)
@staticmethod
def _post_filter_documents(
documents: List[Document], years: List[int], events: List[str]
) -> List[Document]:
"""Fallback filter in Python in case vector DB filter cannot be applied."""
if not years and not events:
return documents
year_set = set(years)
event_set = set(events)
filtered: List[Document] = []
for doc in documents:
metadata = doc.metadata or {}
doc_years = {
int(year)
for year in metadata.get("years", [])
if isinstance(year, int) or (isinstance(year, str) and year.isdigit())
}
doc_events = {str(event).lower() for event in metadata.get("events", [])}
year_match = not year_set or bool(doc_years.intersection(year_set))
event_match = not event_set or bool(doc_events.intersection(event_set))
if year_match and event_match:
filtered.append(doc)
return filtered
@staticmethod
def _merge_unique_documents(documents: List[Document]) -> List[Document]:
"""Deduplicate documents while preserving order."""
unique_docs: List[Document] = []
seen = set()
for doc in documents:
dedup_key = (
doc.metadata.get("source", ""),
doc.metadata.get("page_number", doc.metadata.get("page", "")),
doc.page_content[:200],
)
if dedup_key in seen:
continue
seen.add(dedup_key)
unique_docs.append(doc)
return unique_docs
def _get_relevant_documents(
self, query: str, *, run_manager: CallbackManagerForRetrieverRun
) -> List[Document]:
@@ -38,8 +126,54 @@ class VectorStoreRetriever(BaseRetriever):
logger.info(f"Searching for documents related to query: {query[:50]}...")
try:
# Perform similarity search on the vector store
results = self.vector_store.similarity_search(query, k=self.top_k)
years_in_query = extract_years_from_text(query)
events_in_query = extract_russian_event_names(query)
search_filter = self._build_qdrant_filter(years_in_query, events_in_query)
logger.info(
f"Extracted query metadata for retrieval: years={years_in_query}, events={events_in_query}"
)
# Main search by original user query.
search_k = max(self.top_k * 3, self.top_k)
if search_filter is not None:
try:
results = self.vector_store.similarity_search(
query, k=search_k, filter=search_filter
)
except Exception as filter_error:
logger.warning(
f"Vector store filter failed, fallback to unfiltered search: {filter_error}"
)
results = self.vector_store.similarity_search(query, k=search_k)
results = self._post_filter_documents(
results, years_in_query, events_in_query
)
else:
results = self.vector_store.similarity_search(query, k=search_k)
# Additional event-focused similarity search if event names are present.
if events_in_query:
event_results: List[Document] = []
for event_name in events_in_query:
try:
if search_filter is not None:
event_docs = self.vector_store.similarity_search(
event_name, k=self.top_k, filter=search_filter
)
else:
event_docs = self.vector_store.similarity_search(
event_name, k=self.top_k
)
except Exception as event_search_error:
logger.warning(
f"Event-focused search failed for '{event_name}': {event_search_error}"
)
continue
event_results.extend(event_docs)
results.extend(event_results)
results = self._merge_unique_documents(results)[: self.top_k]
logger.info(f"Found {len(results)} relevant documents")
@@ -60,7 +194,9 @@ def create_retriever(collection_name: str = "documents_langchain", top_k: int =
Returns:
VectorStoreRetriever instance
"""
logger.info(f"Initializing vector store for retrieval from collection: {collection_name}")
logger.info(
f"Initializing vector store for retrieval from collection: {collection_name}"
)
# Initialize the vector store
vector_store = initialize_vector_store(collection_name=collection_name)
@@ -71,35 +207,8 @@ def create_retriever(collection_name: str = "documents_langchain", top_k: int =
return retriever
def search_documents(query: str, collection_name: str = "documents_langchain", top_k: int = 5) -> List[Document]:
"""
Search for documents in the vector store based on the query.
Args:
query: The query string to search for
collection_name: Name of the Qdrant collection to use
top_k: Number of documents to retrieve
Returns:
List of documents with metadata
"""
logger.info(f"Starting document search for query: {query}")
# Create the retriever
retriever = create_retriever(collection_name=collection_name, top_k=top_k)
# Perform the search
results = retriever.invoke(query)
logger.info(f"Search completed, returned {len(results)} documents")
return results
def search_documents_with_metadata(
query: str,
collection_name: str = "documents_langchain",
top_k: int = 5
query: str, collection_name: str = "documents_langchain", top_k: int = 5
) -> List[dict]:
"""
Search for documents and return them with detailed metadata.
@@ -129,30 +238,20 @@ def search_documents_with_metadata(
"metadata": doc.metadata,
"source": doc.metadata.get("source", "Unknown"),
"filename": doc.metadata.get("filename", "Unknown"),
"page_number": doc.metadata.get("page_number", doc.metadata.get("page", "N/A")),
"page_number": doc.metadata.get(
"page_number", doc.metadata.get("page", "N/A")
),
"file_extension": doc.metadata.get("file_extension", "N/A"),
"file_size": doc.metadata.get("file_size", "N/A")
"file_size": doc.metadata.get("file_size", "N/A"),
}
formatted_results.append(formatted_result)
logger.info(f"Metadata search completed, returned {len(formatted_results)} documents")
logger.info(
f"Metadata search completed, returned {len(formatted_results)} documents"
)
return formatted_results
except Exception as e:
logger.error(f"Error during document search with metadata: {str(e)}")
return []
if __name__ == "__main__":
# Example usage
query = "What is the main topic discussed in the documents?"
results = search_documents_with_metadata(query, top_k=5)
print(f"Found {len(results)} documents:")
for i, result in enumerate(results, 1):
print(f"\n{i}. Source: {result['source']}")
print(f" Filename: {result['filename']}")
print(f" Page: {result['page_number']}")
print(f" Content preview: {result['content'][:200]}...")
print(f" Metadata: {result['metadata']}")

View File

@@ -1,3 +1,14 @@
# Model Strategy Configuration
CHAT_STRATEGY=ollama
EMBEDDING_STRATEGY=ollama
# Ollama Configuration
OLLAMA_EMBEDDING_MODEL=MODEL
OLLAMA_CHAT_MODEL=MODEL
# OpenAI Configuration (for reference - uncomment and configure when using OpenAI strategy)
# OPENAI_CHAT_URL=https://api.openai.com/v1
# OPENAI_CHAT_KEY=your_openai_api_key_here
# OPENAI_EMBEDDING_MODEL=text-embedding-3-small
# OPENAI_EMBEDDING_BASE_URL=https://api.openai.com/v1
# OPENAI_EMBEDDING_API_KEY=your_openai_api_key_here

View File

@@ -35,8 +35,20 @@ Chosen data folder: relative ./../../../data - from the current folder
- [x] Create file `retrieval.py` with the configuration for chosen RAG framework, that will retrieve data from the vector storage based on the query. Use retrieving library/plugin, that supports chosen vector storage within the chosen RAG framework. Retrieving configuration should search for the provided text in the query as argument in the function and return found information with the stored meta data, like paragraph, section, page etc. Important: if for chosen RAG framework, there is no need in separation of search, separation of retrieving from the chosen vector storage, this step may be skipped and marked done.
# Phase 6 (chat feature, as agent, for usage in the cli)
# Phase 6 (models strategy, loading env and update on using openai models)
- [ ] Create file `agent.py`, which will contain an agent powered by the chat model. It should use the Ollama integration with the model specified in the .env property OLLAMA_CHAT_MODEL
- [ ] Integrate this agent with the existing retrieval solution in retrieval.py
- [x] Add `CHAT_STRATEGY`, `EMBEDDING_STRATEGY` fields to .env, possible values are "openai" or "ollama".
- [x] Add `OPENAI_CHAT_URL`, `OPENAI_CHAT_KEY`, `OPENAI_EMBEDDING_MODEL`, `OPENAI_EMBEDDING_BASE_URL`, `OPENAI_EMBEDDING_API_KEY` values to .env.dist with dummy values and to .env with dummy values.
- [x] Add loading of the .env file in all code places where its variables are needed
- [x] Create a reusable function that returns the model configuration. It checks CHAT_STRATEGY, loads environment variables accordingly, and returns a config ready for use (see the usage sketch after this list).
- [x] Use this function everywhere in the codebase where chat or embedding model configuration is needed
# Phase 7 (explicit logging and progressbar)
- [x] Add a log of how many files are being processed during enrichment: the total to process and how many have been processed as each new document is handled. If possible, also add a progress bar showing the percentage and those counts on top of the logs.
# Phase 8 (chat feature, as agent, for usage in the cli)
- [ ] Create file `agent.py`, which will contain an agent powered by the chat model. It should use the OpenAI integration; the env variables are already configured
- [ ] Integrate this agent with the existing retrieval solution in retrieval.py, if the chosen RAG framework makes it possible
- [ ] Integrate this agent with the CLI as a command to start chatting with the agent. If there is a built-in solution for console communication with the agent, initiate it on the CLI command.
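A minimal usage sketch for the reusable configuration function mentioned in Phase 6 above; `get_model_configurations` and `setup_global_models` are the helpers added in `config.py` later in this diff, and the snippet only illustrates the intended call pattern.

```python
# Sketch of how a module is expected to consume the strategy-based configuration.
# Uses only functions added in config.py within this changeset.
from config import get_model_configurations, setup_global_models

# Option 1: register the strategy-selected models globally so LlamaIndex
# never falls back to its OpenAI defaults.
setup_global_models()

# Option 2: fetch the models explicitly when a component needs them directly.
embed_model, llm = get_model_configurations()
vector = embed_model.get_text_embedding("пример текста")
```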

View File

@@ -16,6 +16,7 @@ The system has been enhanced to properly handle Russian language documents with
### Architecture Components
- CLI entry point (`cli.py`)
- Configuration module (`config.py`) - manages model strategies and environment variables
- Document enrichment module (`enrichment.py`)
- Vector storage configuration (`vector_storage.py`)
- Retrieval module (`retrieval.py`)
@@ -57,9 +58,15 @@ The system has been enhanced to properly handle Russian language documents with
- Use appropriate log levels (DEBUG, INFO, WARNING, ERROR)
### Environment Variables
- `CHAT_STRATEGY`: Strategy for chat models ("ollama" or "openai")
- `EMBEDDING_STRATEGY`: Strategy for embedding models ("ollama" or "openai")
- `OLLAMA_EMBEDDING_MODEL`: Name of the Ollama model to use for embeddings
- `OLLAMA_CHAT_MODEL`: Name of the Ollama model to use for chat functionality
- API keys for external services (OpenRouter option available but commented out)
- `OPENAI_CHAT_URL`: URL for OpenAI-compatible chat API (when using OpenAI strategy)
- `OPENAI_CHAT_KEY`: API key for OpenAI-compatible chat API (when using OpenAI strategy)
- `OPENAI_EMBEDDING_MODEL`: Name of the OpenAI embedding model (when using OpenAI strategy)
- `OPENAI_EMBEDDING_BASE_URL`: Base URL for OpenAI-compatible embedding API (when using OpenAI strategy)
- `OPENAI_EMBEDDING_API_KEY`: API key for OpenAI-compatible embedding API (when using OpenAI strategy)
### Document Processing
- Support multiple file formats based on EXTENSIONS.md
@@ -105,7 +112,19 @@ The system has been enhanced to properly handle Russian language documents with
- [x] Query processing with metadata retrieval
- [x] Russian language/Cyrillic text encoding support
### Phase 6: Chat Agent
### Phase 6: Model Strategy
- [x] Add `CHAT_STRATEGY` and `EMBEDDING_STRATEGY` environment variables
- [x] Add OpenAI configuration options to .env files
- [x] Create reusable model configuration function
- [x] Update all modules to use the new configuration system
- [x] Ensure proper .env loading across all modules
### Phase 7: Enhanced Logging and Progress Tracking
- [x] Added progress bar using tqdm to show processing progress
- [x] Added logging to show total files and processed count during document enrichment
- [x] Enhanced user feedback during document processing with percentage and counts
### Phase 8: Chat Agent
- [ ] Agent module with Ollama integration
- [ ] Integration with retrieval module
- [ ] CLI command for chat functionality
@@ -115,9 +134,10 @@ The system has been enhanced to properly handle Russian language documents with
llamaindex/
├── venv/ # Python virtual environment
├── cli.py # CLI entry point
├── config.py # Configuration module for model strategies
├── vector_storage.py # Vector storage configuration
├── enrichment.py # Document loading and processing (to be created)
├── retrieval.py # Search and retrieval functionality (to be created)
├── enrichment.py # Document loading and processing
├── retrieval.py # Search and retrieval functionality
├── agent.py # Chat agent implementation (to be created)
├── EXTENSIONS.md # Supported file extensions and loaders
├── .env.dist # Environment variable template
@@ -141,3 +161,7 @@ The system expects documents to be placed in `./../../../data` relative to the p
- Check that the data directory contains supported file types
- Review logs in `logs/dev.log` for detailed error information
- For Russian/Cyrillic text issues, ensure proper encoding handling is configured in both enrichment and retrieval modules
## Important Notes
- Do not test long-running or heavy system scripts during development as they can consume significant system resources and take hours to complete
- The enrich command processes all files in the data directory and may require substantial memory and processing time

View File

@@ -7,6 +7,10 @@ import click
from loguru import logger
import sys
from pathlib import Path
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
def setup_logging():

View File

@@ -0,0 +1,144 @@
"""
Configuration module for managing model strategies in the RAG solution.
This module provides functions to get appropriate model configurations
based on environment variables for both embeddings and chat models.
"""
import os
from dotenv import load_dotenv
from loguru import logger
# Load environment variables from .env file
load_dotenv()
def get_embedding_model():
"""
Get the appropriate embedding model based on the EMBEDDING_STRATEGY environment variable.
Returns:
An embedding model instance based on the selected strategy
"""
strategy = os.getenv("EMBEDDING_STRATEGY", "ollama").lower()
if strategy == "ollama":
from llama_index.embeddings.ollama import OllamaEmbedding
ollama_embed_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "qwen3-embedding:4b")
ollama_base_url = "http://localhost:11434"
logger.info(f"Initializing Ollama embedding model: {ollama_embed_model}")
embed_model = OllamaEmbedding(
model_name=ollama_embed_model, base_url=ollama_base_url
)
return embed_model
elif strategy == "openai":
from llama_index.embeddings.openai_like import OpenAILikeEmbedding
openai_base_url = os.getenv(
"OPENAI_EMBEDDING_BASE_URL", "https://api.openai.com/v1"
)
openai_api_key = os.getenv("OPENAI_EMBEDDING_API_KEY", "dummy_key_for_template")
openai_embed_model = os.getenv(
"OPENAI_EMBEDDING_MODEL", "text-embedding-3-small"
)
# Set the API key in environment for OpenAI
os.environ["OPENAI_API_KEY"] = openai_api_key
logger.info(f"Initializing OpenAI embedding model: {openai_embed_model}")
embed_model = OpenAILikeEmbedding(
model_name=openai_embed_model,
api_base=openai_base_url,
api_key=openai_api_key,
)
return embed_model
else:
raise ValueError(
f"Unsupported EMBEDDING_STRATEGY: {strategy}. Supported values are 'ollama' and 'openai'"
)
def get_llm_model():
"""
Get the appropriate LLM model based on the CHAT_STRATEGY environment variable.
Returns:
An LLM model instance based on the selected strategy
"""
strategy = os.getenv("CHAT_STRATEGY", "ollama").lower()
if strategy == "ollama":
from llama_index.llms.ollama import Ollama
ollama_chat_model = os.getenv("OLLAMA_CHAT_MODEL", "nemotron-mini:4b")
ollama_base_url = "http://localhost:11434"
logger.info(f"Initializing Ollama chat model: {ollama_chat_model}")
llm = Ollama(
model=ollama_chat_model,
base_url=ollama_base_url,
request_timeout=120.0, # Increase timeout for longer responses
)
return llm
elif strategy == "openai":
from llama_index.llms.openai import OpenAI
openai_chat_url = os.getenv("OPENAI_CHAT_URL", "https://api.openai.com/v1")
openai_chat_key = os.getenv("OPENAI_CHAT_KEY", "dummy_key_for_template")
openai_chat_model = os.getenv("OPENAI_CHAT_MODEL", "gpt-3.5-turbo")
# Set the API key in environment for OpenAI
os.environ["OPENAI_API_KEY"] = openai_chat_key
logger.info(f"Initializing OpenAI chat model: {openai_chat_model}")
llm = OpenAI(model=openai_chat_model, api_base=openai_chat_url)
return llm
else:
raise ValueError(
f"Unsupported CHAT_STRATEGY: {strategy}. Supported values are 'ollama' and 'openai'"
)
def get_model_configurations():
"""
Get both embedding and LLM model configurations based on environment variables.
Returns:
A tuple of (embedding_model, llm_model)
"""
embed_model = get_embedding_model()
llm_model = get_llm_model()
return embed_model, llm_model
def setup_global_models():
"""
Set up the global models in LlamaIndex Settings to prevent defaulting to OpenAI.
"""
from llama_index.core import Settings
embed_model, llm_model = get_model_configurations()
# Set as the global embedding model
Settings.embed_model = embed_model
# Set as the global LLM
Settings.llm = llm_model
logger.info("Global models configured successfully based on environment variables")

View File

@@ -13,6 +13,7 @@ from typing import List, Dict, Any
from datetime import datetime
import sqlite3
from loguru import logger
from tqdm import tqdm
from llama_index.core import SimpleDirectoryReader, Document
from llama_index.core.node_parser import SentenceSplitter, CodeSplitter
@@ -20,6 +21,9 @@ from llama_index.core.node_parser import SentenceSplitter, CodeSplitter
from vector_storage import get_vector_store_and_index
# Import the new configuration module
from config import get_embedding_model
class DocumentTracker:
"""Class to handle tracking of processed documents to avoid re-processing."""
@@ -259,13 +263,18 @@ def process_documents_from_data_folder(data_path: str = "../../../data", recursi
processed_count = 0
skipped_count = 0
# Initialize progress bar
pbar = tqdm(total=len(all_files), desc="Processing documents", unit="file")
for file_path in all_files:
logger.info(f"Processing file: {file_path}")
logger.info(f"Processing file: {file_path} ({processed_count + skipped_count + 1}/{len(all_files)})")
# Check if document has already been processed
if tracker.is_document_processed(file_path):
logger.info(f"Skipping already processed file: {file_path}")
skipped_count += 1
pbar.set_postfix({"Processed": processed_count, "Skipped": skipped_count})
pbar.update(1)
continue
try:
@@ -344,11 +353,15 @@ def process_documents_from_data_folder(data_path: str = "../../../data", recursi
# Mark document as processed only after successful insertion
tracker.mark_document_processed(file_path, {"nodes_count": len(documents)})
processed_count += 1
pbar.set_postfix({"Processed": processed_count, "Skipped": skipped_count})
except Exception as e:
logger.error(f"Error processing file {file_path}: {str(e)}")
continue
# Update progress bar regardless of success or failure
pbar.update(1)
pbar.close()
logger.info(f"Document enrichment completed. Processed: {processed_count}, Skipped: {skipped_count}")

View File

@@ -0,0 +1,71 @@
from typing import List
import requests
from llama_index.core.embeddings import BaseEmbedding
from pydantic import Field
class OpenAICompatibleEmbedding(BaseEmbedding):
model: str = Field(...)
api_key: str = Field(...)
api_base: str = Field(...)
timeout: int = Field(default=60)
def __init__(
self,
model: str,
api_key: str,
api_base: str,
timeout: int = 60,
):
# BaseEmbedding is a pydantic model, so pass fields through super().__init__
# instead of assigning attributes directly.
super().__init__(
model=model,
api_key=api_key,
api_base=api_base.rstrip("/"),
timeout=timeout,
)
# ---------- low-level call ----------
def _embed(self, texts: List[str]) -> List[List[float]]:
url = f"{self.api_base}/embeddings"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload = {
"model": self.model,
"input": texts,
}
resp = requests.post(
url,
headers=headers,
json=payload,
timeout=self.timeout,
)
resp.raise_for_status()
data = resp.json()
return [item["embedding"] for item in data["data"]]
# ---------- document embeddings ----------
def _get_text_embedding(self, text: str) -> List[float]:
return self._embed([text])[0]
def _get_text_embeddings(self, texts: List[str]) -> List[List[float]]:
return self._embed(texts)
async def _aget_text_embedding(self, text: str) -> List[float]:
return self._get_text_embedding(text)
async def _aget_text_embeddings(self, texts: List[str]) -> List[List[float]]:
return self._get_text_embeddings(texts)
# ---------- query embeddings (REQUIRED) ----------
def _get_query_embedding(self, query: str) -> List[float]:
# bge-m3 uses same embedding for query & doc
return self._embed([query])[0]
async def _aget_query_embedding(self, query: str) -> List[float]:
return self._get_query_embedding(query)
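A short usage sketch for the `OpenAICompatibleEmbedding` wrapper above; the module path, endpoint URL, API key, and model name are placeholders rather than values taken from this repository.

```python
# Illustrative usage of the wrapper above; all literals below are placeholders.
from openai_compatible_embedding import OpenAICompatibleEmbedding  # module path assumed

embed_model = OpenAICompatibleEmbedding(
    model="bge-m3",
    api_key="dummy_key",
    api_base="http://localhost:8000/v1",
)
vector = embed_model.get_text_embedding("пример текста")
print(len(vector))  # embedding dimension reported by the endpoint
```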

View File

@@ -16,33 +16,8 @@ from pathlib import Path
from vector_storage import get_vector_store_and_index
from llama_index.embeddings.ollama import OllamaEmbedding
import os
def setup_global_models():
"""Set up the global models to prevent defaulting to OpenAI."""
# Set up the embedding model
ollama_embed_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "qwen3-embedding:4b")
ollama_base_url = "http://localhost:11434"
embed_model = OllamaEmbedding(
model_name=ollama_embed_model,
base_url=ollama_base_url
)
# Set as the global embedding model
Settings.embed_model = embed_model
# Set up the LLM model
ollama_chat_model = os.getenv("OLLAMA_CHAT_MODEL", "nemotron-mini:4b")
from llama_index.llms.ollama import Ollama
llm = Ollama(model=ollama_chat_model, base_url=ollama_base_url)
# Set as the global LLM
Settings.llm = llm
# Import the new configuration module
from config import setup_global_models
def initialize_retriever(

View File

@@ -3,18 +3,20 @@ Vector storage configuration for the RAG solution using LlamaIndex and Qdrant.
This module provides initialization and configuration for:
- Qdrant vector storage connection
- Ollama embedding model
- Embedding model based on configured strategy
- Automatic collection creation
"""
import os
from typing import Optional
from llama_index.core import VectorStoreIndex
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.llms.ollama import Ollama
from qdrant_client import QdrantClient
from loguru import logger
from qdrant_client import QdrantClient
# Import the new configuration module
from config import get_embedding_model
def initialize_vector_storage(
@@ -22,41 +24,29 @@ def initialize_vector_storage(
host: str = "localhost",
port: int = 6333,
grpc_port: int = 6334,
ollama_base_url: str = "http://localhost:11434",
ollama_embed_model: Optional[str] = None
) -> tuple[QdrantVectorStore, VectorStoreIndex]:
"""
Initialize Qdrant vector storage with Ollama embeddings.
Initialize Qdrant vector storage with embedding model based on configured strategy.
Args:
collection_name: Name of the Qdrant collection
host: Qdrant host address
port: Qdrant REST API port
grpc_port: Qdrant gRPC API port
ollama_base_url: Base URL for Ollama API
ollama_embed_model: Name of the Ollama embedding model
Returns:
Tuple of (QdrantVectorStore, VectorStoreIndex)
"""
logger.info(f"Initializing vector storage with collection: {collection_name}")
# Get embedding model from environment if not provided
if ollama_embed_model is None:
ollama_embed_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "qwen3-embedding:4b")
logger.info(f"Using Ollama embedding model: {ollama_embed_model}")
try:
# Initialize Qdrant client
client = QdrantClient(host=host, port=port)
# Initialize the embedding model first to get the correct dimensions
embed_model = OllamaEmbedding(
model_name=ollama_embed_model,
base_url=ollama_base_url
)
# Get a test embedding to determine the correct size
# Get the embedding model based on the configured strategy
embed_model = get_embedding_model()
# Get a test embedding to determine the correct dimensions
test_embedding = embed_model.get_text_embedding("test")
embedding_dimension = len(test_embedding)
logger.info(f"Detected embedding dimension: {embedding_dimension}")
@@ -71,55 +61,66 @@ def initialize_vector_storage(
collection_name=collection_name,
vectors_config={
"size": embedding_dimension, # Use the actual embedding size
"distance": "Cosine" # Cosine distance is commonly used
}
"distance": "Cosine", # Cosine distance is commonly used
},
)
logger.info(
f"Collection '{collection_name}' created successfully with dimension {embedding_dimension}"
)
logger.info(f"Collection '{collection_name}' created successfully with dimension {embedding_dimension}")
else:
logger.info(f"Collection '{collection_name}' already exists")
# Get the actual collection config to determine the vector size
collection_info = client.get_collection(collection_name)
# Access the vector configuration properly - handle different possible structures
if hasattr(collection_info.config.params, 'vectors') and collection_info.config.params.vectors is not None:
if (
hasattr(collection_info.config.params, "vectors")
and collection_info.config.params.vectors is not None
):
existing_dimension = collection_info.config.params.vectors.size
if existing_dimension != embedding_dimension:
logger.warning(f"Existing collection dimension ({existing_dimension}) doesn't match embedding dimension ({embedding_dimension}), recreating...")
logger.warning(
f"Existing collection dimension ({existing_dimension}) doesn't match embedding dimension ({embedding_dimension}), recreating..."
)
# Delete and recreate the collection with the correct dimensions
client.delete_collection(collection_name)
client.create_collection(
collection_name=collection_name,
vectors_config={
"size": embedding_dimension, # Use the detected size
"distance": "Cosine"
}
"distance": "Cosine",
},
)
logger.info(
f"Collection '{collection_name}' recreated with dimension {embedding_dimension}"
)
logger.info(f"Collection '{collection_name}' recreated with dimension {embedding_dimension}")
else:
logger.info(f"Using existing collection with matching dimension: {embedding_dimension}")
logger.info(
f"Using existing collection with matching dimension: {embedding_dimension}"
)
else:
# Last resort: recreate the collection with the correct dimensions
logger.warning(f"Could not determine vector dimension for existing collection, recreating...")
logger.warning(
f"Could not determine vector dimension for existing collection, recreating..."
)
# Delete and recreate the collection with the correct dimensions
client.delete_collection(collection_name)
client.create_collection(
collection_name=collection_name,
vectors_config={
"size": embedding_dimension, # Use the detected size
"distance": "Cosine"
}
"distance": "Cosine",
},
)
logger.info(
f"Collection '{collection_name}' recreated with dimension {embedding_dimension}"
)
logger.info(f"Collection '{collection_name}' recreated with dimension {embedding_dimension}")
# Initialize the Qdrant vector store
vector_store = QdrantVectorStore(
client=client,
collection_name=collection_name
)
vector_store = QdrantVectorStore(client=client, collection_name=collection_name)
# Create index from vector store with the embedding model we already created
index = VectorStoreIndex.from_vector_store(
vector_store=vector_store,
embed_model=embed_model
vector_store=vector_store, embed_model=embed_model
)
logger.info("Vector storage initialized successfully")
@@ -130,21 +131,6 @@ def initialize_vector_storage(
raise
# Optional: Alternative embedding configuration using OpenAI via OpenRouter
# Uncomment and configure as needed for future use
# from llama_index.embeddings.openai import OpenAIEmbedding
#
# def initialize_openai_embeddings():
# # Use OpenRouter API key from environment
# os.environ["OPENAI_API_KEY"] = os.getenv("OPENROUTER_API_KEY", "")
#
# embed_model = OpenAIEmbedding(
# model="openai/text-embedding-3-small", # Or another suitable model
# api_base="https://openrouter.ai/api/v1" # OpenRouter endpoint
# )
# return embed_model
def get_vector_store_and_index() -> tuple[QdrantVectorStore, VectorStoreIndex]:
"""
Convenience function to get the initialized vector store and index.
@@ -152,9 +138,7 @@ def get_vector_store_and_index() -> tuple[QdrantVectorStore, VectorStoreIndex]:
Returns:
Tuple of (QdrantVectorStore, VectorStoreIndex)
"""
# Get the embedding model name from environment variables
embed_model_name = os.getenv("OLLAMA_EMBEDDING_MODEL", "qwen3-embedding:4b")
return initialize_vector_storage(ollama_embed_model=embed_model_name)
return initialize_vector_storage()
if __name__ == "__main__":

145
unzip_archives.sh Executable file
View File

@@ -0,0 +1,145 @@
#!/bin/bash
# Script to recursively unzip archives in the data folder
# Valid archives are extracted in place, then moved to data-unpacked-archives
# Invalid/broken archives are moved to data-broken-archives
set -e # Exit on any error
DATA_DIR="./data"
UNPACKED_DIR="./data-unpacked-archives"
BROKEN_DIR="./data-broken-archives"
# Create destination directories if they don't exist
mkdir -p "$UNPACKED_DIR"
mkdir -p "$BROKEN_DIR"
# Find all zip files recursively in the data directory
find "$DATA_DIR" -type f -name "*.zip" | while read -r archive; do
echo "Processing: $archive"
# Check if the zip file is valid and not password protected
if unzip -t "$archive" >/dev/null 2>&1; then
echo " Archive is valid, extracting..."
# Extract the archive in the same directory where it's located
ARCHIVE_DIR=$(dirname "$archive")
unzip -o "$archive" -d "$ARCHIVE_DIR"
# Move the processed archive to the unpacked directory
mv "$archive" "$UNPACKED_DIR/"
echo " Successfully extracted and moved to $UNPACKED_DIR"
else
echo " Archive is invalid, password-protected, or in unsupported format"
# Move the broken archive to the broken directory
mv "$archive" "$BROKEN_DIR/"
echo " Moved to $BROKEN_DIR"
fi
done
# Also handle other common archive formats that might be present
for ext in rar 7z tar.gz tar.xz tar.bz2 gz xz bz2 tar; do
find "$DATA_DIR" -type f -name "*.$ext" | while read -r archive; do
echo "Processing: $archive (non-zip format)"
case $ext in
rar)
if command -v unrar >/dev/null 2>&1; then
if unrar l "$archive" >/dev/null 2>&1; then
ARCHIVE_DIR=$(dirname "$archive")
unrar x "$archive" "$ARCHIVE_DIR"/
mv "$archive" "$UNPACKED_DIR/"
echo " Successfully extracted RAR and moved to $UNPACKED_DIR"
else
mv "$archive" "$BROKEN_DIR/"
echo " Could not process RAR, moved to $BROKEN_DIR"
fi
else
mv "$archive" "$BROKEN_DIR/"
echo " unrar not available, moved to $BROKEN_DIR"
fi
;;
7z)
if command -v 7z >/dev/null 2>&1; then
if 7z l "$archive" >/dev/null 2>&1; then
ARCHIVE_DIR=$(dirname "$archive")
7z x "$archive" -o"$ARCHIVE_DIR"/
mv "$archive" "$UNPACKED_DIR/"
echo " Successfully extracted 7z and moved to $UNPACKED_DIR"
else
mv "$archive" "$BROKEN_DIR/"
echo " Could not process 7z, moved to $BROKEN_DIR"
fi
else
mv "$archive" "$BROKEN_DIR/"
echo " 7z not available, moved to $BROKEN_DIR"
fi
;;
tar.gz|tgz|gz)
if gunzip -t "$archive" >/dev/null 2>&1 || tar -tzf "$archive" >/dev/null 2>&1; then
ARCHIVE_DIR=$(dirname "$archive")
if [[ "$ext" == "gz" ]]; then
# For standalone gz files, decompress next to the archive and keep
# the original (-k) so it can still be moved to the unpacked dir afterwards
gzip -dk "$archive"
else
tar -xzf "$archive" -C "$ARCHIVE_DIR"/
fi
mv "$archive" "$UNPACKED_DIR/"
echo " Successfully extracted $ext and moved to $UNPACKED_DIR"
else
mv "$archive" "$BROKEN_DIR/"
echo " Could not process $ext, moved to $BROKEN_DIR"
fi
;;
tar.bz2|bz2)
if bzip2 -t "$archive" >/dev/null 2>&1 || tar -tjf "$archive" >/dev/null 2>&1; then
ARCHIVE_DIR=$(dirname "$archive")
if [[ "$ext" == "bz2" ]]; then
# For standalone bz2 files, decompress next to the archive and keep
# the original (-k) so it can still be moved to the unpacked dir afterwards
bzip2 -dk "$archive"
else
tar -xjf "$archive" -C "$ARCHIVE_DIR"/
fi
mv "$archive" "$UNPACKED_DIR/"
echo " Successfully extracted $ext and moved to $UNPACKED_DIR"
else
mv "$archive" "$BROKEN_DIR/"
echo " Could not process $ext, moved to $BROKEN_DIR"
fi
;;
tar.xz|xz)
if xz -t "$archive" >/dev/null 2>&1 || tar -tJf "$archive" >/dev/null 2>&1; then
ARCHIVE_DIR=$(dirname "$archive")
if [[ "$ext" == "xz" ]]; then
# For standalone xz files, decompress next to the archive and keep
# the original (-k) so it can still be moved to the unpacked dir afterwards
xz -dk "$archive"
else
tar -xJf "$archive" -C "$ARCHIVE_DIR"/
fi
mv "$archive" "$UNPACKED_DIR/"
echo " Successfully extracted $ext and moved to $UNPACKED_DIR"
else
mv "$archive" "$BROKEN_DIR/"
echo " Could not process $ext, moved to $BROKEN_DIR"
fi
;;
tar)
if tar -tf "$archive" >/dev/null 2>&1; then
ARCHIVE_DIR=$(dirname "$archive")
tar -xf "$archive" -C "$ARCHIVE_DIR"/
mv "$archive" "$UNPACKED_DIR/"
echo " Successfully extracted TAR and moved to $UNPACKED_DIR"
else
mv "$archive" "$BROKEN_DIR/"
echo " Could not process TAR, moved to $BROKEN_DIR"
fi
;;
esac
done
done
echo "Processing complete!"