import json
import subprocess
import requests
import sys
import time
from pathlib import Path

# Task ids this manual reset targets.
IDS = [
    412078, 412168, 412270,
    432824, 432825, 432826, 432827, 432828, 432829, 432830,
    432831, 432832, 432833, 432834, 432835, 432836, 432837,
    432838, 432839, 432840, 432841, 432842, 432843,
]
# Output directory for this run's artifacts; created if it does not exist yet.
BACKUP_DIR = Path('/root/knowledge/service/task_executor/ops_backups')
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
# Run timestamp, used to name the files written by this run.
TS = time.strftime('%Y%m%d-%H%M%S')
# Comma-separated ids, spliced into SQL `IN (...)` lists (ids are ints, so no quoting needed).
ids_csv = ','.join(map(str, IDS))

# Snapshot the target tasks, joined with their docs, to CSV before anything is
# modified. `\copy ... TO STDOUT` makes psql stream the rows to its stdout,
# which is redirected into the backup file.
backup_path = BACKUP_DIR / f'pre_low_concurrency_reset_manual_{TS}.csv'
_backup_columns = ','.join([
    't.id task_id',
    't.related_id doc_id',
    't.task_status',
    'd.status doc_status',
    't.task_user_id',
    't.task_arguments',
    't.task_result',
    't.task_log',
    't.created_at',
    't.updated_at',
])
backup_sql = (
    f"\\copy (SELECT {_backup_columns} "
    "FROM tasks t LEFT JOIN docs d ON d.id=t.related_id "
    f"WHERE t.id IN ({ids_csv}) ORDER BY t.id) "
    "TO STDOUT WITH CSV HEADER"
)
with backup_path.open('w', encoding='utf-8') as backup_file:
    subprocess.run(
        ['docker', 'exec', 'postgres_db', 'psql', '-U', 'knowledge', '-d', 'yunwoai', '-P', 'pager=off', '-c', backup_sql],
        stdout=backup_file,
        check=True,
    )

# Fetch the tasks to reset. `psql -At` prints unaligned, tuples-only rows; `-F`
# sets a tab field separator. NULL columns come out as empty strings.
sql = "SELECT id, related_id, task_user_id, task_arguments FROM tasks WHERE id IN (%s) ORDER BY id" % ids_csv
out = subprocess.check_output(['docker', 'exec', 'postgres_db', 'psql', '-U', 'knowledge', '-d', 'yunwoai', '-At', '-F', '\t', '-c', sql], text=True)
cleanup_path = BACKUP_DIR / f'milvus_cleanup_manual_{TS}.log'


def _resolve_true_folder_id(typ, folder_id, user_id, args):
    """Return the folder id sent to /delete_document for one task.

    A non-zero ``folder_id`` from task_arguments is used as-is. Otherwise the
    id is a per-type prefix followed by the owner id: the task's user id for
    'personal', ``org_id`` for 'enterprise', ``industry_id`` for 'industry'.

    Raises:
        ValueError: if no folder id can be derived. This covers an unsupported
            type and a missing owner id. Without the check, a NULL user would
            silently give folder ``1238600`` and a missing org_id would crash on
            ``int('8612300None')``.
    """
    if folder_id:
        return folder_id
    if typ == 'personal':
        prefix, owner = '1238600', user_id
    elif typ == 'enterprise':
        prefix, owner = '8612300', args.get('org_id')
    elif typ == 'industry':
        prefix, owner = '2004500', args.get('industry_id')
    else:
        raise ValueError(f'no folder_id and unsupported type {typ!r}')
    if owner is None or str(owner) == '':
        raise ValueError(f'no folder_id and no owner id for type {typ!r}')
    return int(prefix + str(owner))


# Pass 1: parse and resolve every row before touching Milvus. A malformed row
# then aborts the run up front, instead of after some vectors are already gone.
plan = []  # (task_id, doc_id, type, true_folder_id)
resolve_errors = []
seen_ids = set()
for line in out.splitlines():
    task_id, doc_id, user_id, args_json = line.split('\t', 3)
    seen_ids.add(int(task_id))
    try:
        # json.JSONDecodeError is a ValueError (e.g. NULL task_arguments -> '').
        args = json.loads(args_json)
        if not isinstance(args, dict):
            raise ValueError('task_arguments is not a JSON object')
        typ = args.get('type')
        folder_id = int(args.get('folder_id') or 0)
        true_folder_id = _resolve_true_folder_id(typ, folder_id, user_id, args)
        plan.append((task_id, int(doc_id), typ, true_folder_id))
    except (ValueError, TypeError) as e:
        resolve_errors.append(f'{task_id}\t{doc_id}\t{e}')

missing_ids = [i for i in IDS if i not in seen_ids]
if missing_ids:
    print(f'WARNING: task ids not found in tasks table: {missing_ids}', file=sys.stderr)
if resolve_errors:
    print('Cannot resolve Milvus folder/doc for some tasks; nothing was deleted:', file=sys.stderr)
    for item in resolve_errors:
        print(item, file=sys.stderr)
    sys.exit(2)

# Pass 2: delete each document's vectors via the local service. This is best-effort
# per row: every failure is logged and collected, and the run stops before the DB reset.
failures = []
with cleanup_path.open('w', encoding='utf-8') as log:
    header = 'task_id\tdoc_id\ttype\ttrue_folder_id\tdelete_ok\tresponse'
    print(header)
    print(header, file=log)
    for task_id, doc_id, typ, true_folder_id in plan:
        try:
            r = requests.post('http://127.0.0.1:4999/delete_document', json={'folder_id': true_folder_id, 'doc_id': doc_id}, timeout=30)
            ok = r.status_code == 200
            # Flatten line breaks and tabs so the log stays one TSV row per task.
            text = r.text[:300].replace('\r', ' ').replace('\n', ' ').replace('\t', ' ')
        except Exception as e:  # record any transport error and continue with the next task
            ok = False
            text = repr(e)
        row = f'{task_id}\t{doc_id}\t{typ}\t{true_folder_id}\t{ok}\t{text}'
        print(row)
        print(row, file=log)
        if not ok:
            failures.append(row)
if failures:
    print('Milvus cleanup failures:', file=sys.stderr)
    for item in failures:
        print(item, file=sys.stderr)
    sys.exit(2)

# Tag appended to task_result / task_log so rows touched by this run can be
# found later. TS is the run timestamp also used in the backup/cleanup filenames.
marker = f'low_concurrency_reset_manual_{TS}'
# In a single transaction, reset the selected document-parsing tasks ('文档解析')
# to 'pending' and set their docs to '待处理' ("to be processed").
# The note appended to task_result reads roughly: "old high-concurrency parsing
# process hit model 429; old Milvus vectors cleared, retrying at low concurrency".
# NOTE(review): the docs UPDATE is not filtered by task_status. Docs are
# therefore reset even for tasks the first UPDATE skipped (status outside the IN list).
update_sql = f"""
BEGIN;
UPDATE tasks
SET task_status='pending',
    updated_at=NOW(),
    task_result=CONCAT(COALESCE(task_result,''), E'\n旧高并发解析进程触发模型429，已清理Milvus旧向量并切换低并发重试: {marker}'),
    task_log=CONCAT(COALESCE(task_log,''), E'\nretry_reason: {marker}')
WHERE id IN ({ids_csv})
  AND task_type='文档解析'
  AND task_status IN ('running','queued','failed','pending');
UPDATE docs d
SET status='待处理', updated_at=NOW()
FROM tasks t
WHERE d.id=t.related_id
  AND t.id IN ({ids_csv})
  AND t.task_type='文档解析';
COMMIT;
"""
# ON_ERROR_STOP=1 makes psql stop and exit non-zero at the first failing statement.
# Without it, psql keeps executing stdin after an error, the COMMIT of the
# aborted transaction only rolls back, and psql still exits 0. check=True would
# then never notice that nothing was updated.
subprocess.run(['docker', 'exec', '-i', 'postgres_db', 'psql', '-U', 'knowledge', '-d', 'yunwoai', '-P', 'pager=off', '-v', 'ON_ERROR_STOP=1'], input=update_sql, text=True, check=True)
print(f'MARKER={marker}')
print(f'BACKUP={backup_path}')
print(f'CLEANUP_LOG={cleanup_path}')
