diff --git a/service/milvusdb/app.py b/service/milvusdb/app.py
index a8b1aca..c20e4a0 100644
--- a/service/milvusdb/app.py
+++ b/service/milvusdb/app.py
@@ -37,7 +37,7 @@ def add(folder_id,doc_id,paragraph_index,paragraph_embedding,paragraph_original,
     vector = embedding.data[0].embedding
     # Convert to a flat float32 list compatible with Milvus (avoid extra nesting)
     vector = np.array(vector, dtype='float32').tolist()
-    manager.insert_text_vector(
+    record_id = manager.insert_text_vector(
         text_vector=vector,
         paragraph_index=paragraph_index,
         paragraph_text=paragraph_original,
@@ -45,6 +45,8 @@ def add(folder_id,doc_id,paragraph_index,paragraph_embedding,paragraph_original,
         doc_id=doc_id,
         folder_ids=[folder_id]
     )
+    if not record_id:
+        raise RuntimeError("Milvus single insert failed")
     print(f"Inserted text vector for doc_id {doc_id} in folder_id {folder_id}")
     return True
 
@@ -557,4 +559,4 @@ def get_adjacent_paragraphs(doc_id, paragraph_index, context_window=2):
 
 # add(folder_id=1, doc_id=1, paragraph_embedding="这是一个测试段落,主要内容为关于Python的教程", paragraph_original="这是一个测试段落,主要内容为关于Python的教程", paragraph_summary="Python教程", embedding=None)
 
-# print(search(folder_ids=[1,2,3,10005], query="江苏南通今天的天气怎么样?", top_k=100))
\ No newline at end of file
+# print(search(folder_ids=[1,2,3,10005], query="江苏南通今天的天气怎么样?", top_k=100))
diff --git a/service/milvusdb_dev/app.py b/service/milvusdb_dev/app.py
index d1a3383..a0dc8d4 100644
--- a/service/milvusdb_dev/app.py
+++ b/service/milvusdb_dev/app.py
@@ -271,7 +271,7 @@ def add(
     vector = embedding.data[0].embedding
     # Convert to a flat float32 list compatible with Milvus (avoid extra nesting)
     vector = np.array(vector, dtype="float32").tolist()
-    manager.insert_text_vector(
+    record_id = manager.insert_text_vector(
         text_vector=vector,
         paragraph_index=paragraph_index,
         paragraph_text=paragraph_original,
@@ -279,6 +279,8 @@ def add(
         doc_id=doc_id,
         folder_ids=[folder_id],
     )
+    if not record_id:
+        raise RuntimeError("Milvus single insert failed")
     logger.debug("Inserted text vector for doc_id=%s in folder_id=%s", doc_id, folder_id)
     return True
 
diff --git a/service/task_executor/agents.py b/service/task_executor/agents.py
index d3e48fd..69a8b20 100644
--- a/service/task_executor/agents.py
+++ b/service/task_executor/agents.py
@@ -10,6 +10,24 @@ import os
 import stat
 
 
+def _split_env_list(value):
+    return [item.strip() for item in value.split(",") if item.strip()]
+
+
+def _positive_int_env(name, default):
+    try:
+        return max(1, int(os.getenv(name, str(default))))
+    except (TypeError, ValueError):
+        return default
+
+
+def _float_env(name, default):
+    try:
+        return max(0.0, float(os.getenv(name, str(default))))
+    except (TypeError, ValueError):
+        return default
+
+
 def get_file_creation_timestamp(file_path):
     """
     获取文件创建时间戳，作为related_timestamp的后备方案
@@ -61,6 +79,20 @@ paragraph_summary_model = "glm-4-flash-250414"
 doc_metadata_extractor_model = "glm-z1-flash"
 image_summary_model = "glm-4v-flash"
 
+DOC_METADATA_EXTRACTOR_MODELS = (
+    _split_env_list(os.getenv("DOC_METADATA_EXTRACTOR_MODELS", ""))
+    or [doc_metadata_extractor_model]
+)
+DOC_METADATA_EXTRACTOR_MODEL_ROUNDS = _positive_int_env(
+    "DOC_METADATA_EXTRACTOR_MODEL_ROUNDS", 2
+)
+DOC_METADATA_EXTRACTOR_RETRIES_PER_MODEL = _positive_int_env(
+    "DOC_METADATA_EXTRACTOR_RETRIES_PER_MODEL", 3
+)
+DOC_METADATA_EXTRACTOR_RETRY_DELAY = _float_env(
+    "DOC_METADATA_EXTRACTOR_RETRY_DELAY", 2.0
+)
+
 try:
     DOC_METADATA_EXTRACTOR_MAX_TOKENS = int(
         os.getenv("DOC_METADATA_EXTRACTOR_MAX_TOKENS", "1024")
@@ -113,24 +145,54 @@ def doc_metadata_extractor(filename, content, file_path=None):
             "content": content
         }, ensure_ascii=False)
     })
-    print("send request")
-    response = llm_clients.get_model_response_json(
-        doc_metadata_extractor_model,
-        history,
-        0.7,
-        streamMode=True,
-        continueing_prompt=False,
-        max_completion_tokens=DOC_METADATA_EXTRACTOR_MAX_TOKENS,
-    )
-    print("Response:", response)
-    
-    # 检查响应是否为None或不是字典类型
-    if response is None or not isinstance(response, dict):
-        print(f"Invalid response type: {type(response)}, response: {response}")
-        return {}
-    
-    if response.get("success",True) == False:
-        return {}
+    response = None
+    last_error = None
+    models = DOC_METADATA_EXTRACTOR_MODELS or [doc_metadata_extractor_model]
+
+    for round_index in range(DOC_METADATA_EXTRACTOR_MODEL_ROUNDS):
+        for model_name in models:
+            try:
+                print(
+                    "send metadata request "
+                    f"model={model_name} round={round_index + 1}/"
+                    f"{DOC_METADATA_EXTRACTOR_MODEL_ROUNDS}"
+                )
+                response = llm_clients.get_model_response_json(
+                    model_name,
+                    history,
+                    0.3,
+                    streamMode=True,
+                    continueing_prompt=False,
+                    max_completion_tokens=DOC_METADATA_EXTRACTOR_MAX_TOKENS,
+                    max_retries=DOC_METADATA_EXTRACTOR_RETRIES_PER_MODEL,
+                    use_retry_list=False,
+                )
+                print("Metadata response:", response)
+
+                if response is None or not isinstance(response, dict):
+                    raise ValueError(f"invalid response type: {type(response)}")
+                if response.get("success", True) is False:
+                    raise RuntimeError(response.get("error") or "metadata response success=false")
+
+                required_text = (
+                    response.get("doc_name")
+                    or response.get("abstract")
+                    or response.get("doc_type")
+                )
+                if not required_text:
+                    raise ValueError("metadata response missing doc_name/abstract/doc_type")
+                break
+            except Exception as e:
+                last_error = e
+                print(f"Metadata extraction failed for model {model_name}: {e}")
+                response = None
+                if DOC_METADATA_EXTRACTOR_RETRY_DELAY > 0:
+                    time.sleep(DOC_METADATA_EXTRACTOR_RETRY_DELAY)
+        if response:
+            break
+
+    if not response:
+        raise RuntimeError(f"metadata extraction failed after retries: {last_error}")
     
     related_date = response.get("related_date", "")
     try:
diff --git a/service/task_executor/doc_processor.py b/service/task_executor/doc_processor.py
index 69e80db..188f79a 100644
--- a/service/task_executor/doc_processor.py
+++ b/service/task_executor/doc_processor.py
@@ -15,6 +15,10 @@ import os
 MILVUS_VARCHAR_SAFE_BYTES = 65000
 
 
+class VectorUploadError(RuntimeError):
+    """Raised when parsed content cannot be fully persisted to vector storage."""
+
+
 def _load_milvus_config():
     config_path = os.getenv("KNOWLEDGE_CONFIG_PATH")
     if not config_path:
@@ -422,6 +426,19 @@ def upload_paragraph_batch(folder_id, doc_id, paragraph_items, file_name=None, u
     else:
         result = milvusdb.add_paragraphs(folder_id, doc_id, clean_items)
 
+    if not isinstance(result, dict) or result.get("success") is not True:
+        raise VectorUploadError(
+            f"Milvus paragraph upload failed for doc_id={doc_id}, "
+            f"folder_id={folder_id}, result={result}"
+        )
+
+    inserted_count = result.get("inserted_count")
+    if inserted_count is not None and int(inserted_count) != len(clean_items):
+        raise VectorUploadError(
+            f"Milvus paragraph upload count mismatch for doc_id={doc_id}, "
+            f"expected={len(clean_items)}, inserted={inserted_count}"
+        )
+
     if file_name:
         indexes = [str(item["paragraph_index"]) for item in clean_items]
         print(f"段落批次 [{', '.join(indexes)}] 已上传到文件 {file_name}")
@@ -472,6 +489,8 @@ def parse_doc_personal_task(task_info, doc_info_task):
     results = []
     successful_results = {}
     failed_results = {}
+    total_processed_paragraphs = 0
+    total_uploaded_paragraphs = 0
     
     # 初始化全局段落索引计数器，从1开始
     global_paragraph_index = 1
@@ -553,10 +572,6 @@ def parse_doc_personal_task(task_info, doc_info_task):
 
             # 检查段落是否为空
             if len(paragraphs) == 0 and len(originalTexts) == 0:
-                if k == len(paragraphs_multi) - 1:
-                    database.set_task_status(task_info["id"], "completed")
-                    database.set_document_status(task_info["related_id"], "准备就绪")
-                    logger.info(f"文档 {file_name} 所有段落处理完成")
                 continue
                 
             paragraphs_prepare_to_directly_upload = []
@@ -605,19 +620,13 @@ def parse_doc_personal_task(task_info, doc_info_task):
             # 检查是否有有效的任务配置
             if len(task_configs) == 0:
                 logger.warning(f"文档 {file_name} 第 {k+1} 批段落没有有效内容，跳过处理")
-                if k == len(paragraphs_multi) - 1:
-                    database.set_task_status(task_info["id"], "completed")
-                    database.set_document_status(task_info["related_id"], "准备就绪")
                 # 注意：此处valid_paragraph_count为0，所以不需要更新global_paragraph_index
                 continue
                 
             try:
                 task_ids = processParagraphExecutor.add_tasks_batch(task_configs)
                 if not task_ids:  # 如果没有成功添加任务
-                    logger.warning(f"文档 {file_name} 第 {k+1} 批段落任务添加失败，跳过处理")
-                    # 仍然需要更新全局段落索引
-                    global_paragraph_index += valid_paragraph_count
-                    continue
+                    raise RuntimeError(f"文档 {file_name} 第 {k+1} 批段落任务添加失败")
                     
                 results = processParagraphExecutor.execute_all()
                 successful_results = processParagraphExecutor.get_successful_results()
@@ -628,13 +637,13 @@ def parse_doc_personal_task(task_info, doc_info_task):
                     logger.warning(f"文档 {file_name} 第 {k+1} 批有 {len(failed_results)} 个段落处理失败")
                     for task_id, error in failed_results.items():
                         logger.error(f"段落处理失败，任务ID: {task_id}, 错误: {str(error)}")
+                    raise RuntimeError(
+                        f"文档 {file_name} 第 {k+1} 批段落处理失败: {len(failed_results)}"
+                    )
                 
                 # 如果没有成功的结果，更新索引后继续下一批
                 if len(successful_results) == 0:
-                    logger.warning(f"文档 {file_name} 第 {k+1} 批段落处理完全失败")
-                    # 仍然需要更新全局段落索引
-                    global_paragraph_index += valid_paragraph_count
-                    continue
+                    raise RuntimeError(f"文档 {file_name} 第 {k+1} 批段落处理完全失败")
                 
                 # 创建多线程执行器用于段落上传
                 upload_executor = BatchTaskExecutor(
@@ -729,13 +738,32 @@ def parse_doc_personal_task(task_info, doc_info_task):
                                 logger.error(f"文档 {file_name} 第 {k+1} 批有 {len(upload_failed)} 个段落上传失败")
                                 for task_id, error in upload_failed.items():
                                     logger.error(f"段落上传失败，任务ID: {task_id}, 错误: {str(error)}")
+                                raise VectorUploadError(
+                                    f"文档 {file_name} 第 {k+1} 批段落上传失败: {len(upload_failed)}"
+                                )
+
+                            uploaded_count = 0
+                            for upload_result in upload_successful.values():
+                                if isinstance(upload_result, dict):
+                                    uploaded_count += int(upload_result.get("inserted_count", upload_result.get("count", 0)) or 0)
+                                else:
+                                    uploaded_count += 1
+                            expected_upload_count = len(sorted_successful_items)
+                            if uploaded_count != expected_upload_count:
+                                raise VectorUploadError(
+                                    f"文档 {file_name} 第 {k+1} 批段落上传数量不一致: "
+                                    f"expected={expected_upload_count}, uploaded={uploaded_count}"
+                                )
+                            total_processed_paragraphs += expected_upload_count
+                            total_uploaded_paragraphs += uploaded_count
                             
                             logger.info(f"文档 {file_name} 第 {k+1} 批段落上传完成，成功: {len(upload_successful)}, 失败: {len(upload_failed)}")
                         else:
-                            logger.error(f"文档 {file_name} 第 {k+1} 批段落上传任务添加失败")
+                            raise VectorUploadError(f"文档 {file_name} 第 {k+1} 批段落上传任务添加失败")
                         
                     except Exception as e:
                         logger.error(f"文档 {file_name} 第 {k+1} 批段落上传任务执行失败: {str(e)}")
+                        raise
                     finally:
                         # 清理上传执行器
                         if upload_executor:
@@ -758,8 +786,7 @@ def parse_doc_personal_task(task_info, doc_info_task):
                 _safe_append_failedlog(
                     f"文档 {file_name} 第 {k+1} 批段落处理任务创建失败,错误信息: {str(e)}，错误位置：2\n"
                 )
-                # 不要因为一批失败就整个任务失败，继续处理下一批
-                continue
+                raise
             finally:
                 # 清理批处理任务执行器
                 if processParagraphExecutor:
@@ -783,72 +810,74 @@ def parse_doc_personal_task(task_info, doc_info_task):
                 gc.collect()
 
         # 所有批次处理完成后，添加文档元信息提取功能
-        try:
-            logger.info(f"开始提取文档 {file_name} 的元信息")
-            
-            # 获取文档的完整内容用于元信息提取
-            # 合并所有段落的原始文本内容
-            full_content = ""
-            if originalText_multi:
-                for original_texts in originalText_multi:
-                    if original_texts:
-                        for text in original_texts:
-                            if text and text.strip():
-                                full_content += text + "\n"
+        if total_processed_paragraphs != total_uploaded_paragraphs:
+            raise VectorUploadError(
+                f"文档 {file_name} 段落上传不完整: "
+                f"processed={total_processed_paragraphs}, uploaded={total_uploaded_paragraphs}"
+            )
+        if total_uploaded_paragraphs <= 0:
+            raise VectorUploadError(f"文档 {file_name} 没有任何段落向量上传成功")
+
+        logger.info(f"开始提取文档 {file_name} 的元信息")
             
-            # 限制内容长度，避免超出模型限制
-            if len(full_content) > 20000:  # 限制为50k字符
-                full_content = full_content[:20000] + "..."
-                
-            # 获取文档的基本信息用于元数据上传和文件路径获取
-            doc_info = database.get_doc_info(task_info["related_id"])
-            print(f"文档 {file_name} 元信息提取中，内容长度: {len(full_content)}")
-            # 使用智能体提取文档元信息
-            metadata = agents.doc_metadata_extractor(file_name, full_content, doc_info["file_path"] if doc_info else None)
-            print(metadata)
-            if metadata:
-                logger.info(f"文档 {file_name} 元信息提取成功: {metadata}")
-                
-                if doc_info:
-                    # 确定owner_type和owner_id
-                    owner_type = doc_info.get("type", "个人")  # 默认个人类型
-                    owner_id = doc_info.get("owner_id", task_info["task_user_id"])
-                    
-                    # 获取文件大小信息
-                    import os
-                    file_size = 0
-                    try:
-                        if os.path.exists(doc_info["file_path"]):
-                            file_size = os.path.getsize(doc_info["file_path"])
-                    except Exception as e:
-                        logger.warning(f"无法获取文件大小: {str(e)}")
-                    
-                    # 调用milvus接口上传文档元数据
-                    import milvusdb
-                    try:
-                        milvus_result = milvusdb.add_doc_metadata(
-                            doc_id=int(doc_info["id"]),
-                            name=metadata.get("doc_name", file_name),
-                            authors=metadata.get("authors", []),
-                            doc_type=metadata.get("doc_type", ""),
-                            size=int(file_size),
-                            related_timestamp=metadata.get("related_timestamp", 0),
-                            abstract=metadata.get("abstract", ""),
-                            owner_type=owner_type,
-                            owner_id=int(owner_id),
-                            folder_ids=[int(folder_id)]
-                        )
-                        logger.info(f"文档 {file_name} 元数据上传成功: {milvus_result}")
-                    except Exception as e:
-                        logger.error(f"文档 {file_name} 元数据上传失败: {str(e)}")
-                else:
-                    logger.error(f"无法获取文档 {file_name} 的基本信息，跳过元数据上传")
-            else:
-                logger.warning(f"文档 {file_name} 元信息提取失败或返回空结果")
-                
+        # 获取文档的完整内容用于元信息提取
+        # 合并所有段落的原始文本内容
+        full_content = ""
+        if originalText_multi:
+            for original_texts in originalText_multi:
+                if original_texts:
+                    for text in original_texts:
+                        if text and text.strip():
+                            full_content += text + "\n"
+
+        # 限制内容长度，避免超出模型限制
+        if len(full_content) > 20000:
+            full_content = full_content[:20000] + "..."
+
+        # 获取文档的基本信息用于元数据上传和文件路径获取
+        doc_info = database.get_doc_info(task_info["related_id"])
+        if not doc_info:
+            raise VectorUploadError(f"无法获取文档 {file_name} 的基本信息，无法上传元数据")
+
+        print(f"文档 {file_name} 元信息提取中，内容长度: {len(full_content)}")
+        metadata = agents.doc_metadata_extractor(
+            file_name,
+            full_content,
+            doc_info["file_path"] if doc_info else None,
+        )
+        print(metadata)
+        if not metadata:
+            raise VectorUploadError(f"文档 {file_name} 元信息提取失败或返回空结果")
+
+        logger.info(f"文档 {file_name} 元信息提取成功: {metadata}")
+
+        owner_type = doc_info.get("type", "个人")
+        owner_id = doc_info.get("owner_id", task_info["task_user_id"])
+
+        import os
+        file_size = 0
+        try:
+            if os.path.exists(doc_info["file_path"]):
+                file_size = os.path.getsize(doc_info["file_path"])
         except Exception as e:
-            logger.error(f"文档 {file_name} 元信息提取过程发生错误: {str(e)}")
-            # 元信息提取失败不应影响整个任务的完成状态
+            logger.warning(f"无法获取文件大小: {str(e)}")
+
+        import milvusdb
+        milvus_result = milvusdb.add_doc_metadata(
+            doc_id=int(doc_info["id"]),
+            name=metadata.get("doc_name", file_name) or file_name,
+            authors=metadata.get("authors", []),
+            doc_type=metadata.get("doc_type", ""),
+            size=int(file_size),
+            related_timestamp=metadata.get("related_timestamp", 0),
+            abstract=metadata.get("abstract", ""),
+            owner_type=owner_type,
+            owner_id=int(owner_id),
+            folder_ids=[int(folder_id)]
+        )
+        if not isinstance(milvus_result, dict) or milvus_result.get("success") is not True:
+            raise VectorUploadError(f"文档 {file_name} 元数据上传失败: {milvus_result}")
+        logger.info(f"文档 {file_name} 元数据上传成功: {milvus_result}")
         
         # 标记任务完成
         database.set_task_status(task_info["id"], "completed")
diff --git a/service/task_executor/llm_clients.py b/service/task_executor/llm_clients.py
index 77c1c8f..9447790 100644
--- a/service/task_executor/llm_clients.py
+++ b/service/task_executor/llm_clients.py
@@ -682,7 +682,16 @@ def get_model_response_azure(model, history, temperature,streamMode = False):
 
 
 
-def get_model_response_json(model, history, temperature,streamMode = True,continueing_prompt = False,max_completion_tokens=None):
+def get_model_response_json(
+    model,
+    history,
+    temperature,
+    streamMode=True,
+    continueing_prompt=False,
+    max_completion_tokens=None,
+    max_retries=3,
+    use_retry_list=True,
+):
     # By default, we use the retry list. The 'model' parameter is kept for backward compatibility
     # or for specific calls that need to target one model.
     # To use the retry list, we can ignore the 'model' parameter and pass a placeholder.
@@ -694,7 +703,8 @@ def get_model_response_json(model, history, temperature,streamMode = True,contin
         type="json",
         continuing_prompt=continueing_prompt,
         max_completion_tokens=max_completion_tokens,
-        use_retry_list=True # Enable the new retry mechanism
+        max_retries=max_retries,
+        use_retry_list=use_retry_list,
     )
     print("Model Response:", modelResponse)
     
diff --git a/service/task_executor/milvusdb.py b/service/task_executor/milvusdb.py
index 3951fbf..0280c82 100644
--- a/service/task_executor/milvusdb.py
+++ b/service/task_executor/milvusdb.py
@@ -86,7 +86,12 @@ def add_paragraph(folder_id,doc_id, paragraph_index,paragraph_embedding,paragrap
         "paragraph_original": paragraph_original,
         "paragraph_summary": paragraph_summary
     }
-    return _post_json("/update", data)
+    result = _post_json("/update", data)
+    if isinstance(result, dict) and "success" not in result:
+        result["success"] = "error" not in result
+    if isinstance(result, dict) and "inserted_count" not in result and result.get("success"):
+        result["inserted_count"] = 1
+    return result
 
 def add_paragraphs(folder_id, doc_id, paragraphs):
     data = {
