#!/usr/bin/env python3
"""
High-throughput SiliconFlow DeepSeek-OCR SDK.

The public shape intentionally mirrors the Mistral OCR SDK enough for the task
executor:

    client = SiliconFlowDeepSeekOCRClient(api_key="sk-...", base_url="https://api.siliconflow.cn/v1")
    response = client.ocr.process(
        model="deepseek-ai/DeepSeek-OCR",
        document={"type": "local_file", "file_path": "/path/to.pdf"},
    )
    for page in response.pages:
        print(page.index, page.markdown)

Unlike Mistral OCR, this client reads local PDFs directly, renders pages to
compressed JPEG, calls SiliconFlow per page, and merges page results locally.
"""

from __future__ import annotations

import base64
import hashlib
import io
import json
import os
import re
import tempfile
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

import fitz
import httpx
from PIL import Image, ImageStat


class SiliconFlowDeepSeekOCRError(RuntimeError):
    """Error raised by the SiliconFlow DeepSeek-OCR helpers in this module.

    Used, for example, when no API key can be found or when a PDF is
    password protected.
    """


@dataclass
class OCRPageDimensions:
    """Pixel size of a rendered page image plus the DPI requested for it."""

    # Width and height of the rendered image, in pixels. The renderer in this
    # module stores the final image size, i.e. after any downscaling.
    width: int
    height: int
    # DPI that was requested when the page was rasterized.
    dpi: int


@dataclass
class OCRImage:
    """Rectangular image region located on a rendered page."""

    # Region identifier; the extraction code in this module uses "img-<n>".
    id: str
    # Corners of the region, expressed in rendered-page pixel coordinates.
    top_left_x: float
    top_left_y: float
    bottom_right_x: float
    bottom_right_y: float
    # Presumably base64-encoded image data; not populated by the code visible
    # in this section -- TODO confirm against the producer.
    image_base64: Optional[str] = None
    type: str = "image"
    # Origin of the region: "pdf", "deepseek_block", or several origins joined
    # with "+" after regions were merged.
    source: str = "pdf"
    # Regions with save=False are skipped when page images are written out.
    save: bool = True


@dataclass
class OCRPage:
    """OCR result for a single page."""

    # Page index (NOTE(review): zero- vs one-based is not visible here).
    index: int
    # Markdown text produced for the page.
    markdown: str
    # Image regions found on the page (typically OCRImage instances or dicts).
    images: List[Any] = field(default_factory=list)
    # Size of the rendered page image, when known.
    dimensions: Optional[OCRPageDimensions] = None
    # Layout blocks as dicts; the image extraction code reads their "type"
    # and "bbox" keys.
    blocks: List[Dict[str, Any]] = field(default_factory=list)
    # Presumably the unprocessed model output for the page -- TODO confirm.
    raw_markdown: str = ""
    # Presumably per-page API usage statistics -- TODO confirm.
    usage: Optional[Dict[str, Any]] = None


@dataclass
class OCRResponse:
    """Result of an OCR run: the processed pages plus request-level metadata."""

    # Per-page results; callers iterate this list (see the module docstring).
    pages: List[OCRPage]
    # Model identifier associated with the response.
    model: str
    # Presumably aggregated API usage statistics -- TODO confirm.
    usage_info: Dict[str, Any] = field(default_factory=dict)


# Public names exported by a star-import of this module.
__all__ = [
    "DEFAULT_DEEPSEEK_OCR_FALLBACK_PROMPT",
    "DEFAULT_DEEPSEEK_OCR_PROMPT",
    "DEFAULT_DEEPSEEK_OCR_TRANSCRIBE_PROMPT",
    "OCRPage",
    "OCRPageDimensions",
    "OCRImage",
    "OCRResponse",
    "SiliconFlowDeepSeekOCRClient",
    "SiliconFlowDeepSeekOCRError",
    "run_siliconflow_deepseek_pdf_ocr",
]


def _env_bool(name: str, default: bool = False) -> bool:
    """Read a boolean flag from the environment variable *name*.

    Returns *default* when the variable is unset. Otherwise the value is
    stripped and lower-cased, and the flag is true only for ``1``, ``true``,
    ``yes``, ``y`` or ``on``; any other text (including the empty string)
    is false.
    """
    raw = os.environ.get(name)
    if raw is not None:
        return raw.strip().lower() in ("1", "true", "yes", "y", "on")
    return default


def _env_int(name: str, default: int) -> int:
    """Read an integer from the environment variable *name*.

    Falls back to *default* when the variable is unset, blank, or cannot be
    parsed by ``int``.
    """
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        return default
    try:
        parsed = int(raw)
    except ValueError:
        parsed = default
    return parsed


def _env_float(name: str, default: float) -> float:
    """Read a float from the environment variable *name*.

    Falls back to *default* when the variable is unset, blank, or cannot be
    parsed by ``float``.
    """
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        return default
    try:
        parsed = float(raw)
    except ValueError:
        parsed = default
    return parsed


def _is_rate_limit_error(status_code: int, body: str) -> bool:
    """Tell whether an HTTP response looks like a rate-limit rejection.

    True for status 429, or when the lower-cased body mentions one of the
    phrases ``rate limit``, ``too many``, ``tpm limit`` or ``rpm limit``.
    A missing body is treated as empty.
    """
    text = (body or "").lower()
    if status_code == 429:
        return True
    phrases = ("rate limit", "too many", "tpm limit", "rpm limit")
    return any(phrase in text for phrase in phrases)


def _is_retryable_http_error(status_code: int, body: str) -> bool:
    """Tell whether a failed HTTP response is worth retrying.

    Rate-limit responses are always retryable; otherwise the decision is
    made from a fixed set of transient status codes.
    """
    transient_statuses = {408, 409, 425, 499, 500, 502, 503, 504, 520, 522, 524}
    return _is_rate_limit_error(status_code, body) or status_code in transient_statuses


def _retry_after_seconds(headers: httpx.Headers) -> Optional[float]:
    """Return the delay requested by a ``Retry-After`` header, in seconds.

    Both header forms are understood: a number of seconds, and an HTTP-date
    (converted to the number of seconds from now). Negative delays and dates
    in the past are clamped to ``0.0``.

    Args:
        headers: Response headers; only ``headers.get("retry-after")`` is used.

    Returns:
        The delay in seconds, or ``None`` when the header is missing, empty,
        unparseable, or not a finite number.
    """
    import math
    from datetime import datetime, timezone
    from email.utils import parsedate_to_datetime

    retry_after = headers.get("retry-after")
    if not retry_after:
        return None
    text = retry_after.strip()

    try:
        seconds = float(text)
    except ValueError:
        seconds = None
    if seconds is not None:
        # float() accepts "inf"/"nan", which are not usable as a sleep time.
        if not math.isfinite(seconds):
            return None
        return max(0.0, seconds)

    try:
        when = parsedate_to_datetime(text)
    except (TypeError, ValueError, IndexError, OverflowError):
        # Older Python versions raise TypeError for unparseable dates, newer
        # ones raise ValueError.
        return None
    if when.tzinfo is None:
        # A date without a usable zone is interpreted as UTC.
        when = when.replace(tzinfo=timezone.utc)
    return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())


def _api_key(explicit: Optional[str] = None) -> str:
    """Resolve the SiliconFlow API key.

    A non-blank *explicit* key wins. Otherwise the environment variables
    ``SILICONFLOW_DEEPSEEK_OCR_API_KEY``, ``SILICONFLOW_OCR_API_KEY`` and
    ``SILICONFLOW_API_KEY`` are checked in that order, and the first
    non-blank value is returned with surrounding whitespace removed.

    Raises:
        SiliconFlowDeepSeekOCRError: if no key is found anywhere.
    """
    explicit_key = (explicit or "").strip()
    if explicit_key:
        return explicit_key

    env_names = (
        "SILICONFLOW_DEEPSEEK_OCR_API_KEY",
        "SILICONFLOW_OCR_API_KEY",
        "SILICONFLOW_API_KEY",
    )
    for env_name in env_names:
        candidate = (os.getenv(env_name) or "").strip()
        if candidate:
            return candidate

    raise SiliconFlowDeepSeekOCRError(
        "SILICONFLOW_API_KEY or SILICONFLOW_DEEPSEEK_OCR_API_KEY is required."
    )


def _write_temp_pdf(raw: bytes) -> Path:
    """Write *raw* to a new temporary ``.pdf`` file and return its path.

    The file is created with ``tempfile.mkstemp`` (prefix ``deepseek_ocr_``)
    and is not deleted automatically; the caller owns the returned path.

    Raises:
        Exception: whatever opening or writing the file raised. The partially
            written file is removed (best effort) before re-raising.
    """
    fd, name = tempfile.mkstemp(prefix="deepseek_ocr_", suffix=".pdf")
    path = Path(name)
    # True until a file object has taken ownership of the raw descriptor.
    owns_fd = True
    try:
        with os.fdopen(fd, "wb") as fp:
            owns_fd = False
            fp.write(raw)
    except Exception:
        if owns_fd:
            # Only close the descriptor ourselves when fdopen() never took it
            # over. After the ``with`` block has closed it, the same number
            # may already belong to a file opened by another thread.
            try:
                os.close(fd)
            except OSError:
                pass
        try:
            path.unlink(missing_ok=True)
        except OSError:
            pass
        raise
    return path


# Prompt texts exported by this module. How and when each one is selected is
# decided by code outside this section.

# Plain instruction asking for a markdown conversion of the page.
DEFAULT_DEEPSEEK_OCR_PROMPT = (
    "Convert this document page to markdown. Return markdown only."
)


# Same instruction, prefixed with the "<image>" and "<|grounding|>" tags.
DEFAULT_DEEPSEEK_OCR_FALLBACK_PROMPT = (
    "<image>\n<|grounding|>Convert this document page to markdown. Return markdown only."
)


# Instruction asking for a verbatim text transcription instead of markdown.
DEFAULT_DEEPSEEK_OCR_TRANSCRIBE_PROMPT = (
    "<image>\nTranscribe all visible text in this image exactly. Preserve line breaks. Return text only."
)


# Lower-case block "type" values that are treated as image regions when OCR
# layout blocks are scanned for figures.
_IMAGE_BLOCK_TYPES = {
    "figure",
    "image",
    "img",
    "picture",
    "photo",
    "diagram",
    "chart",
    "graph",
    "logo",
}


def _page_count(pdf_path: Path) -> int:
    """Return the number of pages in the PDF at *pdf_path*.

    The document is always closed again before returning.

    Raises:
        SiliconFlowDeepSeekOCRError: if the PDF needs a password and the
            empty password does not unlock it.
    """
    document = fitz.open(str(pdf_path))
    try:
        locked = document.needs_pass and not document.authenticate("")
        if locked:
            raise SiliconFlowDeepSeekOCRError(f"PDF requires a password: {pdf_path}")
        return len(document)
    finally:
        document.close()


def _resize_to_edge(image: Image.Image, max_edge: int) -> Image.Image:
    """Downscale *image* so that its longest side is at most *max_edge*.

    The image is returned unchanged when *max_edge* is not positive or the
    image already fits. Otherwise a LANCZOS-resampled copy is returned and
    the input image is closed, so callers must use the returned object.
    """
    if max_edge <= 0:
        return image
    width, height = image.width, image.height
    longest = max(width, height)
    if longest <= max_edge:
        return image

    factor = max_edge / float(longest)
    target = (max(1, int(width * factor)), max(1, int(height * factor)))
    # Newer Pillow exposes the filters on Image.Resampling, older on Image.
    lanczos = getattr(Image, "Resampling", Image).LANCZOS
    shrunk = image.resize(target, lanczos)
    image.close()
    return shrunk


def _save_jpeg(image: Image.Image, quality: int) -> bytes:
    """Encode *image* as an optimized JPEG at *quality* and return the bytes."""
    with io.BytesIO() as sink:
        image.save(sink, format="JPEG", quality=quality, optimize=True)
        return sink.getvalue()


def _render_page_jpeg(
    pdf_path: Path,
    page_index: int,
    *,
    dpi: int,
    max_edge: int,
    quality: int,
    max_bytes: int,
) -> Tuple[bytes, OCRPageDimensions, Dict[str, Any]]:
    """Render one PDF page to JPEG bytes, shrinking it towards a byte budget.

    The page is rasterized at *dpi*, limited to *max_edge* pixels on its
    longest side and encoded at *quality*. While the result is larger than
    *max_bytes*, the quality is lowered in steps of 8 until it is no longer
    above 55; after that the image is downscaled by 15% per step, but never
    below 1200 pixels on its longest side. The result may therefore still
    exceed *max_bytes* when neither knob can be turned further.

    Args:
        pdf_path: Path of the PDF to open.
        page_index: Zero-based index of the page to render.
        dpi: Rasterization resolution.
        max_edge: Maximum longest-side length in pixels (<= 0 disables it).
        quality: Initial JPEG quality.
        max_bytes: Target upper bound for the encoded size.

    Returns:
        ``(jpeg_bytes, dimensions, meta)`` where *dimensions* holds the final
        image size together with the requested DPI, and *meta* has the keys
        ``jpeg_quality``, ``byte_size`` and ``max_edge``.

    Raises:
        SiliconFlowDeepSeekOCRError: if the PDF needs a password and the
            empty password does not unlock it.
    """
    doc = None
    image = None
    try:
        doc = fitz.open(str(pdf_path))
        if doc.needs_pass and not doc.authenticate(""):
            raise SiliconFlowDeepSeekOCRError(f"PDF requires a password: {pdf_path}")
        page = doc[page_index]
        zoom = dpi / 72.0
        pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)
        # Without alpha and with the default RGB colorspace the pixmap samples
        # are packed RGB bytes, so they can be handed to Pillow directly
        # instead of going through a PNG encode/decode round trip.
        image = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
        pix = None  # drop the raw pixel buffer as soon as it has been copied
        image = _resize_to_edge(image, max_edge)

        jpeg = _save_jpeg(image, quality)
        current_quality = quality
        while len(jpeg) > max_bytes and (current_quality > 55 or max(image.size) > 1200):
            if current_quality > 55:
                # Cheapest knob first: lower the JPEG quality.
                current_quality -= 8
            else:
                # Quality floor reached: shrink the image, but keep at least
                # 1200 px on the longest side.
                image = _resize_to_edge(image, max(1200, int(max(image.size) * 0.85)))
            jpeg = _save_jpeg(image, current_quality)

        dimensions = OCRPageDimensions(width=image.width, height=image.height, dpi=dpi)
        meta = {
            "jpeg_quality": current_quality,
            "byte_size": len(jpeg),
            "max_edge": max_edge,
        }
        return jpeg, dimensions, meta
    finally:
        if image is not None:
            try:
                image.close()
            except Exception:
                pass
        if doc is not None:
            doc.close()


def _clip_rect_to_page(rect: fitz.Rect, page_rect: fitz.Rect) -> Optional[fitz.Rect]:
    """Intersect *rect* with *page_rect*.

    Returns the clipped rectangle, or ``None`` when what remains is at most
    one unit wide or high.
    """
    left = max(page_rect.x0, rect.x0)
    top = max(page_rect.y0, rect.y0)
    right = min(page_rect.x1, rect.x1)
    bottom = min(page_rect.y1, rect.y1)
    clipped = fitz.Rect(left, top, right, bottom)
    too_small = clipped.width <= 1 or clipped.height <= 1
    return None if too_small else clipped


def _scale_rect_to_rendered(rect: fitz.Rect, page_rect: fitz.Rect, dimensions: OCRPageDimensions) -> List[float]:
    """Map *rect* from PDF page coordinates to rendered-image pixels.

    Returns ``[x0, y0, x1, y1]`` relative to the page origin, with every
    value clamped to the rendered image bounds.
    """
    scale_x = dimensions.width / max(page_rect.width, 1e-6)
    scale_y = dimensions.height / max(page_rect.height, 1e-6)

    def clamp(value: float, upper: float) -> float:
        return max(0.0, min(upper, value))

    return [
        clamp((rect.x0 - page_rect.x0) * scale_x, dimensions.width),
        clamp((rect.y0 - page_rect.y0) * scale_y, dimensions.height),
        clamp((rect.x1 - page_rect.x0) * scale_x, dimensions.width),
        clamp((rect.y1 - page_rect.y0) * scale_y, dimensions.height),
    ]


def _bbox_to_pdf_rect(
    bbox: List[float],
    page_rect: fitz.Rect,
    dimensions: OCRPageDimensions,
) -> Optional[fitz.Rect]:
    """Convert an OCR bounding box to a rectangle in PDF page coordinates.

    Args:
        bbox: ``[x0, y0, x1, y1]`` (extra items are ignored); the corners may
            be given in either order.
        page_rect: Rectangle of the PDF page.
        dimensions: Size of the rendered page image the box refers to.

    Returns:
        The rectangle clipped to the page, or ``None`` when the box has fewer
        than four values, contains a non-numeric or non-finite value, or is
        too small after clipping.
    """
    import math

    if len(bbox) < 4:
        return None
    try:
        x0, y0, x1, y1 = (float(v) for v in bbox[:4])
    except (TypeError, ValueError):
        # A malformed box is skipped rather than raised, so one bad block
        # cannot abort image extraction for the whole page.
        return None
    if not all(math.isfinite(v) for v in (x0, y0, x1, y1)):
        return None
    if x1 < x0:
        x0, x1 = x1, x0
    if y1 < y0:
        y0, y1 = y1, y0

    # DeepSeek-OCR grounding boxes are commonly normalized to a 0-1000 grid.
    # Some wrappers may return rendered-pixel boxes, so only scale when the box
    # fits the normalized range and the rendered page is larger than that grid.
    if (
        max(abs(x0), abs(y0), abs(x1), abs(y1)) <= 1000
        and (dimensions.width > 1000 or dimensions.height > 1000)
    ):
        x0 = x0 * dimensions.width / 1000.0
        x1 = x1 * dimensions.width / 1000.0
        y0 = y0 * dimensions.height / 1000.0
        y1 = y1 * dimensions.height / 1000.0

    # Rendered pixels -> PDF page units.
    width_scale = page_rect.width / max(dimensions.width, 1e-6)
    height_scale = page_rect.height / max(dimensions.height, 1e-6)
    rect = fitz.Rect(
        page_rect.x0 + x0 * width_scale,
        page_rect.y0 + y0 * height_scale,
        page_rect.x0 + x1 * width_scale,
        page_rect.y0 + y1 * height_scale,
    )
    return _clip_rect_to_page(rect, page_rect)


def _rect_iou(a: fitz.Rect, b: fitz.Rect) -> float:
    """Return the intersection-over-union of two rectangles.

    The result is ``0.0`` when the rectangles do not overlap (touching edges
    do not count as overlap).
    """
    left, right = max(a.x0, b.x0), min(a.x1, b.x1)
    top, bottom = max(a.y0, b.y0), min(a.y1, b.y1)
    if right <= left or bottom <= top:
        return 0.0
    intersection = (right - left) * (bottom - top)
    combined = a.width * a.height + b.width * b.height - intersection
    # Guard against a zero denominator for degenerate rectangles.
    return intersection / max(combined, 1e-6)


def _rect_area(rect: fitz.Rect) -> float:
    """Return the area of *rect*, treating a negative width or height as zero."""
    width = max(0.0, rect.width)
    height = max(0.0, rect.height)
    return width * height


def _rect_intersection_area(a: fitz.Rect, b: fitz.Rect) -> float:
    """Return the area shared by two rectangles, or ``0.0`` if they are disjoint."""
    left, right = max(a.x0, b.x0), min(a.x1, b.x1)
    top, bottom = max(a.y0, b.y0), min(a.y1, b.y1)
    if right <= left or bottom <= top:
        return 0.0
    return (right - left) * (bottom - top)


def _rect_smaller_overlap_ratio(a: fitz.Rect, b: fitz.Rect) -> float:
    """Return the shared area as a fraction of the smaller rectangle's area.

    Yields ``0.0`` when the smaller rectangle has no area.
    """
    smaller_area = min(_rect_area(a), _rect_area(b))
    return _rect_intersection_area(a, b) / smaller_area if smaller_area > 0 else 0.0


def _rect_union(a: fitz.Rect, b: fitz.Rect) -> fitz.Rect:
    """Return the smallest rectangle that contains both *a* and *b*."""
    left = min(a.x0, b.x0)
    top = min(a.y0, b.y0)
    right = max(a.x1, b.x1)
    bottom = max(a.y1, b.y1)
    return fitz.Rect(left, top, right, bottom)


def _merge_sources(source_a: str, source_b: str) -> str:
    """Combine two ``+``-separated source labels into one.

    Parts are stripped, empty parts are dropped, and duplicates are removed
    while keeping first-seen order. Returns ``"merged"`` when nothing is left.
    """
    pieces = (
        piece.strip()
        for source in (source_a, source_b)
        for piece in str(source or "").split("+")
    )
    # dict.fromkeys de-duplicates while preserving insertion order.
    unique = dict.fromkeys(piece for piece in pieces if piece)
    return "+".join(unique) or "merged"


def _rendered_rect_values(
    rect: fitz.Rect,
    page_rect: fitz.Rect,
    dimensions: OCRPageDimensions,
) -> Tuple[float, float, float, float, float, float]:
    """Return *rect* in rendered pixels as ``(x0, y0, x1, y1, width, height)``.

    Width and height are never negative.
    """
    left, top, right, bottom = _scale_rect_to_rendered(rect, page_rect, dimensions)
    width = max(0.0, right - left)
    height = max(0.0, bottom - top)
    return left, top, right, bottom, width, height


def _rendered_gap_and_overlap(
    a: fitz.Rect,
    b: fitz.Rect,
    page_rect: fitz.Rect,
    dimensions: OCRPageDimensions,
) -> Tuple[float, float, float, float, float, float, float, float]:
    """Measure how two rectangles relate to each other in rendered pixels.

    Returns ``(horizontal_gap, vertical_gap, overlap_x, overlap_y, a_width,
    a_height, b_width, b_height)``. Gaps are ``0.0`` when the rectangles
    overlap on that axis, and overlaps are ``0.0`` when they do not.
    """
    a_left, a_top, a_right, a_bottom, a_width, a_height = _rendered_rect_values(
        a, page_rect, dimensions
    )
    b_left, b_top, b_right, b_bottom, b_width, b_height = _rendered_rect_values(
        b, page_rect, dimensions
    )

    gap_x = max(0.0, max(b_left - a_right, a_left - b_right))
    gap_y = max(0.0, max(b_top - a_bottom, a_top - b_bottom))
    shared_x = max(0.0, min(a_right, b_right) - max(a_left, b_left))
    shared_y = max(0.0, min(a_bottom, b_bottom) - max(a_top, b_top))
    return gap_x, gap_y, shared_x, shared_y, a_width, a_height, b_width, b_height


def _is_near_full_page_rect(
    rect: fitz.Rect,
    page_rect: fitz.Rect,
    dimensions: OCRPageDimensions,
) -> bool:
    """Tell whether *rect* covers nearly the whole rendered page.

    The rectangle must span at least 82% of the page width, 78% of the page
    height, and an area fraction of at least
    ``SILICONFLOW_DEEPSEEK_OCR_FULL_PAGE_IMAGE_AREA_RATIO`` (default 0.82).
    """
    *_corners, width, height = _rendered_rect_values(rect, page_rect, dimensions)
    page_width = max(1.0, float(dimensions.width))
    page_height = max(1.0, float(dimensions.height))
    page_area = max(1.0, float(dimensions.width * dimensions.height))
    min_area_ratio = _env_float("SILICONFLOW_DEEPSEEK_OCR_FULL_PAGE_IMAGE_AREA_RATIO", 0.82)

    covers_area = (width * height) / page_area >= min_area_ratio
    covers_width = width / page_width >= 0.82
    covers_height = height / page_height >= 0.78
    return covers_area and covers_width and covers_height


def _should_merge_image_rects(
    a: fitz.Rect,
    source_a: str,
    b: fitz.Rect,
    source_b: str,
    page_rect: fitz.Rect,
    dimensions: OCRPageDimensions,
    gap_limit_px: float,
) -> bool:
    """Decide whether two image regions should be fused into one.

    Regions merge when they overlap substantially, or when they sit within
    *gap_limit_px* rendered pixels of each other and line up well enough on
    the other axis. Alignment is judged more leniently when at least one
    region comes from a DeepSeek layout block.
    """
    overlap_of_smaller = _rect_smaller_overlap_ratio(a, b)
    exactly_one_full_page = _is_near_full_page_rect(a, page_rect, dimensions) != _is_near_full_page_rect(
        b, page_rect, dimensions
    )

    # A page-sized scan/background image must not swallow a precise figure
    # box that lies inside it. Two page-sized strips may still merge below.
    if exactly_one_full_page and overlap_of_smaller >= 0.90:
        return False

    if overlap_of_smaller >= 0.62 or _rect_iou(a, b) >= 0.12:
        return True

    gap_x, gap_y, shared_x, shared_y, a_w, a_h, b_w, b_h = _rendered_gap_and_overlap(
        a, b, page_rect, dimensions
    )
    shared_x_ratio = shared_x / max(1.0, min(a_w, b_w))
    shared_y_ratio = shared_y / max(1.0, min(a_h, b_h))
    close_x = gap_x <= gap_limit_px
    close_y = gap_y <= gap_limit_px

    # Side-by-side or stacked neighbours that are well aligned.
    if close_x and shared_y_ratio >= 0.42:
        return True
    if close_y and shared_x_ratio >= 0.42:
        return True

    # DeepSeek sometimes splits one complex figure into several touching
    # regions. Only genuinely close fragments are joined, so that separate
    # figures laid out in a grid stay separate.
    involves_deepseek = "deepseek_block" in source_a or "deepseek_block" in source_b
    return (
        involves_deepseek
        and close_x
        and close_y
        and (shared_x_ratio >= 0.18 or shared_y_ratio >= 0.18)
    )


def _merge_image_rect_candidates(
    candidates: List[Tuple[fitz.Rect, str]],
    page_rect: fitz.Rect,
    dimensions: OCRPageDimensions,
) -> List[Tuple[fitz.Rect, str]]:
    """Fuse image regions that belong together until nothing more merges.

    Each pass walks the regions in order and lets every not-yet-absorbed
    region absorb any later region it should merge with; merged regions take
    the union rectangle and a combined source label. Passes repeat until one
    makes no merge. The gap tolerance comes from
    ``SILICONFLOW_DEEPSEEK_OCR_IMAGE_MERGE_GAP_PX`` (default 12, floored at 0).

    Returns the regions sorted top-to-bottom, then left-to-right. A list
    with zero or one entry is returned as is.
    """
    if len(candidates) <= 1:
        return candidates

    gap_limit_px = max(0.0, _env_float("SILICONFLOW_DEEPSEEK_OCR_IMAGE_MERGE_GAP_PX", 12.0))
    pool = list(candidates)
    while True:
        merged_any = False
        next_pool: List[Tuple[fitz.Rect, str]] = []
        absorbed = set()

        for seed_index, (rect, source) in enumerate(pool):
            if seed_index in absorbed:
                continue

            for later_index in range(seed_index + 1, len(pool)):
                if later_index in absorbed:
                    continue
                later_rect, later_source = pool[later_index]
                if _should_merge_image_rects(
                    rect,
                    source,
                    later_rect,
                    later_source,
                    page_rect,
                    dimensions,
                    gap_limit_px,
                ):
                    # The seed grows, so later comparisons in this pass use
                    # the enlarged rectangle and combined label.
                    rect = _rect_union(rect, later_rect)
                    source = _merge_sources(source, later_source)
                    absorbed.add(later_index)
                    merged_any = True

            next_pool.append((rect, source))

        pool = next_pool
        if not merged_any:
            break

    return sorted(pool, key=lambda item: (item[0].y0, item[0].x0, item[0].y1, item[0].x1))


def _expand_rect_for_crop(
    rect: fitz.Rect,
    page_rect: fitz.Rect,
    dimensions: OCRPageDimensions,
) -> fitz.Rect:
    """Grow *rect* by a small padding before it is cropped.

    The padding is ``SILICONFLOW_DEEPSEEK_OCR_IMAGE_CROP_PADDING_PX`` rendered
    pixels (default 12), capped at 10% of the region's shorter side (with
    that cap never below 2 px). The result is clipped to the page; *rect* is
    returned unchanged when padding is disabled or the region has no size.
    """
    padding_px = max(0.0, _env_float("SILICONFLOW_DEEPSEEK_OCR_IMAGE_CROP_PADDING_PX", 12.0))
    if padding_px <= 0:
        return rect

    *_corners, width, height = _rendered_rect_values(rect, page_rect, dimensions)
    if width <= 0 or height <= 0:
        return rect

    capped_px = min(padding_px, max(2.0, min(width, height) * 0.10))
    # Convert the pixel padding back into PDF page units per axis.
    pad_x = capped_px * page_rect.width / max(1.0, float(dimensions.width))
    pad_y = capped_px * page_rect.height / max(1.0, float(dimensions.height))
    grown = fitz.Rect(rect.x0 - pad_x, rect.y0 - pad_y, rect.x1 + pad_x, rect.y1 + pad_y)
    clipped = _clip_rect_to_page(grown, page_rect)
    return clipped or rect


def _coerce_bool(value: Any, default: bool = True) -> bool:
    """Interpret a loosely typed *value* as a boolean.

    ``None`` yields *default*. Strings are false only when, stripped and
    lower-cased, they equal ``0``, ``false``, ``no``, ``n`` or ``off``;
    every other string (including the empty one) is true. Anything else
    goes through ``bool()``.
    """
    if value is None:
        return default
    if isinstance(value, str):
        return value.strip().lower() not in ("0", "false", "no", "n", "off")
    return bool(value)


def _dedupe_rects(rects: List[fitz.Rect]) -> List[fitz.Rect]:
    """Drop rectangles that nearly coincide with a larger one already kept.

    Rectangles are visited from largest to smallest area; one is discarded
    when its IoU with a kept rectangle exceeds 0.85. The survivors are
    returned sorted top-to-bottom, then left-to-right.
    """
    kept: List[fitz.Rect] = []
    largest_first = sorted(rects, key=lambda r: r.width * r.height, reverse=True)
    for candidate in largest_first:
        is_duplicate = any(_rect_iou(candidate, existing) > 0.85 for existing in kept)
        if not is_duplicate:
            kept.append(candidate)
    kept.sort(key=lambda r: (r.y0, r.x0, r.y1, r.x1))
    return kept


def _image_entry_pixel_size(image_entry: Tuple[Any, ...]) -> Tuple[int, int]:
    """Return ``(width, height)`` read from positions 2 and 3 of *image_entry*.

    Missing or falsy values count as 0; ``(0, 0)`` is returned when the
    entry is too short or a value cannot be converted to ``int``.
    """
    try:
        width = int(image_entry[2] or 0)
        height = int(image_entry[3] or 0)
    except Exception:
        return 0, 0
    return width, height


def _pdf_image_content_profile(
    doc: fitz.Document,
    xref: int,
    cache: Dict[int, Optional[Dict[str, float]]],
) -> Optional[Dict[str, float]]:
    """Compute coarse brightness/saturation statistics for an embedded image.

    The image stored under *xref* is extracted, reduced to a thumbnail of at
    most 256x256 pixels and summarized as:

    * ``white_ratio`` - share of grayscale pixels with value >= 245,
    * ``dark_ratio`` - share of grayscale pixels with value < 100,
    * ``saturation_mean`` - mean of the HSV saturation channel.

    Results, including failures stored as ``None``, are memoized in *cache*
    by xref. Returns ``None`` when the image has no data or anything fails
    while decoding or analysing it.
    """
    if xref in cache:
        return cache[xref]

    source_image = None
    preview = None
    profile: Optional[Dict[str, float]] = None
    try:
        image_bytes = doc.extract_image(xref).get("image")
        if image_bytes:
            source_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
            preview = source_image.copy()
            preview.thumbnail((256, 256), getattr(Image, "Resampling", Image).LANCZOS)

            histogram = preview.convert("L").histogram()
            pixel_total = max(1, sum(histogram))
            saturation = preview.convert("HSV").split()[1]
            profile = {
                "white_ratio": float(sum(histogram[245:]) / pixel_total),
                "dark_ratio": float(sum(histogram[:100]) / pixel_total),
                "saturation_mean": float(ImageStat.Stat(saturation).mean[0]),
            }
    except Exception:
        # Best effort: an image that cannot be analysed simply has no profile.
        profile = None
    finally:
        for opened in (preview, source_image):
            if opened is not None:
                try:
                    opened.close()
                except Exception:
                    pass

    cache[xref] = profile
    return profile


def _is_text_raster_profile(profile: Optional[Dict[str, float]]) -> bool:
    """Tell whether an image profile looks like rasterized text.

    That means a mostly white image (``white_ratio`` >= 0.55) with limited
    dark content (``dark_ratio`` <= 0.38) and almost no color
    (``saturation_mean`` <= 24). A missing profile or missing keys never
    qualify.
    """
    if not profile:
        return False
    mostly_white = profile.get("white_ratio", 0.0) >= 0.55
    limited_dark = profile.get("dark_ratio", 1.0) <= 0.38
    nearly_colorless = profile.get("saturation_mean", 255.0) <= 24.0
    return mostly_white and limited_dark and nearly_colorless


def _should_keep_pdf_image_rect(
    doc: fitz.Document,
    image_entry: Tuple[Any, ...],
    rect: fitz.Rect,
    page_rect: fitz.Rect,
    dimensions: OCRPageDimensions,
    profile_cache: Dict[int, Optional[Dict[str, float]]],
    *,
    text_raster_strip_count: int = 0,
) -> bool:
    """Decide whether an embedded PDF image is worth keeping as a figure.

    Geometry is judged in rendered pixels. Tiny images, extreme strips and
    wide thin banners are rejected first; after that, images whose content
    looks like rasterized text are rejected under additional size/aspect
    rules, which are stricter when the page holds at least two text strips
    (*text_raster_strip_count*).
    """
    x0, y0, x1, y1 = _scale_rect_to_rendered(rect, page_rect, dimensions)
    shown_w = max(0.0, x1 - x0)
    shown_h = max(0.0, y1 - y0)
    if shown_w <= 0 or shown_h <= 0:
        return False

    shown_area = shown_w * shown_h
    area_ratio = shown_area / max(1.0, float(dimensions.width * dimensions.height))
    width_ratio = shown_w / max(1.0, float(dimensions.width))
    # Long side over short side, so always >= 1 regardless of orientation.
    aspect = max(shown_w / max(shown_h, 1e-6), shown_h / max(shown_w, 1e-6))
    short_side = min(shown_w, shown_h)
    long_side = max(shown_w, shown_h)
    pixel_width, pixel_height = _image_entry_pixel_size(image_entry)

    too_small_on_page = short_side < 22 or shown_area < 1500
    tiny_source_image = (
        pixel_width
        and pixel_height
        and min(pixel_width, pixel_height) < 18
        and max(pixel_width, pixel_height) < 80
    )
    extreme_strip = aspect >= 8.0
    wide_thin_banner = width_ratio >= 0.65 and aspect >= 3.5 and area_ratio < 0.22
    if too_small_on_page or tiny_source_image or extreme_strip or wide_thin_banner:
        return False

    profile = _pdf_image_content_profile(doc, int(image_entry[0]), profile_cache)

    # Many HTML-to-PDF and scanned web pages encode body text as wide raster
    # strips. Those are technically PDF images, but adding them to markdown
    # duplicates OCR text and pollutes the knowledge base.
    if _is_text_raster_profile(profile):
        page_has_text_strips = text_raster_strip_count >= 2
        if page_has_text_strips and width_ratio >= 0.55 and aspect >= 1.55:
            return False
        if page_has_text_strips and area_ratio >= 0.045 and aspect >= 1.35:
            return False
        if area_ratio < 0.02 and aspect >= 2.2:
            return False

    # Very wide small images are usually separators, navigation labels, or
    # rasterized text snippets rather than document figures.
    return not (long_side >= 120 and aspect >= 4.5 and area_ratio < 0.035)


def _extract_pdf_image_rects(
    pdf_path: Path,
    page_index: int,
    dimensions: OCRPageDimensions,
    blocks: List[Dict[str, Any]],
) -> List[OCRImage]:
    """Locate the image regions of one PDF page.

    Candidates come from two places: OCR layout *blocks* whose type is an
    image-like type, and the images embedded in the PDF page itself (after
    filtering out decorations and rasterized text). Candidates are merged,
    padded for cropping and returned in rendered-pixel coordinates.

    Args:
        pdf_path: Path of the PDF to open.
        page_index: Zero-based page index.
        dimensions: Size of the rendered page image.
        blocks: Layout blocks; only their "type" and "bbox" keys are read.

    Returns:
        The detected regions. Any failure is printed and results in an empty
        list instead of an exception.
    """
    doc = None
    try:
        doc = fitz.open(str(pdf_path))
        if doc.needs_pass and not doc.authenticate(""):
            raise SiliconFlowDeepSeekOCRError(f"PDF requires a password: {pdf_path}")
        page = doc[page_index]
        page_rect = page.rect
        # (rectangle in PDF page units, source label) pairs.
        rects: List[Tuple[fitz.Rect, str]] = []
        profile_cache: Dict[int, Optional[Dict[str, float]]] = {}
        pdf_image_candidates: List[Tuple[Tuple[Any, ...], fitz.Rect]] = []

        # 1) Regions reported by the OCR model as image-like blocks.
        for block in blocks:
            block_type = str(block.get("type") or "").strip().lower()
            bbox = block.get("bbox")
            if block_type not in _IMAGE_BLOCK_TYPES or not isinstance(bbox, list):
                continue
            rect = _bbox_to_pdf_rect(bbox, page_rect, dimensions)
            if rect is not None:
                rects.append((rect, "deepseek_block"))

        # 2) Every placement of every image embedded in the page.
        for image_entry in page.get_images(full=True):
            if not image_entry:
                continue
            xref = image_entry[0]
            try:
                image_rects = page.get_image_rects(xref)
            except Exception:
                continue
            for rect in image_rects:
                clipped = _clip_rect_to_page(rect, page_rect)
                if clipped is not None:
                    pdf_image_candidates.append((image_entry, clipped))

        # Count wide, text-like raster strips first: the keep/drop decision
        # below is stricter on pages that contain several of them.
        text_raster_strip_count = 0
        for image_entry, rect in pdf_image_candidates:
            x0, y0, x1, y1 = _scale_rect_to_rendered(rect, page_rect, dimensions)
            display_width = max(0.0, x1 - x0)
            display_height = max(0.0, y1 - y0)
            display_area = display_width * display_height
            page_area = max(1.0, float(dimensions.width * dimensions.height))
            area_ratio = display_area / page_area
            width_ratio = display_width / max(1.0, float(dimensions.width))
            aspect = max(
                display_width / max(display_height, 1e-6),
                display_height / max(display_width, 1e-6),
            )
            xref = int(image_entry[0])
            profile = _pdf_image_content_profile(doc, xref, profile_cache)
            if (
                _is_text_raster_profile(profile)
                and width_ratio >= 0.55
                and aspect >= 1.55
                and area_ratio >= 0.025
            ):
                text_raster_strip_count += 1

        # Keep only the embedded images that look like real figures.
        for image_entry, clipped in pdf_image_candidates:
            if _should_keep_pdf_image_rect(
                    doc,
                    image_entry,
                    clipped,
                    page_rect,
                    dimensions,
                    profile_cache,
                    text_raster_strip_count=text_raster_strip_count,
            ):
                rects.append((clipped, "pdf"))

        images: List[OCRImage] = []
        merged_rects = _merge_image_rect_candidates(rects, page_rect, dimensions)
        save_full_page_images = _env_bool("SILICONFLOW_DEEPSEEK_OCR_SAVE_FULL_PAGE_IMAGES", False)
        for idx, (rect, source) in enumerate(merged_rects):
            rect = _expand_rect_for_crop(rect, page_rect, dimensions)
            x0, y0, x1, y1 = _scale_rect_to_rendered(rect, page_rect, dimensions)
            if x1 <= x0 or y1 <= y0:
                continue
            # Near-full-page regions are reported but flagged as not to be
            # saved, unless the environment flag opts in.
            save_image = save_full_page_images or not _is_near_full_page_rect(rect, page_rect, dimensions)
            images.append(
                OCRImage(
                    id=f"img-{idx}",
                    top_left_x=x0,
                    top_left_y=y0,
                    bottom_right_x=x1,
                    bottom_right_y=y1,
                    source=source,
                    save=save_image,
                )
            )
        return images
    except Exception as exc:
        # Best effort: image extraction must not fail the page's OCR result.
        print(f"DeepSeek-OCR image region extraction failed for page {page_index + 1}: {exc}", flush=True)
        return []
    finally:
        if doc is not None:
            doc.close()


def _image_to_dict(image: OCRImage) -> Dict[str, Any]:
    """Return the fields of *image* as a plain dict, in declaration order."""
    field_names = (
        "id",
        "top_left_x",
        "top_left_y",
        "bottom_right_x",
        "bottom_right_y",
        "image_base64",
        "type",
        "source",
        "save",
    )
    return {field_name: getattr(image, field_name) for field_name in field_names}


def _image_markdown_url(image_id: str, url: str) -> str:
    """Build a markdown image reference with *image_id* as the alt text."""
    return "![{}]({})".format(image_id, url)


def _needs_full_page_transcription(
    markdown: str,
    images: List[OCRImage],
    dimensions: OCRPageDimensions,
    *,
    max_markdown_chars: int,
) -> bool:
    """Tell whether a page is mostly one big image with little OCR text.

    True when the stripped *markdown* is shorter than *max_markdown_chars*
    and at least one image covers 72% or more of the rendered page area.
    Always False when *max_markdown_chars* is not positive.
    """
    if max_markdown_chars <= 0 or len((markdown or "").strip()) >= max_markdown_chars:
        return False

    page_area = max(1.0, float(dimensions.width * dimensions.height))

    def coverage(image: OCRImage) -> float:
        width = max(0.0, float(image.bottom_right_x) - float(image.top_left_x))
        height = max(0.0, float(image.bottom_right_y) - float(image.top_left_y))
        return width * height / page_area

    return any(coverage(image) >= 0.72 for image in images)


def _storage_url() -> str:
    """Return the storage base URL without trailing slashes.

    The base is ``TASK_EXECUTOR_STORAGE_URL`` from the environment, or a
    built-in default. When a module named ``value`` can be imported and has
    a ``storageUrl`` attribute, that attribute takes precedence. Any error
    while consulting that module is ignored.
    """
    url = os.environ.get("TASK_EXECUTOR_STORAGE_URL", "https://dev.knowledge.yunwoai.com/storage")
    try:
        import value as task_value  # type: ignore

        url = getattr(task_value, "storageUrl", url)
    except Exception:
        # Best effort: without the optional module the environment/default
        # value stays in effect.
        pass
    return str(url).rstrip("/")


def _db_file_helpers() -> Tuple[Optional[Callable[..., Any]], Optional[Callable[..., Any]]]:
    """Look up the optional file helpers of the ``database`` module.

    Returns ``(get_file_by_obj_key, insert_file)``; each entry is ``None``
    when the module does not provide it, and both are ``None`` when the
    module cannot be imported.
    """
    try:
        import database  # type: ignore

        helpers = (
            getattr(database, "get_file_by_obj_key", None),
            getattr(database, "insert_file", None),
        )
    except Exception:
        helpers = (None, None)
    return helpers


def _crop_pdf_image(
    pdf_path: str,
    page_number: int,
    image: OCRImage,
    output_path: str,
    page_dimensions: Optional[Dict[str, Any]],
) -> bool:
    """Render one image region of a PDF page and save it as a JPEG file.

    Args:
        pdf_path: Path of the PDF to open.
        page_number: One-based page number.
        image: Region to crop, in the pixel space described by
            *page_dimensions*.
        output_path: Destination file; its parent directory is created when
            the path has one.
        page_dimensions: Optional dict with "width", "height" and "dpi" of
            the rendered page. Missing or falsy values fall back to the PDF
            page size and 300 DPI.

    Returns:
        True when the file was written. False when the PDF is locked, the
        page number is out of range, the region is empty after clipping, or
        any error occurred (errors are printed, not raised).
    """
    doc = None
    pil_image = None
    try:
        doc = fitz.open(pdf_path)
        if doc.needs_pass and not doc.authenticate(""):
            return False
        page_index = page_number - 1
        if not 0 <= page_index < len(doc):
            return False
        page = doc[page_index]
        page_rect = page.rect

        dims = page_dimensions or {}
        ocr_width = float(dims.get("width") or page_rect.width)
        ocr_height = float(dims.get("height") or page_rect.height)
        dpi = int(dims.get("dpi") or 300)

        # OCR pixel coordinates -> PDF page units.
        width_scale = page_rect.width / max(ocr_width, 1e-6)
        height_scale = page_rect.height / max(ocr_height, 1e-6)
        rect = fitz.Rect(
            page_rect.x0 + image.top_left_x * width_scale,
            page_rect.y0 + image.top_left_y * height_scale,
            page_rect.x0 + image.bottom_right_x * width_scale,
            page_rect.y0 + image.bottom_right_y * height_scale,
        )
        rect = _clip_rect_to_page(rect, page_rect)
        if rect is None:
            return False

        # Never render below 72 DPI.
        zoom = max(72, dpi) / 72.0
        pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), clip=rect, alpha=False)
        # Without alpha and with the default RGB colorspace the samples are
        # packed RGB bytes, so no intermediate PNG encoding is needed.
        pil_image = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
        pix = None  # drop the raw pixel buffer as soon as it has been copied

        # os.makedirs("") raises, so only create a directory when the output
        # path actually has a directory component.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        pil_image.save(output_path, format="JPEG", quality=92, optimize=True)
        return True
    except Exception as exc:
        print(f"DeepSeek-OCR image crop failed: {exc}", flush=True)
        return False
    finally:
        if pil_image is not None:
            try:
                pil_image.close()
            except Exception:
                pass
        if doc is not None:
            doc.close()


def _save_pdf_images(
    pdf_path: str,
    pages: List[Dict[str, Any]],
    output_dir: Optional[str],
    *,
    file_remark: str = "",
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Write every saveable page image to disk and link it from the page markdown.

    For each image (an ``OCRImage`` or a dict with the same fields) the inline
    base64 payload is written when present; otherwise, or when that fails, the
    region is cropped from the PDF with ``_crop_pdf_image``. Saved images are
    optionally registered through the DB helpers, and the page markdown gets the
    ``(<image id>)`` placeholder replaced by the image URL (or the image link
    appended when no placeholder exists).

    Args:
        pdf_path: Source PDF; the MD5 of this path string names the storage folder.
        pages: Page dicts (``page_number``, ``images``, ``markdown``,
            ``dimensions``). They are mutated in place: ``images`` is replaced
            by the list of saved-image records and ``markdown`` is rewritten.
        output_dir: Directory for the JPEG files; when empty a ``storage/pdf_images/<md5>``
            folder derived from this module's location (or the CWD) is used.
        file_remark: Passed through to the DB insert helper.

    Returns:
        ``(pages, extracted_images)`` where ``extracted_images`` is the flat
        list of saved-image records across all pages.
    """
    pdf_md5 = hashlib.md5(str(pdf_path).encode("utf-8")).hexdigest()
    if output_dir:
        image_dir = output_dir
    else:
        module_path = Path(__file__).resolve()
        storage_root = (
            module_path.parents[2] / "storage"
            if len(module_path.parents) > 2
            else Path.cwd() / "storage"
        )
        image_dir = str(storage_root / "pdf_images" / pdf_md5)
    os.makedirs(image_dir, exist_ok=True)

    storage_url = _storage_url()
    get_file_by_obj_key, insert_file = _db_file_helpers()
    extracted_images: List[Dict[str, Any]] = []
    total = 0
    saved = 0
    skipped = 0

    for page in pages:
        page_number = int(page.get("page_number") or 0)
        page_images = page.get("images") or []
        normalized_page_images: List[Dict[str, Any]] = []
        for image in page_images:
            if isinstance(image, OCRImage):
                image_obj = image
            elif isinstance(image, dict):
                image_obj = OCRImage(
                    id=str(image.get("id") or image.get("image_id") or f"img-{total}"),
                    top_left_x=float(image.get("top_left_x") or 0),
                    top_left_y=float(image.get("top_left_y") or 0),
                    bottom_right_x=float(image.get("bottom_right_x") or 0),
                    bottom_right_y=float(image.get("bottom_right_y") or 0),
                    image_base64=image.get("image_base64"),
                    source=str(image.get("source") or "pdf"),
                    save=_coerce_bool(image.get("save"), True),
                )
            else:
                # Unknown image representation: nothing to save.
                continue

            if not image_obj.save:
                skipped += 1
                continue

            total += 1
            image_filename = f"page_{page_number}_chunk_0_img_{image_obj.id}.jpeg"
            image_path = os.path.join(image_dir, image_filename)
            success = False

            if image_obj.image_base64:
                try:
                    # Accept "data:...;base64,<payload>" as well as a bare payload,
                    # matching how document base64 input is handled elsewhere in this module.
                    encoded = str(image_obj.image_base64).split(",", 1)[-1]
                    # Decode before opening the target so a malformed payload
                    # does not leave an empty file behind.
                    decoded = base64.b64decode(encoded)
                    if not decoded:
                        raise ValueError("empty image payload")
                    os.makedirs(os.path.dirname(image_path), exist_ok=True)
                    with open(image_path, "wb") as img_file:
                        img_file.write(decoded)
                    success = True
                except Exception as exc:
                    print(f"DeepSeek-OCR image base64 save failed: {exc}", flush=True)

            if not success:
                # No usable inline payload: crop the region straight from the PDF.
                success = _crop_pdf_image(
                    pdf_path,
                    page_number,
                    image_obj,
                    image_path,
                    page.get("dimensions"),
                )

            if not success:
                continue

            try:
                with open(image_path, "rb") as img_file:
                    image_bytes = img_file.read()
                image_md5 = hashlib.md5(image_bytes).hexdigest()
                image_size = len(image_bytes)
            except Exception as exc:
                print(f"DeepSeek-OCR image read-back failed: {exc}", flush=True)
                continue

            obj_key = f"/pdf_images/{pdf_md5}/{image_filename}"
            if insert_file is not None:
                try:
                    # Only insert a file record when none exists for this object key.
                    exists = get_file_by_obj_key(obj_key) if get_file_by_obj_key is not None else None
                    if exists is None:
                        insert_file(
                            f"storage/pdf_images/{pdf_md5}/{image_filename}",
                            obj_key,
                            image_filename,
                            "jpeg",
                            image_size,
                            image_md5,
                            None,
                            False,
                            file_remark=file_remark,
                        )
                except Exception as exc:
                    # Best effort: a failed DB insert must not drop the saved image.
                    print(f"DeepSeek-OCR image file record insert failed: {exc}", flush=True)

            image_url = f"{storage_url}{obj_key}"
            extracted = {
                "image_id": image_obj.id,
                "page_number": page_number,
                "chunk_index": 0,
                "image_path": image_path,
                "url": image_url,
                "obj_key": obj_key,
                "coordinates": {
                    "top_left_x": image_obj.top_left_x,
                    "top_left_y": image_obj.top_left_y,
                    "bottom_right_x": image_obj.bottom_right_x,
                    "bottom_right_y": image_obj.bottom_right_y,
                },
                "dimensions": page.get("dimensions"),
                "source": image_obj.source,
            }
            extracted_images.append(extracted)
            normalized_page_images.append(extracted)
            saved += 1

            # Point the markdown at the stored image: swap the "(id)" placeholder,
            # or append an image link when the page does not reference it yet.
            markdown = str(page.get("markdown") or "")
            placeholder = f"({image_obj.id})"
            if placeholder in markdown:
                markdown = markdown.replace(placeholder, f"({image_url})")
            elif image_url not in markdown:
                markdown = f"{markdown.rstrip()}\n\n{_image_markdown_url(image_obj.id, image_url)}".strip()
            page["markdown"] = markdown

        page["images"] = normalized_page_images

    if total or skipped:
        print(
            f"DeepSeek-OCR image extraction: saved {saved}/{total} image(s), skipped {skipped}",
            flush=True,
        )
    return pages, extracted_images


# Matches one layout tag pair: "<|ref|>LABEL<|/ref|>" followed (after optional
# whitespace) by "<|det|>BOX<|/det|>". Group 1 is the label, group 2 the raw
# detection text; both are non-greedy and may span lines (re.S).
_REF_DET_RE = re.compile(r"<\|ref\|>(.*?)<\|/ref\|>\s*<\|det\|>(.*?)<\|/det\|>", re.S)


def _parse_bbox(det_text: str) -> Optional[List[float]]:
    try:
        parsed = json.loads(det_text)
        if isinstance(parsed, list) and parsed:
            candidate = parsed[0] if isinstance(parsed[0], list) else parsed
            if isinstance(candidate, list) and len(candidate) >= 4:
                return [float(candidate[0]), float(candidate[1]), float(candidate[2]), float(candidate[3])]
    except Exception:
        return None
    return None


def _clean_markdown_and_blocks(raw: str) -> Tuple[str, List[Dict[str, Any]]]:
    """Split model output into clean markdown and a list of layout blocks.

    Every ref/det tag pair opens a block whose text runs up to the next tag
    pair (or the end of the input). The returned markdown is the input with all
    tag pairs removed and runs of three or more newlines collapsed to two.
    """
    text = raw or ""
    tags = list(_REF_DET_RE.finditer(text))
    # The text of tag i stops where tag i+1 starts; the last one runs to the end.
    stops = [following.start() for following in tags[1:]] + [len(text)]

    blocks: List[Dict[str, Any]] = []
    for tag, stop in zip(tags, stops):
        entry: Dict[str, Any] = {
            "type": tag.group(1).strip() or "text",
            "text": text[tag.end():stop].strip(),
        }
        box = _parse_bbox(tag.group(2).strip())
        if box is not None:
            entry["bbox"] = box
        blocks.append(entry)

    without_tags = _REF_DET_RE.sub("", text)
    return re.sub(r"\n{3,}", "\n\n", without_tags).strip(), blocks


def _extract_message_content(data: Dict[str, Any]) -> str:
    choices = data.get("choices")
    if not isinstance(choices, list) or not choices:
        raise SiliconFlowDeepSeekOCRError(f"SiliconFlow response missing choices: {str(data)[:1000]}")
    first = choices[0] or {}
    message = first.get("message") or {}
    content = message.get("content")
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for item in content:
            if isinstance(item, dict):
                text = item.get("text") or item.get("content")
                if text:
                    parts.append(str(text))
            elif item is not None:
                parts.append(str(item))
        return "\n".join(parts)
    return "" if content is None else str(content)


class _OCRResource:
    def __init__(self, client: "SiliconFlowDeepSeekOCRClient") -> None:
        self._client = client

    def process(self, *, model: Optional[str] = None, document: Dict[str, Any], **_: Any) -> OCRResponse:
        return self._client.process_ocr(model=model, document=document)


class SiliconFlowDeepSeekOCRClient:
    def __init__(
        self,
        *,
        api_key: str,
        base_url: str = "https://api.siliconflow.cn/v1",
        api_base: Optional[str] = None,
        model: str = "deepseek-ai/DeepSeek-OCR",
        timeout_sec: float = 120.0,
        page_workers: int = 8,
        max_concurrent_requests: Optional[int] = None,
        trust_env: bool = False,
        max_retries: int = 10,
        rate_limit_retry_forever: bool = True,
        channel_retry_forever: bool = True,
        rate_limit_initial_wait_sec: float = 10.0,
        rate_limit_max_wait_sec: float = 120.0,
        rate_limit_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
        max_tokens: int = 7168,
        prompt: str = DEFAULT_DEEPSEEK_OCR_PROMPT,
        fallback_prompt: Optional[str] = DEFAULT_DEEPSEEK_OCR_FALLBACK_PROMPT,
        transcribe_prompt: Optional[str] = DEFAULT_DEEPSEEK_OCR_TRANSCRIBE_PROMPT,
        full_page_transcribe: bool = True,
        full_page_transcribe_max_chars: int = 180,
        image_detail: str = "high",
        max_pages: int = 0,
        dpi: int = 160,
        max_image_edge: int = 2400,
        jpeg_quality: int = 86,
        max_image_bytes: int = 7 * 1024 * 1024,
    ) -> None:
        """Store the configuration, clamping numeric options to safe ranges.

        Notable normalisation performed here:
            * ``api_base`` overrides ``base_url`` when given; the trailing ``/`` is stripped.
            * ``page_workers`` is at least 1; ``max_concurrent_requests`` defaults
              to ``page_workers`` and is at least 1.
            * ``max_retries`` >= 0, ``max_tokens`` >= 128, ``dpi`` >= 72,
              ``max_pages`` / ``max_image_edge`` / ``full_page_transcribe_max_chars`` >= 0.
            * ``rate_limit_initial_wait_sec`` >= 1.0 and ``rate_limit_max_wait_sec``
              is never below the initial wait.
            * ``jpeg_quality`` is clamped to 40..95 and ``max_image_bytes`` to at least 256 KiB.

        No HTTP client is created here; ``_http_client`` builds it on first use.
        """
        self.api_key = _api_key(api_key)
        self.base_url = (api_base or base_url).rstrip("/")
        self.model = model
        self.timeout_sec = timeout_sec
        self.page_workers = max(1, int(page_workers))
        # Coerce like every other numeric option: a float or numeric string
        # would otherwise produce a broken semaphore count or a TypeError.
        limit = int(max_concurrent_requests) if max_concurrent_requests is not None else self.page_workers
        self._semaphore = threading.BoundedSemaphore(max(1, limit))
        self.trust_env = bool(trust_env)
        self.max_retries = max(0, int(max_retries))
        self.rate_limit_retry_forever = bool(rate_limit_retry_forever)
        self.channel_retry_forever = bool(channel_retry_forever)
        self.rate_limit_initial_wait_sec = max(1.0, float(rate_limit_initial_wait_sec))
        self.rate_limit_max_wait_sec = max(
            self.rate_limit_initial_wait_sec,
            float(rate_limit_max_wait_sec),
        )
        self.rate_limit_callback = rate_limit_callback
        self.max_tokens = max(128, int(max_tokens))
        self.prompt = prompt
        self.fallback_prompt = fallback_prompt
        self.transcribe_prompt = transcribe_prompt
        self.full_page_transcribe = bool(full_page_transcribe)
        self.full_page_transcribe_max_chars = max(0, int(full_page_transcribe_max_chars))
        self.image_detail = image_detail
        self.max_pages = max(0, int(max_pages))
        self.dpi = max(72, int(dpi))
        self.max_image_edge = max(0, int(max_image_edge))
        self.jpeg_quality = max(40, min(95, int(jpeg_quality)))
        self.max_image_bytes = max(256 * 1024, int(max_image_bytes))
        self.ocr = _OCRResource(self)
        # The shared HTTP client is created lazily and guarded by this lock.
        self._client_lock = threading.Lock()
        self._client: Optional[httpx.Client] = None

    def _should_retry_forever(self, status_code: Optional[int] = None, body: str = "") -> bool:
        if status_code is None:
            return self.channel_retry_forever
        if _is_rate_limit_error(status_code, body):
            return self.rate_limit_retry_forever or self.channel_retry_forever
        return self.channel_retry_forever and _is_retryable_http_error(status_code, body)

    def _http_client(self) -> httpx.Client:
        client = self._client
        if client is None or client.is_closed:
            with self._client_lock:
                client = self._client
                if client is not None and not client.is_closed:
                    return client
                client = self._make_http_client()
                self._client = client
        return client

    def close(self) -> None:
        """Detach and close the shared HTTP client; errors while closing are ignored."""
        with self._client_lock:
            stale, self._client = self._client, None
        if stale is None:
            return
        try:
            stale.close()
        except Exception:
            pass

    def __enter__(self) -> "SiliconFlowDeepSeekOCRClient":
        return self

    def __exit__(self, *_: Any) -> None:
        self.close()

    def _make_http_client(self) -> httpx.Client:
        """Build a new HTTP client sized for ``page_workers`` concurrent pages.

        Kept as a separate method for compatibility with code that may subclass
        and override client construction.
        """
        workers = self.page_workers
        pool_limits = httpx.Limits(
            max_keepalive_connections=max(1, workers),
            max_connections=max(1, workers * 2),
        )
        # The connect phase is capped at 15 seconds (or the overall timeout if shorter).
        connect_timeout = min(15.0, self.timeout_sec)
        return httpx.Client(
            timeout=httpx.Timeout(self.timeout_sec, connect=connect_timeout),
            trust_env=self.trust_env,
            limits=pool_limits,
            http2=False,
        )

    def _resolve_pdf_path(self, document: Dict[str, Any]) -> Tuple[Path, Optional[Path]]:
        doc_type = str(document.get("type") or "").lower()
        file_path = document.get("file_path") or document.get("path")
        if file_path:
            path = Path(str(file_path)).expanduser().resolve()
            if not path.exists():
                raise FileNotFoundError(str(path))
            return path, None

        if doc_type in {"local_file", "file"} and document.get("document_url"):
            path = Path(str(document["document_url"])).expanduser().resolve()
            if not path.exists():
                raise FileNotFoundError(str(path))
            return path, None

        encoded = document.get("document_base64") or document.get("file_base64") or document.get("base64")
        if encoded:
            raw = base64.b64decode(str(encoded).split(",", 1)[-1])
            temp = _write_temp_pdf(raw)
            return temp, temp

        document_url = document.get("document_url")
        if document_url:
            url = str(document_url)
            if url.startswith("file://"):
                path = Path(url[7:]).expanduser().resolve()
                if not path.exists():
                    raise FileNotFoundError(str(path))
                return path, None
            if url.startswith(("http://", "https://")):
                response = self._http_client().get(url)
                response.raise_for_status()
                temp = _write_temp_pdf(response.content)
                return temp, temp

        raise SiliconFlowDeepSeekOCRError("document must contain local file_path, document_base64, or document_url")

    def _request_payload(self, image_bytes: bytes, model: str, prompt: str, max_tokens: int) -> Dict[str, Any]:
        image_data = base64.b64encode(image_bytes).decode("ascii")
        return {
            "model": model,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{image_data}",
                                "detail": self.image_detail,
                            },
                        },
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
            "stream": False,
            "max_tokens": max_tokens,
            "temperature": 0,
        }

    def _next_safe_max_tokens(self, current_max_tokens: int) -> Optional[int]:
        if current_max_tokens <= 1024:
            return None
        if current_max_tokens > 7168:
            return 7168
        if current_max_tokens > 6144:
            return 6144
        if current_max_tokens > 4096:
            return 4096
        return max(1024, current_max_tokens // 2)

    def _retry_wait_seconds(self, retry_count: int) -> float:
        return min(30.0, 1.5 * max(1, retry_count))

    def _post_page(self, image_bytes: bytes, model: str, prompt: str) -> Dict[str, Any]:
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        url = f"{self.base_url}/chat/completions"
        last_error: Optional[BaseException] = None
        current_max_tokens = self.max_tokens
        context_limit_adjustments = 0

        retry_count = 0
        rate_limit_attempt = 0
        while True:
            try:
                payload = self._request_payload(image_bytes, model, prompt, current_max_tokens)
                with self._semaphore:
                    response = self._http_client().post(url, headers=headers, json=payload)
                if response.status_code < 400:
                    try:
                        return response.json()
                    except json.JSONDecodeError as exc:
                        error = SiliconFlowDeepSeekOCRError(
                            f"SiliconFlow returned non-JSON response: {response.text[:1000]}"
                        )
                        last_error = error
                        if self.channel_retry_forever:
                            retry_count += 1
                            wait_sec = self._retry_wait_seconds(retry_count)
                            print(
                                f"DeepSeek-OCR non-JSON response, retrying forever "
                                f"(attempt={retry_count}, wait={wait_sec:.1f}s): "
                                f"{response.text[:200]}",
                                flush=True,
                            )
                            time.sleep(wait_sec)
                            continue
                        if retry_count < self.max_retries:
                            retry_count += 1
                            wait_sec = self._retry_wait_seconds(retry_count)
                            print(
                                f"DeepSeek-OCR non-JSON response, retry "
                                f"{retry_count}/{self.max_retries} in {wait_sec:.1f}s: "
                                f"{response.text[:200]}",
                                flush=True,
                            )
                            time.sleep(wait_sec)
                            continue
                        raise error from exc
                body = response.text[:1000]
                error = SiliconFlowDeepSeekOCRError(f"SiliconFlow HTTP {response.status_code}: {body}")
                if (
                    response.status_code == 400
                    and "max_tokens" in body
                    and "max_seq_len" in body
                ):
                    next_max_tokens = self._next_safe_max_tokens(current_max_tokens)
                    if next_max_tokens is not None and context_limit_adjustments < 6:
                        last_error = error
                        current_max_tokens = next_max_tokens
                        context_limit_adjustments += 1
                        print(
                            f"DeepSeek-OCR reduced max_tokens to {current_max_tokens} after context-limit error",
                            flush=True,
                        )
                        continue
                if _is_rate_limit_error(response.status_code, body):
                    last_error = error
                    rate_limit_attempt += 1
                    retry_after = _retry_after_seconds(response.headers)
                    wait_sec = retry_after
                    if wait_sec is None:
                        wait_sec = min(
                            self.rate_limit_max_wait_sec,
                            self.rate_limit_initial_wait_sec * (2 ** min(rate_limit_attempt - 1, 6)),
                        )
                    if self.rate_limit_callback is not None:
                        try:
                            self.rate_limit_callback(
                                {
                                    "status_code": response.status_code,
                                    "body": body,
                                    "rate_limit_attempt": rate_limit_attempt,
                                    "retry_after_sec": retry_after,
                                    "wait_sec": wait_sec,
                                }
                            )
                        except Exception as callback_error:
                            print(
                                f"DeepSeek-OCR rate-limit callback failed: {callback_error}",
                                flush=True,
                            )
                    print(
                        f"DeepSeek-OCR rate limited, retrying forever "
                        f"(rate_limit_attempt={rate_limit_attempt}, wait={wait_sec:.1f}s): {body[:200]}",
                        flush=True,
                    )
                    time.sleep(wait_sec)
                    if self._should_retry_forever(response.status_code, body):
                        continue
                    if retry_count < self.max_retries:
                        retry_count += 1
                        continue
                    break
                if self._should_retry_forever(response.status_code, body):
                    last_error = error
                    retry_count += 1
                    wait_sec = self._retry_wait_seconds(retry_count)
                    print(
                        f"DeepSeek-OCR retryable HTTP error, retrying forever "
                        f"(attempt={retry_count}, wait={wait_sec:.1f}s, status={response.status_code}): "
                        f"{body[:200]}",
                        flush=True,
                    )
                    time.sleep(wait_sec)
                    continue
                last_error = error
                if retry_count < self.max_retries:
                    retry_count += 1
                    wait_sec = self._retry_wait_seconds(retry_count)
                    print(
                        f"DeepSeek-OCR HTTP error, retry "
                        f"{retry_count}/{self.max_retries} in {wait_sec:.1f}s "
                        f"(status={response.status_code}): {body[:200]}",
                        flush=True,
                    )
                    time.sleep(wait_sec)
                    continue
                raise error
            except (httpx.ConnectError, httpx.ReadError, httpx.RemoteProtocolError, httpx.TimeoutException) as exc:
                last_error = exc
                if self._should_retry_forever():
                    retry_count += 1
                    wait_sec = self._retry_wait_seconds(retry_count)
                    print(
                        f"DeepSeek-OCR network error, retrying forever "
                        f"(attempt={retry_count}, wait={wait_sec:.1f}s): {exc}",
                        flush=True,
                    )
                    time.sleep(wait_sec)
                    continue
                if retry_count < self.max_retries:
                    retry_count += 1
                    wait_sec = self._retry_wait_seconds(retry_count)
                    print(
                        f"DeepSeek-OCR network error, retry {retry_count}/{self.max_retries} "
                        f"in {wait_sec:.1f}s: {exc}",
                        flush=True,
                    )
                    time.sleep(wait_sec)
                    continue
                break

        raise SiliconFlowDeepSeekOCRError(f"SiliconFlow DeepSeek-OCR request failed: {last_error}") from last_error

    def _ocr_page(self, pdf_path: Path, page_index: int, model: str) -> OCRPage:
        """OCR a single page and return its ``OCRPage``.

        The page is rendered to JPEG and sent with the primary prompt. An empty
        answer triggers one retry with the fallback prompt (when it is set and
        differs from the primary one). When the page is judged to need a
        full-page transcription, a third request is made with the transcribe
        prompt and its result is used only if it yields more text; a failure of
        that extra request is logged and ignored. Blocks and images always come
        from the primary/fallback answer.
        """
        image_bytes, dimensions, image_meta = _render_page_jpeg(
            pdf_path,
            page_index,
            dpi=self.dpi,
            max_edge=self.max_image_edge,
            quality=self.jpeg_quality,
            max_bytes=self.max_image_bytes,
        )
        t_start = time.time()

        data = self._post_page(image_bytes, model, self.prompt)
        raw_markdown = _extract_message_content(data)
        prompt_variant = "primary"

        primary_is_empty = not raw_markdown.strip()
        if primary_is_empty and self.fallback_prompt and self.fallback_prompt != self.prompt:
            data = self._post_page(image_bytes, model, self.fallback_prompt)
            raw_markdown = _extract_message_content(data)
            prompt_variant = "fallback"

        markdown, blocks = _clean_markdown_and_blocks(raw_markdown)
        images = _extract_pdf_image_rects(pdf_path, page_index, dimensions, blocks)

        transcribe_enabled = self.full_page_transcribe and self.transcribe_prompt
        if transcribe_enabled and _needs_full_page_transcription(
            markdown,
            images,
            dimensions,
            max_markdown_chars=self.full_page_transcribe_max_chars,
        ):
            try:
                alt_data = self._post_page(image_bytes, model, self.transcribe_prompt)
                alt_raw = _extract_message_content(alt_data)
                alt_markdown, _ = _clean_markdown_and_blocks(alt_raw)
                # Keep the transcription only when it yields more text.
                if len(alt_markdown.strip()) > len(markdown.strip()):
                    data, raw_markdown, markdown = alt_data, alt_raw, alt_markdown
                    prompt_variant = "transcribe"
            except Exception as exc:
                print(f"DeepSeek-OCR full-page transcription fallback failed: {exc}", flush=True)

        reported = data.get("usage") if isinstance(data, dict) else None
        # Copy so the response payload is not mutated by the extra keys below.
        usage = {} if reported is None else dict(reported)
        usage["elapsed_sec"] = round(time.time() - t_start, 3)
        usage["image"] = image_meta
        usage["prompt_variant"] = prompt_variant

        return OCRPage(
            index=page_index,
            markdown=markdown,
            images=images,
            dimensions=dimensions,
            blocks=blocks,
            raw_markdown=raw_markdown,
            usage=usage,
        )

    def process_ocr(self, *, model: Optional[str], document: Dict[str, Any]) -> OCRResponse:
        """OCR every page of ``document`` in parallel and return the merged response.

        Pages are processed on a thread pool of ``page_workers`` threads and
        returned in page order. ``max_pages`` (when positive) limits how many
        leading pages are processed. The first page failure is logged, cancels
        the futures that have not started yet and is re-raised. A temporary PDF
        created while resolving ``document`` is removed in all cases.

        Raises:
            SiliconFlowDeepSeekOCRError: when the PDF has no pages.
        """
        t_start = time.time()
        model_name = model or self.model
        pdf_path, cleanup_path = self._resolve_pdf_path(document)
        try:
            page_count = _page_count(pdf_path)
            if page_count <= 0:
                raise SiliconFlowDeepSeekOCRError(f"PDF has no pages: {pdf_path}")
            if self.max_pages > 0:
                page_count = min(page_count, self.max_pages)

            print(
                f"DeepSeek-OCR start: file={pdf_path}, pages={page_count}, "
                f"workers={self.page_workers}, model={model_name}",
                flush=True,
            )

            # Slots are filled by index so the result keeps page order
            # regardless of completion order.
            results: List[Optional[OCRPage]] = [None] * page_count
            with ThreadPoolExecutor(max_workers=max(1, self.page_workers)) as pool:
                index_by_future = {}
                for idx in range(page_count):
                    index_by_future[pool.submit(self._ocr_page, pdf_path, idx, model_name)] = idx
                try:
                    for finished in as_completed(index_by_future):
                        page_index = index_by_future[finished]
                        try:
                            page = finished.result()
                            results[page_index] = page
                            print(
                                f"DeepSeek-OCR page {page_index + 1}/{page_count} done, "
                                f"chars={len(page.markdown if page else '')}",
                                flush=True,
                            )
                        except Exception:
                            print(f"DeepSeek-OCR page {page_index + 1}/{page_count} failed", flush=True)
                            traceback.print_exc()
                            raise
                except Exception:
                    # Drop queued work; pages already running finish when the pool exits.
                    for pending in index_by_future:
                        pending.cancel()
                    raise

            completed = [page for page in results if page is not None]
            return OCRResponse(
                pages=completed,
                model=model_name,
                usage_info={
                    "provider": "siliconflow",
                    "engine": "deepseek_ocr",
                    "page_count": len(completed),
                    "elapsed_sec": round(time.time() - t_start, 3),
                    "page_workers": self.page_workers,
                },
            )
        finally:
            if cleanup_path is not None:
                try:
                    cleanup_path.unlink(missing_ok=True)
                except Exception:
                    pass


def run_siliconflow_deepseek_pdf_ocr(
    pdf_path: str,
    *,
    save_images: bool = False,
    output_dir: Optional[str] = None,
    api_key: Optional[str] = None,
    base_url: Optional[str] = None,
    client: Optional[SiliconFlowDeepSeekOCRClient] = None,
) -> Optional[Tuple[Any, ...]]:
    """OCR a local PDF and return the merged markdown plus per-page records.

    Args:
        pdf_path: Path of the PDF, forwarded to the client as a
            ``local_file`` document.
        save_images: When true, the page records are post-processed by
            ``_save_pdf_images`` and the result tuple gains a third element.
        output_dir: Passed through to ``_save_pdf_images``; unused when
            ``save_images`` is false.
        api_key: Passed to ``_api_key`` when a client has to be built here.
        base_url: Explicit endpoint. When falsy, falls back to
            ``SILICONFLOW_DEEPSEEK_OCR_BASE_URL``, then ``SILICONFLOW_BASE_URL``,
            then ``https://api.siliconflow.cn/v1``.
        client: Pre-built client to reuse. When falsy, a new client is
            configured from ``SILICONFLOW_DEEPSEEK_OCR_*`` environment
            variables; ``api_key`` and ``base_url`` are only consulted in
            that case.

    Returns:
        ``None`` when no page yields non-blank markdown. Otherwise
        ``(markdown, pages)`` or, with ``save_images``,
        ``(markdown, pages, extracted_images)``. ``markdown`` is the non-blank
        page texts joined by blank lines; ``pages`` holds one dict per page,
        blank pages included.
    """
    ocr_client = client
    if not ocr_client:
        # Every tunable is environment-overridable; the literals below are the
        # defaults and the min()/max() wrappers clamp out-of-range overrides.
        # Insertion order matches the order in which the settings are resolved.
        client_options: Dict[str, Any] = {
            "api_key": _api_key(api_key),
            "base_url": base_url
            or os.getenv(
                "SILICONFLOW_DEEPSEEK_OCR_BASE_URL",
                os.getenv("SILICONFLOW_BASE_URL", "https://api.siliconflow.cn/v1"),
            ),
            "model": os.getenv("SILICONFLOW_DEEPSEEK_OCR_MODEL", "deepseek-ai/DeepSeek-OCR"),
            "timeout_sec": _env_float("SILICONFLOW_DEEPSEEK_OCR_TIMEOUT_SEC", 120.0),
            "page_workers": max(1, _env_int("SILICONFLOW_DEEPSEEK_OCR_PAGE_WORKERS", 8)),
            "max_concurrent_requests": max(
                1,
                _env_int("SILICONFLOW_DEEPSEEK_OCR_CLIENT_CONCURRENCY", 8),
            ),
            "trust_env": _env_bool("SILICONFLOW_DEEPSEEK_OCR_TRUST_ENV", False),
            "max_retries": max(0, _env_int("SILICONFLOW_DEEPSEEK_OCR_RETRIES", 10)),
            "rate_limit_retry_forever": _env_bool(
                "SILICONFLOW_DEEPSEEK_OCR_429_RETRY_FOREVER",
                True,
            ),
            "channel_retry_forever": _env_bool("SILICONFLOW_DEEPSEEK_OCR_RETRY_FOREVER", True),
            "rate_limit_initial_wait_sec": _env_float(
                "SILICONFLOW_DEEPSEEK_OCR_429_INITIAL_WAIT_SEC",
                10.0,
            ),
            "rate_limit_max_wait_sec": _env_float(
                "SILICONFLOW_DEEPSEEK_OCR_429_MAX_WAIT_SEC",
                120.0,
            ),
            "max_tokens": max(128, _env_int("SILICONFLOW_DEEPSEEK_OCR_MAX_TOKENS", 7168)),
            "prompt": os.getenv("SILICONFLOW_DEEPSEEK_OCR_PROMPT", DEFAULT_DEEPSEEK_OCR_PROMPT),
            "fallback_prompt": os.getenv(
                "SILICONFLOW_DEEPSEEK_OCR_FALLBACK_PROMPT",
                DEFAULT_DEEPSEEK_OCR_FALLBACK_PROMPT,
            ),
            "transcribe_prompt": os.getenv(
                "SILICONFLOW_DEEPSEEK_OCR_TRANSCRIBE_PROMPT",
                DEFAULT_DEEPSEEK_OCR_TRANSCRIBE_PROMPT,
            ),
            "full_page_transcribe": _env_bool(
                "SILICONFLOW_DEEPSEEK_OCR_FULL_PAGE_TRANSCRIBE",
                True,
            ),
            "full_page_transcribe_max_chars": max(
                0,
                _env_int("SILICONFLOW_DEEPSEEK_OCR_FULL_PAGE_TRANSCRIBE_MAX_CHARS", 180),
            ),
            "image_detail": os.getenv("SILICONFLOW_DEEPSEEK_OCR_IMAGE_DETAIL", "high"),
            "max_pages": _env_int("SILICONFLOW_DEEPSEEK_OCR_MAX_PAGES", 0),
            "dpi": max(72, _env_int("SILICONFLOW_DEEPSEEK_OCR_DPI", 160)),
            "max_image_edge": _env_int("SILICONFLOW_DEEPSEEK_OCR_MAX_IMAGE_EDGE", 2400),
            "jpeg_quality": max(
                40,
                min(95, _env_int("SILICONFLOW_DEEPSEEK_OCR_JPEG_QUALITY", 86)),
            ),
            "max_image_bytes": max(
                256 * 1024,
                _env_int("SILICONFLOW_DEEPSEEK_OCR_MAX_IMAGE_BYTES", 7 * 1024 * 1024),
            ),
        }
        ocr_client = SiliconFlowDeepSeekOCRClient(**client_options)

    response = ocr_client.ocr.process(
        model=ocr_client.model,
        document={"type": "local_file", "file_path": pdf_path},
    )

    page_texts: List[str] = []
    page_records: List[Dict[str, Any]] = []
    for page in response.pages:
        text = page.markdown or ""
        # Blank pages still get a record; they are only left out of the merge.
        if text.strip():
            page_texts.append(text)
        record: Dict[str, Any] = {
            "page_number": page.index + 1,  # page.index is 0-based
            "markdown": text,
            "chunk_index": 0,
            "images": [_image_to_dict(image) for image in page.images],
            "dimensions": page.dimensions.__dict__ if page.dimensions else None,
            "blocks": page.blocks,
            "provider": "siliconflow_deepseek_ocr",
            "model": response.model,
        }
        page_records.append(record)

    if not save_images:
        # page_texts holds only non-blank entries, so no further filtering.
        merged = "\n\n".join(page_texts)
        if merged.strip():
            return merged, page_records
        return None

    # The markdown is re-read from the records returned by the image-saving
    # step, so the merged text is rebuilt from those rather than page_texts.
    page_records, extracted_images = _save_pdf_images(
        pdf_path,
        page_records,
        output_dir,
        file_remark=f"attachment for {pdf_path}",
    )
    rewritten_texts = [str(record.get("markdown") or "") for record in page_records]
    merged = "\n\n".join(text for text in rewritten_texts if text.strip())
    if merged.strip():
        return merged, page_records, extracted_images
    return None
