import collections
import json
import logging
import os
import socket
import threading
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Deque, Dict, Iterable, Iterator, List, Optional, Tuple

from siliconflow_deepseek_ocr_sdk import (
    DEFAULT_DEEPSEEK_OCR_FALLBACK_PROMPT,
    DEFAULT_DEEPSEEK_OCR_PROMPT,
    DEFAULT_DEEPSEEK_OCR_TRANSCRIBE_PROMPT,
    SiliconFlowDeepSeekOCRClient,
)

# redis is an optional dependency: when it cannot be imported the name is
# bound to None so that callers can test for it instead of failing at import.
try:
    import redis
except ImportError:  # pragma: no cover - depends on deployment environment
    redis = None

# PyYAML is optional as well; the config-file loaders in this module return
# empty results when ``yaml`` is None.
try:
    import yaml
except ImportError:  # pragma: no cover - depends on deployment environment
    yaml = None

# Module-level logger named after this module.
logger = logging.getLogger(__name__)


def _env_bool(name: str, default: bool = False) -> bool:
    """Interpret environment variable ``name`` as a boolean flag.

    Returns ``default`` when the variable is unset. Otherwise the result is
    true only if the value, trimmed and lower-cased, is one of ``1``,
    ``true``, ``yes``, ``y`` or ``on``; any other text (including the empty
    string) is false.
    """
    raw = os.environ.get(name)
    if raw is not None:
        return raw.strip().lower() in ("1", "true", "yes", "y", "on")
    return default


def _env_int(name: str, default: int) -> int:
    """Read environment variable ``name`` as an integer.

    An unset or blank variable yields ``default``. A value that ``int()``
    rejects is reported with a warning and also yields ``default``.
    """
    raw = os.environ.get(name)
    if raw is not None and raw.strip():
        try:
            return int(raw)
        except ValueError:
            logger.warning("Invalid %s=%r, using default %s", name, raw, default)
    return default


def _env_float(name: str, default: float) -> float:
    """Read environment variable ``name`` as a float.

    An unset or blank variable yields ``default``. A value that ``float()``
    rejects is reported with a warning and also yields ``default``.
    """
    raw = os.environ.get(name)
    trimmed = "" if raw is None else raw.strip()
    if not trimmed:
        return default
    try:
        number = float(raw)
    except ValueError:
        logger.warning("Invalid %s=%r, using default %s", name, raw, default)
        number = default
    return number


def _env_optional_timeout(name: str, default: Optional[float]) -> Optional[float]:
    """Read environment variable ``name`` as an optional timeout in seconds.

    An unset or blank variable, or one that ``float()`` rejects (which is also
    logged as a warning), yields ``default``. A parsed value that is zero or
    negative yields ``None``; any other parsed value is returned as is.
    """
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        return default
    try:
        seconds = float(raw)
    except ValueError:
        logger.warning("Invalid %s=%r, using default %s", name, raw, default)
        return default
    return None if seconds <= 0 else seconds


def _split_keys(raw: str) -> List[str]:
    """Split a delimited string of API keys into individual tokens.

    Commas, semicolons and any run of whitespace (spaces, tabs, newlines,
    carriage returns) all act as separators. Empty tokens are discarded, so
    repeated or trailing separators are harmless.

    Args:
        raw: The delimited text, e.g. the value of an environment variable.

    Returns:
        The non-empty tokens in their original order.
    """
    # Map the two punctuation separators onto whitespace, then let
    # ``str.split()`` (no argument) split on any whitespace run. This also
    # separates tab-delimited keys, which previously stayed glued together.
    return raw.replace(",", " ").replace(";", " ").split()


def _dedupe_keys(keys: Iterable[str]) -> List[str]:
    """Return the trimmed, non-empty keys with duplicates removed.

    Falsy entries are treated as empty strings. The first occurrence of each
    key determines its position in the result.
    """
    trimmed = ((key or "").strip() for key in keys)
    # dict preserves insertion order, so fromkeys keeps first occurrences.
    return list(dict.fromkeys(token for token in trimmed if token))


def _walk_config_for_siliconflow_keys(node: Any, keys: List[str]) -> None:
    """Recursively collect SiliconFlow API keys from a parsed config tree.

    A mapping counts as a SiliconFlow section when one of its base-URL style
    fields (``apibase``, ``api_base``, ``base_url``, ``apiurl``, ``api_url``;
    matched case-insensitively) has a value containing ``api.siliconflow.cn``.
    Inside such a mapping every field whose name contains ``key`` contributes
    its value: a string is split with ``_split_keys`` and a list contributes
    one key per scalar item. All mappings and lists are descended into
    regardless of whether they matched.

    Args:
        node: Any value from the parsed config; non-container values are
            ignored.
        keys: Output list; discovered keys are appended in place. Duplicates
            are not removed here.
    """
    base_url_fields = {"apibase", "api_base", "base_url", "apiurl", "api_url"}

    if isinstance(node, dict):
        is_siliconflow_section = any(
            str(field).lower() in base_url_fields
            and "api.siliconflow.cn" in str(value).lower()
            for field, value in node.items()
        )
        if is_siliconflow_section:
            for field, value in node.items():
                if "key" not in str(field).lower():
                    continue
                if isinstance(value, str):
                    keys.extend(_split_keys(value))
                elif isinstance(value, list):
                    for item in value:
                        # A null entry would stringify to the bogus key
                        # "None", and a nested container to its repr; neither
                        # is a usable API key, so skip them.
                        if item is None or isinstance(item, (dict, list)):
                            continue
                        token = str(item).strip()
                        if token:
                            keys.append(token)
        for child in node.values():
            _walk_config_for_siliconflow_keys(child, keys)
    elif isinstance(node, list):
        for child in node:
            _walk_config_for_siliconflow_keys(child, keys)


def _candidate_config_paths() -> List[Path]:
    """List the config files to probe, in priority order.

    For each root directory the production config is listed before the
    generic one. The roots are, in order, the directory three levels above
    this module (when the module is nested deeply enough for that directory
    to exist) and the current working directory.

    Returns:
        De-duplicated candidate paths; the files are not checked for
        existence here.
    """
    module_path = Path(__file__).resolve()
    ancestors = module_path.parents

    roots: List[Path] = []
    # ``parents[2]`` raises IndexError when the module lives fewer than three
    # levels below the filesystem root (e.g. ``/app/module.py``); in that case
    # only the working directory is probed.
    if len(ancestors) > 2:
        roots.append(ancestors[2])
    roots.append(Path.cwd())

    candidates = [
        root / "etc" / filename
        for root in roots
        for filename in ("config.production.yaml", "config.yaml")
    ]
    return _dedupe_paths(candidates)


def _dedupe_paths(paths: Iterable[Path]) -> List[Path]:
    """Drop repeated paths, comparing them by their string form.

    The first path seen for each distinct string is kept, and the relative
    order of the survivors is unchanged. Paths are not resolved, so two
    different spellings of the same location are both retained.
    """
    first_by_text: Dict[str, Path] = {}
    for candidate in paths:
        # setdefault keeps the earliest object for a given string form.
        first_by_text.setdefault(str(candidate), candidate)
    return list(first_by_text.values())


def _load_configured_siliconflow_keys() -> List[str]:
    """Collect SiliconFlow API keys from every readable candidate config file.

    Returns an empty list when PyYAML is unavailable. Each existing candidate
    file is parsed with ``yaml.safe_load`` and scanned for keys; a file that
    fails to open or parse is logged as a warning and skipped. The combined
    result is trimmed and de-duplicated.
    """
    collected: List[str] = []
    if yaml is None:
        return collected

    existing_files = (candidate for candidate in _candidate_config_paths() if candidate.exists())
    for config_path in existing_files:
        try:
            # Undecodable bytes are dropped rather than aborting the load.
            with config_path.open("r", encoding="utf-8", errors="ignore") as handle:
                document = yaml.safe_load(handle)
        except Exception as exc:
            logger.warning("Failed to load OCR key config from %s: %s", config_path, exc)
        else:
            _walk_config_for_siliconflow_keys(document, collected)
    return _dedupe_keys(collected)


def _load_redis_config() -> Dict[str, Any]:
    """Return the ``Redis`` section of the first config file that has one.

    Candidate files are probed in priority order. A file that is missing is
    skipped; a file that cannot be opened or parsed is logged as a warning
    and skipped, matching how the API-key config loader reports the same
    failure. The search stops at the first file whose top level is a mapping
    containing a mapping under ``Redis``.

    Returns:
        A copy of that ``Redis`` mapping, or an empty dict when PyYAML is
        unavailable or no file provides the section.
    """
    config: Dict[str, Any] = {}
    if yaml is None:
        return config
    for path in _candidate_config_paths():
        if not path.exists():
            continue
        try:
            with path.open("r", encoding="utf-8", errors="ignore") as fp:
                data = yaml.safe_load(fp) or {}
        except Exception as exc:
            # Previously swallowed silently, which hid broken config files.
            logger.warning("Failed to load Redis config from %s: %s", path, exc)
            continue
        redis_config = data.get("Redis") if isinstance(data, dict) else None
        if isinstance(redis_config, dict):
            config.update(redis_config)
            break
    return config


def _redis_int_setting(label: str, raw: Any, default: int) -> int:
    """Convert a Redis port/DB setting to ``int``, falling back to ``default``.

    A falsy ``raw`` (unset, empty, zero) yields ``default``. A value that
    cannot be converted is logged as a warning and also yields ``default``.
    """
    if not raw:
        return default
    try:
        return int(raw)
    except (TypeError, ValueError):
        logger.warning("Invalid Redis %s %r, using default %s", label, raw, default)
        return default


def _redis_url_from_config() -> Optional[str]:
    """Build the Redis connection URL from the environment and config files.

    An explicit URL wins: ``SILICONFLOW_DEEPSEEK_OCR_REDIS_URL`` is checked
    first, then ``TASK_EXECUTOR_REDIS_URL``; a variable that is unset or
    whitespace-only is skipped. Otherwise the URL is assembled from host,
    port, password and DB, each taken from its ``TASK_EXECUTOR_REDIS_*``
    environment variable, then from the ``Redis`` config section (``Host``,
    ``Port``, ``Password``, ``DB``), then from the defaults ``localhost``,
    ``6379``, no password and ``0``.

    Returns:
        A ``redis://`` URL. The password, when present, is percent-encoded.
    """
    from urllib.parse import quote

    for url_variable in ("SILICONFLOW_DEEPSEEK_OCR_REDIS_URL", "TASK_EXECUTOR_REDIS_URL"):
        explicit = (os.getenv(url_variable) or "").strip()
        if explicit:
            return explicit

    env_host = os.getenv("TASK_EXECUTOR_REDIS_HOST")
    env_port = os.getenv("TASK_EXECUTOR_REDIS_PORT")
    env_password = os.getenv("TASK_EXECUTOR_REDIS_PASSWORD")
    env_db = os.getenv("TASK_EXECUTOR_REDIS_DB")

    # The config files only matter when the environment leaves a setting
    # open, so skip reading them when every setting is already supplied.
    env_is_complete = bool(env_host and env_port and env_db) and env_password is not None
    config: Any = {} if env_is_complete else _load_redis_config()

    host = env_host or str(config.get("Host") or "localhost")
    port = _redis_int_setting("port", env_port or config.get("Port"), 6379)
    # An empty password variable explicitly means "no password".
    password = env_password if env_password is not None else str(config.get("Password") or "")
    db = _redis_int_setting("DB", env_db or config.get("DB"), 0)

    # Percent-encode the password so characters such as "@", "/", ":" or "#"
    # cannot be mistaken for URL delimiters.
    auth = f":{quote(password, safe='')}@" if password else ""
    return f"redis://{auth}{host}:{port}/{db}"


def _create_redis_client() -> Optional[Any]:
    """Return a connected Redis client for the global key pool, or ``None``.

    ``None`` is returned, so that the caller can fall back to the
    process-local pool, when:

    * ``SILICONFLOW_DEEPSEEK_OCR_GLOBAL_POOL`` is set to a false value
      (it defaults to enabled);
    * the ``redis`` package is not installed;
    * the connection URL cannot be determined; or
    * creating the client or the initial ``PING`` fails.

    Socket and connect timeouts come from
    ``SILICONFLOW_DEEPSEEK_OCR_REDIS_TIMEOUT_SEC`` and
    ``SILICONFLOW_DEEPSEEK_OCR_REDIS_CONNECT_TIMEOUT_SEC`` (2 seconds each by
    default).
    """
    if not _env_bool("SILICONFLOW_DEEPSEEK_OCR_GLOBAL_POOL", True):
        return None
    if redis is None:
        logger.warning("redis package is unavailable; using process-local DeepSeek-OCR key pool")
        return None
    try:
        # URL construction is inside the ``try`` so that a malformed Redis
        # setting degrades to the process-local pool like any other Redis
        # failure instead of aborting balancer initialisation.
        url = _redis_url_from_config()
        if not url:
            return None
        client = redis.Redis.from_url(
            url,
            decode_responses=True,
            socket_timeout=_env_float("SILICONFLOW_DEEPSEEK_OCR_REDIS_TIMEOUT_SEC", 2.0),
            socket_connect_timeout=_env_float("SILICONFLOW_DEEPSEEK_OCR_REDIS_CONNECT_TIMEOUT_SEC", 2.0),
        )
        # Verify connectivity up front so an unreachable server is detected
        # here rather than on the first lease.
        client.ping()
        return client
    except Exception as exc:
        logger.warning("Redis unavailable for DeepSeek-OCR global key pool; using process-local pool: %s", exc)
        return None


def _load_api_keys() -> List[str]:
    """Gather SiliconFlow API keys from the environment and config files.

    The environment variables are read in the order listed below and each
    non-blank value is split into individual keys. Unless
    ``SILICONFLOW_DEEPSEEK_OCR_LOAD_CONFIG_KEYS`` is set to a false value,
    keys found in the config files are appended afterwards. The combined list
    is trimmed and de-duplicated, keeping first occurrences.
    """
    env_names = (
        "SILICONFLOW_DEEPSEEK_OCR_API_KEYS",
        "SILICONFLOW_OCR_API_KEYS",
        "SILICONFLOW_API_KEYS",
        "SILICONFLOW_DEEPSEEK_OCR_API_KEY",
        "SILICONFLOW_OCR_API_KEY",
        "SILICONFLOW_API_KEY",
    )

    collected: List[str] = []
    for raw in map(os.getenv, env_names):
        if raw and raw.strip():
            collected += _split_keys(raw)

    include_config_keys = _env_bool("SILICONFLOW_DEEPSEEK_OCR_LOAD_CONFIG_KEYS", True)
    if include_config_keys:
        collected += _load_configured_siliconflow_keys()

    return _dedupe_keys(collected)


def _create_client(
    api_key: str,
    *,
    rate_limit_callback: Optional[Any] = None,
) -> SiliconFlowDeepSeekOCRClient:
    """Build a ``SiliconFlowDeepSeekOCRClient`` for ``api_key`` from the environment.

    Every tunable is read from a ``SILICONFLOW_DEEPSEEK_OCR_*`` environment
    variable with the default shown in the code; the base URL additionally
    falls back to ``SILICONFLOW_BASE_URL``. Several numeric settings are
    clamped to a minimum (or, for JPEG quality, to the range 40-95) before
    being handed to the client.

    Args:
        api_key: The API key the client authenticates with.
        rate_limit_callback: Passed through to the client unchanged.
    """
    generic_base_url = os.getenv("SILICONFLOW_BASE_URL", "https://api.siliconflow.cn/v1")
    connection = {
        "base_url": os.getenv("SILICONFLOW_DEEPSEEK_OCR_BASE_URL", generic_base_url),
        "model": os.getenv("SILICONFLOW_DEEPSEEK_OCR_MODEL", "deepseek-ai/DeepSeek-OCR"),
        "timeout_sec": _env_float("SILICONFLOW_DEEPSEEK_OCR_TIMEOUT_SEC", 120.0),
        "trust_env": _env_bool("SILICONFLOW_DEEPSEEK_OCR_TRUST_ENV", False),
    }

    concurrency = {
        "page_workers": max(1, _env_int("SILICONFLOW_DEEPSEEK_OCR_PAGE_WORKERS", 8)),
        "max_concurrent_requests": max(1, _env_int("SILICONFLOW_DEEPSEEK_OCR_CLIENT_CONCURRENCY", 8)),
    }

    retry_policy = {
        "max_retries": max(0, _env_int("SILICONFLOW_DEEPSEEK_OCR_RETRIES", 10)),
        "rate_limit_retry_forever": _env_bool("SILICONFLOW_DEEPSEEK_OCR_429_RETRY_FOREVER", True),
        "channel_retry_forever": _env_bool("SILICONFLOW_DEEPSEEK_OCR_RETRY_FOREVER", True),
        "rate_limit_initial_wait_sec": _env_float("SILICONFLOW_DEEPSEEK_OCR_429_INITIAL_WAIT_SEC", 10.0),
        "rate_limit_max_wait_sec": _env_float("SILICONFLOW_DEEPSEEK_OCR_429_MAX_WAIT_SEC", 120.0),
    }

    generation = {
        "max_tokens": max(128, _env_int("SILICONFLOW_DEEPSEEK_OCR_MAX_TOKENS", 7168)),
        "prompt": os.getenv("SILICONFLOW_DEEPSEEK_OCR_PROMPT", DEFAULT_DEEPSEEK_OCR_PROMPT),
        "fallback_prompt": os.getenv(
            "SILICONFLOW_DEEPSEEK_OCR_FALLBACK_PROMPT",
            DEFAULT_DEEPSEEK_OCR_FALLBACK_PROMPT,
        ),
        "transcribe_prompt": os.getenv(
            "SILICONFLOW_DEEPSEEK_OCR_TRANSCRIBE_PROMPT",
            DEFAULT_DEEPSEEK_OCR_TRANSCRIBE_PROMPT,
        ),
        "full_page_transcribe": _env_bool("SILICONFLOW_DEEPSEEK_OCR_FULL_PAGE_TRANSCRIBE", True),
        "full_page_transcribe_max_chars": max(
            0,
            _env_int("SILICONFLOW_DEEPSEEK_OCR_FULL_PAGE_TRANSCRIBE_MAX_CHARS", 180),
        ),
    }

    requested_jpeg_quality = _env_int("SILICONFLOW_DEEPSEEK_OCR_JPEG_QUALITY", 86)
    imaging = {
        "image_detail": os.getenv("SILICONFLOW_DEEPSEEK_OCR_IMAGE_DETAIL", "high"),
        "max_pages": _env_int("SILICONFLOW_DEEPSEEK_OCR_MAX_PAGES", 0),
        "dpi": max(72, _env_int("SILICONFLOW_DEEPSEEK_OCR_DPI", 160)),
        "max_image_edge": _env_int("SILICONFLOW_DEEPSEEK_OCR_MAX_IMAGE_EDGE", 2400),
        "jpeg_quality": max(40, min(95, requested_jpeg_quality)),
        "max_image_bytes": max(
            256 * 1024,
            _env_int("SILICONFLOW_DEEPSEEK_OCR_MAX_IMAGE_BYTES", 7 * 1024 * 1024),
        ),
    }

    return SiliconFlowDeepSeekOCRClient(
        api_key=api_key,
        rate_limit_callback=rate_limit_callback,
        **connection,
        **concurrency,
        **retry_policy,
        **generation,
        **imaging,
    )


class _KeyState:
    """Mutable bookkeeping for one API key in the process-local balancer.

    Attributes:
        client: The OCR client bound to this key.
        cooldown_until: Clock reading before which the key is skipped by the
            balancer; ``0.0`` means no cooldown has been set.
        error_count: Failure counter maintained by the balancer.
    """

    __slots__ = ("client", "cooldown_until", "error_count")

    def __init__(self, client: SiliconFlowDeepSeekOCRClient) -> None:
        # A fresh key starts with no cooldown and no recorded failures.
        self.error_count: int = 0
        self.cooldown_until: float = 0.0
        self.client = client


class SiliconFlowDeepSeekOCRBalancer:
    """Process-local pool that leases DeepSeek-OCR clients across API keys.

    One client is created per API key and each key contributes
    ``per_key_concurrency`` slots. Free slots are kept in a FIFO queue, and a
    key can be placed in a cooldown during which its slots are skipped.
    All slot bookkeeping happens under a single ``threading.Condition``.
    """

    def __init__(
        self,
        api_keys: Iterable[str],
        *,
        per_key_concurrency: int = 1,
        max_errors_before_cooldown: int = 3,
        cooldown_seconds: float = 10.0,
        default_timeout: Optional[float] = None,
    ) -> None:
        """Create one client per key and fill the slot queue.

        Args:
            api_keys: API keys; blank entries are dropped and the rest trimmed.
            per_key_concurrency: Number of slots each key contributes.
            max_errors_before_cooldown: Failed releases in a row (per key)
                that trigger a cooldown; values below 1 are raised to 1.
            cooldown_seconds: Length of a cooldown; negative values become 0.
            default_timeout: Timeout used by ``acquire`` when none is given;
                ``None`` means wait indefinitely.

        Raises:
            ValueError: If no usable key is supplied or
                ``per_key_concurrency`` is below 1.
        """
        keys = [key.strip() for key in api_keys if key and key.strip()]
        if not keys:
            raise ValueError("At least one SiliconFlow DeepSeek-OCR API key is required.")
        if per_key_concurrency < 1:
            raise ValueError("per_key_concurrency must be >= 1.")

        self._default_timeout = default_timeout
        self._per_key_concurrency = per_key_concurrency
        self._max_errors_before_cooldown = max(1, int(max_errors_before_cooldown))
        self._cooldown_seconds = max(0.0, float(cooldown_seconds))

        self._condition = threading.Condition()
        self._states: List[_KeyState] = []
        # Tokens are ``(key_index, slot_index)`` pairs.
        self._available: Deque[Tuple[int, int]] = collections.deque()
        self._in_use: Dict[Tuple[int, int], _KeyState] = {}
        for idx, api_key in enumerate(keys):
            state = _KeyState(
                _create_client(
                    api_key,
                    # ``key_index=idx`` binds the current index at definition
                    # time; a plain closure would see only the final value.
                    rate_limit_callback=lambda _info, key_index=idx: self.mark_rate_limited(key_index),
                )
            )
            self._states.append(state)
            for slot in range(per_key_concurrency):
                self._available.append((idx, slot))

        self._stats_lock = threading.Lock()
        self._stats = {
            "acquired": 0,
            "released": 0,
            "failures": 0,
            "cooldowns": 0,
        }

    @property
    def default_timeout(self) -> Optional[float]:
        """Timeout applied by ``acquire`` when the caller passes none."""
        return self._default_timeout

    @property
    def key_count(self) -> int:
        """Number of API keys managed by this balancer."""
        return len(self._states)

    @property
    def total_slots(self) -> int:
        """Total number of slots (keys times per-key concurrency)."""
        return len(self._states) * self._per_key_concurrency

    def _now(self) -> float:
        """Return the monotonic clock reading used for cooldowns and deadlines."""
        return time.monotonic()

    def _next_cooldown(self, now: float) -> Optional[float]:
        """Return seconds until the earliest active cooldown ends, or ``None``."""
        cooldowns = [
            state.cooldown_until - now
            for state in self._states
            if state.cooldown_until > now
        ]
        if not cooldowns:
            return None
        return max(0.0, min(cooldowns))

    def acquire(self, timeout: Optional[float] = None) -> Tuple[Tuple[int, int], SiliconFlowDeepSeekOCRClient]:
        """Block until a slot on a key that is not cooling down is free.

        Args:
            timeout: Maximum seconds to wait; ``None`` uses the balancer's
                default timeout, and if that is also ``None`` waits forever.

        Returns:
            ``(token, client)``; the token must be passed to ``release``.

        Raises:
            TimeoutError: If no slot became usable before the deadline.
        """
        deadline: Optional[float] = None
        if timeout is None:
            timeout = self._default_timeout
        if timeout is not None:
            deadline = self._now() + timeout

        with self._condition:
            while True:
                now = self._now()
                # Inspect each queued token once; tokens of cooling keys are
                # rotated to the back so the queue order is otherwise kept.
                for _ in range(len(self._available)):
                    token = self._available.popleft()
                    state = self._states[token[0]]
                    if state.cooldown_until > now:
                        self._available.append(token)
                        continue
                    self._in_use[token] = state
                    with self._stats_lock:
                        self._stats["acquired"] += 1
                    return token, state.client

                if deadline is not None and now >= deadline:
                    raise TimeoutError("Timed out waiting for an available SiliconFlow DeepSeek-OCR client.")

                # Sleep until the earliest cooldown ends or the deadline,
                # whichever comes first; with neither, poll every 0.5 s.
                wait_time = self._next_cooldown(now)
                if deadline is not None:
                    remaining = deadline - now
                    wait_time = remaining if wait_time is None else min(wait_time, remaining)
                    if wait_time <= 0:
                        raise TimeoutError("Timed out waiting for an available SiliconFlow DeepSeek-OCR client.")
                if wait_time is None:
                    wait_time = 0.5
                self._condition.wait(timeout=wait_time)

    def release(self, token: Tuple[int, int], *, success: bool) -> None:
        """Return a slot to the pool and record the outcome.

        A successful release resets the key's failure counter. A failed one
        increments it, and once it reaches the configured threshold the
        counter is reset and the key enters a cooldown. Releasing a token
        that is not currently leased only logs a warning.

        Args:
            token: The token returned by ``acquire``.
            success: Whether the work done with the client succeeded.
        """
        with self._condition:
            state = self._in_use.pop(token, None)
            if state is None:
                logger.warning("Attempted to release unknown SiliconFlow DeepSeek-OCR token %s", token)
                return

            if success:
                state.error_count = 0
            else:
                now = self._now()
                state.error_count += 1
                with self._stats_lock:
                    self._stats["failures"] += 1
                if state.error_count >= self._max_errors_before_cooldown:
                    state.error_count = 0
                    # ``max`` never shortens a cooldown that is already longer.
                    state.cooldown_until = max(state.cooldown_until, now + self._cooldown_seconds)
                    with self._stats_lock:
                        self._stats["cooldowns"] += 1

            self._available.append(token)
            with self._stats_lock:
                self._stats["released"] += 1
            self._condition.notify()

    def mark_rate_limited(self, key_index: int) -> None:
        """Put the key at ``key_index`` into a cooldown; ignore bad indexes."""
        with self._condition:
            if 0 <= key_index < len(self._states):
                self._states[key_index].cooldown_until = max(
                    self._states[key_index].cooldown_until,
                    self._now() + self._cooldown_seconds,
                )
                with self._stats_lock:
                    self._stats["cooldowns"] += 1
                self._condition.notify_all()

    @contextmanager
    def lease(self, timeout: Optional[float] = None) -> Iterator[SiliconFlowDeepSeekOCRClient]:
        """Context manager that acquires a client and always releases it.

        The slot is released as successful when the ``with`` body finishes
        normally and as failed when it exits through any exception, which is
        then re-raised.

        Args:
            timeout: Forwarded to ``acquire``.

        Raises:
            TimeoutError: If no slot became usable in time.
        """
        token, client = self.acquire(timeout=timeout)
        succeeded = False
        try:
            yield client
            succeeded = True
        finally:
            # ``finally`` (rather than ``except Exception``) also covers
            # KeyboardInterrupt, SystemExit and GeneratorExit; otherwise the
            # slot would stay in ``_in_use`` forever.
            self.release(token, success=succeeded)

    def stats(self) -> Dict[str, int]:
        """Return a snapshot copy of the counters."""
        with self._stats_lock:
            return dict(self._stats)


class RedisSiliconFlowDeepSeekOCRBalancer:
    """Key pool whose slot leases and rate-limit cooldowns live in Redis.

    Each ``(key_index, slot_index)`` slot is a Redis key claimed with
    ``SET NX EX`` and removed on release; a lease that is never released
    expires after the lease TTL. A rate-limited API key is marked by a
    separate expiring cooldown key, and keys with a cooldown are skipped.
    Clients are created lazily, one per API key, on first use.
    """

    def __init__(
        self,
        api_keys: Iterable[str],
        redis_client: Any,
        *,
        per_key_concurrency: int = 1,
        lease_ttl_seconds: float = 900.0,
        cooldown_seconds: float = 120.0,
        default_timeout: Optional[float] = None,
        namespace: str = "yunwo:ocr:deepseek",
    ) -> None:
        """Store the configuration; no Redis call is made here.

        Args:
            api_keys: API keys; blank entries are dropped and the rest trimmed.
            redis_client: Client object providing ``set``, ``exists``, ``ttl``
                and ``eval``.
            per_key_concurrency: Number of slots per key.
            lease_ttl_seconds: Expiry of a slot lease; raised to at least 30.
            cooldown_seconds: Minimum rate-limit cooldown; raised to at least 1.
            default_timeout: Timeout used by ``acquire`` when none is given;
                ``None`` means wait indefinitely.
            namespace: Prefix of all Redis keys; surrounding colons are stripped.

        Raises:
            ValueError: If no usable key is supplied or
                ``per_key_concurrency`` is below 1.
        """
        keys = [key.strip() for key in api_keys if key and key.strip()]
        if not keys:
            raise ValueError("At least one SiliconFlow DeepSeek-OCR API key is required.")
        if per_key_concurrency < 1:
            raise ValueError("per_key_concurrency must be >= 1.")
        self._keys = keys
        self._redis = redis_client
        self._per_key_concurrency = int(per_key_concurrency)
        self._lease_ttl_seconds = max(30.0, float(lease_ttl_seconds))
        self._cooldown_seconds = max(1.0, float(cooldown_seconds))
        self._default_timeout = default_timeout
        self._namespace = namespace.strip(":")
        self._owner_prefix = f"{socket.gethostname()}:{os.getpid()}"
        self._clients_lock = threading.Lock()
        self._clients: Dict[int, SiliconFlowDeepSeekOCRClient] = {}
        # Rotating start position for the key scan, so that successive
        # acquisitions spread across keys instead of always preferring key 0.
        self._scan_lock = threading.Lock()
        self._scan_start = 0
        self._stats_lock = threading.Lock()
        self._stats = {
            "acquired": 0,
            "released": 0,
            "lease_refreshes": 0,
            "lease_refresh_failures": 0,
            "rate_limit_cooldowns": 0,
            "waits": 0,
        }

    @property
    def default_timeout(self) -> Optional[float]:
        """Timeout applied by ``acquire`` when the caller passes none."""
        return self._default_timeout

    @property
    def key_count(self) -> int:
        """Number of API keys managed by this balancer."""
        return len(self._keys)

    @property
    def total_slots(self) -> int:
        """Total number of slots (keys times per-key concurrency)."""
        return len(self._keys) * self._per_key_concurrency

    def _lease_key(self, key_index: int, slot_index: int) -> str:
        """Return the Redis key that represents one slot's lease."""
        return f"{self._namespace}:lease:{key_index}:{slot_index}"

    def _cooldown_key(self, key_index: int) -> str:
        """Return the Redis key that marks an API key as cooling down."""
        return f"{self._namespace}:cooldown:{key_index}"

    def _owner(self, key_index: int, slot_index: int) -> str:
        """Return a lease-owner string built from host, pid, thread, slot and time."""
        return f"{self._owner_prefix}:{threading.get_ident()}:{key_index}:{slot_index}:{time.time_ns()}"

    def _client_for_key(self, key_index: int) -> SiliconFlowDeepSeekOCRClient:
        """Return the cached client for ``key_index``, creating it on first use."""
        with self._clients_lock:
            client = self._clients.get(key_index)
            if client is None:
                client = _create_client(
                    self._keys[key_index],
                    rate_limit_callback=lambda info, idx=key_index: self.mark_rate_limited(idx, info),
                )
                self._clients[key_index] = client
            return client

    def _try_acquire_once(self) -> Optional[Tuple[Tuple[int, int, str], SiliconFlowDeepSeekOCRClient]]:
        """Make one pass over all keys and try to claim a free slot.

        Keys with an active cooldown are skipped. The scan starts one key
        further along on every call. Redis errors propagate to the caller.

        Returns:
            ``((key_index, slot_index, owner), client)`` on success, or
            ``None`` when every slot is taken or cooling down.
        """
        ttl = int(max(1, self._lease_ttl_seconds))
        key_total = len(self._keys)
        with self._scan_lock:
            start = self._scan_start % key_total
            self._scan_start = (start + 1) % key_total

        for offset in range(key_total):
            key_index = (start + offset) % key_total
            if self._redis.exists(self._cooldown_key(key_index)):
                continue
            for slot_index in range(self._per_key_concurrency):
                lease_key = self._lease_key(key_index, slot_index)
                owner = self._owner(key_index, slot_index)
                # NX makes the claim atomic; EX bounds how long an abandoned
                # lease can block the slot.
                acquired = self._redis.set(lease_key, owner, nx=True, ex=ttl)
                if acquired:
                    with self._stats_lock:
                        self._stats["acquired"] += 1
                    return (key_index, slot_index, owner), self._client_for_key(key_index)
        return None

    def _next_wait_seconds(self) -> float:
        """Return how long to sleep before the next acquisition attempt.

        This is the shortest remaining cooldown TTL clamped to the range
        0.5-5 seconds, or 0.5 seconds when no key is cooling down. Redis
        errors propagate to the caller.
        """
        min_ttl: Optional[int] = None
        for key_index in range(len(self._keys)):
            cooldown_ttl = self._redis.ttl(self._cooldown_key(key_index))
            if cooldown_ttl is not None and cooldown_ttl > 0:
                min_ttl = cooldown_ttl if min_ttl is None else min(min_ttl, cooldown_ttl)
        if min_ttl is not None:
            return max(0.5, min(float(min_ttl), 5.0))
        return 0.5

    def acquire(self, timeout: Optional[float] = None) -> Tuple[Tuple[int, int, str], SiliconFlowDeepSeekOCRClient]:
        """Poll Redis until a slot is claimed or the timeout elapses.

        Args:
            timeout: Maximum seconds to wait; ``None`` uses the balancer's
                default timeout, and if that is also ``None`` waits forever.

        Returns:
            ``(token, client)``; the token must be passed to ``release``.

        Raises:
            TimeoutError: If no slot could be claimed before the deadline.
        """
        if timeout is None:
            timeout = self._default_timeout
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            acquired = self._try_acquire_once()
            if acquired is not None:
                return acquired
            now = time.monotonic()
            if deadline is not None and now >= deadline:
                raise TimeoutError("Timed out waiting for a global SiliconFlow DeepSeek-OCR key slot.")
            wait_sec = self._next_wait_seconds()
            if deadline is not None:
                wait_sec = min(wait_sec, max(0.0, deadline - now))
            with self._stats_lock:
                self._stats["waits"] += 1
            # Never sleep less than 50 ms, to avoid a busy loop against Redis.
            time.sleep(max(0.05, wait_sec))

    def _release_lease(self, key_index: int, slot_index: int, owner: str) -> None:
        """Delete the slot's lease key if it is still held by ``owner``.

        The compare-and-delete runs as a Lua script so that a lease which
        expired and was re-claimed by someone else is left alone. Redis
        errors are logged, not raised.
        """
        lease_key = self._lease_key(key_index, slot_index)
        script = """
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        end
        return 0
        """
        try:
            self._redis.eval(script, 1, lease_key, owner)
        except Exception as exc:
            logger.warning("Failed to release DeepSeek-OCR Redis lease %s: %s", lease_key, exc)

    def _refresh_loop(self, token: Tuple[int, int, str], stop_event: threading.Event) -> None:
        """Periodically extend the lease's expiry until ``stop_event`` is set.

        The interval is a third of the lease TTL, clamped to 5-60 seconds.
        The expiry is only extended while the lease is still held by the
        token's owner. Failures are counted and logged, never raised.
        """
        key_index, slot_index, owner = token
        lease_key = self._lease_key(key_index, slot_index)
        sleep_sec = max(5.0, min(60.0, self._lease_ttl_seconds / 3.0))
        ttl = int(max(1, self._lease_ttl_seconds))
        script = """
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("EXPIRE", KEYS[1], ARGV[2])
        end
        return 0
        """
        # ``wait`` returns True once the event is set, which ends the loop.
        while not stop_event.wait(sleep_sec):
            try:
                refreshed = self._redis.eval(script, 1, lease_key, owner, ttl)
                with self._stats_lock:
                    if refreshed:
                        self._stats["lease_refreshes"] += 1
                    else:
                        self._stats["lease_refresh_failures"] += 1
            except Exception as exc:
                with self._stats_lock:
                    self._stats["lease_refresh_failures"] += 1
                logger.warning("Failed to refresh DeepSeek-OCR Redis lease %s: %s", lease_key, exc)

    def mark_rate_limited(self, key_index: int, info: Optional[Dict[str, Any]] = None) -> None:
        """Record a cooldown for ``key_index`` in Redis.

        The cooldown lasts for the configured cooldown or, if larger, the
        positive finite ``retry_after_sec`` from ``info``; the result is
        rounded up to whole seconds. This is best effort: any failure,
        including one while serialising ``info``, is logged and never raised,
        since this method is used as the client's rate-limit callback.

        Args:
            key_index: Index of the rate-limited API key.
            info: Optional details about the rate limit; stored with the
                cooldown marker. Values that JSON cannot encode are stored
                as their string form.
        """
        import math

        try:
            details = info if isinstance(info, dict) else {}
            retry_after = details.get("retry_after_sec")
            cooldown = self._cooldown_seconds
            if (
                isinstance(retry_after, (int, float))
                and math.isfinite(retry_after)
                and retry_after > 0
            ):
                cooldown = max(cooldown, float(retry_after))
            # Round up: truncating a fractional value could end the cooldown
            # before the requested wait has elapsed.
            cooldown_sec = max(1, math.ceil(cooldown))
            value = json.dumps(
                {
                    "at": time.time(),
                    "owner": self._owner_prefix,
                    "info": info or {},
                },
                ensure_ascii=False,
                # The payload is diagnostic only, so stringifying anything
                # JSON cannot encode is deliberate.
                default=str,
            )
            self._redis.set(self._cooldown_key(key_index), value, ex=cooldown_sec)
            with self._stats_lock:
                self._stats["rate_limit_cooldowns"] += 1
        except Exception as exc:
            logger.warning("Failed to set DeepSeek-OCR global cooldown for key %s: %s", key_index, exc)

    def release(self, token: Tuple[int, int, str], *, success: bool) -> None:
        """Release the lease identified by ``token``.

        Args:
            token: The token returned by ``acquire``.
            success: Accepted for interface parity with the process-local
                balancer; it does not affect the release.
        """
        key_index, slot_index, owner = token
        self._release_lease(key_index, slot_index, owner)
        with self._stats_lock:
            self._stats["released"] += 1

    @contextmanager
    def lease(self, timeout: Optional[float] = None) -> Iterator[SiliconFlowDeepSeekOCRClient]:
        """Context manager that acquires a slot and always releases it.

        While the ``with`` body runs, a daemon thread keeps extending the
        lease's expiry. On exit, whether normal or through any exception,
        the refresher is stopped and the lease is released.

        Args:
            timeout: Forwarded to ``acquire``.

        Raises:
            TimeoutError: If no slot could be claimed in time.
        """
        token, client = self.acquire(timeout=timeout)
        stop_event = threading.Event()
        refresher: Optional[threading.Thread] = None
        succeeded = False
        try:
            # Started inside the ``try`` so the lease is still released if
            # the thread cannot be started.
            refresher = threading.Thread(
                target=self._refresh_loop,
                args=(token, stop_event),
                daemon=True,
                name="deepseek-ocr-redis-lease-refresh",
            )
            refresher.start()
            yield client
            succeeded = True
        finally:
            # ``finally`` (rather than ``except Exception``) also covers
            # KeyboardInterrupt, SystemExit and GeneratorExit; otherwise the
            # slot would stay blocked until the lease TTL runs out.
            stop_event.set()
            try:
                self.release(token, success=succeeded)
            finally:
                if refresher is not None and refresher.is_alive():
                    refresher.join(timeout=1.0)

    def stats(self) -> Dict[str, int]:
        """Return a snapshot copy of the counters."""
        with self._stats_lock:
            return dict(self._stats)


# Process-wide balancer instance; stays None until get_shared_balancer()
# creates it on first use.
_shared_balancer: Optional[Any] = None
# Held by get_shared_balancer() while it creates the shared instance.
_shared_lock = threading.Lock()


def get_shared_balancer() -> Any:
    """Return the process-wide balancer, creating it on first call.

    Creation happens under ``_shared_lock``. API keys and tuning values are
    read from the environment (and config files); when a Redis client can be
    obtained the Redis-backed balancer is used, otherwise the process-local
    one. Errors raised while constructing the balancer propagate to the
    caller and leave the shared instance unset.
    """
    global _shared_balancer
    existing = _shared_balancer
    if existing is not None:
        return existing

    with _shared_lock:
        # Re-check under the lock: another thread may have finished creating
        # the instance while this one was waiting.
        if _shared_balancer is not None:
            return _shared_balancer

        api_keys = _load_api_keys()
        per_key_concurrency = max(1, _env_int("SILICONFLOW_DEEPSEEK_OCR_PER_KEY_CONCURRENCY", 2))
        lease_timeout = _env_optional_timeout("SILICONFLOW_DEEPSEEK_OCR_LEASE_TIMEOUT_SEC", None)
        cooldown_seconds = _env_float("SILICONFLOW_DEEPSEEK_OCR_COOLDOWN_SEC", 59.0)
        redis_client = _create_redis_client()

        if redis_client is None:
            _shared_balancer = SiliconFlowDeepSeekOCRBalancer(
                api_keys,
                per_key_concurrency=per_key_concurrency,
                max_errors_before_cooldown=max(1, _env_int("SILICONFLOW_DEEPSEEK_OCR_MAX_ERRORS", 3)),
                cooldown_seconds=cooldown_seconds,
                default_timeout=lease_timeout,
            )
            init_message = (
                "Initialized process-local SiliconFlow DeepSeek-OCR balancer with %d key(s), %d total slot(s)"
            )
        else:
            _shared_balancer = RedisSiliconFlowDeepSeekOCRBalancer(
                api_keys,
                redis_client,
                per_key_concurrency=per_key_concurrency,
                lease_ttl_seconds=_env_float("SILICONFLOW_DEEPSEEK_OCR_LEASE_TTL_SEC", 900.0),
                cooldown_seconds=cooldown_seconds,
                default_timeout=lease_timeout,
                namespace=os.getenv("SILICONFLOW_DEEPSEEK_OCR_REDIS_NAMESPACE", "yunwo:ocr:deepseek"),
            )
            init_message = (
                "Initialized Redis-backed SiliconFlow DeepSeek-OCR balancer with %d key(s), %d total slot(s)"
            )

        logger.info(init_message, _shared_balancer.key_count, _shared_balancer.total_slots)
        return _shared_balancer
