#!/usr/bin/env python3
"""
RAG evaluation script (file-batch mode).

Key behavior:
- Step = one document file (all its questions), not one question.
- Pre-download/caching in ./tmp/rag-evaluation (skip if already downloaded).
- Sequential API calls only (LangChain then LlamaIndex).
- Pairwise answer evaluation (both systems in one judge prompt).
- JSON output with append/overwrite support for batch runs and re-runs.
"""

from __future__ import annotations

import argparse
import datetime as dt
import hashlib
import json
import os
import re
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any

try:
    import requests
except ImportError as e:  # pragma: no cover
    raise SystemExit(
        "Missing dependency: requests. Run with your project venv "
        "(for example services/rag/langchain/venv/bin/python rag_evaluation.py ...)"
    ) from e

from dotenv import load_dotenv

load_dotenv()

# =============================================================================
# Configuration
# =============================================================================

LANGCHAIN_URL = os.getenv("LANGCHAIN_URL", "http://localhost:8331/api/test-query")
LLAMAINDEX_URL = os.getenv("LLAMAINDEX_URL", "http://localhost:8334/api/test-query")

# OpenAI-compatible evaluator endpoint. You can point this at OpenAI-compatible providers.
OPENAI_CHAT_URL = os.getenv(
    "OPENAI_CHAT_URL", "https://foundation-models.api.cloud.ru/v1"
)
OPENAI_CHAT_KEY = os.getenv("OPENAI_CHAT_KEY", "")
OPENAI_CHAT_MODEL = os.getenv("OPENAI_CHAT_MODEL", "MiniMaxAI/MiniMax-M2")
YADISK_TOKEN = os.getenv("YADISK_TOKEN", "")

BASE_DIR = Path(__file__).resolve().parent
INPUT_MD = BASE_DIR / "DOCUMENTS_TO_TEST.md"
OUTPUT_JSON = BASE_DIR / "EVALUATION_RESULT.json"
TMP_DIR = BASE_DIR / "tmp" / "rag-evaluation"

# All timeouts are in seconds and can be overridden through the environment.
RAG_TIMEOUT = int(os.getenv("RAG_TIMEOUT", "120"))
EVAL_TIMEOUT = int(os.getenv("EVAL_TIMEOUT", "90"))
YADISK_META_TIMEOUT = int(os.getenv("YADISK_META_TIMEOUT", "30"))
YADISK_DOWNLOAD_TIMEOUT = int(os.getenv("YADISK_DOWNLOAD_TIMEOUT", "180"))


# =============================================================================
# Data structures
# =============================================================================


@dataclass
class QuestionResult:
    """One question together with both systems' answers and the verdict."""

    section: str
    question: str
    langchain_answer: str = ""
    llamaindex_answer: str = ""
    langchain_score: float = 0.0
    llamaindex_score: float = 0.0
    winner: str = "Tie"
    rationale: str = ""
    evaluator_model: str = ""
    evaluated_at: str = ""


@dataclass
class DocumentEvaluation:
    """Evaluation result for one document (one step of a batch run)."""

    index: int
    path: str
    cache_file: str = ""
    cache_status: str = ""
    questions: list[QuestionResult] = field(default_factory=list)
    started_at: str = ""
    finished_at: str = ""


# =============================================================================
# Markdown parsing
# =============================================================================


def split_documents(md_text: str) -> tuple[list[str], list[str]]:
    """Split the input markdown into preamble lines and per-document blocks.

    A new block starts at every line beginning with ``"## "``.

    Returns:
        ``(header_lines, blocks)`` where ``header_lines`` are the lines before
        the first ``## `` heading and each block is the heading line plus the
        lines that follow it, joined with newlines.
    """
    header: list[str] = []
    docs: list[list[str]] = []
    current: list[str] | None = None

    for line in md_text.splitlines():
        if line.startswith("## "):
            if current is not None:
                docs.append(current)
            current = [line]
        elif current is None:
            header.append(line)
        else:
            current.append(line)

    if current is not None:
        docs.append(current)

    return header, ["\n".join(d) for d in docs]


def parse_document_block(idx: int, block: str) -> tuple[str, list[QuestionResult]]:
    """Extract the document path and its questions from one markdown block.

    The path is the first backtick-quoted span on the block's first line.
    ``### `` lines set the current section; ``- `` lines are questions.

    Args:
        idx: 1-based document index (accepted for interface stability; the
            parser itself does not use it).
        block: Markdown text of one document, as produced by
            ``split_documents``.

    Returns:
        ``(doc_path, questions)``; ``doc_path`` is ``""`` when the heading has
        no backtick-quoted path. An empty block yields ``("", [])``.
    """
    lines = block.splitlines()
    if not lines:
        # Guard: an empty block has no heading line to inspect.
        return "", []

    header = lines[0].strip()
    m = re.search(r"`([^`]+)`", header)
    doc_path = m.group(1) if m else ""

    section = ""
    questions: list[QuestionResult] = []
    for line in lines[1:]:
        if line.startswith("### "):
            section = line[4:].strip()
        elif line.startswith("- "):
            q = line[2:].strip()
            if q:
                questions.append(QuestionResult(section=section, question=q))

    return doc_path, questions


def parse_all_docs(md_path: Path) -> list[tuple[int, str, list[QuestionResult]]]:
    """Read the markdown file and return ``(index, path, questions)`` per document.

    Indices are 1-based and follow the order of ``## `` headings in the file.
    """
    raw = md_path.read_text(encoding="utf-8")
    _, blocks = split_documents(raw)
    return [
        (i, *parse_document_block(i, block))
        for i, block in enumerate(blocks, start=1)
    ]


# =============================================================================
# Caching / Yandex Disk
# =============================================================================


def cache_file_name(remote_path: str) -> str:
    """Return a deterministic local cache filename for ``remote_path``.

    The name is the first 12 hex characters of the SHA-256 digest of the path
    plus the original file suffix (``.bin`` when there is none).

    The built-in ``hash()`` must not be used here: string hashes are salted
    per interpreter process, so the name would change between runs and the
    cache would never be reused.
    """
    digest = hashlib.sha256(remote_path.encode("utf-8")).hexdigest()[:12]
    suffix = Path(remote_path).suffix or ".bin"
    return f"{digest}{suffix}"


def download_yadisk_to_cache(remote_path: str, token: str, cache_path: Path) -> str:
    """
    Download file into cache path if missing.

    The payload is written to a sibling ``.part`` file and then moved into
    place, so an interrupted write never leaves a truncated file that a later
    run would mistake for a valid cache entry.

    Returns status:
      "cached_existing" | "downloaded" | "error:..."
    """
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    if cache_path.exists() and cache_path.stat().st_size > 0:
        return "cached_existing"

    if not token:
        return "error:missing_yadisk_token"

    headers = {"Authorization": f"OAuth {token}"}
    part_path = cache_path.with_name(cache_path.name + ".part")
    try:
        # Step 1: ask the API for a temporary download link.
        r = requests.get(
            "https://cloud-api.yandex.net/v1/disk/resources/download",
            headers=headers,
            params={"path": remote_path},
            timeout=YADISK_META_TIMEOUT,
        )
        r.raise_for_status()
        href = r.json()["href"]

        # Step 2: fetch the file body from that link.
        f = requests.get(href, timeout=YADISK_DOWNLOAD_TIMEOUT)
        f.raise_for_status()
        if not f.content:
            return "error:empty_download"

        part_path.write_bytes(f.content)
        os.replace(part_path, cache_path)
        return "downloaded"
    except Exception as e:  # noqa: BLE001 - best effort: report, never crash the batch
        return f"error:{e}"


# =============================================================================
# File text extraction (for evaluator context)
# =============================================================================


def extract_text_from_file(path: Path) -> str:
    """Return plain text extracted from ``path`` for use as judge context.

    Dispatches on the file suffix. Parser libraries are imported lazily so the
    script still runs when an optional parser is not installed; any parse
    failure is returned as a bracketed error marker instead of raising.
    Spreadsheets are capped at roughly 5000 rows.
    """
    ext = path.suffix.lower()

    if ext in {".txt", ".md", ".csv", ".json", ".xml", ".html", ".htm"}:
        return path.read_text(encoding="utf-8", errors="ignore")

    if ext in {".docx", ".doc"}:
        try:
            from docx import Document  # type: ignore

            doc = Document(str(path))
            return "\n".join(p.text for p in doc.paragraphs)
        except Exception as e:  # noqa: BLE001
            return f"[DOC parse error: {e}]"

    if ext == ".pdf":
        try:
            import PyPDF2  # type: ignore

            out: list[str] = []
            with path.open("rb") as f:
                reader = PyPDF2.PdfReader(f)
                for page in reader.pages:
                    out.append(page.extract_text() or "")
            return "\n".join(out)
        except Exception as e:  # noqa: BLE001
            return f"[PDF parse error: {e}]"

    if ext in {".xlsx", ".xls"}:
        try:
            from openpyxl import load_workbook  # type: ignore

            wb = load_workbook(str(path), read_only=True)
            try:
                out: list[str] = []
                for ws in wb.worksheets:
                    for row in ws.iter_rows(values_only=True):
                        out.append(
                            "\t".join("" if c is None else str(c) for c in row)
                        )
                        if len(out) > 5000:
                            break
                    if len(out) > 5000:
                        break
                return "\n".join(out)
            finally:
                # Read-only workbooks keep the file handle open until closed.
                wb.close()
        except Exception as e:  # noqa: BLE001
            return f"[XLS parse error: {e}]"

    # fallback
    try:
        return path.read_text(encoding="utf-8", errors="ignore")
    except Exception:  # noqa: BLE001
        return f"[Binary file: {path.name}]"


# =============================================================================
# RAG API calls (sequential)
# =============================================================================


def _post_query(url: str, payload: dict[str, Any], timeout: int) -> str:
    """POST ``payload`` as JSON and return the stripped ``response`` field.

    Any failure (network, HTTP status, JSON shape) is returned as a string
    starting with ``"ERROR: "`` so that callers can score it as a technical
    failure instead of aborting the batch.
    """
    try:
        r = requests.post(url, json=payload, timeout=timeout)
        r.raise_for_status()
        text = r.json().get("response", "")
        if text is None:
            return ""
        return str(text).strip()
    except Exception as e:  # noqa: BLE001
        return f"ERROR: {e}"


def call_rag(url: str, query: str, timeout: int) -> str:
    """Send ``query`` to a RAG endpoint; return its answer or ``"ERROR: ..."``."""
    return _post_query(url, {"query": query}, timeout)


def call_langchain(query: str, timeout: int) -> str:
    """Query the LangChain service configured by ``LANGCHAIN_URL``."""
    return call_rag(LANGCHAIN_URL, query, timeout)


def call_llamaindex(query: str, timeout: int) -> str:
    """Query the LlamaIndex service (``mode="agent"``) at ``LLAMAINDEX_URL``."""
    return _post_query(LLAMAINDEX_URL, {"query": query, "mode": "agent"}, timeout)


# =============================================================================
# Evaluator
# =============================================================================


def _rule_score(answer: str) -> float:
    """Heuristic score for an answer, used when no LLM judge is available.

    Returns 0.0 for an empty answer, -1.0 for a technical error, otherwise a
    base of 0.3 plus small bonuses for length, digits, sentence punctuation
    and an explicit "not found" style statement, capped at 1.0.
    """
    if not answer or not answer.strip():
        return 0.0
    if answer.startswith("ERROR:"):
        return -1.0

    score = 0.3
    if len(answer) > 120:
        score += 0.2
    if re.search(r"\d", answer):
        score += 0.1
    if re.search(r"[.!?]", answer):
        score += 0.1
    if re.search(r"(не найден|недостаточно|нет информации)", answer.lower()):
        score += 0.05
    return min(1.0, score)


# Judge instruction per markdown section heading.
SECTION_CRITERIA: dict[str, str] = {
    "Entity/Fact Recall (Response Relevance)": "Оцени точность извлечения сущностей/фактов и релевантность вопросу.",
    "Numerical & Temporal Precision": "Оцени точность чисел, дат, периодов и временных связей.",
    "Context Precision (Evidence-anchored)": "Оцени, насколько ответ опирается на релевантный контекст без лишнего.",
    "Faithfulness / Non-hallucination": "Оцени отсутствие галлюцинаций и корректное поведение при отсутствии фактов.",
    "Reasoning & Synthesis": "Оцени качество синтеза фактов и логичность итогового вывода.",
}


def build_pair_eval_prompt(
    question: str,
    section: str,
    langchain_answer: str,
    llamaindex_answer: str,
    document_text: str,
    max_context_chars: int = 9000,
) -> str:
    """Build the pairwise judge prompt for one question.

    Args:
        question: The question both systems answered.
        section: Section heading; selects the criterion from
            ``SECTION_CRITERIA`` (a generic criterion is used when unknown).
        langchain_answer: Answer shown to the judge as "A".
        llamaindex_answer: Answer shown to the judge as "B".
        document_text: Extracted document text used as reference context.
        max_context_chars: Number of leading characters of ``document_text``
            included in the prompt.

    Returns:
        The prompt text, which asks for a JSON object with
        ``langchain_score``, ``llamaindex_score``, ``winner`` and
        ``rationale``.
    """
    criteria = SECTION_CRITERIA.get(
        section, "Оцени релевантность, точность и полезность."
    )
    context = document_text[:max_context_chars]

    # The score placeholders spell out the expected numeric range so the judge
    # does not see an empty value in the JSON template.
    return f"""Ты судья качества RAG-ответов. Сравни два ответа на один вопрос.

Вопрос:
{question}

Секция оценки: {section}
Критерий: {criteria}

Ответ A (LangChain):
{langchain_answer}

Ответ B (LlamaIndex):
{llamaindex_answer}

Опорный контекст документа:
{context}

Верни ТОЛЬКО JSON:
{{
  "langchain_score": <число от -1.0 до 1.0>,
  "llamaindex_score": <число от -1.0 до 1.0>,
  "winner": "LangChain|LlamaIndex|Tie",
  "rationale": "<кратко по сути>"
}}

Правила:
- Технические ошибки/таймауты должны получать -1.0.
- Пустой ответ без ошибки = 0.0.
- Галлюцинации сильно штрафуются.
- Если разница незначительная, выбирай Tie.
"""


def _heuristic_verdict(
    langchain_answer: str,
    llamaindex_answer: str,
    tie_rationale: str,
    win_rationale: str,
) -> tuple[float, float, str, str]:
    """Score both answers with ``_rule_score`` and pick a winner.

    Scores closer than 0.05 count as a tie.
    """
    lc = _rule_score(langchain_answer)
    li = _rule_score(llamaindex_answer)
    if abs(lc - li) < 0.05:
        return lc, li, "Tie", tie_rationale
    winner = "LangChain" if lc > li else "LlamaIndex"
    return lc, li, winner, win_rationale


def evaluate_pair_with_llm(
    question: str,
    section: str,
    langchain_answer: str,
    llamaindex_answer: str,
    document_text: str,
) -> tuple[float, float, str, str]:
    """Judge two answers to the same question.

    Technical failures (answers starting with ``"ERROR:"``) are decided
    deterministically without calling the judge. Without ``OPENAI_CHAT_KEY``,
    or when the judge call or its JSON parsing fails, the heuristic
    ``_rule_score`` comparison is used instead.

    Returns:
        ``(langchain_score, llamaindex_score, winner, rationale)`` where
        ``winner`` is one of ``"LangChain"``, ``"LlamaIndex"``, ``"Tie"``.
    """
    # Deterministic short-circuit for technical failures
    lc_failed = langchain_answer.startswith("ERROR:")
    li_failed = llamaindex_answer.startswith("ERROR:")
    if lc_failed and li_failed:
        return -1.0, -1.0, "Tie", "Обе системы вернули техническую ошибку."
    if lc_failed:
        return (
            -1.0,
            _rule_score(llamaindex_answer),
            "LlamaIndex",
            "LangChain технически не ответил.",
        )
    if li_failed:
        return (
            _rule_score(langchain_answer),
            -1.0,
            "LangChain",
            "LlamaIndex технически не ответил.",
        )

    if not OPENAI_CHAT_KEY:
        # fallback heuristic
        return _heuristic_verdict(
            langchain_answer,
            llamaindex_answer,
            "Эвристическая оценка без LLM (ключ не задан).",
            "Эвристическая оценка без LLM.",
        )

    prompt = build_pair_eval_prompt(
        question=question,
        section=section,
        langchain_answer=langchain_answer,
        llamaindex_answer=llamaindex_answer,
        document_text=document_text,
    )
    headers = {
        "Authorization": f"Bearer {OPENAI_CHAT_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": OPENAI_CHAT_MODEL,
        "messages": [
            {
                "role": "system",
                "content": "Ты строгий судья качества RAG. Отвечай только JSON.",
            },
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.0,
        "max_tokens": 400,
    }

    try:
        r = requests.post(
            f"{OPENAI_CHAT_URL.rstrip('/')}/chat/completions",
            headers=headers,
            json=payload,
            timeout=EVAL_TIMEOUT,
        )
        r.raise_for_status()
        data = r.json()
        content = data.get("choices", [{}])[0].get("message", {}).get("content", "")

        # The model may wrap the JSON in extra text; take the outermost braces.
        m = re.search(r"\{.*\}", content, re.DOTALL)
        raw = m.group(0) if m else content
        parsed = json.loads(raw)

        lc = float(parsed.get("langchain_score", 0.0))
        li = float(parsed.get("llamaindex_score", 0.0))
        winner = str(parsed.get("winner", "Tie"))
        rationale = str(parsed.get("rationale", ""))
        if winner not in {"LangChain", "LlamaIndex", "Tie"}:
            winner = "Tie"
        return lc, li, winner, rationale
    except Exception as e:  # noqa: BLE001 - any judge failure falls back to heuristics
        note = f"Fallback heuristic; LLM eval error: {e}"
        return _heuristic_verdict(langchain_answer, llamaindex_answer, note, note)


# =============================================================================
# JSON storage
# =============================================================================


def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return dt.datetime.now(dt.timezone.utc).isoformat()


def default_json_payload(
    all_docs: list[tuple[int, str, list[QuestionResult]]],
) -> dict[str, Any]:
    """Build a fresh result store listing every document as ``not_processed``."""
    created = now_iso()
    return {
        "meta": {
            "created_at": created,
            "updated_at": created,
            "input_file": str(INPUT_MD),
            "langchain_url": LANGCHAIN_URL,
            "llamaindex_url": LLAMAINDEX_URL,
            "evaluator_model": OPENAI_CHAT_MODEL,
            "notes": [
                "step = one file (all file questions)",
                "sequential API calls only",
                "cache dir: ./tmp/rag-evaluation",
            ],
        },
        "documents": [
            {
                "index": idx,
                "path": path,
                "cache_file": "",
                "cache_status": "not_processed",
                "started_at": "",
                "finished_at": "",
                "questions": [asdict(q) for q in questions],
            }
            for idx, path, questions in all_docs
        ],
        "batches": [],
    }


def load_or_init_json(
    all_docs: list[tuple[int, str, list[QuestionResult]]],
    output_json: Path,
    mode: str,
) -> dict[str, Any]:
    """Load the existing result store, or create a fresh one.

    A fresh store is returned when ``mode`` is ``"overwrite"``, when the file
    does not exist, or when it cannot be read as a JSON object containing a
    ``"documents"`` key. In the last case a warning is printed, because the
    previous content will be replaced on the next save.
    """
    if mode == "overwrite" or not output_json.exists():
        return default_json_payload(all_docs)

    try:
        data = json.loads(output_json.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        # ValueError covers both invalid JSON and invalid UTF-8.
        print(f"WARNING: could not read {output_json} ({e}); starting a fresh store.")
        return default_json_payload(all_docs)

    if not isinstance(data, dict) or "documents" not in data:
        print(f"WARNING: {output_json} has no 'documents'; starting a fresh store.")
        return default_json_payload(all_docs)
    return data


def _document_to_dict(result: DocumentEvaluation) -> dict[str, Any]:
    """Serialize a ``DocumentEvaluation`` into the stored JSON shape."""
    return {
        "index": result.index,
        "path": result.path,
        "cache_file": result.cache_file,
        "cache_status": result.cache_status,
        "started_at": result.started_at,
        "finished_at": result.finished_at,
        "questions": [asdict(q) for q in result.questions],
    }


def upsert_document_result(store: dict[str, Any], result: DocumentEvaluation) -> None:
    """Replace the stored entry with the same ``path``, or append a new one."""
    docs = store.setdefault("documents", [])
    entry = _document_to_dict(result)
    for i, doc in enumerate(docs):
        if doc.get("path") == result.path:
            docs[i] = entry
            return
    docs.append(entry)


def update_batch_stats(store: dict[str, Any], batch_meta: dict[str, Any]) -> None:
    """Append ``batch_meta`` to the store and refresh ``meta.updated_at``."""
    store.setdefault("batches", []).append(batch_meta)
    store.setdefault("meta", {})["updated_at"] = now_iso()


def compute_batch_summary(results: list[DocumentEvaluation]) -> dict[str, Any]:
    """Aggregate win counts and average scores over a batch.

    The overall ranking is a tie unless one average exceeds the other by more
    than 0.01. An empty batch yields zero averages and a ``"Tie"`` ranking.
    """
    wins = {"LangChain": 0, "LlamaIndex": 0, "Tie": 0}
    scores_lc: list[float] = []
    scores_li: list[float] = []

    for d in results:
        for q in d.questions:
            wins[q.winner] = wins.get(q.winner, 0) + 1
            scores_lc.append(q.langchain_score)
            scores_li.append(q.llamaindex_score)

    # max(1, ...) avoids division by zero for an empty batch.
    avg_lc = sum(scores_lc) / max(1, len(scores_lc))
    avg_li = sum(scores_li) / max(1, len(scores_li))

    if avg_lc > avg_li + 0.01:
        ranking = "LangChain"
    elif avg_li > avg_lc + 0.01:
        ranking = "LlamaIndex"
    else:
        ranking = "Tie"

    return {
        "documents_processed": len(results),
        "questions_processed": len(scores_lc),
        "wins": wins,
        "avg_langchain": round(avg_lc, 4),
        "avg_llamaindex": round(avg_li, 4),
        "ranking": ranking,
    }


# =============================================================================
# Main flow
# =============================================================================


def _save_store(store: dict[str, Any], output_json: Path) -> None:
    """Write the store to ``output_json`` via a temp file and ``os.replace``.

    An interrupted write then cannot leave a truncated JSON file, which the
    next append run would discard along with all earlier results.
    """
    tmp_path = output_json.with_name(output_json.name + ".tmp")
    tmp_path.write_text(
        json.dumps(store, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    os.replace(tmp_path, output_json)


def run_evaluation(doc_from: int, doc_to: int, mode: str) -> None:
    """Evaluate documents ``doc_from``..``doc_to`` (1-based, inclusive).

    For each document: ensure it is cached locally, extract its text, ask both
    RAG services every question (LangChain first, then LlamaIndex), judge the
    pair, and save progress to ``OUTPUT_JSON`` after the document is done.
    A batch summary is appended and printed at the end.

    Args:
        doc_from: First document index; clamped to at least 1.
        doc_to: Last document index; clamped to the number of documents.
        mode: ``"append"`` or ``"overwrite"``, passed to ``load_or_init_json``.

    Raises:
        ValueError: If the clamped range is empty.
    """
    all_docs = parse_all_docs(INPUT_MD)
    total_docs = len(all_docs)

    doc_from = max(1, doc_from)
    doc_to = min(total_docs, doc_to)
    if doc_from > doc_to:
        raise ValueError(f"Invalid doc range: {doc_from}:{doc_to}")

    store = load_or_init_json(all_docs, OUTPUT_JSON, mode)
    TMP_DIR.mkdir(parents=True, exist_ok=True)

    selected = [d for d in all_docs if doc_from <= d[0] <= doc_to]
    print(
        f"Total docs: {total_docs}. Processing docs {doc_from}:{doc_to} ({len(selected)} steps)."
    )
    print(f"Cache dir: {TMP_DIR}")
    print(f"Output JSON: {OUTPUT_JSON}")

    batch_results: list[DocumentEvaluation] = []
    batch_started = now_iso()

    for step, (idx, doc_path, questions) in enumerate(selected, start=1):
        print(f"\n[STEP {step}/{len(selected)}] File #{idx}: {doc_path}")
        started = now_iso()

        cache_path = TMP_DIR / cache_file_name(doc_path)
        cache_status = download_yadisk_to_cache(doc_path, YADISK_TOKEN, cache_path)
        print(f" -> cache: {cache_status} ({cache_path})")

        if cache_status.startswith("error:"):
            # Still evaluate; the judge just gets an error marker as context.
            doc_text = f"[CACHE_ERROR] {cache_status}"
        else:
            doc_text = extract_text_from_file(cache_path)
        print(f" -> extracted text length: {len(doc_text)}")

        evaluated_questions: list[QuestionResult] = []
        for qn, q in enumerate(questions, start=1):
            qr = QuestionResult(section=q.section, question=q.question)
            print(f" [{qn}/{len(questions)}] {q.question[:90]}")

            t0 = time.perf_counter()
            qr.langchain_answer = call_langchain(q.question, timeout=RAG_TIMEOUT)
            print(f" LangChain: {time.perf_counter() - t0:.1f}s")

            t0 = time.perf_counter()
            qr.llamaindex_answer = call_llamaindex(q.question, timeout=RAG_TIMEOUT)
            print(f" LlamaIndex: {time.perf_counter() - t0:.1f}s")

            lc, li, winner, rationale = evaluate_pair_with_llm(
                question=q.question,
                section=q.section,
                langchain_answer=qr.langchain_answer,
                llamaindex_answer=qr.llamaindex_answer,
                document_text=doc_text,
            )
            qr.langchain_score = lc
            qr.llamaindex_score = li
            qr.winner = winner
            qr.rationale = rationale
            qr.evaluator_model = OPENAI_CHAT_MODEL
            qr.evaluated_at = now_iso()
            evaluated_questions.append(qr)

        doc_result = DocumentEvaluation(
            index=idx,
            path=doc_path,
            cache_file=str(cache_path),
            cache_status=cache_status,
            questions=evaluated_questions,
            started_at=started,
            finished_at=now_iso(),
        )
        upsert_document_result(store, doc_result)
        batch_results.append(doc_result)

        # Save incremental progress after each file/step
        _save_store(store, OUTPUT_JSON)
        print(" -> step saved")

    summary = compute_batch_summary(batch_results)
    batch_meta = {
        "started_at": batch_started,
        "finished_at": now_iso(),
        "range": f"{doc_from}:{doc_to}",
        "summary": summary,
        "mode": mode,
    }
    update_batch_stats(store, batch_meta)
    _save_store(store, OUTPUT_JSON)

    print("\nBatch complete.")
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    print(f"Saved to: {OUTPUT_JSON}")


def parse_range(value: str) -> tuple[int, int]:
    """Parse a ``from:to`` command-line argument into two positive integers.

    Raises:
        argparse.ArgumentTypeError: If the format is wrong or a value is not
            positive.
    """
    m = re.fullmatch(r"(\d+):(\d+)", value.strip())
    if not m:
        raise argparse.ArgumentTypeError(
            "Range must be in format from:to (example: 1:10)"
        )
    a, b = int(m.group(1)), int(m.group(2))
    if a <= 0 or b <= 0:
        raise argparse.ArgumentTypeError("Range values must be positive")
    return a, b


def main() -> int:
    """Command-line entry point; returns the process exit code."""
    parser = argparse.ArgumentParser(
        description="RAG evaluation in file-batch mode (JSON output)"
    )
    parser.add_argument(
        "doc_range",
        type=parse_range,
        help="Document range in format from:to (step = one file). Example: 1:10",
    )
    parser.add_argument(
        "--mode",
        choices=["append", "overwrite"],
        default="append",
        help="append: upsert evaluated docs into existing JSON; overwrite: rebuild JSON from input docs",
    )
    args = parser.parse_args()
    doc_from, doc_to = args.doc_range

    if "MiniMax" in OPENAI_CHAT_MODEL or "MiniMax" in OPENAI_CHAT_URL:
        print(
            "NOTE: evaluator model is MiniMax. It works, but for stricter judging quality, "
            "gpt-4.1-mini/gpt-4.1 (if available on your endpoint) is usually stronger."
        )

    run_evaluation(doc_from=doc_from, doc_to=doc_to, mode=args.mode)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())