Compare commits

...

6 Commits

24 changed files with 99695 additions and 485 deletions

BIN
.DS_Store vendored

Binary file not shown.

48
ext_stats.py Normal file
View File

@@ -0,0 +1,48 @@
#!/usr/bin/env python3
import argparse
import json
from collections import Counter
from pathlib import Path
def normalize_ext(path_str: str) -> str:
    """Return the lowercased file extension of *path_str*, or "(no_ext)" when absent."""
    suffix = Path(path_str).suffix.lower()
    return suffix or "(no_ext)"
def load_paths(json_path: Path) -> list[str]:
    """Load and validate a JSON array of path strings from *json_path*.

    Returns:
        The list of path strings exactly as stored in the file.

    Raises:
        ValueError: if the document is not a list, or contains non-string items.
    """
    with json_path.open("r", encoding="utf-8") as handle:
        payload = json.load(handle)
    if not isinstance(payload, list):
        raise ValueError("Expected JSON to be a list of file paths")
    for entry in payload:
        if not isinstance(entry, str):
            raise ValueError("Expected JSON list to contain only strings")
    return payload
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: print tab-separated extension counts, most common first.

    Args:
        argv: Optional explicit argument list (for testing); when None,
            argparse falls back to ``sys.argv[1:]`` as before, so the
            change is backward compatible.

    Returns:
        Process exit code (0 on success).

    Raises:
        SystemExit: when the JSON file does not exist.
    """
    parser = argparse.ArgumentParser(
        description="Count file extensions from a JSON list of paths."
    )
    parser.add_argument(
        "json_path",
        nargs="?",
        default="yadisk_files.json",
        help="Path to JSON file (default: yadisk_files.json)",
    )
    args = parser.parse_args(argv)

    json_path = Path(args.json_path)
    if not json_path.exists():
        raise SystemExit(f"JSON file not found: {json_path}")

    paths = load_paths(json_path)
    counts = Counter(normalize_ext(p) for p in paths)
    # most_common() yields (extension, count) pairs sorted by descending count.
    for ext, count in counts.most_common():
        print(f"{ext}\t{count}")
    return 0
# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())

BIN
services/.DS_Store vendored Normal file

Binary file not shown.

BIN
services/opensource/.DS_Store vendored Normal file

Binary file not shown.

Submodule services/opensource/ragflow added at ce71d87867

Binary file not shown.

View File

@@ -3,6 +3,7 @@ OLLAMA_CHAT_MODEL=MODEL
OPENAI_CHAT_URL=URL
OPENAI_CHAT_KEY=KEY
CHAT_MODEL_STRATEGY=ollama
PREFECT_API_URL=URL
QDRANT_HOST=HOST
QDRANT_REST_PORT=PORT
QDRANT_GRPC_PORT=PORT
@@ -14,3 +15,4 @@ ENRICHMENT_PROCESSING_MODE=async/sync
ENRICHMENT_ADAPTIVE_FILES_QUEUE_LIMIT=5
ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS=4
ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS=4
PREFECT_YADISK_ENRICH_CONCURRENCY=8

View File

@@ -6,7 +6,7 @@ Use if possible logging, using library `loguru`, for steps. Use logrotation in f
Chosen RAG framework: Langchain
Chosen Vector Storage: Qdrant
Chosen data folder: relatve ./../../../data - from the current folder
Chosen data folder: relative ./../../../data - from the current folder
# Phase 1 (cli entrypoint)
@@ -85,7 +85,6 @@ During enrichment, we should use adaptive collection from the helpers, for loadi
- [x] We still will need filetypes that we will need to skip, so while iterating over files we need to check their extension and skip them.
- [x] Adaptive files has filename in them, so it should be used when extracting metadata
# Phase 13 (async processing of files)
During this Phase we create asynchronous process of enrichment, utilizing async/await
@@ -101,3 +100,28 @@ During this Phase we create asynchronous process of enrichment, utilizing async/
- [x] Function process_adaptive_files_queue should be started in number of threads (defined in .env ENRICHMENT_ADAPTIVE_FILE_PROCESS_THREADS)
- [x] Function upload_processed_documents_from_queue should be started in number of threads (defined in .env ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS)
- [x] Program should control threads. Function insert_adaptive_files_queue, after adaptive collection ends, should wait until all threads finish. What does finish mean? It means when our insert_adaptive_files_queue function realizes that there are no adaptive files left in the collection, it marks a shared variable between threads that the collection is finished. When our other functions in threads see that this variable became true, they deplete the queue and do not go to the next loop to wait for new items in the queue, and just finish. This would eventually finish the program. Each thread finishes, and the main program too, as usual after processing everything.
# Phase 14 (integration of Prefect client, for creating flow and tasks on remote Prefect server)
- [x] Install Prefect client library.
- [x] Add .env variable PREFECT_API_URL, that will be used for connecting client to the prefect server
- [x] Create prefect client file in `prefect/01_yadisk_analyze.py`. In this file we will work with prefect flows and tasks for this phase.
- [x] Create prefect flow called "analyze_yadisk_file_urls"
- [x] Create prefect task "iterate_yadisk_folder_and_store_file_paths" that will connect to yandex disk with yadisk library, analyze everything inside folder `Общая` recursively and store file paths in the ./../../../yadisk_files.json, in array of strings.
- [x] In our prefect file add function for flow to serve, as per prefect documentation on serving flows
- [x] Tests will be done manually by hand, by executing this script and checking prefect dashboard. No automatical tests needed for this phase.
# Phase 15 (prefect enrichment process for langchain, with predefined values, also removal of non-documet formats)
- [x] Remove for now formats, extensions for images of any kind, archives of any kind, and add possible text documents, documents formats, like .txt, .xlsx, etc. in enrichment processes/functions.
- [x] Create prefect client file in `prefect/02_yadisk_predefined_enrich.py`. This file will first load the file from ./../../../yadisk_files.json into an array of paths. After that, the array of paths will be filtered, and only extensions supported in enrichment will be left. After that, the code will iterate through each path in this filtered array, use the yadisk library to download the file, process it for enrichment, and then remove it after processing. There should be statistics for this, at runtime, with a progressbar that shows how many files are processed out of how many are left. Also, near the progressbar there should be a counter of errors. Yes, if there is an error, it should be swallowed, even if it is inside a thread or async function.
- [x] For yandex disk integration use library yadisk. In .env file there should be variable YADISK_TOKEN for accessing the needed connection
- [x] Code for loading should be reflected upon, and then made so it would be done in an async way, with as many simultaneous tasks as possible. yadisk async integration should be used (async features can be checked here: https://pypi.org/project/yadisk/)
- [x] No tests for code should be done at this phase, all tests will be done manually, because loading of documents can take a long time for automated test.
# Phase 16 (making demo ui scalable)
- [x] Make demo-ui window containable and reusable part of html + js. This part will be used for creating multi-windowed demo ui.
- [x] Make tabbed UI with top level tabs. First tab exists and is selected. Each tab should have copy of demo ui, meaning the chat window with ability to specify the api url
- [x] At the end of the tabs there should be button with plus sign, which will add new tab. Tabs to be called by numbers.
- [x] There should be 3 predefined tabs opened. First one should have predefined api url "https://rag.langchain.overwatch.su/api/test-query", second "https://rag.llamaindex.overwatch.su/api/test-query", third "https://rag.haystack.overwatch.su/api/test-query"

View File

@@ -3,117 +3,297 @@
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>RAG Solution Chat Interface</title>
<title>RAG Multi-Window Demo</title>
<style>
:root {
--bg: #f1efe8;
--paper: #fffdf7;
--ink: #1f2937;
--muted: #6b7280;
--line: #dfd8c9;
--accent: #0f766e;
--accent-2: #d97706;
--bot: #ece8dc;
--user: #115e59;
--danger-bg: #fde8e8;
--danger-ink: #9b1c1c;
}
* {
box-sizing: border-box;
margin: 0;
padding: 0;
box-sizing: border-box;
font-family: "Segoe UI", Tahoma, Geneva, Verdana, sans-serif;
}
body {
background-color: #f5f7fa;
color: #333;
line-height: 1.6;
background:
radial-gradient(circle at 15% 15%, #f9d9a8 0%, transparent 35%),
radial-gradient(circle at 85% 20%, #b7e3d8 0%, transparent 40%),
linear-gradient(180deg, #f5f0e4 0%, #ede7d8 100%);
color: var(--ink);
font-family: "Trebuchet MS", "Segoe UI", sans-serif;
min-height: 100vh;
}
.container {
max-width: 900px;
.app {
max-width: 1100px;
margin: 0 auto;
padding: 20px;
padding: 20px 14px 24px;
}
header {
background: linear-gradient(135deg, #6a11cb 0%, #2575fc 100%);
color: white;
padding: 20px;
border-radius: 10px;
margin-bottom: 20px;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
.shell {
border: 1px solid rgba(70, 62, 43, 0.15);
border-radius: 16px;
background: rgba(255, 253, 247, 0.92);
box-shadow: 0 18px 45px rgba(47, 41, 30, 0.12);
overflow: hidden;
backdrop-filter: blur(6px);
}
h1 {
font-size: 1.8rem;
margin-bottom: 10px;
.shell-header {
padding: 14px 18px;
border-bottom: 1px solid var(--line);
background: linear-gradient(180deg, rgba(255,255,255,0.9), rgba(244,239,228,0.85));
}
.api-endpoint-container {
display: flex;
gap: 10px;
margin-top: 15px;
flex-wrap: wrap;
.shell-header h1 {
font-size: 1.25rem;
letter-spacing: 0.02em;
}
.api-endpoint-container label {
.shell-header p {
margin-top: 4px;
color: var(--muted);
font-size: 0.92rem;
}
.tabs-bar {
display: flex;
align-items: center;
font-weight: bold;
gap: 8px;
padding: 10px 12px;
border-bottom: 1px solid var(--line);
background: rgba(247, 243, 233, 0.9);
overflow-x: auto;
}
.api-endpoint-container input {
flex: 1;
min-width: 300px;
.tab-btn {
border: 1px solid var(--line);
background: #faf7ee;
color: var(--ink);
padding: 8px 12px;
border: none;
border-radius: 4px;
margin-left: 5px;
}
.api-endpoint-container button {
background-color: #fff;
color: #2575fc;
border: none;
padding: 8px 15px;
border-radius: 4px;
border-radius: 999px;
cursor: pointer;
font-weight: bold;
transition: background-color 0.3s;
font-weight: 700;
white-space: nowrap;
transition: transform 0.15s ease, background-color 0.15s ease, border-color 0.15s ease;
}
.api-endpoint-container button:hover {
background-color: #e6f0ff;
.tab-btn:hover {
transform: translateY(-1px);
border-color: #c9bea6;
}
.chat-container {
background-color: white;
.tab-btn.active {
background: var(--accent);
border-color: var(--accent);
color: #fff;
}
.tab-btn.add-tab {
min-width: 38px;
padding-inline: 0;
text-align: center;
font-size: 1.1rem;
background: #fff;
color: var(--accent-2);
border-color: #e5c792;
}
.panel-host {
padding: 14px;
}
.chat-panel {
display: flex;
flex-direction: column;
gap: 12px;
}
.panel-toolbar {
display: grid;
grid-template-columns: auto 1fr auto;
gap: 10px;
align-items: center;
background: var(--paper);
border: 1px solid var(--line);
border-radius: 12px;
padding: 10px;
}
.panel-toolbar label {
font-weight: 700;
color: #4b5563;
font-size: 0.92rem;
}
.endpoint-input {
width: 100%;
border: 1px solid #d8cfbd;
border-radius: 10px;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
padding: 10px 12px;
font-size: 0.94rem;
background: #fff;
}
.endpoint-input:focus,
.message-input:focus {
outline: 2px solid rgba(15, 118, 110, 0.16);
border-color: var(--accent);
}
.panel-toolbar button,
.send-btn {
border: none;
border-radius: 10px;
cursor: pointer;
font-weight: 700;
}
.set-endpoint-btn {
padding: 10px 12px;
background: #fff;
color: var(--accent);
border: 1px solid #b9d4cf;
}
.chat-card {
border: 1px solid var(--line);
border-radius: 14px;
overflow: hidden;
height: 60vh;
background: #fff;
min-height: 62vh;
display: flex;
flex-direction: column;
}
.chat-header {
background-color: #f8f9fa;
padding: 15px;
border-bottom: 1px solid #eaeaea;
font-weight: bold;
color: #495057;
display: flex;
justify-content: space-between;
align-items: center;
gap: 12px;
padding: 12px 14px;
border-bottom: 1px solid var(--line);
background: #fbf8ef;
}
.chat-header-title {
font-weight: 800;
}
.chat-header-endpoint {
color: var(--muted);
font-size: 0.85rem;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
text-align: right;
}
.chat-messages {
flex: 1;
padding: 20px;
overflow-y: auto;
padding: 14px;
display: flex;
flex-direction: column;
gap: 15px;
gap: 12px;
background:
linear-gradient(180deg, rgba(255,255,255,0.92), rgba(250,247,238,0.95)),
repeating-linear-gradient(
0deg,
rgba(218, 206, 181, 0.15),
rgba(218, 206, 181, 0.15) 1px,
transparent 1px,
transparent 28px
);
}
.message {
max-width: 80%;
padding: 12px 16px;
border-radius: 18px;
position: relative;
animation: fadeIn 0.3s ease;
max-width: 82%;
padding: 11px 13px;
border-radius: 14px;
line-height: 1.45;
animation: slideIn 0.2s ease;
box-shadow: 0 1px 0 rgba(0, 0, 0, 0.03);
}
@keyframes fadeIn {
.message.user-message {
align-self: flex-end;
background: var(--user);
color: #fff;
border-bottom-right-radius: 5px;
}
.message.bot-message {
align-self: flex-start;
background: var(--bot);
color: #3c3f44;
border-bottom-left-radius: 5px;
}
.message.error-message {
align-self: flex-start;
background: var(--danger-bg);
color: var(--danger-ink);
border: 1px solid #f3bcbc;
}
.message.typing-indicator {
align-self: flex-start;
background: #eef2f7;
color: #475569;
font-style: italic;
}
.chat-input-row {
display: grid;
grid-template-columns: 1fr auto;
gap: 10px;
padding: 12px;
border-top: 1px solid var(--line);
background: #fbf8ef;
}
.message-input {
border: 1px solid #d8cfbd;
border-radius: 12px;
padding: 11px 12px;
font-size: 0.95rem;
}
.send-btn {
background: var(--accent);
color: #fff;
padding: 0 16px;
min-width: 86px;
}
.send-btn:disabled,
.set-endpoint-btn:disabled {
opacity: 0.55;
cursor: not-allowed;
}
.footer-note {
padding: 8px 14px 14px;
color: var(--muted);
font-size: 0.85rem;
}
@keyframes slideIn {
from {
opacity: 0;
transform: translateY(10px);
transform: translateY(6px);
}
to {
opacity: 1;
@@ -121,281 +301,274 @@
}
}
.user-message {
align-self: flex-end;
background-color: #2575fc;
color: white;
border-bottom-right-radius: 4px;
}
.bot-message {
align-self: flex-start;
background-color: #e9ecef;
color: #495057;
border-bottom-left-radius: 4px;
}
.error-message {
align-self: flex-start;
background-color: #f8d7da;
color: #721c24;
border: 1px solid #f5c6cb;
border-radius: 18px;
}
.input-area {
display: flex;
padding: 15px;
background-color: #f8f9fa;
border-top: 1px solid #eaeaea;
}
.input-area input {
flex: 1;
padding: 12px 15px;
border: 1px solid #ddd;
border-radius: 24px;
outline: none;
font-size: 1rem;
}
.input-area button {
background-color: #2575fc;
color: white;
border: none;
padding: 12px 20px;
border-radius: 24px;
margin-left: 10px;
cursor: pointer;
font-weight: bold;
transition: background-color 0.3s;
}
.input-area button:hover {
background-color: #1a68e8;
}
.input-area button:disabled {
background-color: #adb5bd;
cursor: not-allowed;
}
.typing-indicator {
align-self: flex-start;
background-color: #e9ecef;
color: #495057;
padding: 12px 16px;
border-radius: 18px;
font-style: italic;
}
footer {
text-align: center;
margin-top: 20px;
color: #6c757d;
font-size: 0.9rem;
}
@media (max-width: 768px) {
.container {
padding: 10px;
@media (max-width: 720px) {
.panel-toolbar {
grid-template-columns: 1fr;
}
.api-endpoint-container {
.chat-header {
flex-direction: column;
align-items: flex-start;
}
.api-endpoint-container input {
min-width: auto;
.chat-header-endpoint {
text-align: left;
white-space: normal;
word-break: break-all;
}
.message {
max-width: 90%;
max-width: 92%;
}
}
</style>
</head>
<body>
<div class="container">
<header>
<h1>RAG Solution Chat Interface</h1>
<div class="api-endpoint-container">
<label for="apiEndpoint">API Endpoint:</label>
<input
type="text"
id="apiEndpoint"
value="http://localhost:8000/api/test-query"
placeholder="Enter API endpoint URL"
/>
<button onclick="setApiEndpoint()">Set Endpoint</button>
<div class="app">
<div class="shell">
<div class="shell-header">
<h1>RAG Demo Control Room</h1>
<p>Multiple chat windows with independent API endpoints.</p>
</div>
</header>
<div class="chat-container">
<div class="chat-header">Chat with RAG Agent</div>
<div class="chat-messages" id="chatMessages">
<div class="message bot-message">
Hello! I'm your RAG agent. Please enter your API endpoint and start
chatting.
</div>
</div>
<div class="input-area">
<input
type="text"
id="userInput"
placeholder="Type your message here..."
onkeypress="handleKeyPress(event)"
/>
<button onclick="sendMessage()" id="sendButton">Send</button>
</div>
<div class="tabs-bar" id="tabsBar"></div>
<div class="panel-host" id="panelHost"></div>
<div class="footer-note">Phase 16 scalable demo UI: reusable chat panel + tabbed windows.</div>
</div>
<footer>
<p>RAG Solution with LangChain | Chat Interface Demo</p>
</footer>
</div>
<template id="chatPanelTemplate">
<div class="chat-panel">
<div class="panel-toolbar">
<label>API Endpoint</label>
<input class="endpoint-input" type="text" placeholder="Enter API endpoint URL" />
<button class="set-endpoint-btn" type="button">Set Endpoint</button>
</div>
<div class="chat-card">
<div class="chat-header">
<div class="chat-header-title">Chat with RAG Agent</div>
<div class="chat-header-endpoint">Endpoint: not set</div>
</div>
<div class="chat-messages"></div>
<div class="chat-input-row">
<input class="message-input" type="text" placeholder="Type your message here..." />
<button class="send-btn" type="button">Send</button>
</div>
</div>
</div>
</template>
<script>
// Store the API endpoint
let apiEndpoint = document.getElementById("apiEndpoint").value;
const DEFAULT_TAB_ENDPOINTS = [
"https://rag.langchain.overwatch.su/api/test-query",
"https://rag.llamaindex.overwatch.su/api/test-query",
"https://rag.haystack.overwatch.su/api/test-query",
];
// Set the API endpoint from the input field
function setApiEndpoint() {
const input = document.getElementById("apiEndpoint");
apiEndpoint = input.value.trim();
class ChatPanel {
constructor(rootElement, initialEndpoint = "") {
this.root = rootElement;
this.apiEndpoint = initialEndpoint;
if (!apiEndpoint) {
alert("Please enter a valid API endpoint URL");
return;
}
this.endpointInput = this.root.querySelector(".endpoint-input");
this.setEndpointButton = this.root.querySelector(".set-endpoint-btn");
this.headerEndpoint = this.root.querySelector(".chat-header-endpoint");
this.messagesEl = this.root.querySelector(".chat-messages");
this.messageInput = this.root.querySelector(".message-input");
this.sendButton = this.root.querySelector(".send-btn");
// Add notification that endpoint was set
addMessage(`API endpoint set to: ${apiEndpoint}`, "bot-message");
}
// Send a message to the API
async function sendMessage() {
const inputElement = document.getElementById("userInput");
const message = inputElement.value.trim();
const sendButton = document.getElementById("sendButton");
if (!message) {
return;
}
if (!apiEndpoint) {
alert("Please set the API endpoint first");
return;
}
// Disable the send button and input during request
sendButton.disabled = true;
inputElement.disabled = true;
try {
// Add user message to chat
addMessage(message, "user-message");
// Clear input
inputElement.value = "";
// Show typing indicator
const typingIndicator = addMessage(
"Thinking...",
"typing-indicator",
"typing",
this.endpointInput.value = initialEndpoint;
this._renderEndpointLabel();
this._bindEvents();
this.addMessage(
initialEndpoint
? `Ready. Endpoint preset to: ${initialEndpoint}`
: "Hello. Set an API endpoint and start chatting.",
"bot-message",
);
}
// Send request to API
const response = await fetch(apiEndpoint, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
query: message,
}),
_bindEvents() {
this.setEndpointButton.addEventListener("click", () => this.setApiEndpoint());
this.sendButton.addEventListener("click", () => this.sendMessage());
this.messageInput.addEventListener("keydown", (event) => {
if (event.key === "Enter") {
event.preventDefault();
this.sendMessage();
}
});
}
// Remove typing indicator
removeMessage(typingIndicator);
focusInput() {
this.messageInput.focus();
}
if (!response.ok) {
throw new Error(
`API request failed with status ${response.status}`,
);
_renderEndpointLabel() {
this.headerEndpoint.textContent = this.apiEndpoint
? `Endpoint: ${this.apiEndpoint}`
: "Endpoint: not set";
}
setApiEndpoint() {
const candidate = this.endpointInput.value.trim();
if (!candidate) {
alert("Please enter a valid API endpoint URL");
return;
}
this.apiEndpoint = candidate;
this._renderEndpointLabel();
this.addMessage(`API endpoint set to: ${candidate}`, "bot-message");
}
addMessage(text, className, extraClass = "") {
const messageDiv = document.createElement("div");
messageDiv.className = `message ${className} ${extraClass}`.trim();
messageDiv.innerHTML = String(text).replace(/\n/g, "<br>");
this.messagesEl.appendChild(messageDiv);
this.messagesEl.scrollTop = this.messagesEl.scrollHeight;
return messageDiv;
}
removeMessage(node) {
if (node && node.parentNode) {
node.parentNode.removeChild(node);
}
}
async sendMessage() {
const message = this.messageInput.value.trim();
if (!message) return;
if (!this.apiEndpoint) {
alert("Please set the API endpoint first");
return;
}
const data = await response.json();
this.sendButton.disabled = true;
this.messageInput.disabled = true;
// Add bot response to chat
if (data.success) {
addMessage(data.response, "bot-message");
} else {
addMessage(
`Error: ${data.error || "Unknown error occurred"}`,
let typingIndicator = null;
try {
this.addMessage(message, "user-message");
this.messageInput.value = "";
typingIndicator = this.addMessage("Thinking...", "typing-indicator", "typing");
const response = await fetch(this.apiEndpoint, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ query: message }),
});
this.removeMessage(typingIndicator);
typingIndicator = null;
if (!response.ok) {
throw new Error(`API request failed with status ${response.status}`);
}
const data = await response.json();
if (data.success) {
this.addMessage(data.response, "bot-message");
} else {
this.addMessage(
`Error: ${data.error || "Unknown error occurred"}`,
"error-message",
);
}
} catch (error) {
if (typingIndicator) this.removeMessage(typingIndicator);
this.addMessage(
`Connection error: ${error.message}. Please check the API endpoint and try again. Reload page if needed to reinitialize endpoint state.`,
"error-message",
);
} finally {
this.sendButton.disabled = false;
this.messageInput.disabled = false;
this.messageInput.focus();
}
} catch (error) {
console.error("Error:", error);
// Remove typing indicator if still present
const typingElements = document.querySelectorAll(".typing");
typingElements.forEach((el) => el.remove());
// Add error message to chat
addMessage(
`Connection error: ${error.message}. Please check the API endpoint and try again.`,
"error-message",
);
} finally {
// Re-enable the send button and input
sendButton.disabled = false;
inputElement.disabled = false;
inputElement.focus();
}
}
// Add a message to the chat
function addMessage(text, className, id = null) {
const chatMessages = document.getElementById("chatMessages");
const messageDiv = document.createElement("div");
messageDiv.className = `message ${className}`;
if (id) {
messageDiv.id = id;
class MultiChatApp {
constructor() {
this.tabsBar = document.getElementById("tabsBar");
this.panelHost = document.getElementById("panelHost");
this.panelTemplate = document.getElementById("chatPanelTemplate");
this.tabs = [];
this.activeTabId = null;
this.nextTabNumber = 1;
}
// Format text with line breaks
const formattedText = text.replace(/\n/g, "<br>");
messageDiv.innerHTML = formattedText;
init() {
DEFAULT_TAB_ENDPOINTS.forEach((endpoint) => this.createTab(endpoint));
if (this.tabs.length > 0) {
this.selectTab(this.tabs[0].id);
}
this.renderTabs();
}
chatMessages.appendChild(messageDiv);
createTab(initialEndpoint = "") {
const tab = {
id: `tab-${crypto.randomUUID ? crypto.randomUUID() : `${Date.now()}-${Math.random()}`}`,
title: String(this.nextTabNumber++),
endpoint: initialEndpoint,
panel: null,
panelNode: null,
};
// Scroll to bottom
chatMessages.scrollTop = chatMessages.scrollHeight;
const fragment = this.panelTemplate.content.cloneNode(true);
const panelNode = fragment.firstElementChild;
const panel = new ChatPanel(panelNode, initialEndpoint);
tab.panel = panel;
tab.panelNode = panelNode;
this.tabs.push(tab);
this.renderTabs();
return tab;
}
return messageDiv;
}
selectTab(tabId) {
this.activeTabId = tabId;
const tab = this.tabs.find((item) => item.id === tabId);
if (!tab) return;
// Remove a message from the chat
function removeMessage(element) {
if (element && element.parentNode) {
element.parentNode.removeChild(element);
this.panelHost.innerHTML = "";
this.panelHost.appendChild(tab.panelNode);
tab.panel._renderEndpointLabel();
tab.panel.focusInput();
this.renderTabs();
}
renderTabs() {
this.tabsBar.innerHTML = "";
this.tabs.forEach((tab) => {
const btn = document.createElement("button");
btn.type = "button";
btn.className = `tab-btn ${tab.id === this.activeTabId ? "active" : ""}`.trim();
btn.textContent = tab.title;
btn.title = tab.endpoint || `Tab ${tab.title}`;
btn.addEventListener("click", () => this.selectTab(tab.id));
this.tabsBar.appendChild(btn);
});
const addBtn = document.createElement("button");
addBtn.type = "button";
addBtn.className = "tab-btn add-tab";
addBtn.textContent = "+";
addBtn.title = "Add new tab";
addBtn.addEventListener("click", () => {
const tab = this.createTab("");
this.selectTab(tab.id);
});
this.tabsBar.appendChild(addBtn);
}
}
// Handle Enter key press in the input field
function handleKeyPress(event) {
if (event.key === "Enter") {
sendMessage();
}
}
// Focus on the input field when the page loads
window.onload = function () {
document.getElementById("userInput").focus();
};
window.addEventListener("DOMContentLoaded", () => {
const app = new MultiChatApp();
app.init();
});
</script>
</body>
</html>

View File

@@ -8,7 +8,7 @@ from pathlib import Path
from typing import List, Optional, Tuple
from dotenv import load_dotenv
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from loguru import logger
@@ -75,21 +75,26 @@ ENRICHMENT_ADAPTIVE_DOCUMENT_UPLOADS_THREADS = int(
)
SUPPORTED_EXTENSIONS = {
".pdf",
".docx",
".csv",
".doc",
".pptx",
".xlsx",
".xls",
".jpg",
".jpeg",
".png",
".gif",
".bmp",
".tiff",
".webp",
".docx",
".epub",
".htm",
".html",
".json",
".jsonl",
".md",
".odt",
".txt", # this one is obvious but was unexpected to see in data lol
".pdf",
".ppt",
".pptx",
".rtf",
".rst",
".tsv",
".txt",
".xls",
".xlsx",
".xml",
}
Base = declarative_base()
@@ -261,6 +266,8 @@ class DocumentEnricher:
return UnstructuredODTLoader(
file_path, **{"strategy": "hi_res", "languages": ["rus"]}
)
if ext in [".txt", ".md"]:
return TextLoader(file_path, encoding="utf-8")
return None
def _load_one_adaptive_file(
@@ -273,7 +280,7 @@ class DocumentEnricher:
extension = adaptive_file.extension.lower()
file_type = try_guess_file_type(extension)
def process_local_file(local_file_path: str):
def process_local_file(original_path: str, local_file_path: str):
nonlocal loaded_docs, processed_record
file_hash = self._get_file_hash(local_file_path)
@@ -295,7 +302,7 @@ class DocumentEnricher:
doc.metadata["file_type"] = file_type
doc.metadata["source"] = source_identifier
doc.metadata["filename"] = adaptive_file.filename
doc.metadata["file_path"] = source_identifier
doc.metadata["file_path"] = original_path
doc.metadata["file_size"] = os.path.getsize(local_file_path)
doc.metadata["file_extension"] = extension
@@ -310,7 +317,7 @@ class DocumentEnricher:
)
loaded_docs = split_docs
processed_record = (source_identifier, file_hash)
processed_record = (original_path, file_hash)
adaptive_file.work_with_file_locally(process_local_file)
return loaded_docs, processed_record

View File

@@ -123,9 +123,9 @@ class _AdaptiveFile(ABC):
# This method allows to work with file locally, and lambda should be provided for this.
# Why separate method? For possible cleanup after work is done. And to download file, if needed
# Lambda: first argument is a local path
# Lambda: first argument is an original path, second: local path. In case of just local files, these will be the same
@abstractmethod
def work_with_file_locally(self, func: Callable[[str], None]):
def work_with_file_locally(self, func: Callable[[str, str], None]):
"""Run callback with a local path to the file."""
@@ -143,8 +143,8 @@ class LocalFilesystemAdaptiveFile(_AdaptiveFile):
super().__init__(filename, extension)
self.local_path = local_path
def work_with_file_locally(self, func: Callable[[str], None]):
func(self.local_path)
def work_with_file_locally(self, func: Callable[[str, str], None]):
func(self.local_path, self.local_path)
class LocalFilesystemAdaptiveCollection(_AdaptiveCollection):
@@ -196,10 +196,10 @@ class YandexDiskAdaptiveFile(_AdaptiveFile):
temp_file.write(file_response.content)
return temp_file.name
def work_with_file_locally(self, func: Callable[[str], None]):
def work_with_file_locally(self, func: Callable[[str, str], None]):
temp_path = self._download_to_temp_file()
try:
func(temp_path)
func(self.remote_path, temp_path)
finally:
if os.path.exists(temp_path):
os.unlink(temp_path)

View File

@@ -0,0 +1,85 @@
"""Prefect flow for analyzing Yandex Disk files and storing file paths."""
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import List
from dotenv import load_dotenv
from prefect.logging import get_run_logger
from prefect import flow, task
load_dotenv()
PREFECT_API_URL = os.getenv("PREFECT_API_URL")
YADISK_TOKEN = os.getenv("YADISK_TOKEN")
YADISK_ROOT_PATH = "Общая"
OUTPUT_JSON_PATH = (
Path(__file__).resolve().parent.parent / "../../../yadisk_files.json"
).resolve()
if PREFECT_API_URL:
os.environ["PREFECT_API_URL"] = PREFECT_API_URL
@task(name="iterate_yadisk_folder_and_store_file_paths")
def iterate_yadisk_folder_and_store_file_paths() -> List[str]:
    """Iterate Yandex Disk recursively from `Общая` and save file paths to JSON.

    Returns:
        The collected list of remote file paths; also written to OUTPUT_JSON_PATH.

    Raises:
        ValueError: when YADISK_TOKEN is not configured.
        RuntimeError: when the yadisk package is not installed.
    """
    if not YADISK_TOKEN:
        raise ValueError("YADISK_TOKEN is required to analyze Yandex Disk")

    try:
        import yadisk
    except ImportError as error:
        raise RuntimeError(
            "yadisk package is required for this task. Install dependencies in venv first."
        ) from error

    yandex_disk = yadisk.YaDisk(token=YADISK_TOKEN)
    file_paths: List[str] = []
    logger = get_run_logger()

    def walk_folder(folder_path: str) -> None:
        # Depth-first traversal; yadisk items may be objects or plain dicts,
        # so fall back to dict access when attribute access yields nothing.
        for item in yandex_disk.listdir(folder_path):
            item_type = getattr(item, "type", None)
            item_path = getattr(item, "path", None)
            if item_path is None and isinstance(item, dict):
                item_type = item.get("type")
                item_path = item.get("path")
            if not item_path:
                continue
            if item_type == "dir":
                walk_folder(item_path)
            elif item_type == "file":
                # Append before logging so the reported count includes the
                # file just found (the original logged the pre-append count,
                # which was off by one and started at 0).
                file_paths.append(item_path)
                logger.info(f"Added {len(file_paths)} file into the list")

    walk_folder(YADISK_ROOT_PATH)

    OUTPUT_JSON_PATH.parent.mkdir(parents=True, exist_ok=True)
    with open(OUTPUT_JSON_PATH, "w", encoding="utf-8") as output_file:
        json.dump(file_paths, output_file, ensure_ascii=False, indent=2)
    return file_paths
@flow(name="analyze_yadisk_file_urls")
def analyze_yadisk_file_urls() -> List[str]:
    """Run Yandex Disk analysis task and return collected file paths.

    Thin flow wrapper: delegates all work to the single task so Prefect
    records the task run separately from the flow run.
    """
    return iterate_yadisk_folder_and_store_file_paths()
def serve_analyze_yadisk_file_urls() -> None:
    """Serve the flow as a deployment for remote Prefect execution.

    Blocks the process, polling the Prefect server for scheduled runs of
    the "analyze-yadisk-file-urls" deployment.
    """
    analyze_yadisk_file_urls.serve(name="analyze-yadisk-file-urls")
# Script entry point: start serving the flow (blocks until interrupted).
if __name__ == "__main__":
    serve_analyze_yadisk_file_urls()

View File

@@ -0,0 +1,216 @@
"""Prefect flow to enrich Yandex Disk files from a predefined JSON file list."""
from __future__ import annotations
import asyncio
import json
import os
import sys
import tempfile
from pathlib import Path
from typing import List
from dotenv import load_dotenv
from prefect import flow, task
load_dotenv()
PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
PREFECT_API_URL = os.getenv("PREFECT_API_URL")
YADISK_TOKEN = os.getenv("YADISK_TOKEN")
ENRICH_CONCURRENCY = int(os.getenv("PREFECT_YADISK_ENRICH_CONCURRENCY", "8"))
OUTPUT_FILE_LIST = (PROJECT_ROOT / "../../../yadisk_files.json").resolve()
if PREFECT_API_URL:
os.environ["PREFECT_API_URL"] = PREFECT_API_URL
class _ProgressTracker:
    """Async-safe counter that renders a console progress bar as files finish."""

    def __init__(self, total: int):
        # Expected number of files; processed/errors grow as tasks complete.
        self.total = total
        self.processed = 0
        self.errors = 0
        self._lock = asyncio.Lock()

    async def mark_done(self, error: bool = False):
        """Record one finished file (optionally failed) and redraw the bar."""
        async with self._lock:
            self.processed += 1
            self.errors += int(error)
            self._render()

    def _render(self):
        denominator = max(self.total, 1)
        width = 30
        filled = width * self.processed // denominator
        bar = "#" * filled + "-" * (width - filled)
        left = max(self.total - self.processed, 0)
        print(
            f"\r[{bar}] {self.processed}/{self.total} processed | left: {left} | errors: {self.errors}",
            end="",
            flush=True,
        )
        # Terminate the carriage-return line once everything is done.
        if self.processed >= self.total:
            print()
async def _download_yadisk_file(async_disk, remote_path: str, local_path: str) -> None:
    """Download a single remote Yandex Disk file to ``local_path``.

    Thin async wrapper around the yadisk client download call.
    """
    await async_disk.download(remote_path, local_path)
def _process_local_file_for_enrichment(enricher, vector_store, local_path: str, remote_path: str) -> bool:
    """Process one downloaded file and upload chunks into vector store.

    Returns True when file was processed/uploaded, False when skipped
    (already processed, no loader for the type, or no chunks produced).
    """
    from helpers import extract_russian_event_names, extract_years_from_text

    remote = Path(remote_path)
    content_hash = enricher._get_file_hash(local_path)
    if enricher._is_document_hash_processed(content_hash):
        return False

    loader = enricher._get_loader_for_extension(local_path)
    if loader is None:
        return False

    documents = loader.load()
    size_bytes = os.path.getsize(local_path)
    for document in documents:
        document.metadata["source"] = remote_path
        document.metadata["filename"] = remote.name
        document.metadata["file_path"] = remote_path
        document.metadata["file_size"] = size_bytes
        document.metadata["file_extension"] = remote.suffix.lower()
        if "page" in document.metadata:
            document.metadata["page_number"] = document.metadata["page"]

    chunks = enricher.text_splitter.split_documents(documents)
    if not chunks:
        return False
    # Enrich each chunk with extracted temporal/event metadata.
    for chunk in chunks:
        chunk.metadata["years"] = extract_years_from_text(chunk.page_content)
        chunk.metadata["events"] = extract_russian_event_names(chunk.page_content)

    vector_store.add_documents(chunks)
    enricher._mark_document_processed(remote_path, content_hash)
    return True
async def _process_remote_file(async_disk, remote_path: str, semaphore: asyncio.Semaphore, tracker: _ProgressTracker, enricher, vector_store):
    """Download one remote file, enrich it, clean up, and update the tracker.

    Any per-file error is swallowed (only counted in the tracker) so that one
    bad file never aborts the whole batch.
    """
    async with semaphore:
        temp_path = None
        had_error = False
        try:
            # Keep the original suffix so downstream loaders can detect the type.
            suffix = Path(remote_path).suffix
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
                temp_path = tmp_file.name
            await _download_yadisk_file(async_disk, remote_path, temp_path)
            # Enrichment is synchronous; run it off the event loop.
            await asyncio.to_thread(
                _process_local_file_for_enrichment,
                enricher,
                vector_store,
                temp_path,
                remote_path,
            )
        except Exception:
            # Phase requirement: swallow per-file errors and continue processing.
            had_error = True
        finally:
            # Always remove the temp download; a failed unlink counts as error.
            if temp_path and os.path.exists(temp_path):
                try:
                    os.unlink(temp_path)
                except OSError:
                    had_error = True
            await tracker.mark_done(error=had_error)
@task(name="prefilter_yadisk_file_paths")
def prefilter_yadisk_file_paths() -> List[str]:
    """Load file list JSON and keep only extensions supported by enrichment."""
    from enrichment import SUPPORTED_EXTENSIONS

    if not OUTPUT_FILE_LIST.exists():
        raise FileNotFoundError(f"File list not found: {OUTPUT_FILE_LIST}")
    raw_paths = json.loads(OUTPUT_FILE_LIST.read_text(encoding="utf-8"))
    return [
        candidate
        for candidate in raw_paths
        if Path(str(candidate)).suffix.lower() in SUPPORTED_EXTENSIONS
    ]
@task(name="enrich_filtered_yadisk_files_async")
async def enrich_filtered_yadisk_files_async(filtered_paths: List[str]) -> dict:
    """Download/process Yandex Disk files concurrently and enrich LangChain vector store.

    Args:
        filtered_paths: Remote paths already filtered to supported extensions.

    Returns:
        Stats dict with keys ``total``, ``processed`` and ``errors``.

    Raises:
        ValueError: When YADISK_TOKEN is missing.
        RuntimeError: When the yadisk package (with AsyncYaDisk) is unavailable.
    """
    if not YADISK_TOKEN:
        raise ValueError("YADISK_TOKEN is required for Yandex Disk enrichment")
    if not filtered_paths:
        print("No supported files found for enrichment.")
        return {"total": 0, "processed": 0, "errors": 0}
    try:
        import yadisk
    except ImportError as error:
        raise RuntimeError("yadisk package is required for this flow") from error
    if not hasattr(yadisk, "AsyncYaDisk"):
        raise RuntimeError("Installed yadisk package does not expose AsyncYaDisk")
    # Project-local imports are kept inside the task body.
    from enrichment import DocumentEnricher
    from vector_storage import initialize_vector_store
    vector_store = initialize_vector_store()
    enricher = DocumentEnricher(vector_store)
    tracker = _ProgressTracker(total=len(filtered_paths))
    # Semaphore caps the number of simultaneous downloads/enrichments.
    semaphore = asyncio.Semaphore(max(1, ENRICH_CONCURRENCY))
    async with yadisk.AsyncYaDisk(token=YADISK_TOKEN) as async_disk:
        tasks = [
            asyncio.create_task(
                _process_remote_file(
                    async_disk=async_disk,
                    remote_path=remote_path,
                    semaphore=semaphore,
                    tracker=tracker,
                    enricher=enricher,
                    vector_store=vector_store,
                )
            )
            for remote_path in filtered_paths
        ]
        # Per-file errors are swallowed inside _process_remote_file.
        await asyncio.gather(*tasks)
    return {
        "total": tracker.total,
        "processed": tracker.processed,
        "errors": tracker.errors,
    }
@flow(name="yadisk_predefined_enrich")
async def yadisk_predefined_enrich() -> dict:
    """Filter the predefined file list, then enrich the survivors concurrently.

    Returns the stats dict from the enrichment task
    (keys: total, processed, errors).
    """
    filtered_paths = prefilter_yadisk_file_paths()
    return await enrich_filtered_yadisk_files_async(filtered_paths)
def serve_yadisk_predefined_enrich() -> None:
    """Serve the flow as a long-running Prefect deployment."""
    yadisk_predefined_enrich.serve(name="yadisk-predefined-enrich")


if __name__ == "__main__":
    # PREFECT_SERVE=1 serves the flow as a deployment; otherwise run it once.
    serve_mode = os.getenv("PREFECT_SERVE", "0") == "1"
    if serve_mode:
        serve_yadisk_predefined_enrich()
    else:
        asyncio.run(yadisk_predefined_enrich())

View File

@@ -55,3 +55,5 @@ unstructured-pytesseract>=0.3.12
# System and utilities
ollama>=0.3.0
prefect>=2.19.0
yadisk>=3.4.0

View File

@@ -4,8 +4,8 @@ import json
import os
from contextlib import asynccontextmanager
from typing import Any, Dict
from dotenv import load_dotenv
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from loguru import logger

View File

@@ -6,9 +6,21 @@ EMBEDDING_STRATEGY=ollama
OLLAMA_EMBEDDING_MODEL=MODEL
OLLAMA_CHAT_MODEL=MODEL
# Qdrant Configuration
QDRANT_HOST=localhost
QDRANT_REST_PORT=6333
QDRANT_GRPC_PORT=6334
# OpenAI Configuration (for reference - uncomment and configure when using OpenAI strategy)
# OPENAI_CHAT_URL=https://api.openai.com/v1
# OPENAI_CHAT_KEY=your_openai_api_key_here
# OPENAI_EMBEDDING_MODEL=text-embedding-3-small
# OPENAI_EMBEDDING_BASE_URL=https://api.openai.com/v1
# OPENAI_EMBEDDING_API_KEY=your_openai_api_key_here
# Yandex Disk + Prefect (Phase 9)
YADISK_TOKEN=your_yadisk_token_here
PREFECT_API_URL=https://your-prefect-server.example/api
QDRANT_HOST=HOST
QDRANT_REST_PORT=PORT
QDRANT_GRPC_PORT=PORT

View File

@@ -47,8 +47,25 @@ Chosen data folder: relative ./../../../data - from the current folder
- [x] Add log of how many files currently being processed in enrichment. We need to see how many total to process and how many processed each time new document being processed. If it's possible, also add progressbar showing percentage and those numbers on top of logs.
# Phase 8 (chat feature, as agent, for usage in the cli)
# Phase 8 (comment unsupported formats for now)
- [ ] Create file `agent.py`, which will incorporate an agent powered by the chat model. It should use integration with openai; env variables are configured
- [ ] Integrate this agent with the existing solution for retrieving, with retrieval.py, if it's possible in current chosen RAG framework
- [ ] Integrate this agent with the cli, as command to start chatting with the agent. If there is a built-in solution for console communication with the agent, initiate this on cli command.
- [x] Remove for now formats, extensions for images of any kind, archives of any kind, and add possible text documents, documents formats, like .txt, .xlsx, etc. in enrichment processes/functions.
# Phase 9 (integration of Prefect client, for creating flow and tasks on remote Prefect server)
- [x] Install Prefect client library.
- [x] Add .env variable PREFECT_API_URL, that will be used for connecting client to the prefect server
- [x] Create prefect client file in `prefect/01_yadisk_predefined_enrich.py`. This file will first load file from ./../../../yadisk_files.json into array of paths. After that, array of paths will be filtered, and only supported in enrichment extensions will be left. After that, code will iterate through each path in this filtered array, use yadisk library to download file, process it for enrichment, and then remove it after processing. There should be statistics for this, at runtime, with progressbar that shows how many files processed out of how many left. Also, near the progressbar there should be counter of errors. Yes, if there is an error, it should be swallowed, even if it is inside a thread or async function.
- [x] For yandex disk integration use library yadisk. In .env file there should be variable YADISK_TOKEN for accessing the needed connection
- [x] Code for loading should be reflected upon, and then made so it is done in an async way, with as many simultaneous tasks as possible. yadisk async integration should be used (async features can be checked here: https://pypi.org/project/yadisk/)
- [x] No tests for code should be done at this phase, all tests will be done manually, because loading of documents can take a long time for automated test.
# Phase 10 (qdrant connection credentials in .env)
- [x] Add Qdrant connection variables to the .env file: QDRANT_HOST, QDRANT_REST_PORT, QDRANT_GRPC_PORT
- [x] Replace hardcoded Qdrant connection values everywhere with the Qdrant .env variables
# Phase 11 (http endpoint to retrieve data from the vector storage by query)
- [x] Create file `server.py`, with web framework fastapi, for example
- [x] Add POST endpoint "/api/test-query" which will use agent, and retrieve response for query, sent in JSON format, field "query"

View File

@@ -6,24 +6,53 @@ processing them with appropriate loaders, splitting them into chunks,
and storing them in the vector database with proper metadata.
"""
import os
import hashlib
from pathlib import Path
from typing import List, Dict, Any
from datetime import datetime
import os
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from llama_index.core import Document, SimpleDirectoryReader
from llama_index.core.node_parser import CodeSplitter, SentenceSplitter
from loguru import logger
from tqdm import tqdm
from llama_index.core import SimpleDirectoryReader, Document
from llama_index.core.node_parser import SentenceSplitter, CodeSplitter
# Removed unused import
from vector_storage import get_vector_store_and_index
# Import the new configuration module
from config import get_embedding_model
# Removed unused import
from vector_storage import get_vector_store_and_index
# File types the enrichment pipeline can ingest: text-bearing document
# formats only (images and archives are intentionally excluded).
SUPPORTED_ENRICHMENT_EXTENSIONS = {
    ".csv", ".doc", ".docx", ".epub", ".htm", ".html", ".json", ".jsonl",
    ".md", ".odt", ".pdf", ".ppt", ".pptx", ".rst", ".rtf", ".tsv",
    ".txt", ".xls", ".xlsx", ".xml",
}


def get_supported_enrichment_extensions() -> set[str]:
    """Return a defensive copy of the extensions supported by enrichment."""
    return set(SUPPORTED_ENRICHMENT_EXTENSIONS)
class DocumentTracker:
"""Class to handle tracking of processed documents to avoid re-processing."""
@@ -38,7 +67,7 @@ class DocumentTracker:
cursor = conn.cursor()
# Create table for tracking processed documents
cursor.execute('''
cursor.execute("""
CREATE TABLE IF NOT EXISTS processed_documents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT UNIQUE NOT NULL,
@@ -47,7 +76,7 @@ class DocumentTracker:
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metadata_json TEXT
)
''')
""")
conn.commit()
conn.close()
@@ -63,7 +92,7 @@ class DocumentTracker:
cursor.execute(
"SELECT COUNT(*) FROM processed_documents WHERE filepath = ? AND checksum = ?",
(filepath, checksum)
(filepath, checksum),
)
count = cursor.fetchone()[0]
@@ -79,11 +108,14 @@ class DocumentTracker:
filename = Path(filepath).name
try:
cursor.execute('''
cursor.execute(
"""
INSERT OR REPLACE INTO processed_documents
(filename, filepath, checksum, processed_at, metadata_json)
VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?)
''', (filename, filepath, checksum, str(metadata) if metadata else None))
""",
(filename, filepath, checksum, str(metadata) if metadata else None),
)
conn.commit()
logger.info(f"Document marked as processed: {filepath}")
@@ -104,62 +136,67 @@ class DocumentTracker:
def get_text_splitter(file_extension: str):
"""Get appropriate text splitter based on file type."""
from llama_index.core.node_parser import SentenceSplitter, CodeSplitter, TokenTextSplitter
from llama_index.core.node_parser import MarkdownElementNodeParser
from llama_index.core.node_parser import (
CodeSplitter,
MarkdownElementNodeParser,
SentenceSplitter,
TokenTextSplitter,
)
# For code files, use CodeSplitter
if file_extension.lower() in ['.py', '.js', '.ts', '.java', '.cpp', '.c', '.h', '.cs', '.go', '.rs', '.php', '.html', '.css', '.md', '.rst']:
if file_extension.lower() in [
".py",
".js",
".ts",
".java",
".cpp",
".c",
".h",
".cs",
".go",
".rs",
".php",
".html",
".css",
".md",
".rst",
]:
return CodeSplitter(language="python", max_chars=1000)
# For PDF files, use a parser that can handle multi-page documents
elif file_extension.lower() == '.pdf':
elif file_extension.lower() == ".pdf":
return SentenceSplitter(
chunk_size=512, # Smaller chunks for dense PDF content
chunk_overlap=100
chunk_overlap=100,
)
# For presentation files (PowerPoint), use smaller chunks
elif file_extension.lower() == '.pptx':
elif file_extension.lower() == ".pptx":
return SentenceSplitter(
chunk_size=256, # Slides typically have less text
chunk_overlap=50
chunk_overlap=50,
)
# For spreadsheets, use smaller chunks
elif file_extension.lower() == '.xlsx':
return SentenceSplitter(
chunk_size=256,
chunk_overlap=50
)
elif file_extension.lower() == ".xlsx":
return SentenceSplitter(chunk_size=256, chunk_overlap=50)
# For text-heavy documents like Word, use medium-sized chunks
elif file_extension.lower() in ['.docx', '.odt']:
return SentenceSplitter(
chunk_size=768,
chunk_overlap=150
)
elif file_extension.lower() in [".docx", ".odt"]:
return SentenceSplitter(chunk_size=768, chunk_overlap=150)
# For plain text files, use larger chunks
elif file_extension.lower() == '.txt':
return SentenceSplitter(
chunk_size=1024,
chunk_overlap=200
)
elif file_extension.lower() == ".txt":
return SentenceSplitter(chunk_size=1024, chunk_overlap=200)
# For image files, we'll handle them differently (metadata extraction)
elif file_extension.lower() in ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.svg']:
elif file_extension.lower() in [".png", ".jpg", ".jpeg", ".gif", ".bmp", ".svg"]:
# Images will be handled by multimodal models, return a simple splitter
return SentenceSplitter(
chunk_size=512,
chunk_overlap=100
)
return SentenceSplitter(chunk_size=512, chunk_overlap=100)
# For other files, use a standard SentenceSplitter
else:
return SentenceSplitter(
chunk_size=768,
chunk_overlap=150
)
return SentenceSplitter(chunk_size=768, chunk_overlap=150)
def ensure_proper_encoding(text):
@@ -178,35 +215,41 @@ def ensure_proper_encoding(text):
if isinstance(text, bytes):
# Decode bytes to string with proper encoding
try:
return text.decode('utf-8')
return text.decode("utf-8")
except UnicodeDecodeError:
# If UTF-8 fails, try other encodings commonly used for Russian/Cyrillic text
try:
return text.decode('cp1251') # Windows Cyrillic encoding
return text.decode("cp1251") # Windows Cyrillic encoding
except UnicodeDecodeError:
try:
return text.decode('koi8-r') # Russian encoding
return text.decode("koi8-r") # Russian encoding
except UnicodeDecodeError:
# If all else fails, decode with errors='replace'
return text.decode('utf-8', errors='replace')
return text.decode("utf-8", errors="replace")
elif isinstance(text, str):
# Ensure the string is properly encoded
try:
# Try to encode and decode to ensure it's valid UTF-8
return text.encode('utf-8').decode('utf-8')
return text.encode("utf-8").decode("utf-8")
except UnicodeEncodeError:
# If there are encoding issues, try to fix them
return text.encode('utf-8', errors='replace').decode('utf-8', errors='replace')
return text.encode("utf-8", errors="replace").decode(
"utf-8", errors="replace"
)
else:
# Convert other types to string and ensure proper encoding
text_str = str(text)
try:
return text_str.encode('utf-8').decode('utf-8')
return text_str.encode("utf-8").decode("utf-8")
except UnicodeEncodeError:
return text_str.encode('utf-8', errors='replace').decode('utf-8', errors='replace')
return text_str.encode("utf-8", errors="replace").decode(
"utf-8", errors="replace"
)
def process_documents_from_data_folder(data_path: str = "../../../data", recursive: bool = True):
def process_documents_from_data_folder(
data_path: str = "../../../data", recursive: bool = True
):
"""
Process all documents from the data folder using appropriate loaders and store in vector DB.
@@ -237,11 +280,7 @@ def process_documents_from_data_folder(data_path: str = "../../../data", recursi
return
# Find all supported files in the data directory
supported_extensions = {
'.pdf', '.docx', '.xlsx', '.pptx', '.odt', '.txt',
'.png', '.jpg', '.jpeg', '.gif', '.bmp', '.svg',
'.zip', '.rar', '.tar', '.gz'
}
supported_extensions = get_supported_enrichment_extensions()
# Walk through the directory structure
all_files = []
@@ -258,111 +297,140 @@ def process_documents_from_data_folder(data_path: str = "../../../data", recursi
if file_ext in supported_extensions:
all_files.append(str(file))
logger.info(f"Found {len(all_files)} files to process")
logger.info(
f"Found {len(all_files)} supported files to process (extensions: {', '.join(sorted(supported_extensions))})"
)
processed_count = 0
skipped_count = 0
error_count = 0
# Initialize progress bar
pbar = tqdm(total=len(all_files), desc="Processing documents", unit="file")
for file_path in all_files:
logger.info(f"Processing file: {file_path} ({processed_count + skipped_count + 1}/{len(all_files)})")
# Check if document has already been processed
if tracker.is_document_processed(file_path):
logger.info(f"Skipping already processed file: {file_path}")
skipped_count += 1
pbar.set_postfix({"Processed": processed_count, "Skipped": skipped_count})
pbar.update(1)
continue
logger.info(
f"Processing file: {file_path} ({processed_count + skipped_count + 1}/{len(all_files)})"
)
try:
# Load the document using SimpleDirectoryReader
# This automatically selects the appropriate reader based on file extension
def file_metadata_func(file_path_str):
# Apply proper encoding to filename
filename = ensure_proper_encoding(Path(file_path_str).name)
return {"filename": filename}
reader = SimpleDirectoryReader(
input_files=[file_path],
file_metadata=file_metadata_func
result = process_document_file(file_path, tracker=tracker, index=index)
if result["status"] == "processed":
processed_count += 1
elif result["status"] == "skipped":
skipped_count += 1
else:
error_count += 1
pbar.set_postfix(
{"Processed": processed_count, "Skipped": skipped_count, "Errors": error_count}
)
documents = reader.load_data()
# Process each document
for doc in documents:
# Extract additional metadata based on document type
file_ext = Path(file_path).suffix
# Apply proper encoding to file path
encoded_file_path = ensure_proper_encoding(file_path)
# Add additional metadata
doc.metadata["file_path"] = encoded_file_path
doc.metadata["processed_at"] = datetime.now().isoformat()
# Handle document-type-specific metadata
if file_ext.lower() == '.pdf':
# PDF-specific metadata
doc.metadata["page_label"] = ensure_proper_encoding(doc.metadata.get("page_label", "unknown"))
doc.metadata["file_type"] = "pdf"
elif file_ext.lower() in ['.docx', '.odt']:
# Word document metadata
doc.metadata["section"] = ensure_proper_encoding(doc.metadata.get("section", "unknown"))
doc.metadata["file_type"] = "document"
elif file_ext.lower() == '.pptx':
# PowerPoint metadata
doc.metadata["slide_id"] = ensure_proper_encoding(doc.metadata.get("slide_id", "unknown"))
doc.metadata["file_type"] = "presentation"
elif file_ext.lower() == '.xlsx':
# Excel metadata
doc.metadata["sheet_name"] = ensure_proper_encoding(doc.metadata.get("sheet_name", "unknown"))
doc.metadata["file_type"] = "spreadsheet"
# Determine the appropriate text splitter based on file type
splitter = get_text_splitter(file_ext)
# Split the document into nodes
nodes = splitter.get_nodes_from_documents([doc])
# Insert nodes into the vector index
nodes_with_enhanced_metadata = []
for i, node in enumerate(nodes):
# Enhance node metadata with additional information
node.metadata["original_doc_id"] = ensure_proper_encoding(doc.doc_id)
node.metadata["chunk_number"] = i
node.metadata["total_chunks"] = len(nodes)
node.metadata["file_path"] = encoded_file_path
# Ensure the text content is properly encoded
node.text = ensure_proper_encoding(node.text)
nodes_with_enhanced_metadata.append(node)
# Add all nodes to the index at once
if nodes_with_enhanced_metadata:
index.insert_nodes(nodes_with_enhanced_metadata)
logger.info(f"Processed {len(nodes)} nodes from {encoded_file_path}")
# Mark document as processed only after successful insertion
tracker.mark_document_processed(file_path, {"nodes_count": len(documents)})
processed_count += 1
pbar.set_postfix({"Processed": processed_count, "Skipped": skipped_count})
except Exception as e:
logger.error(f"Error processing file {file_path}: {str(e)}")
error_count += 1
pbar.set_postfix(
{"Processed": processed_count, "Skipped": skipped_count, "Errors": error_count}
)
# Update progress bar regardless of success or failure
pbar.update(1)
pbar.close()
logger.info(f"Document enrichment completed. Processed: {processed_count}, Skipped: {skipped_count}")
logger.info(
f"Document enrichment completed. Processed: {processed_count}, Skipped: {skipped_count}, Errors: {error_count}"
)
def process_document_file(
    file_path: str,
    tracker: Optional[DocumentTracker] = None,
    index=None,
) -> Dict[str, Any]:
    """
    Process a single document file and store its chunks in the vector index.

    Args:
        file_path: Path to the local document to ingest.
        tracker: Optional DocumentTracker; a fresh one is created when omitted.
        index: Optional vector index; resolved via get_vector_store_and_index()
            when omitted.

    Returns a dict with status and counters. Status is one of:
    `processed`, `skipped`, `error`.
    """
    file_ext = Path(file_path).suffix.lower()
    # Guard: only ingest extensions the enrichment pipeline supports.
    if file_ext not in get_supported_enrichment_extensions():
        logger.info(f"Skipping unsupported extension for file: {file_path}")
        return {"status": "skipped", "reason": "unsupported_extension", "nodes": 0}
    tracker = tracker or DocumentTracker()
    # Skip files whose path/checksum pair was already ingested.
    if tracker.is_document_processed(file_path):
        logger.info(f"Skipping already processed file: {file_path}")
        return {"status": "skipped", "reason": "already_processed", "nodes": 0}
    if index is None:
        _, index = get_vector_store_and_index()
    try:
        def file_metadata_func(file_path_str):
            # Normalize the filename encoding before attaching it as metadata.
            filename = ensure_proper_encoding(Path(file_path_str).name)
            return {"filename": filename}
        reader = SimpleDirectoryReader(
            input_files=[file_path], file_metadata=file_metadata_func
        )
        documents = reader.load_data()
        total_nodes_inserted = 0
        for doc in documents:
            current_file_ext = Path(file_path).suffix
            encoded_file_path = ensure_proper_encoding(file_path)
            doc.metadata["file_path"] = encoded_file_path
            doc.metadata["processed_at"] = datetime.now().isoformat()
            # Attach document-type-specific metadata per extension family.
            if current_file_ext.lower() == ".pdf":
                doc.metadata["page_label"] = ensure_proper_encoding(
                    doc.metadata.get("page_label", "unknown")
                )
                doc.metadata["file_type"] = "pdf"
            elif current_file_ext.lower() in [".docx", ".odt", ".doc", ".rtf"]:
                doc.metadata["section"] = ensure_proper_encoding(
                    doc.metadata.get("section", "unknown")
                )
                doc.metadata["file_type"] = "document"
            elif current_file_ext.lower() in [".pptx", ".ppt"]:
                doc.metadata["slide_id"] = ensure_proper_encoding(
                    doc.metadata.get("slide_id", "unknown")
                )
                doc.metadata["file_type"] = "presentation"
            elif current_file_ext.lower() in [".xlsx", ".xls", ".csv", ".tsv"]:
                doc.metadata["sheet_name"] = ensure_proper_encoding(
                    doc.metadata.get("sheet_name", "unknown")
                )
                doc.metadata["file_type"] = "spreadsheet"
            # Chunk the document with an extension-appropriate splitter.
            splitter = get_text_splitter(current_file_ext)
            nodes = splitter.get_nodes_from_documents([doc])
            nodes_with_enhanced_metadata = []
            for i, node in enumerate(nodes):
                node.metadata["original_doc_id"] = ensure_proper_encoding(doc.doc_id)
                node.metadata["chunk_number"] = i
                node.metadata["total_chunks"] = len(nodes)
                node.metadata["file_path"] = encoded_file_path
                node.text = ensure_proper_encoding(node.text)
                nodes_with_enhanced_metadata.append(node)
            if nodes_with_enhanced_metadata:
                index.insert_nodes(nodes_with_enhanced_metadata)
                total_nodes_inserted += len(nodes_with_enhanced_metadata)
            logger.info(f"Processed {len(nodes)} nodes from {encoded_file_path}")
        # Mark processed only after all nodes were inserted successfully.
        tracker.mark_document_processed(
            file_path,
            {"documents_count": len(documents), "nodes_count": total_nodes_inserted},
        )
        return {"status": "processed", "nodes": total_nodes_inserted}
    except Exception as e:
        logger.error(f"Error processing file {file_path}: {e}")
        return {"status": "error", "reason": str(e), "nodes": 0}
def enrich_documents():

View File

@@ -0,0 +1,268 @@
"""
Prefect flow for enriching documents from a predefined YaDisk file list.
Flow steps:
1. Load file paths from ../../../yadisk_files.json
2. Filter them by supported enrichment extensions
3. Download each file from YaDisk asynchronously
4. Enrich each downloaded file into vector storage
5. Remove downloaded temporary files after processing
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import os
import sys
import tempfile
from pathlib import Path
from typing import Any
from dotenv import load_dotenv
from loguru import logger
from tqdm import tqdm
ROOT_DIR = Path(__file__).resolve().parents[1]
load_dotenv(ROOT_DIR / ".env")
if str(ROOT_DIR) not in sys.path:
sys.path.insert(0, str(ROOT_DIR))
import yadisk
from enrichment import get_supported_enrichment_extensions, process_document_file
from prefect import flow, task
DEFAULT_YADISK_LIST_PATH = (ROOT_DIR / "../../../yadisk_files.json").resolve()
def setup_prefect_flow_logging() -> None:
    """Configure loguru handlers for flow execution."""
    log_directory = ROOT_DIR / "logs"
    log_directory.mkdir(exist_ok=True)

    file_format = "{time:YYYY-MM-DD HH:mm:ss} | {level} | {file}:{line} | {message}"
    console_format = "{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}"

    # Drop default handlers so records are not duplicated on re-configuration.
    logger.remove()
    logger.add(
        str(log_directory / "dev.log"),
        rotation="10 MB",
        retention="10 days",
        level="INFO",
        format=file_format,
    )
    logger.add(sys.stdout, level="INFO", format=console_format, colorize=True)
def _normalize_yadisk_paths(payload: Any) -> list[str]:
    """Extract a list of file paths from several common JSON shapes."""
    if isinstance(payload, list):
        # Plain list: keep string/Path entries, coerced to str.
        return [str(entry) for entry in payload if isinstance(entry, (str, Path))]

    if isinstance(payload, dict):
        # Wrapped list: first matching container key wins.
        for container_key in ("paths", "files", "items"):
            candidates = payload.get(container_key)
            if not isinstance(candidates, list):
                continue
            collected: list[str] = []
            for entry in candidates:
                if isinstance(entry, str):
                    collected.append(entry)
                elif isinstance(entry, dict):
                    for path_key in ("path", "remote_path", "file_path", "name"):
                        if path_key in entry and entry[path_key]:
                            collected.append(str(entry[path_key]))
                            break
            return collected

    raise ValueError(
        "Unsupported yadisk_files.json structure. Expected list or dict with paths/files/items."
    )
def _make_temp_local_path(base_dir: Path, remote_path: str) -> Path:
    """Create a deterministic temp file path for a remote YaDisk path."""
    remote_name = Path(remote_path).name or "downloaded_file"
    name_parts = Path(remote_name)
    stem = name_parts.stem or "file"
    # Short hash keeps names unique even when remote files share a stem.
    digest = hashlib.md5(remote_path.encode("utf-8")).hexdigest()[:10]
    safe_stem = "".join(ch if ch.isalnum() or ch in "-_" else "_" for ch in stem)
    return base_dir / f"{safe_stem}_{digest}{name_parts.suffix}"
@task(name="load_yadisk_paths")
def load_yadisk_paths(json_file_path: str) -> list[str]:
    """Load remote file paths from JSON file."""
    source = Path(json_file_path)
    if not source.exists():
        raise FileNotFoundError(f"YaDisk paths JSON file not found: {source}")
    payload = json.loads(source.read_text(encoding="utf-8"))
    paths = _normalize_yadisk_paths(payload)
    logger.info(f"Loaded {len(paths)} paths from {source}")
    return paths
@task(name="filter_supported_yadisk_paths")
def filter_supported_yadisk_paths(paths: list[str]) -> list[str]:
    """Keep only paths supported by enrichment extension filters."""
    allowed = get_supported_enrichment_extensions()
    kept = [
        candidate for candidate in paths if Path(str(candidate)).suffix.lower() in allowed
    ]
    logger.info(
        f"Filtered YaDisk paths: {len(kept)}/{len(paths)} supported "
        f"(extensions: {', '.join(sorted(allowed))})"
    )
    return kept
async def _download_and_enrich_one(
    client: yadisk.AsyncClient,
    remote_path: str,
    temp_dir: Path,
    semaphore: asyncio.Semaphore,
    stats: dict[str, int],
    pbar: tqdm,
    pbar_lock: asyncio.Lock,
) -> None:
    """Download one YaDisk file, enrich it, remove it, and update stats.

    Errors are deliberately swallowed (counted into stats["errors"]) so a
    single failing file never stops the batch.
    """
    local_path = _make_temp_local_path(temp_dir, remote_path)
    result_status = "error"
    async with semaphore:
        try:
            local_path.parent.mkdir(parents=True, exist_ok=True)
            await client.download(remote_path, str(local_path))
            # Run sync enrichment in a worker thread. Exceptions are swallowed below.
            enrich_result = await asyncio.to_thread(
                process_document_file, str(local_path)
            )
            result_status = str(enrich_result.get("status", "error"))
            logger.info(
                f"YaDisk processed: remote={remote_path}, local={local_path}, status={result_status}"
            )
        except Exception as e:
            # Explicitly swallow errors as requested.
            logger.error(f"YaDisk processing error for {remote_path}: {e}")
            result_status = "error"
        finally:
            # Always delete the temp download; a failed delete is only a warning.
            try:
                if local_path.exists():
                    local_path.unlink()
            except Exception as cleanup_error:
                logger.warning(
                    f"Failed to remove temp file {local_path}: {cleanup_error}"
                )
            # Stats and progress bar updates are serialized via pbar_lock.
            async with pbar_lock:
                if result_status == "processed":
                    stats["processed"] += 1
                elif result_status == "skipped":
                    stats["skipped"] += 1
                else:
                    stats["errors"] += 1
                stats["completed"] += 1
                pbar.update(1)
                pbar.set_postfix(
                    processed=stats["processed"],
                    skipped=stats["skipped"],
                    errors=stats["errors"],
                )
@flow(name="yadisk_predefined_enrich")
async def yadisk_predefined_enrich_flow(
    yadisk_json_path: str = str(DEFAULT_YADISK_LIST_PATH),
    concurrency: int = 4,
) -> dict[str, int]:
    """
    Download and enrich YaDisk files listed in the JSON file using async YaDisk client.

    Args:
        yadisk_json_path: JSON file containing the remote paths to process.
        concurrency: Maximum number of files handled simultaneously (min 1).

    Returns:
        Stats dict with keys total, completed, processed, skipped, errors.

    Raises:
        ValueError: When YADISK_TOKEN is missing from the environment.
    """
    setup_prefect_flow_logging()
    prefect_api_url = os.getenv("PREFECT_API_URL", "").strip()
    yadisk_token = os.getenv("YADISK_TOKEN", "").strip()
    if not prefect_api_url:
        logger.warning("PREFECT_API_URL is not set in environment/.env")
    else:
        # Prefect reads this env var for API connectivity.
        os.environ["PREFECT_API_URL"] = prefect_api_url
        logger.info(f"Using Prefect API URL: {prefect_api_url}")
    if not yadisk_token:
        raise ValueError("YADISK_TOKEN is required in .env to access Yandex Disk")
    all_paths = load_yadisk_paths(yadisk_json_path)
    supported_paths = filter_supported_yadisk_paths(all_paths)
    stats = {
        "total": len(supported_paths),
        "completed": 0,
        "processed": 0,
        "skipped": 0,
        "errors": 0,
    }
    if not supported_paths:
        logger.info("No supported YaDisk paths to process")
        return stats
    concurrency = max(1, int(concurrency))
    logger.info(
        f"Starting async YaDisk enrichment for {len(supported_paths)} files with concurrency={concurrency}"
    )
    semaphore = asyncio.Semaphore(concurrency)
    pbar_lock = asyncio.Lock()
    # Temp dir (and any leftover downloads) is removed when the flow exits.
    with tempfile.TemporaryDirectory(prefix="yadisk_enrich_") as temp_dir_str:
        temp_dir = Path(temp_dir_str)
        pbar = tqdm(total=len(supported_paths), desc="YaDisk enrich", unit="file")
        try:
            async with yadisk.AsyncClient(token=yadisk_token) as client:
                try:
                    is_token_valid = await client.check_token()
                    logger.info(f"YaDisk token validation result: {is_token_valid}")
                except Exception as token_check_error:
                    # Token check issues should not block processing attempts.
                    logger.warning(f"YaDisk token check failed: {token_check_error}")
                tasks = [
                    asyncio.create_task(
                        _download_and_enrich_one(
                            client=client,
                            remote_path=remote_path,
                            temp_dir=temp_dir,
                            semaphore=semaphore,
                            stats=stats,
                            pbar=pbar,
                            pbar_lock=pbar_lock,
                        )
                    )
                    for remote_path in supported_paths
                ]
                # Worker function swallows per-file errors, but keep gather resilient too.
                await asyncio.gather(*tasks, return_exceptions=True)
        finally:
            pbar.close()
    logger.info(
        "YaDisk enrichment flow finished. "
        f"Total={stats['total']}, Completed={stats['completed']}, "
        f"Processed={stats['processed']}, Skipped={stats['skipped']}, Errors={stats['errors']}"
    )
    return stats
if __name__ == "__main__":
    # serve() blocks and waits for runs triggered via the Prefect server/UI.
    print("SERVING PREFECT FLOW FOR YANDEX DISK ENRICHMENT OF PREDEFINED PATHS")
    yadisk_predefined_enrich_flow.serve()

View File

@@ -0,0 +1,173 @@
aiofiles==25.1.0
aiohappyeyeballs==2.6.1
aiohttp==3.13.3
aiosignal==1.4.0
aiosqlite==0.22.1
alembic==1.18.4
amplitude-analytics==1.2.2
annotated-doc==0.0.4
annotated-types==0.7.0
anyio==4.12.1
apprise==1.9.7
asgi-lifespan==2.1.0
asyncpg==0.31.0
attrs==25.4.0
banks==2.3.0
beartype==0.22.9
beautifulsoup4==4.14.3
cachetools==7.0.1
certifi==2026.1.4
cffi==2.0.0
charset-normalizer==3.4.4
click==8.3.1
cloudpickle==3.1.2
colorama==0.4.6
coolname==3.0.0
croniter==6.0.0
cryptography==46.0.5
dataclasses-json==0.6.7
dateparser==1.3.0
defusedxml==0.7.1
Deprecated==1.2.18
dirtyjson==1.0.8
distro==1.9.0
docker==7.1.0
docx2txt==0.9
et_xmlfile==2.0.0
exceptiongroup==1.3.1
fakeredis==2.34.0
fastapi==0.131.0
filetype==1.2.0
frozenlist==1.8.0
fsspec==2026.1.0
graphviz==0.21
greenlet==3.3.1
griffe==1.15.0
grpcio==1.76.0
h11==0.16.0
h2==4.3.0
hpack==4.1.0
httpcore==1.0.9
httpx==0.28.1
humanize==4.15.0
hyperframe==6.1.0
idna==3.11
importlib_metadata==8.7.1
iniconfig==2.3.0
Jinja2==3.1.6
jinja2-humanize-extension==0.4.0
jiter==0.13.0
joblib==1.5.3
jsonpatch==1.33
jsonpointer==3.0.0
jsonschema==4.26.0
jsonschema-specifications==2025.9.1
llama-cloud==0.1.35
llama-cloud-services==0.6.54
llama-index==0.14.13
llama-index-cli==0.5.3
llama-index-core==0.14.13
llama-index-embeddings-ollama==0.8.6
llama-index-embeddings-openai==0.5.1
llama-index-embeddings-openai-like==0.2.2
llama-index-indices-managed-llama-cloud==0.9.4
llama-index-instrumentation==0.4.2
llama-index-llms-ollama==0.9.1
llama-index-llms-openai==0.6.17
llama-index-readers-file==0.5.6
llama-index-readers-llama-parse==0.5.1
llama-index-vector-stores-qdrant==0.9.1
llama-index-workflows==2.13.1
llama-parse==0.6.54
loguru==0.7.3
lupa==2.6
lxml==6.0.2
Mako==1.3.10
Markdown==3.10.2
markdown-it-py==4.0.0
MarkupSafe==3.0.3
marshmallow==3.26.2
mdurl==0.1.2
multidict==6.7.1
mypy_extensions==1.1.0
nest-asyncio==1.6.0
networkx==3.6.1
nltk==3.9.2
numpy==2.4.2
oauthlib==3.3.1
ollama==0.6.1
openai==2.16.0
openpyxl==3.1.5
opentelemetry-api==1.39.1
orjson==3.11.7
packaging==25.0
pandas==2.3.3
pathspec==1.0.4
patool==4.0.4
pendulum==3.2.0
pillow==12.1.0
platformdirs==4.5.1
pluggy==1.6.0
portalocker==3.2.0
prefect==3.6.18
prometheus_client==0.24.1
propcache==0.4.1
protobuf==6.33.5
py-key-value-aio==0.4.4
pycparser==3.0
pydantic==2.12.5
pydantic-extra-types==2.11.0
pydantic-settings==2.13.1
pydantic_core==2.41.5
pydocket==0.17.9
Pygments==2.19.2
pypdf==6.6.2
pytest==9.0.2
pytest-asyncio==1.3.0
python-dateutil==2.9.0.post0
python-dotenv==1.2.1
python-json-logger==4.0.0
python-pptx==1.0.2
python-slugify==8.0.4
pytz==2025.2
PyYAML==6.0.3
qdrant-client==1.16.2
readchar==4.2.1
redis==7.2.0
referencing==0.37.0
regex==2026.1.15
requests==2.32.5
requests-oauthlib==2.0.0
rfc3339-validator==0.1.4
rich==14.3.3
rpds-py==0.30.0
ruamel.yaml==0.19.1
ruamel.yaml.clib==0.2.15
semver==3.0.4
shellingham==1.5.4
six==1.17.0
sniffio==1.3.1
sortedcontainers==2.4.0
soupsieve==2.8.3
SQLAlchemy==2.0.46
starlette==0.52.1
striprtf==0.0.26
tenacity==9.1.2
text-unidecode==1.3
tiktoken==0.12.0
toml==0.10.2
tqdm==4.67.3
typer==0.24.1
typing-inspect==0.9.0
typing-inspection==0.4.2
typing_extensions==4.15.0
tzdata==2025.3
tzlocal==5.3.1
urllib3==2.6.3
uvicorn==0.41.0
websockets==16.0
wrapt==1.17.3
xlsxwriter==3.2.9
yadisk==3.4.0
yarl==1.22.0
zipp==3.23.0

View File

@@ -14,7 +14,7 @@ from llama_index.core.retrievers import VectorIndexRetriever
from loguru import logger
from pathlib import Path
from vector_storage import get_vector_store_and_index
from vector_storage import get_qdrant_connection_config, get_vector_store_and_index
# Import the new configuration module
from config import setup_global_models
@@ -23,8 +23,9 @@ from config import setup_global_models
def initialize_retriever(
collection_name: str = "documents_llamaindex",
similarity_top_k: int = 5,
host: str = "localhost",
port: int = 6333
host: str | None = None,
port: int | None = None,
grpc_port: int | None = None,
) -> RetrieverQueryEngine:
"""
Initialize the retriever query engine with the vector store.
@@ -32,8 +33,9 @@ def initialize_retriever(
Args:
collection_name: Name of the Qdrant collection
similarity_top_k: Number of top similar documents to retrieve
host: Qdrant host address
port: Qdrant REST API port
host: Qdrant host address (defaults to QDRANT_HOST from .env)
port: Qdrant REST API port (defaults to QDRANT_REST_PORT from .env)
grpc_port: Qdrant gRPC API port (defaults to QDRANT_GRPC_PORT from .env)
Returns:
RetrieverQueryEngine configured with the vector store
@@ -44,8 +46,23 @@ def initialize_retriever(
# Set up the global models to prevent defaulting to OpenAI
setup_global_models()
qdrant_config = get_qdrant_connection_config()
resolved_host = host or str(qdrant_config["host"])
resolved_port = port or int(qdrant_config["port"])
resolved_grpc_port = grpc_port or int(qdrant_config["grpc_port"])
logger.info(
f"Retriever Qdrant connection: host={resolved_host}, "
f"rest_port={resolved_port}, grpc_port={resolved_grpc_port}"
)
# Get the vector store and index from the existing configuration
vector_store, index = get_vector_store_and_index()
vector_store, index = get_vector_store_and_index(
collection_name=collection_name,
host=resolved_host,
port=resolved_port,
grpc_port=resolved_grpc_port,
)
# Create a retriever from the index
retriever = VectorIndexRetriever(

View File

@@ -0,0 +1,109 @@
#!/usr/bin/env python3
"""
HTTP API server for querying the vector storage via the existing retrieval pipeline.
"""
from pathlib import Path
import sys
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from loguru import logger
from retrieval import initialize_retriever
load_dotenv()
def setup_logging() -> None:
    """Route loguru output to a rotating file and a colorized stdout stream.

    Ensures the ``logs`` directory exists, removes loguru's default handler,
    then installs two INFO-level sinks: ``logs/dev.log`` with 10 MB rotation
    and 10-day retention, and a colorized stdout sink.
    """
    Path("logs").mkdir(exist_ok=True)
    # Drop loguru's default stderr handler so only our sinks emit records.
    logger.remove()
    file_format = "{time:YYYY-MM-DD HH:mm:ss} | {level} | {file}:{line} | {message}"
    stream_format = "{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}"
    logger.add(
        "logs/dev.log",
        rotation="10 MB",
        retention="10 days",
        level="INFO",
        format=file_format,
    )
    logger.add(sys.stdout, level="INFO", format=stream_format, colorize=True)
# Configure logging before creating the app so startup messages are captured.
setup_logging()
app = FastAPI(title="LlamaIndex RAG API", version="1.0.0")
class TestQueryRequest(BaseModel):
    """Request body for POST /api/test-query."""
    query: str = Field(..., min_length=1, description="User query text")
    top_k: int = Field(5, ge=1, le=20, description="Number of retrieved chunks")
class SourceItem(BaseModel):
    """One retrieved chunk returned alongside the generated answer."""
    content: str  # raw text of the retrieved node
    score: float | None = None  # similarity score; None when the retriever gives none
    metadata: dict = Field(default_factory=dict)  # node metadata passed through from the retriever
class TestQueryResponse(BaseModel):
    """Response body for POST /api/test-query."""
    query: str  # the stripped query that was executed
    response: str  # generated answer text
    sources: list[SourceItem]  # retrieved chunks backing the answer
@app.get("/health")
def health() -> dict[str, str]:
    """Liveness probe: always reports an OK status."""
    probe_result = {"status": "ok"}
    return probe_result
@app.post("/api/test-query", response_model=TestQueryResponse)
def test_query(payload: TestQueryRequest) -> TestQueryResponse:
    """Run a user query through the existing retrieval/query engine.

    Args:
        payload: Validated request carrying the query text and ``top_k``.

    Returns:
        The generated answer together with the retrieved source chunks.

    Raises:
        HTTPException: 400 when the query is blank after stripping,
            500 when the retrieval pipeline fails for any reason.
    """
    question = payload.query.strip()
    if not question:
        raise HTTPException(status_code=400, detail="Field 'query' must not be empty")

    logger.info(f"Received /api/test-query request (top_k={payload.top_k})")
    try:
        # NOTE(review): the retriever is rebuilt on every request; if startup
        # cost matters, consider caching the engine — confirm thread-safety first.
        engine = initialize_retriever(similarity_top_k=payload.top_k)
        result = engine.query(question)

        retrieved = [
            SourceItem(
                content=str(getattr(node, "text", "")),
                score=getattr(node, "score", None),
                metadata=getattr(node, "metadata", {}) or {},
            )
            for node in getattr(result, "source_nodes", [])
        ]

        logger.info(
            f"/api/test-query completed successfully (sources={len(retrieved)})"
        )
        return TestQueryResponse(
            query=question, response=str(result), sources=retrieved
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"/api/test-query failed: {e}")
        raise HTTPException(status_code=500, detail="Failed to process query")
if __name__ == "__main__":
    # Local development entry point; production deployments should launch
    # uvicorn/gunicorn externally instead of running this module directly.
    import uvicorn
    uvicorn.run("server:app", host="0.0.0.0", port=8000, reload=False)

View File

@@ -10,6 +10,7 @@ This module provides initialization and configuration for:
import os
from typing import Optional
from dotenv import load_dotenv
from llama_index.core import VectorStoreIndex
from llama_index.vector_stores.qdrant import QdrantVectorStore
from loguru import logger
@@ -18,12 +19,26 @@ from qdrant_client import QdrantClient
# Import the new configuration module
from config import get_embedding_model
load_dotenv()
def get_qdrant_connection_config() -> dict[str, int | str]:
"""Load Qdrant connection settings from environment variables."""
host = os.getenv("QDRANT_HOST", "localhost")
rest_port = int(os.getenv("QDRANT_REST_PORT", "6333"))
grpc_port = int(os.getenv("QDRANT_GRPC_PORT", "6334"))
return {
"host": host,
"port": rest_port,
"grpc_port": grpc_port,
}
def initialize_vector_storage(
collection_name: str = "documents_llamaindex",
host: str = "localhost",
port: int = 6333,
grpc_port: int = 6334,
host: Optional[str] = None,
port: Optional[int] = None,
grpc_port: Optional[int] = None,
) -> tuple[QdrantVectorStore, VectorStoreIndex]:
"""
Initialize Qdrant vector storage with embedding model based on configured strategy.
@@ -37,11 +52,19 @@ def initialize_vector_storage(
Returns:
Tuple of (QdrantVectorStore, VectorStoreIndex)
"""
logger.info(f"Initializing vector storage with collection: {collection_name}")
qdrant_config = get_qdrant_connection_config()
host = host or str(qdrant_config["host"])
port = port or int(qdrant_config["port"])
grpc_port = grpc_port or int(qdrant_config["grpc_port"])
logger.info(
f"Initializing vector storage with collection: {collection_name} "
f"(host={host}, rest_port={port}, grpc_port={grpc_port})"
)
try:
# Initialize Qdrant client
client = QdrantClient(host=host, port=port)
client = QdrantClient(host=host, port=port, grpc_port=grpc_port)
# Get the embedding model based on the configured strategy
embed_model = get_embedding_model()
@@ -131,14 +154,24 @@ def initialize_vector_storage(
raise
def get_vector_store_and_index() -> tuple[QdrantVectorStore, VectorStoreIndex]:
def get_vector_store_and_index(
collection_name: str = "documents_llamaindex",
host: Optional[str] = None,
port: Optional[int] = None,
grpc_port: Optional[int] = None,
) -> tuple[QdrantVectorStore, VectorStoreIndex]:
"""
Convenience function to get the initialized vector store and index.
Returns:
Tuple of (QdrantVectorStore, VectorStoreIndex)
"""
return initialize_vector_storage()
return initialize_vector_storage(
collection_name=collection_name,
host=host,
port=port,
grpc_port=grpc_port,
)
if __name__ == "__main__":

97955
yadisk_files.json Normal file

File diff suppressed because it is too large Load Diff