"""Helper utilities for metadata extraction from Russian text.""" import os import re import tempfile from abc import ABC, abstractmethod from pathlib import Path from typing import Callable, Iterator, List import requests from loguru import logger _YEAR_PATTERN = re.compile(r"(? str: normalized = " ".join(value.strip().split()).strip(".,;:!?()[]{}") return normalized.lower() def extract_years_from_text(text: str) -> List[int]: """Extract unique years from text as integers.""" if not text: return [] years = {int(match.group(0)) for match in _YEAR_PATTERN.finditer(text)} return sorted(years) def extract_russian_event_names(text: str) -> List[str]: """ Extract likely Russian event names from text using heuristic regex rules. Returns normalized event phrases in lowercase. """ if not text: return [] events: List[str] = [] seen = set() for match in _EVENT_PHRASE_PATTERN.finditer(text): candidate = _normalize_event(match.group(0)) if len(candidate) < 6: continue if not any(keyword in candidate for keyword in _EVENT_KEYWORDS): continue if candidate not in seen: events.append(candidate) seen.add(candidate) for match in _QUOTED_EVENT_PATTERN.finditer(text): quoted = _normalize_event(match.group(1)) if len(quoted) < 3: continue if quoted not in seen: events.append(quoted) seen.add(quoted) return events class _AdaptiveFile(ABC): extension: str # Format: .jpg local_path: str def __init__(self, extension: str, local_path: str): self.extension = extension self.local_path = local_path # This method allows to work with file locally, and lambda should be provided for this. # Why separate method? For possible cleanup after work is done. And to download file, if needed # Lambda: first argument is a local path @abstractmethod def work_with_file_locally(self, func: Callable[[str], None]): """Run callback with a local path to the file.""" class _AdaptiveCollection(ABC): # Generator method with yield @abstractmethod def iterate(self, recursive: bool) -> Iterator[_AdaptiveFile]: """Iterate files in collection.""" class LocalFilesystemAdaptiveFile(_AdaptiveFile): def work_with_file_locally(self, func: Callable[[str], None]): func(self.local_path) class LocalFilesystemAdaptiveCollection(_AdaptiveCollection): base_dir: str def __init__(self, base_dir: str): super().__init__() self.base_dir = base_dir def iterate(self, recursive: bool) -> Iterator[_AdaptiveFile]: for root, dirs, files in os.walk(self.base_dir): for file in files: full_path = os.path.join(root, file) yield LocalFilesystemAdaptiveFile(Path(full_path).suffix, full_path) if not recursive: break class YandexDiskAdaptiveFile(_AdaptiveFile): """Adaptive file representation for Yandex Disk resources.""" def __init__(self, extension: str, local_path: str, token: str): super().__init__(extension, local_path) self.token = token def _download_to_temp_file(self) -> str: headers = {"Authorization": f"OAuth {self.token}"} response = requests.get( "https://cloud-api.yandex.net/v1/disk/resources/download", headers=headers, params={"path": self.local_path}, timeout=30, ) response.raise_for_status() href = response.json()["href"] file_response = requests.get(href, timeout=120) file_response.raise_for_status() suffix = Path(self.local_path).suffix with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file: temp_file.write(file_response.content) return temp_file.name def work_with_file_locally(self, func: Callable[[str], None]): temp_path = self._download_to_temp_file() try: func(temp_path) finally: if os.path.exists(temp_path): os.unlink(temp_path) class 
class YandexDiskAdaptiveCollection(_AdaptiveCollection):
    """Adaptive collection implementation for Yandex Disk."""

    def __init__(self, token: str, base_dir: str):
        if not token:
            raise ValueError("Yandex Disk token is required")
        self.token = token
        self.base_dir = base_dir
        self._headers = {"Authorization": f"OAuth {self.token}"}

    @staticmethod
    def _normalize_disk_path(path: str) -> str:
        return path if path.startswith("disk:/") else f"disk:/{path.lstrip('/')}"

    def _get_resource_info(self, path: str) -> dict:
        response = requests.get(
            "https://cloud-api.yandex.net/v1/disk/resources",
            headers=self._headers,
            params={"path": path, "limit": 1000},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()

    def _iter_children(self, path: str) -> Iterator[dict]:
        # Page through the folder contents, 1000 items per request.
        offset = 0
        while True:
            response = requests.get(
                "https://cloud-api.yandex.net/v1/disk/resources",
                headers=self._headers,
                params={"path": path, "limit": 1000, "offset": offset},
                timeout=30,
            )
            response.raise_for_status()
            payload = response.json()
            embedded = payload.get("_embedded", {})
            items = embedded.get("items", [])
            if not items:
                break
            for item in items:
                yield item
            if len(items) < 1000:
                break
            offset += 1000

    def iterate(self, recursive: bool) -> Iterator[_AdaptiveFile]:
        root_path = self._normalize_disk_path(self.base_dir)
        root_info = self._get_resource_info(root_path)

        # The base path may point to a single file rather than a directory.
        if root_info.get("type") == "file":
            path = root_info["path"]
            logger.info(f"Found file on Yandex Disk: {path}")
            yield YandexDiskAdaptiveFile(Path(path).suffix, path, self.token)
            return

        # Breadth-first traversal of the directory tree.
        directories = [root_path]
        while directories:
            current_dir = directories.pop(0)
            for item in self._iter_children(current_dir):
                item_type = item.get("type")
                item_path = item.get("path")
                if item_type == "file":
                    logger.info(f"Found file on Yandex Disk: {item_path}")
                    yield YandexDiskAdaptiveFile(
                        Path(item_path).suffix, item_path, self.token
                    )
                elif recursive and item_type == "dir":
                    directories.append(item_path)
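

# Minimal usage sketch for the Yandex Disk backend. The YANDEX_DISK_TOKEN
# environment variable and the "archive" folder name are assumptions for
# illustration only; this block is not exercised on import.
if __name__ == "__main__":
    token = os.environ.get("YANDEX_DISK_TOKEN", "")
    if token:
        disk_collection = YandexDiskAdaptiveCollection(token, "archive")
        for disk_file in disk_collection.iterate(recursive=True):
            logger.info(f"Discovered {disk_file.local_path} ({disk_file.extension})")
    else:
        logger.warning("YANDEX_DISK_TOKEN is not set; skipping Yandex Disk demo")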