diff --git a/service/task_executor/advanced_task.py b/service/task_executor/advanced_task.py
index 662aaec..323f6dc 100644
--- a/service/task_executor/advanced_task.py
+++ b/service/task_executor/advanced_task.py
@@ -286,6 +286,7 @@ class DirectTaskExecutor:
         self._queue_wait_total_ms = 0.0
         self._queue_wait_count = 0
         self._use_event_dispatch = USE_EVENT_DISPATCH
+        self._dispatcher_restarts = 0
         
         # 任务管理
         self._task_queue = queue.PriorityQueue(max_task_queue_size)
@@ -333,6 +334,7 @@ class DirectTaskExecutor:
             daemon=True,
             name="GCMonitor"
         )
+        self._process_monitor_thread = None
         
         # 统计信息
         self._stats = {
@@ -361,6 +363,13 @@ class DirectTaskExecutor:
             self._load_monitor_thread.start()
         self._stats_thread.start()
         self._gc_thread.start()
+        if use_processes:
+            self._process_monitor_thread = threading.Thread(
+                target=self._monitor_processes,
+                daemon=True,
+                name="ProcessMonitor"
+            )
+            self._process_monitor_thread.start()
         
         # 注册信号处理
         if not use_processes:
@@ -471,50 +480,54 @@ class DirectTaskExecutor:
     def _dispatch_loop(self):
-        """事件驱动调度循环"""
+        """Event-driven dispatch loop: wait for queued work and a free in-flight permit, batch tasks, then submit them."""
         while not self._shutdown_event.is_set():
-            tasks_to_submit = []
-            with self._dispatch_cond:
-                while not self._shutdown_event.is_set():
-                    if self._task_queue.empty():
+            try:  # wraps the whole iteration; any exception is logged below and the loop continues
+                tasks_to_submit = []
+                with self._dispatch_cond:
+                    while not self._shutdown_event.is_set():
+                        if self._task_queue.empty():
+                            self._dispatch_cond.wait(timeout=0.1)
+                            continue
+                        if self._inflight_semaphore.acquire(blocking=False):
+                            break
                         self._dispatch_cond.wait(timeout=0.1)
-                        continue
-                    if self._inflight_semaphore.acquire(blocking=False):
-                        break
-                    self._dispatch_cond.wait(timeout=0.1)
-
-                if self._shutdown_event.is_set():
-                    break
 
-                try:
-                    task = self._task_queue.get_nowait()
-                except queue.Empty:
-                    self._inflight_semaphore.release()
-                    continue
-
-                self._record_task_start(task)
-                tasks_to_submit.append(task)
-
-                while not self._task_queue.empty():
-                    if not self._inflight_semaphore.acquire(blocking=False):
+                    if self._shutdown_event.is_set():  # shutdown requested: leave the dispatch loop
                         break
+
                     try:
                         task = self._task_queue.get_nowait()
                     except queue.Empty:
                         self._inflight_semaphore.release()
-                        break
+                        continue  # queue emptied since the empty() check; permit returned above, start over
+
                     self._record_task_start(task)
                     tasks_to_submit.append(task)
 
-            for task in tasks_to_submit:
-                try:
-                    if self.use_processes:
-                        self._submit_task_to_process_pool(task)
-                    else:
-                        future = self._submit_task_to_thread_pool(task)
-                        future.add_done_callback(
-                            functools.partial(self._task_completed_callback, task.id)
-                        )
-                except Exception as e:
-                    self._handle_submit_error(task, e)
+                    while not self._task_queue.empty():  # batch: take more tasks while permits remain
+                        if not self._inflight_semaphore.acquire(blocking=False):
+                            break
+                        try:
+                            task = self._task_queue.get_nowait()
+                        except queue.Empty:
+                            self._inflight_semaphore.release()
+                            break
+                        self._record_task_start(task)
+                        tasks_to_submit.append(task)
+
+                for task in tasks_to_submit:  # runs after the condition lock has been released
+                    try:
+                        if self.use_processes:
+                            self._submit_task_to_process_pool(task)
+                        else:
+                            future = self._submit_task_to_thread_pool(task)
+                            future.add_done_callback(
+                                functools.partial(self._task_completed_callback, task.id)
+                            )
+                    except Exception as e:
+                        self._handle_submit_error(task, e)
+            except Exception as e:  # NOTE(review): permits acquired / tasks dequeued before the failure are neither released nor requeued here - confirm this is acceptable
+                logger.error(f"Dispatcher loop error: {e}")
+                time.sleep(0.1)  # short pause before the next iteration after an error
             
     def _load_monitor_loop(self):
         """负载监控循环 - 确保工作线程始终满负载并替换需要更新的进程"""
@@ -1046,11 +1059,70 @@ class DirectTaskExecutor:
     
     def _stats_loop(self):
-        """统计信息循环 (保持不变)"""
-        # ...existing code...
+        """Periodically refresh runtime stats and restart the dispatcher thread if it is found dead."""
+        while not self._shutdown_event.is_set():
+            try:
+                process = psutil.Process(os.getpid())
+                memory_mb = process.memory_info().rss / 1024 / 1024  # RSS in MiB
+
+                with self._lock:  # stats update and dispatcher check both happen under the executor lock
+                    self._stats['memory_usage_mb'] = memory_mb
+                    self._stats['queue_size'] = self._task_queue.qsize()
+                    self._stats['active_workers'] = len(self._running_tasks)
+
+                    if self._use_event_dispatch:  # watchdog only applies in event-dispatch mode
+                        dispatcher_dead = (
+                            self._dispatch_thread is None
+                            or not self._dispatch_thread.is_alive()
+                        )
+                        if dispatcher_dead and not self._shutdown_event.is_set():
+                            logger.error("Dispatcher thread is dead, restarting it")
+                            self._dispatch_thread = threading.Thread(
+                                target=self._dispatch_loop,
+                                daemon=True,
+                                name="Dispatcher"
+                            )
+                            self._dispatch_thread.start()
+                            self._dispatcher_restarts += 1
+
+                sleep_sec = max(1, int(self.stats_interval))  # never poll faster than once per second
+                self._shutdown_event.wait(sleep_sec)  # unlike time.sleep, returns as soon as shutdown is set
+            except Exception as e:
+                logger.error(f"Stats monitor error: {e}")
+                self._shutdown_event.wait(1)  # back off after an error, still waking early on shutdown
     
     def _gc_loop(self):
-        """垃圾回收循环 (保持不变)"""
-        # ...existing code...
+        """Periodically trim completed-task records, drop finished process futures and run gc.collect()."""
+        interval_sec = int(os.getenv("TASK_EXECUTOR_GC_INTERVAL_SEC", "30"))
+        interval_sec = max(5, interval_sec)  # floor: at most one pass every 5 seconds
+        max_completed_keep = int(os.getenv("TASK_EXECUTOR_COMPLETED_KEEP", "5000"))
+        max_completed_keep = max(100, max_completed_keep)  # floor: always keep at least 100 records
+
+        while not self._shutdown_event.is_set():
+            try:
+                with self._lock:
+                    completed_count = len(self._completed_tasks)
+                    if completed_count > max_completed_keep:
+                        remove_count = completed_count - max_completed_keep
+                        for _ in range(remove_count):  # drops entries from the front of iteration order (oldest-inserted if this is a dict - confirm)
+                            try:
+                                self._completed_tasks.pop(next(iter(self._completed_tasks)))
+                            except StopIteration:
+                                break
+
+                    if self.use_processes and hasattr(self, "_task_futures"):
+                        done_task_ids = [  # collect ids first so the mapping is not mutated while iterating
+                            task_id
+                            for task_id, async_result in self._task_futures.items()
+                            if async_result.ready()
+                        ]
+                        for task_id in done_task_ids:
+                            self._task_futures.pop(task_id, None)
+
+                gc.collect()
+                self._shutdown_event.wait(interval_sec)  # unlike time.sleep, returns as soon as shutdown is set
+            except Exception as e:
+                logger.error(f"GC monitor error: {e}")
+                self._shutdown_event.wait(5)  # back off after an error, still waking early on shutdown
     
     def shutdown(self, wait: bool = True, timeout: float = 30.0):
         """关闭执行器"""
@@ -1088,6 +1160,8 @@ class DirectTaskExecutor:
 
         # 强制清理资源
         ResourceFactory.clear_resources()
+        if self._process_monitor_thread and self._process_monitor_thread.is_alive():
+            self._process_monitor_thread.join(timeout=1.0)
     
     def _monitor_processes(self):
         """监控和清理僵尸进程"""
@@ -1101,7 +1175,7 @@ class DirectTaskExecutor:
                             completed_futures.append(task_id)
                 
                     for task_id in completed_futures:
-                        del self._task_futures[task_id]
+                        self._task_futures.pop(task_id, None)
                 
                 # 检查系统中的僵尸进程
                 current_process = psutil.Process()
@@ -1122,6 +1196,9 @@ class DirectTaskExecutor:
                 
                 time.sleep(5)  # 每5秒检查一次
                 
+            except (psutil.AccessDenied, PermissionError) as e:
+                logger.warning(f"Process monitor disabled due to permission error: {e}")
+                break
             except Exception as e:
                 logger.error(f"Process monitor error: {e}")
                 time.sleep(10)
diff --git a/service/task_executor/database.py b/service/task_executor/database.py
index 9daa31b..7129e2b 100644
--- a/service/task_executor/database.py
+++ b/service/task_executor/database.py
@@ -1053,12 +1053,14 @@ def reset_stale_tasks_by_status(
                 Task.deleted_at.is_(None),
                 Task.updated_at < cutoff,
             )
-            .order_by(Task.updated_at.asc(), Task.id.asc())
-            .limit(safe_limit)
-            .with_for_update(skip_locked=True)
         )
         if exclude_ids:
             query = query.filter(~Task.id.in_(exclude_ids))
+        query = (
+            query.order_by(Task.updated_at.asc(), Task.id.asc())
+            .limit(safe_limit)
+            .with_for_update(skip_locked=True)
+        )
 
         stale_ids = [row.id for row in query.all()]
         if not stale_ids:
diff --git a/service/task_executor/doc_processor.py b/service/task_executor/doc_processor.py
index e7fda3f..0e6a597 100644
--- a/service/task_executor/doc_processor.py
+++ b/service/task_executor/doc_processor.py
@@ -10,8 +10,23 @@ import parse_doc
 import traceback
 import agents
 import re
+import os
 
 MILVUS_VARCHAR_SAFE_BYTES = 65000
+PARAGRAPH_PROCESS_MAX_WORKERS = max(
+    1, int(os.getenv("TASK_PARAGRAPH_PROCESS_MAX_WORKERS", "20"))
+)
+PARAGRAPH_UPLOAD_MAX_WORKERS = max(
+    1, int(os.getenv("TASK_PARAGRAPH_UPLOAD_MAX_WORKERS", "20"))
+)
+
+
+def _safe_append_failedlog(message: str) -> None:  # best-effort append to failedlog.txt; write errors are logged, not raised
+    try:
+        with open("failedlog.txt", "a", encoding="utf-8") as f:
+            f.write(message)
+    except Exception as file_error:
+        logger.error(f"写入 failedlog.txt 失败: {file_error}")  # use the module logger like the rest of this file
 
 
 def _truncate_utf8_bytes(text, max_bytes=MILVUS_VARCHAR_SAFE_BYTES):
@@ -409,8 +424,8 @@ def parse_doc_personal_task(task_info, doc_info_task):
                 
             paragraphs_prepare_to_directly_upload = []
             processParagraphExecutor = BatchTaskExecutor(
-                max_workers=20,
-                default_timeout=120,  # 5分钟超时
+                max_workers=PARAGRAPH_PROCESS_MAX_WORKERS,
+                default_timeout=None,  # 避免内层超时线程泄漏，交由外层任务超时控制
                 default_max_retries= 3,
                 default_retry_delay=2.0,  # 重试延迟2秒
             )
@@ -486,8 +501,8 @@ def parse_doc_personal_task(task_info, doc_info_task):
                 
                 # 创建多线程执行器用于段落上传
                 upload_executor = BatchTaskExecutor(
-                    max_workers=20,  # 同时上传20个片段
-                    default_timeout=60,  # 1分钟超时
+                    max_workers=PARAGRAPH_UPLOAD_MAX_WORKERS,
+                    default_timeout=None,  # 避免内层超时线程泄漏，交由外层任务超时控制
                     default_max_retries=3,
                     default_retry_delay=1.0,  # 重试延迟1秒
                 )
@@ -574,14 +589,13 @@ def parse_doc_personal_task(task_info, doc_info_task):
                             del upload_failed
 
             except Exception as e:
-                traceback.print_exc()
+                logging.error(traceback.format_exc())
                 logger.error(
                     f"文档 {file_name} 第 {k+1} 批段落处理任务创建失败,错误信息: {str(e)}"
                 )
-                with open("failedlog.txt", "a", encoding="utf-8") as f:
-                    f.write(
-                        f"文档 {file_name} 第 {k+1} 批段落处理任务创建失败,错误信息: {str(e)}，错误位置：2\n"
-                    )
+                _safe_append_failedlog(
+                    f"文档 {file_name} 第 {k+1} 批段落处理任务创建失败,错误信息: {str(e)}，错误位置：2\n"
+                )
                 # 不要因为一批失败就整个任务失败，继续处理下一批
                 continue
             finally:
@@ -695,9 +709,10 @@ def parse_doc_personal_task(task_info, doc_info_task):
         logger.error(f"文档 {file_name} 解析失败,错误信息: {str(e)}")
         logger.error(f"完整错误堆栈: {error_trace}")
         
-        with open("failedlog.txt", "a", encoding="utf-8") as f:
-            f.write(f"文档 {file_name} 解析失败,错误信息: {str(e)},错误位置:1\n")
-            f.write(f"完整错误堆栈: {error_trace}\n")
+        _safe_append_failedlog(
+            f"文档 {file_name} 解析失败,错误信息: {str(e)},错误位置:1\n"
+            f"完整错误堆栈: {error_trace}\n"
+        )
             
         database.set_task_status(task_info["id"], "failed")
         database.set_document_status(task_info["related_id"], "解析失败")
diff --git a/service/task_executor/task.py b/service/task_executor/task.py
index 76ef686..79ff60b 100644
--- a/service/task_executor/task.py
+++ b/service/task_executor/task.py
@@ -226,6 +226,7 @@ EXECUTOR_RESTART_COOLDOWN_SEC = int(os.getenv("TASK_EXECUTOR_RESTART_COOLDOWN_SE
 QUEUED_TASK_STALE_SEC = int(os.getenv("TASK_QUEUED_STALE_SEC", "120"))
 RUNNING_TASK_STALE_SEC = int(os.getenv("TASK_RUNNING_STALE_SEC", "180"))
 MAX_PARSE_RETRY_COUNT = int(os.getenv("TASK_PARSE_MAX_RETRY_COUNT", "3"))
+PARSE_DOC_MAX_WORKERS = max(1, int(os.getenv("TASK_PARSE_DOC_MAX_WORKERS", "40")))
 
 def create_db_connection():
     # 创建数据库连接的代码
@@ -246,7 +247,7 @@ paragragh_uploader_task_executor = AdvancedTaskExecutor(
 
 def _create_parse_doc_executor():
     return DirectTaskExecutor(
-        max_workers=40,
+        max_workers=PARSE_DOC_MAX_WORKERS,
         use_processes=True,  # 使用进程模式
         memory_limit_mb=4096,
         gc_threshold=30,
@@ -974,16 +975,26 @@ if __name__ == "__main__":
     multiprocessing.freeze_support()
     
     try:
-        recovered = database.reset_tasks_by_status("文档解析", "queued")
+        recovered = database.reset_stale_tasks_by_status(
+            "文档解析",
+            "queued",
+            QUEUED_TASK_STALE_SEC,
+            "pending",
+        )
         if recovered:
-            print(f"[任务恢复] 已重置 {recovered} 个queued状态的文档解析任务")
+            print(f"[任务恢复] 已重置 {recovered} 个超时queued状态的文档解析任务")
     except Exception as e:
         print(f"[任务恢复] 重置queued任务失败: {e}")
 
     try:
-        recovered = database.reset_tasks_by_status("文档解析", "running")
+        recovered = database.reset_stale_tasks_by_status(
+            "文档解析",
+            "running",
+            RUNNING_TASK_STALE_SEC,
+            "pending",
+        )
         if recovered:
-            print(f"[任务恢复] 已重置 {recovered} 个running状态的文档解析任务")
+            print(f"[任务恢复] 已重置 {recovered} 个超时running状态的文档解析任务")
     except Exception as e:
         print(f"[任务恢复] 重置running任务失败: {e}")
 
